Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.227.140.251
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/fortios/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/fortios/plugins/modules/fmgr_query.py
#!/usr/bin/python
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
#

from __future__ import absolute_import, division, print_function
__metaclass__ = type

DOCUMENTATION = '''
---
module: fmgr_query
notes:
    - Full Documentation at U(https://ftnt-ansible-docs.readthedocs.io/en/latest/).
author: Luke Weighall (@lweighall)
short_description: Query FortiManager data objects for use in Ansible workflows.
description:
  - Provides information on data objects within FortiManager so that playbooks can perform conditionals.

options:
  adom:
    description:
      - The ADOM the configuration should belong to.
    required: false
    default: root

  object:
    description:
      - The data object we wish to query (device, package, rule, etc). Will expand choices as improves.
    required: true
    choices:
    - device
    - cluster_nodes
    - task
    - custom

  custom_endpoint:
    description:
        - ADVANCED USERS ONLY! REQUIRES KNOWLEDGE OF FMGR JSON API!
        - The HTTP Endpoint on FortiManager you wish to GET from.
    required: false

  custom_dict:
    description:
        - ADVANCED USERS ONLY! REQUIRES KNOWLEDGE OF FMGR JSON API!
        - DICTIONARY JSON FORMAT ONLY -- Custom dictionary/datagram to send to the endpoint.
    required: false

  device_ip:
    description:
      - The IP of the device you want to query.
    required: false

  device_unique_name:
    description:
      - The desired "friendly" name of the device you want to query.
    required: false

  device_serial:
    description:
      - The serial number of the device you want to query.
    required: false

  task_id:
    description:
      - The ID of the task you wish to query status on. If left blank and object = 'task' a list of tasks are returned.
    required: false

  nodes:
    description:
      - A LIST of firewalls in the cluster you want to verify i.e. ["firewall_A","firewall_B"].
    required: false
'''


EXAMPLES = '''
- name: QUERY FORTIGATE DEVICE BY IP
  community.fortios.fmgr_query:
    object: "device"
    adom: "ansible"
    device_ip: "10.7.220.41"

- name: QUERY FORTIGATE DEVICE BY SERIAL
  community.fortios.fmgr_query:
    adom: "ansible"
    object: "device"
    device_serial: "FGVM000000117992"

- name: QUERY FORTIGATE DEVICE BY FRIENDLY NAME
  community.fortios.fmgr_query:
    adom: "ansible"
    object: "device"
    device_unique_name: "ansible-fgt01"

- name: VERIFY CLUSTER MEMBERS AND STATUS
  community.fortios.fmgr_query:
    adom: "ansible"
    object: "cluster_nodes"
    device_unique_name: "fgt-cluster01"
    nodes: ["ansible-fgt01", "ansible-fgt02", "ansible-fgt03"]

- name: GET STATUS OF TASK ID
  community.fortios.fmgr_query:
    adom: "ansible"
    object: "task"
    task_id: "3"

- name: USE CUSTOM TYPE TO QUERY AVAILABLE SCRIPTS
  community.fortios.fmgr_query:
    adom: "ansible"
    object: "custom"
    custom_endpoint: "/dvmdb/adom/ansible/script"
    custom_dict: { "type": "cli" }
'''

RETURN = """
api_result:
  description: full API response, includes status code and message
  returned: always
  type: str
"""

from ansible.module_utils.basic import AnsibleModule, env_fallback
from ansible.module_utils.connection import Connection
from ansible_collections.fortinet.fortios.plugins.module_utils.fortimanager.fortimanager import FortiManagerHandler
from ansible_collections.fortinet.fortios.plugins.module_utils.fortimanager.common import FMGBaseException
from ansible_collections.fortinet.fortios.plugins.module_utils.fortimanager.common import FMGRCommon
from ansible_collections.fortinet.fortios.plugins.module_utils.fortimanager.common import FMGRMethods
from ansible_collections.fortinet.fortios.plugins.module_utils.fortimanager.common import DEFAULT_RESULT_OBJ
from ansible_collections.fortinet.fortios.plugins.module_utils.fortimanager.common import FAIL_SOCKET_MSG


def fmgr_get_custom(fmgr, paramgram):
    """
    :param fmgr: The fmgr object instance from fortimanager.py
    :type fmgr: class object
    :param paramgram: The formatted dictionary of options to process
    :type paramgram: dict
    :return: The response from the FortiManager
    :rtype: dict
    """
    # IF THE CUSTOM DICTIONARY (OFTEN CONTAINING FILTERS) IS DEFINED CREATED THAT
    if paramgram["custom_dict"] is not None:
        datagram = paramgram["custom_dict"]
    else:
        datagram = dict()

    # SET THE CUSTOM ENDPOINT PROVIDED
    url = paramgram["custom_endpoint"]
    # MAKE THE CALL AND RETURN RESULTS
    response = fmgr.process_request(url, datagram, FMGRMethods.GET)
    return response


def fmgr_get_task_status(fmgr, paramgram):
    """
    :param fmgr: The fmgr object instance from fortimanager.py
    :type fmgr: class object
    :param paramgram: The formatted dictionary of options to process
    :type paramgram: dict
    :return: The response from the FortiManager
    :rtype: dict
    """
    # IF THE TASK_ID IS DEFINED, THEN GET THAT SPECIFIC TASK
    # OTHERWISE, GET ALL RECENT TASKS IN A LIST
    if paramgram["task_id"] is not None:

        datagram = {
            "adom": paramgram["adom"]
        }
        url = '/task/task/{task_id}'.format(task_id=paramgram["task_id"])
        response = fmgr.process_request(url, datagram, FMGRMethods.GET)
    else:
        datagram = {
            "adom": paramgram["adom"]
        }
        url = '/task/task'
        response = fmgr.process_request(url, datagram, FMGRMethods.GET)
    return response


def fmgr_get_device(fmgr, paramgram):
    """
    This method is used to get information on devices. This will not work on HA_SLAVE nodes, only top level devices.
    Such as cluster objects and standalone devices.

    :param fmgr: The fmgr object instance from fortimanager.py
    :type fmgr: class object
    :param paramgram: The formatted dictionary of options to process
    :type paramgram: dict
    :return: The response from the FortiManager
    :rtype: dict
    """
    # FIRST TRY TO RUN AN UPDATE ON THE DEVICE
    # RUN A QUICK CLUSTER REFRESH/UPDATE ATTEMPT TO ENSURE WE'RE GETTING THE LATEST INFORMOATION
    response = DEFAULT_RESULT_OBJ
    url = ""
    datagram = {}

    update_url = '/dvm/cmd/update/device'
    update_dict = {
        "adom": paramgram['adom'],
        "device": paramgram['device_unique_name'],
        "flags": "create_task"
    }
    # DO THE UPDATE CALL
    fmgr.process_request(update_url, update_dict, FMGRMethods.EXEC)

    # SET THE URL
    url = '/dvmdb/adom/{adom}/device'.format(adom=paramgram["adom"])
    device_found = 0
    response = []

    # TRY TO FIND IT FIRST BY SERIAL NUMBER
    if paramgram["device_serial"] is not None:
        datagram = {
            "filter": ["sn", "==", paramgram["device_serial"]]
        }
        response = fmgr.process_request(url, datagram, FMGRMethods.GET)
        if len(response[1]) >= 0:
            device_found = 1

    # CHECK IF ANYTHING WAS RETURNED, IF NOT TRY DEVICE NAME PARAMETER
    if device_found == 0 and paramgram["device_unique_name"] is not None:
        datagram = {
            "filter": ["name", "==", paramgram["device_unique_name"]]
        }
        response = fmgr.process_request(url, datagram, FMGRMethods.GET)
        if len(response[1]) >= 0:
            device_found = 1

    # CHECK IF ANYTHING WAS RETURNED, IF NOT TRY DEVICE IP ADDRESS
    if device_found == 0 and paramgram["device_ip"] is not None:
        datagram = {
            "filter": ["ip", "==", paramgram["device_ip"]]
        }
        response = fmgr.process_request(url, datagram, FMGRMethods.GET)
        if len(response[1]) >= 0:
            device_found = 1

    return response


def fmgr_get_cluster_nodes(fmgr, paramgram):
    """
    This method is used to get information on devices. This WILL work on HA_SLAVE nodes, but NOT top level standalone
    devices.
    Such as cluster objects and standalone devices.

    :param fmgr: The fmgr object instance from fortimanager.py
    :type fmgr: class object
    :param paramgram: The formatted dictionary of options to process
    :type paramgram: dict
    :return: The response from the FortiManager
    :rtype: dict
    """
    response = DEFAULT_RESULT_OBJ
    url = ""
    datagram = {}
    # USE THE DEVICE METHOD TO GET THE CLUSTER INFORMATION SO WE CAN SEE THE HA_SLAVE NODES
    response = fmgr_get_device(fmgr, paramgram)
    # CHECK FOR HA_SLAVE NODES, IF CLUSTER IS MISSING COMPLETELY THEN QUIT
    try:
        returned_nodes = response[1][0]["ha_slave"]
        num_of_nodes = len(returned_nodes)
    except Exception:
        error_msg = {"cluster_status": "MISSING"}
        return error_msg

    # INIT LOOP RESOURCES
    loop_count = 0
    good_nodes = []
    expected_nodes = list(paramgram["nodes"])
    missing_nodes = list(paramgram["nodes"])
    bad_status_nodes = []

    # LOOP THROUGH THE NODES AND GET THEIR STATUS TO BUILD THE RETURN JSON OBJECT
    # WE'RE ALSO CHECKING THE NODES IF THEY ARE BAD STATUS, OR PLAIN MISSING
    while loop_count < num_of_nodes:
        node_append = {
            "node_name": returned_nodes[loop_count]["name"],
            "node_serial": returned_nodes[loop_count]["sn"],
            "node_parent": returned_nodes[loop_count]["did"],
            "node_status": returned_nodes[loop_count]["status"],
        }
        # IF THE NODE IS IN THE EXPECTED NODES LIST AND WORKING THEN ADD IT TO GOOD NODES LIST
        if node_append["node_name"] in expected_nodes and node_append["node_status"] == 1:
            good_nodes.append(node_append["node_name"])
        # IF THE NODE IS IN THE EXPECTED NODES LIST BUT NOT WORKING THEN ADDED IT TO BAD_STATUS_NODES
        # IF THE NODE STATUS IS NOT 1 THEN ITS BAD
        if node_append["node_name"] in expected_nodes and node_append["node_status"] != 1:
            bad_status_nodes.append(node_append["node_name"])
        # REMOVE THE NODE FROM MISSING NODES LIST IF NOTHING IS WRONG WITH NODE -- LEAVING US A LIST OF
        # NOT WORKING NODES
        missing_nodes.remove(node_append["node_name"])
        loop_count += 1

    # BUILD RETURN OBJECT FROM NODE LISTS
    nodes = {
        "good_nodes": good_nodes,
        "expected_nodes": expected_nodes,
        "missing_nodes": missing_nodes,
        "bad_nodes": bad_status_nodes,
        "query_status": "good",
    }
    if len(nodes["good_nodes"]) == len(nodes["expected_nodes"]):
        nodes["cluster_status"] = "OK"
    else:
        nodes["cluster_status"] = "NOT-COMPLIANT"
    return nodes


def main():
    argument_spec = dict(
        adom=dict(required=False, type="str", default="root"),
        object=dict(required=True, type="str", choices=["device", "cluster_nodes", "task", "custom"]),
        custom_endpoint=dict(required=False, type="str"),
        custom_dict=dict(required=False, type="dict"),
        device_ip=dict(required=False, type="str"),
        device_unique_name=dict(required=False, type="str"),
        device_serial=dict(required=False, type="str"),
        nodes=dict(required=False, type="list"),
        task_id=dict(required=False, type="str")
    )

    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=False, )
    paramgram = {
        "adom": module.params["adom"],
        "object": module.params["object"],
        "device_ip": module.params["device_ip"],
        "device_unique_name": module.params["device_unique_name"],
        "device_serial": module.params["device_serial"],
        "nodes": module.params["nodes"],
        "task_id": module.params["task_id"],
        "custom_endpoint": module.params["custom_endpoint"],
        "custom_dict": module.params["custom_dict"]
    }
    module.paramgram = paramgram
    fmgr = None
    if module._socket_path:
        connection = Connection(module._socket_path)
        fmgr = FortiManagerHandler(connection, module)
        fmgr.tools = FMGRCommon()
    else:
        module.fail_json(**FAIL_SOCKET_MSG)

    results = DEFAULT_RESULT_OBJ

    try:
        # IF OBJECT IS DEVICE
        if paramgram["object"] == "device" and any(v is not None for v in [paramgram["device_unique_name"],
                                                                           paramgram["device_serial"],
                                                                           paramgram["device_ip"]]):
            results = fmgr_get_device(fmgr, paramgram)
            if results[0] not in [0]:
                module.fail_json(msg="Device query failed!")
            elif len(results[1]) == 0:
                module.exit_json(msg="Device NOT FOUND!")
            else:
                module.exit_json(msg="Device Found", **results[1][0])
    except Exception as err:
        raise FMGBaseException(err)

    try:
        # IF OBJECT IS CLUSTER_NODES
        if paramgram["object"] == "cluster_nodes" and paramgram["nodes"] is not None:
            results = fmgr_get_cluster_nodes(fmgr, paramgram)
            if results["cluster_status"] == "MISSING":
                module.exit_json(msg="No cluster device found!", **results)
            elif results["query_status"] == "good":
                module.exit_json(msg="Cluster Found - Showing Nodes", **results)
            elif results is None:
                module.fail_json(msg="Query FAILED -- Check module or playbook syntax")
    except Exception as err:
        raise FMGBaseException(err)

    try:
        # IF OBJECT IS TASK
        if paramgram["object"] == "task":
            results = fmgr_get_task_status(fmgr, paramgram)
            if results[0] != 0:
                module.fail_json(**results[1])
            if results[0] == 0:
                module.exit_json(**results[1])
    except Exception as err:
        raise FMGBaseException(err)

    try:
        # IF OBJECT IS CUSTOM
        if paramgram["object"] == "custom":
            results = fmgr_get_custom(fmgr, paramgram)
            if results[0] != 0:
                module.fail_json(msg="QUERY FAILED -- Please check syntax check JSON guide if needed.")
            if results[0] == 0:
                results_len = len(results[1])
                if results_len > 0:
                    results_combine = dict()
                    if isinstance(results[1], dict):
                        results_combine["results"] = results[1]
                    if isinstance(results[1], list):
                        results_combine["results"] = results[1][0:results_len]
                    module.exit_json(msg="Custom Query Success", **results_combine)
                else:
                    module.exit_json(msg="NO RESULTS")
    except Exception as err:
        raise FMGBaseException(err)

    return module.exit_json(**results[1])


if __name__ == "__main__":
    main()

Anon7 - 2022
AnonSec Team