# Source code for data_manipulation.openldap_
import subprocess
from urllib.parse import urlparse
# [docs]
def validate_ldap_uri(uri: str) -> bool:
    """Check whether ``uri`` looks like an LDAP server URI.

    A URI is accepted when it is a string whose scheme is ``ldap`` or
    ``ldaps`` and whose network-location part is non-empty.

    Args:
        uri (str): Candidate URI (e.g., "ldap://example.com:389").

    Returns:
        bool: True if the scheme is ``ldap``/``ldaps`` and the netloc is
        non-empty; False otherwise, including when ``uri`` is not a string
        or cannot be parsed.

    Examples:
        >>> validate_ldap_uri("ldap://example.com:389")
        True
        >>> validate_ldap_uri("http://example.com")
        False
        >>> validate_ldap_uri(None)
        False
    """
    # A validator should answer False rather than crash on a non-string
    # value; urlparse raises for some non-string inputs (e.g. integers).
    if not isinstance(uri, str):
        return False
    try:
        parsed = urlparse(uri)
    except ValueError:
        # urlparse rejects some malformed inputs, such as an unbalanced
        # IPv6 bracket in the host part.
        return False
    return parsed.scheme in ("ldap", "ldaps") and bool(parsed.netloc)
# [docs]
def ldapsearch(
    search_base: str,
    ldap_uri: str,
    bind_dn: str,
    password: str,
    search_filter: str,
) -> subprocess.CompletedProcess:
    """Run an LDAP search by invoking the ``ldapsearch`` command-line tool.

    The command is executed without a shell as
    ``ldapsearch -b <search_base> -H <ldap_uri> -D <bind_dn> -w <password>
    <search_filter>``, with stdout and stderr captured as text. A non-zero
    exit status does not raise; inspect ``returncode`` on the result.

    Note:
        The password is handed to ``ldapsearch`` as a command-line argument
        (``-w``), so it may be visible to other local users through process
        listings.

    Args:
        search_base (str): Base DN for the search operation.
        ldap_uri (str): LDAP server URI (e.g., "ldap://example.com:389").
        bind_dn (str): Distinguished Name (DN) for binding to the LDAP server.
        password (str): Password for authentication.
        search_filter (str): LDAP search filter (e.g., "(objectClass=person)").
            Must not start with "-".

    Returns:
        subprocess.CompletedProcess: Result of the ldapsearch command
        execution, with ``stdout`` and ``stderr`` as strings.

    Raises:
        ValueError: If any parameter is empty, if ``search_filter`` starts
            with "-", or if ``ldap_uri`` fails ``validate_ldap_uri``.
        subprocess.SubprocessError: If the ldapsearch command fails to execute.
        FileNotFoundError: If ldapsearch command is not available.

    Examples:
        The example needs a reachable LDAP server, so doctest skips it.

        >>> result = ldapsearch(  # doctest: +SKIP
        ...     "dc=example,dc=com",
        ...     "ldap://ldap.example.com",
        ...     "cn=admin,dc=example,dc=com",
        ...     "password",
        ...     "(objectClass=person)"
        ... )
        >>> result.returncode == 0  # doctest: +SKIP
        True
    """
    # Input validation
    if not all([search_base, ldap_uri, bind_dn, password, search_filter]):
        raise ValueError("All parameters must be non-empty strings")
    # The filter is the only positional argument on the command line; a value
    # starting with "-" would be parsed by ldapsearch as an option instead of
    # a filter, so reject it up front.
    if isinstance(search_filter, str) and search_filter.startswith("-"):
        raise ValueError(
            f"Invalid LDAP search filter (must not start with '-'): {search_filter}"
        )
    if not validate_ldap_uri(ldap_uri):
        raise ValueError(f"Invalid LDAP URI format: {ldap_uri}")

    # Use list arguments instead of shell=True for security
    cmd = [
        "ldapsearch",
        "-b",
        search_base,
        "-H",
        ldap_uri,
        "-D",
        bind_dn,
        "-w",
        password,
        search_filter,
    ]
    try:
        return subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=False,  # Don't raise on non-zero exit
        )
    except FileNotFoundError as err:
        # Re-raise with an actionable message, keeping the original as cause.
        raise FileNotFoundError(
            "ldapsearch command not found. Please install OpenLDAP client tools."
        ) from err
    except subprocess.SubprocessError as err:
        raise subprocess.SubprocessError(
            f"Failed to execute ldapsearch: {err}"
        ) from err
if __name__ == "__main__":
    # When executed as a script, run the doctests embedded in this module.
    from doctest import testmod

    testmod()