Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.142.198.148
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/libcloud/dns/drivers/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/libcloud/dns/drivers/vultr.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Vultr DNS Driver
"""
import json
from typing import Optional, List, Dict, Any

from libcloud.utils.py3 import urlencode
from libcloud.common.vultr import VultrConnection
from libcloud.common.vultr import VultrResponse
from libcloud.common.vultr import VultrConnectionV2, VultrResponseV2
from libcloud.common.vultr import DEFAULT_API_VERSION
from libcloud.dns.base import DNSDriver, Zone, Record
from libcloud.dns.types import ZoneDoesNotExistError, RecordDoesNotExistError
from libcloud.dns.types import ZoneAlreadyExistsError, RecordAlreadyExistsError
from libcloud.dns.types import Provider, RecordType


__all__ = [
    'ZoneRequiredException',
    'VultrDNSResponse',
    'VultrDNSConnection',
    'VultrDNSDriver',
]


class ZoneRequiredException(Exception):
    """Raised when a ``Zone`` instance was expected but not supplied."""


class VultrDNSResponse(VultrResponse):
    """DNS-specific name for ``VultrResponse``; adds no behaviour of its own."""


class VultrDNSConnection(VultrConnection):
    """``VultrConnection`` whose ``responseCls`` is ``VultrDNSResponse``."""
    responseCls = VultrDNSResponse


class VultrDNSResponseV2(VultrResponseV2):
    """DNS-specific name for ``VultrResponseV2``; adds no behaviour."""


class VultrDNSConnectionV2(VultrConnectionV2):
    """``VultrConnectionV2`` whose ``responseCls`` is ``VultrDNSResponseV2``."""
    responseCls = VultrDNSResponseV2


class VultrDNSDriver(DNSDriver):
    """
    Entry point for the Vultr DNS drivers.

    Instantiating this class directly yields the implementation matching
    ``api_version``; instantiating a subclass yields that subclass.
    """

    type = Provider.VULTR
    name = 'Vultr DNS'
    website = 'https://www.vultr.com'

    def __new__(cls, key, secret=None, secure=True, host=None, port=None,
                api_version=DEFAULT_API_VERSION, region=None, **kwargs):
        """
        Select the concrete driver class for the requested API version.

        :param api_version: ``'1'`` selects :class:`VultrDNSDriverV1`,
                            ``'2'`` selects :class:`VultrDNSDriverV2`.
        :type api_version: ``str``

        :raises NotImplementedError: if this class is instantiated directly
                                     with any other ``api_version``.
        """
        target = cls

        # Only the generic entry point dispatches on the version; an
        # explicitly requested subclass is instantiated as-is.
        if cls is VultrDNSDriver:
            if api_version == '1':
                target = VultrDNSDriverV1
            elif api_version == '2':
                target = VultrDNSDriverV2
            else:
                raise NotImplementedError(
                    'No Vultr driver found for API version: %s' %
                    (api_version))

        return super().__new__(target)


class VultrDNSDriverV1(VultrDNSDriver):

    connectionCls = VultrDNSConnection

    RECORD_TYPE_MAP = {

        RecordType.A: 'A',
        RecordType.AAAA: 'AAAA',
        RecordType.TXT: 'TXT',
        RecordType.CNAME: 'CNAME',
        RecordType.MX: 'MX',
        RecordType.NS: 'NS',
        RecordType.SRV: 'SRV',
    }

    def list_zones(self):
        """
        Return the zones reported by the ``/v1/dns/list`` endpoint.

        :return: ``list`` of :class:`Zone`
        """
        response = self.connection.request(action='/v1/dns/list',
                                           params={'api_key': self.key})

        # The zone items are the first element of ``response.objects``.
        return self._to_zones(response.objects[0])

    def list_records(self, zone):
        """
        List the records that belong to a zone.

        :param zone: zone whose records are requested
        :type zone: :class:`Zone`

        :raises ZoneRequiredException: if ``zone`` is not a :class:`Zone`
        :raises ZoneDoesNotExistError: if the zone's domain is not among the
                                       zones returned by ``list_zones``

        :rtype: ``list`` of :class:`Record`
        """
        if not isinstance(zone, Zone):
            raise ZoneRequiredException('zone should be of type Zone')

        known_zones = self.list_zones()
        domain = zone.domain

        if not self.ex_zone_exists(domain, known_zones):
            raise ZoneDoesNotExistError(value='', driver=self,
                                        zone_id=domain)

        response = self.connection.request(action='/v1/dns/records',
                                           params={'domain': domain})

        # The record items are the first element of ``response.objects``.
        return self._to_records(response.objects[0], zone=zone)

    def get_zone(self, zone_id):
        """
        Return the zone whose domain equals ``zone_id``.

        The zone is looked up in the result of ``list_zones``.

        :param zone_id: name of the zone user wants to get.
        :type zone_id: ``str``

        :raises ZoneDoesNotExistError: if no listed zone has that domain

        :rtype: :class:`Zone`
        """
        # Reuse list_zones() rather than repeating its request here, and
        # find the zone in a single pass over the result.
        for zone in self.list_zones():
            if zone.domain == zone_id:
                return zone

        raise ZoneDoesNotExistError(value=None, zone_id=zone_id, driver=self)

    def get_record(self, zone_id, record_id):
        """
        Look up one record of a zone by its ID.

        :param zone_id: name of the zone holding the record
        :type zone_id: ``str``

        :param record_id: ID of the wanted record
        :type record_id: ``str``

        :raises RecordDoesNotExistError: if ``ex_record_exists`` reports
                                         that the zone has no such record

        :rtype: :class:`Record`
        """
        records = self.list_records(zone=self.get_zone(zone_id=zone_id))

        if not self.ex_record_exists(record_id, records):
            raise RecordDoesNotExistError(value='', driver=self,
                                          record_id=record_id)

        # If several records carry the same ID, the last one listed wins.
        matches = [record for record in records if record_id == record.id]
        return matches[-1] if matches else None

    def create_zone(self, domain, type='master', ttl=None, extra=None):
        """
        Create a new zone and return a `Zone` object describing it.

        :param domain: Zone domain name, (e.g. example.com).
        :type domain: ``str``

        :param type: Zone type (master / slave). Only stored on the
                     returned object; it is not sent to the API.
        :type  type: ``str``

        :param ttl: TTL for new records. (optional) Only stored on the
                    returned object; it is not sent to the API.
        :type  ttl: ``int``

        :param extra: Extra attributes (driver specific). Must contain a
                      non-empty ``'serverip'`` entry
                      (e.g. {'serverip':'127.0.0.1'}).
        :type extra: ``dict``

        :raises ValueError: if ``extra`` carries no ``'serverip'`` value
        :raises ZoneAlreadyExistsError: if ``domain`` is already listed

        :rtype: :class:`Zone`
        """
        extra = extra or {}
        serverip = extra.get('serverip')
        if not serverip:
            # This case used to crash with UnboundLocalError when the
            # request body was built; fail with a clear message instead.
            raise ValueError(
                "extra['serverip'] is required to create a zone")

        if self.ex_zone_exists(domain, self.list_zones()):
            raise ZoneAlreadyExistsError(value='', driver=self,
                                         zone_id=domain)

        self.connection.request(
            params={'api_key': self.key},
            action='/v1/dns/create_domain',
            data=urlencode({'domain': domain, 'serverip': serverip}),
            method='POST')

        # The response body is not used; the Zone is built from the inputs.
        return Zone(id=domain, domain=domain, type=type, ttl=ttl,
                    driver=self, extra=extra)

    def create_record(self, name, zone, type, data, extra=None):
        """
        Create a new record.

        :param name: Record name without the domain name (e.g. www).
                     Note: If you want to create a record for a base domain
                     name, you should specify empty string ('') for this
                     argument.
        :type  name: ``str``

        :param zone: Zone where the requested record is created.
        :type  zone: :class:`Zone`

        :param type: DNS record type (A, AAAA, ...).
        :type  type: :class:`RecordType`

        :param data: Data for the record (depends on the record type).
        :type  data: ``str``

        :param extra: Extra attributes (driver specific). (optional)
                      ``'priority'`` is required for MX and SRV records;
                      ``0`` is accepted.
        :type extra: ``dict``

        :raises RecordAlreadyExistsError: if the zone already has a record
                                          with the same name and data
        :raises ValueError: if an MX or SRV record is requested without a
                            ``'priority'`` in ``extra``

        :return: the matching record found when the zone is listed again
                 after the request, or ``None`` if there is none
        :rtype: :class:`Record`
        """
        extra = extra or {}

        # Refuse to create an exact duplicate (same name and data).
        for record in self.list_records(zone=zone):
            if record.name == name and record.data == data:
                raise RecordAlreadyExistsError(value='', driver=self,
                                               record_id=record.id)

        api_type = self.RECORD_TYPE_MAP.get(type)
        post_data = {'domain': zone.domain, 'name': name,
                     'type': api_type, 'data': data}

        if api_type in ('MX', 'SRV'):
            # Test for presence, not truthiness: 0 is a legitimate priority
            # and used to be treated as "missing" (UnboundLocalError).
            priority = extra.get('priority')
            if priority is None:
                raise ValueError(
                    "extra['priority'] is required for %s records"
                    % api_type)
            post_data['priority'] = int(priority)

        self.connection.request(action='/v1/dns/create_record',
                                params={'api_key': self.key},
                                data=urlencode(post_data), method='POST')

        # The response of the create call is not used, so list the zone
        # again and pick the record by name and data (last match wins).
        ret_record = None
        for record in zone.list_records():
            if record.name == name and record.data == data:
                ret_record = record

        return ret_record

    def delete_zone(self, zone):
        """
        Delete a zone.

        Note: This will delete all the records belonging to this zone.

        :param zone: Zone to delete.
        :type  zone: :class:`Zone`

        :raises ZoneDoesNotExistError: if the zone's domain is not among the
                                       zones returned by ``list_zones``

        :return: ``True`` when ``response.status`` is 200
        :rtype: ``bool``
        """
        domain = zone.domain
        payload = urlencode({'domain': domain})

        if not self.ex_zone_exists(domain, self.list_zones()):
            raise ZoneDoesNotExistError(value='', driver=self,
                                        zone_id=domain)

        response = self.connection.request(params={'api_key': self.key},
                                           action='/v1/dns/delete_domain',
                                           data=payload, method='POST')
        return response.status == 200

    def delete_record(self, record):
        """
        Delete a record.

        :param record: Record to delete.
        :type  record: :class:`Record`

        :raises RecordDoesNotExistError: if the record's ID is not among the
                                         records listed for its zone

        :return: ``True`` when ``response.status`` is 200
        :rtype: ``bool``
        """
        record_id = record.id
        payload = urlencode({'RECORDID': record_id,
                             'domain': record.zone.domain})

        current_records = self.list_records(record.zone)
        if not self.ex_record_exists(record_id, current_records):
            raise RecordDoesNotExistError(value='', driver=self,
                                          record_id=record_id)

        response = self.connection.request(action='/v1/dns/delete_record',
                                           params={'api_key': self.key},
                                           data=payload, method='POST')
        return response.status == 200

    def ex_zone_exists(self, zone_id, zones_list):
        """
        Tell whether any zone in ``zones_list`` has ``zone_id`` as domain.

        :param zone_id: Name of the `Zone` object.
        :type zone_id: ``str``

        :param zones_list: A list containing `Zone` objects
        :type zones_list: ``list``

        :rtype: ``bool``
        """
        return zone_id in [zone.domain for zone in zones_list]

    def ex_record_exists(self, record_id, records_list):
        """
        Tell whether any record in ``records_list`` has ID ``record_id``.

        :param record_id: ID of the `Record` object.
        :type record_id: ``str``

        :param records_list: A list containing `Record` objects
        :type records_list: ``list``

        :rtype: ``bool``
        """
        return record_id in [record.id for record in records_list]

    def _to_zone(self, item):
        """
        Build a `Zone` from one item dictionary.

        The item's ``'domain'`` is used as both id and domain; its
        ``'date_created'`` is kept in ``extra``. The type is always
        ``'master'`` and no TTL is set.

        :param item: item to build the zone from
        :type item: ``dict``

        :rtype: :class:`Zone`
        """
        extra = {'date_created': item['date_created']}
        domain = item['domain']

        return Zone(id=domain, domain=domain, driver=self, type='master',
                    ttl=None, extra=extra)

    def _to_zones(self, items):
        """
        Returns a list of `Zone` objects.

        :param: items: a list that contains dictionary objects to be passed
        to the _to_zone function.
        :type items: ``list``
        """
        zones = []
        for item in items:
            zones.append(self._to_zone(item))

        return zones

    def _to_record(self, item, zone):
        """
        Build a `Record` from one item dictionary.

        :param item: item to build the record from; ``'RECORDID'``,
                     ``'name'``, ``'type'`` and ``'data'`` are read
        :type item: ``dict``

        :param zone: zone the record is attached to
        :type zone: :class:`Zone`

        :rtype: :class:`Record`
        """
        # A missing or falsy priority (including 0) is left out of extra.
        priority = item.get('priority')
        extra = {'priority': priority} if priority else {}

        record_type = self._string_to_record_type(item['type'])

        return Record(id=item['RECORDID'], name=item['name'],
                      type=record_type, data=item['data'], zone=zone,
                      driver=self, extra=extra)

    def _to_records(self, items, zone):
        records = []
        for item in items:
            records.append(self._to_record(item, zone=zone))

        return records


class VultrDNSDriverV2(VultrDNSDriver):
    connectionCls = VultrDNSConnectionV2

    RECORD_TYPE_MAP = {
        RecordType.A: 'A',
        RecordType.AAAA: 'AAAA',
        RecordType.CNAME: 'CNAME',
        RecordType.NS: 'NS',
        RecordType.MX: 'MX',
        RecordType.SRV: 'SRV',
        RecordType.TXT: 'TXT',
        RecordType.CAA: 'CAA',
        RecordType.SSHFP: 'SSHFP',
    }

    def list_zones(self) -> List[Zone]:
        """Return a list of zones.

        :return: ``list`` of :class:`Zone`
        """
        data = self._paginated_request('/v2/domains', 'domains')
        return [self._to_zone(item) for item in data]

    def get_zone(self, zone_id: str) -> Zone:
        """Return a Zone instance.

        :param zone_id: ID of the required zone
        :type  zone_id: ``str``

        :rtype: :class:`Zone`
        """
        resp = self.connection.request('/v2/domains/%s' % zone_id)
        return self._to_zone(resp.object['domain'])

    def create_zone(self,
                    domain: str,
                    type: str = 'master',
                    ttl: Optional[int] = None,
                    extra: Optional[Dict[str, Any]] = None,
                    ) -> Zone:
        """Create a new zone.

        :param domain: Zone domain name (e.g. example.com)
        :type domain: ``str``

        :param type: Zone type. Only 'master' value is supported.
        :type  type: ``str``

        :param ttl: TTL for new records. (unused)
        :type  ttl: ``int``

        :param extra: Extra attributes 'ip': ``str`` IP for a default A record
                                       'dns_sec': ``bool`` Enable DSNSEC.
        :type extra: ``dict``

        :rtype: :class:`Zone`
        """

        data = {
            'domain': domain,
        }

        extra = extra or {}
        if 'ip' in extra:
            data['ip'] = extra['ip']

        if 'dns_sec' in extra:
            data['dns_sec'] = ('enabled'
                               if extra['dns_sec'] is True else 'disabled')

        resp = self.connection.request('/v2/domains',
                                       data=json.dumps(data),
                                       method='POST')
        return self._to_zone(resp.object['domain'])

    def delete_zone(self, zone: Zone) -> bool:
        """Delete a zone.

        Note: This will delete all the records belonging to this zone.

        :param zone: Zone to delete.
        :type  zone: :class:`Zone`

        :rtype: ``bool``
        """
        resp = self.connection.request('/v2/domains/%s' % zone.domain,
                                       method='DELETE')
        return resp.success()

    def list_records(self, zone: Zone) -> List[Record]:
        """Return a list of records for the provided zone.

        :param zone: Zone to list records for.
        :type zone: :class:`Zone`

        :return: ``list`` of :class:`Record`
        """
        data = self._paginated_request('/v2/domains/%s/records' % zone.domain,
                                       'records')
        return [self._to_record(item, zone) for item in data]

    def get_record(self, zone_id: str, record_id: str) -> Record:
        """Fetch a single record of a zone.

        The returned record's zone is built locally from ``zone_id`` (type
        ``'master'``, no TTL, no extra attributes) rather than fetched.

        :param zone_id: ID of the required zone
        :type  zone_id: ``str``

        :param record_id: ID of the required record
        :type  record_id: ``str``

        :rtype: :class:`Record`
        """
        path = '/v2/domains/%s/records/%s' % (zone_id, record_id)
        payload = self.connection.request(path).object

        # zone_id alone fills the standard Zone fields, which saves a
        # second API call just to look the zone up.
        parent = Zone(id=zone_id, domain=zone_id, type='master', ttl=None,
                      driver=self)

        return self._to_record(payload['record'], parent)

    def create_record(self,
                      name: str,
                      zone: Zone,
                      type: RecordType,
                      data: str,
                      extra: Optional[Dict[str, Any]] = None
                      ) -> Record:
        """Create a new record.

        :param name: Record name without the domain name (e.g. www).
                     Note: If you want to create a record for a base domain
                     name, you should specify empty string ('') for this
                     argument.
        :type  name: ``str``

        :param zone: Zone where the requested record is created.
        :type  zone: :class:`Zone`

        :param type: DNS record type (A, AAAA, ...).
        :type  type: :class:`RecordType`

        :param data: Data for the record (depends on the record type).
        :type  data: ``str``

        :keyword extra: Extra attributes 'ttl': Time to live in seconds
                                         'priority': DNS priority. Only
                                                     required for MX and SRV
        :type extra: ``dict``

        :rtype: :class:`Record`
        """
        data = {
            'name': name,
            'type': self.RECORD_TYPE_MAP[type],
            'data': data,
        }
        extra = extra or {}
        if 'ttl' in extra:
            data['ttl'] = int(extra['ttl'])

        if 'priority' in extra:
            data['priority'] = int(extra['priority'])

        resp = self.connection.request('/v2/domains/%s/records' % zone.domain,
                                       data=json.dumps(data),
                                       method='POST')

        return self._to_record(resp.object['record'], zone)

    def update_record(self,
                      record: Record,
                      name: Optional[str] = None,
                      type: Optional[RecordType] = None,
                      data: Optional[str] = None,
                      extra: Optional[Dict[str, Any]] = None
                      ) -> bool:
        """Update an existing record.

        :param record: Record to update.
        :type  record: :class:`Record`

        :keyword name: Record name without the domain name (e.g. www).
                     Note: If you want to create a record for a base domain
                     name, you should specify empty string ('') for this
                     argument.
        :type  name: ``str``

        :keyword type: DNS record type. (Unused)
        :type  type: :class:`RecordType`

        :keyword data: Data for the record (depends on the record type).
        :type  data: ``str``

        :keyword extra: Extra attributes 'ttl': Time to live in seconds
                                         'priority': DNS priority. Only
                                                     required for MX and SRV
        :type  extra: ``dict``

        :rtype: ``bool``
        """
        body = {}
        if name:
            body['name'] = name

        if data:
            body['data'] = data

        extra = extra or {}
        if 'ttl' in extra:
            body['ttl'] = int(extra['ttl'])

        if 'priority' in extra:
            body['priority'] = int(extra['priority'])

        resp = self.connection.request('/v2/domains/%s/records/%s' %
                                       (record.zone.domain, record.id),
                                       data=json.dumps(body),
                                       method='PATCH')

        return resp.success()

    def delete_record(self, record: Record) -> bool:
        """Delete a record.

        :param record: Record to delete.
        :type  record: :class:`Record`

        :rtype: ``bool``
        """
        resp = self.connection.request('/v2/domains/%s/records/%s' %
                                       (record.zone.domain, record.id),
                                       method='DELETE')

        return resp.success()

    def _to_zone(self, data: Dict[str, Any]) -> Zone:
        """Build a :class:`Zone` from one zone dictionary.

        ``data['domain']`` is used as both id and domain and
        ``data['date_created']`` is kept in ``extra``. The type is always
        ``'master'`` and no TTL is set.
        """
        name = data['domain']
        return Zone(id=name,
                    domain=name,
                    driver=self,
                    type='master',
                    ttl=None,
                    extra={'date_created': data['date_created']})

    def _to_record(self, data: Dict[str, Any], zone: Zone) -> Record:
        """Build a :class:`Record` from one record dictionary.

        Reads the ``'id'``, ``'name'``, ``'type'``, ``'data'``, ``'ttl'`` and
        ``'priority'`` keys; the priority is stored in ``extra``.

        :param data: record dictionary
        :type data: ``dict``

        :param zone: zone the record is attached to
        :type zone: :class:`Zone`

        :rtype: :class:`Record`
        """
        return Record(id=data['id'],
                      name=data['name'],
                      type=self._string_to_record_type(data['type']),
                      data=data['data'],
                      ttl=data['ttl'],
                      driver=self,
                      zone=zone,
                      extra={'priority': data['priority']})

    def _paginated_request(self,
                           url: str,
                           key: str,
                           params: Optional[Dict[str, Any]] = None,
                           ) -> List[Any]:
        """Perform multiple calls to get the full list of items when
        the API responses are paginated.

        :param url: API endpoint
        :type url: ``str``

        :param key: Result object key
        :type key: ``str``

        :param params: Request parameters
        :type params: ``dict``

        :return: ``list`` of API response objects
        :rtype: ``list``
        """
        params = params if params is not None else {}
        resp = self.connection.request(url, params=params).object
        data = list(resp.get(key, []))
        objects = data
        while True:
            next_page = resp['meta']['links']['next']
            if next_page:
                params['cursor'] = next_page
                resp = self.connection.request(url, params=params).object
                data = list(resp.get(key, []))
                objects.extend(data)
            else:
                return objects

Anon7 - 2022
AnonSec Team