Server IP : 85.214.239.14 / Your IP : 18.217.10.152 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/ibm/qradar/plugins/module_utils/ |
Upload File : |
#!/usr/bin/env python # -*- coding: utf-8 -*- # (c) 2019, Adam Miller (admiller@redhat.com) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import absolute_import, division, print_function __metaclass__ = type from ansible.module_utils.urls import CertificateError from ansible.module_utils.six.moves.urllib.parse import quote_plus from ansible.module_utils.connection import ConnectionError from ansible.module_utils.connection import Connection from ansible.module_utils._text import to_text from ansible.module_utils.six import iteritems from copy import copy import json BASE_HEADERS = {"Content-Type": "application/json", "Version": "9.1"} def find_dict_in_list(some_list, key, value): text_type = False try: to_text(value) text_type = True except TypeError: pass for some_dict in some_list: if key in some_dict: if text_type: if to_text(some_dict[key]).strip() == to_text(value).strip(): return some_dict, some_list.index(some_dict) else: if some_dict[key] == value: return some_dict, some_list.index(some_dict) return None def set_offense_values(module, qradar_request): if module.params["closing_reason"]: found_closing_reason = qradar_request.get_by_path( "api/siem/offense_closing_reasons?filter={0}".format( quote_plus( 'text="{0}"'.format(module.params["closing_reason"]) ) ) ) if found_closing_reason: module.params["closing_reason_id"] = found_closing_reason[0]["id"] else: module.fail_json( "Unable to find closing_reason text: {0}".format( module.params["closing_reason"] ) ) if module.params["status"]: module.params["status"] = module.params["status"].upper() def remove_unsupported_keys_from_payload_dict(payload, supported_key_list): """The fn to remove the unsupported keys from payload :param payload: Payload from response :param supported_key_list: Module supported params :rtype: A dict :returns: payload with only supported and module expected params """ temp_payload = [] for each in payload: temp_dict = copy(each) if isinstance(each, dict): for every_key in each.keys(): if every_key not in supported_key_list: temp_dict.pop(every_key) temp_payload.append(temp_dict) if temp_payload: return temp_payload return payload def list_to_dict(input_dict): """The fn to convert list of dict to dict :param input_dict: input list having list of dict :rtype: A dict :returns: dict with converted all list to dict type """ if isinstance(input_dict, dict): for k, v in iteritems(input_dict): if isinstance(v, dict): list_to_dict(v) elif isinstance(v, list): temp_dict = {} for each in v: if isinstance(each, dict): if each.get("id") or each.get("id") == 0: each.pop("id") each_key_values = "_".join( [str(x) for x in each.values()] ) temp_dict.update({each_key_values: each}) input_dict[k] = temp_dict class QRadarRequest(object): def __init__( self, module=None, connection=None, headers=None, not_rest_data_keys=None, task_vars=None, ): self.module = module if module: # This will be removed, once all of the available modules # are moved to use action plugin design, as otherwise test # would start to complain without the implementation. self.connection = Connection(self.module._socket_path) elif connection: self.connection = connection try: self.connection.load_platform_plugins("ibm.qradar.qradar") self.connection.set_options(var_options=task_vars) except ConnectionError: raise # This allows us to exclude specific argspec keys from being included by # the rest data that don't follow the qradar_* naming convention if not_rest_data_keys: self.not_rest_data_keys = not_rest_data_keys else: self.not_rest_data_keys = [] self.not_rest_data_keys.append("validate_certs") self.headers = headers if headers else BASE_HEADERS def _httpapi_error_handle(self, method, uri, payload=None): # FIXME - make use of handle_httperror(self, exception) where applicable # https://docs.ansible.com/ansible/latest/network/dev_guide/developing_plugins_network.html#developing-plugins-httpapi code = 99999 response = {} try: code, response = self.connection.send_request( method, uri, payload=payload, headers=self.headers ) except ConnectionError as e: self.module.fail_json( msg="connection error occurred: {0}".format(e) ) except CertificateError as e: self.module.fail_json( msg="certificate error occurred: {0}".format(e) ) except ValueError as e: try: self.module.fail_json( msg="certificate not found: {0}".format(e) ) except AttributeError: pass if code == 404: if ( to_text("Object not found") in to_text(response) or to_text("Could not find object") in to_text(response) or to_text("No offense was found") in to_text(response) ): return {} if to_text("The rule does not exist.") in to_text( response["description"] ): return code, {} if code == 409: if "code" in response: if response["code"] in [1002, 1004]: # https://www.ibm.com/support/knowledgecenter/SS42VS_7.3.1/com.ibm.qradar.doc/9.2--staged_config-deploy_status-POST.html # Documentation says we should get 1002, but I'm getting 1004 from QRadar return response else: self.module.fail_json( msg="qradar httpapi returned error {0} with message {1}".format( code, response ) ) elif not (code >= 200 and code < 300): try: self.module.fail_json( msg="qradar httpapi returned error {0} with message {1}".format( code, response ) ) except AttributeError: pass return code, response def get(self, url, **kwargs): return self._httpapi_error_handle("GET", url, **kwargs) def put(self, url, **kwargs): return self._httpapi_error_handle("PUT", url, **kwargs) def post(self, url, **kwargs): return self._httpapi_error_handle("POST", url, **kwargs) def patch(self, url, **kwargs): return self._httpapi_error_handle("PATCH", url, **kwargs) def delete(self, url, **kwargs): return self._httpapi_error_handle("DELETE", url, **kwargs) def get_data(self): """ Get the valid fields that should be passed to the REST API as urlencoded data so long as the argument specification to the module follows the convention: - the key to the argspec item does not start with qradar_ - the key does not exist in the not_data_keys list """ try: qradar_data = {} for param in self.module.params: if (self.module.params[param]) is not None and ( param not in self.not_rest_data_keys ): qradar_data[param] = self.module.params[param] return qradar_data except TypeError as e: self.module.fail_json( msg="invalid data type provided: {0}".format(e) ) def post_by_path(self, rest_path, data=None): """ POST with data to path """ if data is None: data = json.dumps(self.get_data()) elif data is False: # Because for some reason some QRadar REST API endpoint use the # query string to modify state return self.post("/{0}".format(rest_path)) return self.post("/{0}".format(rest_path), payload=data) def create_update(self, rest_path, data=None): """ Create or Update a file/directory monitor data input in qradar """ if data is None: data = json.dumps(self.get_data()) # return self.post("/{0}".format(rest_path), payload=data) return self.patch("/{0}".format(rest_path), payload=data) # PATCH