Server IP : 85.214.239.14 / Your IP : 18.117.10.207 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/root/lib/python3/dist-packages/ansible/vars/ |
Upload File : |
# Copyright (c) 2018 Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish from __future__ import (absolute_import, division, print_function) __metaclass__ = type import os from ansible import constants as C from ansible.errors import AnsibleError from ansible.inventory.host import Host from ansible.module_utils._text import to_bytes from ansible.plugins.loader import vars_loader from ansible.utils.collection_loader import AnsibleCollectionRef from ansible.utils.display import Display from ansible.utils.vars import combine_vars display = Display() def get_plugin_vars(loader, plugin, path, entities): data = {} try: data = plugin.get_vars(loader, path, entities) except AttributeError: try: for entity in entities: if isinstance(entity, Host): data |= plugin.get_host_vars(entity.name) else: data |= plugin.get_group_vars(entity.name) except AttributeError: if hasattr(plugin, 'run'): raise AnsibleError("Cannot use v1 type vars plugin %s from %s" % (plugin._load_name, plugin._original_path)) else: raise AnsibleError("Invalid vars plugin %s from %s" % (plugin._load_name, plugin._original_path)) return data def get_vars_from_path(loader, path, entities, stage): data = {} vars_plugin_list = list(vars_loader.all()) for plugin_name in C.VARIABLE_PLUGINS_ENABLED: if AnsibleCollectionRef.is_valid_fqcr(plugin_name): vars_plugin = vars_loader.get(plugin_name) if vars_plugin is None: # Error if there's no play directory or the name is wrong? continue if vars_plugin not in vars_plugin_list: vars_plugin_list.append(vars_plugin) for plugin in vars_plugin_list: # legacy plugins always run by default, but they can set REQUIRES_ENABLED=True to opt out. builtin_or_legacy = plugin.ansible_name.startswith('ansible.builtin.') or '.' not in plugin.ansible_name # builtin is supposed to have REQUIRES_ENABLED=True, the following is for legacy plugins... needs_enabled = not builtin_or_legacy if hasattr(plugin, 'REQUIRES_ENABLED'): needs_enabled = plugin.REQUIRES_ENABLED elif hasattr(plugin, 'REQUIRES_WHITELIST'): display.deprecated("The VarsModule class variable 'REQUIRES_WHITELIST' is deprecated. " "Use 'REQUIRES_ENABLED' instead.", version=2.18) needs_enabled = plugin.REQUIRES_WHITELIST # A collection plugin was enabled to get to this point because vars_loader.all() does not include collection plugins. # Warn if a collection plugin has REQUIRES_ENABLED because it has no effect. if not builtin_or_legacy and (hasattr(plugin, 'REQUIRES_ENABLED') or hasattr(plugin, 'REQUIRES_WHITELIST')): display.warning( "Vars plugins in collections must be enabled to be loaded, REQUIRES_ENABLED is not supported. " "This should be removed from the plugin %s." % plugin.ansible_name ) elif builtin_or_legacy and needs_enabled and not plugin.matches_name(C.VARIABLE_PLUGINS_ENABLED): continue has_stage = hasattr(plugin, 'get_option') and plugin.has_option('stage') # if a plugin-specific setting has not been provided, use the global setting # older/non shipped plugins that don't support the plugin-specific setting should also use the global setting use_global = (has_stage and plugin.get_option('stage') is None) or not has_stage if use_global: if C.RUN_VARS_PLUGINS == 'demand' and stage == 'inventory': continue elif C.RUN_VARS_PLUGINS == 'start' and stage == 'task': continue elif has_stage and plugin.get_option('stage') not in ('all', stage): continue data = combine_vars(data, get_plugin_vars(loader, plugin, path, entities)) return data def get_vars_from_inventory_sources(loader, sources, entities, stage): data = {} for path in sources: if path is None: continue if ',' in path and not os.path.exists(path): # skip host lists continue elif not os.path.isdir(to_bytes(path)): # always pass the directory of the inventory source file path = os.path.dirname(path) data = combine_vars(data, get_vars_from_path(loader, path, entities, stage)) return data