123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import ldap
- import os,hashlib
- from base64 import urlsafe_b64encode as encode
- from ldap.modlist import addModlist
- from django.conf import settings
- class LDAPOperations():
- def __init__(self):
- self.connect()
- def connect(self):
- if settings.AUTH_LDAP_PROTO == 'ldaps':
- ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
- self.con = ldap.initialize(settings.AUTH_LDAP_PROTO + '://' + settings.AUTH_LDAP_HOST + ':' + settings.AUTH_LDAP_PORT)
- try:
- self.con.simple_bind_s(settings.AUTH_LDAP_BIND_DN, settings.AUTH_LDAP_BIND_PASSWORD)
- except ldap.SERVER_DOWN:
- raise ldap.SERVER_DOWN('The LDAP library can’t contact the LDAP server. Contact the admin.')
- def check_attribute(self, attribute, value):
- """
- Takes an attribute and value and checks it against the LDAP server for existence/availability.
- This is mainly for checking unique attributes ie uid, mail, uidNumber
- :param attribute:
- :param value
- :return: tuple
- """
- query = "(" + attribute + "=" + value + ")"
- result = self.con.search_s(settings.AUTH_LDAP_BASE_DN, ldap.SCOPE_SUBTREE, query)
- return result
- def add_to_group(self, username, group):
- user_dn = "uid=%s,ou=users,%s" % (username, settings.AUTH_LDAP_BASE_DN,)
- group_dn = "cn=%s,ou=groups,%s" % (group, settings.AUTH_LDAP_BASE_DN,)
- return self.con.modify_s(
- group_dn,
- [
- (ldap.MOD_ADD, 'member', [user_dn.encode("utf-8")]),
- ],
- )
- def del_from_group(self, username, group):
- user_dn = "uid=%s,ou=users,%s" % (username, settings.AUTH_LDAP_BASE_DN,)
- group_dn = "cn=%s,ou=groups,%s" % (group, settings.AUTH_LDAP_BASE_DN,)
- return self.con.modify_s(
- group_dn,
- [
- (ldap.MOD_DELETE, 'member', [user_dn.encode("utf-8")]),
- ],
- )
- def add_user(self, user):
-
- modlist = {
- "objectClass": ["sharixAccount"],
- "uid": [user.id],
- "userPassword": ['{ARGON2}'+user.password[6:]],
- "sn": [user.last_name],
- "initials": [user.middle_name],
- "givenName": [user.first_name],
- "cn": [user.get_full_name()],
- "displayName": [user.get_full_name()],
- # "title": [user.title],
- "mail": [user.email],
- # "jpegPhoto": [user.avatar],
- # "employeeType": [user.designation],
- # "departmentNumber": [user.department],
- "telephoneNumber": [user.phone_number],
- # "registeredAddress": [user.address],
- # "homePhone": [user.phone],
- # "uidNumber": [uid_number],
- # "gidNumber": [settings.LDAP_GID],
- # "loginShell": ["/bin/bash"],
- # "homeDirectory": ["/home/users/" + user.username]
- }
-
- dn = 'uid=' + modlist['uid'][0] + ',ou=users,' + settings.AUTH_LDAP_BASE_DN
- # convert modlist to bytes form ie b'abc'
- modlist_bytes = {}
- for key in modlist.keys():
- modlist_bytes[key] = [i.encode('utf-8') for i in modlist[key] if i
- is not None]
- result = self.con.add_s(dn, addModlist(modlist_bytes))
- return result
- def set_password(self, username, password):
- """
- set user password
- :param username:
- :param password:
- :return: ldap result
- """
- dn = "uid=%s,ou=users,%s" % (username, settings.AUTH_LDAP_BASE_DN,)
- user_result = self.check_attribute('uid', username) # get user
- tmp_modlist = dict(user_result)
- old_value = {"userPassword": [tmp_modlist[dn]['userPassword'][0]]}
- new_value = {"userPassword": [('{ARGON2}'+password[6:]).encode()]}
- modlist = ldap.modlist.modifyModlist(old_value, new_value)
- result = self.con.modify_s(dn, modlist)
- return result
- def delete_user(self, username):
- dn = "uid=%s,%s" % (username, settings.AUTH_LDAP_BASE_DN,)
- response = self.con.delete_s(dn)
- return response
|