Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.223.211.3
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_collections/dellemc/os10/plugins/action/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_collections/dellemc/os10/plugins/action/textfsm_parser.py
# -*- coding: utf-8 -*-

# (c) 2020, Ansible by Red Hat, inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

from ansible.module_utils.six import StringIO, string_types

from ansible.plugins.action import ActionBase
from ansible.errors import AnsibleError

try:
    import textfsm
    HAS_TEXTFSM = True
except ImportError:
    HAS_TEXTFSM = False


class ActionModule(ActionBase):

    def run(self, tmp=None, task_vars=None):
        ''' handler for textfsm action '''

        if task_vars is None:
            task_vars = dict()

        result = super(ActionModule, self).run(tmp, task_vars)
        del tmp  # tmp no longer has any effect

        try:
            if not HAS_TEXTFSM:
                raise AnsibleError('textfsm_parser engine requires the TextFSM library to be installed')

            try:
                filename = self._task.args.get('file')
                src = self._task.args.get('src')
                content = self._task.args['content']
                name = self._task.args.get('name')
            except KeyError as exc:
                raise AnsibleError('missing required argument: %s' % exc)

            if src and filename:
                raise AnsibleError('`src` and `file` are mutually exclusive arguments')

            if not isinstance(content, string_types):
                return {'failed': True, 'msg': '`content` must be of type str, got %s' % type(content)}

            if filename:
                tmpl = open(filename)
            else:
                tmpl = StringIO()
                tmpl.write(src.strip())
                tmpl.seek(0)

            try:
                re_table = textfsm.TextFSM(tmpl)
                fsm_results = re_table.ParseText(content)

            except Exception as exc:
                raise AnsibleError(str(exc))

            final_facts = []
            for item in fsm_results:
                facts = {}
                facts.update(dict(zip(re_table.header, item)))
                final_facts.append(facts)

            if name:
                result['ansible_facts'] = {name: final_facts}
            else:
                result['ansible_facts'] = {}

        finally:
            self._remove_tmp_path(self._connection._shell.tmpdir)

        return result

Anon7 - 2022
AnonSec Team