Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.138.143.25
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/task/3/cwd/proc/self/root/proc/3/root/lib/python3/dist-packages/libcloud/dns/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/task/3/cwd/proc/self/root/proc/3/root/lib/python3/dist-packages/libcloud/dns//base.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import with_statement

from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Union
from typing import Type
from typing import Any

import datetime

from libcloud import __version__
from libcloud.common.base import Connection
from libcloud.common.base import ConnectionUserAndKey, BaseDriver
from libcloud.dns.types import RecordType

__all__ = [
    'Zone',
    'Record',
    'DNSDriver'
]


class Zone(object):
    """
    A single DNS zone.

    Instances are plain value holders; every operation is forwarded to the
    driver the zone was created with.
    """

    def __init__(self,
                 id,  # type: str
                 domain,  # type: str
                 type,  # type: str
                 ttl,  # type: int
                 driver,  # type: DNSDriver
                 extra=None  # type: dict
                 ):
        """
        :param id: Zone id. Stored as a string; a falsy value is stored as
                   ``None``.
        :type id: ``str``

        :param domain: The name of the domain.
        :type domain: ``str``

        :param type: Zone type (master, slave).
        :type type: ``str``

        :param ttl: Default TTL for records in this zone (in seconds). A
                    falsy value is stored as ``None``.
        :type ttl: ``int``

        :param driver: DNSDriver instance.
        :type driver: :class:`DNSDriver`

        :param extra: (optional) Extra attributes (driver specific).
        :type extra: ``dict``
        """
        if id:
            self.id = str(id)
        else:
            self.id = None

        self.domain = domain
        self.type = type
        self.ttl = ttl if ttl else None
        self.driver = driver
        self.extra = extra if extra else {}

    def list_records(self):
        # type: () -> List[Record]
        """
        Return whatever the driver's ``list_records`` returns for this zone.
        """
        driver = self.driver
        return driver.list_records(zone=self)

    def create_record(self, name, type, data, extra=None):
        # type: (str, RecordType, str, Optional[dict]) -> Record
        """
        Ask the driver to create a record in this zone and return its result.
        """
        arguments = {
            'name': name,
            'zone': self,
            'type': type,
            'data': data,
            'extra': extra,
        }
        return self.driver.create_record(**arguments)

    def update(self,
               domain=None,  # type: Optional[str]
               type=None,  # type: Optional[str]
               ttl=None,  # type: Optional[int]
               extra=None  # type: Optional[dict]
               ):
        # type: (...) -> Zone
        """
        Forward the provided values to the driver's ``update_zone`` method
        and return its result.
        """
        arguments = {
            'zone': self,
            'domain': domain,
            'type': type,
            'ttl': ttl,
            'extra': extra,
        }
        return self.driver.update_zone(**arguments)

    def delete(self):
        # type: () -> bool
        """
        Ask the driver to delete this zone and return its result.
        """
        driver = self.driver
        return driver.delete_zone(zone=self)

    def export_to_bind_format(self):
        # type: () -> str
        """
        Return the driver's BIND formatted export of this zone.
        """
        driver = self.driver
        return driver.export_zone_to_bind_format(zone=self)

    def export_to_bind_zone_file(self, file_path):
        # type: (str) -> None
        """
        Ask the driver to export this zone in BIND format to ``file_path``.
        """
        driver = self.driver
        driver.export_zone_to_bind_zone_file(zone=self, file_path=file_path)

    def __repr__(self):
        # type: () -> str
        template = ('<Zone: domain=%(domain)s, ttl=%(ttl)s, '
                    'provider=%(provider)s ...>')
        values = {
            'domain': self.domain,
            'ttl': self.ttl,
            'provider': self.driver.name,
        }
        return template % values


class Record(object):
    """
    Zone record / resource.

    Instances hold the record attributes; update and delete operations are
    forwarded to the driver the record was created with.
    """

    def __init__(self,
                 id,  # type: str
                 name,  # type: str
                 type,  # type: RecordType
                 data,  # type: str
                 zone,  # type: Zone
                 driver,  # type: DNSDriver
                 ttl=None,  # type: int
                 extra=None  # type: dict
                 ):
        """
        :param id: Record id. Stored as a string; a falsy value is stored as
                   ``None``.
        :type id: ``str``

        :param name: Hostname or FQDN.
        :type name: ``str``

        :param type: DNS record type (A, AAAA, ...).
        :type type: :class:`RecordType`

        :param data: Data for the record (depends on the record type).
        :type data: ``str``

        :param zone: Zone instance.
        :type zone: :class:`Zone`

        :param driver: DNSDriver instance.
        :type driver: :class:`DNSDriver`

        :param ttl: Record TTL.
        :type ttl: ``int``

        :param extra: (optional) Extra attributes (driver specific).
        :type extra: ``dict``
        """
        self.id = str(id) if id else None
        self.name = name
        self.type = type
        self.data = data
        self.zone = zone
        self.driver = driver
        self.ttl = ttl
        self.extra = extra or {}

    def update(self,
               name=None,  # type: Optional[str]
               type=None,  # type: Optional[RecordType]
               data=None,  # type: Optional[str]
               extra=None  # type: Optional[dict]
               ):
        # type: (...) -> Record
        """
        Forward the provided values to the driver's ``update_record`` method
        and return its result.
        """
        return self.driver.update_record(record=self, name=name, type=type,
                                         data=data, extra=extra)

    def delete(self):
        # type: () -> bool
        """
        Ask the driver to delete this record and return its result.
        """
        return self.driver.delete_record(record=self)

    def _get_numeric_id(self):
        # type: () -> Union[int, str]
        """
        Return numeric ID for the provided record if the ID is a digit.

        This method is used for sorting the values when exporting Zone to a
        BIND format.

        :return: ``''`` when the record has no ID, the ID converted to
                 ``int`` when it consists only of digits ``int()`` can
                 parse, otherwise the ID unchanged.
        """
        record_id = self.id

        if record_id is None:
            return ''

        if record_id.isdigit():
            try:
                return int(record_id)
            except ValueError:
                # isdigit() is also true for characters such as superscript
                # digits which int() refuses to parse. Treat such IDs as
                # plain strings instead of failing.
                pass

        return record_id

    def __repr__(self):
        # type: () -> str
        # Fall back to the zone id when the zone has no domain set
        zone = self.zone.domain if self.zone.domain else self.zone.id
        return ('<Record: zone=%s, name=%s, type=%s, data=%s, provider=%s, '
                'ttl=%s ...>' %
                (zone, self.name, self.type, self.data,
                 self.driver.name, self.ttl))


class DNSDriver(BaseDriver):
    """
    A base DNSDriver class to derive from

    This class is always subclassed by a specific driver.
    """
    connectionCls = ConnectionUserAndKey  # type: Type[Connection]
    name = None  # type: str
    website = None  # type: str

    # Map libcloud record type enum to provider record type name
    RECORD_TYPE_MAP = {}  # type: Dict[RecordType, str]

    def __init__(self,
                 key,  # type: str
                 secret=None,  # type: Optional[str]
                 secure=True,  # type: bool
                 host=None,  # type: Optional[str]
                 port=None,  # type: Optional[int]
                 **kwargs  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        :param    key: API key or username to be used (required)
        :type     key: ``str``

        :param    secret: Secret password to be used
        :type     secret: ``str``

        :param    secure: Whether to use HTTPS or HTTP. Note: Some providers
                only support HTTPS, and it is on by default.
        :type     secure: ``bool``

        :param    host: Override hostname used for connections.
        :type     host: ``str``

        :param    port: Override port used for connections.
        :type     port: ``int``

        :return: ``None``
        """
        super(DNSDriver, self).__init__(key=key, secret=secret, secure=secure,
                                        host=host, port=port, **kwargs)

    def list_record_types(self):
        # type: () -> List[RecordType]
        """
        Return a list of RecordType objects supported by the provider.

        The list is built from the keys of ``RECORD_TYPE_MAP``.

        :return: ``list`` of :class:`RecordType`
        """
        return list(self.RECORD_TYPE_MAP.keys())

    def iterate_zones(self):
        # type: () -> Iterator[Zone]
        """
        Return a generator to iterate over available zones.

        :rtype: ``generator`` of :class:`Zone`
        """
        raise NotImplementedError(
            'iterate_zones not implemented for this driver')

    def list_zones(self):
        # type: () -> List[Zone]
        """
        Return a list of zones.

        :return: ``list`` of :class:`Zone`
        """
        return list(self.iterate_zones())

    def iterate_records(self, zone):
        # type: (Zone) -> Iterator[Record]
        """
        Return a generator to iterate over records for the provided zone.

        :param zone: Zone to list records for.
        :type zone: :class:`Zone`

        :rtype: ``generator`` of :class:`Record`
        """
        raise NotImplementedError(
            'iterate_records not implemented for this driver')

    def list_records(self, zone):
        # type: (Zone) -> List[Record]
        """
        Return a list of records for the provided zone.

        :param zone: Zone to list records for.
        :type zone: :class:`Zone`

        :return: ``list`` of :class:`Record`
        """
        return list(self.iterate_records(zone))

    def get_zone(self, zone_id):
        # type: (str) -> Zone
        """
        Return a Zone instance.

        :param zone_id: ID of the required zone
        :type  zone_id: ``str``

        :rtype: :class:`Zone`
        """
        raise NotImplementedError(
            'get_zone not implemented for this driver')

    def get_record(self, zone_id, record_id):
        # type: (str, str) -> Record
        """
        Return a Record instance.

        :param zone_id: ID of the required zone
        :type  zone_id: ``str``

        :param record_id: ID of the required record
        :type  record_id: ``str``

        :rtype: :class:`Record`
        """
        raise NotImplementedError(
            'get_record not implemented for this driver')

    def create_zone(self, domain, type='master', ttl=None, extra=None):
        # type: (str, str, Optional[int], Optional[dict]) -> Zone
        """
        Create a new zone.

        :param domain: Zone domain name (e.g. example.com)
        :type domain: ``str``

        :param type: Zone type (master / slave).
        :type  type: ``str``

        :param ttl: TTL for new records. (optional)
        :type  ttl: ``int``

        :param extra: Extra attributes (driver specific). (optional)
        :type extra: ``dict``

        :rtype: :class:`Zone`
        """
        raise NotImplementedError(
            'create_zone not implemented for this driver')

    def update_zone(self,
                    zone,  # type: Zone
                    domain,  # type: Optional[str]
                    type='master',  # type: Optional[str]
                    ttl=None,  # type: Optional[int]
                    extra=None  # type: Optional[dict]
                    ):
        # type: (...) -> Zone
        """
        Update an existing zone.

        :param zone: Zone to update.
        :type  zone: :class:`Zone`

        :param domain: Zone domain name (e.g. example.com)
        :type  domain: ``str``

        :param type: Zone type (master / slave).
        :type  type: ``str``

        :param ttl: TTL for new records. (optional)
        :type  ttl: ``int``

        :param extra: Extra attributes (driver specific). (optional)
        :type  extra: ``dict``

        :rtype: :class:`Zone`
        """
        raise NotImplementedError(
            'update_zone not implemented for this driver')

    def create_record(self, name, zone, type, data, extra=None):
        # type: (str, Zone, RecordType, str, Optional[dict]) -> Record
        """
        Create a new record.

        :param name: Record name without the domain name (e.g. www).
                     Note: If you want to create a record for a base domain
                     name, you should specify empty string ('') for this
                     argument.
        :type  name: ``str``

        :param zone: Zone where the requested record is created.
        :type  zone: :class:`Zone`

        :param type: DNS record type (A, AAAA, ...).
        :type  type: :class:`RecordType`

        :param data: Data for the record (depends on the record type).
        :type  data: ``str``

        :param extra: Extra attributes (driver specific). (optional)
        :type extra: ``dict``

        :rtype: :class:`Record`
        """
        raise NotImplementedError(
            'create_record not implemented for this driver')

    def update_record(self,
                      record,  # type: Record
                      name,  # type: Optional[str]
                      type,  # type: Optional[RecordType]
                      data,  # type: Optional[str]
                      extra=None  # type: Optional[dict]
                      ):
        # type: (...) -> Record
        """
        Update an existing record.

        :param record: Record to update.
        :type  record: :class:`Record`

        :param name: Record name without the domain name (e.g. www).
                     Note: If you want to create a record for a base domain
                     name, you should specify empty string ('') for this
                     argument.
        :type  name: ``str``

        :param type: DNS record type (A, AAAA, ...).
        :type  type: :class:`RecordType`

        :param data: Data for the record (depends on the record type).
        :type  data: ``str``

        :param extra: (optional) Extra attributes (driver specific).
        :type  extra: ``dict``

        :rtype: :class:`Record`
        """
        raise NotImplementedError(
            'update_record not implemented for this driver')

    def delete_zone(self, zone):
        # type: (Zone) -> bool
        """
        Delete a zone.

        Note: This will delete all the records belonging to this zone.

        :param zone: Zone to delete.
        :type  zone: :class:`Zone`

        :rtype: ``bool``
        """
        raise NotImplementedError(
            'delete_zone not implemented for this driver')

    def delete_record(self, record):
        # type: (Record) -> bool
        """
        Delete a record.

        :param record: Record to delete.
        :type  record: :class:`Record`

        :rtype: ``bool``
        """
        raise NotImplementedError(
            'delete_record not implemented for this driver')

    def export_zone_to_bind_format(self, zone):
        # type: (Zone) -> str
        """
        Export Zone object to the BIND compatible format.

        :param zone: Zone to export.
        :type  zone: :class:`Zone`

        :return: Zone data in BIND compatible format.
        :rtype: ``str``

        :raises ValueError: If the zone type is not ``master``.
        """
        if zone.type != 'master':
            raise ValueError('You can only generate BIND out for master zones')

        def sort_key(record):
            # type: (Record) -> tuple
            # _get_numeric_id returns either an int or a str. Python 3 can't
            # order an int against a str, so the key is prefixed with a flag
            # which groups numeric IDs first and string IDs after them. Zones
            # where all the IDs are of the same kind sort exactly as before.
            record_id = Record._get_numeric_id(record)
            return (not isinstance(record_id, int), record_id)

        # For consistent output, records are sorted based on the id
        records = sorted(zone.list_records(), key=sort_key)

        date = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        values = {'version': __version__, 'date': date}

        lines = [
            '; Generated by Libcloud v%(version)s on %(date)s UTC' % values,
            '$ORIGIN %(domain)s.' % {'domain': zone.domain},
            # Trailing new line leaves an empty line between the header and
            # the records
            '$TTL %(domain_ttl)s\n' % {'domain_ttl': zone.ttl},
        ]
        lines.extend(self._get_bind_record_line(record=record)
                     for record in records)

        return '\n'.join(lines)

    def export_zone_to_bind_zone_file(self, zone, file_path):
        # type: (Zone, str) -> None
        """
        Export Zone object to the BIND compatible format and write result to a
        file.

        :param zone: Zone to export.
        :type  zone: :class:`Zone`

        :param file_path: File path where the output will be saved.
        :type  file_path: ``str``
        """
        result = self.export_zone_to_bind_format(zone=zone)

        with open(file_path, 'w') as fp:
            fp.write(result)

    def _get_bind_record_line(self, record):
        # type: (Record) -> str
        """
        Generate BIND record line for the provided record.

        The line consists of tab separated name, TTL, class (``IN``), record
        type, priority (MX and SRV records only) and data.

        :param record: Record to generate the line for.
        :type  record: :class:`Record`

        :return: Bind compatible record line.
        :rtype: ``str``
        """
        if record.name:
            name = '%(name)s.%(domain)s' % {'name': record.name,
                                            'domain': record.zone.domain}
        else:
            name = record.zone.domain

        name += '.'

        # TTL stored in the record extra attributes takes precedence over the
        # zone default
        ttl = record.extra['ttl'] if 'ttl' in record.extra else record.zone.ttl
        ttl = str(ttl)
        data = record.data

        if record.type in [RecordType.CNAME, RecordType.DNAME, RecordType.MX,
                           RecordType.PTR, RecordType.SRV]:
            # Make sure trailing dot is present. Empty data is left as is
            # instead of failing with IndexError.
            if data and not data.endswith('.'):
                data += '.'

        if record.type in [RecordType.TXT, RecordType.SPF] and ' ' in data:
            # Escape the quotes
            data = data.replace('"', '\\"')

            # Quote the string
            data = '"%s"' % (data)

        parts = [name, ttl, 'IN', str(record.type)]  # type: List[Any]

        if record.type in [RecordType.MX, RecordType.SRV]:
            # Priority goes between the record type and the data
            parts.append(str(record.extra['priority']))

        parts.append(data)

        return '\t'.join(parts)

    def _string_to_record_type(self, string):
        # type: (str) -> RecordType
        """
        Convert a string representation of a DNS record type to a
        libcloud RecordType ENUM.

        The string is upper-cased and looked up as an attribute on
        :class:`RecordType`, so an unknown type results in ``AttributeError``.

        :param string: Record type name (e.g. ``a``, ``CNAME``).
        :type  string: ``str``

        :rtype: :class:`RecordType`
        """
        string = string.upper()
        record_type = getattr(RecordType, string)
        return record_type

Anon7 - 2022
AnonSec Team