Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.145.103.119
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/general/plugins/module_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/general/plugins/module_utils/online.py
# -*- coding: utf-8 -*-
# Copyright (c) Ansible project
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
# SPDX-License-Identifier: BSD-2-Clause

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import json
import sys

from ansible.module_utils.basic import env_fallback
from ansible.module_utils.urls import fetch_url


def online_argument_spec():
    """Return the argument spec describing the Online API connection options.

    The returned dict defines four options: ``api_token``, ``api_url``,
    ``api_timeout`` and ``validate_certs``. ``api_token`` and ``api_url``
    register ``env_fallback`` together with the environment variable names
    listed below.
    """
    token_env_names = ['ONLINE_TOKEN', 'ONLINE_API_KEY', 'ONLINE_OAUTH_TOKEN', 'ONLINE_API_TOKEN']
    url_env_names = ['ONLINE_API_URL']

    # The token is mandatory and flagged no_log.
    api_token = {
        'required': True,
        'fallback': (env_fallback, token_env_names),
        'no_log': True,
        'aliases': ['oauth_token'],
    }
    api_url = {
        'fallback': (env_fallback, url_env_names),
        'default': 'https://api.online.net',
        'aliases': ['base_url'],
    }
    api_timeout = {
        'type': 'int',
        'default': 30,
        'aliases': ['timeout'],
    }
    validate_certs = {
        'default': True,
        'type': 'bool',
    }

    return {
        'api_token': api_token,
        'api_url': api_url,
        'api_timeout': api_timeout,
        'validate_certs': validate_certs,
    }


class OnlineException(Exception):
    """Error carrying a human-readable message about a failed Online API call.

    Attributes:
        message: the text passed to the constructor.
    """

    def __init__(self, message):
        # Forward the message to Exception so that str(exc) and exc.args
        # carry it; without this call str(exc) is an empty string and the
        # message is missing from tracebacks and logs.
        super(OnlineException, self).__init__(message)
        self.message = message


class Response(object):
    """Wrap the ``(resp, info)`` pair describing one HTTP exchange.

    Attributes:
        body: whatever ``resp.read()`` returned, or None when ``resp`` is falsy.
        info: the ``info`` mapping given to the constructor; ``status`` is read
            from it, and ``body`` is used as a fallback payload.
    """

    def __init__(self, resp, info):
        # Read the payload eagerly; a falsy resp leaves body as None.
        self.body = resp.read() if resp else None
        self.info = info

    @property
    def json(self):
        """Return the decoded JSON payload, or None if there is none.

        ``self.body`` is preferred. When it is empty, ``info["body"]`` is used
        instead (presumably where the transport stores error response bodies;
        confirm against fetch_url). A payload that is not valid JSON yields
        None on both paths rather than raising.
        """
        payload = self.body
        if not payload:
            if "body" not in self.info:
                return None
            payload = self.info["body"]
        try:
            return json.loads(payload)
        except (TypeError, ValueError):
            # Not JSON (for example an HTML error page) or not a string at all.
            return None

    @property
    def status_code(self):
        """Return ``info["status"]``."""
        return self.info["status"]

    @property
    def ok(self):
        """Return True when the status code is 200, 201, 202 or 204."""
        return self.status_code in (200, 201, 202, 204)


class Online(object):
    """Small HTTP client for the Online API built on ``fetch_url``.

    Attributes:
        module: the module object given to the constructor; its ``params``
            supply ``api_token``, ``api_url`` and ``api_timeout``.
        headers: default headers sent with every request.
        name: resource name used by ``get_resources``. It is initialised to
            None here; presumably subclasses or callers set it (confirm).
    """

    def __init__(self, module):
        self.module = module
        self.headers = {
            'Authorization': "Bearer %s" % self.module.params.get('api_token'),
            'User-Agent': self.get_user_agent_string(module),
            'Content-type': 'application/json',
        }
        self.name = None

    def get_resources(self):
        """GET ``/<name>`` and return the decoded JSON payload.

        Raises:
            OnlineException: when the response status is not one that
                ``results.ok`` accepts.
        """
        path = '/%s' % self.name
        results = self.get(path)
        if not results.ok:
            # The error body is not guaranteed to be a JSON object with a
            # 'message' key; fall back to the transport message so the real
            # failure is reported instead of a TypeError/KeyError.
            payload = results.json
            if isinstance(payload, dict) and 'message' in payload:
                message = payload['message']
            else:
                message = results.info.get('msg')
            raise OnlineException('Error fetching {0} ({1}) [{2}: {3}]'.format(
                self.name, self._url_builder(path),
                results.status_code, message
            ))

        return results.json

    def _url_builder(self, path):
        """Join ``api_url`` and ``path``, dropping one leading slash from ``path``."""
        # startswith() also copes with an empty path, where path[0] would raise.
        if path.startswith('/'):
            path = path[1:]
        return '%s/%s' % (self.module.params.get('api_url'), path)

    def send(self, method, path, data=None, headers=None):
        """Send one request and return it wrapped in a ``Response``.

        Args:
            method: HTTP verb passed through to ``fetch_url``.
            path: path appended to ``api_url``.
            data: payload, serialised with ``module.jsonify``.
            headers: optional extra headers for this request only.
        """
        url = self._url_builder(path)
        # NOTE(review): jsonify is applied even when data is None, so requests
        # without a payload still carry a serialised body; confirm intended.
        data = self.module.jsonify(data)

        # Merge into a copy so per-request headers do not leak into
        # self.headers and affect later requests.
        request_headers = dict(self.headers)
        if headers is not None:
            request_headers.update(headers)

        resp, info = fetch_url(
            self.module, url, data=data, headers=request_headers, method=method,
            timeout=self.module.params.get('api_timeout')
        )

        # Exceptions in fetch_url may result in a status of -1; this ensures a
        # proper error is reported to the user in all cases.
        if info['status'] == -1:
            self.module.fail_json(msg=info['msg'])

        return Response(resp, info)

    @staticmethod
    def get_user_agent_string(module):
        """Return ``"ansible <ansible_version> Python <python version>"``."""
        return "ansible %s Python %s" % (module.ansible_version, sys.version.split(' ', 1)[0])

    def get(self, path, data=None, headers=None):
        """Send a GET request; see ``send``."""
        return self.send('GET', path, data, headers)

    def put(self, path, data=None, headers=None):
        """Send a PUT request; see ``send``."""
        return self.send('PUT', path, data, headers)

    def post(self, path, data=None, headers=None):
        """Send a POST request; see ``send``."""
        return self.send('POST', path, data, headers)

    def delete(self, path, data=None, headers=None):
        """Send a DELETE request; see ``send``."""
        return self.send('DELETE', path, data, headers)

    def patch(self, path, data=None, headers=None):
        """Send a PATCH request; see ``send``."""
        return self.send("PATCH", path, data, headers)

    def update(self, path, data=None, headers=None):
        """Send a request with the literal verb ``UPDATE``; see ``send``.

        NOTE(review): confirm the API accepts this verb.
        """
        return self.send("UPDATE", path, data, headers)

Anon7 - 2022
AnonSec Team