Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 13.59.182.74
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/2/cwd/proc/2/root/lib/python3/dist-packages/ansible/cli/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/2/cwd/proc/2/root/lib/python3/dist-packages/ansible/cli/console.py
#!/usr/bin/env python
# Copyright: (c) 2014, Nandor Sivok <dominis@haxor.hu>
# Copyright: (c) 2016, Redhat Inc
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# PYTHON_ARGCOMPLETE_OK

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

# ansible.cli needs to be imported first, to ensure the source bin/* scripts run that code first
from ansible.cli import CLI

import atexit
import cmd
import getpass
import readline
import os
import sys

from ansible import constants as C
from ansible import context
from ansible.cli.arguments import option_helpers as opt_help
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.parsing.splitter import parse_kv
from ansible.playbook.play import Play
from ansible.plugins.list import list_plugins
from ansible.plugins.loader import module_loader, fragment_loader
from ansible.utils import plugin_docs
from ansible.utils.color import stringc
from ansible.utils.display import Display

display = Display()


class ConsoleCLI(CLI, cmd.Cmd):
    '''
       A REPL that allows for running ad-hoc tasks against a chosen inventory
       from a nice shell with built-in tab completion (based on dominis'
       ansible-shell).

       It supports several commands, and you can modify its configuration at
       runtime:

       - `cd [pattern]`: change host/group (you can use host patterns eg.: app*.dc*:!app01*)
       - `list`: list available hosts in the current path
       - `list groups`: list groups included in the current path
       - `become`: toggle the become flag
       - `!`: forces shell module instead of the ansible module (!yum update -y)
       - `verbosity [num]`: set the verbosity level
       - `forks [num]`: set the number of forks
       - `become_user [user]`: set the become_user
       - `remote_user [user]`: set the remote_user
       - `become_method [method]`: set the privilege escalation method
       - `check [bool]`: toggle check mode
       - `diff [bool]`: toggle diff mode
       - `timeout [integer]`: set the timeout of tasks in seconds (0 to disable)
       - `help [command/module]`: display documentation for the command or module
       - `exit`: exit ansible-console
    '''

    name = 'ansible-console'
    modules = []  # type: list[str] | None
    ARGUMENTS = {'host-pattern': 'A name of a group in the inventory, a shell-like glob '
                                 'selecting hosts in inventory or any combination of the two separated by commas.'}

    # use specific to console, but fallback to highlight for backwards compatibility
    NORMAL_PROMPT = C.COLOR_CONSOLE_PROMPT or C.COLOR_HIGHLIGHT

    def __init__(self, args):
        """Initialise the CLI base with ``args`` and reset all console state.

        The run-time settings start out as ``None``; ``run()`` assigns their
        values from the parsed command line.
        """
        super(ConsoleCLI, self).__init__(args)

        # banner printed by cmd.Cmd when the command loop starts
        self.intro = 'Welcome to the ansible console. Type help or ? to list commands.\n'

        # inventory related state
        self.groups, self.hosts = [], []
        self.pattern = None
        self.cwd = '*'

        # objects needed to build and run plays
        self.variable_manager = None
        self.loader = None
        self.passwords = {}

        # Defaults for these are set from the CLI in run()
        self.remote_user = self.become = self.become_user = self.become_method = None
        self.check_mode = self.diff = self.forks = None
        self.task_timeout = self.collections = None

        cmd.Cmd.__init__(self)

    def init_parser(self):
        """Build the argument parser: shared option groups plus console-only arguments."""
        super(ConsoleCLI, self).init_parser(
            epilog="This is not a live session/connection: each task is executed in the background and returns its results.",
            desc="REPL console for executing Ansible tasks.",
        )

        # shared option groups, registered in exactly this order
        option_groups = (
            opt_help.add_runas_options,
            opt_help.add_inventory_options,
            opt_help.add_connect_options,
            opt_help.add_check_options,
            opt_help.add_vault_options,
            opt_help.add_fork_options,
            opt_help.add_module_options,
            opt_help.add_basedir_options,
            opt_help.add_runtask_options,
            opt_help.add_tasknoplay_options,
        )
        for add_options in option_groups:
            add_options(self.parser)

        # options unique to shell
        self.parser.add_argument('pattern', nargs='?', default='all', metavar='pattern', help='host pattern')
        self.parser.add_argument('--step', action='store_true', dest='step',
                                 help="one-step-at-a-time: confirm each task before running")

    def post_process_args(self, options):
        """Run the base post-processing, sync the display verbosity and check option conflicts.

        Returns the options object produced by the base class.
        """
        processed = super(ConsoleCLI, self).post_process_args(options)
        display.verbosity = processed.verbosity
        self.validate_conflicts(processed, fork_opts=True, runas_opts=True)
        return processed

    def get_names(self):
        """Return the attribute names of this instance.

        cmd.Cmd's stock implementation inspects the class only; looking at the
        instance also picks up commands attached at runtime with setattr().
        """
        names = dir(self)
        return names

    def cmdloop(self):
        """Run the interactive loop, surviving Ctrl-C and leaving cleanly on EOF.

        A KeyboardInterrupt restarts the underlying cmd.Cmd loop; an EOFError
        reports that the console was exited and calls ``do_exit()``.
        """
        while True:
            try:
                cmd.Cmd.cmdloop(self)
            except KeyboardInterrupt:
                # restart iteratively: recursing here would add a stack frame per Ctrl-C
                continue
            except EOFError:
                # the Display object is the module-level ``display``; this file
                # defines no ``display`` attribute on the class
                display.display("[Ansible-console was exited]")
                self.do_exit(self)
            break

    def set_prompt(self):
        """Refresh the selected hosts and rebuild the coloured prompt string."""
        user = self.remote_user or getpass.getuser()
        self.selected = self.inventory.list_hosts(self.cwd)
        text = "%s@%s (%d)[f:%s]" % (user, self.cwd, len(self.selected), self.forks)

        # '#' in the error colour when become is on and become_user is None or 'root'
        if self.become and self.become_user in (None, 'root'):
            suffix, colour = "# ", C.COLOR_ERROR
        else:
            suffix, colour = "$ ", self.NORMAL_PROMPT

        self.prompt = stringc(text + suffix, colour, wrap_nonvisible_chars=True)

    def list_modules(self):
        """Return ``list_plugins('module', ...)`` for the currently configured collections."""
        collections = self.collections
        return list_plugins('module', collections)

    def default(self, line, forceshell=False):
        """Run ``line`` as an ad-hoc task against the current host pattern.

        The first word is used as the module name when the module loader can
        find it and ``forceshell`` is not True; otherwise the whole line is
        handed to the ``shell`` module.

        :param line: the command line as typed at the prompt
        :param forceshell: pass True to always use the shell module
        :returns: False when nothing was run or the run failed, None otherwise
        """
        if line.startswith("#"):
            return False

        if not self.cwd:
            display.error("No host found")
            return False

        # defaults
        module = 'shell'
        module_args = line

        if forceshell is not True:
            # an empty or whitespace-only line has no first word to look up
            words = line.split()
            if words and module_loader.find_plugin(words[0]):
                # we found module!
                module = words[0]
                module_args = ' '.join(words[1:])

        if self.callback:
            cb = self.callback
        elif C.DEFAULT_LOAD_CALLBACK_PLUGINS and C.DEFAULT_STDOUT_CALLBACK != 'default':
            cb = C.DEFAULT_STDOUT_CALLBACK
        else:
            cb = 'minimal'

        result = None
        try:
            check_raw = module in C._ACTION_ALLOWS_RAW_ARGS
            task = dict(action=dict(module=module, args=parse_kv(module_args, check_raw=check_raw)), timeout=self.task_timeout)
            play_ds = dict(
                name="Ansible Shell",
                hosts=self.cwd,
                gather_facts='no',
                tasks=[task],
                remote_user=self.remote_user,
                become=self.become,
                become_user=self.become_user,
                become_method=self.become_method,
                check_mode=self.check_mode,
                diff=self.diff,
                collections=self.collections,
            )
            play = Play().load(play_ds, variable_manager=self.variable_manager, loader=self.loader)
        except Exception as e:
            display.error(u"Unable to build command: %s" % to_text(e))
            return False

        try:
            # now create a task queue manager to execute the play
            self._tqm = None
            try:
                self._tqm = TaskQueueManager(
                    inventory=self.inventory,
                    variable_manager=self.variable_manager,
                    loader=self.loader,
                    passwords=self.passwords,
                    stdout_callback=cb,
                    run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS,
                    run_tree=False,
                    forks=self.forks,
                )

                result = self._tqm.run(play)
                display.debug(result)
            finally:
                # always release the queue manager and the loader's temp files
                if self._tqm:
                    self._tqm.cleanup()
                if self.loader:
                    self.loader.cleanup_all_tmp_files()

            if result is None:
                display.error("No hosts found")
                return False
        except KeyboardInterrupt:
            display.error('User interrupted execution')
            return False
        except Exception as e:
            # the verbosity level lives on the display object (see do_verbosity);
            # this class never sets a ``verbosity`` attribute of its own
            if display.verbosity >= 3:
                import traceback
                display.v(traceback.format_exc())
            display.error(to_text(e))
            return False

    def emptyline(self):
        """Do nothing on an empty input line (cmd.Cmd would repeat the last command)."""
        return None

    def do_shell(self, arg):
        """
        Run the given command line through the shell module.

        Examples:
        shell ps uax | grep java | wc -l
        shell killall python
        shell halt -n

        A leading ! forces the shell module as well, e.g.:
        !ps aux | grep java | wc -l
        """
        self.default(arg, forceshell=True)

    def help_shell(self):
        """Print the help text of the ``shell`` command."""
        text = "You can run shell commands through the shell module."
        display.display(text)

    def do_forks(self, arg):
        """Set the number of forks.

        :param arg: text following the command; must parse as an integer >= 1
        """
        if not arg:
            self.usage_forks()
            return

        try:
            forks = int(arg)
        except (TypeError, ValueError):
            # int() raises ValueError for non-numeric text; stop here so that
            # ``forks`` is never read while unbound
            display.error('Invalid argument for "forks"')
            self.usage_forks()
            return

        if forks > 0:
            self.forks = forks
            self.set_prompt()
        else:
            display.display('forks must be greater than or equal to 1')

    def help_forks(self):
        """Print the description of ``forks`` followed by its usage line."""
        description = "Set the number of forks to use per task"
        display.display(description)
        self.usage_forks()

    def usage_forks(self):
        """Print the usage line of the ``forks`` command."""
        usage = 'Usage: forks <number>'
        display.display(usage)

    do_serial = do_forks
    help_serial = help_forks

    def do_collections(self, arg):
        """Set list of collections for 'short name' usage.

        An empty argument, a pair of empty quotes or ``none`` resets the search
        list to the defaults. Anything else is read as a comma-separated list
        whose entries are appended to the current list.

        :param arg: text following the command
        """
        requested = arg.strip()

        # the usage text promises that empty quotes reset the list; cmd.Cmd
        # passes the quote characters through literally, so match them here
        if requested in ('', 'none', '""', "''"):
            self.collections = None
        else:
            # skip blank entries left behind by stray or trailing commas
            names = [name.strip() for name in requested.split(',') if name.strip()]
            if not names:
                self.usage_collections()
                return
            if self.collections is None:
                self.collections = []
            self.collections.extend(names)

        if self.collections:
            display.v('Collections name search is set to: %s' % ', '.join(self.collections))
        else:
            display.v('Collections name search is using defaults')

    def help_collections(self):
        """Print the description of ``collections`` followed by its usage text."""
        description = "Set the collection name search path when using short names for plugins"
        display.display(description)
        self.usage_collections()

    def usage_collections(self):
        """Print the usage text of the ``collections`` command."""
        usage = 'Usage: collections <collection1>[, <collection2> ...]\n Use empty quotes or "none" to reset to default.\n'
        display.display(usage)

    def do_verbosity(self, arg):
        """Set the display verbosity to the integer given in ``arg``."""
        if not arg:
            display.display('Usage: verbosity <number>')
            return

        try:
            display.verbosity = int(arg)
            display.v('verbosity level set to %s' % arg)
        except (TypeError, ValueError) as exc:
            display.error('The verbosity must be a valid integer: %s' % to_text(exc))

    def help_verbosity(self):
        """Print the help text of the ``verbosity`` command."""
        text = "Set the verbosity level, equivalent to -v for 1 and -vvvv for 4."
        display.display(text)

    def do_cd(self, arg):
        """
            Switch the active host/group; host patterns are accepted too, e.g.:
            cd webservers
            cd webservers:dbservers
            cd webservers:!phoenix
            cd webservers:&staging
            cd webservers:dbservers:&staging:!phoenix
        """
        new_cwd = None

        if not arg:
            new_cwd = '*'
        elif arg in ('/', '*', '/*'):
            new_cwd = 'all'
        elif self.inventory.get_hosts(arg):
            new_cwd = arg
        else:
            display.display("no host matched")

        # keep the previous location when nothing matched
        if new_cwd is not None:
            self.cwd = new_cwd

        self.set_prompt()

    def help_cd(self):
        """Print the description of ``cd`` followed by its usage line."""
        description = "Change active host/group. "
        display.display(description)
        self.usage_cd()

    def usage_cd(self):
        """Print the usage line of the ``cd`` command."""
        usage = "Usage: cd <group>|<host>|<host pattern>"
        display.display(usage)

    def do_list(self, arg):
        """Print the selected hosts, or the groups when ``arg`` is ``groups``."""
        if arg == 'groups':
            for group in self.groups:
                display.display(group)
        elif not arg:
            for host in self.selected:
                display.display(host.name)
        else:
            display.error('Invalid option passed to "list"')
            self.help_list()

    def help_list(self):
        """Print the help text of the ``list`` command."""
        text = "List the hosts in the current group or a list of groups if you add 'groups'."
        display.display(text)

    def do_become(self, arg):
        """Set the become flag from the boolean-like value in ``arg``."""
        if not arg:
            display.display("Please specify become value, e.g. `become yes`")
            return

        self.become = boolean(arg, strict=False)
        display.v("become changed to %s" % self.become)
        self.set_prompt()

    def help_become(self):
        """Print the help text of the ``become`` command."""
        text = "Toggle whether the tasks are run with become"
        display.display(text)

    def do_remote_user(self, arg):
        """Store ``arg`` as the remote user and refresh the prompt."""
        if not arg:
            display.display("Please specify a remote user, e.g. `remote_user root`")
            return

        self.remote_user = arg
        self.set_prompt()

    def help_remote_user(self):
        """Print the help text of the ``remote_user`` command."""
        text = "Set the user for use as login to the remote target"
        display.display(text)

    def do_become_user(self, arg):
        """Store ``arg`` as the become user; the prompt is refreshed either way."""
        if not arg:
            display.display("Please specify a user, e.g. `become_user jenkins`")
            display.v("Current user is %s" % self.become_user)
        else:
            self.become_user = arg

        self.set_prompt()

    def help_become_user(self):
        """Print the help text of the ``become_user`` command."""
        text = "Set the user for use with privilege escalation (which remote user attempts to 'become' when become is enabled)"
        display.display(text)

    def do_become_method(self, arg):
        """Store ``arg`` as the privilege escalation method used with become."""
        if not arg:
            display.display("Please specify a become_method, e.g. `become_method su`")
            display.v("Current become_method is %s" % self.become_method)
            return

        self.become_method = arg
        display.v("become_method changed to %s" % self.become_method)

    def help_become_method(self):
        """Print the help text of the ``become_method`` command."""
        text = "Set the privilege escalation plugin to use when become is enabled"
        display.display(text)

    def do_check(self, arg):
        """Set check mode from the boolean-like value in ``arg``."""
        if not arg:
            display.display("Please specify check mode value, e.g. `check yes`")
            display.v("check mode is currently %s." % self.check_mode)
            return

        self.check_mode = boolean(arg, strict=False)
        display.display("check mode changed to %s" % self.check_mode)

    def help_check(self):
        """Print the help text of the ``check`` command."""
        text = "Toggle check_mode for the tasks"
        display.display(text)

    def do_diff(self, arg):
        """Set diff mode from the boolean-like value in ``arg``."""
        if not arg:
            display.display("Please specify a diff value , e.g. `diff yes`")
            display.v("diff mode is currently %s" % self.diff)
            return

        self.diff = boolean(arg, strict=False)
        display.display("diff mode changed to %s" % self.diff)

    def help_diff(self):
        """Print the help text of the ``diff`` command."""
        text = "Toggle diff output for the tasks"
        display.display(text)

    def do_timeout(self, arg):
        """Set the task timeout to the non-negative integer given in ``arg``."""
        if not arg:
            self.usage_timeout()
            return

        try:
            seconds = int(arg)
            if seconds >= 0:
                self.task_timeout = seconds
            else:
                display.error('The timeout must be greater than or equal to 1, use 0 to disable')
        except (TypeError, ValueError) as exc:
            display.error('The timeout must be a valid positive integer, or 0 to disable: %s' % to_text(exc))

    def help_timeout(self):
        """Print the description of ``timeout`` followed by its usage line."""
        description = "Set task timeout in seconds"
        display.display(description)
        self.usage_timeout()

    def usage_timeout(self):
        """Print the usage line of the ``timeout`` command."""
        usage = 'Usage: timeout <seconds>'
        display.display(usage)

    def do_exit(self, args):
        """Write the farewell message and return -1 (a true value, which stops cmd.Cmd's loop)."""
        farewell = '\nAnsible-console was exited.\n'
        sys.stdout.write(farewell)
        return -1

    def help_exit(self):
        """Print the help text of the ``exit`` command."""
        text = "LEAVE!"
        display.display(text)

    do_EOF = do_exit
    help_EOF = help_exit

    def helpdefault(self, module_name):
        """Show the short description and the parameters of a module.

        :param module_name: name of the module to document; nothing is printed
            when it is empty
        """
        if not module_name:
            return

        in_path = module_loader.find_plugin(module_name)
        if not in_path:
            display.error('%s is not a valid command, use ? to list all valid commands.' % module_name)
            return

        oc, _a, _dummy1, _dummy2 = plugin_docs.get_docstring(in_path, fragment_loader)
        if not oc:
            display.error('No documentation found for %s.' % module_name)
            return

        display.display(oc.get('short_description', ''))
        display.display('Parameters:')

        # a module may document no options at all
        options = oc.get('options') or {}
        for opt, spec in options.items():
            description = spec.get('description') or ''
            # a description may be a plain string rather than a list of
            # paragraphs; indexing a string would print only its first character
            if isinstance(description, (list, tuple)):
                description = description[0] if description else ''
            display.display('  ' + stringc(opt, self.NORMAL_PROMPT) + ' ' + to_text(description))

    def help_help(self):
        """Answer ``help help`` with a warning."""
        text = "Don't be redundant!"
        display.warning(text)

    def complete_cd(self, text, line, begidx, endidx):
        """Tab completion for ``cd``: return the host/group names that extend what was typed."""
        _command, _sep, typed = line.partition(' ')
        skip = len(typed) - len(text)

        if self.cwd in ('all', '*', '\\'):
            candidates = self.hosts + self.groups
        else:
            candidates = [host.name for host in self.inventory.list_hosts(self.cwd)]

        prefix = to_native(typed)
        matches = []
        for candidate in candidates:
            native = to_native(candidate)
            if native.startswith(prefix):
                # drop the part already on the line before the completed word
                matches.append(native[skip:])
        return matches

    def completedefault(self, text, line, begidx, endidx):
        """Complete ``option=`` names for a line that starts with a known module.

        :returns: list of completions; empty when the first word is not a module
        """
        words = line.split()
        if not words or words[0] not in self.list_modules():
            # always hand cmd.Cmd a list: its complete() indexes the result
            return []

        mline = line.split(' ')[-1]
        offs = len(mline) - len(text)
        return [s[offs:] + '=' for s in self.module_args(words[0]) if s.startswith(mline)]

    def module_args(self, module_name):
        """Return the documented option names of ``module_name``.

        :param module_name: name of the module to look up
        :returns: list of option names; empty when the module, its
            documentation or its options cannot be found
        """
        in_path = module_loader.find_plugin(module_name)
        if not in_path:
            return []

        oc, _a, _dummy1, _dummy2 = plugin_docs.get_docstring(in_path, fragment_loader, is_module=True)
        # same guards as helpdefault(): docs or their options may be missing
        if not oc:
            return []
        return list((oc.get('options') or {}).keys())

    def run(self):
        """Set up the console state from the command line and start the interactive loop.

        Reads the defaults from ``context.CLIARGS``, registers a ``do_``/``help_``
        pair for every listed module, calls ``ask_passwords()``, loads the play
        prerequisites, configures readline completion and history and finally
        enters ``cmdloop()``.
        """

        super(ConsoleCLI, self).run()

        sshpass = None
        becomepass = None

        # hosts
        self.pattern = context.CLIARGS['pattern']
        self.cwd = self.pattern

        # Defaults from the command line
        self.remote_user = context.CLIARGS['remote_user']
        self.become = context.CLIARGS['become']
        self.become_user = context.CLIARGS['become_user']
        self.become_method = context.CLIARGS['become_method']
        self.check_mode = context.CLIARGS['check']
        self.diff = context.CLIARGS['diff']
        self.forks = context.CLIARGS['forks']
        self.task_timeout = context.CLIARGS['task_timeout']

        # set module path if needed
        if context.CLIARGS['module_path']:
            for path in context.CLIARGS['module_path']:
                if path:
                    module_loader.add_directory(path)

        # dynamically add 'canonical' modules as commands, aliases could be used and dynamically loaded
        # (the ``module=module`` default binds the current loop value into each lambda)
        self.modules = self.list_modules()
        for module in self.modules:
            setattr(self, 'do_' + module, lambda arg, module=module: self.default(module + ' ' + arg))
            setattr(self, 'help_' + module, lambda module=module: self.helpdefault(module))

        (sshpass, becomepass) = self.ask_passwords()
        self.passwords = {'conn_pass': sshpass, 'become_pass': becomepass}

        self.loader, self.inventory, self.variable_manager = self._play_prereqs()

        hosts = self.get_host_list(self.inventory, context.CLIARGS['subset'], self.pattern)

        self.groups = self.inventory.list_groups()
        self.hosts = [x.name for x in hosts]

        # This hack is to work around readline issues on a mac:
        #  http://stackoverflow.com/a/7116997/541202
        if 'libedit' in readline.__doc__:
            readline.parse_and_bind("bind ^I rl_complete")
        else:
            readline.parse_and_bind("tab: complete")

        # history is best-effort: a history file that cannot be read is ignored
        histfile = os.path.join(os.path.expanduser("~"), ".ansible-console_history")
        try:
            readline.read_history_file(histfile)
        except IOError:
            pass

        # write the history back when the interpreter exits
        atexit.register(readline.write_history_file, histfile)
        self.set_prompt()
        self.cmdloop()

    def __getattr__(self, name):
        ''' handle not found to populate dynamically a module function if module matching name exists

        Called only when normal attribute lookup fails. For a ``do_<module>`` or
        ``help_<module>`` name whose module the loader can find, the handler is
        created, stored on the instance and returned.

        :raises AttributeError: for any other name
        '''
        attr = None

        if name.startswith('do_'):
            # strip the prefix only; str.replace() would also remove later occurrences
            module = name[len('do_'):]
            if module and module_loader.find_plugin(module):
                setattr(self, name, lambda arg, module=module: self.default(module + ' ' + arg))
                # ``object`` has no __getattr__; __getattribute__ reads back what was just stored
                attr = object.__getattribute__(self, name)
        elif name.startswith('help_'):
            module = name[len('help_'):]
            if module and module_loader.find_plugin(module):
                setattr(self, name, lambda module=module: self.helpdefault(module))
                attr = object.__getattribute__(self, name)

        if attr is None:
            raise AttributeError(f"{self.__class__} does not have a {name} attribute")

        return attr


def main(args=None):
    """Entry point: hand ``args`` to ``ConsoleCLI.cli_executor()``."""
    executor = ConsoleCLI.cli_executor
    executor(args)


if __name__ == '__main__':
    main()

Anon7 - 2022
AnonSec Team