Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.147.57.217
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/ovirt/ovirt/plugins/inventory/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/ovirt/ovirt/plugins/inventory/ovirt.py
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Red Hat, Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

# TODO Fix DOCUMENTATION to pass the ansible-test validate-modules
DOCUMENTATION = '''
    name: ovirt
    plugin_type: inventory
    short_description: oVirt inventory source
    version_added: "1.0.0"
    author: Bram Verschueren (@bverschueren)
    requirements:
      - ovirt-engine-sdk-python >= 4.2.4
    extends_documentation_fragment:
        - inventory_cache
        - constructed
    description:
      - Get inventory hosts from the ovirt service.
      - Requires a YAML file ending in 'ovirt.yml', 'ovirt4.yml', 'ovirt.yaml', 'ovirt4.yaml'.
    options:
      plugin:
        description: the name of this plugin, it should always be set to 'ovirt' for this plugin to recognise it as it's own.
        required: True
        choices: ['ovirt', 'ovirt.ovirt.ovirt', 'redhat.rhv.ovirt']
      ovirt_url:
        description: URL to ovirt-engine API.
        required: True
        env:
          - name: OVIRT_URL
      ovirt_username:
        description: ovirt authentication user.
        required: True
        env:
          - name: OVIRT_USERNAME
      ovirt_password:
        description: ovirt authentication password.
        required : True
        env:
          - name: OVIRT_PASSWORD
      ovirt_cafile:
        description: path to ovirt-engine CA file. If C(ovirt_cafile) parameter is not set and C(ovirt_insecure) is not True, system wide CA certificate store\
        is used.
        required: False
      ovirt_insecure:
        description: A boolean flag that indicates if the server TLS certificate and host name should be checked.
        required: False
      ovirt_query_filter:
        required: False
        description: dictionary of filter key-values to query VM's. See U(https://ovirt.github.io/ovirt-engine-sdk/master/services.m.html#ovirtsdk4\
.services.VmsService.list) for filter parameters.
      ovirt_hostname_preference:
        required: False
        description:
            - List of options that describe the ordering for which hostnames should be assigned.
            - See U(https://ovirt.github.io/ovirt-engine-api-model/master/#types/vm) for available attributes.
        default: ['fqdn', 'name']
        type: list
'''

EXAMPLES = '''
# Ensure the CA is available:
# $ wget "https://engine/ovirt-engine/services/pki-resource?resource=ca-certificate&format=X509-PEM-CA" -O /path/to/ca.pem
# Sample content of ovirt.yml:
plugin: ovirt.ovirt.ovirt
ovirt_url: https://engine/ovirt-engine/api
ovirt_cafile: /path/to/ca.pem
ovirt_username: ansible-tester
ovirt_password: secure
ovirt_query_filter:
  search: 'name=myvm AND cluster=mycluster'
  case_sensitive: no
  max: 15
keyed_groups:
  - key: cluster
    prefix: 'cluster'
groups:
  dev: "'dev' in tags"
compose:
  ansible_host: devices["eth0"][0]
'''

import sys

from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable
from ansible.errors import AnsibleError, AnsibleParserError

HAS_OVIRT_LIB = False

try:
    import ovirtsdk4 as sdk
    HAS_OVIRT_LIB = True
except ImportError:
    HAS_OVIRT_LIB = False


class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):

    NAME = 'ovirt.ovirt.ovirt'

    def _get_dict_of_struct(self, vm):
        '''  Transform SDK Vm Struct type to Python dictionary.
             :param vm: host struct of which to create dict
             :return dict of vm struct type
        '''

        vms_service = self.connection.system_service().vms_service()
        clusters_service = self.connection.system_service().clusters_service()
        vm_service = vms_service.vm_service(vm.id)
        devices = vm_service.reported_devices_service().list()
        tags = vm_service.tags_service().list()
        stats = vm_service.statistics_service().list()
        labels = vm_service.affinity_labels_service().list()
        groups = clusters_service.cluster_service(
            vm.cluster.id
        ).affinity_groups_service().list()

        return {
            'id': vm.id,
            'name': vm.name,
            'host': self.connection.follow_link(vm.host).name if vm.host else None,
            'cluster': self.connection.follow_link(vm.cluster).name,
            'status': str(vm.status),
            'description': vm.description,
            'fqdn': vm.fqdn,
            'os': vm.os.type,
            'template': self.connection.follow_link(vm.template).name,
            'creation_time': str(vm.creation_time),
            'creation_time_timestamp': float(vm.creation_time.strftime("%s.%f")),
            'tags': [tag.name for tag in tags],
            'affinity_labels': [label.name for label in labels],
            'affinity_groups': [
                group.name for group in groups
                if vm.name in [vm.name for vm in self.connection.follow_link(group.vms)]
            ],
            'statistics': dict(
                (stat.name, stat.values[0].datum if stat.values else None) for stat in stats
            ),
            'devices': dict(
                (device.name, [ip.address for ip in device.ips]) for device in devices if device.ips
            ),
        }

    def _query(self, query_filter=None):
        '''
            :param query_filter: dictionary of filter parameter/values
            :return dict of oVirt vm dicts
        '''
        return [self._get_dict_of_struct(host) for host in self._get_hosts(query_filter=query_filter)]

    def _get_hosts(self, query_filter=None):
        '''
            :param filter: dictionary of vm filter parameter/values
            :return list of oVirt vm structs
        '''

        vms_service = self.connection.system_service().vms_service()
        if query_filter is not None:
            return vms_service.list(**query_filter)
        return vms_service.list()

    def _get_query_options(self, param_dict):
        ''' Get filter parameters and cast these to comply with sdk VmsService.list param types
            :param param_dict: dictionary of filter parameters and values
            :return dictionary with casted parameter/value
        '''
        if param_dict is None:
            return None

        FILTER_MAPPING = {
            'all_content': bool,
            'case_sensitive': bool,
            'filter': bool,
            'follow': str,
            'max': int,
            'search': str
        }

        casted_dict = {}

        for (param, value) in param_dict.items():
            try:
                casted_dict[param] = FILTER_MAPPING[param](value)
            except KeyError:
                raise AnsibleError("Unknown filter option '{0}'".format(param))

        return casted_dict

    def _get_hostname(self, host):
        '''
          Get the host's hostname based on prefered attribute
          :param host: dict representation of oVirt VmStruct
          :param return: preferred hostname for the host
        '''
        hostname_preference = self.get_option('ovirt_hostname_preference')
        if not hostname_preference:
            raise AnsibleParserError('Invalid value for option ovirt_hostname_preference: {0}'.format(hostname_preference))
        hostname = None

        for preference in hostname_preference:
            hostname = host.get(preference)
            if hostname is not None:
                return hostname

        raise AnsibleParserError("No valid name found for host id={0}".format(host.get('id')))

    def _populate_from_source(self, source_data):

        for host in source_data:

            hostname = self._get_hostname(host)

            self.inventory.add_host(hostname)

            for fact, value in host.items():
                self.inventory.set_variable(hostname, fact, value)

            strict = self.get_option('strict')
            self._set_composite_vars(self.get_option('compose'), host, hostname, strict=strict)
            self._add_host_to_composed_groups(self.get_option('groups'), host, hostname, strict=strict)
            self._add_host_to_keyed_groups(self.get_option('keyed_groups'), host, hostname, strict=strict)

    def verify_file(self, path):

        valid = False
        if super(InventoryModule, self).verify_file(path):
            if path.endswith(('ovirt.yml', 'ovirt4.yml', 'ovirt.yaml', 'ovirt4.yaml')):
                valid = True
        return valid

    def parse(self, inventory, loader, path, cache=True):

        if not HAS_OVIRT_LIB:
            raise AnsibleError('oVirt inventory script requires ovirt-engine-sdk-python >= 4.2.4')

        super(InventoryModule, self).parse(inventory, loader, path, cache)

        config = self._read_config_data(path)

        self.connection = sdk.Connection(
            url=self.get_option('ovirt_url'),
            username=self.get_option('ovirt_username'),
            password=self.get_option('ovirt_password'),
            ca_file=self.get_option('ovirt_cafile'),
            insecure=self.get_option('ovirt_insecure') if self.get_option('ovirt_insecure') is not None else not self.get_option('ovirt_cafile'),
        )

        query_filter = self._get_query_options(self.get_option('ovirt_query_filter', None))

        cache_key = self.get_cache_key(path)
        source_data = None

        user_cache_setting = self.get_option('cache')
        attempt_to_read_cache = user_cache_setting and cache
        cache_needs_update = user_cache_setting and not cache

        if attempt_to_read_cache:
            try:
                source_data = self._cache[cache_key]
            except KeyError:
                cache_needs_update = True

        if source_data is None:
            source_data = self._query(query_filter=query_filter)

        if cache_needs_update:
            self._cache[cache_key] = source_data

        self._populate_from_source(source_data)
        self.connection.close()

Anon7 - 2022
AnonSec Team