Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.133.152.151
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible/playbook/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible/playbook//block.py
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.

# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import ansible.constants as C
from ansible.errors import AnsibleParserError
from ansible.playbook.attribute import FieldAttribute, NonInheritableFieldAttribute
from ansible.playbook.base import Base
from ansible.playbook.conditional import Conditional
from ansible.playbook.collectionsearch import CollectionSearch
from ansible.playbook.helpers import load_list_of_tasks
from ansible.playbook.role import Role
from ansible.playbook.taggable import Taggable
from ansible.utils.sentinel import Sentinel


class Block(Base, Conditional, CollectionSearch, Taggable):
    '''
    Playbook object holding three task lists ('block', 'rescue' and
    'always') plus a few keywords kept for compatibility with tasks.
    '''

    # main block fields containing the task lists; all three are declared
    # with NonInheritableFieldAttribute and default=list
    # NOTE(review): default=list presumably acts as a factory for a new empty
    # list per object - confirm in ansible.playbook.attribute
    block = NonInheritableFieldAttribute(isa='list', default=list)
    rescue = NonInheritableFieldAttribute(isa='list', default=list)
    always = NonInheritableFieldAttribute(isa='list', default=list)

    # other fields for task compat
    notify = FieldAttribute(isa='list')
    delegate_to = FieldAttribute(isa='string')
    delegate_facts = FieldAttribute(isa='bool')

    # for future consideration? this would be functionally
    # similar to the 'else' clause for exceptions
    # otherwise = FieldAttribute(isa='list')

    def __init__(self, play=None, parent_block=None, role=None, task_include=None, use_handlers=False, implicit=False):
        '''
        Store the owning play and role, pick the parent object and then run
        the base class initialisation.

        A truthy task_include takes precedence over parent_block as the
        parent; when neither is truthy the block has no parent (None).
        '''
        # private state is put in place before the base initialiser runs
        self._parent = task_include or parent_block or None
        self._dep_chain = None
        self._play = play
        self._role = role
        self._use_handlers = use_handlers
        self._implicit = implicit

        super(Block, self).__init__()

    def __repr__(self):
        return "BLOCK(uuid=%s)(id=%s)(parent=%s)" % (self._uuid, id(self), self._parent)

    def __eq__(self, other):
        '''object comparison based on _uuid'''
        return self._uuid == other._uuid

    def __ne__(self, other):
        '''object comparison based on _uuid'''
        return self._uuid != other._uuid

    def get_vars(self):
        '''
        Return the variables visible from this block as a new dict.

        The parent's variables (when a parent is set) form the base layer and
        a copy of the block's own vars is laid on top, so the block's values
        win whenever a key exists in both.
        '''
        merged = {}

        parent = self._parent
        if parent:
            merged.update(parent.get_vars())

        merged.update(self.vars.copy())
        return merged

    @staticmethod
    def load(data, play=None, parent_block=None, role=None, task_include=None, use_handlers=False, variable_manager=None, loader=None):
        '''
        Build a Block for data and return the result of its load_data() call.

        The block is flagged as implicit when data is not itself a block
        datastructure according to Block.is_block().
        '''
        new_block = Block(
            play=play,
            parent_block=parent_block,
            role=role,
            task_include=task_include,
            use_handlers=use_handlers,
            implicit=not Block.is_block(data),
        )
        return new_block.load_data(data, variable_manager=variable_manager, loader=loader)

    @staticmethod
    def is_block(ds):
        is_block = False
        if isinstance(ds, dict):
            for attr in ('block', 'rescue', 'always'):
                if attr in ds:
                    is_block = True
                    break
        return is_block

    def preprocess_data(self, ds):
        '''
        Normalise ds into a block datastructure before handing it to the
        parent class.

        Anything that is not already a block is wrapped as the 'block'
        section of an implicit block: a list is used as that section as-is,
        any other value becomes its single entry.
        '''
        if not Block.is_block(ds):
            tasks = ds if isinstance(ds, list) else [ds]
            ds = dict(block=tasks)

        return super(Block, self).preprocess_data(ds)

    def _load_block(self, attr, ds):
        '''
        Pass the 'block' section (ds) to load_list_of_tasks with this block
        as the owner and return what it produces. attr is not used.

        An AssertionError raised during loading is converted into an
        AnsibleParserError that carries this block's datastructure.
        '''
        try:
            load_kwargs = dict(
                play=self._play,
                block=self,
                role=self._role,
                task_include=None,
                variable_manager=self._variable_manager,
                loader=self._loader,
                use_handlers=self._use_handlers,
            )
            return load_list_of_tasks(ds, **load_kwargs)
        except AssertionError as err:
            raise AnsibleParserError("A malformed block was encountered while loading a block", obj=self._ds, orig_exc=err)

    def _load_rescue(self, attr, ds):
        '''
        Pass the 'rescue' section (ds) to load_list_of_tasks with this block
        as the owner and return what it produces. attr is not used.

        An AssertionError raised during loading is converted into an
        AnsibleParserError that carries this block's datastructure.
        '''
        try:
            load_kwargs = dict(
                play=self._play,
                block=self,
                role=self._role,
                task_include=None,
                variable_manager=self._variable_manager,
                loader=self._loader,
                use_handlers=self._use_handlers,
            )
            return load_list_of_tasks(ds, **load_kwargs)
        except AssertionError as err:
            raise AnsibleParserError("A malformed block was encountered while loading rescue.", obj=self._ds, orig_exc=err)

    def _load_always(self, attr, ds):
        '''
        Pass the 'always' section (ds) to load_list_of_tasks with this block
        as the owner and return what it produces. attr is not used.

        An AssertionError raised during loading is converted into an
        AnsibleParserError that carries this block's datastructure.
        '''
        try:
            load_kwargs = dict(
                play=self._play,
                block=self,
                role=self._role,
                task_include=None,
                variable_manager=self._variable_manager,
                loader=self._loader,
                use_handlers=self._use_handlers,
            )
            return load_list_of_tasks(ds, **load_kwargs)
        except AssertionError as err:
            raise AnsibleParserError("A malformed block was encountered while loading always", obj=self._ds, orig_exc=err)

    def _validate_always(self, attr, name, value):
        if value and not self.block:
            raise AnsibleParserError("'%s' keyword cannot be used without 'block'" % name, obj=self._ds)

    _validate_rescue = _validate_always

    def get_dep_chain(self):
        '''
        Return the dependency chain that applies to this block.

        A block with its own chain returns a shallow copy of it. Otherwise the
        parent (when set) is asked, and None is returned when there is no
        parent either.
        '''
        if self._dep_chain is not None:
            return self._dep_chain[:]
        if self._parent:
            return self._parent.get_dep_chain()
        return None

    def copy(self, exclude_parent=False, exclude_tasks=False):
        '''
        Return a copy of this block.

        The copy starts from the parent class copy() and additionally carries
        over the play, the use_handlers flag, a shallow copy of the dependency
        chain (when one is set) and the role (shared, not copied).

        :arg exclude_parent: when true the copy has no parent; otherwise the
            parent is copied without its task lists.
        :arg exclude_tasks: when true the 'block', 'rescue' and 'always'
            lists are not duplicated onto the copy.
        :returns: the new block, after validate() has been called on it.
        '''
        def _dupe_task_list(task_list, new_block):
            # Copy every entry of task_list and re-link the copies so that
            # their parent chain ends up pointing at new_block.
            new_task_list = []
            for task in task_list:
                # the copy is made without a parent; it is attached below
                new_task = task.copy(exclude_parent=True)
                if task._parent:
                    # copy the original parent without its tasks so the chain
                    # between the task and new_block is preserved
                    new_task._parent = task._parent.copy(exclude_tasks=True)
                    if task._parent == new_block:
                        # If task._parent is the same as new_block, just replace it
                        new_task._parent = new_block
                    else:
                        # task may not be a direct child of new_block, search for the correct place to insert new_block
                        cur_obj = new_task._parent
                        # walk up until the top of the chain or the object
                        # whose parent compares equal to new_block
                        while cur_obj._parent and cur_obj._parent != new_block:
                            cur_obj = cur_obj._parent

                        cur_obj._parent = new_block
                else:
                    new_task._parent = new_block
                new_task_list.append(new_task)
            return new_task_list

        new_me = super(Block, self).copy()
        new_me._play = self._play
        new_me._use_handlers = self._use_handlers

        # slice so the copy does not share the list object with the original
        if self._dep_chain is not None:
            new_me._dep_chain = self._dep_chain[:]

        new_me._parent = None
        if self._parent and not exclude_parent:
            new_me._parent = self._parent.copy(exclude_tasks=True)

        if not exclude_tasks:
            # 'or []' guards against a task list that is None/empty
            new_me.block = _dupe_task_list(self.block or [], new_me)
            new_me.rescue = _dupe_task_list(self.rescue or [], new_me)
            new_me.always = _dupe_task_list(self.always or [], new_me)

        # the role object is shared between the original and the copy
        new_me._role = None
        if self._role:
            new_me._role = self._role

        new_me.validate()
        return new_me

    def serialize(self):
        '''
        Return a dict describing this block without its task lists.

        Every field attribute except 'block', 'rescue' and 'always' is
        included, followed by 'dep_chain'. When set, the role is added under
        'role' and the parent (copied without its tasks) under 'parent',
        together with the parent's class name under 'parent_type'.
        '''
        task_lists = ('block', 'rescue', 'always')

        data = {name: getattr(self, name) for name in self.fattributes if name not in task_lists}
        data['dep_chain'] = self.get_dep_chain()

        role = self._role
        if role is not None:
            data['role'] = role.serialize()

        parent = self._parent
        if parent is not None:
            data['parent'] = parent.copy(exclude_tasks=True).serialize()
            data['parent_type'] = parent.__class__.__name__

        return data

    def deserialize(self, data):
        '''
        Override of the default deserialize method, to match the overridden
        serialize method: restore this block from a dict without task lists.

        Field attributes present in data (except 'block', 'rescue' and
        'always') are set on the block, followed by the dependency chain, the
        role (when serialized) and the parent (when serialized). After a
        parent has been restored the dependency chain is taken from it.

        :arg data: dict in the layout produced by serialize().
        :raises ValueError: when data holds a parent but its 'parent_type' is
            not one of 'Block', 'TaskInclude' or 'HandlerTaskInclude'.
        '''

        # import is here to avoid import loops
        from ansible.playbook.task_include import TaskInclude
        from ansible.playbook.handler_task_include import HandlerTaskInclude

        # we don't want the full set of attributes (the task lists), as that
        # would lead to a serialize/deserialize loop
        for attr in self.fattributes:
            if attr in data and attr not in ('block', 'rescue', 'always'):
                setattr(self, attr, data.get(attr))

        self._dep_chain = data.get('dep_chain', None)

        # if there was a serialized role, unpack it too
        role_data = data.get('role')
        if role_data:
            r = Role()
            r.deserialize(role_data)
            self._role = r

        parent_data = data.get('parent')
        if parent_data:
            parent_type = data.get('parent_type')
            if parent_type == 'Block':
                p = Block()
            elif parent_type == 'TaskInclude':
                p = TaskInclude()
            elif parent_type == 'HandlerTaskInclude':
                p = HandlerTaskInclude()
            else:
                # without this branch an unexpected type left 'p' unbound and
                # the failure surfaced as an opaque UnboundLocalError
                raise ValueError("Unable to deserialize Block parent: unknown parent_type %r" % (parent_type,))
            p.deserialize(parent_data)
            self._parent = p
            self._dep_chain = self._parent.get_dep_chain()

    def set_loader(self, loader):
        '''
        Store loader on this block and pass it on to related objects.

        The parent receives it when one is set; the role is only updated
        directly when there is no parent. Every entry of the dependency chain
        (if any) receives it as well.
        '''
        self._loader = loader

        parent, role = self._parent, self._role
        if parent:
            parent.set_loader(loader)
        elif role:
            role.set_loader(loader)

        # a missing or empty chain simply yields nothing to iterate over
        for dep in self.get_dep_chain() or ():
            dep.set_loader(loader)

    def _get_parent_attribute(self, attr, omit=False):
        '''
        Generic logic to get the attribute or parent attribute for a block value.

        The value is looked up on this block first (unless omit is set), then
        on the parent, then on the role and its dependency chain, and finally
        on the play. For attributes flagged as 'extend' every level is
        consulted and the values are combined through _extend_value();
        otherwise later levels are only consulted while the value is still
        Sentinel.

        :arg attr: name of the field attribute; must be a key of
            self.fattributes (a KeyError from that lookup is not caught here).
        :arg omit: when true, skip this block's own value and only collect
            values from the parent/role/play levels.
        :returns: the resolved value, or Sentinel when no level provided one.
        '''
        fattr = self.fattributes[attr]

        # whether values of several levels are combined, and on which side
        extend = fattr.extend
        prepend = fattr.prepend

        try:
            # omit self, and only get parent values
            if omit:
                value = Sentinel
            else:
                # the raw value is read from the '_<attr>' instance attribute
                value = getattr(self, f'_{attr}', Sentinel)

            # If parent is static, we can grab attrs from the parent
            # otherwise, defer to the grandparent
            # (a parent without 'statically_loaded', including None, counts as static)
            if getattr(self._parent, 'statically_loaded', True):
                _parent = self._parent
            else:
                _parent = self._parent._parent

            if _parent and (value is Sentinel or extend):
                try:
                    if getattr(_parent, 'statically_loaded', True):
                        # prefer the parent's own recursive lookup when it has
                        # one, else fall back to its raw '_<attr>' value
                        if hasattr(_parent, '_get_parent_attribute'):
                            parent_value = _parent._get_parent_attribute(attr)
                        else:
                            parent_value = getattr(_parent, f'_{attr}', Sentinel)
                        if extend:
                            value = self._extend_value(value, parent_value, prepend)
                        else:
                            value = parent_value
                except AttributeError:
                    pass
            if self._role and (value is Sentinel or extend):
                try:
                    parent_value = getattr(self._role, f'_{attr}', Sentinel)
                    if extend:
                        value = self._extend_value(value, parent_value, prepend)
                    else:
                        value = parent_value

                    dep_chain = self.get_dep_chain()
                    if dep_chain and (value is Sentinel or extend):
                        # the chain is walked from its last entry to its first
                        dep_chain.reverse()
                        for dep in dep_chain:
                            dep_value = getattr(dep, f'_{attr}', Sentinel)
                            if extend:
                                value = self._extend_value(value, dep_value, prepend)
                            else:
                                value = dep_value

                            # without 'extend' the first real value wins
                            if value is not Sentinel and not extend:
                                break
                except AttributeError:
                    pass
            if self._play and (value is Sentinel or extend):
                try:
                    play_value = getattr(self._play, f'_{attr}', Sentinel)
                    # unlike the levels above, a Sentinel from the play never
                    # replaces or extends the value collected so far
                    if play_value is not Sentinel:
                        if extend:
                            value = self._extend_value(value, play_value, prepend)
                        else:
                            value = play_value
                except AttributeError:
                    pass
        except KeyError:
            # a KeyError raised during the lookups above ends the search and
            # whatever has been collected so far is returned
            pass

        return value

    def filter_tagged_tasks(self, all_vars):
        '''
        Return a new block whose task lists only hold the entries selected by
        the play's only_tags / skip_tags.

        Nested blocks are filtered recursively and dropped when nothing is
        left in them. Implicit meta tasks are always kept, and include
        actions are only checked against skip_tags.
        '''

        def _keep_tagged(entries):
            kept = []
            for entry in entries:
                if isinstance(entry, Block):
                    pruned = _prune(entry)
                    if pruned.has_tasks():
                        kept.append(pruned)
                    continue

                if entry.action in C._ACTION_META and entry.implicit:
                    selected = True
                elif entry.action in C._ACTION_INCLUDE and entry.evaluate_tags([], self._play.skip_tags, all_vars=all_vars):
                    selected = True
                else:
                    selected = entry.evaluate_tags(self._play.only_tags, self._play.skip_tags, all_vars=all_vars)

                if selected:
                    kept.append(entry)
            return kept

        def _prune(source):
            # copy without parent/tasks, then re-attach the original parent
            # object and the filtered task lists
            pruned = source.copy(exclude_parent=True, exclude_tasks=True)
            pruned._parent = source._parent
            for section in ('block', 'rescue', 'always'):
                setattr(pruned, section, _keep_tagged(getattr(source, section)))
            return pruned

        return _prune(self)

    def get_tasks(self):
        def evaluate_and_append_task(target):
            tmp_list = []
            for task in target:
                if isinstance(task, Block):
                    tmp_list.extend(evaluate_block(task))
                else:
                    tmp_list.append(task)
            return tmp_list

        def evaluate_block(block):
            rv = evaluate_and_append_task(block.block)
            rv.extend(evaluate_and_append_task(block.rescue))
            rv.extend(evaluate_and_append_task(block.always))
            return rv

        return evaluate_block(self)

    def has_tasks(self):
        '''Return True when at least one of the 'block', 'rescue' or 'always' lists is non-empty.'''
        return any(len(getattr(self, section)) > 0 for section in ('block', 'rescue', 'always'))

    def get_include_params(self):
        '''Return the parent's include params, or an empty dict when there is no parent.'''
        parent = self._parent
        return parent.get_include_params() if parent else dict()

    def all_parents_static(self):
        '''
        Tell whether every parent above this block was statically loaded.

        A block without a parent answers True. A TaskInclude parent that is
        not statically loaded makes the answer False; any other parent is
        asked for its own all_parents_static() result.
        '''
        from ansible.playbook.task_include import TaskInclude

        parent = self._parent
        if not parent:
            return True
        if isinstance(parent, TaskInclude) and not parent.statically_loaded:
            return False
        return parent.all_parents_static()

    def get_first_parent_include(self):
        '''
        Return the nearest TaskInclude found while walking up the parent
        chain, or None when this block has no parent.
        '''
        from ansible.playbook.task_include import TaskInclude

        parent = self._parent
        if not parent:
            return None
        if isinstance(parent, TaskInclude):
            return parent
        return parent.get_first_parent_include()

Anon7 - 2022
AnonSec Team