Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.128.30.53
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/wti/remote/plugins/lookup/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/wti/remote/plugins/lookup/cpm_snmp_config.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# (C) 2019 Red Hat Inc.
# Copyright (C) 2020 Western Telematic Inc.
#
# GNU General Public License v3.0+
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# Module to configure WTI network SNMP Parameters on WTI OOB and PDU devices.
# CPM remote_management
#
from __future__ import absolute_import, division, print_function
__metaclass__ = type

# Module metadata block: format version, maturity status and support level.
ANSIBLE_METADATA = {
    'metadata_version': '1.1',
    'status': ['preview'],
    'supported_by': 'community'
}

DOCUMENTATION = """
---
module: cpm_snmp_config
version_added: "2.10.0"
author:
    - "Western Telematic Inc. (@wtinetworkgear)"
short_description: Set network SNMP parameters in WTI OOB and PDU devices
description:
    - "Set network SNMP parameters in WTI OOB and PDU devices"
options:
    cpm_url:
        description:
            - This is the URL of the WTI device to send the module.
        type: str
        required: true
    cpm_username:
        description:
            - This is the Username of the WTI device to send the module.
        type: str
        required: true
    cpm_password:
        description:
            - This is the Password of the WTI device to send the module.
        type: str
        required: true
    use_https:
        description:
            - Designates to use an https connection or http connection.
        type: bool
        required: false
        default: true
    validate_certs:
        description:
            - If false, SSL certificates will not be validated. This should only be used
            - on personally controlled sites using self-signed certificates.
        type: bool
        required: false
        default: true
    use_proxy:
        description:
            - Flag to control if the lookup will observe HTTP proxy environment variables when present.
        type: bool
        required: false
        default: false
    protocol:
        description:
            - The protocol that the SNMP entry should be applied. 0 = ipv4, 1 = ipv6.
        type: int
        required: false
        choices: [ 0, 1 ]
    enable:
        description:
            - Activates SNMP polling for the specified interface and protocol.
        type: int
        required: false
        choices: [ 0, 1 ]
    interface:
        description:
            - The ethernet port for the SNMP we are defining.
        type: str
        required: true
        choices:
            - eth0
            - eth1
            - ppp0
            - qmimux0
    readonly:
        description:
            - Controls the ability to change configuration parameters with SNMP.
        type: int
        required: false
        choices: [ 0, 1 ]
    version:
        description:
            - Defines which version of SNMP the device will respond to, 0 = V1/V2 Only, 1 = V3 Only, 2 = V1/V2/V3.
        type: int
        required: false
        choices: [ 0, 1, 2 ]
    contact:
        description:
            - The name of the administrator responsible for SNMP issues.
        type: str
        required: false
    location:
        description:
            - The location of the SNMP Server.
        type: str
        required: false
    systemname:
        description:
            - The hostname of the WTI Device.
        type: str
        required: false
    rocommunity:
        description:
            - Read Only Community Password, not used for SNMP V3.
        type: str
        required: false
    rwcommunity:
        description:
            - Read/Write Community Password, not used for SNMP V3.
        type: str
        required: false
    clear:
        description:
            - Removes all the users for the protocol being defined before setting the newly defined entries.
        type: int
        required: false
        choices: [ 0, 1 ]
    index:
        description:
            - Index of the user being modified (V3 only).
        type: list
        elements: raw
        required: false
    username:
        description:
            - Sets the User Name for SNMPv3 access (V3 only).
        type: list
        elements: raw
        required: false
    authpriv:
        description:
            - Configures the Authentication and Privacy features for SNMPv3 communication, 0 = Auth/NoPriv, 1 = Auth/Priv (V3 only).
        type: list
        elements: raw
        required: false
    authpass:
        description:
            - Sets the Authentication Password for SNMPv3 (V3 only).
        type: list
        elements: raw
        required: false
    authproto:
        description:
            - Which authentication protocol will be used, 0 = MD5, 1 = SHA1 (V3 only).
        type: list
        elements: raw
        required: false
    privpass:
        description:
            - Sets the Privacy Password for SNMPv3 (V3 only).
        type: list
        elements: raw
        required: false
    privproto:
        description:
            - Which privacy protocol will be used, 0 = DES, 1 = AES128 (V3 only).
        type: list
        elements: raw
        required: false
notes:
  - Use C(groups/cpm) in C(module_defaults) to set common options used between CPM modules.
"""

EXAMPLES = """
# Sets the device SNMP Parameters
- name: Set an SNMP Parameter for a WTI device
  cpm_snmp_config:
    cpm_url: "nonexist.wti.com"
    cpm_username: "super"
    cpm_password: "super"
    interface: "eth0"
    use_https: true
    validate_certs: false
    protocol: 0
    clear: 1
    enable: 1
    readonly: 0
    version: 0
    rocommunity: "ropassword"
    rwcommunity: "rwpassword"

# Sets the device SNMP Parameters
- name: Set the SNMP Parameters for a WTI device
  cpm_snmp_config:
    cpm_url: "nonexist.wti.com"
    cpm_username: "super"
    cpm_password: "super"
    interface: "eth0"
    use_https: true
    validate_certs: false
    version: 1
    index:
      - 1
      - 2
    username:
      - "username1"
      - "username2"
    authpriv:
      - 1
      - 1
    authpass:
      - "authpass1"
      - "authpass2"
    authproto:
      - 1
      - 1
    privpass:
      - "privpass1"
      - "privpass2"
    privproto:
      - 1
      - 1
"""

RETURN = """
data:
  description: The output JSON returned from the commands sent
  returned: always
  type: complex
  contains:
    snmpaccess:
      description: Current k/v pairs of interface info for the WTI device after module execution.
      returned: always
      type: dict
      sample: [{ "eth0": { "ietf-ipv4": { "clear": 1, "enable": 0, "readonly": 0, "version": 0, "users": [
              { "username": "username1", "authpass": "testpass", "authpriv": "1", "authproto": "0", "privpass": "privpass1",
                "privproto": "0", "index": "1" }]}}}]
"""

from collections import OrderedDict
import base64
import json

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_text, to_bytes, to_native
from ansible.module_utils.six.moves.urllib.error import HTTPError, URLError
from ansible.module_utils.urls import open_url, ConnectionError, SSLValidationError


def _param_list(cpmmodule, name, convert):
    """Return list parameter *name* with every entry converted, or [] when it is unset."""
    values = cpmmodule.params.get(name)
    if not isinstance(values, list):
        return []
    return [convert(to_native(item)) for item in values]


def _entry_at(values, position):
    """Return values[position], or None when the list has no such entry."""
    if position < len(values):
        return values[position]
    return None


def _same_value(left, right):
    """Plain equality comparison."""
    return left == right


def _same_int(left, right):
    """Compare two values as integers; values that are not integers never match."""
    try:
        return int(left) == int(right)
    except (TypeError, ValueError):
        return False


def _existing_user(current, slot):
    """Return the user entry stored at zero-based *slot*, or {} when there is none."""
    users = current.get("users") or []
    if 0 <= slot < len(users):
        return users[slot]
    return {}


def _merge_field(target, current, key, supplied, quote, same):
    """Store *key* in *target* and report whether it differs from *current*.

    A supplied value wins (stored as text when *quote* is true, as-is
    otherwise).  Without a supplied value the existing value is echoed back as
    text; if *current* has no such key either, the key is left out entirely.
    """
    if supplied is None:
        if key in current:
            target[key] = "%s" % (current[key],)
        return False

    target[key] = ("%s" % (supplied,)) if quote else supplied
    return key not in current or not same(current[key], supplied)


def assemble_json(cpmmodule, existing_interface):
    """Build the JSON body for the snmpaccess POST from the module parameters.

    Parameters:
        cpmmodule: object exposing the module parameters as ``cpmmodule.params``.
        existing_interface: decoded reply of the snmpaccess GET; read at
            ``["snmpaccess"][0][<interface>][0][<ietf-ipv4|ietf-ipv6>]``.

    Returns:
        A ``(is_changed, json_load)`` tuple.  ``is_changed`` is True when any
        supplied value differs from *existing_interface*; ``json_load`` is the
        JSON text to send.  ``(False, None)`` is returned when the ``index`` and
        ``username`` lists do not have the same number of entries.
    """
    params = cpmmodule.params
    ports = params['interface']

    ietfstring = "ietf-ipv4"
    if params.get('protocol') is not None and int(params['protocol']) == 1:
        ietfstring = "ietf-ipv6"

    is_clear = 0
    if params.get('clear') is not None:
        is_clear = int(params['clear'])

    # The module takes 1-based user indices; the "users" list is 0-based.
    indices = _param_list(cpmmodule, 'index', lambda value: int(value) - 1)
    usernames = _param_list(cpmmodule, 'username', lambda value: value)
    text_fields = (
        ("username", usernames),
        ("authpass", _param_list(cpmmodule, 'authpass', lambda value: value)),
        ("privpass", _param_list(cpmmodule, 'privpass', lambda value: value)),
    )
    int_fields = (
        ("authpriv", _param_list(cpmmodule, 'authpriv', int)),
        ("authproto", _param_list(cpmmodule, 'authproto', int)),
        ("privproto", _param_list(cpmmodule, 'privproto', int)),
    )

    # Every user needs both an index and a name; anything else cannot be
    # mapped onto a user slot.
    if len(usernames) != len(indices):
        return False, None

    current = existing_interface["snmpaccess"][0][ports][0][ietfstring]

    users = []
    users_changed = False
    for position, slot in enumerate(indices):
        known = _existing_user(current, slot)
        user = OrderedDict()
        user["index"] = "%d" % (slot + 1)
        for key, values in text_fields:
            if _merge_field(user, known, key, _entry_at(values, position), True, _same_value):
                users_changed = True
        for key, values in int_fields:
            if _merge_field(user, known, key, _entry_at(values, position), True, _same_int):
                users_changed = True
        users.append(user)

    section = OrderedDict()
    section["clear"] = is_clear
    # NOTE(review): as before, "change" only reflects differences in the user
    # entries, not in the interface-level fields below - confirm with the
    # device API whether that is intended.
    section["change"] = int(users_changed)

    is_changed = users_changed
    for key in ("enable", "version", "readonly"):
        supplied = params.get(key)
        if supplied is not None:
            supplied = int(supplied)
        if _merge_field(section, current, key, supplied, False, _same_value):
            is_changed = True

    for key in ("systemname", "contact", "location", "rocommunity", "rwcommunity"):
        supplied = params.get(key)
        if supplied is not None:
            supplied = to_native(supplied)
        if _merge_field(section, current, key, supplied, False, _same_value):
            is_changed = True

    if users:
        section["users"] = users

    # json.dumps takes care of quoting/escaping, so names and passwords that
    # contain quotes or backslashes still yield valid JSON.
    json_load = json.dumps({"snmpaccess": [{ports: {ietfstring: section}}]})

    return is_changed, json_load


def _send_request(module, method, url, auth, payload=None):
    """Send one authenticated JSON request and return the open_url response.

    Any transport error ends the module run through ``module.fail_json`` with a
    message prefixed by *method*.
    """
    try:
        return open_url(url, data=payload, method=method, validate_certs=module.params['validate_certs'],
                        use_proxy=module.params['use_proxy'],
                        headers={'Content-Type': 'application/json', 'Authorization': "Basic %s" % auth})
    except HTTPError as e:
        module.fail_json(msg='{0}: Received HTTP error for {1} : {2}'.format(method, url, to_native(e)), changed=False)
    except URLError as e:
        module.fail_json(msg='{0}: Failed lookup url for {1} : {2}'.format(method, url, to_native(e)), changed=False)
    except SSLValidationError as e:
        module.fail_json(msg="{0}: Error validating the server's certificate for {1} : {2}".format(method, url, to_native(e)),
                         changed=False)
    except ConnectionError as e:
        module.fail_json(msg='{0}: Error connecting to {1} : {2}'.format(method, url, to_native(e)), changed=False)


def run_module():
    """Read the current SNMP settings from the device and POST the requested ones.

    The module first GETs ``/api/v2/config/snmpaccess`` for the selected
    interface, lets ``assemble_json`` merge that state with the module
    parameters, and then POSTs the resulting payload.  In check mode nothing is
    sent; only the computed ``changed`` flag is reported.
    """
    # define the available arguments/parameters that a user can pass to
    # the module
    module_args = dict(
        cpm_url=dict(type='str', required=True),
        cpm_username=dict(type='str', required=True),
        cpm_password=dict(type='str', required=True, no_log=True),
        interface=dict(type='str', required=True, choices=["eth0", "eth1", "ppp0", "qmimux0"]),
        protocol=dict(type='int', required=False, default=0, choices=[0, 1]),
        clear=dict(type='int', required=False, default=None, choices=[0, 1]),
        enable=dict(type='int', required=False, default=None, choices=[0, 1]),
        version=dict(type='int', required=False, default=None, choices=[0, 1, 2]),
        readonly=dict(type='int', required=False, default=None, choices=[0, 1]),
        systemname=dict(type='str', required=False, default=None),
        contact=dict(type='str', required=False, default=None),
        location=dict(type='str', required=False, default=None),
        rocommunity=dict(type='str', required=False, default=None, no_log=True),
        rwcommunity=dict(type='str', required=False, default=None, no_log=True),
        index=dict(type='list', elements='raw', required=False, default=None),
        username=dict(type='list', elements='raw', required=False),
        authpriv=dict(type='list', elements='raw', required=False, default=None),
        authproto=dict(type='list', elements='raw', required=False, default=None),
        privproto=dict(type='list', elements='raw', required=False, default=None),
        authpass=dict(type='list', elements='raw', required=False, no_log=True),
        privpass=dict(type='list', elements='raw', required=False, no_log=True),
        use_https=dict(type='bool', default=True),
        validate_certs=dict(type='bool', default=True),
        use_proxy=dict(type='bool', default=False)
    )

    result = dict(
        changed=False,
        data=''
    )

    module = AnsibleModule(argument_spec=module_args, supports_check_mode=True)

    # HTTP Basic credentials: base64("<username>:<password>")
    auth = to_text(base64.b64encode(to_bytes('{0}:{1}'.format(to_native(module.params['cpm_username']), to_native(module.params['cpm_password'])),
                   errors='surrogate_or_strict')))

    if module.params['use_https'] is True:
        transport = "https://"
    else:
        transport = "http://"

    fullurl = ("%s%s/api/v2/config/snmpaccess?ports=%s" % (transport, to_native(module.params['cpm_url']), to_native(module.params['interface'])))
    response = _send_request(module, 'GET', fullurl, auth)
    result['data'] = json.loads(response.read())

    # assemble_json signals an unusable index/username combination with a
    # missing payload; accept both a bare None and a (changed, None) pair.
    assembled = assemble_json(module, result['data'])
    if assembled is None:
        was_changed, payload = False, None
    else:
        was_changed, payload = assembled

    if payload is None:
        module.fail_json(msg="Unable to build the SNMP request: 'index' and 'username' must list the same number of entries",
                         changed=False)

    if payload:
        if module.check_mode:
            # Report what a real run would report instead of always True.
            result['changed'] = bool(was_changed)
        else:
            fullurl = ("%s%s/api/v2/config/snmpaccess" % (transport, to_native(module.params['cpm_url'])))
            response = _send_request(module, 'POST', fullurl, auth, payload)

            result['changed'] = was_changed
            result['data'] = json.loads(response.read())

    module.exit_json(**result)


def main():
    """Script entry point; delegates to run_module()."""
    run_module()


# Run the module only when this file is executed directly.
if __name__ == '__main__':
    main()

Anon7 - 2022
AnonSec Team