Server IP : 85.214.239.14 / Your IP : 18.223.170.103 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /usr/lib/python3/dist-packages/ansible_collections/cisco/dnac/plugins/module_utils/ |
Upload File : |
#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (c) 2021, Cisco Systems # GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import (absolute_import, division, print_function) __metaclass__ = type try: from dnacentersdk import api, exceptions except ImportError: DNAC_SDK_IS_INSTALLED = False else: DNAC_SDK_IS_INSTALLED = True from ansible.module_utils._text import to_native from ansible.module_utils.common import validation try: import logging except ImportError: LOGGING_IN_STANDARD = False else: LOGGING_IN_STANDARD = True import os.path import datetime import inspect def log(msg): with open('dnac.log', 'a') as of: callerframerecord = inspect.stack()[1] frame = callerframerecord[0] info = inspect.getframeinfo(frame) d = datetime.datetime.now().replace(microsecond=0).isoformat() of.write("---- %s ---- %s@%s ---- %s \n" % (d, info.lineno, info.function, msg)) def is_list_complex(x): return isinstance(x[0], dict) or isinstance(x[0], list) def has_diff_elem(ls1, ls2): return any((elem not in ls1 for elem in ls2)) def compare_list(list1, list2): len_list1 = len(list1) len_list2 = len(list2) if len_list1 != len_list2: return False if len_list1 == 0: return True attempt_std_cmp = list1 == list2 if attempt_std_cmp: return True if not is_list_complex(list1) and not is_list_complex(list2): return set(list1) == set(list2) # Compare normally if it exceeds expected size * 2 (len_list1==len_list2) MAX_SIZE_CMP = 100 # Fail fast if elem not in list, thanks to any and generators if len_list1 > MAX_SIZE_CMP: return attempt_std_cmp else: # not changes 'has diff elem' to list1 != list2 ':lists are not equal' return not (has_diff_elem(list1, list2)) or not (has_diff_elem(list2, list1)) def fn_comp_key(k, dict1, dict2): return dnac_compare_equality(dict1.get(k), dict2.get(k)) def dnac_compare_equality(current_value, requested_value): # print("dnac_compare_equality", current_value, requested_value) if requested_value is None: return True if current_value is None: return True if isinstance(current_value, dict) and isinstance(requested_value, dict): all_dict_params = list(current_value.keys()) + list(requested_value.keys()) return not any((not fn_comp_key(param, current_value, requested_value) for param in all_dict_params)) elif isinstance(current_value, list) and isinstance(requested_value, list): return compare_list(current_value, requested_value) else: return current_value == requested_value def simple_cmp(obj1, obj2): return obj1 == obj2 def get_dict_result(result, key, value, cmp_fn=simple_cmp): if isinstance(result, list): if len(result) == 1: if isinstance(result[0], dict): result = result[0] if result.get(key) is not None and result.get(key) != value: result = None else: result = None else: for item in result: if isinstance(item, dict) and (item.get(key) is None or item.get(key) == value): result = item return result result = None elif not isinstance(result, dict): result = None elif result.get(key) is not None and result.get(key) != value: result = None return result def dnac_argument_spec(): argument_spec = dict( dnac_host=dict(type="str", required=True), dnac_port=dict(type="int", required=False, default=443), dnac_username=dict(type="str", default="admin", aliases=["user"]), dnac_password=dict(type="str", no_log=True), dnac_verify=dict(type="bool", default=True), dnac_version=dict(type="str", default="2.2.3.3"), dnac_debug=dict(type="bool", default=False), validate_response_schema=dict(type="bool", default=True), ) return argument_spec def validate_list_of_dicts(param_list, spec, module=None): """Validate/Normalize playbook params. Will raise when invalid parameters found. param_list: a playbook parameter list of dicts spec: an argument spec dict e.g. spec = dict(ip=dict(required=True, type='bool'), foo=dict(type='str', default='bar')) return: list of normalized input data """ v = validation normalized = [] invalid_params = [] for list_entry in param_list: valid_params_dict = {} for param in spec: item = list_entry.get(param) log(str(item)) if item is None: if spec[param].get("required"): invalid_params.append( "{0} : Required parameter not found".format(param) ) else: item = spec[param].get("default") else: type = spec[param].get("type") if type == "str": item = v.check_type_str(item) if spec[param].get("length_max"): if 1 <= len(item) <= spec[param].get("length_max"): pass else: invalid_params.append( "{0}:{1} : The string exceeds the allowed " "range of max {2} char".format( param, item, spec[param].get("length_max") ) ) elif type == "int": item = v.check_type_int(item) min_value = 1 if spec[param].get("range_min") is not None: min_value = spec[param].get("range_min") if spec[param].get("range_max"): if min_value <= item <= spec[param].get("range_max"): pass else: invalid_params.append( "{0}:{1} : The item exceeds the allowed " "range of max {2}".format( param, item, spec[param].get("range_max") ) ) elif type == "bool": item = v.check_type_bool(item) elif type == "list": item = v.check_type_list(item) elif type == "dict": item = v.check_type_dict(item) choice = spec[param].get("choices") if choice: if item not in choice: invalid_params.append( "{0} : Invalid choice provided".format(item) ) no_log = spec[param].get("no_log") if no_log: if module is not None: module.no_log_values.add(item) else: msg = "\n\n'{0}' is a no_log parameter".format(param) msg += "\nAnsible module object must be passed to this " msg += "\nfunction to ensure it is not logged\n\n" raise Exception(msg) valid_params_dict[param] = item normalized.append(valid_params_dict) return normalized, invalid_params class DNACSDK(object): def __init__(self, params): self.result = dict(changed=False, result="") self.validate_response_schema = params.get("validate_response_schema") if DNAC_SDK_IS_INSTALLED: self.api = api.DNACenterAPI( username=params.get("dnac_username"), password=params.get("dnac_password"), base_url="https://{dnac_host}:{dnac_port}".format( dnac_host=params.get("dnac_host"), dnac_port=params.get("dnac_port") ), version=params.get("dnac_version"), verify=params.get("dnac_verify"), debug=params.get("dnac_debug"), ) if params.get("dnac_debug") and LOGGING_IN_STANDARD: logging.getLogger('dnacentersdk').addHandler(logging.StreamHandler()) else: self.fail_json(msg="DNA Center Python SDK is not installed. Execute 'pip install dnacentersdk'") def changed(self): self.result["changed"] = True def object_created(self): self.changed() self.result["result"] = "Object created" def object_updated(self): self.changed() self.result["result"] = "Object updated" def object_deleted(self): self.changed() self.result["result"] = "Object deleted" def object_already_absent(self): self.result["result"] = "Object already absent" def object_already_present(self): self.result["result"] = "Object already present" def object_present_and_different(self): self.result["result"] = "Object already present, but it has different values to the requested" def object_modify_result(self, changed=None, result=None): if result is not None: self.result["result"] = result if changed: self.changed() def is_file(self, file_path): return os.path.isfile(file_path) def extract_file_name(self, file_path): return os.path.basename(file_path) def _exec(self, family, function, params=None, op_modifies=False, **kwargs): try: family = getattr(self.api, family) func = getattr(family, function) except Exception as e: self.fail_json(msg=e) try: if params: file_paths_params = kwargs.get('file_paths', []) # This substitution is for the import file operation if file_paths_params and isinstance(file_paths_params, list): multipart_fields = {} for (key, value) in file_paths_params: if isinstance(params.get(key), str) and self.is_file(params[key]): file_name = self.extract_file_name(params[key]) file_path = params[key] multipart_fields[value] = (file_name, open(file_path, 'rb')) params.setdefault("multipart_fields", multipart_fields) params.setdefault("multipart_monitor_callback", None) if not self.validate_response_schema and op_modifies: params["active_validation"] = False response = func(**params) else: response = func() except exceptions.dnacentersdkException as e: self.fail_json( msg=( "An error occured when executing operation." " The error was: {error}" ).format(error=to_native(e)) ) return response def fail_json(self, msg, **kwargs): self.result.update(**kwargs) raise Exception(msg) def exit_json(self): return self.result def main(): pass if __name__ == "__main__": main()