LDAP 相关

轻量级目录访问协议。文中采用Python 来实现。代码运行在域控服务器上。

准备工作

  • 微软AD 域
  • AD 域证书服务
    • 证书颁发机构
    • 证书颁发机构Web 注册
    • 证书注册策略Web 服务(非必须)
  • 证书颁发机构
    • ldap
    • pinyin

安装AD 域证书服务

安装AD 域证书的服务较简单,对于本文来说,只需安装证书颁发机构证书颁发机构Web 注册两个服务就可以了。默认安装,如果不想在后面续签CA 证书,可以将证书期限设置为100 年。

连接AD 域服务器

可以选择是否采用ssl 协议来连接AD 域服务器。如果不采用ssl 则只能查询,否则还可以修改AD 域的内容。但是对于域外的计算机来说,配置CA 证书的教程不太好搜,于是我们可以在与服务器上安装AD 域证书服务,同时把ldap3 脚本托管在域服务器上。(因为加域的机器默认会信任域控签发的CA),LDAP3 的使用教程请参考官方文档

连接

以ssl 连接为例

from ldap3 import MODIFY_REPLACE, Server, Connection, ALL, NTLM
import json
from datetime import datetime, timedelta
from dateutil.parser import parse

server = Server('192.168.227.10',  use_ssl=True,
                get_info=ALL, connect_timeout=5)
# server = Server('192.168.1.2',   # 域外查询
#                get_info=ALL, connect_timeout=5)

conn = Connection(server=server, user='12TALL\ldap3',
                  password='password',
                  authentication=NTLM,
                  auto_bind=True)

print(conn.extend.standard.who_am_i())  
# 输出:
# u:12TALL\Administrator

ldap 查询条件

  • 整个ldap 查询条件必须被括号包起来
  • 逻辑运算以逻辑运算符开头,后接一系列条件,最后用括号包起来
  • 运算符
    • =:等于
    • >=:大于等于
    • <=:小于等于
    • *:通配符
  • 逻辑运算符在表达式前面
    • &:逻辑与
    • |:逻辑或
    • !:逻辑非

查询用户

一般用于管理员查询用户信息使用

def get_user(account):
    conn.search(search_base='dc=12tall,dc=com',
                # 唯一用户名
                search_filter=f'(&(samAccountName={account}))',
                # 要查询的属性
                attributes=['memberOf', 'sn', 'department',
                            'createTimeStamp', 'accountExpires',
                            'userAccountControl', 'objectClass',
                            'pwdLastSet'],
                paged_size=5)

    return conn.response_to_json()

用户自查

一般用于用户登录认证

def get_user_info(account, password):
    srv = Server('192.168.227.10', get_info=ALL, connect_timeout=5)
    # 注意:在bind 不成功时程序会报异常,而不是返回空值
    c = Connection(server=srv, user=f'12TALL\{account}',
                   password=password, authentication=NTLM,
                   auto_bind=True)
    c.search(search_base='dc=12tall,dc=com',
             # 唯一用户名
             search_filter=f'(&(samAccountName={account}))',
             # 要查询的属性
             attributes=['memberOf', 'sn', 'department',
                         'createTimeStamp', 'accountExpires',
                         'userAccountControl', 'objectClass',
                         'pwdLastSet'],
             paged_size=5)

    res = c.response_to_json()
    c.unbind()
    return res

查询组用户


def get_group_users(grp_name):
    user_list = []
    dn = json.loads(get_user(grp_name))['entries'][0]['dn']

    conn.search(search_base=dn,
                search_filter='(|(objectCategory=group)(objectCategory=user))',
                search_scope='SUBTREE',
                attributes=['member', 'objectClass',
                            'userAccountControl', 'samAccountName'],
                size_limit=5)
    for user in conn.entries[0].member:
        user_list.append(user)

    return user_list


if __name__ == "__main__":
    print(json.dumps(get_group_users('Domain Admins')))

添加账号

主要是对照着填参数,比较繁琐。新增组织与新增用户基本操作一致。

def add_user(account, password, group='12TALL'):
    group_dn = json.loads(get_user(group))['entries'][0]['dn']
    user_dn = f'cn={account},{",".join(group_dn.split(",")[1:])}'
    print(group_dn, user_dn)
    conn.add(user_dn,
             attributes={
                 'objectClass': ['top', 'person', 'organizationalPerson', 'user'],
                 'sAMAccountName': account,
                 'userPrincipalName': account,
                 'accountExpires': datetime.today()+timedelta(days=100*365),
                 'sn': '姓',
                 'GivenName':'名',
                 # 英文缩写
                 'Initials':'xingm', 
                 # 显示名为用户名
                 'displayName': account,
                 'telephoneNumber': '0000',
                 'Mail': 'mail@12tall.com',
                 "description": '描述'
             })
    # 添加用户到组
    conn.extend.microsoft.add_members_to_groups(user_dn, group_dn)
    # 修改用户密码
    conn.extend.microsoft.modify_password(
        user_dn, new_password=password)
    # 激活用户
    conn.modify(user_dn, {'userAccountControl': [(MODIFY_REPLACE, [512])]})
    return account, password


if __name__ == "__main__":
    print(add_user('u02', 'Cisco1234', 'Domain Admins'))

修改用户

Windows 域的修改操作

def enable_user(account):
    user_dn = json.loads(get_user(account))['entries'][0]['dn']
    conn.modify(user_dn, {'userAccountControl': [(MODIFY_REPLACE, [512])]})
    return


def disable_user(account):
    user_dn = json.loads(get_user(account))['entries'][0]['dn']
    conn.modify(user_dn, {'userAccountControl': [(MODIFY_REPLACE, [514])]})
    return


def change_password(account, password):
    user_dn = json.loads(get_user(account))['entries'][0]['dn']
    conn.extend.microsoft.modify_password(user_dn, new_password=password)
    return


def set_account_expires(account, expire_time):
    user_dn = json.loads(get_user(account))['entries'][0]['dn']
    conn.modify(user_dn, {'accountExpires': [(MODIFY_REPLACE, [expire_time])]})
    return


def add_account_to_group(account, group):
    group_dn = json.loads(get_user(group))['entries'][0]['dn']
    user_dn = json.loads(get_user(account))['entries'][0]['dn']
    conn.extend.microsoft.add_members_to_groups(user_dn, group_dn)
    

def del_account_from_group(account, group):
    group_dn = json.loads(get_user(group))['entries'][0]['dn']
    user_dn = json.loads(get_user(account))['entries'][0]['dn']
    conn.extend.microsoft.remove_members_from_groups(user_dn, group_dn)


if __name__ == "__main__":
    del_account_from_group('u02','Domain Admins')

参考链接

  1. Pyhton LDAP操纵微软域
  2. Welcome to ldap3’s documentation
  3. Ldap3 库使用方法(一)
  4. LDAP的filter查询详解
  5. AD用户属性对照表