Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.148.144.139
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/root/lib/python3/dist-packages/ansible_collections/cisco/mso/plugins/module_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/root/lib/python3/dist-packages/ansible_collections/cisco/mso/plugins/module_utils/schema.py
# -*- coding: utf-8 -*-

# Copyright: (c) 2022, Akini Ross (@akinross) <akinross@cisco.com>
# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type

from collections import namedtuple

# A key and the value that key must have for an object to count as a match.
KVPair = namedtuple("KVPair", "key value")
# A matched object: its position in the searched list and the object itself.
Item = namedtuple("Item", "index details")


class MSOSchema:
    """
    Helper around a single MSO schema that looks up nested schema objects by name.

    Each ``set_*`` method searches one list of the schema, stores the result (an Item or None)
    in ``self.schema_objects`` under a fixed key and returns nothing. Methods for nested objects
    require their parent object to have been set first.
    """

    def __init__(self, mso_module, schema_name, template_name=None, site_name=None):
        """
        Query the schema and optionally select a template and a site.
        :param mso_module: Object providing query_schema, fail_json, lookup_site and bd_ref.
        :param schema_name: Name of the schema to query. -> Str
        :param template_name: Name of the template to select. -> Str | None
        :param site_name: Name of the site to select, only used when template_name is also provided. -> Str | None
        """
        self.mso = mso_module
        self.schema_name = schema_name
        self.id, self.path, self.schema = mso_module.query_schema(schema_name)
        # Maps an object key such as "template" or "site_bd" to the matched Item, or to None when not found.
        self.schema_objects = {}
        if template_name:
            self.set_template(template_name)
        if site_name and template_name:
            self.set_site(template_name, site_name)

    @staticmethod
    def get_object_from_list(search_list, kv_list):
        """
        Get the first matched object from a list of mso object dictionaries.
        :param search_list: Objects to search through, None is treated as an empty list. -> List | None
        :param kv_list: Key/value pairs that should match in the object. -> List[KVPair(Str, Str)]
        :return: The index and details of the object. -> Item (Named Tuple) | None
                 Values of provided keys of all existing objects, interleaved per object in kv_list order. -> List
        """
        # A parent object without the requested list yields None from dict.get(); treat it as "nothing to search".
        if search_list is None:
            search_list = []

        def kv_match(kvs, item):
            return all(item.get(kv.key) == kv.value for kv in kvs)

        match = next((Item(index, item) for index, item in enumerate(search_list) if kv_match(kv_list, item)), None)
        existing = [item.get(kv.key) for item in search_list for kv in kv_list]
        return match, existing

    @staticmethod
    def _join_existing(values):
        """
        Render existing values as a comma separated string for use in error messages.
        :param values: Values as returned by get_object_from_list. -> List
        :return: Comma separated values; None entries are skipped and non text values are converted. -> Str
        """
        # str on Python 3; str and unicode on Python 2. Text values are passed through untouched.
        text_types = (type(""), type(u""))
        return ", ".join(value if isinstance(value, text_types) else str(value) for value in values if value is not None)

    def _set_schema_object(self, object_key, search_list, kv_list, name, not_found_msg, fail_module):
        """
        Search a list for an object, fail the module when requested and store the result.
        :param object_key: Key in schema_objects to store the result under. -> Str
        :param search_list: Objects to search through. -> List | None
        :param kv_list: Key/value pairs that should match in the object. -> List[KVPair(Str, Str)]
        :param name: User provided name that is reported in the error message. -> Str
        :param not_found_msg: Format string with {0} for the name and {1} for the existing values. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None
        """
        match, existing = self.get_object_from_list(search_list, kv_list)
        if match is None and fail_module:
            self.mso.fail_json(msg=not_found_msg.format(name, self._join_existing(existing)))
        self.schema_objects[object_key] = match

    def validate_schema_objects_present(self, required_schema_objects):
        """
        Validate that attributes are set to a value that is not equal None.
        :param required_schema_objects: List of schema objects to verify. -> List
        :return: None
        """
        for schema_object in required_schema_objects:
            # A key can be present with value None when a previous lookup ran with fail_module=False.
            if self.schema_objects.get(schema_object) is None:
                msg = "Required attribute '{0}' is not specified on schema instance with name {1}".format(schema_object, self.schema_name)
                self.mso.fail_json(msg=msg)

    def set_template(self, template_name, fail_module=True):
        """
        Store the template item that matches the name of a template as schema_objects["template"].
        :param template_name: Name of the template to match. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        self._set_schema_object(
            "template",
            self.schema.get("templates"),
            [KVPair("name", template_name)],
            template_name,
            "Provided template '{0}' not matching existing template(s): {1}",
            fail_module,
        )

    def set_template_bd(self, bd, fail_module=True):
        """
        Store the template bridge domain item that matches the name of a bd as schema_objects["template_bd"].
        Requires the template to be set.
        :param bd: Name of the bd to match. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        self.validate_schema_objects_present(["template"])
        self._set_schema_object(
            "template_bd",
            self.schema_objects["template"].details.get("bds"),
            [KVPair("name", bd)],
            bd,
            "Provided BD '{0}' not matching existing bd(s): {1}",
            fail_module,
        )

    def set_template_anp(self, anp, fail_module=True):
        """
        Store the template application profile item that matches the name of an anp as schema_objects["template_anp"].
        Requires the template to be set.
        :param anp: Name of the anp to match. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        self.validate_schema_objects_present(["template"])
        self._set_schema_object(
            "template_anp",
            self.schema_objects["template"].details.get("anps"),
            [KVPair("name", anp)],
            anp,
            "Provided ANP '{0}' not matching existing anp(s): {1}",
            fail_module,
        )

    def set_template_anp_epg(self, epg, fail_module=True):
        """
        Store the template endpoint group item that matches the name of an epg as schema_objects["template_anp_epg"].
        Requires the template anp to be set.
        :param epg: Name of the epg to match. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        self.validate_schema_objects_present(["template_anp"])
        self._set_schema_object(
            "template_anp_epg",
            self.schema_objects["template_anp"].details.get("epgs"),
            [KVPair("name", epg)],
            epg,
            "Provided EPG '{0}' not matching existing epg(s): {1}",
            fail_module,
        )

    def set_template_external_epg(self, external_epg, fail_module=True):
        """
        Store the template external epg item that matches the name of an external epg as schema_objects["template_external_epg"].
        Requires the template to be set.
        :param external_epg: Name of the external epg to match. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        self.validate_schema_objects_present(["template"])
        self._set_schema_object(
            "template_external_epg",
            self.schema_objects["template"].details.get("externalEpgs"),
            [KVPair("name", external_epg)],
            external_epg,
            "Provided External EPG '{0}' not matching existing external_epg(s): {1}",
            fail_module,
        )

    def set_site(self, template_name, site_name, fail_module=True):
        """
        Store the site item that matches both the site and the template as schema_objects["site"].
        The module is failed when the schema has no sites at all, regardless of fail_module.
        :param template_name: Name of the template to match. -> Str
        :param site_name: Name of the site to match. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        sites = self.schema.get("sites")
        if not sites:
            msg = "No sites associated with schema '{0}'. Associate the site with the schema using (M) mso_schema_site.".format(self.schema_name)
            self.mso.fail_json(msg=msg)

        site_id = self.mso.lookup_site(site_name)
        kv_list = [KVPair("siteId", site_id), KVPair("templateName", template_name)]
        match, dummy_existing = self.get_object_from_list(sites, kv_list)
        if match is None and fail_module:
            # Report only the templates of the requested site; other sites in the schema are not relevant here.
            site_templates = [site.get("templateName") for site in sites or [] if site.get("siteId") == site_id]
            msg = "Provided site '{0}' not associated with template '{1}'. Site is currently associated with template(s): {2}".format(
                site_name, template_name, self._join_existing(site_templates)
            )
            self.mso.fail_json(msg=msg)
        self.schema_objects["site"] = match

    def set_site_bd(self, bd_name, fail_module=True):
        """
        Store the site bridge domain item that matches the bd reference of a bd as schema_objects["site_bd"].
        Requires the template and the site to be set. The reference is built by mso.bd_ref from the schema id,
        the name of the stored template and bd_name.
        :param bd_name: Name of the bd to match. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        self.validate_schema_objects_present(["template", "site"])
        bd_ref = self.mso.bd_ref(schema_id=self.id, template=self.schema_objects["template"].details.get("name"), bd=bd_name)
        self._set_schema_object(
            "site_bd",
            self.schema_objects["site"].details.get("bds"),
            [KVPair("bdRef", bd_ref)],
            bd_name,
            "Provided BD '{0}' not matching existing site bd(s): {1}",
            fail_module,
        )

    def set_site_bd_subnet(self, subnet, fail_module=True):
        """
        Store the site bridge domain subnet item that matches the ip of a subnet as schema_objects["site_bd_subnet"].
        Requires the site bd to be set.
        :param subnet: Subnet (ip) to match. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        self.validate_schema_objects_present(["site_bd"])
        self._set_schema_object(
            "site_bd_subnet",
            self.schema_objects["site_bd"].details.get("subnets"),
            [KVPair("ip", subnet)],
            subnet,
            "Provided subnet '{0}' not matching existing site bd subnet(s): {1}",
            fail_module,
        )

    def set_site_anp(self, anp_name, fail_module=True):
        """
        Store the site application profile item that matches the stored template anp as schema_objects["site_anp"].
        Requires the template anp and the site to be set. Matching is done on the "anpRef" of the stored
        template anp; anp_name is only used in the error message.
        :param anp_name: Name of the anp, reported when no match is found. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        self.validate_schema_objects_present(["template_anp", "site"])
        self._set_schema_object(
            "site_anp",
            self.schema_objects["site"].details.get("anps"),
            [KVPair("anpRef", self.schema_objects["template_anp"].details.get("anpRef"))],
            anp_name,
            "Provided ANP '{0}' not matching existing site anp(s): {1}",
            fail_module,
        )

    def set_site_anp_epg(self, epg_name, fail_module=True):
        """
        Store the site anp epg item that matches the stored template epg as schema_objects["site_anp_epg"].
        Requires the site anp and the template anp epg to be set. Matching is done on the "epgRef" of the stored
        template epg; epg_name is only used in the error message.
        :param epg_name: Name of the epg, reported when no match is found. -> Str
        :param fail_module: When match is not found fail the ansible module. -> Bool
        :return: None, the stored value is Item(Int, Dict) | None
        """
        self.validate_schema_objects_present(["site_anp", "template_anp_epg"])
        self._set_schema_object(
            "site_anp_epg",
            self.schema_objects["site_anp"].details.get("epgs"),
            [KVPair("epgRef", self.schema_objects["template_anp_epg"].details.get("epgRef"))],
            epg_name,
            "Provided EPG '{0}' not matching existing site anp epg(s): {1}",
            fail_module,
        )

Anon7 - 2022
AnonSec Team