Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.15.208.206
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/wti/remote/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/wti/remote/plugins/modules/cpm_firmware_update.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# (C) 2019 Red Hat Inc.
# Copyright (C) 2019 Western Telematic Inc.
#
# GNU General Public License v3.0+
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# Module to upgrade the firmware on WTI OOB and PDU devices.
# CPM remote_management
#
from __future__ import absolute_import, division, print_function
__metaclass__ = type

# Module metadata: metadata format version 1.1, 'preview' status,
# community supported.
ANSIBLE_METADATA = {
    'metadata_version': '1.1',
    'status': ['preview'],
    'supported_by': 'community'
}

# YAML description of the module and its options. The option names, types,
# defaults and choices below mirror the argument_spec built in run_module().
DOCUMENTATION = """
---
module: cpm_firmware_update
version_added: "2.9.0"
author: "Western Telematic Inc. (@wtinetworkgear)"
short_description: Upgrade the firmware on WTI OOB and PDU devices
description:
    - "Upgrade the firmware on WTI OOB and PDU devices"
options:
    cpm_url:
        description:
            - This is the URL of the WTI device to send the module.
        required: true
        type: str
    cpm_username:
        description:
            - This is the Username of the WTI device to send the module.
        required: true
        type: str
    cpm_password:
        description:
            - This is the Password of the WTI device to send the module.
        required: true
        type: str
    cpm_path:
        description:
            - This is the directory path that contains the firmware file named in cpm_file.
        required: false
        type: str
        default: "/tmp/"
    cpm_file:
        description:
            - If a file is defined, this file will be used to update the WTI device.
        required: false
        type: str
    use_force:
        description:
            - If set to True, the upgrade will happen even if the device doesn't need it.
        required: false
        type: bool
        default: false
    use_https:
        description:
            - Designates to use an https connection or http connection.
        required: false
        type: bool
        default: true
    validate_certs:
        description:
            - If false, SSL certificates will not be validated. This should only be used
              on personally controlled sites using self-signed certificates.
        required: false
        type: bool
        default: true
    use_proxy:
        description: Flag to control if the lookup will observe HTTP proxy environment variables when present.
        required: false
        type: bool
        default: false
    family:
        description:
            - Force the download to be either Console (1) or Power (0)
        required: false
        type: int
        default: 1
        choices: [ 0, 1 ]
    removefileonexit:
        description:
            - After an upgrade, remove the upgrade OS image
        required: false
        type: int
        default: 1
        choices: [ 0, 1 ]

notes:
    - Use C(groups/cpm) in C(module_defaults) to set common options used between CPM modules.
"""

# Example playbook tasks (YAML) showing how to invoke this module, with and
# without keeping the downloaded OS image afterwards.
EXAMPLES = """
# Upgrade the firmware of a WTI device
- name: Upgrade the firmware of a WTI device
  cpm_firmware_update:
    cpm_url: "nonexist.wti.com"
    cpm_username: "super"
    cpm_password: "super"
    use_https: true
    validate_certs: false


# Upgrade the firmware of a WTI device and keep the download OS image after exit
- name: Upgrade the firmware of a WTI device and keep the download OS image after exit
  cpm_firmware_update:
    cpm_url: "nonexist.wti.com"
    cpm_username: "super"
    cpm_password: "super"
    use_https: true
    validate_certs: false
    removefileonexit: "0"
"""

# YAML description of the 'data' value this module returns.
RETURN = """
data:
    description: The output JSON returned from the WTI device being updated
    returned: always
    type: complex
    contains:
        filelength:
            description: Length of the file uploaded in bytes
            returned: success
            type: int
            sample:
                - filelength: 329439
        status:
            description: List of status returns from firmware update operation
            returned: success
            type: list
            sample:
                - code: 0
                - text: "ok"
                - unittimestamp: "2020-02-14T00:18:57+00:00"
"""

import base64
import os
import json
import tempfile
import traceback
import shutil

try:
    import requests
    HAS_REQUESTS_LIBRARY = True
except ImportError:
    HAS_REQUESTS_LIBRARY = False

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_text, to_bytes, to_native
from ansible.module_utils.six.moves.urllib.error import HTTPError, URLError
from ansible.module_utils.urls import open_url, ConnectionError, SSLValidationError
from ansible.module_utils.urls import fetch_url, url_argument_spec


def _detect_file_family(filename):
    """Return the device family marked in the last 20 bytes of a firmware image.

    Returns 1 when the tail contains b"TSM" (Console), 0 when it contains
    b"VMR" (Power) and -1 when neither marker is present. Errors raised while
    sizing, opening, seeking or reading the file propagate to the caller.
    """
    filesize = os.path.getsize(filename)
    # The context manager closes the handle even when seek()/read() raises.
    with open(filename, 'rb') as image:
        image.seek(filesize - 20)
        tail = image.read()
    if b"TSM" in tail:
        return 1
    if b"VMR" in tail:
        return 0
    return -1


def _cpm_get_json(module, fullurl, auth):
    """GET ``fullurl`` with HTTP basic auth and return the decoded JSON body.

    ``auth`` is the base64 encoded "user:password" string. The request honours
    the module's ``validate_certs`` and ``use_proxy`` parameters. Any transport
    error ends the module through ``module.fail_json``.
    """
    try:
        response = open_url(fullurl, data=None, method='GET', validate_certs=module.params['validate_certs'],
                            use_proxy=module.params['use_proxy'],
                            headers={'Content-Type': 'application/json', 'Authorization': "Basic %s" % auth})
    except HTTPError as e:
        module.fail_json(msg='GET: Received HTTP error for {0} : {1}'.format(fullurl, to_native(e)), changed=False)
    except URLError as e:
        module.fail_json(msg='GET: Failed lookup url for {0} : {1}'.format(fullurl, to_native(e)), changed=False)
    except SSLValidationError as e:
        module.fail_json(msg="GET: Error validating the server's certificate for {0} : {1}".format(fullurl, to_native(e)), changed=False)
    except ConnectionError as e:
        module.fail_json(msg='GET: Error connecting to {0} : {1}'.format(fullurl, to_native(e)), changed=False)

    return json.loads(response.read())


def run_module():
    """Upgrade the firmware of a WTI device and report the outcome to Ansible.

    Steps performed:
      1. Query the device for its firmware version and family.
      2. Unless a usable local image was supplied, look up the latest release
         for that family on my.wti.com.
      3. When an upgrade is due (newer release, ``use_force``, or a local image
         was supplied) and check mode is off, download the image if needed and
         POST it to the device.

    The function does not return normally: it ends with ``module.exit_json``
    on success or ``module.fail_json`` on any error.
    """
    # define the available arguments/parameters that a user can pass to the module
    # NOTE(review): 'family' is accepted here but never read below; the family
    # reported by the device is used instead.
    module_args = dict(
        cpm_url=dict(type='str', required=True),
        cpm_username=dict(type='str', required=True),
        cpm_password=dict(type='str', required=True, no_log=True),
        cpm_path=dict(type='str', default="/tmp/"),
        cpm_file=dict(type='str', default=None),
        family=dict(type='int', default=1, choices=[0, 1]),
        removefileonexit=dict(type='int', default=1, choices=[0, 1]),
        use_force=dict(type='bool', default=False),
        use_https=dict(type='bool', default=True),
        validate_certs=dict(type='bool', default=True),
        use_proxy=dict(type='bool', default=False)
    )

    result = dict(
        changed=False,
        data=''
    )

    usersuppliedfilename = None
    localfilefamily = -1  # -1: no (recognised) local image, 0: Power, 1: Console

    module = AnsibleModule(argument_spec=module_args, supports_check_mode=True)

    if HAS_REQUESTS_LIBRARY is False:
        module.fail_json(msg='IMPORT: requests import not installed', changed=False)

    if module.params['cpm_file'] is not None:
        # Plain concatenation: cpm_path is expected to end with a path separator.
        usersuppliedfilename = ("%s%s" % (to_native(module.params['cpm_path']), to_native(module.params['cpm_file'])))

    forceupgrade = module.params['use_force'] is True

    # if a local file was defined lets see what family it is: Console or Power
    if usersuppliedfilename is not None:
        try:
            localfilefamily = _detect_file_family(usersuppliedfilename)
        except (IOError, OSError, ValueError) as e:
            module.fail_json(msg='FILE: User Supplied file {0} does not exist : {1}'.format(usersuppliedfilename, to_native(e)), changed=False)

    auth = to_text(base64.b64encode(to_bytes('{0}:{1}'.format(to_native(module.params['cpm_username']), to_native(module.params['cpm_password'])),
                   errors='surrogate_or_strict')))

    protocol = "https://" if module.params['use_https'] is True else "http://"

    # 1. Get the Version of the WTI device
    fullurl = ("%s%s/api/v2/status/firmware" % (protocol, to_native(module.params['cpm_url'])))
    result['data'] = _cpm_get_json(module, fullurl, auth)
    statuscode = result['data']["status"]["code"]

    local_release_version = result['data']["config"]["firmware"]
    try:
        family = int(result['data']["config"]["family"])
    except (KeyError, TypeError, ValueError):
        # Device did not report a usable family: treat it as Console.
        family = 1

    if localfilefamily != -1 and family != localfilefamily:
        module.fail_json(msg='FAMILY MISMATCH: Your local file is a: %s type, the device is a %s type'
                         % (("Console" if localfilefamily == 1 else "Power"), ("Console" if family == 1 else "Power")), changed=False)

    # 2. Go online and find the latest version of the os image for this device family
    if localfilefamily == -1:
        fullurl = ("https://my.wti.com/update/version.aspx?fam=%s" % (family))
        # NOTE(review): this sends the device's basic-auth credentials to the
        # vendor site, as the original code did. Confirm the header is needed.
        result['data'] = _cpm_get_json(module, fullurl, auth)
        remote_release_version = result['data']["config"]["firmware"]

        local_version = float(local_release_version)
        if (local_version < 6.58 and family == 1) or (local_version < 2.15 and family == 0):
            module.fail_json(msg='ERROR: WTI Device does not support remote upgrade', changed=False)

        statuscode = result['data']['status']['code']
    else:
        remote_release_version = 0

    if int(statuscode) == 0:
        upgrade_due = ((float(local_release_version) < float(remote_release_version))
                       or forceupgrade or (localfilefamily >= 0))
        if upgrade_due:
            if module.check_mode is False:
                if localfilefamily == -1:
                    online_file_location = result['data']["config"]["imageurl"]

                    local_filename = online_file_location[online_file_location.rfind("/") + 1:]
                    local_filename = tempfile.gettempdir() + "/" + local_filename

                    try:
                        response = requests.get(online_file_location, stream=True)
                        # Refuse to store (and later flash) an HTTP error page.
                        response.raise_for_status()
                        with open(local_filename, "wb") as handle:
                            for chunk in response.iter_content(chunk_size=512):
                                if chunk:  # filter out keep-alive new chunks
                                    handle.write(chunk)
                    except requests.exceptions.RequestException as e:
                        module.fail_json(msg='GET: Received HTTP error for {0} : {1}'.format(online_file_location, to_native(e)),
                                         changed=False)
                else:
                    # A family mismatch was already rejected above, so the
                    # user supplied image matches the device here.
                    local_filename = usersuppliedfilename

                # SEND the file to the WTI device
                # 3. upload new os image to WTI device
                fullurl = ("%s%s/cgi-bin/getfile" % (protocol, to_native(module.params['cpm_url'])))

                try:
                    # Keep the image open only for the duration of the upload.
                    with open(local_filename, 'rb') as image:
                        files = {'file': ('name.binary', image, 'application/octet-stream')}
                        response = requests.post(fullurl, files=files, auth=(to_native(module.params['cpm_username']),
                                                 to_native(module.params['cpm_password'])), verify=(module.params['validate_certs']), stream=True)
                        result['data'] = response.json()

                    if response.status_code == 200:
                        if int(result['data']['status']['code']) == 0:
                            result['changed'] = True
                        else:
                            module.fail_json(msg='FAIL: Upgrade Failed for {0}'.format(fullurl), changed=False)

                except requests.exceptions.RequestException as e:
                    module.fail_json(msg='GET: Received HTTP error for {0} : {1}'.format(fullurl, to_native(e)), changed=False)

                # only remove if the file was downloaded
                if localfilefamily == -1:
                    if int(module.params['removefileonexit']) == 1:
                        os.remove(local_filename)
        else:
            result['data'] = "{ \"filelength\": \"0\", \"status\": { \"code\": \"1\", \"text\": \"device up to date\" } }"
    else:
        result['data'] = "{ \"filelength\": \"0\", \"status\": { \"code\": \"2\", \"text\": \"device bad family code: %s\" } }" % (family)

    module.exit_json(**result)


def main():
    """Script entry point: hand control to run_module()."""
    run_module()


if __name__ == "__main__":
    main()

Anon7 - 2022
AnonSec Team