Last active
October 22, 2025 18:49
-
-
Save simonLeary42/373a3db2b454ef0321f57cc35eed6de0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| from typing import Any, List | |
| from ldap3 import BASE, MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE, Connection | |
| from ldap3.core.exceptions import LDAPException | |
| from ldap3.utils.log import ERROR, set_library_log_detail_level | |
| """ | |
| wrapper interface for ldap3 | |
| sacrifices performance for simplicity | |
| raises exceptions whenever possible | |
| TODO | |
| * is existence checking necessary? how do errors look without it? | |
| * how does ldap3 builtin logging (set_library_log_detail_level) look? | |
| * how does it look when you give Connection as the argument to an exception? | |
| """ | |
| logger = logging.getLogger(__name__) | |
| set_library_log_detail_level(ERROR) | |
def _raise_for_result(conn):
    """Raise ``LDAPException`` if the connection's last result code is non-zero.

    Reads ``conn.result["result"]``; a value of 0 is treated as success and
    the function returns ``None``. Otherwise the connection object itself is
    passed as the exception argument.
    """
    result_code = conn.result["result"]
    if result_code == 0:
        return
    raise LDAPException(conn)
def _raise_if_does_not_exist(conn: Connection, dn: str, controls=None) -> None:
    """Raise ``KeyError(dn)`` unless a base-scope search on ``dn`` succeeds.

    Performs a ``BASE`` search with the match-anything filter
    ``(objectClass=*)`` and no requested attributes; a falsy return value
    from ``conn.search`` is treated as "entry does not exist".
    """
    found = conn.search(
        dn,
        "(objectClass=*)",
        search_scope=BASE,
        attributes=[],
        controls=controls,
    )
    if found:
        return
    raise KeyError(dn)
def _raise_if_already_exists(conn: Connection, dn: str, controls=None) -> None:
    """Raise ``RuntimeError`` if a base-scope search on ``dn`` succeeds.

    Performs a ``BASE`` search with the match-anything filter
    ``(objectClass=*)`` and no requested attributes; a truthy return value
    from ``conn.search`` is treated as "entry already exists".
    """
    found = conn.search(
        dn,
        "(objectClass=*)",
        search_scope=BASE,
        attributes=[],
        controls=controls,
    )
    if not found:
        return
    raise RuntimeError(f'entry "{dn}" already exists!')
def add_attribute_values(conn: Connection, dn: str, attribute_name: str, values: list, controls=None) -> None:
    """Add ``values`` to attribute ``attribute_name`` of the entry at ``dn``.

    Args:
        conn: ldap3 connection used for the existence check and the modify.
        dn: distinguished name of the entry to modify.
        attribute_name: name of the attribute to add values to.
        values: non-empty list of values to add.
        controls: passed to the modify operation only.

    Raises:
        RuntimeError: if ``values`` is empty.
        KeyError: if no entry exists at ``dn``.
        LDAPException: if the modify operation's result code is non-zero.
    """
    if not values:
        raise RuntimeError("please specify which values to add")
    # The helper takes the connection as its first argument; the original
    # call passed only ``dn`` and therefore failed with a TypeError.
    _raise_if_does_not_exist(conn, dn)
    conn.modify(dn, {attribute_name: [MODIFY_ADD, values]}, controls=controls)
    _raise_for_result(conn)
def overwrite_attribute_values(conn: Connection, dn: str, attribute_name: str, values: list, controls=None) -> None:
    """Replace the values of attribute ``attribute_name`` on the entry at ``dn``.

    Args:
        conn: ldap3 connection used for the existence check and the modify.
        dn: distinguished name of the entry to modify.
        attribute_name: name of the attribute whose values are replaced.
        values: the new list of values (sent with ``MODIFY_REPLACE``).
        controls: passed to the modify operation only.

    Raises:
        KeyError: if no entry exists at ``dn``.
        LDAPException: if the modify operation's result code is non-zero.
    """
    # The helper takes the connection as its first argument; the original
    # call passed only ``dn`` and therefore failed with a TypeError.
    _raise_if_does_not_exist(conn, dn)
    conn.modify(dn, {attribute_name: [MODIFY_REPLACE, values]}, controls=controls)
    _raise_for_result(conn)
def delete_attribute_values(conn: Connection, dn: str, attribute_name: str, values: list, controls=None) -> None:
    """Remove ``values`` from attribute ``attribute_name`` of the entry at ``dn``.

    Args:
        conn: ldap3 connection used for the existence check and the modify.
        dn: distinguished name of the entry to modify.
        attribute_name: name of the attribute to remove values from.
        values: non-empty list of values to delete.
        controls: passed to the modify operation only.

    Raises:
        RuntimeError: if ``values`` is empty.
        KeyError: if no entry exists at ``dn``.
        LDAPException: if the modify operation's result code is non-zero.
    """
    # Validate arguments before touching the server, mirroring
    # add_attribute_values.
    if not values:
        raise RuntimeError("please specify which values to delete")
    # The helper takes the connection as its first argument; the original
    # call passed only ``dn`` and therefore failed with a TypeError.
    _raise_if_does_not_exist(conn, dn)
    conn.modify(dn, {attribute_name: [MODIFY_DELETE, values]}, controls=controls)
    _raise_for_result(conn)
def create_entry(conn: Connection, dn: str, object_class: list, attributes: dict, controls=None) -> None:
    """Create a new entry at ``dn``.

    Args:
        conn: ldap3 connection used for the existence check and the add.
        dn: distinguished name of the entry to create.
        object_class: object classes passed to ``conn.add``.
        attributes: attribute name -> value mapping passed to ``conn.add``.
        controls: passed to the add operation only.

    Raises:
        RuntimeError: if an entry already exists at ``dn``.
        LDAPException: if the add operation's result code is non-zero.
    """
    # The helper takes the connection as its first argument; the original
    # call passed only ``dn`` and therefore failed with a TypeError.
    _raise_if_already_exists(conn, dn)
    conn.add(dn, object_class=object_class, attributes=attributes, controls=controls)
    _raise_for_result(conn)
def move_entry(conn: Connection, dn: str, rdn: str, destination_ou_dn: str, controls=None) -> None:
    """Move the entry at ``dn`` under ``destination_ou_dn``.

    Args:
        conn: ldap3 connection used for the existence check and the move.
        dn: current distinguished name of the entry.
        rdn: relative DN passed to ``conn.modify_dn`` as the entry's
            (new) relative name.
        destination_ou_dn: DN passed as ``new_superior``.
        controls: passed to the modify-DN operation only.

    Raises:
        KeyError: if no entry exists at ``dn``.
        LDAPException: if the modify-DN operation's result code is non-zero.
    """
    # The helper takes the connection as its first argument; the original
    # call passed only ``dn`` and therefore failed with a TypeError.
    _raise_if_does_not_exist(conn, dn)
    conn.modify_dn(dn, rdn, new_superior=destination_ou_dn, controls=controls)
    _raise_for_result(conn)
def delete_entry(conn: Connection, dn: str, controls=None) -> None:
    """Delete the entry at ``dn``.

    Args:
        conn: ldap3 connection used for the existence check and the delete.
        dn: distinguished name of the entry to delete.
        controls: passed to the delete operation only.

    Raises:
        KeyError: if no entry exists at ``dn``.
        LDAPException: if the delete operation's result code is non-zero.
    """
    # The helper takes the connection as its first argument; the original
    # call passed only ``dn`` and therefore failed with a TypeError.
    _raise_if_does_not_exist(conn, dn)
    conn.delete(dn, controls=controls)
    _raise_for_result(conn)
def search(conn: Connection, *args, **kwargs) -> List:
    """Run ``conn.search`` with the given arguments and return its entries.

    All positional and keyword arguments are forwarded unchanged to
    ``conn.search``. After the call, a non-zero result code raises
    ``LDAPException`` (via ``_raise_for_result``); otherwise
    ``conn.entries`` is returned.
    """
    conn.search(*args, **kwargs)
    _raise_for_result(conn)
    matched_entries = conn.entries
    return matched_entries
def get_attributes(conn: Connection, dn: str, attributes: List[str], controls=None) -> Any:
    """Fetch the requested attributes of the single entry at ``dn``.

    Args:
        conn: ldap3 connection used for the existence check and the search.
        dn: distinguished name of the entry to read.
        attributes: names of the attributes to retrieve.
        controls: passed to both searches (existence check and fetch).

    Returns:
        The first element of ``conn.entries`` after the search.

    Raises:
        KeyError: if no entry exists at ``dn``.
        LDAPException: if the search's result code is non-zero.
    """
    # The helper takes the connection as its first argument; the original
    # call passed only ``dn`` and therefore failed with a TypeError.
    _raise_if_does_not_exist(conn, dn, controls=controls)
    # ``Connection.search`` requires a filter as its second positional
    # argument, which the original call omitted. BASE scope restricts the
    # result to the entry at ``dn`` so that ``entries[0]`` is that entry.
    conn.search(
        dn,
        "(objectClass=*)",
        search_scope=BASE,
        attributes=attributes,
        controls=controls,
    )
    _raise_for_result(conn)
    return conn.entries[0]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment