Server IP : 85.214.239.14 / Your IP : 3.133.134.63 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/community/dns/plugins/module_utils/hetzner/ |
Upload File : |
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 Felix Fontein
# Copyright (c) 2020 Markus Bergholz <markuman+spambelongstogoogle@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type


from ansible.module_utils.basic import env_fallback

from ansible_collections.community.dns.plugins.module_utils.argspec import (
    ArgumentSpec,
)

from ansible_collections.community.dns.plugins.module_utils.json_api_helper import (
    JSONAPIHelper,
    ERROR_CODES,
    UNKNOWN_ERROR,
)

from ansible_collections.community.dns.plugins.module_utils.provider import (
    ProviderInformation,
)

from ansible_collections.community.dns.plugins.module_utils.record import (
    DNSRecord,
)

from ansible_collections.community.dns.plugins.module_utils.zone import (
    DNSZone,
)

from ansible_collections.community.dns.plugins.module_utils.zone_record_api import (
    DNSAPIError,
    NOT_PROVIDED,
    ZoneRecordAPI,
    filter_records,
)


def _create_zone_from_json(source):
    """
    Convert a zone object as returned by the API into a DNSZone.

    @param source: The zone's JSON object (dictionary). Must contain the keys ``name`` and ``id``.
    @return The zone (DNSZone). All keys except ``name`` and ``id`` are stored in ``zone.info``.
    """
    zone = DNSZone(source['name'])
    zone.id = source['id']
    # Work on a copy so that the caller's dictionary is not modified
    info = source.copy()
    info.pop('name')
    info.pop('id')
    if 'legacy_ns' in info:
        # Store the legacy name servers in sorted order
        info['legacy_ns'] = sorted(info['legacy_ns'])
    zone.info = info
    return zone


def _create_record_from_json(source, type=None, has_id=True):
    """
    Convert a record object as returned by the API into a DNSRecord.

    @param source: The record's JSON object (dictionary). Must contain ``value``, and ``id`` if ``has_id`` is true.
    @param type: The record type to use if ``source`` has no ``type`` key
    @param has_id: Whether ``source`` contains the record's ID
    @return The record (DNSRecord). Keys of ``source`` that are not mapped to a record attribute
            (and are not ``zone_id``) are stored in ``record.extra``.
    """
    # Work on a copy so that the caller's dictionary is not modified by the pop() calls
    source = dict(source)
    result = DNSRecord()
    if has_id:
        result.id = source.pop('id')
    result.type = source.pop('type', type)
    result.ttl = source.pop('ttl', None)
    name = source.pop('name', None)
    if name == '@':
        # '@' denotes the trivial prefix, which is represented as None
        name = None
    result.prefix = name
    result.target = source.pop('value')
    source.pop('zone_id', None)
    result.extra.update(source)
    return result


def _record_to_json(record, zone_id):
    """
    Convert a DNSRecord into the JSON object sent to the API.

    @param record: The DNS record (DNSRecord)
    @param zone_id: The ID of the zone the record belongs to
    @return A dictionary with the keys ``name``, ``value``, ``type``, ``zone_id``, and ``ttl``
            if the record has a TTL
    """
    result = {
        # The trivial prefix (None or empty) is sent as '@'
        'name': record.prefix or '@',
        'value': record.target,
        'type': record.type,
        'zone_id': zone_id,
    }
    if record.ttl is not None:
        result['ttl'] = record.ttl
    return result


class HetznerAPI(ZoneRecordAPI, JSONAPIHelper):
    """
    Zone and record API implementation for the Hetzner DNS JSON API.
    """

    def __init__(self, http_helper, token, api='https://dns.hetzner.com/api/', debug=False):
        """
        Create the API object.

        All arguments are passed on unchanged to ``JSONAPIHelper.__init__``.

        @param http_helper: The HTTP helper object
        @param token: The API token. It is sent in the ``Auth-API-Token`` header.
        @param api: The base URL of the API
        @param debug: Debug flag
        """
        JSONAPIHelper.__init__(self, http_helper, token, api=api, debug=debug)

    def _create_headers(self):
        """
        Return the HTTP headers (dictionary) used for API requests.
        """
        return {
            'Accept': 'application/json',
            'Auth-API-Token': self._token,
        }

    def _extract_only_error_message(self, result):
        """
        Compose a message fragment from the error information found in a result.

        @param result: The decoded JSON result (dictionary)
        @return A string that starts with a space if any error information was found,
                or the empty string otherwise
        """
        # These errors are not documented, but are what I experienced the API seems to return:
        res = ''
        if isinstance(result.get('error'), dict):
            if 'message' in result['error']:
                res = '{0} with error message "{1}"'.format(res, result['error']['message'])
            if 'code' in result['error']:
                res = '{0} (error code {1})'.format(res, result['error']['code'])
        if result.get('message'):
            res = '{0} with message "{1}"'.format(res, result['message'])
        return res

    def _extract_error_message(self, result):
        """
        Compose a message fragment describing a result.

        @param result: The decoded result, or None
        @return The empty string if ``result`` is None; the extracted error information if
                ``result`` is a dictionary containing some; otherwise a dump of ``result``
        """
        if result is None:
            return ''
        if isinstance(result, dict):
            res = self._extract_only_error_message(result)
            if res:
                return res
        return ' with data: {0}'.format(result)

    def _validate(self, result=None, info=None, expected=None, method='GET'):
        """
        Validate a response.

        Runs the base class validation first. Afterwards, if the result is a dictionary with an
        ``error`` object that has a ``code`` which is not listed in ``expected``, an error is raised.

        @param result: The decoded result
        @param info: The response information (dictionary). Its ``url`` entry is used for the error message.
        @param expected: A list of accepted status codes, or None
        @param method: The HTTP method used, for the error message
        @raise DNSAPIError if the result contains an unexpected error code
        """
        super(HetznerAPI, self)._validate(result=result, info=info, expected=expected, method=method)
        if isinstance(result, dict):
            error = result.get('error')
            if isinstance(error, dict):
                status = error.get('code')
                if status is None:
                    return
                url = info['url']
                if expected is not None and status in expected:
                    return
                error_code = ERROR_CODES.get(status, UNKNOWN_ERROR)
                more = self._extract_error_message(result)
                raise DNSAPIError(
                    '{0} {1} resulted in API error {2} ({3}){4}'.format(method, url, status, error_code, more))

    def _list_pagination(self, url, data_key, query=None, block_size=100, accept_404=False):
        """
        Retrieve all elements of a paginated list.

        @param url: The URL to query
        @param data_key: The key of the result dictionary that contains the list of elements of a page
        @param query: Additional query parameters (dictionary), or None. The dictionary is not modified.
        @param block_size: The number of elements to request per page
        @param accept_404: Whether a 404 status for the first page is accepted
        @return The list of all elements, or None if ``accept_404`` is true and the first page
                resulted in status 404
        """
        result = []
        page = 1
        while True:
            query_ = query.copy() if query else dict()
            query_['per_page'] = block_size
            query_['page'] = page
            # A 404 is only acceptable for the very first request
            first_page_may_404 = accept_404 and page == 1
            res, info = self._get(
                url, query_, must_have_content=[200], expected=[200, 404] if first_page_may_404 else [200])
            if first_page_may_404 and info['status'] == 404:
                return None
            result.extend(res[data_key])
            if 'meta' not in res and page == 1:
                # No pagination information on the first page: treat it as the only page
                return result
            if page >= res['meta']['pagination']['last_page']:
                return result
            page += 1

    def get_zone_by_name(self, name):
        """
        Given a zone name, return the zone contents if found.

        @param name: The zone name (string)
        @return The zone information (DNSZone), or None if not found
        """
        result, info = self._get('v1/zones', expected=[200, 404], query=dict(name=name))
        # Status 404 is accepted above. In that case the result might not be a dictionary,
        # or might not contain a list of zones; treat both as "not found" instead of crashing.
        zones = result.get('zones') if isinstance(result, dict) else None
        for zone in zones or []:
            if zone.get('name') == name:
                return _create_zone_from_json(zone)
        return None

    def get_zone_by_id(self, id):
        """
        Given a zone ID, return the zone contents if found.

        @param id: The zone ID
        @return The zone information (DNSZone), or None if not found
        """
        result, info = self._get('v1/zones/{id}'.format(id=id), expected=[200, 404], must_have_content=[200])
        if info['status'] == 404:
            return None
        return _create_zone_from_json(result['zone'])

    def get_zone_records(self, zone_id, prefix=NOT_PROVIDED, record_type=NOT_PROVIDED):
        """
        Given a zone ID, return a list of records, optionally filtered by the provided criteria.

        @param zone_id: The zone ID
        @param prefix: The prefix to filter for, if provided. Since None is a valid value,
                       the special constant NOT_PROVIDED indicates that we are not filtering.
        @param record_type: The record type to filter for, if provided
        @return A list of DNSrecord objects, or None if zone was not found
        """
        result = self._list_pagination('v1/records', data_key='records', query=dict(zone_id=zone_id), accept_404=True)
        if result is None:
            return None
        return filter_records(
            [_create_record_from_json(record) for record in result],
            prefix=prefix,
            record_type=record_type,
        )

    def add_record(self, zone_id, record):
        """
        Adds a new record to an existing zone.

        @param zone_id: The zone ID
        @param record: The DNS record (DNSRecord)
        @return The created DNS record (DNSRecord)
        @raise DNSAPIError if the API answers with status 422
        """
        data = _record_to_json(record, zone_id=zone_id)
        result, info = self._post('v1/records', data=data, expected=[200, 422])
        if info['status'] == 422:
            raise DNSAPIError(
                'The new {type} record with value "{target}" and TTL {ttl} has not been accepted by the server{message}'.format(
                    type=record.type,
                    target=record.target,
                    ttl=record.ttl,
                    message=self._extract_only_error_message(result),
                )
            )
        return _create_record_from_json(result['record'])

    def update_record(self, zone_id, record):
        """
        Update a record.

        @param zone_id: The zone ID
        @param record: The DNS record (DNSRecord)
        @return The DNS record (DNSRecord)
        @raise DNSAPIError if the record has no ID, or if the API answers with status 422
        """
        if record.id is None:
            raise DNSAPIError('Need record ID to update record!')
        data = _record_to_json(record, zone_id=zone_id)
        result, info = self._put('v1/records/{id}'.format(id=record.id), data=data, expected=[200, 422])
        if info['status'] == 422:
            raise DNSAPIError(
                'The updated {type} record with value "{target}" and TTL {ttl} has not been accepted by the server{message}'.format(
                    type=record.type,
                    target=record.target,
                    ttl=record.ttl,
                    message=self._extract_only_error_message(result),
                )
            )
        return _create_record_from_json(result['record'])

    def delete_record(self, zone_id, record):
        """
        Delete a record.

        @param zone_id: The zone ID
        @param record: The DNS record (DNSRecord)
        @return True in case of success (boolean)
        @raise DNSAPIError if the record has no ID
        """
        if record.id is None:
            raise DNSAPIError('Need record ID to delete record!')
        dummy, info = self._delete('v1/records/{id}'.format(id=record.id), must_have_content=False, expected=[200, 404])
        # Status 404 is accepted, but is reported as "not deleted"
        return info['status'] == 200

    @staticmethod
    def _append(results_per_zone_id, zone_id, result):
        """
        Append ``result`` to the list stored for ``zone_id``, creating the list if necessary.

        @param results_per_zone_id: Dictionary mapping zone IDs to lists; modified in place
        @param zone_id: The zone ID
        @param result: The element to append
        """
        results_per_zone_id.setdefault(zone_id, []).append(result)

    def add_records(self, records_per_zone_id, stop_early_on_errors=True):
        """
        Add new records to an existing zone.

        @param records_per_zone_id: Maps a zone ID to a list of DNS records (DNSRecord)
        @param stop_early_on_errors: If set to ``True``, try to stop changes after the first error happens.
                                     This might only work on some APIs.
        @return A dictionary mapping zone IDs to lists of tuples ``(record, created, failed)``.
                Here ``created`` indicates whether the record was created (``True``) or not (``False``).
                If it was created, ``record`` contains the record ID and ``failed`` is ``None``.
                If it was not created, ``failed`` should be a ``DNSAPIError`` instance indicating why
                it was not created. It is possible that the API only creates records if all succeed,
                in that case ``failed`` can be ``None`` even though ``created`` is ``False``.
        """
        json_records = []
        for zone_id, records in records_per_zone_id.items():
            for record in records:
                json_records.append(_record_to_json(record, zone_id=zone_id))
        data = {'records': json_records}
        # Error 422 means that at least one of the records was not valid
        result, info = self._post('v1/records/bulk', data=data, expected=[200, 422])
        results_per_zone_id = {}
        # This is the list of invalid records that was detected before accepting the whole set
        for json_record in result.get('invalid_records') or []:
            record = _create_record_from_json(json_record, has_id=False)
            zone_id = json_record['zone_id']
            self._append(results_per_zone_id, zone_id, (record, False, DNSAPIError(
                'Creating {type} record "{target}" with TTL {ttl} for zone {zoneID} failed with unknown reason'.format(
                    type=record.type,
                    target=record.target,
                    ttl=record.ttl,
                    zoneID=zone_id))))
        # This is the list of valid records that were not processed
        for json_record in result.get('valid_records') or []:
            record = _create_record_from_json(json_record, has_id=False)
            zone_id = json_record['zone_id']
            self._append(results_per_zone_id, zone_id, (record, False, None))
        # This is the list of correctly processed records
        for json_record in result.get('records') or []:
            record = _create_record_from_json(json_record)
            zone_id = json_record['zone_id']
            self._append(results_per_zone_id, zone_id, (record, True, None))
        return results_per_zone_id

    def update_records(self, records_per_zone_id, stop_early_on_errors=True):
        """
        Update multiple records.

        @param records_per_zone_id: Maps a zone ID to a list of DNS records (DNSRecord)
        @param stop_early_on_errors: If set to ``True``, try to stop changes after the first error happens.
                                     This might only work on some APIs.
        @return A dictionary mapping zone IDs to lists of tuples ``(record, updated, failed)``.
                Here ``updated`` indicates whether the record was updated (``True``) or not (``False``).
                If it was not updated, ``failed`` should be a ``DNSAPIError`` instance. If it was
                updated, ``failed`` should be ``None``.  It is possible that the API only updates
                records if all succeed, in that case ``failed`` can be ``None`` even though
                ``updated`` is ``False``.
        """
        # Currently Hetzner's bulk update API seems to be broken, it always returns the error message
        # "An invalid response was received from the upstream server". That's why for now, we always
        # fall back to the default implementation.
        if True:  # pylint: disable=using-constant-test
            return super(HetznerAPI, self).update_records(records_per_zone_id, stop_early_on_errors=stop_early_on_errors)

        # The code below is intentionally unreachable; it is kept for when the bulk update API works.
        json_records = []
        for zone_id, records in records_per_zone_id.items():
            for record in records:
                json_records.append(_record_to_json(record, zone_id=zone_id))
        data = {'records': json_records}
        result, dummy = self._put('v1/records/bulk', data=data, expected=[200])
        results_per_zone_id = {}
        for json_record in result.get('failed_records') or []:
            record = _create_record_from_json(json_record)
            zone_id = json_record['zone_id']
            self._append(results_per_zone_id, zone_id, (record, False, DNSAPIError(
                'Updating {type} record #{id} "{target}" with TTL {ttl} for zone {zoneID} failed with unknown reason'.format(
                    type=record.type,
                    id=record.id,
                    target=record.target,
                    ttl=record.ttl,
                    zoneID=zone_id))))
        for json_record in result.get('records') or []:
            record = _create_record_from_json(json_record)
            zone_id = json_record['zone_id']
            self._append(results_per_zone_id, zone_id, (record, True, None))
        return results_per_zone_id


class HetznerProviderInformation(ProviderInformation):
    """
    Provider information for the Hetzner DNS API.
    """

    def get_supported_record_types(self):
        """
        Return a list of supported record types.
        """
        return ['A', 'AAAA', 'NS', 'MX', 'CNAME', 'RP', 'TXT', 'SOA', 'HINFO', 'SRV', 'DANE', 'TLSA', 'DS', 'CAA']

    def get_zone_id_type(self):
        """
        Return the (short) type for zone IDs, like ``'int'`` or ``'str'``.
        """
        return 'str'

    def get_record_id_type(self):
        """
        Return the (short) type for record IDs, like ``'int'`` or ``'str'``.
        """
        return 'str'

    def get_record_default_ttl(self):
        """
        Return the default TTL for records, like 300, 3600 or None.
        None means that some other TTL (usually from the zone) will be used.
        """
        return None

    def normalize_prefix(self, prefix):
        """
        Given a prefix (string or None), return its normalized form.

        The result should always be None for the trivial prefix, and a non-zero length DNS name
        for a non-trivial prefix.

        If a provider supports other identifiers for the trivial prefix, such as '@', this
        function needs to convert them to None as well.
        """
        return None if prefix in ('@', '') else prefix

    def supports_bulk_actions(self):
        """
        Return whether the API supports some kind of bulk actions.
        """
        return True

    def txt_record_handling(self):
        """
        Return how the API handles TXT records.

        Returns one of the following strings:
        * 'decoded' - the API works with unencoded values
        * 'encoded' - the API works with encoded values
        * 'encoded-no-char-encoding' - the API works with encoded values, but without character encoding
        """
        return 'encoded-no-char-encoding'


def create_hetzner_provider_information():
    """
    Return a new HetznerProviderInformation instance.
    """
    return HetznerProviderInformation()


def create_hetzner_argument_spec():
    """
    Return the argument spec (ArgumentSpec) for the Hetzner-specific module options.

    It defines the required ``hetzner_token`` option (alias ``api_token``), which falls back
    to the environment variable ``HETZNER_DNS_TOKEN``.
    """
    return ArgumentSpec(
        argument_spec=dict(
            hetzner_token=dict(
                type='str',
                required=True,
                no_log=True,
                aliases=['api_token'],
                fallback=(env_fallback, ['HETZNER_DNS_TOKEN']),
            ),
        ),
    )


def create_hetzner_api(option_provider, http_helper):
    """
    Create a HetznerAPI object.

    @param option_provider: Object whose ``get_option('hetzner_token')`` returns the API token
    @param http_helper: The HTTP helper object passed on to HetznerAPI
    @return The API object (HetznerAPI)
    """
    return HetznerAPI(http_helper, option_provider.get_option('hetzner_token'))