Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.221.217.100
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/kubernetes/core/plugins/inventory/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/kubernetes/core/plugins/inventory//k8s.py
# Copyright (c) 2018 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type

DOCUMENTATION = """
    name: k8s
    author:
      - Chris Houseknecht (@chouseknecht)
      - Fabian von Feilitzsch (@fabianvf)

    short_description: Kubernetes (K8s) inventory source

    description:
      - Fetch containers and services for one or more clusters.
      - Groups by cluster name, namespace, namespace_services, namespace_pods, and labels.
      - Uses the kubectl connection plugin to access the Kubernetes cluster.
      - Uses k8s.(yml|yaml) YAML configuration file to set parameter values.

    options:
      plugin:
         description: token that ensures this is a source file for the 'k8s' plugin.
         required: True
         choices: ['kubernetes.core.k8s', 'k8s', 'community.kubernetes.k8s']
      connections:
          description:
          - Optional list of cluster connection settings. If no connections are provided, the default
            I(~/.kube/config) and active context will be used, and objects will be returned for all namespaces
            the active user is authorized to access.
          suboptions:
              name:
                  description:
                  - Optional name to assign to the cluster. If not provided, a name is constructed from the server
                    and port.
              kubeconfig:
                  description:
                  - Path to an existing Kubernetes config file. If not provided, and no other connection
                    options are provided, the Kubernetes client will attempt to load the default
                    configuration file from I(~/.kube/config). Can also be specified via K8S_AUTH_KUBECONFIG
                    environment variable.
              context:
                  description:
                  - The name of a context found in the config file. Can also be specified via K8S_AUTH_CONTEXT environment
                    variable.
              host:
                  description:
                  - Provide a URL for accessing the API. Can also be specified via K8S_AUTH_HOST environment variable.
              api_key:
                  description:
                  - Token used to authenticate with the API. Can also be specified via K8S_AUTH_API_KEY environment
                    variable.
              username:
                  description:
                  - Provide a username for authenticating with the API. Can also be specified via K8S_AUTH_USERNAME
                    environment variable.
              password:
                  description:
                  - Provide a password for authenticating with the API. Can also be specified via K8S_AUTH_PASSWORD
                    environment variable.
              client_cert:
                  description:
                  - Path to a certificate used to authenticate with the API. Can also be specified via K8S_AUTH_CERT_FILE
                    environment variable.
                  aliases: [ cert_file ]
              client_key:
                  description:
                  - Path to a key file used to authenticate with the API. Can also be specified via K8S_AUTH_KEY_FILE
                    environment variable.
                  aliases: [ key_file ]
              ca_cert:
                  description:
                  - Path to a CA certificate used to authenticate with the API. Can also be specified via
                    K8S_AUTH_SSL_CA_CERT environment variable.
                  aliases: [ ssl_ca_cert ]
              validate_certs:
                  description:
                  - "Whether or not to verify the API server's SSL certificates. Can also be specified via
                    K8S_AUTH_VERIFY_SSL environment variable."
                  type: bool
                  aliases: [ verify_ssl ]
              namespaces:
                  description:
                  - List of namespaces. If not specified, will fetch all containers for all namespaces user is authorized
                    to access.

    requirements:
    - "python >= 3.6"
    - "kubernetes >= 12.0.0"
    - "PyYAML >= 3.11"
"""

EXAMPLES = """
# File must be named k8s.yaml or k8s.yml

# Authenticate with token, and return all pods and services for all namespaces
plugin: kubernetes.core.k8s
connections:
  - host: https://192.168.64.4:8443
    api_key: xxxxxxxxxxxxxxxx
    validate_certs: false

# Use default config (~/.kube/config) file and active context, and return objects for a specific namespace
plugin: kubernetes.core.k8s
connections:
  - namespaces:
    - testing

# Use a custom config file, and a specific context.
plugin: kubernetes.core.k8s
connections:
  - kubeconfig: /path/to/config
    context: 'awx/192-168-64-4:8443/developer'
"""

import json

from ansible.errors import AnsibleError
from ansible_collections.kubernetes.core.plugins.module_utils.common import (
    HAS_K8S_MODULE_HELPER,
    k8s_import_exception,
)
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.client import (
    get_api_client,
)
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable

try:
    from kubernetes.dynamic.exceptions import DynamicApiError
except ImportError:
    pass


def format_dynamic_api_exc(exc):
    if exc.body:
        if exc.headers and exc.headers.get("Content-Type") == "application/json":
            message = json.loads(exc.body).get("message")
            if message:
                return message
        return exc.body
    else:
        return "%s Reason: %s" % (exc.status, exc.reason)


class K8sInventoryException(Exception):
    pass


class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
    NAME = "kubernetes.core.k8s"

    connection_plugin = "kubernetes.core.kubectl"
    transport = "kubectl"

    def parse(self, inventory, loader, path, cache=True):
        super(InventoryModule, self).parse(inventory, loader, path)
        cache_key = self._get_cache_prefix(path)
        config_data = self._read_config_data(path)
        self.setup(config_data, cache, cache_key)

    def setup(self, config_data, cache, cache_key):
        connections = config_data.get("connections")

        if not HAS_K8S_MODULE_HELPER:
            raise K8sInventoryException(
                "This module requires the Kubernetes Python client. Try `pip install kubernetes`. Detail: {0}".format(
                    k8s_import_exception
                )
            )

        source_data = None
        if cache and cache_key in self._cache:
            try:
                source_data = self._cache[cache_key]
            except KeyError:
                pass

        if not source_data:
            self.fetch_objects(connections)

    def fetch_objects(self, connections):

        if connections:
            if not isinstance(connections, list):
                raise K8sInventoryException("Expecting connections to be a list.")

            for connection in connections:
                if not isinstance(connection, dict):
                    raise K8sInventoryException(
                        "Expecting connection to be a dictionary."
                    )
                client = get_api_client(**connection)
                name = connection.get(
                    "name", self.get_default_host_name(client.configuration.host)
                )
                if connection.get("namespaces"):
                    namespaces = connection["namespaces"]
                else:
                    namespaces = self.get_available_namespaces(client)
                for namespace in namespaces:
                    self.get_pods_for_namespace(client, name, namespace)
                    self.get_services_for_namespace(client, name, namespace)
        else:
            client = get_api_client()
            name = self.get_default_host_name(client.configuration.host)
            namespaces = self.get_available_namespaces(client)
            for namespace in namespaces:
                self.get_pods_for_namespace(client, name, namespace)
                self.get_services_for_namespace(client, name, namespace)

    @staticmethod
    def get_default_host_name(host):
        return (
            host.replace("https://", "")
            .replace("http://", "")
            .replace(".", "-")
            .replace(":", "_")
        )

    def get_available_namespaces(self, client):
        v1_namespace = client.resources.get(api_version="v1", kind="Namespace")
        try:
            obj = v1_namespace.get()
        except DynamicApiError as exc:
            self.display.debug(exc)
            raise K8sInventoryException(
                "Error fetching Namespace list: %s" % format_dynamic_api_exc(exc)
            )
        return [namespace.metadata.name for namespace in obj.items]

    def get_pods_for_namespace(self, client, name, namespace):
        v1_pod = client.resources.get(api_version="v1", kind="Pod")
        try:
            obj = v1_pod.get(namespace=namespace)
        except DynamicApiError as exc:
            self.display.debug(exc)
            raise K8sInventoryException(
                "Error fetching Pod list: %s" % format_dynamic_api_exc(exc)
            )

        namespace_group = "namespace_{0}".format(namespace)
        namespace_pods_group = "{0}_pods".format(namespace_group)

        self.inventory.add_group(name)
        self.inventory.add_group(namespace_group)
        self.inventory.add_child(name, namespace_group)
        self.inventory.add_group(namespace_pods_group)
        self.inventory.add_child(namespace_group, namespace_pods_group)

        for pod in obj.items:
            pod_name = pod.metadata.name
            pod_groups = []
            pod_annotations = (
                {} if not pod.metadata.annotations else dict(pod.metadata.annotations)
            )

            if pod.metadata.labels:
                # create a group for each label_value
                for key, value in pod.metadata.labels:
                    group_name = "label_{0}_{1}".format(key, value)
                    if group_name not in pod_groups:
                        pod_groups.append(group_name)
                    self.inventory.add_group(group_name)
                pod_labels = dict(pod.metadata.labels)
            else:
                pod_labels = {}

            if not pod.status.containerStatuses:
                continue

            for container in pod.status.containerStatuses:
                # add each pod_container to the namespace group, and to each label_value group
                container_name = "{0}_{1}".format(pod.metadata.name, container.name)
                self.inventory.add_host(container_name)
                self.inventory.add_child(namespace_pods_group, container_name)
                if pod_groups:
                    for group in pod_groups:
                        self.inventory.add_child(group, container_name)

                # Add hostvars
                self.inventory.set_variable(container_name, "object_type", "pod")
                self.inventory.set_variable(container_name, "labels", pod_labels)
                self.inventory.set_variable(
                    container_name, "annotations", pod_annotations
                )
                self.inventory.set_variable(
                    container_name, "cluster_name", pod.metadata.clusterName
                )
                self.inventory.set_variable(
                    container_name, "pod_node_name", pod.spec.nodeName
                )
                self.inventory.set_variable(container_name, "pod_name", pod.spec.name)
                self.inventory.set_variable(
                    container_name, "pod_host_ip", pod.status.hostIP
                )
                self.inventory.set_variable(
                    container_name, "pod_phase", pod.status.phase
                )
                self.inventory.set_variable(container_name, "pod_ip", pod.status.podIP)
                self.inventory.set_variable(
                    container_name, "pod_self_link", pod.metadata.selfLink
                )
                self.inventory.set_variable(
                    container_name, "pod_resource_version", pod.metadata.resourceVersion
                )
                self.inventory.set_variable(container_name, "pod_uid", pod.metadata.uid)
                self.inventory.set_variable(
                    container_name, "container_name", container.image
                )
                self.inventory.set_variable(
                    container_name, "container_image", container.image
                )
                if container.state.running:
                    self.inventory.set_variable(
                        container_name, "container_state", "Running"
                    )
                if container.state.terminated:
                    self.inventory.set_variable(
                        container_name, "container_state", "Terminated"
                    )
                if container.state.waiting:
                    self.inventory.set_variable(
                        container_name, "container_state", "Waiting"
                    )
                self.inventory.set_variable(
                    container_name, "container_ready", container.ready
                )
                self.inventory.set_variable(
                    container_name, "ansible_remote_tmp", "/tmp/"
                )
                self.inventory.set_variable(
                    container_name, "ansible_connection", self.connection_plugin
                )
                self.inventory.set_variable(
                    container_name, "ansible_{0}_pod".format(self.transport), pod_name
                )
                self.inventory.set_variable(
                    container_name,
                    "ansible_{0}_container".format(self.transport),
                    container.name,
                )
                self.inventory.set_variable(
                    container_name,
                    "ansible_{0}_namespace".format(self.transport),
                    namespace,
                )

    def get_services_for_namespace(self, client, name, namespace):
        v1_service = client.resources.get(api_version="v1", kind="Service")
        try:
            obj = v1_service.get(namespace=namespace)
        except DynamicApiError as exc:
            self.display.debug(exc)
            raise K8sInventoryException(
                "Error fetching Service list: %s" % format_dynamic_api_exc(exc)
            )

        namespace_group = "namespace_{0}".format(namespace)
        namespace_services_group = "{0}_services".format(namespace_group)

        self.inventory.add_group(name)
        self.inventory.add_group(namespace_group)
        self.inventory.add_child(name, namespace_group)
        self.inventory.add_group(namespace_services_group)
        self.inventory.add_child(namespace_group, namespace_services_group)

        for service in obj.items:
            service_name = service.metadata.name
            service_labels = (
                {} if not service.metadata.labels else dict(service.metadata.labels)
            )
            service_annotations = (
                {}
                if not service.metadata.annotations
                else dict(service.metadata.annotations)
            )

            self.inventory.add_host(service_name)

            if service.metadata.labels:
                # create a group for each label_value
                for key, value in service.metadata.labels:
                    group_name = "label_{0}_{1}".format(key, value)
                    self.inventory.add_group(group_name)
                    self.inventory.add_child(group_name, service_name)

            try:
                self.inventory.add_child(namespace_services_group, service_name)
            except AnsibleError:
                raise

            ports = [
                {
                    "name": port.name,
                    "port": port.port,
                    "protocol": port.protocol,
                    "targetPort": port.targetPort,
                    "nodePort": port.nodePort,
                }
                for port in service.spec.ports or []
            ]

            # add hostvars
            self.inventory.set_variable(service_name, "object_type", "service")
            self.inventory.set_variable(service_name, "labels", service_labels)
            self.inventory.set_variable(
                service_name, "annotations", service_annotations
            )
            self.inventory.set_variable(
                service_name, "cluster_name", service.metadata.clusterName
            )
            self.inventory.set_variable(service_name, "ports", ports)
            self.inventory.set_variable(service_name, "type", service.spec.type)
            self.inventory.set_variable(
                service_name, "self_link", service.metadata.selfLink
            )
            self.inventory.set_variable(
                service_name, "resource_version", service.metadata.resourceVersion
            )
            self.inventory.set_variable(service_name, "uid", service.metadata.uid)

            if service.spec.externalTrafficPolicy:
                self.inventory.set_variable(
                    service_name,
                    "external_traffic_policy",
                    service.spec.externalTrafficPolicy,
                )
            if service.spec.externalIPs:
                self.inventory.set_variable(
                    service_name, "external_ips", service.spec.externalIPs
                )

            if service.spec.externalName:
                self.inventory.set_variable(
                    service_name, "external_name", service.spec.externalName
                )

            if service.spec.healthCheckNodePort:
                self.inventory.set_variable(
                    service_name,
                    "health_check_node_port",
                    service.spec.healthCheckNodePort,
                )
            if service.spec.loadBalancerIP:
                self.inventory.set_variable(
                    service_name, "load_balancer_ip", service.spec.loadBalancerIP
                )
            if service.spec.selector:
                self.inventory.set_variable(
                    service_name, "selector", dict(service.spec.selector)
                )

            if (
                hasattr(service.status.loadBalancer, "ingress")
                and service.status.loadBalancer.ingress
            ):
                load_balancer = [
                    {"hostname": ingress.hostname, "ip": ingress.ip}
                    for ingress in service.status.loadBalancer.ingress
                ]
                self.inventory.set_variable(
                    service_name, "load_balancer", load_balancer
                )

Anon7 - 2022
AnonSec Team