Server IP : 85.214.239.14 / Your IP : 3.141.202.161 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/self/root/lib/python3/dist-packages/ansible_collections/cisco/ise/plugins/plugin_utils/ |
Upload File : |
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2021, Cisco Systems
# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type
try:
    from ciscoisesdk import api, exceptions
except ImportError:
    ISE_SDK_IS_INSTALLED = False
else:
    ISE_SDK_IS_INSTALLED = True
from ansible.module_utils.basic import AnsibleModule, env_fallback

try:
    from ansible.errors import AnsibleActionFail
except ImportError:
    ANSIBLE_ERRORS_INSTALLED = False
else:
    ANSIBLE_ERRORS_INSTALLED = True
try:
    import logging
except ImportError:
    LOGGING_IN_STANDARD = False
else:
    LOGGING_IN_STANDARD = True


def is_list_complex(x):
    """Return True when the first element of ``x`` is a dict or a list.

    Only the first element is inspected. An empty list is reported as not
    complex instead of raising ``IndexError``.
    """
    return bool(x) and isinstance(x[0], (dict, list))


def has_diff_elem(ls1, ls2):
    """Return True if at least one element of ``ls2`` is not in ``ls1``."""
    # The generator lets any() stop at the first missing element.
    return any(elem not in ls1 for elem in ls2)


def compare_list(list1, list2):
    """Compare two lists while ignoring element order.

    Lists of different length are never equal. Lists whose first elements
    are neither dicts nor lists are compared as sets. Other lists of up to
    ``MAX_SIZE_CMP`` elements are equal when each one contains every element
    of the other. Longer lists are only equal when ``list1 == list2``.

    Returns:
        bool: True when the lists are considered equal.
    """
    len_list1 = len(list1)
    if len_list1 != len(list2):
        return False
    if len_list1 == 0:
        return True

    # Cheap ordered comparison first.
    if list1 == list2:
        return True

    if not is_list_complex(list1) and not is_list_complex(list2):
        try:
            return set(list1) == set(list2)
        except TypeError:
            # Only the first element was inspected; a later element may be
            # unhashable. Fall through to the element-wise comparison.
            pass

    # Above this size the quadratic membership scan is skipped and the
    # ordered comparison (already known to be False here) is the answer.
    MAX_SIZE_CMP = 100
    if len_list1 > MAX_SIZE_CMP:
        return False

    # Both directions must hold: with `or`, equal-length lists such as
    # [a, a] and [a, b] were wrongly reported as equal.
    return not has_diff_elem(list1, list2) and not has_diff_elem(list2, list1)


def fn_comp_key(k, dict1, dict2):
    """Compare the values stored under ``k`` with ``ise_compare_equality``."""
    return ise_compare_equality(dict1.get(k), dict2.get(k))


def ise_compare_equality(current_value, requested_value):
    """Lenient equality check between a current and a requested value.

    A ``None`` on either side counts as equal. Dicts are compared key by key
    over the union of their keys (recursively), lists go through
    ``compare_list`` and everything else uses ``==``.

    Returns:
        bool: True when the values are considered equal.
    """
    if requested_value is None:
        return True
    if current_value is None:
        return True
    if isinstance(current_value, dict) and isinstance(requested_value, dict):
        all_dict_params = set(current_value) | set(requested_value)
        return all(
            fn_comp_key(param, current_value, requested_value)
            for param in all_dict_params
        )
    if isinstance(current_value, list) and isinstance(requested_value, list):
        return compare_list(current_value, requested_value)
    return current_value == requested_value


def fn_comp_key2(k, dict1, dict2):
    """Compare the values stored under ``k`` with ``ise_compare_equality2``."""
    return ise_compare_equality2(dict1.get(k), dict2.get(k))


def ise_compare_equality2(current_value, requested_value, is_query_param=False):
    """Strict variant of ``ise_compare_equality``.

    When ``is_query_param`` is true the values always count as equal.
    Otherwise two ``None`` values are equal, a single ``None`` is a
    difference, dicts are compared key by key over the union of their keys
    (recursively), lists go through ``compare_list`` and everything else
    uses ``==``.

    Returns:
        bool: True when the values are considered equal.
    """
    if is_query_param:
        return True
    if requested_value is None and current_value is None:
        return True
    if requested_value is None:
        return False
    if current_value is None:
        return False
    if isinstance(current_value, dict) and isinstance(requested_value, dict):
        all_dict_params = set(current_value) | set(requested_value)
        return all(
            fn_comp_key2(param, current_value, requested_value)
            for param in all_dict_params
        )
    if isinstance(current_value, list) and isinstance(requested_value, list):
        return compare_list(current_value, requested_value)
    return current_value == requested_value


def get_dict_result(result, key, value):
    """Pick the dict in ``result`` whose ``key`` matches ``value``.

    A dict matches when ``dict.get(key)`` is ``None`` or equals ``value``.

    Args:
        result: A dict, a list of candidate items, or anything else.
        key: Key to look up in each candidate dict.
        value: Expected value for ``key``.

    Returns:
        The first matching dict of a list, the dict itself when it matches,
        or ``None`` when nothing matches or ``result`` is neither a list
        nor a dict.
    """
    if isinstance(result, list):
        for item in result:
            if isinstance(item, dict) and (
                item.get(key) is None or item.get(key) == value
            ):
                return item
        return None
    if not isinstance(result, dict):
        return None
    if result.get(key) is not None and result.get(key) != value:
        return None
    return result


def ise_argument_spec():
    """Return the argument spec dict for the ISE connection parameters.

    Every option declares an ``env_fallback`` on the matching ``ISE_*``
    environment variable.
    """
    argument_spec = dict(
        ise_hostname=dict(type="str", fallback=(env_fallback, ['ISE_HOSTNAME']), required=True),
        ise_username=dict(type="str", fallback=(env_fallback, ['ISE_USERNAME']), required=True),
        ise_password=dict(type="str", fallback=(env_fallback, ['ISE_PASSWORD']), required=True, no_log=True),
        ise_verify=dict(type="bool", default=True, fallback=(env_fallback, ['ISE_VERIFY'])),
        ise_version=dict(type="str", default="3.1_Patch_1", fallback=(env_fallback, ['ISE_VERSION'])),
        ise_wait_on_rate_limit=dict(type="bool", default=True, fallback=(env_fallback, ['ISE_WAIT_ON_RATE_LIMIT'])),
        ise_uses_api_gateway=dict(type="bool", default=True, fallback=(env_fallback, ['ISE_USES_API_GATEWAY'])),
        ise_uses_csrf_token=dict(type="bool", default=False, fallback=(env_fallback, ['ISE_USES_CSRF_TOKEN'])),
        ise_debug=dict(type="bool", default=False, fallback=(env_fallback, ['ISE_DEBUG'])),
    )
    return argument_spec


def get_ise_url(hostname, port=None):
    """Build ``https://<hostname>`` and append ``:<port>`` when port is truthy."""
    url_result = "https://{hostname}".format(hostname=hostname)
    if port:
        url_result = url_result + ":{port}".format(port=port)
    return url_result


class ISESDK(object):
    """Wrapper around ``ciscoisesdk``'s ``IdentityServicesEngineAPI``.

    Holds the SDK client in ``self.api`` and a result dict with the keys
    ``changed`` and ``result`` in ``self.result``.
    """

    def __init__(self, params):
        """Create the SDK client from the ``ise_*`` entries of ``params``.

        Fails through ``fail_json`` when ``ciscoisesdk`` could not be
        imported.
        """
        self.result = dict(changed=False, result="")
        if ISE_SDK_IS_INSTALLED:
            ise_uses_api_gateway = params.get("ise_uses_api_gateway")
            ui_base_url = None
            ers_base_url = None
            mnt_base_url = None
            px_grid_base_url = None
            if not ise_uses_api_gateway:
                # Without the API gateway each service gets its own port.
                ui_base_url = get_ise_url(params.get("ise_hostname"), port="443")
                ers_base_url = get_ise_url(params.get("ise_hostname"), port="9060")
                mnt_base_url = get_ise_url(params.get("ise_hostname"), port="443")
                px_grid_base_url = get_ise_url(params.get("ise_hostname"), port="8910")
            self.api = api.IdentityServicesEngineAPI(
                username=params.get("ise_username"),
                password=params.get("ise_password"),
                base_url=get_ise_url(params.get("ise_hostname"), port=None),
                ui_base_url=ui_base_url,
                ers_base_url=ers_base_url,
                mnt_base_url=mnt_base_url,
                px_grid_base_url=px_grid_base_url,
                verify=params.get("ise_verify"),
                version=params.get("ise_version"),
                wait_on_rate_limit=params.get("ise_wait_on_rate_limit"),
                uses_api_gateway=ise_uses_api_gateway,
                uses_csrf_token=params.get("ise_uses_csrf_token"),
                debug=params.get("ise_debug"),
            )
            if params.get("ise_debug") and LOGGING_IN_STANDARD:
                logging.getLogger('ciscoisesdk').addHandler(logging.StreamHandler())
        else:
            self.fail_json(msg="Cisco ISE Python SDK is not installed. Execute 'pip install ciscoisesdk'")

    def changed(self):
        """Mark the result as changed."""
        self.result["changed"] = True

    def object_created(self):
        """Mark the result as changed and record that the object was created."""
        self.changed()
        self.result["result"] = "Object created"

    def object_updated(self):
        """Mark the result as changed and record that the object was updated."""
        self.changed()
        self.result["result"] = "Object updated"

    def object_deleted(self):
        """Mark the result as changed and record that the object was deleted."""
        self.changed()
        self.result["result"] = "Object deleted"

    def object_already_absent(self):
        """Record that the object was already absent (no change)."""
        self.result["result"] = "Object already absent"

    def object_already_present(self):
        """Record that the object was already present (no change)."""
        self.result["result"] = "Object already present"

    def object_present_and_different(self):
        """Record that the object exists with other values (no change)."""
        self.result["result"] = "Object already present, but it has different values to the requested"

    def object_modify_result(self, changed=None, result=None):
        """Set the result message when given and flag a change when truthy."""
        if result is not None:
            self.result["result"] = result
        if changed:
            self.changed()

    def exec(self, family, function, params=None, handle_func_exception=True):
        """Call ``self.api.<family>.<function>`` and return its response.

        Args:
            family: Attribute name looked up on ``self.api``.
            function: Attribute name looked up on the resolved family.
            params: Optional dict expanded as keyword arguments; when falsy
                the function is called without arguments.
            handle_func_exception: When true, a ``ciscoisesdkException`` is
                reported through ``fail_json``; otherwise it is re-raised.

        Returns:
            Whatever the SDK function returns.
        """
        try:
            family = getattr(self.api, family)
            func = getattr(family, function)
        except Exception as e:
            self.fail_json(
                msg=(
                    "An error occured when retrieving operation."
                    " The error was: {error}"
                ).format(error=e)
            )

        try:
            if params:
                response = func(**params)
            else:
                response = func()
        except exceptions.ciscoisesdkException as e:
            if handle_func_exception:
                self.fail_json(
                    msg=(
                        "An error occured when executing operation."
                        " The error was: {error}"
                    ).format(error=e)
                )
            else:
                raise
        return response

    def fail_json(self, msg, **kwargs):
        """Merge ``kwargs`` into the result and abort by raising.

        Raises:
            AnsibleActionFail: When ``ansible.errors`` could be imported.
            RuntimeError: Fallback when it could not (this used to surface
                as a ``NameError``).
        """
        self.result.update(**kwargs)
        if ANSIBLE_ERRORS_INSTALLED:
            raise AnsibleActionFail(msg, kwargs)
        raise RuntimeError(msg)

    def exit_json(self):
        """Return the accumulated result dict."""
        return self.result


def main():
    """No-op entry point; this module is meant to be imported."""
    pass


if __name__ == "__main__":
    main()