Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.191.200.36
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_collections/dellemc/os10/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_collections/dellemc/os10/plugins/modules//bgp_validate.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import (absolute_import, division, print_function)

__copyright__ = "(c) 2020 Dell Inc. or its subsidiaries. All rights reserved."

__metaclass__ = type

DOCUMENTATION = '''
module: bgp_validate
author: "Senthil Kumar Ganesan (@skg-net)"
short_description: Validate the bgp neighbor state,raise error if it is not in established state
description:

  - Troubleshoot the bgp neighor state info using show ip bgp summary and show ip interface brief.

options:
    show_ip_bgp:
        description:
            - show ip bgp summary output
        type: 'list'
        required: True
    show_ip_intf_brief:
        description:
            - show ip interface brief output
        type: 'list'
        required: True
    bgp_neighbors:
        description:
            - planned neighbours input from group_var to compare actual
        type: 'list'
        required: True
'''
EXAMPLES = '''
Copy below YAML into a playbook (e.g. play.yml) and run as follows:

#$ ansible-playbook -i inv play.yml
name: Validate BGP configuration
hosts: localhost
connection: local
gather_facts: False
tasks:
  - name: "Get Dell EMC OS10 Show ip bgp summary"
    os10_command:
      commands:
        - command: "show ip bgp summary | display-xml"
        - command: "show ip interface brief | display-xml"
      provider: "{{ hostvars[item].cli }}"
    with_items: "{{ groups['all'] }}"
    register: show_bgp
  - set_fact:
       output_bgp:  "{{ output_bgp|default([])+ [{'host': item.invocation.module_args.provider.host, 'inv_name': item.item,
                                                  'stdout_show_bgp': item.stdout.0, 'stdout_show_ip': item.stdout.1}] }}"
    loop: "{{ show_bgp.results }}"
  - debug: var=output_bgp
  - local_action: copy content={{ output_bgp }} dest=show
  - name: call lib to convert bgp info from xml to dict format
    base_xml_to_dict:
       cli_responses: "{{ item.stdout_show_bgp }}"
    with_items:
      - "{{ output_bgp }}"
    register: show_bgp_list
  - name: call lib to convert ip interface info from xml to dict format
    base_xml_to_dict:
       cli_responses: "{{ item.stdout_show_ip }}"
    with_items:
      - "{{ output_bgp }}"
    register: show_ip_intf_list
  - name: call lib for bgp validation
    bgp_validate:
      show_ip_bgp: "{{ show_bgp_list.results  }}"
      show_ip_intf_brief: "{{ show_ip_intf_list.results  }}"
      bgp_neighbors: "{{ intended_bgp_neighbors }}"
'''

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native
from collections import OrderedDict
import traceback


class BgpValidation(object):
    def __init__(self):
        self.module = AnsibleModule(argument_spec=self.get_fields())
        self.show_ip_bgp = self.module.params['show_ip_bgp']
        self.show_ip_intf_brief = self.module.params['show_ip_intf_brief']
        self.bgp_neighbors = self.module.params['bgp_neighbors']
        self.exit_msg = OrderedDict()

    def get_fields(self):
        spec_fields = {
            'show_ip_bgp': {
                'type': 'list',
                'required': True
            },
            'show_ip_intf_brief': {
                'type': 'list',
                'required': True
            },
            'bgp_neighbors': {
                'type': 'list',
                'required': True
            }
        }
        return spec_fields

    def parse_bgp_output(self):
        show_bgp_dict = {}
        for show_list in self.show_ip_bgp:
            show_bgp_list = list()
            item = show_list.get("item")
            inv_name = None
            if item is not None:
                inv_name = item.get("inv_name")
            msg = show_list.get("msg")
            if msg is not None:
                result = msg.get("result")
                if result is not None:
                    for sub_result in result:
                        bgp_dict = {}
                        rpc_reply = sub_result.get("rpc-reply")
                        if rpc_reply is not None:
                            bulk = rpc_reply.get("bulk")
                            if bulk is not None:
                                data = bulk.get("data")
                                if data is not None and "peer-oper" in data:
                                    peer_oper = data.get("peer-oper")
                                    if peer_oper is not None and "remote-address" in peer_oper:
                                        bgp_dict["remote_address"] = peer_oper.get(
                                            "remote-address")
                                        bgp_dict["bgp-state"] = peer_oper.get(
                                            "bgp-state")
                                        show_bgp_list.append(bgp_dict)
                    show_bgp_dict[inv_name] = show_bgp_list
        return show_bgp_dict

    def parse_ip_intf_output(self):
        show_ip_dict = {}
        for show_list in self.show_ip_intf_brief:
            show_ip_list = list()
            item = show_list.get("item")
            inv_name = None
            if item is not None:
                inv_name = item.get("inv_name")
            msg = show_list.get("msg")
            if msg is not None:
                result = msg.get("result")
                if result is not None:
                    for sub_result in result:
                        rpc_reply = sub_result.get("rpc-reply")
                        if rpc_reply is not None:
                            bulk = rpc_reply.get("bulk")
                            if bulk is not None:
                                data = bulk.get("data")
                                if data is not None:
                                    sub_val = data.get("interface")
                                    if sub_val is not None:
                                        for val in sub_val:
                                            intf_dict = {}
                                            if "ipv4-info" in val:
                                                ipv4_info = val.get(
                                                    "ipv4-info")
                                                if ipv4_info is not None and "addr" in ipv4_info:
                                                    intf_dict["address"] = ipv4_info.get(
                                                        "addr")
                                                    intf_dict["if_name"] = val.get(
                                                        "name")
                                                    intf_dict["oper_status"] = val.get(
                                                        "oper-status")
                                            if bool(intf_dict):
                                                show_ip_list.append(intf_dict)
                    show_ip_dict[inv_name] = show_ip_list
        return show_ip_dict

    def get_intf_info_per_ip(self, intf_dict):
        final_intf_dict = {}
        for key1, value1 in intf_dict.items():
            intf_list = value1
            intf_dict = {}
            for ip in intf_list:
                intf_info = {}
                ip_address = ip.get("address")
                intf_address = ip_address.split('/')
                intf_ip = intf_address[0]
                intf_info["if_name"] = ip.get("if_name")
                intf_info["oper_status"] = ip.get("oper_status")
                intf_info["dest_switch"] = key1
                intf_dict[intf_ip] = intf_info
            if bool(intf_dict):
                final_intf_dict[key1] = intf_dict
        return final_intf_dict

    def get_intf_info_from_neighbor_ip(
            self, source_switch, neighbor_ip, intf_dict):
        final_intf_info = {}
        intf_dict_per_ip = self.get_intf_info_per_ip(intf_dict)
        for key, value in intf_dict_per_ip.items():
            switch_name = key
            if source_switch == switch_name:
                continue
            intf_info = value.get(neighbor_ip)
            if intf_info is None:
                continue
            final_intf_info = intf_info
            break
        return final_intf_info

    def get_bgp_final_nbr_list(self, bgp_dict, intf_dict):
        actual_bgp_dict = {}
        final_bgp_dict = {}
        for key, value in bgp_dict.items():
            actual_bgp_list = list()
            bgp_list = value
            source_switch = key
            for bgp in bgp_list:
                final_dict = {}
                bgp_state = bgp.get("bgp-state")
                remote_address = bgp.get("remote_address")
                reason = "neighbor config missing"
                error_type = "config_missing"
                intf_info = self.get_intf_info_from_neighbor_ip(
                    source_switch, remote_address, intf_dict)
                if bool(intf_info):
                    dest_switch = intf_info.get("dest_switch")
                    remote_port = intf_info.get("if_name")
                    oper_status = intf_info.get("oper_status")
                    final_dict["source_switch"] = source_switch
                    final_dict["bgp_neighbor"] = remote_address
                    final_dict["bgp_state"] = bgp_state
                    if bgp_state != "established":
                        if oper_status != "up":
                            reason = (
                                "remote port {0} {1} is {2}" .format(
                                    dest_switch, remote_port, oper_status))
                            error_type = "remote_port_down"
                        final_dict["error_type"] = error_type
                        final_dict["possible_reason"] = reason
                else:
                    final_dict["source_switch"] = source_switch
                    final_dict["bgp_neighbor"] = remote_address
                    final_dict["bgp_state"] = bgp_state
                    final_dict["error_type"] = error_type
                    final_dict["possible_reason"] = reason
                actual_bgp_list.append(final_dict)
            actual_bgp_dict[source_switch] = actual_bgp_list
        # check actual with intended neighbor to display the result
        intended_list = list()
        for intended_bgp_neighbor in self.bgp_neighbors:
            planned_source_switch = intended_bgp_neighbor.get("source_switch")
            planned_nbr_list = intended_bgp_neighbor.get("neighbor_ip")
            actual_nbr_list = actual_bgp_dict.get(planned_source_switch)
            if planned_nbr_list is None or actual_nbr_list is None:
                continue
            for actual_nbr in actual_nbr_list:
                actual_source_switch = actual_nbr.get("source_switch")
                actual_bgp_neighbor = actual_nbr.get("bgp_neighbor")
                actual_bgp_state = actual_nbr.get("bgp_state")
                if actual_bgp_neighbor in planned_nbr_list:
                    # Don't add established neighbor in result
                    if actual_bgp_state != "established":
                        intended_list.append(actual_nbr)
                    planned_nbr_list.remove(actual_bgp_neighbor)
                else:
                    reason = "neighbor {0} is not an intended, please add this neighbor in the intended_bgp_neighbors".format(
                        actual_bgp_neighbor)
                    actual_nbr["bgp_neighbor"] = "-"
                    actual_nbr["error_type"] = "not_an_intended_neighbor"
                    actual_nbr["possible_reason"] = reason
                    intended_list.append(actual_nbr)
            # Add the missed planned info which are not present in actual
            # results
            for planned_nbr in planned_nbr_list:
                reason = "neighbor config missing"
                temp_dict = {}
                temp_dict["source_switch"] = planned_source_switch
                temp_dict["bgp_neighbor"] = planned_nbr
                temp_dict["error_type"] = "config_missing"
                temp_dict["possible_reason"] = reason
                intended_list.append(temp_dict)
        return intended_list

    def perform_action(self):
        try:
            bgp_dict = self.parse_bgp_output()
            intf_dict = self.parse_ip_intf_output()
            final_bgp_list = self.get_bgp_final_nbr_list(bgp_dict, intf_dict)
            self.exit_msg.update({"results": final_bgp_list})
            self.module.exit_json(changed=False, msg=self.exit_msg)
        except Exception as e:
            self.module.fail_json(
                msg=to_native(e),
                exception=traceback.format_exc())


def main():
    module_instance = BgpValidation()
    module_instance.perform_action()


if __name__ == '__main__':
    main()

Anon7 - 2022
AnonSec Team