Server IP : 85.214.239.14 / Your IP : 52.15.73.112 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/dellemc/openmanage/plugins/module_utils/ |
Upload File : |
# -*- coding: utf-8 -*- # Dell EMC OpenManage Ansible Modules # Version 5.0.1 # Copyright (C) 2019-2022 Dell Inc. or its subsidiaries. All Rights Reserved. # Redistribution and use in source and binary forms, with or without modification, # are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # from __future__ import (absolute_import, division, print_function) __metaclass__ = type import json import os import time from ansible.module_utils.urls import open_url, ConnectionError, SSLValidationError from ansible.module_utils.six.moves.urllib.error import URLError, HTTPError from ansible.module_utils.six.moves.urllib.parse import urlencode ome_auth_params = { "hostname": {"required": True, "type": "str"}, "username": {"required": True, "type": "str"}, "password": {"required": True, "type": "str", "no_log": True}, "port": {"type": "int", "default": 443}, "validate_certs": {"type": "bool", "default": True}, "ca_path": {"type": "path"}, "timeout": {"type": "int", "default": 30}, } SESSION_RESOURCE_COLLECTION = { "SESSION": "SessionService/Sessions", "SESSION_ID": "SessionService/Sessions('{Id}')", } JOB_URI = "JobService/Jobs({job_id})" JOB_SERVICE_URI = "JobService/Jobs" class OpenURLResponse(object): """Handles HTTPResponse""" def __init__(self, resp): self.body = None self.resp = resp if self.resp: self.body = self.resp.read() @property def json_data(self): try: return json.loads(self.body) except ValueError: raise ValueError("Unable to parse json") @property def status_code(self): return self.resp.getcode() @property def success(self): return self.status_code in (200, 201, 202, 204) @property def token_header(self): return self.resp.headers.get('X-Auth-Token') class RestOME(object): """Handles OME API requests""" def __init__(self, module_params=None, req_session=False): self.module_params = module_params self.hostname = self.module_params["hostname"] self.username = self.module_params["username"] self.password = self.module_params["password"] self.port = self.module_params["port"] self.validate_certs = self.module_params.get("validate_certs", True) self.ca_path = self.module_params.get("ca_path") self.timeout = self.module_params.get("timeout", 30) self.req_session = req_session self.session_id = None self.protocol = 'https' self._headers = {'Content-Type': 'application/json', 'Accept': 'application/json'} def _get_base_url(self): """builds base url""" return '{0}://{1}:{2}/api'.format(self.protocol, self.hostname, self.port) def _build_url(self, path, query_param=None): """builds complete url""" url = path base_uri = self._get_base_url() if path: url = '{0}/{1}'.format(base_uri, path) if query_param: """Ome filtering does not work as expected when '+' is passed, urlencode will encode spaces as '+' so replace it to '%20'""" url += "?{0}".format(urlencode(query_param).replace('+', '%20')) return url def _url_common_args_spec(self, method, api_timeout, headers=None): """Creates an argument common spec""" req_header = self._headers if headers: req_header.update(headers) if api_timeout is None: api_timeout = self.timeout if self.ca_path is None: self.ca_path = self._get_omam_ca_env() url_kwargs = { "method": method, "validate_certs": self.validate_certs, "ca_path": self.ca_path, "use_proxy": True, "headers": req_header, "timeout": api_timeout, "follow_redirects": 'all', } return url_kwargs def _args_without_session(self, method, api_timeout, headers=None): """Creates an argument spec in case of basic authentication""" req_header = self._headers if headers: req_header.update(headers) url_kwargs = self._url_common_args_spec(method, api_timeout, headers=headers) url_kwargs["url_username"] = self.username url_kwargs["url_password"] = self.password url_kwargs["force_basic_auth"] = True return url_kwargs def _args_with_session(self, method, api_timeout, headers=None): """Creates an argument spec, in case of authentication with session""" url_kwargs = self._url_common_args_spec(method, api_timeout, headers=headers) url_kwargs["force_basic_auth"] = False return url_kwargs def invoke_request(self, method, path, data=None, query_param=None, headers=None, api_timeout=None, dump=True): """ Sends a request through open_url Returns :class:`OpenURLResponse` object. :arg method: HTTP verb to use for the request :arg path: path to request without query parameter :arg data: (optional) Payload to send with the request :arg query_param: (optional) Dictionary of query parameter to send with request :arg headers: (optional) Dictionary of HTTP Headers to send with the request :arg api_timeout: (optional) How long to wait for the server to send data before giving up :arg dump: (Optional) boolean value for dumping payload data. :returns: OpenURLResponse """ try: if 'X-Auth-Token' in self._headers: url_kwargs = self._args_with_session(method, api_timeout, headers=headers) else: url_kwargs = self._args_without_session(method, api_timeout, headers=headers) if data and dump: data = json.dumps(data) url = self._build_url(path, query_param=query_param) resp = open_url(url, data=data, **url_kwargs) resp_data = OpenURLResponse(resp) except (HTTPError, URLError, SSLValidationError, ConnectionError) as err: raise err return resp_data def __enter__(self): """Creates sessions by passing it to header""" if self.req_session: payload = {'UserName': self.username, 'Password': self.password, 'SessionType': 'API', } path = SESSION_RESOURCE_COLLECTION["SESSION"] resp = self.invoke_request('POST', path, data=payload) if resp and resp.success: self.session_id = resp.json_data.get("Id") self._headers["X-Auth-Token"] = resp.token_header else: msg = "Could not create the session" raise ConnectionError(msg) return self def __exit__(self, exc_type, exc_value, traceback): """Deletes a session id, which is in use for request""" if self.session_id: path = SESSION_RESOURCE_COLLECTION["SESSION_ID"].format(Id=self.session_id) self.invoke_request('DELETE', path) return False def get_all_report_details(self, uri): """ This implementation mainly dependent on '@odata.count' value. Currently first request without query string, always returns total number of available reports in '@odata.count'. """ try: resp = self.invoke_request('GET', uri) data = resp.json_data report_list = data["value"] total_count = data['@odata.count'] remaining_count = total_count - len(report_list) first_page_count = len(report_list) while remaining_count > 0: resp = self.invoke_request('GET', uri, query_param={"$top": first_page_count, "$skip": len(report_list)}) data = resp.json_data value = data["value"] report_list.extend(value) remaining_count = remaining_count - len(value) return {"resp_obj": resp, "report_list": report_list} except (URLError, HTTPError, SSLValidationError, ConnectionError, TypeError, ValueError) as err: raise err def get_job_type_id(self, jobtype_name): """This provides an ID of the job type.""" job_type_id = None resp = self.invoke_request('GET', "JobService/JobTypes") data = resp.json_data["value"] for each in data: if each["Name"] == jobtype_name: job_type_id = each["Id"] break return job_type_id def get_device_id_from_service_tag(self, service_tag): """ :param service_tag: service tag of the device :return: dict Id: int: device id value: dict: device id details not_found_msg: str: message if service tag not found """ device_id = None query = "DeviceServiceTag eq '{0}'".format(service_tag) response = self.invoke_request("GET", "DeviceService/Devices", query_param={"$filter": query}) value = response.json_data.get("value", []) device_info = {} if value: device_info = value[0] device_id = device_info["Id"] return {"Id": device_id, "value": device_info} def get_all_items_with_pagination(self, uri): """ This implementation mainly to get all available items from ome for pagination supported GET uri :param uri: uri which supports pagination :return: dict. """ try: resp = self.invoke_request('GET', uri) data = resp.json_data total_items = data.get("value", []) total_count = data.get('@odata.count', 0) next_link = data.get('@odata.nextLink', '') while next_link: resp = self.invoke_request('GET', next_link.split('/api')[-1]) data = resp.json_data value = data["value"] next_link = data.get('@odata.nextLink', '') total_items.extend(value) return {"total_count": total_count, "value": total_items} except (URLError, HTTPError, SSLValidationError, ConnectionError, TypeError, ValueError) as err: raise err def get_device_type(self): """ Returns device type map where as key is type and value is type name eg: {1000: "SERVER", 2000: "CHASSIS", 4000: "NETWORK_IOM", "8000": "STORAGE_IOM", 3000: "STORAGE"} :return: dict, first item dict gives device type map """ device_map = {} response = self.invoke_request("GET", "DeviceService/DeviceType") if response.json_data.get("value"): device_map = dict([(item["DeviceType"], item["Name"]) for item in response.json_data["value"]]) return device_map def get_job_info(self, job_id): try: job_status_map = { 2020: "Scheduled", 2030: "Queued", 2040: "Starting", 2050: "Running", 2060: "Completed", 2070: "Failed", 2090: "Warning", 2080: "New", 2100: "Aborted", 2101: "Paused", 2102: "Stopped", 2103: "Canceled" } failed_job_status = [2070, 2090, 2100, 2101, 2102, 2103] job_url = JOB_URI.format(job_id=job_id) job_resp = self.invoke_request('GET', job_url) job_dict = job_resp.json_data job_status = job_dict['LastRunStatus']['Id'] if job_status in [2060, 2020]: job_failed = False message = "Job {0} successfully.".format(job_status_map[job_status]) exit_poll = True return exit_poll, job_failed, message elif job_status in failed_job_status: exit_poll = True job_failed = True message = "Job is in {0} state, and is not completed.".format(job_status_map[job_status]) return exit_poll, job_failed, message return False, False, None except HTTPError: job_failed = True message = "Unable to track the job status of {0}.".format(job_id) exit_poll = True return exit_poll, job_failed, message def job_tracking(self, job_id, job_wait_sec=600, sleep_time=60): """ job_id: job id job_wait_sec: Maximum time to wait to fetch the final job details in seconds sleep_time: Maximum time to sleep in seconds in each job details fetch """ max_sleep_time = job_wait_sec sleep_interval = sleep_time while max_sleep_time: if max_sleep_time > sleep_interval: max_sleep_time = max_sleep_time - sleep_interval else: sleep_interval = max_sleep_time max_sleep_time = 0 time.sleep(sleep_interval) exit_poll, job_failed, job_message = self.get_job_info(job_id) if exit_poll is True: return job_failed, job_message return True, "The job is not complete after {0} seconds.".format(job_wait_sec) def strip_substr_dict(self, odata_dict, chkstr='@odata.'): cp = odata_dict.copy() klist = cp.keys() for k in klist: if chkstr in str(k).lower(): odata_dict.pop(k) return odata_dict def job_submission(self, job_name, job_desc, targets, params, job_type, schedule="startnow", state="Enabled"): job_payload = {"JobName": job_name, "JobDescription": job_desc, "Schedule": schedule, "State": state, "Targets": targets, "Params": params, "JobType": job_type} response = self.invoke_request("POST", JOB_SERVICE_URI, data=job_payload) return response def test_network_connection(self, share_address, share_path, share_type, share_user=None, share_password=None, share_domain=None): job_type = {"Id": 56, "Name": "ValidateNWFileShare_Task"} params = [ {"Key": "checkPathOnly", "Value": "false"}, {"Key": "shareType", "Value": share_type}, {"Key": "ShareNetworkFilePath", "Value": share_path}, {"Key": "shareAddress", "Value": share_address}, {"Key": "testShareWriteAccess", "Value": "true"} ] if share_user is not None: params.append({"Key": "UserName", "Value": share_user}) if share_password is not None: params.append({"Key": "Password", "Value": share_password}) if share_domain is not None: params.append({"Key": "domainName", "Value": share_domain}) job_response = self.job_submission("Validate Share", "Validate Share", [], params, job_type) return job_response def check_existing_job_state(self, job_type_name): query_param = {"$filter": "LastRunStatus/Id eq 2030 or LastRunStatus/Id eq 2040 or LastRunStatus/Id eq 2050"} job_resp = self.invoke_request("GET", JOB_SERVICE_URI, query_param=query_param) job_lst = job_resp.json_data["value"] if job_resp.json_data.get("value") is not None else [] for job in job_lst: if job["JobType"]["Name"] == job_type_name: job_allowed = False available_jobs = job break else: job_allowed = True available_jobs = job_lst return job_allowed, available_jobs def _get_omam_ca_env(self): """Check if the value is set in REQUESTS_CA_BUNDLE or CURL_CA_BUNDLE or OMAM_CA_BUNDLE or returns None""" return os.environ.get("REQUESTS_CA_BUNDLE") or os.environ.get("CURL_CA_BUNDLE") or os.environ.get("OMAM_CA_BUNDLE")