Server IP : 85.214.239.14 / Your IP : 3.138.138.87 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /usr/lib/python3/dist-packages/ansible_collections/community/routeros/plugins/modules/ |
Upload File : |
#!/usr/bin/python # -*- coding: utf-8 -*- # Copyright (c) 2020, Nikolay Dachev <nikolay@dachev.info> # GNU General Public License v3.0+ https://www.gnu.org/licenses/gpl-3.0.txt # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import (absolute_import, division, print_function) __metaclass__ = type DOCUMENTATION = ''' --- module: api author: "Nikolay Dachev (@NikolayDachev)" short_description: Ansible module for RouterOS API description: - Ansible module for RouterOS API with the Python C(librouteros) library. - This module can add, remove, update, query and execute arbitrary command in RouterOS via API. notes: - I(add), I(remove), I(update), I(cmd) and I(query) are mutually exclusive. - Use the M(community.routeros.api_modify) and M(community.routeros.api_find_and_modify) modules for more specific modifications, and the M(community.routeros.api_info) module for a more controlled way of returning all entries for a path. extends_documentation_fragment: - community.routeros.api - community.routeros.attributes - community.routeros.attributes.actiongroup_api attributes: check_mode: support: none diff_mode: support: none platform: support: full platforms: RouterOS action_group: version_added: 2.1.0 options: path: description: - Main path for all other arguments. - If other arguments are not set, api will return all items in selected path. - Example C(ip address). Equivalent of RouterOS CLI C(/ip address print). required: true type: str add: description: - Will add selected arguments in selected path to RouterOS config. - Example C(address=1.1.1.1/32 interface=ether1). - Equivalent in RouterOS CLI C(/ip address add address=1.1.1.1/32 interface=ether1). type: str remove: description: - Remove config/value from RouterOS by '.id'. - Example C(*03) will remove config/value with C(id=*03) in selected path. - Equivalent in RouterOS CLI C(/ip address remove numbers=1). - Note C(number) in RouterOS CLI is different from C(.id). type: str update: description: - Update config/value in RouterOS by '.id' in selected path. - Example C(.id=*03 address=1.1.1.3/32) and path C(ip address) will replace existing ip address with C(.id=*03). - Equivalent in RouterOS CLI C(/ip address set address=1.1.1.3/32 numbers=1). - Note C(number) in RouterOS CLI is different from C(.id). type: str query: description: - Query given path for selected query attributes from RouterOS aip. - WHERE is key word which extend query. WHERE format is key operator value - with spaces. - WHERE valid operators are C(==) or C(eq), C(!=) or C(not), C(>) or C(more), C(<) or C(less). - Example path C(ip address) and query C(.id address) will return only C(.id) and C(address) for all items in C(ip address) path. - Example path C(ip address) and query C(.id address WHERE address == 1.1.1.3/32). will return only C(.id) and C(address) for items in C(ip address) path, where address is eq to 1.1.1.3/32. - Example path C(interface) and query C(mtu name WHERE mut > 1400) will return only interfaces C(mtu,name) where mtu is bigger than 1400. - Equivalent in RouterOS CLI C(/interface print where mtu > 1400). type: str extended_query: description: - Extended query given path for selected query attributes from RouterOS API. - Extended query allow conjunctive input. If there is no matching entry, an empty list will be returned. type: dict suboptions: attributes: description: - The list of attributes to return. - Every attribute used in a I(where) clause need to be listed here. type: list elements: str required: true where: description: - Allows to restrict the objects returned. - The conditions here must all match. An I(or) condition needs at least one of its conditions to match. type: list elements: dict suboptions: attribute: description: - The attribute to match. Must be part of I(attributes). - Either I(or) or all of I(attribute), I(is), and I(value) have to be specified. type: str is: description: - The operator to use for matching. - For equality use C(==) or C(eq). For less use C(<) or C(less). For more use C(>) or C(more). - Use C(in) to check whether the value is part of a list. In that case, I(value) must be a list. - Either I(or) or all of I(attribute), I(is), and I(value) have to be specified. type: str choices: ["==", "!=", ">", "<", "in", "eq", "not", "more", "less"] value: description: - The value to compare to. Must be a list for I(is=in). - Either I(or) or all of I(attribute), I(is), and I(value) have to be specified. type: raw or: description: - A list of conditions so that at least one of them has to match. - Either I(or) or all of I(attribute), I(is), and I(value) have to be specified. type: list elements: dict suboptions: attribute: description: - The attribute to match. Must be part of I(attributes). type: str required: true is: description: - The operator to use for matching. - For equality use C(==) or C(eq). For less use C(<) or C(less). For more use C(>) or C(more). - Use C(in) to check whether the value is part of a list. In that case, I(value) must be a list. type: str choices: ["==", "!=", ">", "<", "in", "eq", "not", "more", "less"] required: true value: description: - The value to compare to. Must be a list for I(is=in). type: raw required: true cmd: description: - Execute any/arbitrary command in selected path, after the command we can add C(.id). - Example path C(system script) and cmd C(run .id=*03) is equivalent in RouterOS CLI C(/system script run number=0). - Example path C(ip address) and cmd C(print) is equivalent in RouterOS CLI C(/ip address print). type: str seealso: - ref: ansible_collections.community.routeros.docsite.quoting description: How to quote and unquote commands and arguments - module: community.routeros.api_facts - module: community.routeros.api_find_and_modify - module: community.routeros.api_info - module: community.routeros.api_modify ''' EXAMPLES = ''' - name: Get example - ip address print community.routeros.api: hostname: "{{ hostname }}" password: "{{ password }}" username: "{{ username }}" path: "ip address" register: ipaddrd_printout - name: Dump "Get example" output ansible.builtin.debug: msg: '{{ ipaddrd_printout }}' - name: Add example - ip address community.routeros.api: hostname: "{{ hostname }}" password: "{{ password }}" username: "{{ username }}" path: "ip address" add: "address=192.168.255.10/24 interface=ether2" - name: Query example - ".id, address" in "ip address WHERE address == 192.168.255.10/24" community.routeros.api: hostname: "{{ hostname }}" password: "{{ password }}" username: "{{ username }}" path: "ip address" query: ".id address WHERE address == {{ ip2 }}" register: queryout - name: Dump "Query example" output ansible.builtin.debug: msg: '{{ queryout }}' - name: Extended query example - ".id,address,network" where address is not 192.168.255.10/24 or is 10.20.36.20/24 community.routeros.api: hostname: "{{ hostname }}" password: "{{ password }}" username: "{{ username }}" path: "ip address" extended_query: attributes: - network - address - .id where: - attribute: "network" is: "==" value: "192.168.255.0" - or: - attribute: "address" is: "!=" value: "192.168.255.10/24" - attribute: "address" is: "eq" value: "10.20.36.20/24" - attribute: "network" is: "in" value: - "10.20.36.0" - "192.168.255.0" register: extended_queryout - name: Dump "Extended query example" output ansible.builtin.debug: msg: '{{ extended_queryout }}' - name: Update example - ether2 ip addres with ".id = *14" community.routeros.api: hostname: "{{ hostname }}" password: "{{ password }}" username: "{{ username }}" path: "ip address" update: >- .id=*14 address=192.168.255.20/24 comment={{ 'Update 192.168.255.10/24 to 192.168.255.20/24 on ether2' | community.routeros.quote_argument_value }} - name: Remove example - ether2 ip 192.168.255.20/24 with ".id = *14" community.routeros.api: hostname: "{{ hostname }}" password: "{{ password }}" username: "{{ username }}" path: "ip address" remove: "*14" - name: Arbitrary command example "/system identity print" community.routeros.api: hostname: "{{ hostname }}" password: "{{ password }}" username: "{{ username }}" path: "system identity" cmd: "print" register: arbitraryout - name: Dump "Arbitrary command example" output ansible.builtin.debug: msg: '{{ arbitraryout }}' ''' RETURN = ''' --- message: description: All outputs are in list with dictionary elements returned from RouterOS api. sample: - address: 1.2.3.4 - address: 2.3.4.5 type: list returned: always ''' from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_native from ansible_collections.community.routeros.plugins.module_utils.quoting import ( ParseError, convert_list_to_dictionary, parse_argument_value, split_routeros_command, ) from ansible_collections.community.routeros.plugins.module_utils.api import ( api_argument_spec, check_has_library, create_api, ) import re try: from librouteros.exceptions import LibRouterosError from librouteros.query import Key, Or except Exception: # Handled in api module_utils pass class ROS_api_module: def __init__(self): module_args = dict( path=dict(type='str', required=True), add=dict(type='str'), remove=dict(type='str'), update=dict(type='str'), cmd=dict(type='str'), query=dict(type='str'), extended_query=dict(type='dict', options=dict( attributes=dict(type='list', elements='str', required=True), where=dict( type='list', elements='dict', options={ 'attribute': dict(type='str'), 'is': dict(type='str', choices=["==", "!=", ">", "<", "in", "eq", "not", "more", "less"]), 'value': dict(type='raw'), 'or': dict(type='list', elements='dict', options={ 'attribute': dict(type='str', required=True), 'is': dict(type='str', choices=["==", "!=", ">", "<", "in", "eq", "not", "more", "less"], required=True), 'value': dict(type='raw', required=True), }), }, required_together=[('attribute', 'is', 'value')], mutually_exclusive=[('attribute', 'or')], required_one_of=[('attribute', 'or')], ), )), ) module_args.update(api_argument_spec()) self.module = AnsibleModule(argument_spec=module_args, supports_check_mode=False, mutually_exclusive=(('add', 'remove', 'update', 'cmd', 'query', 'extended_query'),),) check_has_library(self.module) self.api = create_api(self.module) self.path = self.module.params['path'].split() self.add = self.module.params['add'] self.remove = self.module.params['remove'] self.update = self.module.params['update'] self.arbitrary = self.module.params['cmd'] self.where = None self.query = self.module.params['query'] self.extended_query = self.module.params['extended_query'] self.result = dict( message=[]) # create api base path self.api_path = self.api_add_path(self.api, self.path) # api calls try: if self.add: self.api_add() elif self.remove: self.api_remove() elif self.update: self.api_update() elif self.query: self.check_query() self.api_query() elif self.extended_query: self.check_extended_query() self.api_extended_query() elif self.arbitrary: self.api_arbitrary() else: self.api_get_all() except UnicodeEncodeError as exc: self.module.fail_json(msg='Error while encoding text: {error}'.format(error=exc)) def check_query(self): where_index = self.query.find(' WHERE ') if where_index < 0: self.query = self.split_params(self.query) else: where = self.query[where_index + len(' WHERE '):] self.query = self.split_params(self.query[:where_index]) # where must be of the format '<attribute> <operator> <value>' m = re.match(r'^\s*([^ ]+)\s+([^ ]+)\s+(.*)$', where) if not m: self.errors("invalid syntax for 'WHERE %s'" % where) try: self.where = [ m.group(1), # attribute m.group(2), # operator parse_argument_value(m.group(3).rstrip())[0], # value ] except ParseError as exc: self.errors("invalid syntax for 'WHERE %s': %s" % (where, exc)) try: idx = self.query.index('WHERE') self.where = self.query[idx + 1:] self.query = self.query[:idx] except ValueError: # Raised when WHERE has not been found pass def check_extended_query_syntax(self, test_atr, or_msg=''): if test_atr['is'] == "in" and not isinstance(test_atr['value'], list): self.errors("invalid syntax 'extended_query':'where':%s%s 'value' must be a type list" % (or_msg, test_atr)) def check_extended_query(self): if self.extended_query["where"]: for i in self.extended_query['where']: if i["or"] is not None: if len(i['or']) < 2: self.errors("invalid syntax 'extended_query':'where':'or':%s 'or' requires minimum two items" % i["or"]) for orv in i['or']: self.check_extended_query_syntax(orv, ":'or':") else: self.check_extended_query_syntax(i) def list_to_dic(self, ldict): return convert_list_to_dictionary(ldict, skip_empty_values=True, require_assignment=True) def split_params(self, params): if not isinstance(params, str): raise AssertionError('Parameters can only be a string, received %s' % type(params)) try: return split_routeros_command(params) except ParseError as e: self.module.fail_json(msg=to_native(e)) def api_add_path(self, api, path): api_path = api.path() for p in path: api_path = api_path.join(p) return api_path def api_get_all(self): try: for i in self.api_path: self.result['message'].append(i) self.return_result(False, True) except LibRouterosError as e: self.errors(e) def api_add(self): param = self.list_to_dic(self.split_params(self.add)) try: self.result['message'].append("added: .id= %s" % self.api_path.add(**param)) self.return_result(True) except LibRouterosError as e: self.errors(e) def api_remove(self): try: self.api_path.remove(self.remove) self.result['message'].append("removed: .id= %s" % self.remove) self.return_result(True) except LibRouterosError as e: self.errors(e) def api_update(self): param = self.list_to_dic(self.split_params(self.update)) if '.id' not in param.keys(): self.errors("missing '.id' for %s" % param) try: self.api_path.update(**param) self.result['message'].append("updated: %s" % param) self.return_result(True) except LibRouterosError as e: self.errors(e) def api_query(self): keys = {} for k in self.query: if 'id' in k and k != ".id": self.errors("'%s' must be '.id'" % k) keys[k] = Key(k) try: if self.where: if self.where[1] in ('==', 'eq'): select = self.api_path.select(*keys).where(keys[self.where[0]] == self.where[2]) elif self.where[1] in ('!=', 'not'): select = self.api_path.select(*keys).where(keys[self.where[0]] != self.where[2]) elif self.where[1] in ('>', 'more'): select = self.api_path.select(*keys).where(keys[self.where[0]] > self.where[2]) elif self.where[1] in ('<', 'less'): select = self.api_path.select(*keys).where(keys[self.where[0]] < self.where[2]) else: self.errors("'%s' is not operator for 'where'" % self.where[1]) else: select = self.api_path.select(*keys) for row in select: self.result['message'].append(row) if len(self.result['message']) < 1: msg = "no results for '%s 'query' %s" % (' '.join(self.path), ' '.join(self.query)) if self.where: msg = msg + ' WHERE %s' % ' '.join(self.where) self.result['message'].append(msg) self.return_result(False) except LibRouterosError as e: self.errors(e) def build_api_extended_query(self, item): if item['attribute'] not in self.extended_query['attributes']: self.errors("'%s' attribute is not in attributes: %s" % (item, self.extended_query['attributes'])) if item['is'] in ('eq', '=='): return self.query_keys[item['attribute']] == item['value'] elif item['is'] in ('not', '!='): return self.query_keys[item['attribute']] != item['value'] elif item['is'] in ('less', '<'): return self.query_keys[item['attribute']] < item['value'] elif item['is'] in ('more', '>'): return self.query_keys[item['attribute']] > item['value'] elif item['is'] == 'in': return self.query_keys[item['attribute']].In(*item['value']) else: self.errors("'%s' is not operator for 'is'" % item['is']) def api_extended_query(self): self.query_keys = {} for k in self.extended_query['attributes']: if k == 'id': self.errors("'extended_query':'attributes':'%s' must be '.id'" % k) self.query_keys[k] = Key(k) try: if self.extended_query['where']: where_args = [] for i in self.extended_query['where']: if i['or']: where_or_args = [] for ior in i['or']: where_or_args.append(self.build_api_extended_query(ior)) where_args.append(Or(*where_or_args)) else: where_args.append(self.build_api_extended_query(i)) select = self.api_path.select(*self.query_keys).where(*where_args) else: select = self.api_path.select(*self.extended_query['attributes']) for row in select: self.result['message'].append(row) self.return_result(False) except LibRouterosError as e: self.errors(e) def api_arbitrary(self): param = {} self.arbitrary = self.split_params(self.arbitrary) arb_cmd = self.arbitrary[0] if len(self.arbitrary) > 1: param = self.list_to_dic(self.arbitrary[1:]) try: arbitrary_result = self.api_path(arb_cmd, **param) for i in arbitrary_result: self.result['message'].append(i) self.return_result(False) except LibRouterosError as e: self.errors(e) def return_result(self, ch_status=False, status=True): if not status: self.module.fail_json(msg=self.result['message']) else: self.module.exit_json(changed=ch_status, msg=self.result['message']) def errors(self, e): if e.__class__.__name__ == 'TrapError': self.result['message'].append("%s" % e) self.return_result(False, False) self.result['message'].append("%s" % e) self.return_result(False, False) def main(): ROS_api_module() if __name__ == '__main__': main()