Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.227.114.85
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/self/root/lib/python3/dist-packages/ansible_collections/cisco/mso/plugins/module_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/self/root/lib/python3/dist-packages/ansible_collections/cisco/mso/plugins/module_utils/mso.py
# -*- coding: utf-8 -*-

# Copyright: (c) 2018, Dag Wieers (@dagwieers) <dag@wieers.com>
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)

from __future__ import absolute_import, division, print_function

__metaclass__ = type

from copy import deepcopy
import re
import os
import ast
import datetime
import shutil
import tempfile
from ansible.module_utils.basic import json
from ansible.module_utils.basic import env_fallback
from ansible.module_utils.six import PY3
from ansible.module_utils.six.moves import filterfalse
from ansible.module_utils.six.moves.urllib.parse import urlencode, urljoin
from ansible.module_utils.urls import fetch_url
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.connection import Connection
from ansible_collections.cisco.mso.plugins.module_utils.constants import NDO_API_VERSION_PATH_FORMAT

try:
    from requests_toolbelt.multipart.encoder import MultipartEncoder

    HAS_MULTIPART_ENCODER = True
except ImportError:
    HAS_MULTIPART_ENCODER = False


if PY3:

    def cmp(a, b):
        return (a > b) - (a < b)


def issubset(subset, superset):
    """Recurse through nested dictionary and compare entries"""

    # Both objects are the same object
    if subset is superset:
        return True

    # Both objects are identical
    if subset == superset:
        return True

    # Both objects have a different type
    if type(subset) != type(superset):
        return False

    for key, value in subset.items():
        # Ignore empty values
        if value is None:
            return True

        # Item from subset is missing from superset
        if key not in superset:
            return False

        # Item has different types in subset and superset
        if type(superset.get(key)) != type(value):
            return False

        # Compare if item values are subset
        if isinstance(value, dict):
            if not issubset(superset.get(key), value):
                return False
        elif isinstance(value, list):
            try:
                # NOTE: Fails for lists of dicts
                if not set(value) <= set(superset.get(key)):
                    return False
            except TypeError:
                # Fall back to exact comparison for lists of dicts
                diff = list(filterfalse(lambda i: i in value, superset.get(key))) + list(filterfalse(lambda j: j in superset.get(key), value))
                if diff:
                    return False
        elif isinstance(value, set):
            if not value <= superset.get(key):
                return False
        else:
            if not value == superset.get(key):
                return False

    return True


def update_qs(params):
    """Append key-value pairs to self.filter_string"""
    accepted_params = dict((k, v) for (k, v) in params.items() if v is not None)
    return "?" + urlencode(accepted_params)


def mso_argument_spec():
    return dict(
        host=dict(type="str", required=False, aliases=["hostname"], fallback=(env_fallback, ["MSO_HOST"])),
        port=dict(type="int", required=False, fallback=(env_fallback, ["MSO_PORT"])),
        username=dict(type="str", required=False, fallback=(env_fallback, ["MSO_USERNAME", "ANSIBLE_NET_USERNAME"])),
        password=dict(type="str", required=False, no_log=True, fallback=(env_fallback, ["MSO_PASSWORD", "ANSIBLE_NET_PASSWORD"])),
        output_level=dict(type="str", default="normal", choices=["debug", "info", "normal"], fallback=(env_fallback, ["MSO_OUTPUT_LEVEL"])),
        timeout=dict(type="int", default=30, fallback=(env_fallback, ["MSO_TIMEOUT"])),
        use_proxy=dict(type="bool", fallback=(env_fallback, ["MSO_USE_PROXY"])),
        use_ssl=dict(type="bool", fallback=(env_fallback, ["MSO_USE_SSL"])),
        validate_certs=dict(type="bool", fallback=(env_fallback, ["MSO_VALIDATE_CERTS"])),
        login_domain=dict(type="str", fallback=(env_fallback, ["MSO_LOGIN_DOMAIN"])),
    )


def mso_reference_spec():
    return dict(
        name=dict(type="str", required=True),
        schema=dict(type="str"),
        template=dict(type="str"),
    )


def mso_epg_subnet_spec():
    return dict(
        subnet=dict(type="str", required=True, aliases=["ip"]),
        description=dict(type="str"),
        scope=dict(type="str", default="private", choices=["private", "public"]),
        shared=dict(type="bool", default=False),
        no_default_gateway=dict(type="bool", default=False),
    )


def mso_subnet_spec():
    subnet_spec = mso_epg_subnet_spec()
    subnet_spec.update(dict(querier=dict(type="bool", default=False)))
    return subnet_spec


def mso_bd_subnet_spec():
    subnet_spec = mso_epg_subnet_spec()
    subnet_spec.update(dict(querier=dict(type="bool", default=False)))
    subnet_spec.update(dict(primary=dict(type="bool", default=False)))
    subnet_spec.update(dict(virtual=dict(type="bool", default=False)))
    return subnet_spec


def mso_dhcp_spec():
    return dict(
        dhcp_option_policy=dict(type="dict", options=mso_dhcp_option_spec()),
        name=dict(type="str", required=True),
        version=dict(type="int", required=True),
    )


def mso_dhcp_option_spec():
    return dict(
        name=dict(type="str", required=True),
        version=dict(type="int", required=True),
    )


def mso_contractref_spec():
    return dict(
        name=dict(type="str", required=True),
        schema=dict(type="str"),
        template=dict(type="str"),
        type=dict(type="str", required=True, choices=["consumer", "provider"]),
    )


def mso_expression_spec():
    return dict(
        type=dict(type="str", required=True, aliases=["tag"]),
        operator=dict(type="str", choices=["not_in", "in", "equals", "not_equals", "has_key", "does_not_have_key"], required=True),
        value=dict(type="str"),
    )


def mso_expression_spec_ext_epg():
    return dict(
        type=dict(type="str", choices=["ip_address"], required=True),
        operator=dict(type="str", choices=["equals"], required=True),
        value=dict(type="str", required=True),
    )


def mso_hub_network_spec():
    return dict(
        name=dict(type="str", required=True),
        tenant=dict(type="str", required=True),
    )


def mso_object_migrate_spec():
    return dict(
        epg=dict(type="str", required=True),
        anp=dict(type="str", required=True),
    )


def mso_service_graph_node_spec():
    return dict(
        type=dict(type="str", required=True),
    )


def mso_service_graph_node_device_spec():
    return dict(
        name=dict(type="str", required=True),
    )


def mso_service_graph_connector_spec():
    return dict(
        provider=dict(type="str", required=True),
        consumer=dict(type="str", required=True),
        # Only connectorType bd with value "general" is supported for now thus fixed in code
        #  when connectorType externalEpg is supported "route-peering" should be added
        #  also change SERVICE_NODE_CONNECTOR_TYPE_MAP in constants.py
        #  also verify if connector type is specific to provider or always same for both
        connector_object_type=dict(type="str", default="bd", choices=["bd"]),
        provider_schema=dict(type="str"),
        provider_template=dict(type="str"),
        consumer_schema=dict(type="str"),
        consumer_template=dict(type="str"),
    )


def mso_site_anp_epg_bulk_staticport_spec():
    return dict(
        type=dict(type="str", choices=["port", "vpc", "dpc"]),
        pod=dict(type="str"),  # This parameter is not required for querying all objects
        leaf=dict(type="str"),  # This parameter is not required for querying all objects
        fex=dict(type="str"),  # This parameter is not required for querying all objects
        path=dict(type="str"),  # This parameter is not required for querying all objects
        vlan=dict(type="int"),  # This parameter is not required for querying all objects
        primary_micro_segment_vlan=dict(type="int"),  # This parameter is not required for querying all objects
        deployment_immediacy=dict(type="str", choices=["immediate", "lazy"]),
        mode=dict(type="str", choices=["native", "regular", "untagged"]),
    )


# Copied from ansible's module uri.py (url): https://github.com/ansible/ansible/blob/cdf62edc65f564fff6b7e575e084026fa7faa409/lib/ansible/modules/uri.py
def write_file(module, url, dest, content, resp, tmpsrc=None):
    # create a tempfile with some test content

    if tmpsrc is None and content is not None:
        fd, tmpsrc = tempfile.mkstemp(dir=module.tmpdir)
        f = open(tmpsrc, "wb")
        try:
            f.write(content)
        except Exception as e:
            os.remove(tmpsrc)
            module.fail_json(msg="Failed to create temporary content file: {0}".format(to_native(e)))
        f.close()

    checksum_src = None
    checksum_dest = None

    # raise an error if there is no tmpsrc file
    if not os.path.exists(tmpsrc):
        os.remove(tmpsrc)
        module.fail_json(msg="Source '{0}' does not exist".format(tmpsrc))
    if not os.access(tmpsrc, os.R_OK):
        os.remove(tmpsrc)
        module.fail_json(msg="Source '{0}' is not readable".format(tmpsrc))
    checksum_src = module.sha1(tmpsrc)

    # check if there is no dest file
    if os.path.exists(dest):
        # raise an error if copy has no permission on dest
        if not os.access(dest, os.W_OK):
            os.remove(tmpsrc)
            module.fail_json(msg="Destination '{0}' not writable".format(dest))
        if not os.access(dest, os.R_OK):
            os.remove(tmpsrc)
            module.fail_json(msg="Destination '{0}' not readable".format(dest))
        checksum_dest = module.sha1(dest)
    else:
        if not os.access(os.path.dirname(dest), os.W_OK):
            os.remove(tmpsrc)
            module.fail_json(msg="Destination dir '{0}' not writable".format(os.path.dirname(dest)))

    if checksum_src != checksum_dest:
        try:
            shutil.copyfile(tmpsrc, dest)
        except Exception as e:
            os.remove(tmpsrc)
            module.fail_json(msg="failed to copy {0} to {1}: {2}".format(tmpsrc, dest, to_native(e)))

    os.remove(tmpsrc)


class MSOModule(object):
    def __init__(self, module):
        self.module = module
        self.params = module.params
        self.result = dict(changed=False)
        self.headers = {"Content-Type": "text/json"}
        self.platform = "mso"

        # normal output
        self.existing = dict()

        # mso_rest output
        self.jsondata = None
        self.error = dict(code=None, message=None, info=None)

        # info output
        self.previous = dict()
        self.proposed = dict()
        self.sent = dict()
        self.stdout = None
        self.patch_operation = None

        # debug output
        self.has_modified = False
        self.filter_string = ""
        self.method = None
        self.path = None
        self.response = None
        self.status = None
        self.url = None
        self.httpapi_logs = list()

        if self.module._debug:
            self.module.warn("Enable debug output because ANSIBLE_DEBUG was set.")
            self.params["output_level"] = "debug"

        if self.module._socket_path is None:
            if self.params.get("use_ssl") is None:
                self.params["use_ssl"] = True
            if self.params.get("use_proxy") is None:
                self.params["use_proxy"] = True
            if self.params.get("validate_certs") is None:
                self.params["validate_certs"] = True

            # Ensure protocol is set
            self.params["protocol"] = "https" if self.params.get("use_ssl", True) else "http"

            # Set base_uri
            if self.params.get("port") is not None:
                self.base_only_uri = "{protocol}://{host}:{port}/".format(**self.params)
                self.baseuri = "{0}api/v1/".format(self.base_only_uri)
            else:
                self.base_only_uri = "{protocol}://{host}/".format(**self.params)
                self.baseuri = "{0}api/v1/".format(self.base_only_uri)

            if self.params.get("host") is None:
                self.fail_json(msg="Parameter 'host' is required when not using the HTTP API connection plugin")

            if self.params.get("password"):
                # Perform password-based authentication, log on using password
                self.login()
            else:
                self.fail_json(msg="Parameter 'password' is required for authentication")
        else:
            self.connection = Connection(self.module._socket_path)
            if self.connection.get_platform() == "cisco.nd":
                self.platform = "nd"

    def get_login_domain_id(self, domain):
        """Get a domain and return its id"""
        if domain is None:
            return domain
        d = self.get_obj("auth/login-domains", key="domains", name=domain)
        if not d:
            self.fail_json(msg="Login domain '%s' is not a valid domain name." % domain)
        if "id" not in d:
            self.fail_json(msg="Login domain lookup failed for domain '%s': %s" % (domain, d))
        return d["id"]

    def login(self):
        """Log in to MSO"""

        # Perform login request
        if (self.params.get("login_domain") is not None) and (self.params.get("login_domain") != "Local"):
            domain_id = self.get_login_domain_id(self.params.get("login_domain"))
            payload = {"username": self.params.get("username", "admin"), "password": self.params.get("password"), "domainId": domain_id}
        else:
            payload = {"username": self.params.get("username", "admin"), "password": self.params.get("password")}
        self.url = urljoin(self.baseuri, "auth/login")
        resp, auth = fetch_url(
            self.module,
            self.url,
            data=json.dumps(payload),
            method="POST",
            headers=self.headers,
            timeout=self.params.get("timeout"),
            use_proxy=self.params.get("use_proxy"),
        )

        # Handle MSO response
        if auth.get("status") not in [200, 201]:
            self.response = auth.get("msg")
            self.status = auth.get("status")
            self.fail_json(msg="Authentication failed: {msg}".format(**auth))

        payload = json.loads(resp.read())

        self.headers["Authorization"] = "Bearer {token}".format(**payload)

    def response_json(self, rawoutput):
        """Handle MSO JSON response output"""
        try:
            self.jsondata = json.loads(rawoutput)
        except Exception as e:
            # Expose RAW output for troubleshooting
            self.error = dict(code=-1, message="Unable to parse output as JSON, see 'raw' output. %s" % e)
            self.result["raw"] = rawoutput
            return

        # Handle possible MSO error information
        if self.status not in [200, 201, 202, 204]:
            self.error = self.jsondata

    def request_download(self, path, destination=None, method="GET", api_version="v1"):
        if self.platform != "nd":
            self.url = urljoin(self.baseuri, path)

        redirected = False
        redir_info = {}
        redirect = {}
        content = None
        data = None

        src = self.params.get("src")
        if src:
            try:
                self.headers.update({"Content-Length": os.stat(src).st_size})
                data = open(src, "rb")
            except OSError:
                self.fail_json(msg="Unable to open source file %s" % src, elapsed=0)

        kwargs = {}
        if destination is not None and os.path.isdir(destination):
            # first check if we are redirected to a file download
            if self.platform == "nd":
                redir_info = self.connection.get_remote_file_io_stream(
                    NDO_API_VERSION_PATH_FORMAT.format(api_version=api_version, path=path), self.module.tmpdir, method
                )
                # In place of Content-Disposition, NDO get_remote_file_io_stream returns content-disposition.
                content_disposition = redir_info.get("content-disposition")
            else:
                check, redir_info = fetch_url(self.module, self.url, headers=self.headers, method=method, timeout=self.params.get("timeout"))
                content_disposition = check.headers.get("Content-Disposition")

            if content_disposition:
                file_name = content_disposition.split("filename=")[1]
            else:
                self.fail_json(msg="Failed to fetch {0} backup information from MSO/NDO, response: {1}".format(self.params.get("backup"), redir_info))

            # if we are redirected, update the url with the location header and update dest with the new url filename
            if redir_info["status"] in (301, 302, 303, 307):
                self.url = redir_info.get("location")
                redirected = True
            destination = os.path.join(destination, file_name)

        # if destination file already exist, only download if file newer
        if os.path.exists(destination):
            kwargs["last_mod_time"] = datetime.datetime.utcfromtimestamp(os.path.getmtime(destination))

        if self.platform == "nd":
            if redir_info["status"] == 200 and redirected is False:
                info = redir_info
            else:
                info = self.connection.get_remote_file_io_stream("/mso/{0}".format(self.url.split("/mso/", 1)), self.module.tmpdir, method)
        else:
            resp, info = fetch_url(
                self.module,
                self.url,
                data=data,
                headers=self.headers,
                method=method,
                timeout=self.params.get("timeout"),
                unix_socket=self.params.get("unix_socket"),
                **kwargs
            )

            try:
                content = resp.read()
            except AttributeError:
                # there was no content, but the error read() may have been stored in the info as 'body'
                content = info.pop("body", "")

            if src:
                # Try to close the open file handle
                try:
                    data.close()
                except Exception:
                    pass

        redirect["redirected"] = redirected or info.get("url") != self.url
        redirect.update(redir_info)
        redirect.update(info)

        write_file(self.module, self.url, destination, content, redirect, info.get("tmpsrc"))

        return redirect, destination

    def request_upload(self, path, fields=None, method="POST", api_version="v1"):
        """Generic HTTP MultiPart POST method for MSO uploads."""
        self.path = path
        if self.platform != "nd":
            self.url = urljoin(self.baseuri, path)

        info = dict()

        if self.platform == "nd":
            try:
                if os.path.exists(self.params.get("backup")):
                    info = self.connection.send_file_request(
                        method,
                        NDO_API_VERSION_PATH_FORMAT.format(api_version=api_version, path=path),
                        file=self.params.get("backup"),
                        remote_path=self.params.get("remote_path"),
                    )
                else:
                    self.fail_json(msg="Upload failed due to: No such file or directory, Backup file: '{0}'".format(self.params.get("backup")))
            except Exception as error:
                self.fail_json("NDO upload failed due to: {0}".format(error))
        else:
            if not HAS_MULTIPART_ENCODER:
                self.fail_json(msg="requests-toolbelt is required for the upload state of this module")

            mp_encoder = MultipartEncoder(fields=fields)
            self.headers["Content-Type"] = mp_encoder.content_type
            self.headers["Accept-Encoding"] = "gzip, deflate, br"

            resp, info = fetch_url(
                self.module,
                self.url,
                headers=self.headers,
                data=mp_encoder,
                method=method,
                timeout=self.params.get("timeout"),
                use_proxy=self.params.get("use_proxy"),
            )

        self.response = info.get("msg")
        self.status = info.get("status")

        # Get change status from HTTP headers
        if "modified" in info:
            self.has_modified = True
            if info.get("modified") == "false":
                self.result["changed"] = False
            elif info.get("modified") == "true":
                self.result["changed"] = True

        # 200: OK, 201: Created, 202: Accepted, 204: No Content
        if self.status in (200, 201, 202, 204):
            if self.platform == "nd":
                return info
            else:
                output = resp.read()
                if output:
                    return json.loads(output)

        # 400: Bad Request, 401: Unauthorized, 403: Forbidden,
        # 405: Method Not Allowed, 406: Not Acceptable
        # 500: Internal Server Error, 501: Not Implemented
        elif self.status:
            if self.status >= 400:
                try:
                    if self.platform == "nd":
                        payload = info.get("body")
                    else:
                        payload = json.loads(resp.read())
                except (ValueError, AttributeError):
                    try:
                        payload = json.loads(info.get("body"))
                    except Exception:
                        self.fail_json(msg="MSO Error:", info=info)
                if "code" in payload:
                    self.fail_json(msg="MSO Error {code}: {message}".format(**payload), info=info, payload=payload)
                else:
                    self.fail_json(msg="MSO Error:".format(**payload), info=info, payload=payload)
        else:
            self.fail_json(msg="Backup file upload failed due to: {0}".format(info))
        return {}

    def request(self, path, method=None, data=None, qs=None, api_version="v1"):
        """Generic HTTP method for MSO requests."""
        self.path = path

        if method is not None:
            self.method = method

        # If we PATCH with empty operations, return
        if method == "PATCH" and not data:
            return {}
        else:
            self.patch_operation = data

        # if method in ['PATCH', 'PUT']:
        #     if qs is not None:
        #         qs['enableVersionCheck'] = 'true'
        #     else:
        #         qs = dict(enableVersionCheck='true')

        if method in ["PATCH"]:
            if qs is not None:
                qs["validate"] = "false"
            else:
                qs = dict(validate="false")

        resp = None
        if self.module._socket_path:
            self.connection.set_params(self.params)
            if api_version is not None:
                uri = NDO_API_VERSION_PATH_FORMAT.format(api_version=api_version, path=self.path)
            else:
                uri = self.path

            if qs is not None:
                uri = uri + update_qs(qs)

            try:
                info = self.connection.send_request(method, uri, json.dumps(data))
                self.url = info.get("url")
                self.httpapi_logs.extend(self.connection.pop_messages())
                info.pop("date", None)
            except Exception as e:
                try:
                    error_obj = json.loads(to_text(e))
                except Exception:
                    error_obj = dict(
                        error=dict(code=-1, message="Unable to parse error output as JSON. Raw error message: {0}".format(e), exception=to_text(e))
                    )
                    pass
                self.fail_json(msg=error_obj["error"]["message"])

        else:
            if api_version is not None:
                self.url = "{0}api/{1}/{2}".format(self.base_only_uri, api_version, self.path.lstrip("/"))
            else:
                self.url = "{0}{1}".format(self.base_only_uri, self.path.lstrip("/"))

            if qs is not None:
                self.url = self.url + update_qs(qs)
            resp, info = fetch_url(
                self.module,
                self.url,
                headers=self.headers,
                data=json.dumps(data),
                method=self.method,
                timeout=self.params.get("timeout"),
                use_proxy=self.params.get("use_proxy"),
            )

        self.response = info.get("msg")
        self.status = info.get("status", -1)

        # Get change status from HTTP headers
        if "modified" in info:
            self.has_modified = True
            if info.get("modified") == "false":
                self.result["changed"] = False
            elif info.get("modified") == "true":
                self.result["changed"] = True

        # 200: OK, 201: Created, 202: Accepted
        if self.status in (200, 201, 202):
            try:
                output = resp.read()
                if output:
                    try:
                        return json.loads(output)
                    except Exception as e:
                        self.error = dict(code=-1, message="Unable to parse output as JSON, see 'raw' output. {0}".format(e))
                        self.result["raw"] = output
                        return
            except AttributeError:
                return info.get("body")

        # 204: No Content
        elif self.status == 204:
            return {}

        # 404: Not Found
        elif self.method == "DELETE" and self.status == 404:
            return {}

        # 400: Bad Request, 401: Unauthorized, 403: Forbidden,
        # 405: Method Not Allowed, 406: Not Acceptable
        # 500: Internal Server Error, 501: Not Implemented
        elif self.status >= 400:
            self.result["status"] = self.status
            body = info.get("body")
            if body is not None:
                try:
                    if isinstance(body, dict):
                        payload = body
                    else:
                        payload = json.loads(body)
                except Exception as e:
                    self.error = dict(code=-1, message="Unable to parse output as JSON, see 'raw' output. %s" % e)
                    self.result["raw"] = body
                    self.fail_json(msg="MSO Error:", data=data, info=info)
                self.error = payload
                if "code" in payload:
                    self.fail_json(msg="MSO Error {code}: {message}".format(**payload), data=data, info=info, payload=payload)
                else:
                    self.fail_json(msg="MSO Error:".format(**payload), data=data, info=info, payload=payload)
            else:
                # Connection error
                msg = "Connection failed for {0}. {1}".format(info.get("url"), info.get("msg"))
                self.error = msg
                self.fail_json(msg=msg)
            return {}

    def query_objs(self, path, key=None, api_version="v1", **kwargs):
        """Query the MSO REST API for objects in a path"""
        found = []
        objs = self.request(path, api_version=api_version, method="GET")

        if objs == {} or objs == []:
            return found

        if key is None:
            key = path

        if isinstance(objs, dict):
            if key not in objs:
                self.fail_json(msg="Key '{0}' missing from data".format(key), data=objs)
            objs_list = objs.get(key)
        else:
            objs_list = objs
        for obj in objs_list:
            for kw_key, kw_value in kwargs.items():
                if kw_value is None:
                    continue
                if isinstance(kw_value, dict):
                    obj_value = obj.get(kw_key)
                    if obj_value is not None and isinstance(obj_value, dict):
                        breakout = False
                        for kw_key_lvl2, kw_value_lvl2 in kw_value.items():
                            if obj_value.get(kw_key_lvl2) != kw_value_lvl2:
                                breakout = True
                                break
                        if breakout:
                            break
                    else:
                        break
                elif obj.get(kw_key) != kw_value:
                    break
            else:
                found.append(obj)

        return found

    def query_obj(self, path, api_version="v1", **kwargs):
        """Query the MSO REST API for the whole object at a path"""
        obj = self.request(path, api_version=api_version, method="GET")
        if obj == {}:
            return {}
        for kw_key, kw_value in kwargs.items():
            if kw_value is None:
                continue
            if isinstance(kw_value, dict):
                obj_value = obj.get(kw_key)
                if obj_value is not None and isinstance(obj_value, dict):
                    for kw_key_lvl2, kw_value_lvl2 in kw_value.items():
                        if obj_value.get(kw_key_lvl2) != kw_value_lvl2:
                            return {}
            elif obj.get(kw_key) != kw_value:
                return {}
        return obj

    def get_obj(self, path, api_version="v1", **kwargs):
        """Get a specific object from a set of MSO REST objects"""
        objs = self.query_objs(path, api_version=api_version, **kwargs)
        if len(objs) == 0:
            return {}
        if len(objs) > 1:
            self.fail_json(msg="More than one object matches unique filter: {0}".format(kwargs))
        return objs[0]

    def lookup_schema(self, schema):
        """Look up schema and return its id"""
        if schema is None:
            return schema

        schema_summary = self.query_objs("schemas/list-identity", key="schemas", displayName=schema)
        if not schema_summary:
            self.fail_json(msg="Provided schema '{0}' does not exist.".format(schema))
        schema_id = schema_summary[0].get("id")
        if not schema_id:
            self.fail_json(msg="Schema lookup failed for schema '{0}': '{1}'".format(schema, schema_id))
        return schema_id

    def lookup_domain(self, domain):
        """Look up a domain and return its id"""
        if domain is None:
            return domain

        d = self.get_obj("auth/domains", key="domains", name=domain)
        if not d:
            self.fail_json(msg="Domain '%s' is not a valid domain name." % domain)
        if "id" not in d:
            self.fail_json(msg="Domain lookup failed for domain '%s': %s" % (domain, d))
        return d.get("id")

    def lookup_roles(self, roles):
        """Look up roles and return their ids"""
        if roles is None:
            return roles

        ids = []
        for role in roles:
            access_type = "readWrite"
            try:
                role = ast.literal_eval(role)
                if type(role) is dict and "name" in role:
                    name = role.get("name")
                    if role.get("access_type") == "read":
                        access_type = "readOnly"
            except ValueError:
                name = role

            r = self.get_obj("roles", name=name)
            if not r:
                self.fail_json(msg="Role '%s' is not a valid role name." % name)
            if "id" not in r:
                self.fail_json(msg="Role lookup failed for role '%s': %s" % (name, r))
            ids.append(dict(roleId=r.get("id"), accessType=access_type))
        return ids

    def lookup_site(self, site):
        """Look up a site and return its id"""
        if site is None:
            return site

        s = self.get_obj("sites", name=site)
        if not s:
            self.fail_json(msg="Site '%s' is not a valid site name." % site)
        if "id" not in s:
            self.fail_json(msg="Site lookup failed for site '%s': %s" % (site, s))
        return s.get("id")

    def lookup_sites(self, sites):
        """Look up sites and return their ids"""
        if sites is None:
            return sites

        ids = []
        for site in sites:
            s = self.get_obj("sites", name=site)
            if not s:
                self.fail_json(msg="Site '%s' is not a valid site name." % site)
            if "id" not in s:
                self.fail_json(msg="Site lookup failed for site '%s': %s" % (site, s))
            ids.append(dict(siteId=s.get("id"), securityDomains=[]))
        return ids

    def lookup_tenant(self, tenant):
        """Look up a tenant and return its id"""
        if tenant is None:
            return tenant

        t = self.get_obj("tenants", key="tenants", name=tenant)
        if not t:
            self.fail_json(msg="Tenant '%s' is not valid tenant name." % tenant)
        if "id" not in t:
            self.fail_json(msg="Tenant lookup failed for tenant '%s': %s" % (tenant, t))
        return t.get("id")

    def lookup_remote_location(self, remote_location):
        """Look up a remote location and return its path and id"""
        if remote_location is None:
            return None

        remote = self.get_obj("platform/remote-locations", key="remoteLocations", name=remote_location)
        if "id" not in remote:
            self.fail_json(msg="No remote location found for remote '%s'" % (remote_location))
        remote_info = dict(id=remote.get("id"), path=remote.get("credential")["remotePath"])
        return remote_info

    def lookup_users(self, users):
        """Look up users and return their ids"""
        # Ensure tenant has at least admin user
        if users is None:
            users = ["admin"]
        elif "admin" not in users:
            users.append("admin")

        ids = []
        for user in users:
            if self.platform == "nd":
                u = self.get_obj("users", loginID=user, api_version="v2")
            else:
                u = self.get_obj("users", username=user)
            if not u:
                self.fail_json(msg="User '%s' is not a valid user name." % user)
            if "id" not in u:
                if "userID" not in u:
                    self.fail_json(msg="User lookup failed for user '%s': %s" % (user, u))
                id = dict(userId=u.get("userID"))
            else:
                id = dict(userId=u.get("id"))
            if id in ids:
                self.fail_json(msg="User '%s' is duplicate." % user)
            ids.append(id)

        return ids

    def create_label(self, label, label_type):
        """Create a new label"""
        return self.request("labels", method="POST", data=dict(displayName=label, type=label_type))

    def lookup_labels(self, labels, label_type):
        """Look up labels and return their ids (create if necessary)"""
        if labels is None:
            return None

        ids = []
        for label in labels:
            label_obj = self.get_obj("labels", displayName=label)
            if not label_obj:
                label_obj = self.create_label(label, label_type)
            if "id" not in label_obj:
                self.fail_json(msg="Label lookup failed for label '%s': %s" % (label, label_obj))
            ids.append(label_obj.get("id"))
        return ids

    def anp_ref(self, **data):
        """Create anpRef string"""
        return "/schemas/{schema_id}/templates/{template}/anps/{anp}".format(**data)

    def epg_ref(self, **data):
        """Create epgRef string"""
        return "/schemas/{schema_id}/templates/{template}/anps/{anp}/epgs/{epg}".format(**data)

    def bd_ref(self, **data):
        """Create bdRef string"""
        return "/schemas/{schema_id}/templates/{template}/bds/{bd}".format(**data)

    def contract_ref(self, **data):
        """Create contractRef string"""
        # Support the contract argspec
        if "name" in data:
            data["contract"] = data.get("name")
        return "/schemas/{schema_id}/templates/{template}/contracts/{contract}".format(**data)

    def filter_ref(self, **data):
        """Create a filterRef string"""
        return "/schemas/{schema_id}/templates/{template}/filters/{filter}".format(**data)

    def vrf_ref(self, **data):
        """Create vrfRef string"""
        return "/schemas/{schema_id}/templates/{template}/vrfs/{vrf}".format(**data)

    def l3out_ref(self, **data):
        """Create l3outRef string"""
        return "/schemas/{schema_id}/templates/{template}/l3outs/{l3out}".format(**data)

    def ext_epg_ref(self, **data):
        """Create extEpgRef string"""
        return "/schemas/{schema_id}/templates/{template}/externalEpgs/{external_epg}".format(**data)

    def service_graph_ref(self, **data):
        """Create serviceGraphRef string"""
        return "/schemas/{schema_id}/templates/{template}/serviceGraphs/{service_graph}".format(**data)

    def vrf_dict_from_ref(self, data):
        vrf_ref_regex = re.compile(r"\/schemas\/(.*)\/templates\/(.*)\/vrfs\/(.*)")
        vrf_dict = vrf_ref_regex.search(data)
        return {
            "vrfName": vrf_dict.group(3),
            "schemaId": vrf_dict.group(1),
            "templateName": vrf_dict.group(2),
        }

    def dict_from_ref(self, data):
        if data and data != "":
            ref_regex = re.compile(r"\/schemas\/(.*)\/templates\/(.*?)\/(.*?)\/(.*)")
            dic = ref_regex.search(data)
            if dic is not None:
                schema_id = dic.group(1)
                template_name = dic.group(2)
                category = dic.group(3)
                name = dic.group(4)
                uri_map = {
                    "vrfs": ["vrfName", "schemaId", "templateName"],
                    "bds": ["bdName", "schemaId", "templateName"],
                    "filters": ["filterName", "schemaId", "templateName"],
                    "contracts": ["contractName", "schemaId", "templateName"],
                    "l3outs": ["l3outName", "schemaId", "templateName"],
                    "anps": ["anpName", "schemaId", "templateName"],
                    "serviceGraphs": ["serviceGraphName", "schemaId", "templateName"],
                    "serviceNode": ["serviceNodeName", "schemaId", "templateName", "serviceGraphName"],
                }
                result = {
                    uri_map[category][1]: schema_id,
                    uri_map[category][2]: template_name,
                }

                self.recursive_dict_from_ref_regex(name, result, uri_map[category][0])

                return result
            else:
                self.fail_json(msg="There was no group in search: {data}".format(data=data))

    def recursive_dict_from_ref_regex(self, data, result, category):
        continued_ref_regex = re.compile(r"(.*?)\/([a-zA-Z]+.*)")
        section_ref_regex = re.compile(r"([a-zA-Z]+)\/(.*)")
        dic_name = continued_ref_regex.search(data)
        if dic_name is not None:
            result[category] = dic_name.group(1)
            next_section = dic_name.group(2)
            dic_next_section = section_ref_regex.search(next_section)
            if dic_next_section is not None:
                next_name = dic_next_section.group(2)
                self.recursive_dict_from_ref_regex(next_name, result, dic_next_section.group(1).rstrip("s") + "Name")
        else:
            result[category] = data

    def recursive_dict_from_ref(self, data):
        for key in data:
            if key.endswith("Ref"):
                data[key] = self.dict_from_ref(data.get(key))
            if isinstance(data[key], list):
                for item in data[key]:
                    self.recursive_dict_from_ref(item)
        return data

    def make_reference(self, data, reftype, schema_id, template):
        """Create a reference from a dictionary"""
        # Removes entry from payload
        if data is None:
            return None

        if data.get("schema") is not None:
            schema_obj = self.get_obj("schemas", displayName=data.get("schema"))
            if not schema_obj:
                self.fail_json(msg="Referenced schema '{schema}' in {reftype}ref does not exist".format(reftype=reftype, **data))
            schema_id = schema_obj.get("id")

        if data.get("template") is not None:
            template = data.get("template")

        refname = "%sName" % reftype

        return {
            refname: data.get("name"),
            "schemaId": schema_id,
            "templateName": template,
        }

    def make_subnets(self, data, is_bd_subnet=True):
        """Create a subnets list from input"""
        if data is None:
            return None

        subnets = []
        for subnet in data:
            if "subnet" in subnet:
                subnet["ip"] = subnet.get("subnet")
            if subnet.get("description") is None:
                subnet["description"] = subnet.get("subnet")
            subnet_payload = dict(
                ip=subnet.get("ip"),
                description=str(subnet.get("description")),
                scope=subnet.get("scope"),
                shared=subnet.get("shared"),
                noDefaultGateway=subnet.get("no_default_gateway"),
            )
            if is_bd_subnet:
                subnet_payload.update(dict(querier=subnet.get("querier"), primary=subnet.get("primary"), virtual=subnet.get("virtual")))
            subnets.append(subnet_payload)

        return subnets

    def make_dhcp_label(self, data):
        """Create a DHCP policy from input"""
        if data is None:
            return None
        if type(data) == list:
            dhcps = []
            for dhcp in data:
                if "dhcp_option_policy" in dhcp:
                    dhcp["dhcpOptionLabel"] = dhcp.get("dhcp_option_policy")
                    del dhcp["dhcp_option_policy"]
                dhcps.append(dhcp)
            return dhcps
        if "version" in data:
            data["version"] = int(data.get("version"))
        if data and "dhcp_option_policy" in data:
            dhcp_option_policy = data.get("dhcp_option_policy")
            if dhcp_option_policy is not None and "version" in dhcp_option_policy:
                dhcp_option_policy["version"] = int(dhcp_option_policy.get("version"))
            data["dhcpOptionLabel"] = dhcp_option_policy
            del data["dhcp_option_policy"]
        return data

    def sanitize(self, updates, collate=False, required=None, unwanted=None):
        """Clean up unset keys from a request payload"""
        if required is None:
            required = []
        if unwanted is None:
            unwanted = []
        self.proposed = deepcopy(self.existing)
        self.sent = deepcopy(self.existing)

        if isinstance(self.existing, dict):
            for key in self.existing:
                # Remove References
                if key.endswith("Ref"):
                    del self.proposed[key]
                    del self.sent[key]
                    continue

                # Removed unwanted keys
                elif key in unwanted:
                    del self.proposed[key]
                    del self.sent[key]
                    continue

        if isinstance(updates, dict):
            # Clean up self.sent
            for key in updates:
                # Always retain 'id'
                if key in required:
                    if key in self.existing or updates.get(key) is not None:
                        self.sent[key] = updates.get(key)
                    continue

                # Remove unspecified values
                elif not collate and updates.get(key) is None:
                    if key in self.existing:
                        del self.sent[key]
                    continue

                # Remove identical values
                elif not collate and updates.get(key) == self.existing.get(key):
                    del self.sent[key]
                    continue

                # Add everything else
                if updates.get(key) is not None:
                    self.sent[key] = updates.get(key)

            # Update self.proposed
            self.proposed.update(self.sent)

        elif updates is not None:
            self.sent = updates
            # Update self.proposed
            self.proposed = self.sent

    def delete_keys_from_dict(self, dict_to_sanitize, keys):
        # TODO investigate combine this method above sanitize method
        copy = deepcopy(dict_to_sanitize)
        for (
            k,
            v,
        ) in copy.items():
            if k in keys:
                del dict_to_sanitize[k]
            elif isinstance(v, dict):
                dict_to_sanitize[k] = self.delete_keys_from_dict(v, keys)
            elif isinstance(v, list):
                for index, item in enumerate(v):
                    if isinstance(item, dict):
                        dict_to_sanitize[k][index] = self.delete_keys_from_dict(item, keys)
        return dict_to_sanitize

    def exit_json(self, **kwargs):
        """Custom written method to exit from module."""

        if self.params.get("state") in ("absent", "present", "upload", "restore", "download", "move", "clone"):
            if self.params.get("output_level") in ("debug", "info"):
                self.result["previous"] = self.previous
            # FIXME: Modified header only works for PATCH
            if not self.has_modified and self.previous != self.existing:
                self.result["changed"] = True
        if self.stdout:
            self.result["stdout"] = self.stdout

        # Return the gory details when we need it
        if self.params.get("output_level") == "debug":
            self.result["method"] = self.method
            self.result["response"] = self.response
            self.result["status"] = self.status
            self.result["url"] = self.url
            self.result["httpapi_logs"] = self.httpapi_logs
            self.result["socket"] = self.module._socket_path

            if self.params.get("state") in ("absent", "present"):
                self.result["sent"] = self.sent
                self.result["proposed"] = self.proposed

                if self.method == "PATCH":
                    self.result["patch_operation"] = self.patch_operation

        self.result["current"] = self.existing

        if self.module._diff and self.result.get("changed") is True:
            self.result["diff"] = dict(
                before=self.previous,
                after=self.existing,
            )

        self.result.update(**kwargs)
        self.module.exit_json(**self.result)

    def fail_json(self, msg, **kwargs):
        """Custom written method to return info on failure."""

        if self.params.get("state") in ("absent", "present"):
            if self.params.get("output_level") in ("debug", "info"):
                self.result["previous"] = self.previous
            # FIXME: Modified header only works for PATCH
            if not self.has_modified and self.previous != self.existing:
                self.result["changed"] = True
        if self.stdout:
            self.result["stdout"] = self.stdout

        # Return the gory details when we need it
        if self.params.get("output_level") == "debug":
            if self.url is not None:
                self.result["method"] = self.method
                self.result["response"] = self.response
                self.result["status"] = self.status
                self.result["url"] = self.url
                self.result["httpapi_logs"] = self.httpapi_logs
                self.result["socket"] = self.module._socket_path

            if self.params.get("state") in ("absent", "present"):
                self.result["sent"] = self.sent
                self.result["proposed"] = self.proposed

                if self.method == "PATCH":
                    self.result["patch_operation"] = self.patch_operation

        self.result["current"] = self.existing

        self.result.update(**kwargs)
        self.module.fail_json(msg=msg, **self.result)

    def check_changed(self):
        """Check if changed by comparing new values from existing"""
        existing = self.existing
        if "password" in existing:
            existing["password"] = self.sent.get("password")

        existing = self.remove_keys_from_dict_when_value_empty(existing)
        self.stdout = json.dumps(existing)

        return not issubset(self.sent, existing)

    def update_service_graph_obj(self, service_graph_obj):
        """update filter with more information"""
        service_graph_obj["serviceGraphRef"] = self.dict_from_ref(service_graph_obj.get("serviceGraphRef"))
        for service_node in service_graph_obj["serviceNodesRelationship"]:
            service_node.get("consumerConnector")["bdRef"] = self.dict_from_ref(service_node.get("consumerConnector").get("bdRef"))
            service_node.get("providerConnector")["bdRef"] = self.dict_from_ref(service_node.get("providerConnector").get("bdRef"))
            service_node["serviceNodeRef"] = self.dict_from_ref(service_node.get("serviceNodeRef"))
        if service_graph_obj.get("serviceGraphContractRelationRef"):
            del service_graph_obj["serviceGraphContractRelationRef"]

    def update_filter_obj(self, contract_obj, filter_obj, filter_type, contract_display_name=None, update_filter_ref=True):
        """update filter with more information"""
        if update_filter_ref:
            filter_obj["filterRef"] = self.dict_from_ref(filter_obj.get("filterRef"))
        if contract_display_name:
            filter_obj["displayName"] = contract_display_name
        else:
            filter_obj["displayName"] = contract_obj.get("displayName")
        filter_obj["filterType"] = filter_type
        filter_obj["contractScope"] = contract_obj.get("scope")
        filter_obj["contractFilterType"] = contract_obj.get("filterType")
        # Conditional statement 'description == ""' is needed to set empty string.
        if contract_obj.get("description") or contract_obj.get("description") == "":
            filter_obj["description"] = contract_obj.get("description")
        # Conditional statement is needed to determine if "prio" exist in contract object.
        # Same reason as described mso_schema_template_contract_filter.py.
        if contract_obj.get("prio"):
            filter_obj["prio"] = contract_obj.get("prio")

    def query_schema(self, schema):
        schema_id = self.lookup_schema(schema)
        schema_path = "schemas/{0}".format(schema_id)
        schema_obj = self.query_obj(schema_path, displayName=schema)
        if not schema_obj:
            self.module.fail_json(msg="Schema '{0}' is not a valid schema name.".format(schema))
        return schema_id, schema_path, schema_obj

    def query_service_node_types(self):
        node_objs = self.query_objs("schemas/service-node-types", key="serviceNodeTypes")
        if not node_objs:
            self.module.fail_json(msg="Service node types do not exist")
        return node_objs

    def lookup_service_node_device(self, site_id, tenant, device_name=None, service_node_type=None):
        if service_node_type is None:
            node_devices = self.query_objs("sites/{0}/aci/tenants/{1}/devices".format(site_id, tenant), key="devices")
        else:
            node_devices = self.query_objs("sites/{0}/aci/tenants/{1}/devices?deviceType={2}".format(site_id, tenant, service_node_type), key="devices")
        if device_name is not None:
            for device in node_devices:
                if device_name == device.get("name"):
                    return device
            self.module.fail_json(msg="Provided device '{0}' of type '{1}' does not exist.".format(device_name, service_node_type))
        return node_devices

    # Workaround function due to inconsistency in attributes REQUEST/RESPONSE API
    # Fix for MSO Error 400: Bad Request: (0)(0)(0)(0)/deploymentImmediacy error.path.missing
    def find_dicts_with_target_key(self, target_dict, target, replace, result=None):
        if result is None:
            result = []

        for key, value in target_dict.items():
            if key == target:
                result.append(target_dict)
            if isinstance(value, dict):
                self.find_dicts_with_target_key(value, target, replace, result)
            if isinstance(value, list):
                for entry in value:
                    if isinstance(entry, dict):
                        self.find_dicts_with_target_key(entry, target, replace, result)

        return result

    # Workaround function due to inconsistency in attributes REQUEST/RESPONSE API
    # Fix for MSO Error 400: Bad Request: (0)(0)(0)(0)/deploymentImmediacy error.path.missing
    def replace_keys_in_dict(self, target, replace, target_dict=None):
        if target_dict is None:
            target_dict = self.existing

        key_list = self.find_dicts_with_target_key(target_dict, target, replace)
        for item in key_list:
            item[replace] = item.get(target)
            del item[target]

    # Workaround function to remove null/None fields returned by API RESPONSE
    def remove_keys_from_dict_when_value_empty(self, target_dict, modified_target=None):
        if modified_target is None:
            modified_target = deepcopy(target_dict)

        for key, value in target_dict.items():
            if value is None:
                del modified_target[key]
            elif isinstance(value, dict):
                self.remove_keys_from_dict_when_value_empty(value, modified_target[key])
            elif isinstance(value, list):
                for entry_index, entry in enumerate(value):
                    if isinstance(entry, dict):
                        self.remove_keys_from_dict_when_value_empty(entry, modified_target[key][entry_index])

        return modified_target

    def validate_schema(self, schema_id):
        return self.request("schemas/{id}/validate".format(id=schema_id), method="GET")

Anon7 - 2022
AnonSec Team