Server IP : 85.214.239.14 / Your IP : 18.116.10.48 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/root/usr/lib/python3/dist-packages/ansible_collections/cisco/mso/plugins/httpapi/ |
Upload File : |
# Copyright: (c) 2020, Lionel Hercot (@lhercot) <lhercot@cisco.com> # Copyright: (c) 2020, Cindy Zhao (@cizhao) <cizhao@cisco.com> # # GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = """ --- name: mso short_description: MSO Ansible HTTPAPI Plugin. description: - This MSO plugin provides the HTTPAPI transport methods needed to initiate a connection to MSO, send API requests and process the response. version_added: "1.2.0" """ import json import re import pickle # import ipaddress import traceback from ansible.module_utils.six import PY3 from ansible.module_utils._text import to_text from ansible.module_utils.connection import ConnectionError from ansible.plugins.httpapi import HttpApiBase class HttpApi(HttpApiBase): def __init__(self, *args, **kwargs): super(HttpApi, self).__init__(*args, **kwargs) self.platform = "cisco.mso" self.headers = {"Content-Type": "application/json"} self.params = {} self.auth = None self.backup_hosts = None self.host_counter = 0 self.error = None self.method = "GET" self.path = "" self.status = -1 self.info = {} def get_platform(self): return self.platform def set_params(self, params): self.params = params def set_backup_hosts(self): try: list_of_hosts = re.sub(r"[[\]]", "", self.connection.get_option("host")).split(",") # ipaddress.ip_address(list_of_hosts[0]) return list_of_hosts except Exception: return [] def login(self, username, password): """Log in to MSO""" # Perform login request self.connection.queue_message("vvvv", "Starting Login to {0}".format(self.connection.get_option("host"))) method = "POST" path = "/mso/api/v1/auth/login" full_path = self.connection.get_option("host") + path if (self.params.get("login_domain") is not None) and (self.params.get("login_domain") != "Local"): domain_id = self._get_login_domain_id(self.params.get("login_domain")) payload = {"username": self.connection.get_option("remote_user"), "password": self.connection.get_option("password"), "domainId": domain_id} else: payload = {"username": self.connection.get_option("remote_user"), "password": self.connection.get_option("password")} # Override the global username/password with the ones specified per task if self.params.get("username") is not None: payload["username"] = self.params.get("username") if self.params.get("password") is not None: payload["password"] = self.params.get("password") data = json.dumps(payload) try: self.connection.queue_message("vvvv", "login() - connection.send({0}, {1}, {2}, {3})".format(path, data, method, self.headers)) response, response_data = self.connection.send(path, data, method=method, headers=self.headers) # Handle MSO response self.status = response.getcode() if self.status != 201: self.connection.queue_message("vvvv", "login status incorrect status={0}".format(self.status)) json_response = self._response_to_json(response_data) self.error = dict(code=self.status, message="Authentication failed: {0}".format(json_response)) raise ConnectionError(json.dumps(self._verify_response(response, method, full_path, response_data))) self.connection._auth = {"Authorization": "Bearer {0}".format(self._response_to_json(response_data).get("token"))} except ConnectionError: self.connection.queue_message("vvvv", "login() - ConnectionError Exception") raise except Exception as e: self.connection.queue_message("vvvv", "login() - Generic Exception") self.error = dict(code=self.status, message="Authentication failed: Request failed: {0}".format(e)) raise ConnectionError(json.dumps(self._verify_response(None, method, full_path, None))) def logout(self): method = "DELETE" path = "/mso/api/v1/auth/logout" try: response, response_data = self.connection.send(path, {}, method=method, headers=self.headers) except Exception as e: self.error = dict(code=self.status, message="Error on attempt to logout from MSO. {0}".format(e)) raise ConnectionError(json.dumps(self._verify_response(None, method, self.connection.get_option("host") + path, None))) self.connection._auth = None def send_request(self, method, path, data=None): """This method handles all MSO REST API requests other than login""" self.error = None self.path = "" self.status = -1 self.info = {} self.method = "GET" if data is None: data = {} self.connection.queue_message("vvvv", "send_request method called") # # Case1: List of hosts is provided # self.backup_hosts = self.set_backup_hosts() # if not self.backup_hosts: if self.connection._connected is True and self.params.get("host") != self.connection.get_option("host"): self.connection._connected = False self.connection.queue_message( "vvvv", "send_request reseting connection as host has changed from {0} to {1}".format(self.connection.get_option("host"), self.params.get("host")), ) if self.params.get("host") is not None: self.connection.set_option("host", self.params.get("host")) else: try: with open("my_hosts.pk", "rb") as fi: self.host_counter = pickle.load(fi) except FileNotFoundError: pass try: self.connection.set_option("host", self.backup_hosts[self.host_counter]) except (IndexError, TypeError): pass if self.params.get("port") is not None: self.connection.set_option("port", self.params.get("port")) if self.params.get("username") is not None: self.connection.set_option("remote_user", self.params.get("username")) if self.params.get("password") is not None: self.connection.set_option("password", self.params.get("password")) if self.params.get("use_proxy") is not None: self.connection.set_option("use_proxy", self.params.get("use_proxy")) if self.params.get("use_ssl") is not None: self.connection.set_option("use_ssl", self.params.get("use_ssl")) if self.params.get("validate_certs") is not None: self.connection.set_option("validate_certs", self.params.get("validate_certs")) # Perform some very basic path input validation. path = str(path) if path[0] != "/": self.error = dict(code=self.status, message="Value of <path> does not appear to be formated properly") raise ConnectionError(json.dumps(self._verify_response(None, method, path, None))) full_path = self.connection.get_option("host") + path try: self.connection.queue_message("vvvv", "send_request() - connection.send({0}, {1}, {2}, {3})".format(path, data, method, self.headers)) response, rdata = self.connection.send(path, data, method=method, headers=self.headers) except ConnectionError: self.connection.queue_message("vvvv", "login() - ConnectionError Exception") raise except Exception as e: self.connection.queue_message("vvvv", "send_request() - Generic Exception") if self.error is None: self.error = dict(code=self.status, message="MSO HTTPAPI send_request() Exception: {0} - {1}".format(e, traceback.format_exc())) raise ConnectionError(json.dumps(self._verify_response(None, method, full_path, None))) return self._verify_response(response, method, full_path, rdata) def handle_error(self): self.host_counter += 1 if self.host_counter == len(self.backup_hosts): raise ConnectionError("No hosts left in cluster to continue operation") with open("my_hosts.pk", "wb") as host_file: pickle.dump(self.host_counter, host_file) try: self.connection.set_option("host", self.backup_hosts[self.host_counter]) except IndexError: pass self.login(self.connection.get_option("remote_user"), self.connection.get_option("password")) return True def _verify_response(self, response, method, path, data): """Process the return code and response object from MSO""" response_data = None response_code = -1 self.info.update(dict(url=path)) if data is not None: response_data = self._response_to_json(data) if response is not None: response_code = response.getcode() path = response.geturl() self.info.update(self._get_formated_info(response)) # Handle possible MSO error information if response_code not in [200, 201, 202, 204]: self.error = dict(code=self.status, message=response_data) self.info["method"] = method if self.error is not None: self.info["error"] = self.error self.info["body"] = response_data return self.info def _response_to_json(self, response_data): """Convert response_data to json format""" try: response_value = response_data.getvalue() except Exception: response_value = response_data response_text = to_text(response_value) try: return json.loads(response_text) if response_text else {} # JSONDecodeError only available on Python 3.5+ except Exception as e: # Expose RAW output for troubleshooting self.error = dict(code=-1, message="Unable to parse output as JSON, see 'raw' output. {0}".format(e)) self.info["raw"] = response_text return def _get_login_domain_id(self, domain_name): """Get a domain and return its id""" if domain_name is None: return None method = "GET" path = "/mso/api/v1/auth/login-domains" full_path = self.connection.get_option("host") + path # TODO: Replace response by - response, data = self.connection.send(path, None, method=method, headers=self.headers) if data is not None: response_data = self._response_to_json(data) domains = response_data.get("domains") if domains is not None: for domain in domains: if domain.get("name") == domain_name: if "id" in domain: return domain.get("id") else: self.error = dict(code=-1, message="Login domain lookup failed for domain '{0}': {1}".format(domain_name, domain)) raise ConnectionError(json.dumps(self._verify_response(None, method, full_path, None))) self.error = dict(code=-1, message="Login domain '{0}' is not a valid domain name.".format(domain_name)) raise ConnectionError(json.dumps(self._verify_response(None, method, full_path, None))) else: self.error = dict(code=-1, message="Key 'domains' missing from data") raise ConnectionError(json.dumps(self._verify_response(None, method, full_path, None))) def _get_formated_info(self, response): """The code in this function is based out of Ansible fetch_url code at https://github.com/ansible/ansible/blob/devel/lib/ansible/module_utils/urls.py""" info = dict(msg="OK (%s bytes)" % response.headers.get("Content-Length", "unknown"), url=response.geturl(), status=response.getcode()) # Lowercase keys, to conform to py2 behavior, so that py3 and py2 are predictable info.update(dict((k.lower(), v) for k, v in response.info().items())) # Don't be lossy, append header values for duplicate headers # In Py2 there is nothing that needs done, py2 does this for us if PY3: temp_headers = {} for name, value in response.headers.items(): # The same as above, lower case keys to match py2 behavior, and create more consistent results name = name.lower() if name in temp_headers: temp_headers[name] = ", ".join((temp_headers[name], value)) else: temp_headers[name] = value info.update(temp_headers) return info