Server IP : 85.214.239.14 / Your IP : 3.15.211.252 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/community/dns/plugins/module_utils/ |
Upload File : |
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017-2020 Felix Fontein
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.six import string_types

try:
    import lxml.etree
    HAS_LXML_ETREE = True
except ImportError:
    HAS_LXML_ETREE = False

from ansible_collections.community.dns.plugins.module_utils.http import (
    NetworkError,
)


class WSDLException(Exception):
    """Base class of all exceptions raised by this module."""


class WSDLNetworkError(WSDLException):
    """Raised by ``Composer.execute()`` when the HTTP helper reports a ``NetworkError``."""


class WSDLError(WSDLException):
    """An error reported by the remote side (SOAP fault or non-2xx HTTP status).

    The exception text is ``'<origin> (<error_code>): <message>'``; the three
    parts are also available as ``error_origin``, ``error_code`` and
    ``error_message``.
    """

    def __init__(self, origin, error_code, message):
        super(WSDLError, self).__init__('{0} ({1}): {2}'.format(origin, error_code, message))
        self.error_origin = origin
        self.error_code = error_code
        self.error_message = message


class WSDLCodingException(WSDLException):
    """Raised when a value cannot be encoded to, or decoded from, XML."""


def _split_text_namespace(node, text):
    """Split ``'prefix:name'`` into ``(name, namespace)``.

    The prefix is resolved through ``node.nsmap``. The namespace is ``None``
    when ``text`` contains no colon or when the prefix is not in the map.
    """
    i = text.find(':')
    if i < 0:
        return text, None
    ns = node.nsmap.get(text[:i])
    text = text[i + 1:]
    return text, ns


_NAMESPACE_ENVELOPE = 'http://schemas.xmlsoap.org/soap/envelope/'
_NAMESPACE_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
_NAMESPACE_XSD = 'http://www.w3.org/2001/XMLSchema'
_NAMESPACE_XML_SOAP = 'http://xml.apache.org/xml-soap'
_NAMESPACE_XML_SOAP_ENCODING = 'http://schemas.xmlsoap.org/soap/encoding/'


def _set_type(node, type_value, namespace=None):
    """Set the ``xsi:type`` attribute of ``node``.

    If ``namespace`` is given, ``type_value`` is wrapped in a ``QName`` for that
    namespace; otherwise it is written as the literal string passed in.
    """
    if namespace is not None:
        type_value = lxml.etree.QName(namespace, type_value)
    node.set(lxml.etree.QName(_NAMESPACE_XSI, 'type').text, type_value)


def encode_wsdl(node, value):
    """Encode a Python value into ``node`` (modified in place).

    Supported values: ``None``, strings, ``bool``, ``int``, ``dict`` (encoded
    as a ``Map`` of ``item``/``key``/``value`` elements, in sorted key order)
    and ``list`` (encoded as a ``SOAP-ENC:Array`` of ``item`` elements).
    Containers are encoded recursively.

    Raises ``WSDLCodingException`` for any other type.
    """
    if value is None:
        node.set(lxml.etree.QName(_NAMESPACE_XSI, 'nil').text, 'true')
    elif isinstance(value, string_types):
        _set_type(node, 'xsd:string')
        node.text = value
    elif isinstance(value, bool):
        # bool is a subclass of int, so it must be tested before int.
        # Otherwise True/False would be emitted as xsd:int "True"/"False".
        _set_type(node, 'xsd:boolean')
        node.text = 'true' if value else 'false'
    elif isinstance(value, int):
        _set_type(node, 'xsd:int')
        node.text = str(value)
    elif isinstance(value, dict):
        _set_type(node, 'Map', _NAMESPACE_XML_SOAP)
        for key, val in sorted(value.items()):
            child = lxml.etree.Element('item')
            ke = lxml.etree.Element('key')
            encode_wsdl(ke, key)
            child.append(ke)
            ve = lxml.etree.Element('value')
            encode_wsdl(ve, val)
            child.append(ve)
            node.append(child)
    elif isinstance(value, list):
        _set_type(node, 'SOAP-ENC:Array')
        for elt in value:
            child = lxml.etree.Element('item')
            encode_wsdl(child, elt)
            node.append(child)
    else:
        raise WSDLCodingException('Do not know how to encode {0}!'.format(type(value)))


def _decode_wsdl_array(result, node, root_ns, ids):
    """Decode every ``item`` child of ``node`` and append it to the list ``result``."""
    for item in node:
        if item.tag != 'item':
            raise WSDLCodingException('Invalid child tag "{0}" in map!'.format(item.tag))
        result.append(decode_wsdl(item, root_ns, ids))


def _decode_wsdl_map(result, node, root_ns, ids):
    """Decode every ``item`` child (a ``key``/``value`` pair) of ``node`` into the dict ``result``."""
    for item in node:
        if item.tag != 'item':
            raise WSDLCodingException('Invalid child tag "{0}" in map!'.format(item.tag))
        key = item.find('key')
        if key is None:
            raise WSDLCodingException('Cannot find key for "{0}"!'.format(item))
        key = decode_wsdl(key, root_ns, ids)
        value = item.find('value')
        if value is None:
            raise WSDLCodingException('Cannot find value for "{0}"!'.format(item))
        value = decode_wsdl(value, root_ns, ids)
        result[key] = value


def decode_wsdl(node, root_ns, ids):
    """Decode an XML element into a Python value.

    ``root_ns`` is the namespace whose types are decoded structurally: as a
    list when the element carries a SOAP-ENC ``arrayType`` attribute, otherwise
    as a dict keyed by child tag.

    ``ids`` maps ``id`` attribute values to already decoded objects. It is
    updated in place and used to resolve local ``href="#id"`` references.
    Containers are registered before their children are decoded, so children
    can refer back to them.

    Raises ``WSDLCodingException`` for unsupported references, a missing or
    unresolvable ``xsi:type``, unknown types and malformed content.
    """
    href = node.get('href')
    nil = node.get(lxml.etree.QName(_NAMESPACE_XSI, 'nil'))
    node_id = node.get('id')
    if href is not None:
        if not href.startswith('#'):
            raise WSDLCodingException('Global reference "{0}" not supported!'.format(href))
        href = href[1:]
        if href not in ids:
            raise WSDLCodingException('ID "{0}" not yet defined!'.format(href))
        result = ids[href]
    elif nil == 'true':
        result = None
    else:
        type_with_ns = node.get(lxml.etree.QName(_NAMESPACE_XSI, 'type'))
        if type_with_ns is None:
            raise WSDLCodingException('Element "{0}" has no "xsi:type" tag!'.format(node))
        type_name, ns = _split_text_namespace(node, type_with_ns)
        if ns is None:
            raise WSDLCodingException('Cannot find namespace for "{0}"!'.format(type_with_ns))
        if ns == _NAMESPACE_XSD:
            if type_name == 'boolean':
                if node.text == 'true':
                    result = True
                elif node.text == 'false':
                    result = False
                else:
                    raise WSDLCodingException('Invalid value for boolean: "{0}"'.format(node.text))
            elif type_name == 'int':
                result = int(node.text)
            elif type_name == 'string':
                result = node.text
            else:
                raise WSDLCodingException('Unknown XSD type "{0}"!'.format(type_name))
        elif ns == _NAMESPACE_XML_SOAP:
            if type_name == 'Map':
                result = dict()
                if node_id is not None:
                    ids[node_id] = result
                _decode_wsdl_map(result, node, root_ns, ids)
            else:
                raise WSDLCodingException('Unknown XSD type "{0}"!'.format(type_name))
        elif ns == _NAMESPACE_XML_SOAP_ENCODING:
            if type_name == 'Array':
                result = []
                if node_id is not None:
                    ids[node_id] = result
                _decode_wsdl_array(result, node, root_ns, ids)
            else:
                raise WSDLCodingException('Unknown XSD type "{0}"!'.format(type_name))
        elif ns == root_ns:
            array_type = node.get(lxml.etree.QName(_NAMESPACE_XML_SOAP_ENCODING, 'arrayType'))
            if array_type is not None:
                result = []
                if node_id is not None:
                    ids[node_id] = result
                _decode_wsdl_array(result, node, root_ns, ids)
            else:
                result = dict()
                if node_id is not None:
                    ids[node_id] = result
                for item in node:
                    result[item.tag] = decode_wsdl(item, root_ns, ids)
        else:
            raise WSDLCodingException('Unknown type namespace "{0}" (with type "{1}")!'.format(ns, type_name))
    if node_id is not None:
        ids[node_id] = result
    return result


class Parser(object):
    """Parse a SOAP envelope into decoded header and body results.

    Construction raises ``WSDLError`` if the document contains a SOAP
    ``Fault`` element, and ``WSDLCodingException`` if the content cannot be
    decoded.
    """

    def _parse(self, result, node, where):
        """Decode the ``return`` elements below each child of ``node`` into ``result``.

        Every child must live in the API namespace. Results are stored under
        the child's local tag name. ``where`` is only used in error messages.
        """
        for child in node:
            tag = lxml.etree.QName(child.tag)
            if tag.namespace != self._api:
                raise WSDLCodingException('Cannot interpret {0} item of type "{1}"!'.format(where, tag))
            for res in child.iter('return'):
                result[tag.localname] = decode_wsdl(res, self._api, {})

    def __init__(self, api, root):
        """Parse the envelope ``root``; ``api`` is the API namespace of the expected content."""
        self._main_ns = _NAMESPACE_ENVELOPE
        self._api = api
        self._root = root
        # The first Fault element found aborts parsing with a WSDLError.
        for fault in self._root.iter(lxml.etree.QName(self._main_ns, 'Fault').text):
            fault_code = fault.find('faultcode')
            fault_code_val = None
            fault_string = fault.find('faultstring')
            origin = 'server'
            if fault_code is not None and fault_code.text:
                code, code_ns = _split_text_namespace(fault, fault_code.text)
                fault_code_val = code
                # Fault codes in the envelope namespace determine the reported origin.
                if code_ns == self._main_ns:
                    origin = code.lower()
            if fault_string is not None and fault_string.text:
                raise WSDLError(origin, fault_code_val, fault_string.text)
            # Without a fault string, report the serialized fault element instead.
            raise WSDLError(origin, fault_code_val, lxml.etree.tostring(fault).decode('utf-8'))
        self._header = dict()
        self._body = dict()
        for header in self._root.iter(lxml.etree.QName(self._main_ns, 'Header').text):
            self._parse(self._header, header, 'header')
        for body in self._root.iter(lxml.etree.QName(self._main_ns, 'Body').text):
            self._parse(self._body, body, 'body')

    def get_header(self, header):
        """Return the decoded header result stored under ``header`` (``KeyError`` if absent)."""
        return self._header[header]

    def get_result(self, body):
        """Return the decoded body result stored under ``body`` (``KeyError`` if absent)."""
        return self._body[body]

    def __str__(self):
        return 'header={0}, body={1}'.format(self._header, self._body)

    def __repr__(self):
        return '''<?xml version='1.0' encoding='utf-8'?>''' + '\n' + lxml.etree.tostring(self._root, pretty_print=True).decode('utf-8')


class Composer(object):
    """Build a SOAP request envelope and send it through an HTTP helper."""

    @staticmethod
    def _create(tag, namespace=None, **kwarg):
        """Create an element named ``tag``, qualified with ``namespace`` if one is given."""
        if namespace:
            return lxml.etree.Element(lxml.etree.QName(namespace, tag), **kwarg)
        else:
            return lxml.etree.Element(tag, **kwarg)

    def __str__(self):
        return '''<?xml version='1.0' encoding='utf-8'?>''' + '\n' + lxml.etree.tostring(self._root, pretty_print=True).decode('utf-8')

    def _create_envelope(self, tag, **kwarg):
        """Create an element named ``tag`` in the SOAP envelope namespace."""
        return self._create(tag, self._main_ns, **kwarg)

    def __init__(self, http_helper, api, namespaces=None):
        """Create an empty envelope consisting of a header and a body.

        ``http_helper`` must provide ``fetch_url()``. ``api`` is used both as
        the request URL and as the namespace of commands. ``namespaces`` is an
        optional dict of extra prefix-to-namespace mappings, which override
        the defaults.
        """
        self._http_helper = http_helper
        self._main_ns = _NAMESPACE_ENVELOPE
        self._api = api
        # Compose basic document
        all_namespaces = {
            'SOAP-ENV': _NAMESPACE_ENVELOPE,
            'xsd': _NAMESPACE_XSD,
            'xsi': _NAMESPACE_XSI,
            'ns2': 'auth',
            'SOAP-ENC': _NAMESPACE_XML_SOAP_ENCODING,
        }
        if namespaces is not None:
            all_namespaces.update(namespaces)
        self._root = self._create_envelope('Envelope', nsmap=all_namespaces)
        self._root.set(lxml.etree.QName(self._main_ns, 'encodingStyle').text, _NAMESPACE_XML_SOAP_ENCODING)
        self._header = self._create_envelope('Header')
        self._root.append(self._header)
        self._body = self._create_envelope('Body')
        self._root.append(self._body)
        self._command = None

    def add_auth(self, username, password):
        """Append an ``authenticate`` element with ``UserName`` and ``Password`` to the header."""
        auth = self._create('authenticate', 'auth')
        user = self._create('UserName')
        user.text = username
        auth.append(user)
        pw = self._create('Password')
        pw.text = password
        auth.append(pw)
        self._header.append(auth)

    def add_simple_command(self, command, **args):
        """Append a command element to the body, with one encoded child per keyword argument.

        The command name is remembered and used for the ``SOAPAction`` header
        in ``execute()``.
        """
        self._command = command
        command = self._create(command, self._api)
        for arg, value in args.items():
            arg = self._create(arg)
            encode_wsdl(arg, value)
            command.append(arg)
        self._body.append(command)

    def execute(self, debug=False):
        """POST the envelope to the API URL and return a ``Parser`` for the response.

        ``debug`` is accepted for backward compatibility and currently unused.

        Raises ``WSDLNetworkError`` if the HTTP helper raises ``NetworkError``.
        Raises ``WSDLError`` if the response contains a SOAP fault or the HTTP
        status is outside the 2xx range.
        """
        payload = b'''<?xml version='1.0' encoding='utf-8'?>''' + b'\n' + lxml.etree.tostring(self._root) + b'\n'
        try:
            headers = {
                'Content-Type': 'text/xml; charset=utf-8',
                'Content-Length': str(len(payload)),
            }
            if self._command:
                headers['SOAPAction'] = '"{0}#{1}"'.format(self._api, self._command)
            result, info = self._http_helper.fetch_url(self._api, data=payload, method='POST', timeout=300, headers=headers)
            code = info['status']
        except NetworkError as e:
            raise WSDLNetworkError(to_native(e))
        if code < 200 or code >= 300:
            # Prefer the detailed SOAP fault if the error body contains one:
            # constructing the Parser raises WSDLError in that case.
            try:
                error_root = lxml.etree.fromstring(result)
            except lxml.etree.XMLSyntaxError:
                # Body is not XML (for example an HTML error page); report the
                # HTTP error below instead of a parser error.
                error_root = None
            if error_root is not None:
                Parser(self._api, error_root)
            raise WSDLError(
                'server',
                code,
                'Error {0} while executing WSDL command:\n{1}'.format(code, result.decode('utf-8')))
        return Parser(self._api, lxml.etree.fromstring(result))