Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.191.4.136
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_collections/dellemc/openmanage/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_collections/dellemc/openmanage/plugins/modules/ome_active_directory.py
#!/usr/bin/python
# -*- coding: utf-8 -*-

#
# Dell EMC OpenManage Ansible Modules
# Version 5.0.1
# Copyright (C) 2021-2022 Dell Inc. or its subsidiaries. All Rights Reserved.

# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#

from __future__ import (absolute_import, division, print_function)

__metaclass__ = type

DOCUMENTATION = """
---
module: ome_active_directory
short_description: Configure Active Directory groups to be used with Directory Services
description: "This module allows to add, modify, and delete OpenManage Enterprise connection with Active Directory
Service."
version_added: "4.0.0"
author:
  - Jagadeesh N V(@jagadeeshnv)
extends_documentation_fragment:
  - dellemc.openmanage.ome_auth_options
options:
  domain_server:
    type: list
    elements: str
    description:
      - Enter the domain name or FQDN or IP address of the domain controller.
      - If I(domain_controller_lookup) is C(DNS), enter the domain name to query DNS for the domain controllers.
      - "If I(domain_controller_lookup) is C(MANUAL), enter the FQDN or the IP address of the domain controller.
      The maximum number of Active Directory servers that can be added is three."
  domain_controller_lookup:
    type: str
    description:
      - Select the Domain Controller Lookup method.
    choices:
      - DNS
      - MANUAL
    default: DNS
  domain_controller_port:
    type: int
    description:
      - Domain controller port.
      - By default, Global Catalog Address port number 3269 is populated.
      - For the Domain Controller Access, enter 636 as the port number.
      - C(NOTE), Only LDAPS ports are supported.
    default: 3269
  group_domain:
    type: str
    description:
      - Provide the group domain in the format C(example.com) or C(ou=org, dc=example, dc=com).
  id:
    type: int
    description:
      - Provide the ID of the existing Active Directory service connection.
      - This is applicable for modification and deletion.
      - This is mutually exclusive with I(name).
  name:
    type: str
    description:
      - Provide a name for the Active Directory connection.
      - This is applicable for creation and deletion.
      - This is mutually exclusive with I(name).
  network_timeout:
    type: int
    description:
      - Enter the network timeout duration in seconds.
      - The supported timeout duration range is 15 to 300 seconds.
    default: 120
  search_timeout:
    type: int
    description:
      - Enter the search timeout duration in seconds.
      - The supported timeout duration range is 15 to 300 seconds.
    default: 120
  state:
    type: str
    description:
      - C(present) allows to create or modify an Active Directory service.
      - C(absent) allows to delete a Active Directory service.
    choices:
      - present
      - absent
    default: present
  test_connection:
    type: bool
    description:
      - Enables testing the connection to the domain controller.
      - The connection to the domain controller is tested with the provided Active Directory service details.
      - If test fails, module will error out.
      - If C(yes), I(domain_username) and I(domain_password) has to be provided.
    default: no
  domain_password:
    type: str
    description:
      - Provide the domain password.
      - This is applicable when I(test_connection) is C(yes).
  domain_username:
    type: str
    description:
      - Provide the domain username either in the UPN (username@domain) or NetBIOS (domain\\\\username) format.
      - This is applicable when I(test_connection) is C(yes).
  validate_certificate:
    type: bool
    description:
      - Enables validation of SSL certificate of the domain controller.
      - The module will always report change when this is C(yes).
    default: no
  certificate_file:
    type: path
    description:
      - Provide the full path of the SSL certificate.
      - The certificate should be a Root CA Certificate encoded in Base64 format.
      - This is applicable when I(validate_certificate) is C(yes).
requirements:
  - "python >= 3.8.6"
notes:
  - The module will always report change when I(validate_certificate) is C(yes).
  - Run this module from a system that has direct access to OpenManage Enterprise.
  - This module supports C(check_mode).
"""

EXAMPLES = """
---
- name: Add Active Directory service using DNS lookup along with the test connection
  dellemc.openmanage.ome_active_directory:
    hostname: "192.168.0.1"
    username: "username"
    password: "password"
    ca_path: "/path/to/ca_cert.pem"
    name: my_ad1
    domain_server:
      - domainname.com
    group_domain: domainname.com
    test_connection: yes
    domain_username: user@domainname
    domain_password: domain_password

- name: Add Active Directory service using IP address of the domain controller with certificate validation
  dellemc.openmanage.ome_active_directory:
    hostname: "192.168.0.1"
    username: "username"
    password: "password"
    ca_path: "/path/to/ca_cert.pem"
    name: my_ad2
    domain_controller_lookup: MANUAL
    domain_server:
      - 192.68.20.181
    group_domain: domainname.com
    validate_certificate: yes
    certificate_file: "/path/to/certificate/file.cer"

- name: Modify domain controller IP address, network_timeout and group_domain
  dellemc.openmanage.ome_active_directory:
    hostname: "192.168.0.1"
    username: "username"
    password: "password"
    ca_path: "/path/to/ca_cert.pem"
    name: my_ad2
    domain_controller_lookup: MANUAL
    domain_server:
      - 192.68.20.189
    group_domain: newdomain.in
    network_timeout: 150

- name: Delete Active Directory service
  dellemc.openmanage.ome_active_directory:
    hostname: "192.168.0.1"
    username: "username"
    password: "password"
    ca_path: "/path/to/ca_cert.pem"
    name: my_ad2
    state: absent

- name: Test connection to existing Active Directory service with certificate validation
  dellemc.openmanage.ome_active_directory:
    hostname: "192.168.0.1"
    username: "username"
    password: "password"
    ca_path: "/path/to/ca_cert.pem"
    name: my_ad2
    test_connection: yes
    domain_username: user@domainname
    domain_password: domain_password
    validate_certificate: yes
    certificate_file: "/path/to/certificate/file.cer"
"""

RETURN = """
---
msg:
  type: str
  description: Overall status of the Active Directory operation.
  returned: always
  sample: "Successfully renamed the slot(s)."
active_directory:
  type: dict
  description: The Active Directory that was added, modified or deleted by this module.
  returned: on change
  sample: {
    "Name": "ad_test",
    "Id": 21789,
    "ServerType": "MANUAL",
    "ServerName": ["192.168.20.181"],
    "DnsServer": [],
    "GroupDomain": "dellemcdomain.com",
    "NetworkTimeOut": 120,
    "Password": null,
    "SearchTimeOut": 120,
    "ServerPort": 3269,
    "CertificateValidation": false
  }
error_info:
  description: Details of the HTTP Error.
  returned: on HTTP error
  type: dict
  sample: {
     "error_info": {
        "error": {
            "@Message.ExtendedInfo": [
                {
                    "Message": "Unable to connect to the LDAP or AD server because the entered credentials are invalid.",
                    "MessageArgs": [],
                    "MessageId": "CSEC5002",
                    "RelatedProperties": [],
                    "Resolution": "Make sure the server input configuration are valid and retry the operation.",
                    "Severity": "Critical"
                }
            ],
            "code": "Base.1.0.GeneralError",
            "message": "A general error has occurred. See ExtendedInfo for more information."
        }
     }
  }
"""

import json
import os
from ssl import SSLError
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six.moves.urllib.error import URLError, HTTPError
from ansible.module_utils.urls import ConnectionError
from ansible_collections.dellemc.openmanage.plugins.module_utils.ome import RestOME, ome_auth_params
from ansible.module_utils.common.dict_transformations import recursive_diff

AD_URI = "AccountService/ExternalAccountProvider/ADAccountProvider"
TEST_CONNECTION = "AccountService/ExternalAccountProvider/Actions/ExternalAccountProvider.TestADConnection"
DELETE_AD = "AccountService/ExternalAccountProvider/Actions/ExternalAccountProvider.DeleteExternalAccountProvider"
NO_CHANGES_MSG = "No changes found to be applied."
CHANGES_FOUND = "Changes found to be applied."
MAX_AD_MSG = "Unable to add the account provider because the maximum number of configurations allowed for an" \
             " Active Directory service is {0}."
CREATE_SUCCESS = "Successfully added the Active Directory service."
MODIFY_SUCCESS = "Successfully modified the Active Directory service."
DELETE_SUCCESS = "Successfully deleted the Active Directory service."
DOM_SERVER_MSG = "Specify the domain server. Domain server is required to create an Active Directory service."
GRP_DOM_MSG = "Specify the group domain. Group domain is required to create an Active Directory service."
CERT_INVALID = "The provided certificate file path is invalid or not readable."
DOMAIN_ALLOWED_COUNT = "Maximum entries allowed for {0} lookup type is {1}."
TEST_CONNECTION_SUCCESS = "Test Connection is successful. "
TEST_CONNECTION_FAIL = "Test Connection has failed. "
ERR_READ_FAIL = "Unable to retrieve the error details."
INVALID_ID = "The provided Active Directory ID is invalid."
TIMEOUT_RANGE = "The {0} value is not in the range of {1} to {2}."
MAX_AD = 2
MIN_TIMEOUT = 15
MAX_TIMEOUT = 300


def get_ad(module, rest_obj):
    ad = {}
    prm = module.params
    resp = rest_obj.invoke_request('GET', AD_URI)
    ad_list = resp.json_data.get('value')
    ad_cnt = len(ad_list)
    ky = 'Name'
    vl = 'name'
    if prm.get('id'):
        ky = 'Id'
        vl = 'id'
    for adx in ad_list:
        if str(adx.get(ky)).lower() == str(prm.get(vl)).lower():
            ad = adx
            break
    return ad, ad_cnt


def test_http_error_fail(module, err):
    try:
        error_info = json.load(err)
        err_list = error_info.get('error', {}).get('@Message.ExtendedInfo', [ERR_READ_FAIL])
        if err_list:
            err_rsn = err_list[0].get("Message")
    except Exception:
        err_rsn = ERR_READ_FAIL
    module.fail_json(msg="{0}{1}".format(TEST_CONNECTION_FAIL, err_rsn), error_info=error_info)


def test_connection(module, rest_obj, create_payload):
    try:
        create_payload['UserName'] = module.params.get('domain_username')
        create_payload['Password'] = module.params.get('domain_password')
        rest_obj.invoke_request('POST', TEST_CONNECTION, data=create_payload,
                                api_timeout=create_payload['NetworkTimeOut'])
        create_payload.pop('UserName', None)
        create_payload.pop('Password', None)
    except HTTPError as err:
        test_http_error_fail(module, err)
    except SSLError as err:
        module.fail_json(msg="{0}{1}".format(TEST_CONNECTION_FAIL, str(err)))
    except Exception as err:
        module.fail_json(msg="{0}{1}".format(TEST_CONNECTION_FAIL, str(err)))


def make_payload(prm):
    dc_type = {'DNS': 'DnsServer', 'MANUAL': 'ServerName'}
    tmplt_ad = {'name': 'Name', 'domain_controller_port': 'ServerPort', 'domain_controller_lookup': 'ServerType',
                'domain_server': dc_type[prm.get('domain_controller_lookup')], 'group_domain': 'GroupDomain',
                'network_timeout': 'NetworkTimeOut', 'search_timeout': 'SearchTimeOut',
                'validate_certificate': 'CertificateValidation'}
    payload = dict([(v, prm.get(k)) for k, v in tmplt_ad.items() if prm.get(k) is not None])
    return payload


def validate_n_testconnection(module, rest_obj, payload):
    dc_cnt = {'DNS': 1, 'MANUAL': 3}
    dc_type = {'DNS': 'DnsServer', 'MANUAL': 'ServerName'}
    dc_lookup = payload.get('ServerType')
    if len(payload.get(dc_type[dc_lookup])) > dc_cnt[dc_lookup]:
        module.fail_json(msg=DOMAIN_ALLOWED_COUNT.format(dc_lookup, dc_cnt[dc_lookup]))
    t_list = ['NetworkTimeOut', 'SearchTimeOut']
    for tx in t_list:
        if payload.get(tx) not in range(MIN_TIMEOUT, MAX_TIMEOUT + 1):
            module.fail_json(msg=TIMEOUT_RANGE.format(tx, MIN_TIMEOUT, MAX_TIMEOUT))
    payload['CertificateFile'] = ""
    if payload.get('CertificateValidation'):
        cert_path = module.params.get('certificate_file')
        if os.path.exists(cert_path):
            with open(cert_path, 'r') as certfile:
                cert_data = certfile.read()
                payload['CertificateFile'] = cert_data
        else:
            module.fail_json(msg=CERT_INVALID)
    msg = ""
    if module.params.get('test_connection'):
        test_connection(module, rest_obj, payload)
        msg = TEST_CONNECTION_SUCCESS
    return msg


def create_ad(module, rest_obj):
    prm = module.params
    if not prm.get('domain_server'):
        module.fail_json(msg=DOM_SERVER_MSG)
    if not prm.get('group_domain'):
        module.fail_json(msg=GRP_DOM_MSG)
    create_payload = make_payload(prm)
    msg = validate_n_testconnection(module, rest_obj, create_payload)
    if module.check_mode:
        module.exit_json(msg="{0}{1}".format(msg, CHANGES_FOUND), changed=True)
    resp = rest_obj.invoke_request('POST', AD_URI, data=create_payload)
    ad = resp.json_data
    ad.pop('CertificateFile', "")
    module.exit_json(msg="{0}{1}".format(msg, CREATE_SUCCESS), active_directory=ad, changed=True)


def modify_ad(module, rest_obj, ad):
    prm = module.params
    modify_payload = make_payload(prm)
    ad = rest_obj.strip_substr_dict(ad)
    if ad.get('ServerName'):
        (ad.get('ServerName')).sort()
    if modify_payload.get('ServerName'):
        (modify_payload.get('ServerName')).sort()
    diff = recursive_diff(modify_payload, ad)
    is_change = False
    if diff:
        if diff[0]:
            is_change = True
            ad.update(modify_payload)
    msg = validate_n_testconnection(module, rest_obj, ad)
    if not is_change and not ad.get('CertificateValidation'):
        module.exit_json(msg="{0}{1}".format(msg, NO_CHANGES_MSG), active_directory=ad)
    if module.check_mode:
        module.exit_json(msg="{0}{1}".format(msg, CHANGES_FOUND), changed=True)
    resp = rest_obj.invoke_request('PUT', "{0}({1})".format(AD_URI, ad['Id']), data=ad)
    ad = resp.json_data
    ad.pop('CertificateFile', "")
    module.exit_json(msg="{0}{1}".format(msg, MODIFY_SUCCESS), active_directory=ad, changed=True)


def delete_ad(module, rest_obj, ad):
    ad = rest_obj.strip_substr_dict(ad)
    if module.check_mode:
        module.exit_json(msg=CHANGES_FOUND, active_directory=ad, changed=True)
    resp = rest_obj.invoke_request('POST', DELETE_AD, data={"AccountProviderIds": [int(ad['Id'])]})
    module.exit_json(msg=DELETE_SUCCESS, active_directory=ad, changed=True)


def main():
    specs = {
        "state": {"type": 'str', "choices": ["present", "absent"], "default": 'present'},
        "name": {"type": 'str'},
        "id": {"type": 'int'},
        "domain_controller_lookup": {"type": 'str', "choices": ['MANUAL', 'DNS'], "default": 'DNS'},
        "domain_server": {"type": 'list', "elements": 'str'},
        "group_domain": {"type": 'str'},
        "domain_controller_port": {"type": 'int', "default": 3269},
        "network_timeout": {"type": 'int', "default": 120},
        "search_timeout": {"type": 'int', "default": 120},
        "validate_certificate": {"type": 'bool', "default": False},
        "certificate_file": {"type": 'path'},
        "test_connection": {"type": 'bool', "default": False},
        "domain_username": {"type": 'str'},
        "domain_password": {"type": 'str', "no_log": True}
    }
    specs.update(ome_auth_params)
    module = AnsibleModule(
        argument_spec=specs,
        required_one_of=[('name', 'id')],
        required_if=[
            ('test_connection', True, ('domain_username', 'domain_password',)),
            ('validate_certificate', True, ('certificate_file',))],
        mutually_exclusive=[('name', 'id')],
        supports_check_mode=True)
    try:
        with RestOME(module.params, req_session=True) as rest_obj:
            ad, ad_cnt = get_ad(module, rest_obj)
            if module.params.get('state') == 'present':
                if ad:
                    modify_ad(module, rest_obj, ad)
                else:
                    if module.params.get('id'):
                        module.fail_json(msg=INVALID_ID)
                    if ad_cnt < MAX_AD:
                        create_ad(module, rest_obj)
                    module.fail_json(msg=MAX_AD_MSG.format(MAX_AD))
            else:
                if ad:
                    delete_ad(module, rest_obj, ad)
                module.exit_json(msg=NO_CHANGES_MSG)
    except HTTPError as err:
        module.fail_json(msg=str(err), error_info=json.load(err))
    except URLError as err:
        module.exit_json(msg=str(err), unreachable=True)
    except (
            IOError, ValueError, SSLError, TypeError, ConnectionError, AttributeError, IndexError, KeyError,
            OSError) as err:
        module.fail_json(msg=str(err))


if __name__ == '__main__':
    main()

Anon7 - 2022
AnonSec Team