Server IP : 85.214.239.14 / Your IP : 18.118.184.25 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/community/dns/plugins/module_utils/ |
Upload File : |
# -*- coding: utf-8 -*- # Copyright (c) 2021, Felix Fontein <felix@fontein.de> # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import absolute_import, division, print_function __metaclass__ = type import traceback from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_text try: import dns import dns.exception import dns.name import dns.message import dns.query import dns.rcode import dns.rdatatype import dns.resolver except ImportError: DNSPYTHON_IMPORTERROR = traceback.format_exc() else: DNSPYTHON_IMPORTERROR = None class ResolverError(Exception): pass class ResolveDirectlyFromNameServers(object): def __init__(self, timeout=10, timeout_retries=3, always_ask_default_resolver=True): self.cache = {} self.timeout = timeout self.timeout_retries = timeout_retries self.default_resolver = dns.resolver.get_default_resolver() self.default_nameservers = self.default_resolver.nameservers self.always_ask_default_resolver = always_ask_default_resolver def _handle_reponse_errors(self, target, response, nameserver=None, query=None): rcode = response.rcode() if rcode == dns.rcode.NOERROR: return True if rcode == dns.rcode.NXDOMAIN: raise dns.resolver.NXDOMAIN(qnames=[target], responses={target: response}) msg = 'Error %s' % dns.rcode.to_text(rcode) if nameserver: msg = '%s while querying %s' % (msg, nameserver) if query: msg = '%s with query %s' % (msg, query) raise ResolverError(msg) def _handle_timeout(self, function, *args, **kwargs): retry = 0 while True: try: return function(*args, **kwargs) except dns.exception.Timeout as exc: if retry >= self.timeout_retries: raise exc retry += 1 def _lookup_ns_names(self, target, nameservers=None, nameserver_ips=None): if self.always_ask_default_resolver: nameservers = None nameserver_ips = self.default_nameservers if nameservers is None and nameserver_ips is None: nameserver_ips = self.default_nameservers if not nameserver_ips and nameservers: nameserver_ips = self._lookup_address(nameservers[0]) if not nameserver_ips: raise ResolverError('Have neither nameservers nor nameserver IPs') query = dns.message.make_query(target, dns.rdatatype.NS) response = self._handle_timeout(dns.query.udp, query, nameserver_ips[0], timeout=self.timeout) self._handle_reponse_errors(target, response, nameserver=nameserver_ips[0], query='get NS for "%s"' % target) cname = None for rrset in response.answer: if rrset.rdtype == dns.rdatatype.CNAME: cname = dns.name.from_text(to_text(rrset[0])) new_nameservers = [] rrsets = list(response.authority) rrsets.extend(response.answer) for rrset in rrsets: if rrset.rdtype == dns.rdatatype.SOA: # We keep the current nameservers return None, cname if rrset.rdtype == dns.rdatatype.NS: new_nameservers.extend(str(ns_record.target) for ns_record in rrset) return sorted(set(new_nameservers)) if new_nameservers else None, cname def _lookup_address_impl(self, target, rdtype): try: try: answer = self._handle_timeout(self.default_resolver.resolve, target, rdtype=rdtype, lifetime=self.timeout) except AttributeError: # For dnspython < 2.0.0 self.default_resolver.search = False try: answer = self._handle_timeout(self.default_resolver.query, target, rdtype=rdtype, lifetime=self.timeout) except TypeError: # For dnspython < 1.6.0 self.default_resolver.lifetime = self.timeout answer = self._handle_timeout(self.default_resolver.query, target, rdtype=rdtype) return [str(res) for res in answer.rrset] except dns.resolver.NoAnswer: return [] def _lookup_address(self, target): result = self.cache.get((target, 'addr')) if not result: result = self._lookup_address_impl(target, dns.rdatatype.A) result.extend(self._lookup_address_impl(target, dns.rdatatype.AAAA)) self.cache[(target, 'addr')] = result return result def _do_lookup_ns(self, target): nameserver_ips = self.default_nameservers nameservers = None for i in range(2, len(target.labels) + 1): target_part = target.split(i)[1] _nameservers = self.cache.get((str(target_part), 'ns')) if _nameservers is None: nameserver_names, cname = self._lookup_ns_names(target_part, nameservers=nameservers, nameserver_ips=nameserver_ips) if nameserver_names is not None: nameservers = nameserver_names self.cache[(str(target_part), 'ns')] = nameservers self.cache[(str(target_part), 'cname')] = cname else: nameservers = _nameservers nameserver_ips = None return nameservers def _lookup_ns(self, target): result = self.cache.get((str(target), 'ns')) if not result: result = self._do_lookup_ns(target) self.cache[(str(target), 'ns')] = result return result def _get_resolver(self, dnsname, nameservers): cache_index = ('|'.join([str(dnsname)] + sorted(nameservers)), 'resolver') resolver = self.cache.get(cache_index) if resolver is None: resolver = dns.resolver.Resolver(configure=False) resolver.timeout = self.timeout nameserver_ips = set() for nameserver in nameservers: nameserver_ips.update(self._lookup_address(nameserver)) resolver.nameservers = sorted(nameserver_ips) self.cache[cache_index] = resolver return resolver def resolve_nameservers(self, target, resolve_addresses=False): nameservers = self._lookup_ns(dns.name.from_unicode(to_text(target))) if resolve_addresses: nameserver_ips = set() for nameserver in nameservers: nameserver_ips.update(self._lookup_address(nameserver)) nameservers = list(nameserver_ips) return sorted(nameservers) def resolve(self, target, nxdomain_is_empty=True, **kwargs): dnsname = dns.name.from_unicode(to_text(target)) loop_catcher = set() while True: try: nameservers = self._lookup_ns(dnsname) except dns.resolver.NXDOMAIN: if nxdomain_is_empty: return {} raise cname = self.cache.get((str(dnsname), 'cname')) if cname is None: break dnsname = cname if dnsname in loop_catcher: raise ResolverError('Found CNAME loop starting at {0}'.format(target)) loop_catcher.add(dnsname) results = {} for nameserver in nameservers: results[nameserver] = None resolver = self._get_resolver(dnsname, [nameserver]) try: try: response = self._handle_timeout(resolver.resolve, dnsname, lifetime=self.timeout, **kwargs) except AttributeError: # For dnspython < 2.0.0 resolver.search = False try: response = self._handle_timeout(resolver.query, dnsname, lifetime=self.timeout, **kwargs) except TypeError: # For dnspython < 1.6.0 resolver.lifetime = self.timeout response = self._handle_timeout(resolver.query, dnsname, **kwargs) if response.rrset: results[nameserver] = response.rrset except dns.resolver.NoAnswer: pass return results def assert_requirements_present(module): if DNSPYTHON_IMPORTERROR is not None: module.fail_json(msg=missing_required_lib('dnspython'), exception=DNSPYTHON_IMPORTERROR)