Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.148.105.152
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/libcloud/dns/drivers/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/libcloud/dns/drivers//nsone.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

try:
    import simplejson as json
except ImportError:
    import json

from libcloud.dns.types import Provider, ZoneDoesNotExistError, \
    ZoneAlreadyExistsError, RecordDoesNotExistError, RecordAlreadyExistsError
from libcloud.utils.py3 import httplib
from libcloud.dns.base import DNSDriver, Zone, Record, RecordType
from libcloud.common.nsone import NsOneConnection, NsOneResponse, \
    NsOneException

__all__ = [
    'NsOneDNSDriver'
]


class NsOneDNSResponse(NsOneResponse):
    pass


class NsOneDNSConnection(NsOneConnection):
    responseCls = NsOneDNSResponse


class NsOneDNSDriver(DNSDriver):
    name = 'NS1 DNS'
    website = 'https://ns1.com'
    type = Provider.NSONE
    connectionCls = NsOneDNSConnection

    RECORD_TYPE_MAP = {
        RecordType.A: 'A',
        RecordType.AAAA: 'AAAA',
        RecordType.CNAME: 'CNAME',
        RecordType.MX: 'MX',
        RecordType.NS: 'NS',
        RecordType.PTR: 'PTR',
        RecordType.SOA: 'SOA',
        RecordType.SRV: 'SRV',
        RecordType.TXT: 'TXT'
    }

    def list_zones(self):
        action = '/v1/zones'
        response = self.connection.request(action=action, method='GET')
        zones = self._to_zones(items=response.parse_body())

        return zones

    def get_zone(self, zone_id):
        """
        :param zone_id: Zone domain name (e.g. example.com)
        :return: :class:`Zone`
        """
        action = '/v1/zones/%s' % zone_id
        try:
            response = self.connection.request(action=action, method='GET')
        except NsOneException as e:
            if e.message == 'zone not found':
                raise ZoneDoesNotExistError(value=e.message, driver=self,
                                            zone_id=zone_id)
            else:
                raise e
        zone = self._to_zone(response.objects[0])

        return zone

    def create_zone(self, domain, type='master', ttl=None, extra=None):
        """
        :param domain: Zone domain name (e.g. example.com)
        :type domain: ``str``

        :param type: Zone type (This is not really used. See API docs for extra
          parameters)
        :type type: ``str``

        :param ttl: TTL for new records (This is used through the extra param)
        :type ttl: ``int``

        :param extra: Extra attributes that are specific to the driver
        such as ttl.
        :type extra: ``dict``

        :rtype: :class:`Zone`
        """
        action = '/v1/zones/%s' % domain
        raw_data = {'zone': domain}
        if extra is not None:
            raw_data.update(extra)
        post_data = json.dumps(raw_data)
        try:
            response = self.connection.request(action=action, method='PUT',
                                               data=post_data)
        except NsOneException as e:
            if e.message == 'zone already exists':
                raise ZoneAlreadyExistsError(value=e.message, driver=self,
                                             zone_id=domain)
            else:
                raise e

        zone = self._to_zone(response.object)

        return zone

    def delete_zone(self, zone):
        """
        :param zone: Zone to be deleted.
        :type zone: :class:`Zone`

        :return: Boolean
        """
        action = '/v1/zones/%s' % zone.domain
        """zones_list = self.list_zones()
        if not self.ex_zone_exists(zone_id=zone.id, zones_list=zones_list):
            raise ZoneDoesNotExistError(value='', driver=self, zone_id=zone.id)
        """
        try:
            response = self.connection.request(action=action, method='DELETE')
        except NsOneException as e:
            if e.message == 'zone not found':
                raise ZoneDoesNotExistError(value=e.message, driver=self,
                                            zone_id=zone.id)
            else:
                raise e

        return response.status == httplib.OK

    def list_records(self, zone):
        """
        :param zone: Zone to list records for.
        :type zone: :class:`Zone`

        :return: ``list`` of :class:`Record`
        """
        action = '/v1/zones/%s' % zone.domain
        try:
            response = self.connection.request(action=action, method='GET')
        except NsOneException as e:
            if e.message == 'zone not found':
                raise ZoneDoesNotExistError(value=e.message, driver=self,
                                            zone_id=zone.id)
            else:
                raise e
        records = self._to_records(items=response.parse_body()['records'],
                                   zone=zone)

        return records

    def get_record(self, zone_id, record_id):
        """
        :param zone_id: The id of the zone where to search for
        the record (e.g. example.com)
        :type zone_id: ``str``
        :param record_id: The type of record to search for
        (e.g. A, AAA, MX etc)

        :return: :class:`Record`
        """
        action = '/v1/zones/%s/%s/%s' % (zone_id, zone_id, record_id)
        try:
            response = self.connection.request(action=action, method='GET')
        except NsOneException as e:
            if e.message == 'record not found':
                raise RecordDoesNotExistError(value=e.message, driver=self,
                                              record_id=record_id)
            else:
                raise e
        zone = self.get_zone(zone_id=zone_id)
        record = self._to_record(item=response.parse_body(), zone=zone)

        return record

    def delete_record(self, record):
        """
        :param record: Record to delete.
        :type record: :class:`Record`

        :return: Boolean
        """
        action = '/v1/zones/%s/%s/%s' % (record.zone.domain, record.name,
                                         record.type)
        try:
            response = self.connection.request(action=action, method='DELETE')
        except NsOneException as e:
            if e.message == 'record not found':
                raise RecordDoesNotExistError(value=e.message, driver=self,
                                              record_id=record.id)
            else:
                raise e

        return response.status == httplib.OK

    def create_record(self, name, zone, type, data, extra=None):
        """
        :param name: Name of the record to create (e.g. foo).
        :type name: ``str``
        :param zone: Zone where the record should be created.
        :type zone: :class:`Zone`
        :param type: Type of record (e.g. A, MX etc)
        :type type: ``str``
        :param data: Data of the record (e.g. 127.0.0.1 for the A record)
        :type data: ``str``
        :param extra: Extra data needed to create different types of records
        :type extra: ``dict``
        :return: :class:`Record`
        """
        record_name = '%s.%s' % (name, zone.domain) if name != '' else zone.domain  # noqa

        action = '/v1/zones/%s/%s/%s' % (zone.domain, record_name, type)
        if type == RecordType.MX:
            answer = [extra.get('priority', 10), data]
        else:
            answer = [data]

        raw_data = {
            "answers": [{"answer": answer}],
            "type": type,
            "domain": record_name,
            "zone": zone.domain
        }
        if extra is not None and extra.get('answers'):
            raw_data['answers'] = extra.get('answers')
        post_data = json.dumps(raw_data)
        try:
            response = self.connection.request(action=action, method='PUT',
                                               data=post_data)
        except NsOneException as e:
            if e.message == 'record already exists':
                raise RecordAlreadyExistsError(value=e.message, driver=self,
                                               record_id='')
            else:
                raise e
        record = self._to_record(item=response.parse_body(), zone=zone)

        return record

    def update_record(self, record, name, type, data, extra=None):
        """
        :param record: Record to update
        :type record: :class:`Record`
        :param name: Name of the record to update (e.g. foo).
        :type name: ``str``
        :param type: Type of record (e.g. A, MX etc)
        :type type: ``str``
        :param data: Data of the record (e.g. 127.0.0.1 for the A record)
        :type data: ``str``
        :param extra: Extra data needed to create different types of records
        :type extra: ``dict``
        :return: :class:`Record`
        """
        zone = record.zone
        action = '/v1/zones/%s/%s/%s' % (zone.domain, '%s.%s' %
                                         (name, zone.domain), type)
        raw_data = {
            "answers": [
                {
                    "answer": [
                        data
                    ], }
            ]
        }
        if extra is not None and extra.get('answers'):
            raw_data['answers'] = extra.get('answers')
        post_data = json.dumps(raw_data)
        try:
            response = self.connection.request(action=action, data=post_data,
                                               method='POST')
        except NsOneException as e:
            if e.message == 'record does not exist':
                raise RecordDoesNotExistError(value=e.message, driver=self,
                                              record_id=record.id)
            else:
                raise e
        record = self._to_record(item=response.parse_body(), zone=zone)

        return record

    def ex_zone_exists(self, zone_id, zones_list):
        """
        Function to check if a `Zone` object exists.
        :param zone_id: ID of the `Zone` object.
        :type zone_id: ``str``

        :param zones_list: A list containing `Zone` objects.
        :type zones_list: ``list``.

        :rtype: Returns `True` or `False`.
        """
        zone_ids = []
        for zone in zones_list:
            zone_ids.append(zone.id)

        return zone_id in zone_ids

    def _to_zone(self, item):
        common_attr = ['zone', 'id', 'type']
        extra = {}
        for key in item.keys():
            if key not in common_attr:
                extra[key] = item.get(key)

        zone = Zone(domain=item['zone'], id=item['id'], type=item.get('type'),
                    extra=extra, ttl=extra.get('ttl'), driver=self)

        return zone

    def _to_zones(self, items):
        zones = []
        for item in items:
            zones.append(self._to_zone(item))

        return zones

    def _to_record(self, item, zone):
        common_attr = ['id', 'short_answers', 'answers', 'domain', 'type']
        extra = {}
        for key in item.keys():
            if key not in common_attr:
                extra[key] = item.get(key)
        if item.get('answers') is not None:
            data = item.get('answers')[0]['answer']
        else:
            data = item.get('short_answers')
        record = Record(id=item['id'], name=item['domain'], type=item['type'],
                        data=data, zone=zone, driver=self,
                        extra=extra)

        return record

    def _to_records(self, items, zone):
        records = []
        for item in items:
            records.append(self._to_record(item, zone))

        return records

Anon7 - 2022
AnonSec Team