"""Helpers for managing user and group entries on an LDAP server.

Connection parameters are read from the Django settings
(``AUTH_LDAP_PROTO``, ``AUTH_LDAP_HOST``, ``AUTH_LDAP_PORT``,
``AUTH_LDAP_BIND_DN``, ``AUTH_LDAP_BIND_PASSWORD``, ``AUTH_LDAP_BASE_DN``).
"""
import hashlib
import os
from base64 import urlsafe_b64encode as encode

import ldap
import ldap.dn
import ldap.filter
import ldap.modlist
from ldap.modlist import addModlist

from django.conf import settings


class LDAPOperations():
    """Bound LDAP connection plus user/group management operations.

    Users live under ``ou=users,<AUTH_LDAP_BASE_DN>`` and groups under
    ``ou=groups,<AUTH_LDAP_BASE_DN>``.
    """

    def __init__(self):
        """Open and bind the LDAP connection immediately."""
        self.connect()

    @staticmethod
    def _user_dn(username):
        """Return the DN of *username* below ``ou=users``.

        The RDN value is escaped so that DN special characters in the name
        (``,``, ``+``, ``=`` ...) cannot alter the structure of the DN.
        """
        return "uid=%s,ou=users,%s" % (
            ldap.dn.escape_dn_chars(str(username)),
            settings.AUTH_LDAP_BASE_DN,
        )

    @staticmethod
    def _group_dn(group):
        """Return the DN of *group* below ``ou=groups`` (RDN value escaped)."""
        return "cn=%s,ou=groups,%s" % (
            ldap.dn.escape_dn_chars(str(group)),
            settings.AUTH_LDAP_BASE_DN,
        )

    @staticmethod
    def _ldap_password(hashed_password):
        """Convert a stored password hash to the ``userPassword`` value.

        Drops the first six characters and prefixes ``{ARGON2}``.
        NOTE(review): presumably the six characters are the ``argon2``
        algorithm label of a Django argon2 hash -- confirm that no other
        password hasher is in use.
        """
        return '{ARGON2}' + hashed_password[6:]

    def connect(self):
        """Initialise ``self.con`` and bind with the configured service account.

        :raises ldap.SERVER_DOWN: if the server cannot be contacted
        """
        if settings.AUTH_LDAP_PROTO == 'ldaps':
            # NOTE(review): this turns off TLS certificate verification via a
            # process-wide python-ldap option. Kept as-is because deployments
            # may rely on it, but it should be reviewed.
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

        # %-formatting so that an integer AUTH_LDAP_PORT works as well as a
        # string one.
        uri = "%s://%s:%s" % (
            settings.AUTH_LDAP_PROTO,
            settings.AUTH_LDAP_HOST,
            settings.AUTH_LDAP_PORT,
        )
        self.con = ldap.initialize(uri)
        try:
            self.con.simple_bind_s(settings.AUTH_LDAP_BIND_DN,
                                   settings.AUTH_LDAP_BIND_PASSWORD)
        except ldap.SERVER_DOWN as err:
            raise ldap.SERVER_DOWN(
                'The LDAP library can’t contact the LDAP server. Contact the admin.'
            ) from err

    def check_attribute(self, attribute, value):
        """Search the whole base DN subtree for entries with attribute == value.

        This is mainly for checking unique attributes, i.e. uid, mail,
        uidNumber.

        :param attribute: attribute name to match
        :param value: value to look for; it is escaped before being placed
            in the search filter, so it is always matched literally
        :return: the ``search_s`` result, a list of ``(dn, attributes)``
            tuples (empty when nothing matches)
        """
        query = "(%s=%s)" % (
            attribute,
            ldap.filter.escape_filter_chars(str(value)),
        )
        return self.con.search_s(settings.AUTH_LDAP_BASE_DN,
                                 ldap.SCOPE_SUBTREE, query)

    def add_to_group(self, username, group):
        """Add the user's DN to the ``member`` attribute of *group*.

        :param username: uid of the user
        :param group: cn of the group
        :return: result of ``modify_s``
        """
        user_dn = self._user_dn(username)
        return self.con.modify_s(
            self._group_dn(group),
            [
                (ldap.MOD_ADD, 'member', [user_dn.encode("utf-8")]),
            ],
        )

    def del_from_group(self, username, group):
        """Remove the user's DN from the ``member`` attribute of *group*.

        :param username: uid of the user
        :param group: cn of the group
        :return: result of ``modify_s``
        """
        user_dn = self._user_dn(username)
        return self.con.modify_s(
            self._group_dn(group),
            [
                (ldap.MOD_DELETE, 'member', [user_dn.encode("utf-8")]),
            ],
        )

    def add_user(self, user):
        """Create a ``sharixAccount`` entry for *user* below ``ou=users``.

        :param user: object providing ``id``, ``password``, ``last_name``,
            ``middle_name``, ``first_name``, ``email``, ``phone_number`` and
            ``get_full_name()``
        :return: result of ``add_s``
        """
        full_name = user.get_full_name()
        attributes = {
            "objectClass": ["sharixAccount"],
            "uid": [user.id],
            "userPassword": [self._ldap_password(user.password)],
            "sn": [user.last_name],
            "initials": [user.middle_name],
            "givenName": [user.first_name],
            "cn": [full_name],
            "displayName": [full_name],
            "mail": [user.email],
            "telephoneNumber": [user.phone_number],
            # Not populated at the moment: title, jpegPhoto, employeeType,
            # departmentNumber, registeredAddress, homePhone, uidNumber,
            # gidNumber, loginShell, homeDirectory.
        }
        dn = self._user_dn(user.id)

        # python-ldap wants every value as bytes. Values go through str()
        # first so a non-string uid (e.g. an integer primary key) works, and
        # None / empty values are dropped; addModlist then omits attributes
        # that end up with no values.
        entry = {
            name: [
                str(value).encode('utf-8')
                for value in values
                if value is not None and value != ''
            ]
            for name, values in attributes.items()
        }
        return self.con.add_s(dn, addModlist(entry))

    def set_password(self, username, password):
        """Replace the ``userPassword`` of *username*.

        :param username: uid of the user
        :param password: stored password hash; it is converted the same way
            as in ``add_user`` (first six characters dropped, ``{ARGON2}``
            prefixed)
        :return: result of ``modify_s``
        :raises KeyError: if no entry with the expected DN (or no
            ``userPassword`` on it) is returned by the lookup
        """
        dn = self._user_dn(username)
        # Look the user up to obtain the current value, which modifyModlist
        # needs to compute the change.
        entries = dict(self.check_attribute('uid', username))
        old_value = {"userPassword": [entries[dn]['userPassword'][0]]}
        new_value = {"userPassword": [self._ldap_password(password).encode()]}
        modlist = ldap.modlist.modifyModlist(old_value, new_value)
        return self.con.modify_s(dn, modlist)

    def delete_user(self, username):
        """Delete the entry of *username* below ``ou=users``.

        Uses the same DN layout as ``add_user`` so that entries created
        there can actually be removed.

        :param username: uid of the user
        :return: result of ``delete_s``
        """
        return self.con.delete_s(self._user_dn(username))