Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.140.188.201
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/root/lib/python3/dist-packages/ansible_collections/cisco/nxos/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/root/lib/python3/dist-packages/ansible_collections/cisco/nxos/plugins/modules/nxos_vrf.py
#!/usr/bin/python
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import absolute_import, division, print_function


__metaclass__ = type


DOCUMENTATION = """
module: nxos_vrf
extends_documentation_fragment:
- cisco.nxos.nxos
short_description: Manages global VRF configuration.
description:
- This module provides declarative management of VRFs on CISCO NXOS network devices.
version_added: 1.0.0
author:
- Jason Edelman (@jedelman8)
- Gabriele Gerbino (@GGabriele)
- Trishna Guha (@trishnaguha)
notes:
- Tested against NXOSv 7.3.(0)D1(1) on VIRL
- Unsupported for Cisco MDS
- Cisco NX-OS creates the default VRF by itself. Therefore, you're not allowed to
  use default as I(vrf) name in this module.
- C(vrf) name must be shorter than 32 chars.
- VRF names are not case sensible in NX-OS. Anyway, the name is stored just like it's
  inserted by the user and it'll not be changed again unless the VRF is removed and
  re-created. i.e. C(vrf=NTC) will create a VRF named NTC, but running it again with
  C(vrf=ntc) will not cause a configuration change.
options:
  name:
    description:
    - Name of VRF to be managed.
    aliases:
    - vrf
    type: str
  admin_state:
    description:
    - Administrative state of the VRF.
    default: up
    choices:
    - up
    - down
    type: str
  vni:
    description:
    - Specify virtual network identifier. Valid values are Integer or keyword 'default'.
    type: str
  rd:
    description:
    - VPN Route Distinguisher (RD). Valid values are a string in one of the route-distinguisher
      formats (ASN2:NN, ASN4:NN, or IPV4:NN); the keyword 'auto', or the keyword 'default'.
    type: str
  interfaces:
    description:
    - List of interfaces to check the VRF has been configured correctly or keyword
      'default'.
    type: list
    elements: str
  associated_interfaces:
    description:
    - This is a intent option and checks the operational state of the for given vrf
      C(name) for associated interfaces. If the value in the C(associated_interfaces)
      does not match with the operational state of vrf interfaces on device it will
      result in failure.
    type: list
    elements: str
  aggregate:
    description: List of VRFs definitions.
    type: list
    elements: dict
    suboptions:
      name:
        description:
        - Name of VRF to be managed.
        aliases:
        - vrf
        type: str
      admin_state:
        description:
        - Administrative state of the VRF.
        choices:
        - up
        - down
        type: str
      vni:
        description:
        - Specify virtual network identifier. Valid values are Integer or keyword 'default'.
        type: str
      rd:
        description:
        - VPN Route Distinguisher (RD). Valid values are a string in one of the route-distinguisher
          formats (ASN2:NN, ASN4:NN, or IPV4:NN); the keyword 'auto', or the keyword 'default'.
        type: str
      interfaces:
        description:
        - List of interfaces to check the VRF has been configured correctly or keyword
          'default'.
        type: list
        elements: str
      associated_interfaces:
        description:
        - This is a intent option and checks the operational state of the for given vrf
          C(name) for associated interfaces. If the value in the C(associated_interfaces)
          does not match with the operational state of vrf interfaces on device it will
          result in failure.
        type: list
        elements: str
      state:
        description:
        - Manages desired state of the resource.
        choices:
        - present
        - absent
        type: str
      description:
        description:
        - Description of the VRF or keyword 'default'.
        type: str
      delay:
        description:
        - Time in seconds to wait before checking for the operational state on remote
          device. This wait is applicable for operational state arguments.
        type: int
  purge:
    description:
    - Purge VRFs not defined in the I(aggregate) parameter.
    type: bool
    default: false
  state:
    description:
    - Manages desired state of the resource.
    default: present
    choices:
    - present
    - absent
    type: str
  description:
    description:
    - Description of the VRF or keyword 'default'.
    type: str
  delay:
    description:
    - Time in seconds to wait before checking for the operational state on remote
      device. This wait is applicable for operational state arguments.
    default: 10
    type: int
"""

EXAMPLES = """
- name: Ensure ntc VRF exists on switch
  cisco.nxos.nxos_vrf:
    name: ntc
    description: testing
    state: present

- name: Aggregate definition of VRFs
  cisco.nxos.nxos_vrf:
    aggregate:
    - {name: test1, description: Testing, admin_state: down}
    - {name: test2, interfaces: Ethernet1/2}

- name: Aggregate definitions of VRFs with Purge
  cisco.nxos.nxos_vrf:
    aggregate:
    - {name: ntc1, description: purge test1}
    - {name: ntc2, description: purge test2}
    state: present
    purge: true

- name: Delete VRFs exist on switch
  cisco.nxos.nxos_vrf:
    aggregate:
    - {name: ntc1}
    - {name: ntc2}
    state: absent

- name: Assign interfaces to VRF declaratively
  cisco.nxos.nxos_vrf:
    name: test1
    interfaces:
    - Ethernet2/3
    - Ethernet2/5

- name: Check interfaces assigned to VRF
  cisco.nxos.nxos_vrf:
    name: test1
    associated_interfaces:
    - Ethernet2/3
    - Ethernet2/5

- name: Ensure VRF is tagged with interface Ethernet2/5 only (Removes from Ethernet2/3)
  cisco.nxos.nxos_vrf:
    name: test1
    interfaces:
    - Ethernet2/5

- name: Delete VRF
  cisco.nxos.nxos_vrf:
    name: ntc
    state: absent
"""

RETURN = """
commands:
  description: commands sent to the device
  returned: always
  type: list
  sample:
    - vrf context ntc
    - no shutdown
    - interface Ethernet1/2
    - no switchport
    - vrf member test2
"""

import re
import time

from copy import deepcopy

from ansible.module_utils.basic import AnsibleModule
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
    remove_default_spec,
)

from ansible_collections.cisco.nxos.plugins.module_utils.network.nxos.nxos import (
    get_interface_type,
    load_config,
    run_commands,
)


def search_obj_in_list(name, lst):
    """Return the first entry of ``lst`` whose ``"name"`` equals ``name``.

    :param name: value to compare against each entry's ``"name"`` key.
    :param lst: iterable of dicts, each carrying a ``"name"`` key.
    :returns: the first matching dict, or ``None`` when nothing matches.
    """
    return next((entry for entry in lst if entry["name"] == name), None)


def execute_show_command(command, module):
    """Run a single show command and return what ``run_commands`` returns.

    Commands containing ``"show run"`` are requested as text; every other
    command is requested as JSON.

    :param command: CLI command string to execute.
    :param module: module object handed through to ``run_commands``.
    """
    output_format = "text" if "show run" in command else "json"
    return run_commands(module, [{"command": command, "output": output_format}])


def get_existing_vrfs(module):
    """List the VRFs reported by ``show vrf all``.

    :param module: module object used to run the show command.
    :returns: list of ``{"name": <vrf_name>}`` dicts; an empty list when the
        command yields no response or the response lacks the
        ``TABLE_vrf``/``ROW_vrf`` structure.
    """
    try:
        response = execute_show_command("show vrf all", module)[0]
    except IndexError:
        return []

    try:
        rows = response["TABLE_vrf"]["ROW_vrf"]
    except (TypeError, IndexError, KeyError):
        return []

    # A single VRF comes back as a bare dict rather than a list of rows.
    if isinstance(rows, dict):
        rows = [rows]
    elif not isinstance(rows, list):
        return []

    return [{"name": row["vrf_name"]} for row in rows]


def _vrf_member_commands(name, interface, remove=False, reenter_context=False):
    """Build the CLI lines that attach or detach one interface to/from a VRF.

    :param name: VRF name.
    :param interface: interface name as given by the user or the device.
    :param remove: emit ``no vrf member`` instead of ``vrf member``.
    :param reenter_context: prefix the stanza with ``vrf context <name>`` /
        ``exit``, as the flow for already-existing VRFs has always done.
    :returns: list of command strings.
    """
    cmds = []
    if reenter_context:
        cmds.append("vrf context {0}".format(name))
        cmds.append("exit")
    cmds.append("interface {0}".format(interface))
    # Ethernet and port-channel interfaces are taken out of switchport mode
    # before the VRF membership line is issued.
    if get_interface_type(interface) in ("ethernet", "portchannel"):
        cmds.append("no switchport")
    if remove:
        cmds.append("no vrf member {0}".format(name))
    else:
        cmds.append("vrf member {0}".format(name))
    return cmds


def map_obj_to_commands(updates, module):
    """Translate the wanted/current VRF state into NX-OS configuration commands.

    :param updates: ``(want, have)`` tuple. ``want`` is the list of requested
        VRF dicts (each must carry ``name``, ``admin_state``, ``vni`` and
        ``state``); ``have`` is the list of VRF dicts read from the device.
        The ``"state"`` key is removed from every ``want`` entry.
    :param module: module object; ``module.params["purge"]`` is consulted.
    :returns: flat list of command strings, in execution order.
    """
    commands = []
    want, have = updates
    purge = module.params["purge"]

    args = ("rd", "description", "vni")

    for w in want:
        name = w["name"]
        admin_state = w["admin_state"]
        vni = w["vni"]
        interfaces = w.get("interfaces") or []
        # NOTE(review): with purge enabled every requested VRF is treated as
        # "absent", which looks at odds with the documented purge semantics
        # ("purge VRFs not defined in aggregate"). Behaviour is preserved
        # here - confirm the intent before changing it.
        state = "absent" if purge else w["state"]
        del w["state"]

        obj_in_have = next((h for h in have if h["name"] == name), None)

        if state == "absent":
            if obj_in_have:
                commands.append("no vrf context {0}".format(name))
            continue
        if state != "present":
            continue

        if not obj_in_have:
            # Brand-new VRF: create it and configure everything requested.
            commands.append("vrf context {0}".format(name))
            for item in args:
                candidate = w.get(item)
                if candidate and candidate != "default":
                    commands.append("{0} {1}".format(item, candidate))
            if admin_state == "up":
                commands.append("no shutdown")
            elif admin_state == "down":
                commands.append("shutdown")
            commands.append("exit")

            if interfaces and interfaces[0] != "default":
                for intf in interfaces:
                    commands.extend(_vrf_member_commands(name, intf))
            continue

        # Existing VRF: collect only the differences. The context commands
        # are gathered in a list private to this VRF so that commands already
        # queued for other aggregate entries neither trigger a spurious
        # "vrf context"/"exit" pair nor end up inside the wrong context.
        context_cmds = []

        # If a different vni is already configured on the vrf, unconfigure
        # it first.
        current_vni = obj_in_have.get("vni")
        if vni and current_vni and vni != current_vni:
            context_cmds.append("no vni {0}".format(current_vni))

        for item in args:
            candidate = w.get(item)
            current = obj_in_have.get(item)
            if candidate == "default":
                if current:
                    context_cmds.append("no {0} {1}".format(item, current))
            elif candidate and candidate != current:
                context_cmds.append("{0} {1}".format(item, candidate))

        if admin_state and admin_state != obj_in_have.get("admin_state"):
            if admin_state == "up":
                context_cmds.append("no shutdown")
            elif admin_state == "down":
                context_cmds.append("shutdown")

        if context_cmds:
            commands.append("vrf context {0}".format(name))
            commands.extend(context_cmds)
            commands.append("exit")

        if not interfaces:
            continue

        current_interfaces = obj_in_have["interfaces"]
        if interfaces[0] == "default":
            # Keyword 'default': detach every interface currently bound.
            for intf in current_interfaces or []:
                commands.extend(
                    _vrf_member_commands(name, intf, remove=True, reenter_context=True),
                )
        elif not current_interfaces:
            for intf in interfaces:
                commands.extend(_vrf_member_commands(name, intf, reenter_context=True))
        else:
            wanted_set = set(interfaces)
            current_set = set(current_interfaces)
            # sorted() keeps the generated command order deterministic.
            for intf in sorted(wanted_set - current_set):
                commands.extend(_vrf_member_commands(name, intf, reenter_context=True))
            for intf in sorted(current_set - wanted_set):
                commands.extend(
                    _vrf_member_commands(name, intf, remove=True, reenter_context=True),
                )

    if purge:
        wanted_names = set(w["name"] for w in want)
        for h in get_existing_vrfs(module):
            # The built-in VRFs are never purged.
            if h["name"] in ("default", "management"):
                continue
            if h["name"] not in wanted_names:
                commands.append("no vrf context {0}".format(h["name"]))

    return commands


def validate_vrf(name, module):
    """Validate a VRF name and return it with surrounding whitespace removed.

    :param name: requested VRF name; a falsy value is skipped silently.
    :param module: module object whose ``fail_json`` reports invalid names.
    :returns: the stripped name when it is acceptable, otherwise ``None``.
    """
    if not name:
        return None

    stripped = name.strip()
    if stripped == "default":
        module.fail_json(msg="cannot use default as name of a VRF")
        return None
    if len(stripped) > 32:
        module.fail_json(msg="VRF name exceeded max length of 32", name=stripped)
        return None
    return stripped


def map_params_to_obj(module):
    """Build the list of wanted VRF dicts from the module parameters.

    With ``aggregate`` set, every aggregate entry is used; keys whose value
    is ``None`` are filled in (in place) from the top-level parameter of the
    same name. Otherwise a single entry is assembled from the top-level
    parameters. In both cases the name goes through ``validate_vrf``.

    :param module: module object exposing ``params``.
    :returns: list of dicts describing the requested VRFs.
    """
    params = module.params
    aggregate = params.get("aggregate")

    if not aggregate:
        entry = {"name": validate_vrf(params["name"], module)}
        for key in (
            "description",
            "vni",
            "rd",
            "admin_state",
            "state",
            "interfaces",
            "associated_interfaces",
        ):
            entry[key] = params[key]
        return [entry]

    wanted = []
    for item in aggregate:
        # Unset values in an aggregate entry fall back to the common options.
        for key, value in item.items():
            if value is None:
                item[key] = params[key]

        entry = item.copy()
        entry["name"] = validate_vrf(entry["name"], module)
        wanted.append(entry)
    return wanted


def get_value(arg, config, module):
    """Extract the value configured for ``arg`` from a block of config text.

    The keyword must be the first token on its line (leading indentation is
    allowed) and be followed by at least one space or tab. This avoids
    matching the keyword inside other words (for example ``rd`` inside a
    description such as "third party") and avoids capturing the following
    line when the keyword ends a line.

    :param arg: configuration keyword, e.g. ``"vni"``, ``"rd"``, ``"description"``.
    :param config: configuration text to search.
    :param module: unused; kept for interface compatibility.
    :returns: the remainder of the first matching line, or ``""`` when the
        keyword is not configured or ``config`` is not text.
    """
    pattern = re.compile(
        r"^[ \t]*{0}[ \t]+(?P<value>.*)$".format(re.escape(arg)),
        re.M,
    )
    try:
        match = pattern.search(config)
    except TypeError:
        # Non-text responses (None, dict, ...) carry no configuration lines.
        return ""
    return match.group("value") if match else ""


def map_config_to_obj(want, element_spec, module):
    """Read the current device state for every wanted VRF.

    For each entry in ``want`` the device is queried with ``show vrf <name>``,
    ``show run all | section vrf.context.<name>`` and
    ``show vrf <name> interface``.

    A VRF whose ``show vrf`` response cannot be parsed is skipped on its own;
    state already collected for other entries is kept, so one missing VRF in
    an aggregate does not make the existing ones look absent.

    :param want: list of wanted VRF dicts (only ``"name"`` is read).
    :param element_spec: argument spec dict; each result starts as a deep
        copy of it with ``delay`` and ``state`` removed.
    :param module: module object used to run the show commands.
    :returns: list of dicts with ``name``, ``admin_state``, ``vni``, ``rd``,
        ``description`` and ``interfaces`` for every VRF found on the device.
    """
    objs = []

    for w in want:
        command = "show vrf {0}".format(w["name"])
        try:
            body = execute_show_command(command, module)[0]
            vrf_table = body["TABLE_vrf"]["ROW_vrf"]
        except (TypeError, IndexError, KeyError):
            # No usable response for this VRF: skip only this entry.
            continue

        obj = deepcopy(element_spec)
        obj.pop("delay", None)
        obj.pop("state", None)

        name = vrf_table["vrf_name"]
        obj["name"] = name
        obj["admin_state"] = vrf_table["vrf_state"].lower()

        command = "show run all | section vrf.context.{0}".format(name)
        body = execute_show_command(command, module)[0]
        for param in ("vni", "rd", "description"):
            obj[param] = get_value(param, body, module)

        command = "show vrf {0} interface".format(name)
        try:
            body = execute_show_command(command, module)[0]
            vrf_int = body["TABLE_if"]["ROW_if"]
        except (TypeError, IndexError, KeyError):
            vrf_int = None

        # A single interface comes back as a bare dict rather than a list.
        if isinstance(vrf_int, dict):
            vrf_int = [vrf_int]
        if isinstance(vrf_int, list):
            obj["interfaces"] = [row["if_name"] for row in vrf_int]
        else:
            obj["interfaces"] = []

        objs.append(obj)
    return objs


def check_declarative_intent_params(want, module, element_spec, result):
    """Verify ``associated_interfaces`` against the state read from the device.

    Entries without ``associated_interfaces`` are ignored. When the run
    changed something, the check sleeps once for ``module.params["delay"]``
    seconds before the device is queried. The device state is fetched at most
    once and reused for all entries.

    :param want: list of wanted VRF dicts.
    :param module: module object (``params``, ``fail_json``).
    :param element_spec: argument spec forwarded to ``map_config_to_obj``.
    :param result: result dict; only ``result["changed"]`` is read.
    """
    device_state = None
    waited = False

    for wanted in want:
        if wanted.get("associated_interfaces") is None:
            continue

        if result["changed"] and not waited:
            time.sleep(module.params["delay"])
            waited = True

        if device_state is None:
            device_state = map_config_to_obj(want, element_spec, module)

        for intf in wanted["associated_interfaces"]:
            current = search_obj_in_list(wanted["name"], device_state)
            if not current:
                continue

            configured = current.get("interfaces")
            if configured is not None and intf not in configured:
                module.fail_json(
                    msg="Interface %s not configured on vrf %s" % (intf, wanted["name"]),
                )


def vrf_error_check(module, commands, responses):
    """Retry the configuration once when a VRF deletion is still in progress.

    If ``responses`` contains the "Deletion of VRF ... in progress" error,
    wait 15 seconds and push ``commands`` again. A second occurrence of the
    error fails the module; otherwise a warning about the delay is issued.

    :param module: module object (``fail_json``, ``warn``).
    :param commands: command list to re-send on retry.
    :param responses: responses returned by the first ``load_config`` call.
    """
    deletion_in_progress = "ERROR: Deletion of VRF .* in progress"
    if not re.search(deletion_in_progress, str(responses)):
        return

    # Allow delay/retry for VRF changes
    time.sleep(15)
    retry_responses = load_config(module, commands, opts={"catch_clierror": True})
    if re.search(deletion_in_progress, str(retry_responses)):
        module.fail_json(msg="VRF config (and retry) failure: %s " % retry_responses)
    module.warn("VRF config delayed by VRF deletion - passed on retry")


def main():
    """Module entry point: parse arguments, compute and apply the VRF changes."""
    element_spec = {
        "name": {"type": "str", "aliases": ["vrf"]},
        "description": {"type": "str"},
        "vni": {"type": "str"},
        "rd": {"type": "str"},
        "admin_state": {"type": "str", "default": "up", "choices": ["up", "down"]},
        "interfaces": {"type": "list", "elements": "str"},
        "associated_interfaces": {"type": "list", "elements": "str"},
        "delay": {"type": "int", "default": 10},
        "state": {"type": "str", "default": "present", "choices": ["present", "absent"]},
    }

    # Aggregate entries share the common arguments, so their per-entry
    # defaults are removed from the copied spec.
    aggregate_spec = deepcopy(element_spec)
    remove_default_spec(aggregate_spec)

    argument_spec = {
        "aggregate": {"type": "list", "elements": "dict", "options": aggregate_spec},
        "purge": {"type": "bool", "default": False},
    }
    argument_spec.update(element_spec)

    module = AnsibleModule(
        argument_spec=argument_spec,
        required_one_of=[["name", "aggregate"]],
        mutually_exclusive=[["name", "aggregate"]],
        supports_check_mode=True,
    )

    result = {"changed": False}

    want = map_params_to_obj(module)
    have = map_config_to_obj(want, element_spec, module)

    commands = map_obj_to_commands((want, have), module)
    result["commands"] = commands

    # In check mode the commands are reported but never pushed.
    if commands and not module.check_mode:
        responses = load_config(module, commands, opts={"catch_clierror": True})
        vrf_error_check(module, commands, responses)
        result["changed"] = True

    check_declarative_intent_params(want, module, element_spec, result)

    module.exit_json(**result)


if __name__ == "__main__":
    main()

Anon7 - 2022
AnonSec Team