Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.219.112.243
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/network/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/network/plugins/modules/ce_switchport.py
#!/usr/bin/python
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
#

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

DOCUMENTATION = '''
---
module: ce_switchport
short_description: Manages Layer 2 switchport interfaces on HUAWEI CloudEngine switches.
description:
    - Manages Layer 2 switchport interfaces on HUAWEI CloudEngine switches.
author: QijunPan (@QijunPan)
notes:
    - When C(state=absent), VLANs can be added/removed from trunk links and
      the existing access VLAN can be 'unconfigured' to just having VLAN 1 on that interface.
    - When working with trunks VLANs the keywords add/remove are always sent
      in the C(port trunk allow-pass vlan) command. Use verbose mode to see commands sent.
    - When C(state=unconfigured), the interface will result with having a default Layer 2 interface, i.e. vlan 1 in access mode.
    - This module requires the netconf system service be enabled on the remote device being managed.
    - Recommended connection is C(ansible.netcommon.netconf).
options:
    interface:
        description:
            - Full name of the interface, i.e. 40GE1/0/22.
        required: true
    mode:
        description:
            - The link type of an interface.
        choices: ['access','trunk', 'hybrid', 'dot1qtunnel']
    default_vlan:
        description:
            - If C(mode=access, or mode=dot1qtunnel), used as the access VLAN ID, in the range from 1 to 4094.
    pvid_vlan:
        description:
            - If C(mode=trunk, or mode=hybrid), used as the trunk native VLAN ID, in the range from 1 to 4094.
    trunk_vlans:
        description:
            - If C(mode=trunk), used as the VLAN range to ADD or REMOVE
              from the trunk, such as 2-10 or 2,5,10-15, etc.
    untagged_vlans:
        description:
            - If C(mode=hybrid), used as the VLAN range to ADD or REMOVE
              from the trunk, such as 2-10 or 2,5,10-15, etc.
    tagged_vlans:
        description:
            - If C(mode=hybrid), used as the VLAN range to ADD or REMOVE
              from the trunk, such as 2-10 or 2,5,10-15, etc.
    state:
        description:
            - Manage the state of the resource.
        default: present
        choices: ['present', 'absent', 'unconfigured']
'''

EXAMPLES = '''
- name: Switchport module test
  hosts: cloudengine
  gather_facts: no
  vars:
    ansible_user: root
    ansible_password: PASSWORD
    ansible_connection: ansible.netcommon.netconf
    ansible_network_os: community.network.ce

  tasks:
  - name: Ensure 10GE1/0/22 is in its default switchport state
    community.network.ce_switchport:
      interface: 10GE1/0/22
      state: unconfigured

  - name: Ensure 10GE1/0/22 is configured for access vlan 20
    community.network.ce_switchport:
      interface: 10GE1/0/22
      mode: access
      default_vlan: 20

  - name: Ensure 10GE1/0/22 only has vlans 5-10 as trunk vlans
    community.network.ce_switchport:
      interface: 10GE1/0/22
      mode: trunk
      pvid_vlan: 10
      trunk_vlans: 5-10

  - name: Ensure 10GE1/0/22 is a trunk port and ensure 2-50 are being tagged (doesn't mean others aren't also being tagged)
    community.network.ce_switchport:
      interface: 10GE1/0/22
      mode: trunk
      pvid_vlan: 10
      trunk_vlans: 2-50

  - name: Ensure these VLANs are not being tagged on the trunk
    community.network.ce_switchport:
      interface: 10GE1/0/22
      mode: trunk
      trunk_vlans: 51-4000
      state: absent
'''

RETURN = '''
proposed:
    description: k/v pairs of parameters passed into module
    returned: always
    type: dict
    sample: {"default_vlan": "20", "interface": "10GE1/0/22", "mode": "access"}
existing:
    description: k/v pairs of existing switchport
    returned: always
    type: dict
    sample: {"default_vlan": "10", "interface": "10GE1/0/22",
             "mode": "access", "switchport": "enable"}
end_state:
    description: k/v pairs of switchport after module execution
    returned: always
    type: dict
    sample: {"default_vlan": "20", "interface": "10GE1/0/22",
             "mode": "access", "switchport": "enable"}
updates:
    description: command string sent to the device
    returned: always
    type: list
    sample: ["10GE1/0/22", "port default vlan 20"]
changed:
    description: check to see if a change was made on the device
    returned: always
    type: bool
    sample: true
'''

import re
from xml.etree import ElementTree as ET
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.network.plugins.module_utils.network.cloudengine.ce import get_nc_config, set_nc_config, ce_argument_spec


CE_NC_GET_PORT_ATTR = """
<filter type="subtree">
  <ethernet xmlns="http://www.huawei.com/netconf/vrp" content-version="1.0" format-version="1.0">
    <ethernetIfs>
      <ethernetIf>
        <ifName>%s</ifName>
        <l2Enable></l2Enable>
        <l2Attribute>
          <linkType></linkType>
          <pvid></pvid>
          <trunkVlans></trunkVlans>
          <untagVlans></untagVlans>
        </l2Attribute>
      </ethernetIf>
    </ethernetIfs>
  </ethernet>
</filter>
"""

CE_NC_SET_PORT = """
<ethernet xmlns="http://www.huawei.com/netconf/vrp" content-version="1.0" format-version="1.0">
<ethernetIfs>
    <ethernetIf operation="merge">
        <ifName>%s</ifName>
        <l2Attribute>
            <linkType>%s</linkType>
            <pvid>%s</pvid>
            <trunkVlans>%s</trunkVlans>
            <untagVlans>%s</untagVlans>
        </l2Attribute>
    </ethernetIf>
</ethernetIfs>
</ethernet>
"""

CE_NC_SET_PORT_MODE = """
<ethernet xmlns="http://www.huawei.com/netconf/vrp" content-version="1.0" format-version="1.0">
<ethernetIfs>
    <ethernetIf operation="merge">
        <ifName>%s</ifName>
        <l2Attribute>
            <linkType>%s</linkType>
        </l2Attribute>
    </ethernetIf>
</ethernetIfs>
</ethernet>
"""

CE_NC_SET_DEFAULT_PORT = """
<config>
   <ethernet xmlns="http://www.huawei.com/netconf/vrp" content-version="1.0" format-version="1.0">
    <ethernetIfs>
        <ethernetIf operation="merge">
            <ifName>%s</ifName>
            <l2Attribute>
                <linkType>access</linkType>
                <pvid>1</pvid>
                <trunkVlans></trunkVlans>
                <untagVlans></untagVlans>
            </l2Attribute>
        </ethernetIf>
    </ethernetIfs>
  </ethernet>
</config>
"""


SWITCH_PORT_TYPE = ('ge', '10ge', '25ge',
                    '4x10ge', '40ge', '100ge', 'eth-trunk')


def get_interface_type(interface):
    """Gets the type of interface, such as 10GE, ETH-TRUNK, VLANIF..."""

    if interface is None:
        return None

    iftype = None

    if interface.upper().startswith('GE'):
        iftype = 'ge'
    elif interface.upper().startswith('10GE'):
        iftype = '10ge'
    elif interface.upper().startswith('25GE'):
        iftype = '25ge'
    elif interface.upper().startswith('4X10GE'):
        iftype = '4x10ge'
    elif interface.upper().startswith('40GE'):
        iftype = '40ge'
    elif interface.upper().startswith('100GE'):
        iftype = '100ge'
    elif interface.upper().startswith('VLANIF'):
        iftype = 'vlanif'
    elif interface.upper().startswith('LOOPBACK'):
        iftype = 'loopback'
    elif interface.upper().startswith('METH'):
        iftype = 'meth'
    elif interface.upper().startswith('ETH-TRUNK'):
        iftype = 'eth-trunk'
    elif interface.upper().startswith('VBDIF'):
        iftype = 'vbdif'
    elif interface.upper().startswith('NVE'):
        iftype = 'nve'
    elif interface.upper().startswith('TUNNEL'):
        iftype = 'tunnel'
    elif interface.upper().startswith('ETHERNET'):
        iftype = 'ethernet'
    elif interface.upper().startswith('FCOE-PORT'):
        iftype = 'fcoe-port'
    elif interface.upper().startswith('FABRIC-PORT'):
        iftype = 'fabric-port'
    elif interface.upper().startswith('STACK-PORT'):
        iftype = 'stack-port'
    elif interface.upper().startswith('NULL'):
        iftype = 'null'
    else:
        return None

    return iftype.lower()


def is_portswitch_enalbed(iftype):
    """"[undo] portswitch"""

    return bool(iftype in SWITCH_PORT_TYPE)


def vlan_bitmap_undo(bitmap):
    """convert vlan bitmap to undo bitmap"""

    vlan_bit = ['F'] * 1024

    if not bitmap or len(bitmap) == 0:
        return ''.join(vlan_bit)

    bit_len = len(bitmap)
    for num in range(bit_len):
        undo = (~int(bitmap[num], 16)) & 0xF
        vlan_bit[num] = hex(undo)[2]

    return ''.join(vlan_bit)


def is_vlan_bitmap_empty(bitmap):
    """check vlan bitmap empty"""

    if not bitmap or len(bitmap) == 0:
        return True

    bit_len = len(bitmap)
    for num in range(bit_len):
        if bitmap[num] != '0':
            return False

    return True


class SwitchPort(object):
    """
    Manages Layer 2 switchport interfaces.
    """

    def __init__(self, argument_spec):
        self.spec = argument_spec
        self.module = None
        self.init_module()

        # interface and vlan info
        self.interface = self.module.params['interface']
        self.mode = self.module.params['mode']
        self.state = self.module.params['state']
        self.default_vlan = self.module.params['default_vlan']
        self.pvid_vlan = self.module.params['pvid_vlan']
        self.trunk_vlans = self.module.params['trunk_vlans']
        self.untagged_vlans = self.module.params['untagged_vlans']
        self.tagged_vlans = self.module.params['tagged_vlans']

        # state
        self.changed = False
        self.updates_cmd = list()
        self.results = dict()
        self.proposed = dict()
        self.existing = dict()
        self.end_state = dict()
        self.intf_info = dict()         # interface vlan info
        self.intf_type = None           # loopback tunnel ...

    def init_module(self):
        """ init module """

        required_if = [('state', 'absent', ['mode']), ('state', 'present', ['mode'])]
        mutually_exclusive = [['default_vlan', 'trunk_vlans'],
                              ['default_vlan', 'pvid_vlan'],
                              ['default_vlan', 'untagged_vlans'],
                              ['trunk_vlans', 'untagged_vlans'],
                              ['trunk_vlans', 'tagged_vlans'],
                              ['default_vlan', 'tagged_vlans']]

        self.module = AnsibleModule(
            argument_spec=self.spec, required_if=required_if, supports_check_mode=True, mutually_exclusive=mutually_exclusive)

    def check_response(self, xml_str, xml_name):
        """Check if response message is already succeed."""

        if "<ok/>" not in xml_str:
            self.module.fail_json(msg='Error: %s failed.' % xml_name)

    def get_interface_dict(self, ifname):
        """ get one interface attributes dict."""

        intf_info = dict()
        conf_str = CE_NC_GET_PORT_ATTR % ifname
        xml_str = get_nc_config(self.module, conf_str)
        if "<data/>" in xml_str:
            return intf_info
        xml_str = xml_str.replace('\r', '').replace('\n', '').\
            replace('xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"', "").\
            replace('xmlns="http://www.huawei.com/netconf/vrp"', "")
        tree = ET.fromstring(xml_str)
        l2Enable = tree.find('ethernet/ethernetIfs/ethernetIf/l2Enable')
        intf_info["l2Enable"] = l2Enable.text
        port_type = tree.find('ethernet/ethernetIfs/ethernetIf/l2Attribute')
        for pre in port_type:
            intf_info[pre.tag] = pre.text
        intf_info["ifName"] = ifname
        if intf_info["trunkVlans"] is None:
            intf_info["trunkVlans"] = ""
        if intf_info["untagVlans"] is None:
            intf_info["untagVlans"] = ""
        return intf_info

    def is_l2switchport(self):
        """Check layer2 switch port"""

        return bool(self.intf_info["l2Enable"] == "enable")

    def merge_access_vlan(self, ifname, default_vlan):
        """Merge access interface vlan"""

        change = False
        conf_str = ""

        self.updates_cmd.append("interface %s" % ifname)
        if self.state == "present":
            if self.intf_info["linkType"] == "access":
                if default_vlan and self.intf_info["pvid"] != default_vlan:
                    self.updates_cmd.append(
                        "port default vlan %s" % default_vlan)
                    conf_str = CE_NC_SET_PORT % (ifname, "access", default_vlan, "", "")
                    change = True
            else:  # not access
                self.updates_cmd.append("port link-type access")
                if default_vlan:
                    self.updates_cmd.append(
                        "port default vlan %s" % default_vlan)
                    conf_str = CE_NC_SET_PORT % (ifname, "access", default_vlan, "", "")
                else:
                    conf_str = CE_NC_SET_PORT % (ifname, "access", "1", "", "")
                change = True
        elif self.state == "absent":
            if self.intf_info["linkType"] == "access":
                if default_vlan and self.intf_info["pvid"] == default_vlan and default_vlan != "1":
                    self.updates_cmd.append(
                        "undo port default vlan %s" % default_vlan)
                    conf_str = CE_NC_SET_PORT % (ifname, "access", "1", "", "")
                    change = True

        if not change:
            self.updates_cmd.pop()   # remove interface
            return
        conf_str = "<config>" + conf_str + "</config>"
        rcv_xml = set_nc_config(self.module, conf_str)
        self.check_response(rcv_xml, "MERGE_ACCESS_PORT")
        self.changed = True

    def merge_trunk_vlan(self, ifname, pvid_vlan, trunk_vlans):
        """Merge trunk interface vlan"""

        change = False
        xmlstr = ""
        pvid = ""
        trunk = ""
        self.updates_cmd.append("interface %s" % ifname)
        if trunk_vlans:
            vlan_list = self.vlan_range_to_list(trunk_vlans)
            vlan_map = self.vlan_list_to_bitmap(vlan_list)
        if self.state == "present":
            if self.intf_info["linkType"] == "trunk":
                if pvid_vlan and self.intf_info["pvid"] != pvid_vlan:
                    self.updates_cmd.append(
                        "port trunk pvid vlan %s" % pvid_vlan)
                    pvid = pvid_vlan
                    change = True

                if trunk_vlans:
                    add_vlans = self.vlan_bitmap_add(
                        self.intf_info["trunkVlans"], vlan_map)
                    if not is_vlan_bitmap_empty(add_vlans):
                        self.updates_cmd.append(
                            "port trunk allow-pass %s"
                            % trunk_vlans.replace(',', ' ').replace('-', ' to '))
                        trunk = "%s:%s" % (add_vlans, add_vlans)
                        change = True
                if pvid or trunk:
                    xmlstr += CE_NC_SET_PORT % (ifname, "trunk", pvid, trunk, "")
                    if not pvid:
                        xmlstr = xmlstr.replace("<pvid></pvid>", "")
                    if not trunk:
                        xmlstr = xmlstr.replace("<trunkVlans></trunkVlans>", "")

            else:   # not trunk
                self.updates_cmd.append("port link-type trunk")
                change = True
                if pvid_vlan:
                    self.updates_cmd.append(
                        "port trunk pvid vlan %s" % pvid_vlan)
                    pvid = pvid_vlan
                if trunk_vlans:
                    self.updates_cmd.append(
                        "port trunk allow-pass %s"
                        % trunk_vlans.replace(',', ' ').replace('-', ' to '))
                    trunk = "%s:%s" % (vlan_map, vlan_map)
                if pvid or trunk:
                    xmlstr += CE_NC_SET_PORT % (ifname, "trunk", pvid, trunk, "")
                    if not pvid:
                        xmlstr = xmlstr.replace("<pvid></pvid>", "")
                    if not trunk:
                        xmlstr = xmlstr.replace("<trunkVlans></trunkVlans>", "")

                if not pvid_vlan and not trunk_vlans:
                    xmlstr += CE_NC_SET_PORT_MODE % (ifname, "trunk")
                    self.updates_cmd.append(
                        "undo port trunk allow-pass vlan 1")
        elif self.state == "absent":
            if self.intf_info["linkType"] == "trunk":
                if pvid_vlan and self.intf_info["pvid"] == pvid_vlan and pvid_vlan != '1':
                    self.updates_cmd.append(
                        "undo port trunk pvid vlan %s" % pvid_vlan)
                    pvid = "1"
                    change = True
                if trunk_vlans:
                    del_vlans = self.vlan_bitmap_del(
                        self.intf_info["trunkVlans"], vlan_map)
                    if not is_vlan_bitmap_empty(del_vlans):
                        self.updates_cmd.append(
                            "undo port trunk allow-pass %s"
                            % trunk_vlans.replace(',', ' ').replace('-', ' to '))
                        undo_map = vlan_bitmap_undo(del_vlans)
                        trunk = "%s:%s" % (undo_map, del_vlans)
                        change = True
                if pvid or trunk:
                    xmlstr += CE_NC_SET_PORT % (ifname, "trunk", pvid, trunk, "")
                    if not pvid:
                        xmlstr = xmlstr.replace("<pvid></pvid>", "")
                    if not trunk:
                        xmlstr = xmlstr.replace("<trunkVlans></trunkVlans>", "")

        if not change:
            self.updates_cmd.pop()
            return
        conf_str = "<config>" + xmlstr + "</config>"
        rcv_xml = set_nc_config(self.module, conf_str)
        self.check_response(rcv_xml, "MERGE_TRUNK_PORT")
        self.changed = True

    def merge_hybrid_vlan(self, ifname, pvid_vlan, tagged_vlans, untagged_vlans):
        """Merge hybrid interface vlan"""

        change = False
        xmlstr = ""
        pvid = ""
        tagged = ""
        untagged = ""
        self.updates_cmd.append("interface %s" % ifname)
        if tagged_vlans:
            vlan_targed_list = self.vlan_range_to_list(tagged_vlans)
            vlan_targed_map = self.vlan_list_to_bitmap(vlan_targed_list)
        if untagged_vlans:
            vlan_untarged_list = self.vlan_range_to_list(untagged_vlans)
            vlan_untarged_map = self.vlan_list_to_bitmap(vlan_untarged_list)
        if self.state == "present":
            if self.intf_info["linkType"] == "hybrid":
                if pvid_vlan and self.intf_info["pvid"] != pvid_vlan:
                    self.updates_cmd.append(
                        "port hybrid pvid vlan %s" % pvid_vlan)
                    pvid = pvid_vlan
                    change = True
                if tagged_vlans:
                    add_vlans = self.vlan_bitmap_add(
                        self.intf_info["trunkVlans"], vlan_targed_map)
                    if not is_vlan_bitmap_empty(add_vlans):
                        self.updates_cmd.append(
                            "port hybrid tagged vlan %s"
                            % tagged_vlans.replace(',', ' ').replace('-', ' to '))
                        tagged = "%s:%s" % (add_vlans, add_vlans)
                        change = True
                if untagged_vlans:
                    add_vlans = self.vlan_bitmap_add(
                        self.intf_info["untagVlans"], vlan_untarged_map)
                    if not is_vlan_bitmap_empty(add_vlans):
                        self.updates_cmd.append(
                            "port hybrid untagged vlan %s"
                            % untagged_vlans.replace(',', ' ').replace('-', ' to '))
                        untagged = "%s:%s" % (add_vlans, add_vlans)
                        change = True
                if pvid or tagged or untagged:
                    xmlstr += CE_NC_SET_PORT % (ifname, "hybrid", pvid, tagged, untagged)
                    if not pvid:
                        xmlstr = xmlstr.replace("<pvid></pvid>", "")
                    if not tagged:
                        xmlstr = xmlstr.replace("<trunkVlans></trunkVlans>", "")
                    if not untagged:
                        xmlstr = xmlstr.replace("<untagVlans></untagVlans>", "")
            else:
                self.updates_cmd.append("port link-type hybrid")
                change = True
                if pvid_vlan:
                    self.updates_cmd.append(
                        "port hybrid pvid vlan %s" % pvid_vlan)
                    pvid = pvid_vlan
                if tagged_vlans:
                    self.updates_cmd.append(
                        "port hybrid tagged vlan %s"
                        % tagged_vlans.replace(',', ' ').replace('-', ' to '))
                    tagged = "%s:%s" % (vlan_targed_map, vlan_targed_map)
                if untagged_vlans:
                    self.updates_cmd.append(
                        "port hybrid untagged vlan %s"
                        % untagged_vlans.replace(',', ' ').replace('-', ' to '))
                    untagged = "%s:%s" % (vlan_untarged_map, vlan_untarged_map)
                if pvid or tagged or untagged:
                    xmlstr += CE_NC_SET_PORT % (ifname, "hybrid", pvid, tagged, untagged)
                    if not pvid:
                        xmlstr = xmlstr.replace("<pvid></pvid>", "")
                    if not tagged:
                        xmlstr = xmlstr.replace("<trunkVlans></trunkVlans>", "")
                    if not untagged:
                        xmlstr = xmlstr.replace("<untagVlans></untagVlans>", "")
                if not pvid_vlan and not tagged_vlans and not untagged_vlans:
                    xmlstr += CE_NC_SET_PORT_MODE % (ifname, "hybrid")
                    self.updates_cmd.append(
                        "undo port hybrid untagged vlan 1")
        elif self.state == "absent":
            if self.intf_info["linkType"] == "hybrid":
                if pvid_vlan and self.intf_info["pvid"] == pvid_vlan and pvid_vlan != '1':
                    self.updates_cmd.append(
                        "undo port hybrid pvid vlan %s" % pvid_vlan)
                    pvid = "1"
                    change = True
                if tagged_vlans:
                    del_vlans = self.vlan_bitmap_del(
                        self.intf_info["trunkVlans"], vlan_targed_map)
                    if not is_vlan_bitmap_empty(del_vlans):
                        self.updates_cmd.append(
                            "undo port hybrid tagged vlan %s"
                            % tagged_vlans.replace(',', ' ').replace('-', ' to '))
                        undo_map = vlan_bitmap_undo(del_vlans)
                        tagged = "%s:%s" % (undo_map, del_vlans)
                        change = True
                if untagged_vlans:
                    del_vlans = self.vlan_bitmap_del(
                        self.intf_info["untagVlans"], vlan_untarged_map)
                    if not is_vlan_bitmap_empty(del_vlans):
                        self.updates_cmd.append(
                            "undo port hybrid untagged vlan %s"
                            % untagged_vlans.replace(',', ' ').replace('-', ' to '))
                        undo_map = vlan_bitmap_undo(del_vlans)
                        untagged = "%s:%s" % (undo_map, del_vlans)
                        change = True
                if pvid or tagged or untagged:
                    xmlstr += CE_NC_SET_PORT % (ifname, "hybrid", pvid, tagged, untagged)
                    if not pvid:
                        xmlstr = xmlstr.replace("<pvid></pvid>", "")
                    if not tagged:
                        xmlstr = xmlstr.replace("<trunkVlans></trunkVlans>", "")
                    if not untagged:
                        xmlstr = xmlstr.replace("<untagVlans></untagVlans>", "")

        if not change:
            self.updates_cmd.pop()
            return

        conf_str = "<config>" + xmlstr + "</config>"
        rcv_xml = set_nc_config(self.module, conf_str)
        self.check_response(rcv_xml, "MERGE_HYBRID_PORT")
        self.changed = True

    def merge_dot1qtunnel_vlan(self, ifname, default_vlan):
        """Merge dot1qtunnel"""

        change = False
        conf_str = ""

        self.updates_cmd.append("interface %s" % ifname)
        if self.state == "present":
            if self.intf_info["linkType"] == "dot1qtunnel":
                if default_vlan and self.intf_info["pvid"] != default_vlan:
                    self.updates_cmd.append(
                        "port default vlan %s" % default_vlan)
                    conf_str = CE_NC_SET_PORT % (ifname, "dot1qtunnel", default_vlan, "", "")
                    change = True
            else:
                self.updates_cmd.append("port link-type dot1qtunnel")
                if default_vlan:
                    self.updates_cmd.append(
                        "port default vlan %s" % default_vlan)
                    conf_str = CE_NC_SET_PORT % (ifname, "dot1qtunnel", default_vlan, "", "")
                else:
                    conf_str = CE_NC_SET_PORT % (ifname, "dot1qtunnel", "1", "", "")
                change = True
        elif self.state == "absent":
            if self.intf_info["linkType"] == "dot1qtunnel":
                if default_vlan and self.intf_info["pvid"] == default_vlan and default_vlan != "1":
                    self.updates_cmd.append(
                        "undo port default vlan %s" % default_vlan)
                    conf_str = CE_NC_SET_PORT % (ifname, "dot1qtunnel", "1", "", "")
                    change = True
        if not change:
            self.updates_cmd.pop()   # remove interface
            return
        conf_str = "<config>" + conf_str + "</config>"
        rcv_xml = set_nc_config(self.module, conf_str)
        self.check_response(rcv_xml, "MERGE_DOT1QTUNNEL_PORT")
        self.changed = True

    def default_switchport(self, ifname):
        """Set interface default or unconfigured"""

        change = False
        if self.intf_info["linkType"] != "access":
            self.updates_cmd.append("interface %s" % ifname)
            self.updates_cmd.append("port link-type access")
            self.updates_cmd.append("port default vlan 1")
            change = True
        else:
            if self.intf_info["pvid"] != "1":
                self.updates_cmd.append("interface %s" % ifname)
                self.updates_cmd.append("port default vlan 1")
                change = True

        if not change:
            return

        conf_str = CE_NC_SET_DEFAULT_PORT % ifname
        rcv_xml = set_nc_config(self.module, conf_str)
        self.check_response(rcv_xml, "DEFAULT_INTF_VLAN")
        self.changed = True

    def vlan_series(self, vlanid_s):
        """ convert vlan range to vlan list """

        vlan_list = []
        peerlistlen = len(vlanid_s)
        if peerlistlen != 2:
            self.module.fail_json(msg='Error: Format of vlanid is invalid.')
        for num in range(peerlistlen):
            if not vlanid_s[num].isdigit():
                self.module.fail_json(
                    msg='Error: Format of vlanid is invalid.')
        if int(vlanid_s[0]) > int(vlanid_s[1]):
            self.module.fail_json(msg='Error: Format of vlanid is invalid.')
        elif int(vlanid_s[0]) == int(vlanid_s[1]):
            vlan_list.append(str(vlanid_s[0]))
            return vlan_list
        for num in range(int(vlanid_s[0]), int(vlanid_s[1])):
            vlan_list.append(str(num))
        vlan_list.append(vlanid_s[1])

        return vlan_list

    def vlan_region(self, vlanid_list):
        """ convert vlan range to vlan list """

        vlan_list = []
        peerlistlen = len(vlanid_list)
        for num in range(peerlistlen):
            if vlanid_list[num].isdigit():
                vlan_list.append(vlanid_list[num])
            else:
                vlan_s = self.vlan_series(vlanid_list[num].split('-'))
                vlan_list.extend(vlan_s)

        return vlan_list

    def vlan_range_to_list(self, vlan_range):
        """ convert vlan range to vlan list """

        vlan_list = self.vlan_region(vlan_range.split(','))

        return vlan_list

    def vlan_list_to_bitmap(self, vlanlist):
        """ convert vlan list to vlan bitmap """

        vlan_bit = ['0'] * 1024
        bit_int = [0] * 1024

        vlan_list_len = len(vlanlist)
        for num in range(vlan_list_len):
            tagged_vlans = int(vlanlist[num])
            if tagged_vlans <= 0 or tagged_vlans > 4094:
                self.module.fail_json(
                    msg='Error: Vlan id is not in the range from 1 to 4094.')
            j = tagged_vlans // 4
            bit_int[j] |= 0x8 >> (tagged_vlans % 4)
            vlan_bit[j] = hex(bit_int[j])[2]

        vlan_xml = ''.join(vlan_bit)

        return vlan_xml

    def bitmap_to_vlan_list(self, bitmap):
        """convert VLAN bitmap to VLAN list"""

        vlan_list = list()
        if not bitmap:
            return vlan_list

        for i in range(len(bitmap)):
            if bitmap[i] == '0':
                continue
            bit = int(bitmap[i], 16)
            if bit & 0x8:
                vlan_list.append(str(i * 4))
            if bit & 0x4:
                vlan_list.append(str(i * 4 + 1))
            if bit & 0x2:
                vlan_list.append(str(i * 4 + 2))
            if bit & 0x1:
                vlan_list.append(str(i * 4 + 3))

        return vlan_list

    def vlan_bitmap_add(self, oldmap, newmap):
        """vlan add bitmap"""

        vlan_bit = ['0'] * 1024

        if len(newmap) != 1024:
            self.module.fail_json(msg='Error: New vlan bitmap is invalid.')

        if len(oldmap) != 1024 and len(oldmap) != 0:
            self.module.fail_json(msg='Error: old vlan bitmap is invalid.')

        if len(oldmap) == 0:
            return newmap

        for num in range(1024):
            new_tmp = int(newmap[num], 16)
            old_tmp = int(oldmap[num], 16)
            add = (~(new_tmp & old_tmp)) & new_tmp
            vlan_bit[num] = hex(add)[2]

        vlan_xml = ''.join(vlan_bit)

        return vlan_xml

    def vlan_bitmap_del(self, oldmap, delmap):
        """vlan del bitmap"""

        vlan_bit = ['0'] * 1024

        if not oldmap or len(oldmap) == 0:
            return ''.join(vlan_bit)

        if len(oldmap) != 1024 or len(delmap) != 1024:
            self.module.fail_json(msg='Error: vlan bitmap is invalid.')

        for num in range(1024):
            tmp = int(delmap[num], 16) & int(oldmap[num], 16)
            vlan_bit[num] = hex(tmp)[2]

        vlan_xml = ''.join(vlan_bit)

        return vlan_xml

    def check_params(self):
        """Check all input params"""

        # interface type check
        if self.interface:
            self.intf_type = get_interface_type(self.interface)
            if not self.intf_type:
                self.module.fail_json(
                    msg='Error: Interface name of %s is error.' % self.interface)

        if not self.intf_type or not is_portswitch_enalbed(self.intf_type):
            self.module.fail_json(msg='Error: Interface %s is error.')

        # check default_vlan
        if self.default_vlan:
            if not self.default_vlan.isdigit():
                self.module.fail_json(msg='Error: Access vlan id is invalid.')
            if int(self.default_vlan) <= 0 or int(self.default_vlan) > 4094:
                self.module.fail_json(
                    msg='Error: Access vlan id is not in the range from 1 to 4094.')

        # check pvid_vlan
        if self.pvid_vlan:
            if not self.pvid_vlan.isdigit():
                self.module.fail_json(msg='Error: Pvid vlan id is invalid.')
            if int(self.pvid_vlan) <= 0 or int(self.pvid_vlan) > 4094:
                self.module.fail_json(
                    msg='Error: Pvid vlan id is not in the range from 1 to 4094.')

        # get interface info
        self.intf_info = self.get_interface_dict(self.interface)
        if not self.intf_info:
            self.module.fail_json(msg='Error: Interface does not exist.')

        if not self.is_l2switchport():
            self.module.fail_json(
                msg='Error: Interface is not layer2 switch port.')
        if self.state == "unconfigured":
            if any([self.mode, self.default_vlan, self.pvid_vlan, self.trunk_vlans, self.untagged_vlans, self.tagged_vlans]):
                self.module.fail_json(
                    msg='Error: When state is unconfigured, only interface name exists.')
        else:
            if self.mode == "access":
                if any([self.pvid_vlan, self.trunk_vlans, self.untagged_vlans, self.tagged_vlans]):
                    self.module.fail_json(
                        msg='Error: When mode is access, only default_vlan can be supported.')
            elif self.mode == "trunk":
                if any([self.default_vlan, self.untagged_vlans, self.tagged_vlans]):
                    self.module.fail_json(
                        msg='Error: When mode is trunk, only pvid_vlan and trunk_vlans can exist.')
            elif self.mode == "hybrid":
                if any([self.default_vlan, self.trunk_vlans]):
                    self.module.fail_json(
                        msg='Error: When mode is hybrid, default_vlan and trunk_vlans cannot exist')
            else:
                if any([self.pvid_vlan, self.trunk_vlans, self.untagged_vlans, self.tagged_vlans]):
                    self.module.fail_json(
                        msg='Error: When mode is dot1qtunnel, only default_vlan can be supported.')

    def get_proposed(self):
        """get proposed info"""

        self.proposed['state'] = self.state
        self.proposed['interface'] = self.interface
        self.proposed['mode'] = self.mode
        if self.mode:
            if self.mode == "access":
                self.proposed['access_pvid'] = self.default_vlan
            elif self.mode == "trunk":
                self.proposed['pvid_vlan'] = self.pvid_vlan
                self.proposed['trunk_vlans'] = self.trunk_vlans
            elif self.mode == "hybrid":
                self.proposed['pvid_vlan'] = self.pvid_vlan
                self.proposed['untagged_vlans'] = self.untagged_vlans
                self.proposed['tagged_vlans'] = self.tagged_vlans
            else:
                self.proposed['dot1qtunnel_pvid'] = self.default_vlan

    def get_existing(self):
        """get existing info"""

        if self.intf_info:
            self.existing["interface"] = self.intf_info["ifName"]
            self.existing["switchport"] = self.intf_info["l2Enable"]
            self.existing["mode"] = self.intf_info["linkType"]
            if self.intf_info["linkType"] == "access":
                self.existing['access_pvid'] = self.intf_info["pvid"]
            elif self.intf_info["linkType"] == "trunk":
                self.existing['trunk_pvid'] = self.intf_info["pvid"]
                self.existing['trunk_vlans'] = self.bitmap_to_vlan_list(self.intf_info["trunkVlans"])
            elif self.intf_info["linkType"] == "hybrid":
                self.existing['hybrid_pvid'] = self.intf_info["pvid"]
                self.existing['hybrid_untagged_vlans'] = self.intf_info["untagVlans"]
                self.existing['hybrid_tagged_vlans'] = self.bitmap_to_vlan_list(self.intf_info["trunkVlans"])
            else:
                self.existing['dot1qtunnel_pvid'] = self.intf_info["pvid"]

    def get_end_state(self):
        """get end state info"""

        end_info = self.get_interface_dict(self.interface)
        if end_info:
            self.end_state["interface"] = end_info["ifName"]
            self.end_state["switchport"] = end_info["l2Enable"]
            self.end_state["mode"] = end_info["linkType"]
            if end_info["linkType"] == "access":
                self.end_state['access_pvid'] = end_info["pvid"]
            elif end_info["linkType"] == "trunk":
                self.end_state['trunk_pvid'] = end_info["pvid"]
                self.end_state['trunk_vlans'] = self.bitmap_to_vlan_list(end_info["trunkVlans"])
            elif end_info["linkType"] == "hybrid":
                self.end_state['hybrid_pvid'] = end_info["pvid"]
                self.end_state['hybrid_untagged_vlans'] = end_info["untagVlans"]
                self.end_state['hybrid_tagged_vlans'] = self.bitmap_to_vlan_list(end_info["trunkVlans"])
            else:
                self.end_state['dot1qtunnel_pvid'] = end_info["pvid"]
        if self.end_state == self.existing:
            self.changed = False

    def work(self):
        """worker"""

        self.check_params()
        if not self.intf_info:
            self.module.fail_json(msg='Error: interface does not exist.')
        self.get_existing()
        self.get_proposed()

        # present or absent
        if self.state == "present" or self.state == "absent":
            if self.mode == "access":
                self.merge_access_vlan(self.interface, self.default_vlan)
            elif self.mode == "trunk":
                self.merge_trunk_vlan(
                    self.interface, self.pvid_vlan, self.trunk_vlans)
            elif self.mode == "hybrid":
                self.merge_hybrid_vlan(self.interface, self.pvid_vlan, self.tagged_vlans, self.untagged_vlans)
            else:
                self.merge_dot1qtunnel_vlan(self.interface, self.default_vlan)

        # unconfigured
        else:
            self.default_switchport(self.interface)

        self.get_end_state()
        self.results['changed'] = self.changed
        self.results['proposed'] = self.proposed
        self.results['existing'] = self.existing
        self.results['end_state'] = self.end_state
        if self.changed:
            self.results['updates'] = self.updates_cmd
        else:
            self.results['updates'] = list()

        self.module.exit_json(**self.results)


def main():
    """Module main"""

    argument_spec = dict(
        interface=dict(required=True, type='str'),
        mode=dict(choices=['access', 'trunk', 'dot1qtunnel', 'hybrid'], required=False),
        default_vlan=dict(type='str', required=False),
        pvid_vlan=dict(type='str', required=False),
        trunk_vlans=dict(type='str', required=False),
        untagged_vlans=dict(type='str', required=False),
        tagged_vlans=dict(type='str', required=False),
        state=dict(choices=['absent', 'present', 'unconfigured'],
                   default='present')
    )

    argument_spec.update(ce_argument_spec)
    switchport = SwitchPort(argument_spec)
    switchport.work()


if __name__ == '__main__':
    main()

Anon7 - 2022
AnonSec Team