Server IP : 85.214.239.14 / Your IP : 3.133.122.95 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/community/network/plugins/cliconf/ |
Upload File : |
# Copyright: (c) 2018, Ansible Project # GNU General Public License v3.0+ # (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import (absolute_import, division, print_function) __metaclass__ = type DOCUMENTATION = ''' --- author: Unknown (!UNKNOWN) name: edgeos short_description: Use edgeos cliconf to run command on EdgeOS platform description: - This edgeos plugin provides low level abstraction apis for sending and receiving CLI commands from Ubiquiti EdgeOS network devices. ''' import re import json from itertools import chain from ansible.errors import AnsibleConnectionFailure from ansible.module_utils._text import to_text from ansible.module_utils.common._collections_compat import Mapping from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import to_list from ansible.plugins.cliconf import CliconfBase class Cliconf(CliconfBase): def get_device_info(self): device_info = {} device_info['network_os'] = 'edgeos' reply = self.get('show version') data = to_text(reply, errors='surrogate_or_strict').strip() match = re.search(r'Version:\s*v?(\S+)', data) if match: device_info['network_os_version'] = match.group(1) match = re.search(r'HW model:\s*(\S+)', data) if match: device_info['network_os_model'] = match.group(1) reply = self.get('show host name') device_info['network_os_hostname'] = to_text(reply, errors='surrogate_or_strict').strip() return device_info def get_config(self, source='running', flags=None, format='text'): return self.send_command('show configuration commands|cat') def edit_config(self, candidate=None, commit=True, replace=False, comment=None): for cmd in chain(['configure'], to_list(candidate)): self.send_command(cmd) def get(self, command, prompt=None, answer=None, sendonly=False, newline=True, check_all=False): return self.send_command(command=command, prompt=prompt, answer=answer, sendonly=sendonly, newline=newline, check_all=check_all) def commit(self, comment=None): if comment: command = 'commit comment "{0}"'.format(comment) else: command = 'commit' self.send_command(command) def discard_changes(self, *args, **kwargs): self.send_command('exit discard') def run_commands(self, commands=None, check_rc=True): if commands is None: raise ValueError("'commands' value is required") responses = list() for cmd in to_list(commands): if not isinstance(cmd, Mapping): cmd = {'command': cmd} output = cmd.pop('output', None) if output: raise ValueError("'output' value %s is not supported for run_commands" % output) try: out = self.send_command(**cmd) except AnsibleConnectionFailure as e: if check_rc: raise out = getattr(e, 'err', e) responses.append(out) return responses def get_device_operations(self): return { 'supports_diff_replace': False, 'supports_commit': True, 'supports_rollback': False, 'supports_defaults': False, 'supports_onbox_diff': False, 'supports_commit_comment': True, 'supports_multiline_delimiter': False, 'supports_diff_match': False, 'supports_diff_ignore_lines': False, 'supports_generate_diff': False, 'supports_replace': False } def get_capabilities(self): result = super(Cliconf, self).get_capabilities() result['rpc'] += ['commit', 'discard_changes', 'run_commands'] result['device_operations'] = self.get_device_operations() return json.dumps(result)