Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.15.211.252
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/dns/plugins/module_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/dns/plugins/module_utils/wsdl.py
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017-2020 Felix Fontein
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type


from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.six import string_types

try:
    import lxml.etree
    HAS_LXML_ETREE = True
except ImportError:
    HAS_LXML_ETREE = False

from ansible_collections.community.dns.plugins.module_utils.http import (
    NetworkError,
)


class WSDLException(Exception):
    """Common base class for every exception defined in this module."""


class WSDLNetworkError(WSDLException):
    """Raised by ``Composer.execute`` when the HTTP helper reports a ``NetworkError``."""


class WSDLError(WSDLException):
    """Error reported for a WSDL/SOAP call.

    The exception message is rendered as ``"<origin> (<error_code>): <message>"``;
    the three components are also kept as separate attributes.

    :param origin: Who is considered responsible for the error (for example ``'server'``).
    :param error_code: Code identifying the error; may be ``None``.
    :param message: Human readable description of the error.
    """

    def __init__(self, origin, error_code, message):
        self.error_origin = origin
        self.error_code = error_code
        self.error_message = message
        rendered = '{0} ({1}): {2}'.format(origin, error_code, message)
        super(WSDLError, self).__init__(rendered)


class WSDLCodingException(WSDLException):
    """Raised when a value cannot be encoded to, or decoded from, its XML form."""


def _split_text_namespace(node, text):
    i = text.find(':')
    if i < 0:
        return text, None
    ns = node.nsmap.get(text[:i])
    text = text[i + 1:]
    return text, ns


# XML namespace URIs used when composing requests and decoding responses.
# Namespace of the SOAP envelope elements (Envelope, Header, Body, Fault).
_NAMESPACE_ENVELOPE = 'http://schemas.xmlsoap.org/soap/envelope/'
# Namespace of the "type" and "nil" attributes attached to encoded values.
_NAMESPACE_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
# Namespace of the scalar types handled here (string, int, boolean).
_NAMESPACE_XSD = 'http://www.w3.org/2001/XMLSchema'
# Namespace of the "Map" type used to encode and decode dictionaries.
_NAMESPACE_XML_SOAP = 'http://xml.apache.org/xml-soap'
# Namespace of the "Array" type and the "arrayType" attribute; also used as
# the envelope's encodingStyle value.
_NAMESPACE_XML_SOAP_ENCODING = 'http://schemas.xmlsoap.org/soap/encoding/'


def _set_type(node, type_value, namespace=None):
    """Set the ``xsi:type`` attribute of ``node``.

    :param node: Element that receives the attribute.
    :param type_value: Type name. When ``namespace`` is ``None`` it is stored as
        given (callers pass already prefixed names such as ``'xsd:string'``).
    :param namespace: Optional namespace URI; when given, ``type_value`` is
        wrapped in a ``QName`` of that namespace before being stored.
    """
    if namespace is None:
        attribute_value = type_value
    else:
        attribute_value = lxml.etree.QName(namespace, type_value)
    attribute_name = lxml.etree.QName(_NAMESPACE_XSI, 'type').text
    node.set(attribute_name, attribute_value)


def encode_wsdl(node, value):
    """Encode a Python value into the XML element ``node``.

    Supported values are ``None``, strings, booleans, integers, dictionaries
    and lists; dictionaries and lists are encoded recursively.

    :param node: Element that is modified in place (attributes, text, children).
    :param value: The value to encode.
    :raises WSDLCodingException: If ``value`` (or a nested value) has an
        unsupported type.
    """
    if value is None:
        node.set(lxml.etree.QName(_NAMESPACE_XSI, 'nil').text, 'true')
    elif isinstance(value, string_types):
        _set_type(node, 'xsd:string')
        node.text = value
    elif isinstance(value, bool):
        # bool is a subclass of int, so this test has to come before the int
        # test; otherwise True/False would be emitted as xsd:int with the
        # text "True"/"False".
        _set_type(node, 'xsd:boolean')
        node.text = ('true' if value else 'false')
    elif isinstance(value, int):
        _set_type(node, 'xsd:int')
        node.text = str(value)
    elif isinstance(value, dict):
        _set_type(node, 'Map', _NAMESPACE_XML_SOAP)
        # Entries are emitted in sorted order, so the output is deterministic.
        for key, val in sorted(value.items()):
            child = lxml.etree.Element('item')
            ke = lxml.etree.Element('key')
            encode_wsdl(ke, key)
            child.append(ke)
            ve = lxml.etree.Element('value')
            encode_wsdl(ve, val)
            child.append(ve)
            node.append(child)
    elif isinstance(value, list):
        _set_type(node, 'SOAP-ENC:Array')
        for elt in value:
            child = lxml.etree.Element('item')
            encode_wsdl(child, elt)
            node.append(child)
    else:
        raise WSDLCodingException('Do not know how to encode {0}!'.format(type(value)))


def _decode_wsdl_array(result, node, root_ns, ids):
    """Decode every ``item`` child of ``node`` and append it to ``result``.

    :param result: List that receives the decoded values, in document order.
    :param node: Element whose children are the array entries.
    :param root_ns: Passed through to ``decode_wsdl``.
    :param ids: Passed through to ``decode_wsdl``.
    :raises WSDLCodingException: If a child is not tagged ``item``.
    """
    for entry in node:
        if entry.tag == 'item':
            result.append(decode_wsdl(entry, root_ns, ids))
            continue
        # NOTE(review): the message says "map" although this decodes arrays;
        # the text is kept unchanged.
        raise WSDLCodingException('Invalid child tag "{0}" in map!'.format(entry.tag))


def decode_wsdl(node, root_ns, ids):
    """Decode the XML element ``node`` into a Python value.

    :param node: The element to decode.
    :param root_ns: Namespace whose types are decoded structurally: as a list
        when the element carries an ``arrayType`` attribute, otherwise as a
        dictionary keyed by child tag.
    :param ids: Dictionary mapping ``id`` attribute values to already decoded
        values. It is used to resolve ``href="#..."`` references and is updated
        in place when ``node`` has an ``id`` attribute.
    :return: ``None``, ``bool``, ``int``, string, ``list`` or ``dict``.
    :raises WSDLCodingException: On unsupported references, a missing or
        unknown type, or malformed content.
    """
    node_id = node.get('id')
    reference = node.get('href')
    nil_marker = node.get(lxml.etree.QName(_NAMESPACE_XSI, 'nil'))

    if reference is not None:
        # Only references to IDs seen earlier in the same document are supported.
        if not reference.startswith('#'):
            raise WSDLCodingException('Global reference "{0}" not supported!'.format(reference))
        reference = reference[1:]
        if reference not in ids:
            raise WSDLCodingException('ID "{0}" not yet defined!'.format(reference))
        decoded = ids[reference]
    elif nil_marker == 'true':
        decoded = None
    else:
        qualified_type = node.get(lxml.etree.QName(_NAMESPACE_XSI, 'type'))
        if qualified_type is None:
            raise WSDLCodingException('Element "{0}" has no "xsi:type" tag!'.format(node))
        type_name, type_ns = _split_text_namespace(node, qualified_type)
        if type_ns is None:
            raise WSDLCodingException('Cannot find namespace for "{0}"!'.format(qualified_type))

        if type_ns == _NAMESPACE_XSD:
            # Scalar types.
            if type_name == 'boolean':
                text = node.text
                if text == 'true':
                    decoded = True
                elif text == 'false':
                    decoded = False
                else:
                    raise WSDLCodingException('Invalid value for boolean: "{0}"'.format(text))
            elif type_name == 'int':
                decoded = int(node.text)
            elif type_name == 'string':
                decoded = node.text
            else:
                raise WSDLCodingException('Unknown XSD type "{0}"!'.format(type_name))
        elif type_ns == _NAMESPACE_XML_SOAP:
            if type_name != 'Map':
                raise WSDLCodingException('Unknown XSD type "{0}"!'.format(type_name))
            decoded = {}
            # Register the container before decoding its children, so that a
            # reference inside it can already resolve to it.
            if node_id is not None:
                ids[node_id] = decoded
            for entry in node:
                if entry.tag != 'item':
                    raise WSDLCodingException('Invalid child tag "{0}" in map!'.format(entry.tag))
                key_node = entry.find('key')
                if key_node is None:
                    raise WSDLCodingException('Cannot find key for "{0}"!'.format(entry))
                decoded_key = decode_wsdl(key_node, root_ns, ids)
                value_node = entry.find('value')
                if value_node is None:
                    raise WSDLCodingException('Cannot find value for "{0}"!'.format(entry))
                decoded[decoded_key] = decode_wsdl(value_node, root_ns, ids)
            # Maps return here directly, without the final registration below.
            return decoded
        elif type_ns == _NAMESPACE_XML_SOAP_ENCODING:
            if type_name != 'Array':
                raise WSDLCodingException('Unknown XSD type "{0}"!'.format(type_name))
            decoded = []
            if node_id is not None:
                ids[node_id] = decoded
            _decode_wsdl_array(decoded, node, root_ns, ids)
        elif type_ns == root_ns:
            array_type = node.get(lxml.etree.QName(_NAMESPACE_XML_SOAP_ENCODING, 'arrayType'))
            if array_type is None:
                # API-specific structure: one dictionary entry per child tag.
                decoded = {}
                if node_id is not None:
                    ids[node_id] = decoded
                for entry in node:
                    decoded[entry.tag] = decode_wsdl(entry, root_ns, ids)
            else:
                decoded = []
                if node_id is not None:
                    ids[node_id] = decoded
                _decode_wsdl_array(decoded, node, root_ns, ids)
        else:
            raise WSDLCodingException('Unknown type namespace "{0}" (with type "{1}")!'.format(type_ns, type_name))

    if node_id is not None:
        ids[node_id] = decoded
    return decoded


class Parser(object):
    """Decoded view of a SOAP response document.

    Constructing a parser raises ``WSDLError`` if the document contains a SOAP
    ``Fault`` element. Otherwise the ``return`` values found below the children
    of the ``Header`` and ``Body`` elements are decoded and stored under the
    local name of the respective child.

    :param api: Namespace that every header/body child must belong to.
    :param root: Root element of the parsed response document.
    """

    def __init__(self, api, root):
        self._main_ns = _NAMESPACE_ENVELOPE
        self._api = api
        self._root = root
        fault_tag = lxml.etree.QName(self._main_ns, 'Fault').text
        for fault in self._root.iter(fault_tag):
            # Raising ends the loop, so only the first fault is ever reported.
            raise self._build_fault_error(fault)
        self._header = {}
        self._body = {}
        sections = (
            ('Header', 'header', self._header),
            ('Body', 'body', self._body),
        )
        for local_name, where, target in sections:
            section_tag = lxml.etree.QName(self._main_ns, local_name).text
            for section in self._root.iter(section_tag):
                self._parse(target, section, where)

    def _build_fault_error(self, fault):
        """Create the ``WSDLError`` that describes the ``Fault`` element ``fault``."""
        code_node = fault.find('faultcode')
        string_node = fault.find('faultstring')
        origin = 'server'
        code_value = None
        if code_node is not None and code_node.text:
            code_value, code_ns = _split_text_namespace(fault, code_node.text)
            # A fault code from the envelope namespace names the origin.
            if code_ns == self._main_ns:
                origin = code_value.lower()
        if string_node is not None and string_node.text:
            message = string_node.text
        else:
            # Without a usable fault string, report the serialized fault itself.
            message = lxml.etree.tostring(fault).decode('utf-8')
        return WSDLError(origin, code_value, message)

    def _parse(self, result, node, where):
        """Decode the ``return`` values below each child of ``node`` into ``result``.

        :param result: Dictionary that receives the decoded values.
        :param node: The ``Header`` or ``Body`` element.
        :param where: Name of the section, used in error messages only.
        :raises WSDLCodingException: If a child is not in the API namespace.
        """
        for entry in node:
            qualified = lxml.etree.QName(entry.tag)
            if qualified.namespace != self._api:
                raise WSDLCodingException('Cannot interpret {0} item of type "{1}"!'.format(where, qualified))
            key = qualified.localname
            # Every ``return`` element is decoded with a fresh ID table; a later
            # one replaces an earlier one stored under the same key.
            for returned in entry.iter('return'):
                result[key] = decode_wsdl(returned, self._api, {})

    def get_header(self, header):
        """Return the decoded header value stored under ``header`` (``KeyError`` if absent)."""
        return self._header[header]

    def get_result(self, body):
        """Return the decoded body value stored under ``body`` (``KeyError`` if absent)."""
        return self._body[body]

    def __str__(self):
        return 'header={0}, body={1}'.format(self._header, self._body)

    def __repr__(self):
        # The representation is the pretty-printed response document.
        document = lxml.etree.tostring(self._root, pretty_print=True).decode('utf-8')
        return "<?xml version='1.0' encoding='utf-8'?>" + '\n' + document


class Composer(object):
    """Compose a SOAP request envelope and send it through an HTTP helper.

    :param http_helper: Object providing ``fetch_url(url, data=..., method=...,
        timeout=..., headers=...)`` that returns ``(content, info)``, where
        ``info['status']`` is the HTTP status code.
    :param api: Used both as the namespace of command elements and as the URL
        the request is sent to.
    :param namespaces: Optional mapping of prefix to namespace URI, merged over
        the default prefixes declared on the envelope.
    """

    @staticmethod
    def _create(tag, namespace=None, **kwarg):
        """Create an element named ``tag``, qualified by ``namespace`` if one is given."""
        if namespace:
            return lxml.etree.Element(lxml.etree.QName(namespace, tag), **kwarg)
        else:
            return lxml.etree.Element(tag, **kwarg)

    def __str__(self):
        # The string form is the pretty-printed request document.
        return '''<?xml version='1.0' encoding='utf-8'?>''' + '\n' + lxml.etree.tostring(self._root, pretty_print=True).decode('utf-8')

    def _create_envelope(self, tag, **kwarg):
        """Create an element named ``tag`` in the SOAP envelope namespace."""
        return self._create(tag, self._main_ns, **kwarg)

    def __init__(self, http_helper, api, namespaces=None):
        self._http_helper = http_helper
        self._main_ns = _NAMESPACE_ENVELOPE
        self._api = api
        # Compose basic document
        all_namespaces = {
            'SOAP-ENV': _NAMESPACE_ENVELOPE,
            'xsd': _NAMESPACE_XSD,
            'xsi': _NAMESPACE_XSI,
            'ns2': 'auth',
            'SOAP-ENC': _NAMESPACE_XML_SOAP_ENCODING,
        }
        if namespaces is not None:
            all_namespaces.update(namespaces)
        self._root = self._create_envelope('Envelope', nsmap=all_namespaces)
        self._root.set(lxml.etree.QName(self._main_ns, 'encodingStyle').text, _NAMESPACE_XML_SOAP_ENCODING)
        self._header = self._create_envelope('Header')
        self._root.append(self._header)
        self._body = self._create_envelope('Body')
        self._root.append(self._body)
        # Name of the most recently added command; used for the SOAPAction header.
        self._command = None

    def add_auth(self, username, password):
        """Append an ``authenticate`` element with the credentials to the header."""
        auth = self._create('authenticate', 'auth')
        user = self._create('UserName')
        user.text = username
        auth.append(user)
        pw = self._create('Password')
        pw.text = password
        auth.append(pw)
        self._header.append(auth)

    def add_simple_command(self, command, **args):
        """Append a command element to the body.

        :param command: Name of the command; the element is created in the API
            namespace and the name is remembered for the ``SOAPAction`` header.
        :param args: Arguments of the command; each one becomes a child element
            whose value is encoded with ``encode_wsdl``.
        """
        self._command = command
        command_node = self._create(command, self._api)
        for name, value in args.items():
            arg_node = self._create(name)
            encode_wsdl(arg_node, value)
            command_node.append(arg_node)
        self._body.append(command_node)

    def execute(self, debug=False):
        """Send the composed request with HTTP POST and parse the response.

        :param debug: Accepted for backwards compatibility; currently unused.
        :return: A ``Parser`` for the response document.
        :raises WSDLNetworkError: If the HTTP helper raises ``NetworkError``.
        :raises WSDLError: If the response contains a SOAP fault, or if the HTTP
            status is outside the 2xx range.
        """
        payload = b'''<?xml version='1.0' encoding='utf-8'?>''' + b'\n' + lxml.etree.tostring(self._root) + b'\n'
        try:
            headers = {
                'Content-Type': 'text/xml; charset=utf-8',
                'Content-Length': str(len(payload)),
            }
            if self._command:
                headers['SOAPAction'] = '"{0}#{1}"'.format(self._api, self._command)
            result, info = self._http_helper.fetch_url(self._api, data=payload, method='POST', timeout=300, headers=headers)
            code = info['status']
        except NetworkError as e:
            raise WSDLNetworkError(to_native(e))
        if code < 200 or code >= 300:
            # Parsing raises a detailed WSDLError if the response carries a SOAP fault.
            Parser(self._api, lxml.etree.fromstring(result))
            # No fault found: report the HTTP status. WSDLError takes three
            # arguments (origin, error_code, message); the status code is
            # passed as the error code.
            raise WSDLError('server', code, 'Error {0} while executing WSDL command:\n{1}'.format(code, result.decode('utf-8')))
        return Parser(self._api, lxml.etree.fromstring(result))

Anon7 - 2022
AnonSec Team