Server IP : 85.214.239.14 / Your IP : 3.22.42.25 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/fortinet/fortimanager/plugins/module_utils/ |
Upload File : |
# This code is part of Ansible, but is an independent component. # This particular file snippet, and this file snippet only, is BSD licensed. # Modules you write using this snippet, which is embedded dynamically by Ansible # still belong to the author of the module, and may assign their own license # to the complete work. # # (c) 2020-2021 Fortinet, Inc # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, # are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from __future__ import absolute_import, division, print_function __metaclass__ = type from ansible.module_utils.basic import _load_params import sys import datetime import copy # import yaml from ansible.module_utils.six import raise_from try: import yaml except ImportError as imp_exc: YAML_IMPORT_ERROR = imp_exc else: YAML_IMPORT_ERROR = None def check_galaxy_version(schema): params = _load_params() if not params: return params_keys = list(params.keys()) if "method" in params_keys and "method" not in schema: error_message = "Legacy playbook detected, please revise the playbook or install latest legacy" error_message += " fortimanager galaxy collection: #ansible-galaxy collection install -f fortinet.fortimanager:1.0.5" raise Exception(error_message) def __strip_revision(schema): if type(schema) == dict: if "revision" in schema and type(schema["revision"]) is dict: revisions = schema["revision"] valid_revision = True for key in revisions: value = revisions[key] if type(value) is not bool: valid_revision = False break for token in key.split('.'): try: int_token = int(token) except Exception as e: valid_revision = False break if not valid_revision: break if valid_revision: del schema["revision"] for key in schema: __strip_revision(schema[key]) elif type(schema) == list: for item in schema: __strip_revision(item) def check_parameter_bypass(schema, module_level2_name): schema = copy.deepcopy(schema) __strip_revision(schema) params = _load_params() if not params: return schema if "bypass_validation" in params and params["bypass_validation"] is True: top_level_schema = dict() for key in schema: if key != module_level2_name: top_level_schema[key] = schema[key] elif ( not params[module_level2_name] or type(params[module_level2_name]) is dict ): top_level_schema[module_level2_name] = dict() top_level_schema[module_level2_name]["required"] = False top_level_schema[module_level2_name]["type"] = "dict" elif type(params[module_level2_name]) is list: top_level_schema[module_level2_name] = dict() top_level_schema[module_level2_name]["required"] = False top_level_schema[module_level2_name]["type"] = "list" else: raise Exception( "Value of %s must be a dict or list" % (module_level2_name) ) return top_level_schema return schema class NAPIManager(object): jrpc_urls = None perobject_jrpc_urls = None module_primary_key = None url_params = None module = None conn = None module_name = None module_level2_name = None top_level_schema_name = None def __init__( self, jrpc_urls, perobject_jrpc_urls, module_primary_key, url_params, module, conn, top_level_schema_name=None, ): self.jrpc_urls = jrpc_urls self.perobject_jrpc_urls = perobject_jrpc_urls self.module_primary_key = module_primary_key self.url_params = url_params self.module = module self.conn = conn self.process_workspace_lock() self.module_name = self.module._name self.module_level2_name = self.module_name.split(".")[-1][5:] self.top_level_schema_name = top_level_schema_name self.system_status = self.get_system_status() self.version_check_warnings = list() self._nr_exported_playbooks = 0 self._nr_valid_selectors = 0 if YAML_IMPORT_ERROR: raise_from( Exception("YAML must be installed to use this plugin"), YAML_IMPORT_ERROR, ) def process_workspace_lock(self): self.conn.process_workspace_locking(self.module.params) def _method_proposed(self): return ( "proposed_method" in self.module.params and self.module.params["proposed_method"] ) def _propose_method(self, default_method): if ( "proposed_method" in self.module.params and self.module.params["proposed_method"] ): return self.module.params["proposed_method"] return default_method def _version_matched(self, revisions): if not revisions or not self.system_status: # if system version is not determined, give up version checking return True, None sys_version_value = ( int(self.system_status["Major"]) * 10000 + int(self.system_status["Minor"]) * 100 + int(self.system_status["Patch"]) ) versions = list(revisions.keys()) versions.sort( key=lambda x: int(x.split(".")[0]) * 10000 + int(x.split(".")[1]) * 100 + int(x.split(".")[2]) ) nearest_index = -1 for i in range(len(versions)): version_value = ( int(versions[i].split(".")[0]) * 10000 + int(versions[i].split(".")[1]) * 100 + int(versions[i].split(".")[2]) ) if version_value <= sys_version_value: nearest_index = i if nearest_index == -1: return False, "not supported until in v%s" % (versions[0]) if revisions[versions[nearest_index]] is True: return True, None latest_index = -1 for i in range(nearest_index + 1, len(versions)): if revisions[versions[i]] is True: latest_index = i break earliest_index = nearest_index while earliest_index >= 0: if revisions[versions[earliest_index]] is True: break earliest_index -= 1 earliest_index = 0 if earliest_index < 0 else earliest_index if latest_index == -1: return False, "not supported since v%s" % (versions[earliest_index]) else: return ( False, "not supported since %s, before %s" % (versions[earliest_index], versions[latest_index]), ) def _get_basic_url(self, is_perobject): url_libs = None if is_perobject: url_libs = self.perobject_jrpc_urls else: url_libs = self.jrpc_urls for uparam in self.url_params: if not self.module.params[uparam]: raise AssertionError("param %s MUST NOT be empty" % (uparam)) the_url = None if "adom" in self.url_params and not url_libs[0].endswith("{adom}"): adom = self.module.params["adom"] if adom == "global": for url in url_libs: if "/global/" in url: the_url = url break if not the_url: self.module.fail_json( msg="No global url for the request, please use other adom." ) else: for url in url_libs: if "/adom/{adom}/" in url: the_url = url break if not the_url: self.module.fail_json( msg="No url for the requested adom:%s, please use other adom." % (adom) ) else: the_url = url_libs[0] if not the_url: raise AssertionError("the_url is not expected to be NULL") _param_applied = list() for uparam in self.url_params: token_hint = "/%s/{%s}/" % (uparam, uparam) token = "/%s/%s/" % (uparam, self.module.params[uparam]) if token_hint in the_url: _param_applied.append(uparam) the_url = the_url.replace(token_hint, token) for uparam in self.url_params: if uparam in _param_applied: continue token_hint = "{%s}" % (uparam) token = self.module.params[uparam] the_url = the_url.replace(token_hint, token) return the_url def _get_base_perobject_url(self, mvalue): url_getting = self._get_basic_url(True) if not url_getting.endswith("}"): # in case of non-regular per-object url. return url_getting last_token = url_getting.split("/")[-1] second_last_token = url_getting.split("/")[-2] if last_token != "{" + second_last_token + "}": raise AssertionError("wrong last_token received") return url_getting.replace("{" + second_last_token + "}", str(mvalue)) def get_object(self, mvalue): url_getting = self._get_base_perobject_url(mvalue) params = [{"url": url_getting}] response = self.conn.send_request("get", params) return response def update_object(self, mvalue): url_updating = self._get_base_perobject_url(mvalue) if not self.top_level_schema_name: raise AssertionError("top level schema name MUST NOT be NULL") params = [ { "url": url_updating, self.top_level_schema_name: self.__tailor_attributes( self.module.params[self.module_level2_name] ), } ] response = self.conn.send_request(self._propose_method("update"), params) return response def create_objejct(self): url_creating = self._get_basic_url(False) if not self.top_level_schema_name: raise AssertionError("top level schema name MUST NOT be NULL") params = [ { "url": url_creating, self.top_level_schema_name: self.__tailor_attributes( self.module.params[self.module_level2_name] ), } ] return self.conn.send_request(self._propose_method("set"), params) def delete_object(self, mvalue): url_deleting = self._get_base_perobject_url(mvalue) params = [{"url": url_deleting}] return self.conn.send_request("delete", params) def get_system_status(self): params = [{"url": "/cli/global/system/status"}] response = self.conn.send_request("get", params) if response[0] == 0: if "data" not in response[1]: raise AssertionError() return response[1]["data"] return None def _compare_subnet(self, object_remote, object_present): if type(object_remote) is not list and len(object_remote) != 2: return True tokens = object_present.split("/") if len(tokens) != 2: return True try: subnet_number = int(tokens[1]) if subnet_number < 0 or subnet_number > 32: return True remote_subnet_number = sum( bin(int(x)).count("1") for x in object_remote[1].split(".") ) if object_remote[0] != tokens[0] or remote_subnet_number != subnet_number: return True else: return False except Exception as e: return True return True def _check_object_difference(self, object_remote, object_present): for key in object_present: value = object_present[key] if not value: continue if key not in object_remote or not object_remote[key]: return True value_type = type(value) if value_type is list: return True elif value_type is dict: if type(object_remote[key]) is not dict: return True elif self._check_object_difference(object_remote[key], value): return True else: value_string = str(value) if ( type(object_remote[key]) is not list and str(object_remote[key]) != value_string ): return True elif type(object_remote[key]) is list: if not self._compare_subnet(object_remote[key], value_string): return False elif ( len(object_remote[key]) > 1 or str(object_remote[key][0]) != value_string ): return True return False def _update_required(self, robject): object_status = robject[0] if object_status != 0: return False object_remote = robject[1]["data"] object_present = self.module.params[self.module_level2_name] return self._check_object_difference(object_remote, object_present) def _process_with_mkey(self, mvalue): mobject = self.get_object(mvalue) update_required = self._update_required(mobject) if self._method_proposed(): update_required = True if self.module.params["state"] == "present": if mobject[0] == 0: if update_required: return self.update_object(mvalue) else: self.module.exit_json(message="Object update skipped!") else: return self.create_objejct() elif self.module.params["state"] == "absent": # in case the `GET` method returns nothing... see module `fmgr_antivirus_mmschecksum` # if mobject[0] == 0: return self.delete_object(mvalue) # else: # self.do_nonexist_exit() else: raise AssertionError("Not Reachable") def _process_without_mkey(self): if self.module.params["state"] == "absent": self.module.fail_json( msg="this module doesn't not support state:absent because of no primary key." ) return self.create_objejct() def process_generic(self, method, param): response = self.conn.send_request(method, param) self.do_exit(response) def process_exec(self, argument_specs=None): track = [self.module_level2_name] if ( "bypass_validation" not in self.module.params or self.module.params["bypass_validation"] is False ): self.check_versioning_mismatch( track, argument_specs[self.module_level2_name] if self.module_level2_name in argument_specs else None, self.module.params[self.module_level2_name] if self.module_level2_name in self.module.params else None, ) the_url = self.jrpc_urls[0] if "adom" in self.url_params and not self.jrpc_urls[0].endswith("{adom}"): if self.module.params["adom"] == "global": for _url in self.jrpc_urls: if "/global/" in _url: the_url = _url break else: for _url in self.jrpc_urls: if "/adom/{adom}/" in _url: the_url = _url break for _param in self.url_params: token_hint = "{%s}" % (_param) token = "%s" % (self.module.params[_param]) the_url = the_url.replace(token_hint, token) api_params = [{"url": the_url}] if self.module_level2_name in self.module.params: if not self.top_level_schema_name: raise AssertionError("top level schema name MUST NOT be NULL") api_params[0][self.top_level_schema_name] = self.__tailor_attributes( self.module.params[self.module_level2_name] ) response = self.conn.send_request("exec", api_params) self.do_exit(response) def __extract_renamed_urls(self, urls): _param_set = list() for url in urls: tokens = url.split("/") if len(tokens) < 2: continue token_2 = tokens[-2] token_1 = tokens[-1] if "{%s}" % (token_2) == token_1 and token_2 not in _param_set: _param_set.append(token_2) return _param_set def process_rename(self, metadata): params = self.module.params if params["rename"]["selector"] not in metadata: raise AssertionError( "unknown selector: %s" % (params["rename"]["selector"]) ) selector = params["rename"]["selector"] rename_urls = metadata[selector]["urls"] rename_mkey = metadata[selector]["mkey"] rename_params = metadata[selector]["params"] for _url_param in self.__extract_renamed_urls(rename_urls): if _url_param not in rename_params: rename_params.append(_url_param) rename_revisions = metadata[selector]["revision"] matched, checking_message = self._version_matched(rename_revisions) if not matched: self.version_check_warnings.append( "selector:%s %s" % (selector, checking_message) ) real_params_keys = set() if self.module.params["rename"]["self"]: real_params_keys = set(self.module.params["rename"]["self"].keys()) if real_params_keys != set(rename_params): self.module.fail_json( msg="expect params in self:%s, given params:%s" % (list(rename_params), list(real_params_keys)) ) url = None if "adom" in rename_params and not rename_urls[0].endswith("{adom}"): if params["rename"]["self"]["adom"] == "global": for _url in rename_urls: if "/global/" in _url: url = _url break else: for _url in rename_urls: if "/adom/{adom}/" in _url: url = _url break else: url = rename_urls[0] if not url: self.module.fail_json( msg="can not find url in following sets:%s! please check params: adom" % (rename_urls) ) _param_applied = list() for _param in rename_params: token_hint = "/%s/{%s}" % (_param, _param) token = "/%s/%s" % (_param, params["rename"]["self"][_param]) if token_hint in url: _param_applied.append(_param) url = url.replace(token_hint, token) for _param in rename_params: if _param in _param_applied: continue token_hint = "{%s}" % (_param) token = params["rename"]["self"][_param] url = url.replace(token_hint, token) if rename_mkey and rename_mkey not in params["rename"]["target"]: self.module.fail_json( msg="Must give the primary key/value in target: %s!" % (rename_mkey) ) api_params = [{"url": url, "data": params["rename"]["target"]}] response = self.conn.send_request("update", api_params) self.do_exit(response) def process_clone(self, metadata): if self.module.params["clone"]["selector"] not in metadata: raise AssertionError("selector is expected in parameters") selector = self.module.params["clone"]["selector"] clone_params_schema = metadata[selector]["params"] clone_urls = metadata[selector]["urls"] clone_revisions = metadata[selector]["revision"] matched, checking_message = self._version_matched(clone_revisions) if not matched: self.version_check_warnings.append( "selector:%s %s" % (selector, checking_message) ) real_params_keys = set() if self.module.params["clone"]["self"]: real_params_keys = set(self.module.params["clone"]["self"].keys()) if real_params_keys != set(clone_params_schema): self.module.fail_json( msg="expect params in self:%s, given params:%s" % (list(clone_params_schema), list(real_params_keys)) ) url = None if "adom" in clone_params_schema and not clone_urls[0].endswith("{adom}"): if self.module.params["clone"]["self"]["adom"] == "global": for _url in clone_urls: if "/global/" in _url: url = _url break else: for _url in clone_urls: if "/adom/{adom}/" in _url: url = _url break else: url = clone_urls[0] if not url: self.module.fail_json( msg="can not find url in following sets:%s! please check params: adom" % (clone_urls) ) _param_applied = list() for _param in clone_params_schema: token_hint = "/%s/{%s}" % (_param, _param) token = "/%s/%s" % (_param, self.module.params["clone"]["self"][_param]) if token_hint in url: _param_applied.append(_param) url = url.replace(token_hint, token) for _param in clone_params_schema: if _param in _param_applied: continue token_hint = "{%s}" % (_param) token = self.module.params["clone"]["self"][_param] url = url.replace(token_hint, token) mkey = metadata[selector]["mkey"] if mkey and mkey not in self.module.params["clone"]["target"]: self.module.fail_json( msg="Must give the primary key/value in target: %s!" % (mkey) ) api_params = [{"url": url, "data": self.module.params["clone"]["target"]}] response = self.conn.send_request("clone", api_params) self.do_exit(response) def process_move(self, metadata): if self.module.params["move"]["selector"] not in metadata: raise AssertionError("selector is expected in parameters") selector = self.module.params["move"]["selector"] move_params = metadata[selector]["params"] move_urls = metadata[selector]["urls"] move_revisions = metadata[selector]["revision"] matched, checking_message = self._version_matched(move_revisions) if not matched: self.version_check_warnings.append( "selector:%s %s" % (selector, checking_message) ) if not len(move_urls): raise AssertionError("unexpected move urls set") real_params_keys = set() if self.module.params["move"]["self"]: real_params_keys = set(self.module.params["move"]["self"].keys()) if real_params_keys != set(move_params): self.module.fail_json( msg="expect params in self:%s, given params:%s" % (list(move_params), list(real_params_keys)) ) url = None if "adom" in move_params and not move_urls[0].endswith("{adom}"): if self.module.params["move"]["self"]["adom"] == "global": for _url in move_urls: if "/global/" in _url: url = _url break else: for _url in move_urls: if "/adom/{adom}/" in _url: url = _url break else: url = move_urls[0] if not url: self.module.fail_json( msg="can not find url in following sets:%s! please check params: adom" % (move_urls) ) _param_applied = list() for _param in move_params: token_hint = "/%s/{%s}" % (_param, _param) token = "/%s/%s" % (_param, self.module.params["move"]["self"][_param]) if token_hint in url: _param_applied.append(_param) url = url.replace(token_hint, token) for _param in move_params: if _param in _param_applied: continue token_hint = "{%s}" % (_param) token = self.module.params["move"]["self"][_param] url = url.replace(token_hint, token) api_params = [ { "url": url, "option": self.module.params["move"]["action"], "target": self.module.params["move"]["target"], } ] response = self.conn.send_request("move", api_params) self.do_exit(response) def __fix_remote_object_internal(self, robject, module_schema, log): if type(robject) is not dict: return True need_bypass = False keys_to_delete = list() for key in robject: value = robject[key] # keys are internal in FMG devices. if key not in module_schema: keys_to_delete.append(key) continue # key is found attr_schema = module_schema[key] attr_type = attr_schema["type"] if attr_type in ["str", "int"]: # Do immediate fix. if type(value) is list: if len(value) == 1: robject[key] = value[0] log.write("\tfix list-to-atomic key:%s\n" % (key)) else: need_bypass = True elif type(value) is dict: need_bypass = True if not value or value == "null": log.write("\tdelete empty key:%s\n" % (key)) keys_to_delete.append(key) elif attr_type == "dict": if "options" in attr_schema and type(value) is dict: need_bypass |= self.__fix_remote_object_internal( value, attr_schema["options"], log ) else: need_bypass = True if not value or value == "null": log.write("\tdelete empty key:%s\n" % (key)) keys_to_delete.append(key) elif attr_type == "list": if "options" in attr_schema and type(value) is list: for sub_value in value: need_bypass |= self.__fix_remote_object_internal( sub_value, attr_schema["options"], log ) else: need_bypass = True if ( type(value) is list and not len(value) or value == "null" or not value ): log.write("\tdelete empty key:%s\n" % (key)) keys_to_delete.append(key) else: raise AssertionError("Unexpected attributetype.") for key in keys_to_delete: log.write("\tdelete unrecognized key:%s\n" % (key)) del robject[key] return need_bypass def __append_whiteblank_per_line(self, blob, num_of_blank): ret = " " * num_of_blank ret += blob.replace("\n", "\n%s" % (" " * num_of_blank)) return ret def _generate_playbook( self, counter, export_path, selector, robject, state_present, need_bypass, url_params, params_schema, log, ): prefix_text = """- name: Exported Playbook hosts: fortimanager00 connection: httpapi collections: - fortinet.fortimanager vars: ansible_httpapi_use_ssl: True ansible_httpapi_validate_certs: False ansible_httpapi_port: 443 tasks: """ with open("%s/%s_%s.yml" % (export_path, selector, counter), "w") as f: f.write(prefix_text) f.write(" - name: exported config for %s\n" % (selector)) f.write(" fmgr_%s:\n" % (selector)) if need_bypass: f.write(" bypass_validation: true\n") if state_present: f.write(" state: present\n") for url_param_key in params_schema: if url_param_key not in url_params: continue url_param_value = url_params[url_param_key] f.write(" %s: %s\n" % (url_param_key, url_param_value)) f.write(" %s:\n" % (selector)) f.write(self.__append_whiteblank_per_line(yaml.dump(robject), 8)) log.write( "\texported playbook: %s/%s_%s.yml\n" % (export_path, selector, counter) ) self._nr_exported_playbooks += 1 def _process_export_response( self, selector, response, schema_invt, log, export_path, url_params, params_schema, ): response_code = response[0] response_data = response[1] if response_code != 0 or "data" not in response_data: log.write("\tno configuration data found\n") return if selector not in schema_invt: log.write("\trequested object has no corresponding ansible module\n") return state_present = schema_invt[selector]["stated"] module_schema = schema_invt[selector]["options"] remote_objects = response_data["data"] counter = 0 if type(remote_objects) is list: for remote_object in remote_objects: need_bypass = self.__fix_remote_object_internal( remote_object, module_schema, log ) self._generate_playbook( counter, export_path, selector, remote_object, state_present, need_bypass, url_params, params_schema, log, ) counter += 1 elif type(remote_objects) is dict: need_bypass = self.__fix_remote_object_internal( remote_objects, module_schema, log ) self._generate_playbook( counter, export_path, selector, remote_objects, state_present, need_bypass, url_params, params_schema, log, ) counter += 1 if not counter: self._nr_valid_selectors += 1 def _process_export_per_selector( self, selector, schema, param, log, export_path, process, schema_invt ): # make urls from schema and parameters provided. url = None export_urls = schema["urls"] if "adom" in param and not export_urls[0].endswith("{adom}"): if param["adom"] == "global": for _url in export_urls: if "/global/" in _url: url = _url break else: for _url in export_urls: if "/adom/{adom}/" in _url: url = _url break if not url: url = export_urls[0] _param_applied = list() for _param_key in param: _param_value = param[_param_key] if _param_key == "adom" and _param_value.lower() == "global": continue token_hint = "/%s/{%s}" % (_param_key, _param_key) token = "/%s/%s" % (_param_key, _param_value) if token_hint in url: _param_applied.append(_param_key) url = url.replace(token_hint, token) for _param_key in param: if _param_key in _param_applied: continue if _param_key == "adom" and _param_value.lower() == "global": continue token_hint = "{%s}" % (_param_key) token = param[_param_key] url = url.replace(token_hint, token) tokens = url.split("/") if tokens[-1].startswith("{") and tokens[-1].endswith("}"): new_url = "" for token in tokens[:-1]: new_url += "/%s" % (token) new_url = new_url.replace("//", "/") url = new_url unresolved_parameter = False tokens = url.split("/") for token in tokens: if token.startswith("{") and token.endswith("}"): unresolved_parameter = True break log.write("[%s]exporting: %s\n" % (process, selector)) log.write("\turl: %s\n" % (url)) if unresolved_parameter: log.write("\t unknown parameter, skipped!\n") return response = self.conn.send_request("get", [{"url": url}]) self._process_export_response( selector, response, schema_invt, log, export_path, param, schema["params"] ) def process_export(self, metadata): from ansible_collections.fortinet.fortimanager.plugins.module_utils.exported_schema import ( schemas as exported_schema_inventory, ) export_selectors = self.module.params["export_playbooks"]["selector"] export_path = "./" if ( "path" in self.module.params["export_playbooks"] and self.module.params["export_playbooks"]["path"] ): export_path = self.module.params["export_playbooks"]["path"] log = open("%s/export.log" % (export_path), "w") log.write("Export time: %s\n" % (str(datetime.datetime.now()))) # Check required parameter. for selector in export_selectors: if selector == "all": continue export_meta = metadata[selector] export_meta_param = export_meta["params"] export_meta_urls = export_meta["urls"] if ( not self.module.params["export_playbooks"]["params"] or selector not in self.module.params["export_playbooks"]["params"] ): self.module.fail_json( "parameter export_playbooks->params needs entry:%s" % (selector) ) if not len(export_meta_urls): raise AssertionError("Invalid schema.") # extracted required parameter. url_tokens = export_meta_urls[0].split("/") required_params = list() for _param in export_meta_param: if "{%s}" % (_param) == url_tokens[-1]: continue required_params.append(_param) for _param in required_params: if ( _param not in self.module.params["export_playbooks"]["params"][selector] ): self.module.fail_json( "required parameters for selector %s: %s" % (selector, required_params) ) # Check required parameter for selector: all if "all" in export_selectors: if ( "all" not in self.module.params["export_playbooks"]["params"] or "adom" not in self.module.params["export_playbooks"]["params"]["all"] ): self.module.fail_json( "required parameters for selector %s: %s" % ("all", ["adom"]) ) # process specific selector and 'all' selectors_to_process = dict() for selector in export_selectors: if selector == "all": continue selectors_to_process[selector] = ( metadata[selector], self.module.params["export_playbooks"]["params"][selector], ) if "all" in export_selectors: for selector in metadata: chosen = True if not len(metadata[selector]["urls"]): raise AssertionError("Invalid Schema.") url_tokens = metadata[selector]["urls"][0].split("/") for _param in metadata[selector]["params"]: if _param == "adom": continue elif "{%s}" % (_param) != url_tokens[-1]: chosen = False break if not chosen or selector in selectors_to_process: continue selectors_to_process[selector] = ( metadata[selector], self.module.params["export_playbooks"]["params"]["all"], ) process_counter = 1 number_selectors = len(selectors_to_process) for selector in selectors_to_process: self._process_export_per_selector( selector, selectors_to_process[selector][0], selectors_to_process[selector][1], log, export_path, "%s/%s" % (process_counter, number_selectors), exported_schema_inventory, ) process_counter += 1 self.module.exit_json( number_of_selectors=number_selectors, number_of_valid_selectors=self._nr_valid_selectors, number_of_exported_playbooks=self._nr_exported_playbooks, system_infomation=self.system_status, ) def process_fact(self, metadata): if self.module.params["facts"]["selector"] not in metadata: raise AssertionError("selector is not expected in parameters") selector = self.module.params["facts"]["selector"] fact_params = metadata[selector]["params"] fact_urls = metadata[selector]["urls"] fact_revisions = metadata[selector]["revision"] matched, checking_message = self._version_matched(fact_revisions) if not matched: self.version_check_warnings.append( "selector:%s %s" % (selector, checking_message) ) if not len(fact_urls): raise AssertionError("unexpected fact urls set") real_params_keys = set() if self.module.params["facts"]["params"]: real_params_keys = set(self.module.params["facts"]["params"].keys()) if real_params_keys != set(fact_params): self.module.fail_json( msg="expect params:%s, given params:%s" % (list(fact_params), list(real_params_keys)) ) url = None if "adom" in fact_params and not fact_urls[0].endswith("{adom}"): if self.module.params["facts"]["params"]["adom"] == "global": for _url in fact_urls: if "/global/" in _url: url = _url break elif ( self.module.params["facts"]["params"]["adom"] != "" and self.module.params["facts"]["params"]["adom"] is not None ): for _url in fact_urls: if "/adom/{adom}/" in _url: url = _url # url = _url.replace('/adom/{adom}/', '/adom/%s/' % (self.module.params['facts']['params']['adom'])) break else: # choose default URL which is for all domains for _url in fact_urls: if "/global/" not in _url and "/adom/{adom}/" not in _url: url = _url break else: url = fact_urls[0] if not url: self.module.fail_json( msg="can not find url in following sets:%s! please check params: adom" % (fact_urls) ) _param_applied = list() for _param in fact_params: _the_param = self.module.params["facts"]["params"][_param] if self.module.params["facts"]["params"][_param] is None: _the_param = "" token_hint = "/%s/{%s}" % (_param, _param) token = "/%s/%s" % (_param, _the_param) if token_hint in url: _param_applied.append(_param) url = url.replace(token_hint, token) for _param in fact_params: if _param in _param_applied: continue token_hint = "{%s}" % (_param) token = ( self.module.params["facts"]["params"][_param] if self.module.params["facts"]["params"][_param] else "" ) url = url.replace(token_hint, token) # Other Filters and Sorters filters = self.module.params["facts"]["filter"] sortings = self.module.params["facts"]["sortings"] fields = self.module.params["facts"]["fields"] options = self.module.params["facts"]["option"] api_params = [{"url": url}] if filters: api_params[0]["filter"] = filters if sortings: api_params[0]["sortings"] = sortings if fields: api_params[0]["fields"] = fields if options: api_params[0]["option"] = options # Now issue the request. response = self.conn.send_request("get", api_params) self.do_exit(response) def process_curd(self, argument_specs=None): if "state" not in self.module.params: raise AssertionError("parameter state is expected") track = [self.module_level2_name] if ( "bypass_validation" not in self.module.params or self.module.params["bypass_validation"] is False ): self.check_versioning_mismatch( track, argument_specs[self.module_level2_name] if self.module_level2_name in argument_specs else None, self.module.params[self.module_level2_name] if self.module_level2_name in self.module.params else None, ) has_mkey = ( self.module_primary_key is not None and type(self.module.params[self.module_level2_name]) is dict ) if has_mkey: mvalue = "" if self.module_primary_key.startswith("complex:"): mvalue_exec_string = self.module_primary_key[len("complex:"):] mvalue_exec_string = mvalue_exec_string.replace( "{{module}}", "self.module.params[self.module_level2_name]" ) # mvalue_exec_string = 'mvalue = %s' % (mvalue_exec_string) # exec(mvalue_exec_string) # On Windows Platform, exec() call doesn't take effect. mvalue = eval(mvalue_exec_string) else: mvalue = self.module.params[self.module_level2_name][ self.module_primary_key ] self.do_exit(self._process_with_mkey(mvalue)) else: self.do_exit(self._process_without_mkey()) def __tailor_attributes(self, data): if type(data) == dict: rdata = dict() for key in data: value = data[key] if value is None: continue rdata[key] = self.__tailor_attributes(value) return rdata elif type(data) == list: rdata = list() for item in data: if item is None: continue rdata.append(self.__tailor_attributes(item)) return rdata else: if data is None: raise AssertionError("data is expected to be not none") return data def process_partial_curd(self, argument_specs=None): track = [self.module_level2_name] if ( "bypass_validation" not in self.module.params or self.module.params["bypass_validation"] is False ): self.check_versioning_mismatch( track, argument_specs[self.module_level2_name] if self.module_level2_name in argument_specs else None, self.module.params[self.module_level2_name] if self.module_level2_name in self.module.params else None, ) the_url = self.jrpc_urls[0] if "adom" in self.url_params and not self.jrpc_urls[0].endswith("{adom}"): if self.module.params["adom"] == "global": for _url in self.jrpc_urls: if "/global/" in _url: the_url = _url break else: for _url in self.jrpc_urls: if "/adom/{adom}/" in _url: the_url = _url break for _param in self.url_params: token_hint = "{%s}" % (_param) token = "%s" % (self.module.params[_param]) the_url = the_url.replace(token_hint, token) the_url = the_url.rstrip("/") api_params = [{"url": the_url}] if self.module_level2_name in self.module.params: if not self.top_level_schema_name: raise AssertionError("top level schem name is not supposed to be empty") api_params[0][self.top_level_schema_name] = self.__tailor_attributes( self.module.params[self.module_level2_name] ) response = self.conn.send_request(self._propose_method("set"), api_params) self.do_exit(response) def check_versioning_mismatch(self, track, schema, params): if not params or not schema: return param_type = schema["type"] if "type" in schema else None revisions = schema["revision"] if "revision" in schema else None matched, checking_message = self._version_matched(revisions) if not matched: param_path = track[0] for _param in track[1:]: param_path += "-->%s" % (_param) self.version_check_warnings.append( "param: %s %s" % (param_path, checking_message) ) if param_type == "dict" and "options" in schema: if type(params) is not dict: raise AssertionError() for sub_param_key in params: sub_param = params[sub_param_key] if sub_param_key in schema["options"]: sub_schema = schema["options"][sub_param_key] track.append(sub_param_key) self.check_versioning_mismatch(track, sub_schema, sub_param) del track[-1] elif param_type == "list" and "options" in schema: if type(params) is not list: raise AssertionError() for grouped_param in params: if type(grouped_param) is not dict: raise AssertionError() for sub_param_key in grouped_param: sub_param = grouped_param[sub_param_key] if sub_param_key in schema["options"]: sub_schema = schema["options"][sub_param_key] track.append(sub_param_key) self.check_versioning_mismatch(track, sub_schema, sub_param) del track[-1] def validate_parameters(self, pvb): for blob in pvb: attribute_path = blob["attribute_path"] pointer = self.module.params ignored = False for attr in attribute_path: if attr not in pointer: # If the parameter is not given, ignore that. ignored = True break pointer = pointer[attr] if ignored: continue lambda_expr = blob["lambda"] lambda_expr = lambda_expr.replace("$", str(pointer)) eval_result = eval(lambda_expr) if not eval_result: if "fail_action" not in blob or blob["fail_action"] == "warn": self.module.warn(blob["hint_message"]) else: # assert blob['fail_action'] == 'quit': self.module.fail_json(msg=blob["hint_message"]) def _do_final_exit(self, rc, result): # XXX: as with https://github.com/fortinet/ansible-fortimanager-generic. # the failing conditions priority: failed_when > rc_failed > rc_succeeded. failed = rc != 0 changed = rc == 0 if "response_code" not in result: raise AssertionError("response_code should be in result") if self.module.params["rc_failed"]: for rc_code in self.module.params["rc_failed"]: if str(result["response_code"]) == str(rc_code): failed = True result[ "result_code_overriding" ] = "rc code:%s is overridden to failure" % (rc_code) elif self.module.params["rc_succeeded"]: for rc_code in self.module.params["rc_succeeded"]: if str(result["response_code"]) == str(rc_code): failed = False result[ "result_code_overriding" ] = "rc code:%s is overridden to success" % (rc_code) if self.system_status: result["system_information"] = self.system_status if len(self.version_check_warnings): version_check_warning = dict() version_check_warning["mismatches"] = self.version_check_warnings if not self.system_status: raise AssertionError() version_check_warning["system_version"] = "v%s.%s.%s" % ( self.system_status["Major"], self.system_status["Minor"], self.system_status["Patch"], ) self.module.warn( "Ansible has detected version mismatch between FortiManager and your playbook, see more details by appending option -vvv" ) self.module.exit_json( rc=rc, meta=result, version_check_warning=version_check_warning, failed=failed, changed=changed, ) else: self.module.exit_json(rc=rc, meta=result, failed=failed, changed=changed) def do_nonexist_exit(self): rc = 0 result = dict() result["response_code"] = -3 result["response_message"] = "object not exist" self._do_final_exit(rc, result) def do_exit(self, response): rc = response[0] result = dict() result["response_data"] = list() if "data" in response[1]: result["response_data"] = response[1]["data"] result["response_code"] = response[1]["status"]["code"] result["response_message"] = response[1]["status"]["message"] result["request_url"] = response[1]["url"] # Fix for fmgr_sys_hitcount if response[1]["url"] == "/sys/hitcount": if isinstance(result["response_data"], list) and len(result["response_data"]) == 0: result["response_data"] = dict() if "taskid" in response[1] and isinstance(result["response_data"], dict) \ and "task" not in result["response_data"]: result["response_data"]["task"] = response[1]["taskid"] # XXX:Do further status mapping self._do_final_exit(rc, result)