Skip to content

Instantly share code, notes, and snippets.

@simonLeary42
Last active October 22, 2025 18:49
Show Gist options
  • Select an option

  • Save simonLeary42/373a3db2b454ef0321f57cc35eed6de0 to your computer and use it in GitHub Desktop.

Select an option

Save simonLeary42/373a3db2b454ef0321f57cc35eed6de0 to your computer and use it in GitHub Desktop.
import logging
from typing import Any, List
from ldap3 import BASE, MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE, Connection
from ldap3.core.exceptions import LDAPException
from ldap3.utils.log import ERROR, set_library_log_detail_level
"""
wrapper interface for ldap3
sacrifices performance for simlicity
raises exceptions whenever possible
TODO
* is existence checking necessary? how do errors look without it?
* how does ldap3 builtin logging (set_library_log_detail_level) look?
* how does it look when you give Connection as the argument to an exception?
"""
logger = logging.getLogger(__name__)
set_library_log_detail_level(ERROR)
def _raise_for_result(conn):
if conn.result["result"] != 0:
raise LDAPException(conn)
def _raise_if_does_not_exist(conn: Connection, dn: str, controls=None) -> None:
if not conn.search(dn, "(objectClass=*)", attributes=[], search_scope=BASE, controls=controls):
raise KeyError(dn)
def _raise_if_already_exists(conn: Connection, dn: str, controls=None) -> None:
if conn.search(dn, "(objectClass=*)", attributes=[], search_scope=BASE, controls=controls):
raise RuntimeError(f'entry "{dn}" already exists!')
def add_attribute_values(conn: Connection, dn: str, attribute_name: str, values: list, controls=None) -> None:
if len(values) == 0: raise RuntimeError("please specify which values to add")
_raise_if_does_not_exist(dn)
conn.modify(dn, {attribute_name: [MODIFY_ADD, values]}, controls=controls)
_raise_for_result(conn)
def overwrite_attribute_values(conn: Connection, dn: str, attribute_name: str, values: list, controls=None) -> None:
_raise_if_does_not_exist(dn)
conn.modify(dn, {attribute_name: [MODIFY_REPLACE, values]}, controls=controls)
_raise_for_result(conn)
def delete_attribute_values(conn: Connection, dn: str, attribute_name: str, values: list, controls=None) -> None:
_raise_if_does_not_exist(dn)
if len(values) == 0: raise RuntimeError("please specify which values to delete")
conn.modify(dn, {attribute_name: [MODIFY_DELETE, values]}, controls=controls)
_raise_for_result(conn)
def create_entry(conn: Connection, dn: str, object_class: list, attributes: dict, controls=None) -> None:
_raise_if_already_exists(dn)
conn.add(dn, object_class=object_class, attributes=attributes, controls=controls)
_raise_for_result(conn)
def move_entry(conn: Connection, dn: str, rdn: str, destination_ou_dn: str, controls=None) -> None:
_raise_if_does_not_exist(dn)
conn.modify_dn(dn, rdn, new_superior=destination_ou_dn, controls=controls)
_raise_for_result(conn)
def delete_entry(conn: Connection, dn: str, controls=None) -> None:
_raise_if_does_not_exist(dn)
conn.delete(dn, controls=controls)
_raise_for_result(conn)
def search(conn: Connection, *args, **kwargs) -> List:
conn.search(*args, **kwargs)
_raise_for_result(conn)
return conn.entries
def get_attributes(conn: Connection, dn: str, attributes: List[str], controls=None) -> Any:
_raise_if_does_not_exist(dn)
conn.search(dn, attributes=attributes, controls=controls)
_raise_for_result(conn)
return conn.entries[0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment