Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.216.32.251
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/2/cwd/proc/3/root/usr/lib/python3/dist-packages/libcloud/container/drivers/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/2/cwd/proc/3/root/usr/lib/python3/dist-packages/libcloud/container/drivers/lxd.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import re
import os

import collections

try:
    import simplejson as json
except Exception:
    import json

from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import b

from libcloud.common.base import JsonResponse, ConnectionUserAndKey
from libcloud.common.base import KeyCertificateConnection
from libcloud.common.types import InvalidCredsError

from libcloud.container.base import (Container, ContainerDriver,
                                     ContainerImage)
from libcloud.common.exceptions import BaseHTTPError

from libcloud.compute.base import StorageVolume

from libcloud.container.providers import Provider
from libcloud.container.types import ContainerState

# Acceptable success strings comping from LXD API
LXD_API_SUCCESS_STATUS = ['Success']
LXD_API_STATE_ACTIONS = ['stop', 'start', 'restart', 'freeze', 'unfreeze']
LXD_API_IMAGE_SOURCE_TYPE = ["image", "migration", "copy", "none"]

# the wording used by LXD to indicate that an error
# occurred for a request
LXD_ERROR_STATUS_RESP = 'error'


# helpers
def strip_http_prefix(host):
    # strip the prefix
    prefixes = ['http://', 'https://']
    for prefix in prefixes:
        if host.startswith(prefix):
            host = host.strip(prefix)
    return host


def check_certificates(key_file, cert_file, **kwargs):
    """
    Basic checks for the provided certificates in LXDtlsConnection
    """

    # there is no point attempting to connect if either is missing
    if key_file is None or cert_file is None:
        raise InvalidCredsError("TLS Connection requires specification "
                                "of a key file and a certificate file")

    # if they are not none they may be empty strings
    # or certificates that are not appropriate
    if key_file == '' or cert_file == '':
        raise InvalidCredsError("TLS Connection requires specification "
                                "of a key file and a certificate file")

    # if none of the above check the types
    if 'key_files_allowed' in kwargs.keys():
        key_file_suffix = key_file.split('.')

        if key_file_suffix[-1] not in kwargs['key_files_allowed']:
            raise InvalidCredsError("Valid key files are: " +
                                    str(kwargs['key_files_allowed']) +
                                    "you provided: " + key_file_suffix[-1])

            # if none of the above check the types
    if 'cert_files_allowed' in kwargs.keys():
        cert_file_suffix = cert_file.split('.')

        if cert_file_suffix[-1] not in kwargs['cert_files_allowed']:
            raise InvalidCredsError("Valid certification files are: " +
                                    str(kwargs['cert_files_allowed']) +
                                    "you provided: " + cert_file_suffix[-1])

    # if all these are good check the paths
    keypath = os.path.expanduser(key_file)
    is_file_path = os.path.exists(keypath) and os.path.isfile(keypath)
    if not is_file_path:
        raise InvalidCredsError('You need a key file to authenticate with '
                                'LXD tls. This can be found in the server.')

    certpath = os.path.expanduser(cert_file)
    is_file_path = os.path.exists(certpath) and os.path.isfile(certpath)
    if not is_file_path:
        raise InvalidCredsError('You need a certificate file to '
                                'authenticate with LXD tls. '
                                'This can be found in the server.')


def assert_response(response_dict, status_code):
    """
    Basic checks that the response is of the type
    the client is expecting
    """

    # if the type of the response is an error
    if response_dict['type'] == LXD_ERROR_STATUS_RESP:
        # an error returned
        raise LXDAPIException(message="response type is error",
                              response_dict=response_dict)

    # anything else apart from the status_code given should be treated as error
    if response_dict['status_code'] != status_code:
        # we have an unknown error
        msg = "Status code should be {0}\
         but is {1}".format(status_code, response_dict['status_code'])
        raise LXDAPIException(message=msg,
                              response_dict=response_dict)


class LXDAPIException(Exception):
    """
    Basic exception to be thrown when LXD API
    returns with some kind of error
    """

    def __init__(self, message="Unknown Error Occurred", response_dict=None,
                 error_type=""):
        self.message = message
        self.response_dict = response_dict
        self.type = error_type

        super(LXDAPIException, self).__init__(message)

    def __str__(self):
        return self.message


class LXDStoragePool(object):
    """
    Utility class representing an LXD storage pool
    https://lxd.readthedocs.io/en/latest/storage/
    """
    def __init__(self, name, driver, used_by, config, managed):

        # the name of the storage pool
        self.name = name

        # the driver (or type of storage pool). e.g. ‘zfs’ or ‘btrfs’, etc.
        self.driver = driver

        # which containers (by API endpoint /1.0/containers/<name>)
        # are using this storage-pool.
        self.used_by = used_by

        # a dictionary with some information about the storage-pool.
        # e.g. size, source (path), volume.size, etc.
        self.config = config

        # Boolean that indicates whether LXD manages the pool or not.
        self.managed = managed


class LXDNetwork(object):
    """
    Utility class representing an LXD network
    """

    @classmethod
    def build_from_response(cls, metadata):
        lxd_network = LXDNetwork()
        lxd_network.name = metadata.get("name", None)
        lxd_network.id = metadata.get("name", None)
        lxd_network.description = metadata.get("description", None)
        lxd_network.type = metadata.get("type", None)
        lxd_network.config = metadata.get("config", None)
        lxd_network.status = metadata.get("status", None)
        lxd_network.locations = metadata.get("locations", None)
        lxd_network.managed = metadata.get("managed", None)
        lxd_network.used_by = metadata.get("used_by", None)
        return lxd_network

    def __init__(self):
        self.name = None
        self.id = None
        self.description = None
        self.type = None
        self.config = None
        self.status = None
        self.locations = None
        self.managed = None
        self.used_by = None
        self.extra = {}


class LXDServerInfo(object):
    """
    Wraps the response form /1.0
    """

    @classmethod
    def build_from_response(cls, metadata):

        server_info = LXDServerInfo()
        server_info.api_extensions = metadata.get("api_extensions", None)
        server_info.api_status = metadata.get("api_status", None)
        server_info.api_version = metadata.get("api_version", None)
        server_info.auth = metadata.get("auth", None)
        server_info.config = metadata.get("config", None)
        server_info.environment = metadata.get("environment", None)
        server_info.public = metadata.get("public", None)
        return server_info

    def __init__(self):

        # List of API extensions added after
        # the API was marked stable
        self.api_extensions = None

        # API implementation status
        # (one of, development, stable or deprecated)
        self.api_status = None

        # The API version as a string
        self.api_version = None

        # Authentication state,
        # one of "guest", "untrusted" or "trusted"
        self.auth = None

        self.config = None

        # Various information about the host (OS, kernel, ...)
        self.environment = None

        self.public = None

    def __str__(self):
        return str(self.api_extensions) + str(self.api_status) + \
            str(self.api_version) + str(self.auth) + str(self.config) + \
            str(self.environment) + \
            str(self.public)


LXDContainerExecuteResult = collections.namedtuple('LXDContainerExecuteResult',
                                                   ['uuid',
                                                    'secret_0',
                                                    'secret_1',
                                                    'secret_2',
                                                    'control',
                                                    'output',
                                                    'result'])


class LXDResponse(JsonResponse):
    valid_response_codes = [httplib.OK, httplib.ACCEPTED, httplib.CREATED,
                            httplib.NO_CONTENT]

    def parse_body(self):

        if len(self.body) == 0 and not self.parse_zero_length_body:
            return self.body

        try:
            content_type = self.headers.get('content-type', 'application/json')
            if content_type == 'application/json' or content_type == '':
                if self.headers.get('transfer-encoding') == 'chunked' and \
                        'fromImage' in self.request.url:
                    body = [json.loads(chunk) for chunk in
                            self.body.strip().replace('\r', '').split('\n')]
                else:
                    body = json.loads(self.body)
            else:
                body = self.body
        except ValueError:
            m = re.search('Error: (.+?)"', self.body)
            if m:
                error_msg = m.group(1)
                raise Exception(error_msg)
            else:
                msg = ('ConnectionError: Failed to parse JSON response '
                       '(body=%s)' % (self.body))
                raise Exception(msg)
        return body

    def parse_error(self):
        if self.status == 401:
            raise InvalidCredsError('Invalid credentials')
        return self.body

    def success(self):
        return self.status in self.valid_response_codes


class LXDConnection(ConnectionUserAndKey):
    responseCls = LXDResponse
    timeout = 60

    def add_default_headers(self, headers):
        """
        Add parameters that are necessary for every request
        If user and password are specified, include a base http auth
        header
        """
        headers['Content-Type'] = 'application/json'
        if self.user_id and self.key:
            user_b64 = base64.b64encode(b('%s:%s' % (self.user_id, self.key)))
            headers['Authorization'] = 'Basic %s' % (user_b64.decode('utf-8'))
        return headers


class LXDtlsConnection(KeyCertificateConnection):

    responseCls = LXDResponse

    def __init__(self, key, secret, secure=True,
                 host='localhost', port=8443, ca_cert='',
                 key_file=None, cert_file=None,
                 certificate_validator=None, **kwargs):

        if certificate_validator is not None:
            certificate_validator(key_file=key_file, cert_file=cert_file)
        else:
            check_certificates(key_file=key_file,
                               cert_file=cert_file, **kwargs)

        super(LXDtlsConnection, self).__init__(key_file=key_file,
                                               cert_file=cert_file,
                                               secure=secure, host=host,
                                               port=port, url=None,
                                               proxy_url=None,
                                               timeout=None, backoff=None,
                                               retry_delay=None)

        self.key_file = key_file
        self.cert_file = cert_file

    def add_default_headers(self, headers):
        headers['Content-Type'] = 'application/json'
        return headers


class LXDContainerDriver(ContainerDriver):
    """
    Driver for LXD REST API of LXC containers
    https://lxd.readthedocs.io/en/stable-2.0/rest-api/
    https://github.com/lxc/lxd/blob/master/doc/rest-api.md
    """
    type = Provider.LXD
    name = 'LXD'
    website = 'https://linuxcontainers.org/'
    connectionCls = LXDConnection

    # LXD supports clustering but still the functionality
    # is not implemented yet on our side
    supports_clusters = False
    version = '1.0'
    default_time_out = 30

    # default configuration when creating a container
    # if the architecture is not specified
    # by the client code then the underlying
    # host architecture should be picked up by
    # LXC.
    default_architecture = ''
    default_profiles = 'default'

    # An ephemeral container means that it
    # will be restroyed once it is stopped
    default_ephemeral = False

    def __init__(self, key='', secret='', secure=False,
                 host='localhost', port=8443, key_file=None,
                 cert_file=None, ca_cert=None,
                 certificate_validator=check_certificates):

        if key_file:

            if not cert_file:
                # LXD tls authentication-
                # We pass two files, a key_file with the
                # private key and cert_file with the certificate
                # libcloud will handle them through LibcloudHTTPSConnection

                raise LXDAPIException(message='Need both private key and'
                                              ' certificate files for '
                                              'tls authentication')

            self.connectionCls = LXDtlsConnection
            self.key_file = key_file
            self.cert_file = cert_file
            self.certificate_validator = certificate_validator
            secure = True

        if host.startswith('https://'):
            secure = True

        host = strip_http_prefix(host=host)

        super(LXDContainerDriver, self).__init__(key=key, secret=secret,
                                                 secure=secure, host=host,
                                                 port=port,
                                                 key_file=key_file,
                                                 cert_file=cert_file)

        if ca_cert:
            self.connection.connection.ca_cert = ca_cert
        else:
            # do not verify SSL certificate
            self.connection.connection.ca_cert = False

        self.connection.secure = secure
        self.connection.host = host
        self.connection.port = port
        self.version = self._get_api_version()

    def build_operation_websocket_url(self, uuid, w_secret):

        uri = 'wss://%s:%s/%s/operations/%s/' \
              'websocket?secret=%s' % (self.connection.host,
                                       self.connection.port,
                                       self.version,
                                       uuid, w_secret)
        return uri

    def ex_get_api_endpoints(self):
        """
        Description: List of supported APIs
        Authentication: guest
        Operation: sync
        Return: list of supported API endpoint URLs

        """
        response = self.connection.request("/")
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)
        return response_dict["metadata"]

    def ex_get_server_configuration(self):
        """

        Description: Server configuration and environment information
        Authentication: guest, untrusted or trusted
        Operation: sync
        Return: Dict representing server state

        The returned configuration depends on whether the connection
        is trusted or not
        :rtype: :class: .LXDServerInfo

        """
        response = self.connection.request("/%s" % (self.version))
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)
        meta = response_dict["metadata"]
        return LXDServerInfo.build_from_response(metadata=meta)

    def deploy_container(self, name, image, cluster=None,
                         parameters=None, start=True,
                         ex_architecture=default_architecture,
                         ex_profiles=None,
                         ex_ephemeral=default_ephemeral,
                         ex_config=None, ex_devices=None,
                         ex_instance_type=None,
                         ex_timeout=default_time_out):

        """
        Create a new container
        Authentication: trusted
        Operation: async

        :param name: The name of the new container.
        64 chars max, ASCII, no slash, no colon and no comma
        :type  name: ``str``

        :param image: The container image to deploy. Currently not used
        :type  image: :class:`.ContainerImage`

        :param cluster: The cluster to deploy to, None is default
        :type  cluster: :class:`.ContainerCluster`

        :param parameters: Container Image parameters.
        This parameter should represent the
        the ``source`` dictioanry expected by the  LXD API call. For more
        information how this parameter should be structured see
        https://github.com/lxc/lxd/blob/master/doc/rest-api.md
        :type  parameters: ``str``

        :param start: Start the container on deployment. True is the default
        :type  start: ``bool``

        :param ex_architecture: string e.g. x86_64
        :type  ex_architecture: ``str``

        :param ex_profiles: List of profiles
        :type  ex_profiles: ``list``

        :param ex_ephemeral: Whether to destroy the container on shutdown
        :type  ex_ephemeral: ``bool``

        :param ex_config: Config override e.g.  {"limits.cpu": "2"}
        :type  ex_config: ``dict``

        :param ex_devices: optional list of devices the container should have
        :type  ex_devices: ``dict``

        :param ex_instance_type: An optional instance type
        to use as basis for limits e.g. "c2.micro"
        :type  ex_instance_type: ``str``

        :param ex_timeout: Timeout
        :type  ex_timeout: ``int``

        :rtype: :class:`libcloud.container.base.Container`
        """

        if parameters:
            parameters = json.loads(parameters)

            if parameters["source"].get("mode", None) == "pull":

                try:
                    # this means the image must be downloaded
                    image = self.install_image(path=None,
                                               ex_timeout=ex_timeout,
                                               **parameters)
                except Exception as e:
                    raise LXDAPIException(message='Deploying '
                                                  'container failed:  '
                                                  'Image could not '
                                                  'be installed. %r' % e)

                # if the image was installed then we need to change
                # how parameters are structured
                parameters = {"source": {"type": "image",
                                         "fingerprint":
                                             image.extra['fingerprint']}}

        cont_params = \
            LXDContainerDriver._fix_cont_params(architecture=ex_architecture,
                                                profiles=ex_profiles,
                                                ephemeral=ex_ephemeral,
                                                config=ex_config,
                                                devices=ex_devices,
                                                instance_type=ex_instance_type)

        container = self._deploy_container_from_image(name=name, image=image,
                                                      parameters=parameters,
                                                      cont_params=cont_params,
                                                      timeout=ex_timeout)

        if start:
            container.start()

        return container

    def get_container(self, id, ex_get_ip_addr=True):

        """
        Get a container by ID

        :param id: The ID of the container to get
        :type  id: ``str``

        :param ex_get_ip_addr: Indicates whether ip addresses
        should also be included. This requires an extra GET request
        :type  ex_get_ip_addr: ``boolean```

        :rtype: :class:`libcloud.container.base.Container`
        """
        req = "/%s/containers/%s" % (self.version, id)
        response = self.connection.request(req)
        result_dict = response.parse_body()
        assert_response(response_dict=result_dict, status_code=200)

        metadata = result_dict["metadata"]

        ips = []
        if ex_get_ip_addr:
            req = "/%s/containers/%s/state" % (self.version, id)
            ip_response = self.connection.request(req)

            ip_result_dict = ip_response.parse_body()
            assert_response(response_dict=ip_result_dict, status_code=200)

            if ip_result_dict["metadata"]["network"] is not None:
                networks = ip_result_dict["metadata"]["network"]["eth0"]

                # the list of addresses
                addresses = networks["addresses"]

                for item in addresses:
                    ips.append(item["address"])

        metadata.update({"ips": ips})
        return self._to_container(metadata=metadata)

    def start_container(self, container, ex_timeout=default_time_out,
                        ex_force=True, ex_stateful=True):
        """
        Start a container

        :param container: The container to start
        :type  container: :class:`libcloud.container.base.Container`

        :param ex_timeout: Time to wait for the operation to complete
        :type  ex_timeout: ``int``

        :param ex_force:
        :type  ex_force: ``boolean``

        :param ex_stateful:
        :type  ex_stateful: ``boolean``

        :rtype: :class:`libcloud.container.base.Container`
        """
        return self._do_container_action(container=container, action='start',
                                         timeout=ex_timeout,
                                         force=ex_force, stateful=ex_stateful)

    def stop_container(self, container, ex_timeout=default_time_out,
                       ex_force=True, ex_stateful=True):
        """
        Stop the given container

        :param container: The container to be stopped
        :type  container: :class:`libcloud.container.base.Container`

        :param ex_timeout: Time to wait for the operation to complete
        :type  ex_timeout: ``int``

        :param ex_force:
        :type  ex_force: ``boolean``

        :param ex_stateful:
        :type  ex_stateful: ``boolean``

        :rtype: :class:`libcloud.container.base.Container
        """
        return self._do_container_action(container=container, action='stop',
                                         timeout=ex_timeout,
                                         force=ex_force, stateful=ex_stateful)

    def restart_container(self, container, ex_timeout=default_time_out,
                          ex_force=True, ex_stateful=True):
        """
        Restart a deployed container

        :param container: The container to restart
        :type  container: :class:`.Container`

        :param ex_timeout: Time to wait for the operation to complete
        :type  ex_timeout: ``int``

        :param ex_force:
        :type  ex_force: ``boolean``

        :param ex_stateful:
        :type  ex_stateful: ``boolean``

        :rtype: :class:`libcloud.container.base.Container
        """
        return self._do_container_action(container=container, action='restart',
                                         timeout=ex_timeout,
                                         force=ex_force, stateful=ex_stateful)

    def ex_freeze_container(self, container, ex_timeout=default_time_out):

        """
        Set the given container into a freeze state

        :param container: The container to restart
        :type  container: :class:`.Container`

        :param ex_timeout: Time to wait for the operation to complete
        :type  ex_timeout: ``int``

        :rtype :class: `libcloud.container.base.Container
        """

        return self._do_container_action(container=container,
                                         action='freeze',
                                         timeout=ex_timeout,
                                         force=True, stateful=True)

    def ex_unfreeze_container(self, container, ex_timeout=default_time_out):

        """
        Set the given container into  unfreeze state

        :param container: The container to restart
        :type  container: :class:`.Container`

        :param ex_timeout: Time to wait for the operation to complete
        :type  ex_timeout: ``int``

        :rtype :class: `libcloud.container.base.Container
        """

        return self._do_container_action(container=container,
                                         action='unfreeze',
                                         timeout=ex_timeout,
                                         force=True, stateful=True)

    def destroy_container(self, container, ex_timeout=default_time_out):
        """
        Destroy a deployed container. Raises and exception

        if the container is running

        :param container: The container to destroy
        :type  container: :class:`.Container`

        :param ex_timeout: Time to wait for the operation to complete
        :type  ex_timeout ``int``

        :rtype: :class:`libcloud.container.base.Container
        """

        # Return: background operation or standard error
        req = '/%s/containers/%s' % (self.version, container.name)

        try:
            response = self.connection.request(req, method='DELETE')

            response_dict = response.parse_body()
            assert_response(response_dict=response_dict, status_code=100)
        except BaseHTTPError as err:
            # handle the case where the container is running
            raise self._get_lxd_api_exception_for_error(err)

        try:

            # wait until the timeout...but util getting here the operation
            # may have finished already
            id = response_dict['metadata']['id']
            req = '/%s/operations/%s/wait?timeout=%s' % (self.version,
                                                         id,
                                                         ex_timeout)
            response = self.connection.request(req)
        except BaseHTTPError as err:

            lxd_exception = self._get_lxd_api_exception_for_error(err)
            # if not found assume the operation completed
            if lxd_exception.message != "not found":
                raise lxd_exception

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        # return a dummy container
        container = Container(driver=self, name=container.name,
                              id=container.name,
                              state=ContainerState.TERMINATED,
                              image=None, ip_addresses=[],
                              extra=None)
        return container

    def ex_execute_cmd_on_container(self, cont_id, command, **config):
        """
        Description: run a remote command
        Operation: async

        Return: Depends on the  the configuration

        if wait-for-websocket=true and interactive=false
        returns a LXDContainerExecuteResult with:
            uuid=uuid,
            secret_0=fds["0"],
            secret_1=fds["1"],
            secret_2=fds["2"],
            control=fds["control"],
            output={}, result=None

        if wait-for-websocket=true and interactive=true
        returns a LXDContainerExecuteResult with:
            uuid=uuid,
            secret_0=fds["0"],
            secret_1=None,
            secret_2=None,
            control=fds["control"],
            output={}, result=None

        if interactive=false and record-output=true
        returns a LXDContainerExecuteResult with:
            uuid=uuid,
            secret_0=None,
            secret_1=None,
            secret_2=None,
            control=None,
            output=output, result=result

        if none of the above it assumes that the command has
        been executed and returns LXDContainerExecuteResult with:
            uuid=uuid,
            secret_0=None,
            secret_1=None,
            secret_2=None,
            control=None,
            output=None, result=result


        in all the above uuid is the operation id

        :param cont_id: The container name to run the commands
        ":type cont_id: ``str``

        :param command: a list of strings indicating the commands
        and their arguments e.g: ["/bin/bash ls -l"]
        :type  command ``list``

        :param config: Dict with extra arguments.

            For example:

            width:  Initial width of the terminal default 80
            height: Initial height of the terminal default 25
            user:   User to run the command as default 1000
            group: Group to run the  command as default 1000
            cwd: Current working directory default /tmp

            wait-for-websocket: Whether to wait for a connection
            before starting the process. Default False

            record-output: Whether to store stdout and stderr
            (only valid with wait-for-websocket=false)
            (requires API extension container_exec_recording). Default False

            interactive: Whether to allocate a pts device
            instead of PIPEs. Default true

        :type config ``dict``

        :rtype LXDContainerExecuteResult
        """

        input = {"command": command}
        input = LXDContainerDriver._create_exec_configuration(input, **config)
        data = json.dumps(input)
        req = "/%s/containers/%s/exec" % (self.version, cont_id)

        # Return: background operation +
        # optional websocket information or standard error
        response = self.connection.request(req, method="POST", data=data)

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=100)

        fds = response_dict['metadata']['metadata']['fds']
        uuid = response_dict['metadata']['id']

        if input["wait-for-websocket"] is True and\
                input["interactive"] is False:
            return LXDContainerExecuteResult(uuid=uuid,
                                             secret_0=fds["0"],
                                             secret_1=fds["1"],
                                             secret_2=fds["2"],
                                             control=fds["control"],
                                             output={}, result=None)

        elif input["wait-for-websocket"] is True and\
                input["interactive"] is True:

            return LXDContainerExecuteResult(uuid=uuid,
                                             secret_0=fds["0"],
                                             secret_1=None,
                                             secret_2=None,
                                             control=fds["control"],
                                             output={}, result=None)

        elif input["interactive"] is False and\
                input["record-output"] is True:

            output = response_dict['metadata']['metadata']['output']
            result = response_dict['metadata']['metadata']['result']
            return LXDContainerExecuteResult(uuid=uuid,
                                             secret_0=None,
                                             secret_1=None,
                                             secret_2=None,
                                             control=None,
                                             output=output, result=result)

        else:

            result = response_dict['metadata']['metadata']['result']
            return LXDContainerExecuteResult(uuid=uuid,
                                             secret_0=None,
                                             secret_1=None,
                                             secret_2=None,
                                             control=None,
                                             output={}, result=result)

    def list_containers(self, image=None, cluster=None, ex_detailed=True):
        """
        List the deployed container images

        :param image: Filter to containers with a certain image
        :type  image: :class:`.ContainerImage`

        :param cluster: Filter to containers in a cluster
        :type  cluster: :class:`.ContainerCluster`

        :param ex_detailed: Flag indicating whether detail info
        of the containers is required. This will cause a
        GET request for every container present in the
        host. Default is True
        :type ex_detailed: ``bool``

        :rtype: ``list`` of :class:`libcloud.container.base.Container
        """

        result = self.connection.request('/%s/containers' % self.version)
        result = result.parse_body()

        # how to treat the errors????
        assert_response(response_dict=result, status_code=200)

        meta = result['metadata']
        containers = []
        for item in meta:
            container_id = item.split('/')[-1]
            if not ex_detailed:
                container = Container(driver=self, name=container_id,
                                      state=ContainerState.UNKNOWN,
                                      id=container_id,
                                      image=image, ip_addresses=[],
                                      extra={})
            else:
                container = self.get_container(id=container_id)
            containers.append(container)

        return containers

    def ex_get_image(self, fingerprint):
        """
        Returns a container image from the given image fingerprint

        :param fingerprint: image fingerprint
        :type  fingerprint: ``str``

        :rtype: :class:`.ContainerImage`
        """
        req = '/%s/images/%s' % (self.version, fingerprint)
        response = self.connection.request(req)

        #  parse the LXDResponse into a dictionary
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        return self._to_image(metadata=response_dict['metadata'])

    def install_image(self, path, ex_timeout=default_time_out,
                      **ex_img_data):

        """
        Install a container image from a remote path. Not that the
        path currently is not used. Image data should be provided
        under the key 'ex_img_data'. Creating an image in LXD is an
        asynchronous operation

        :param path: Path to the container image
        :type  path: ``str``

        :param ex_timeout: Time to wait before signaling timeout
        :type  ex_timeout: ``int``

        :param ex_img_data: Dictionary describing the image data
        :type  ex_img_data: ``dict``

        :rtype: :class:`.ContainerImage`
        """

        if not ex_img_data:
            msg = "Install an image for LXD requires " \
                  "specification of image_data"
            raise LXDAPIException(message=msg)

        # Return: background operation or standard error
        data = ex_img_data['source']

        config = {
            'public': data.get('public', True),
            'auto_update': data.get('auto_update', False),
            'aliases': [data.get('aliases', {})],
            'source': {
                'type': 'url',
                'mode': 'pull',
                'url': data['url']
            }
        }

        config = json.dumps(config)

        # background operation or standard error
        response = self.connection.request('/%s/images' % (self.version),
                                           method='POST', data=config)

        response_dict = response.parse_body()

        # a background operation is expected
        # to be returned status_code = 100 --> Operation created
        assert_response(response_dict=response_dict, status_code=100)

        try:

            # wait until the timeout...but until getting here the operation
            # may have finished already
            id = response_dict['metadata']['id']
            req = '/%s/operations/%s/wait?timeout=%s' % (self.version,
                                                         id,
                                                         ex_timeout)
            response = self.connection.request(req)
        except BaseHTTPError as err:

            lxd_exception = self._get_lxd_api_exception_for_error(err)
            # if not found assume the operation completed
            if lxd_exception.message != "not found":
                raise lxd_exception

        config = json.loads(config)

        # let's see if the image exists
        if len(config['aliases']) != 0 and \
                'name' in config['aliases'][0]:
            image_alias = config['aliases'][0]['name']
        else:
            image_alias = config['source']['url'].split('/')[-1]

        has, fingerprint = self.ex_has_image(alias=image_alias)
        if not has:
            raise LXDAPIException(message="Image %s was "
                                          "not installed " % image_alias)

        return self.ex_get_image(fingerprint=fingerprint)

    def list_images(self):
        """
        List of URLs for images the server is publishing

        :rtype: ``list`` of :class:`.ContainerImage`
        """
        response = self.connection.request('/%s/images' % (self.version))

        #  parse the LXDResponse into a dictionary
        response_dict = response.parse_body()

        assert_response(response_dict=response_dict, status_code=200)

        metadata = response_dict['metadata']
        images = []

        for image in metadata:
            fingerprint = image.split("/")[-1]
            images.append(self.ex_get_image(fingerprint=fingerprint))
        return images

    def ex_has_image(self, alias):
        """
        Helper function. Returns true and the image fingerprint
        if the image with the given alias exists on the host.

        :param alias: the image alias
        :type  alias: ``str``

        :rtype:  ``tupple`` :: (``boolean``, ``str``)
        """

        # get all the images existing on the host
        try:
            response = self.connection.request(
                '/{}/images/aliases/{}'.format(self.version, alias))
            metadata = response.object['metadata']
            return True, metadata.get('target')
        except BaseHTTPError as err:
            lxd_exception = self._get_lxd_api_exception_for_error(err)
            if lxd_exception.message == "not found":
                return False, -1
            else:
                raise lxd_exception
        except Exception as err:
            raise self._get_lxd_api_exception_for_error(err)

    def ex_list_storage_pools(self, detailed=True):
        """
        Returns a list of storage pools defined currently defined on the host

        Description: list of storage pools
        Authentication: trusted
        Operation: sync

        ":rtype: list of StoragePool items
        """

        # Return: list of storage pools that are currently defined on the host
        response = self.connection.request("/%s/storage-pools" % self.version)

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        pools = []
        for pool_item in response_dict['metadata']:
            pool_name = pool_item.split('/')[-1]

            if not detailed:
                # attempt to create a minimal StoragePool
                pools.append(self._to_storage_pool({"name": pool_name,
                                                    "driver": None,
                                                    "used_by": None,
                                                    "config": None,
                                                    "managed": None}))
            else:
                pools.append(self.ex_get_storage_pool(id=pool_name))

        return pools

    def ex_get_storage_pool(self, id):
        """
        Returns  information about a storage pool
        :param id: the name of the storage pool
        :rtype: :class: StoragePool
        """

        # Return: dict representing a storage pool
        req = "/%s/storage-pools/%s" % (self.version, id)
        response = self.connection.request(req)

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        if not response_dict['metadata']:
            msg = "Storage pool with name {0} has no data".format(id)
            raise LXDAPIException(message=msg)

        return self._to_storage_pool(data=response_dict['metadata'])

    def ex_create_storage_pool(self, definition):

        """
        Create a storage_pool from definition.

        Implements POST /1.0/storage-pools

        The `definition` parameter defines
        what the storage pool will be.  An
        example config for the zfs driver is:

                   {
                       "config": {
                           "size": "10GB"
                       },
                       "driver": "zfs",
                       "name": "pool1"
                   }

        Note that **all** fields in the `definition` parameter are strings.
        Note that size has to be at least 64MB in order to create the pool

        For further details on the storage pool types see:
        https://lxd.readthedocs.io/en/latest/storage/

        The function returns the a `StoragePool` instance, if it is
        successfully created, otherwise an LXDAPIException is raised.

        :param definition: the fields to pass to the LXD API endpoint
        :type definition: dict

        :returns: a storage pool if successful,
        raises NotFound if not found
        :rtype: :class:`StoragePool`

        :raises: :class:`LXDAPIExtensionNotAvailable`
        if the 'storage' api extension is missing.
        :raises: :class:`LXDAPIException`
        if the storage pool couldn't be created.
        """

        if not definition:
            raise LXDAPIException("Cannot create a storage pool "
                                  " without a definition")

        data = json.dumps(definition)

        # Return: standard return value or standard error
        response = self.connection.request("/%s/storage-pools" % self.version,
                                           method='POST', data=data)

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        return self.ex_get_storage_pool(id=definition["name"])

    def ex_delete_storage_pool(self, id):
        """Delete the storage pool.

        Implements DELETE /1.0/storage-pools/<self.name>

        Deleting a storage pool may fail if it is being used.  See the LXD
        documentation for further details.

        :raises: :class:`LXDAPIException` if the storage pool can't be deleted.
        """

        # Return: standard return value or standard error
        req = "/%s/storage-pools/%s" % (self.version, id)
        response = self.connection.request(req, method='DELETE')

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

    def ex_list_storage_pool_volumes(self, pool_id, detailed=True):
        """
        Description: list of storage volumes
        associated with the given storage pool

        :param pool_id: the id of the storage pool to query
        :param detailed: boolean flag.
        If True extra API calls are made to fill in the missing details
                                       of the storage volumes

        Authentication: trusted
        Operation: sync
        Return: list of storage volumes that
        currently exist on a given storage pool

        :rtype: A list of :class: StorageVolume
        """

        req = "/%s/storage-pools/%s/volumes" % (self.version, pool_id)
        response = self.connection.request(req)
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        volumes = []

        for volume in response_dict['metadata']:
            volume = volume.split("/")
            name = volume[-1]
            type = volume[-2]

            if not detailed:

                metadata = {'config': {'size': None}, "name": name,
                            "type": type, "used_by": None}
                volumes.append(self._to_storage_volume(pool_id=pool_id,
                                                       metadata=metadata))
            else:
                volume = self.ex_get_storage_pool_volume(pool_id=pool_id,
                                                         type=type, name=name)
                volumes.append(volume)

        return volumes

    def ex_get_storage_pool_volume(self, pool_id, type, name):
        """
        Description: information about a storage volume
        of a given type on a storage pool
        Introduced: with API extension storage
        Authentication: trusted
        Operation: sync
        Return: A StorageVolume  representing a storage volume
        """
        req = "/%s/storage-pools/%s/volumes/%s/%s" % (self.version,
                                                      pool_id,
                                                      type, name)
        response = self.connection.request(req)
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        return self._to_storage_volume(pool_id=pool_id,
                                       metadata=response_dict["metadata"])

    def ex_get_volume_by_name(self, name, vol_type="custom"):
        """
        Returns a storage volume that has the given name.
        The function will loop over all storage-polls available
        and will pick the first volume from the first storage poll
        that matches the given name. Thus this function can be
        quite expensive

        :param name: The name of the volume to look for
        :type  name: str

        :param vol_type: The type of the volume default is custom
        :type  vol_type: str

        :return: A StorageVolume  representing a storage volume
        """

        req = '/%s/storage-pools' % self.version
        response = self.connection.request(req)
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        pools = response_dict['metadata']

        for pool in pools:

            pool_id = pool.split('/')[-1]

            volumes = self.ex_list_storage_pool_volumes(pool_id=pool_id)

            for vol in volumes:
                if vol.name == name:
                    return vol

        return None

    def create_volume(self, pool_id, definition, **kwargs):

        """
        Create a new storage volume on a given storage pool
        Operation: sync or async (when copying an existing volume)
        :return: A StorageVolume  representing a storage volume
        """

        if not definition:
            raise LXDAPIException("Cannot create a storage volume "
                                  "without a definition")

        size_type = definition.pop('size_type')
        definition['config']['size'] = \
            str(LXDContainerDriver._to_bytes(definition['config']['size'],
                                             size_type=size_type))

        data = json.dumps(definition)

        # Return: standard return value or standard error
        req = "/%s/storage-pools/%s/volumes" % (self.version, pool_id)
        response = self.connection.request(req, method='POST', data=data)

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        return self.ex_get_storage_pool_volume(pool_id=pool_id,
                                               type=definition["type"],
                                               name=definition["name"])

    def attach_volume(self, container_id, volume_id,
                      pool_id, name, path, ex_timeout=default_time_out):
        """
        Attach the volume with id volume_id
        to the container with id container_id
        """
        container = self.get_container(id=container_id)
        config = container.extra

        # expand the devices for the container
        config['devices'] = {

            name: {"path": path,
                   "type": "disk",
                   "source": volume_id,
                   "pool": pool_id
                   }
        }

        data = json.dumps(config)

        req = "/%s/containers/%s" % (self.version, container_id)
        response = self.connection.request(req,
                                           method="PUT",
                                           data=data)

        response_dict = response.parse_body()

        # a background operation is expected
        # to be returned status_code = 100 --> Operation created
        assert_response(response_dict=response_dict, status_code=100)

        try:

            # wait until the timeout...but util getting here the operation
            # may have finished already
            oid = response_dict['metadata']['id']
            req = '/%s/operations/%s/wait?timeout=%s' % (self.version,
                                                         oid,
                                                         ex_timeout)
            response = self.connection.request(req)
        except BaseHTTPError as err:

            lxd_exception = self._get_lxd_api_exception_for_error(err)
            # if not found assume the operation completed
            if lxd_exception.message != "not found":
                raise lxd_exception

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)
        return self.get_container(id=container_id, ex_get_ip_addr=True)

    def ex_replace_storage_volume_config(self, pool_id, type,
                                         name, definition):
        """
        Replace the storage volume information
        :param pool_id:
        :param type:
        :param name:
        :param definition
        """

        if not definition:
            raise LXDAPIException("Cannot create a storage "
                                  "volume without a definition")

        data = json.dumps(definition)
        response = self.connection.request("/%s/storage-pools/%s/volumes/%s/%s"
                                           % (self.version, pool_id,
                                              type, name),
                                           method="PUT", data=data)

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)
        return self.ex_get_storage_pool_volume(pool_id=pool_id,
                                               type=type, name=name)

    def ex_delete_storage_pool_volume(self, pool_id, type, name):
        """
        Delete a storage volume of a given type on a given storage pool

        :param pool_id:
        :type ``str``

        :param type:
        :type  ``str``

        :param name:
        :type ``str``

        :return:
        """

        try:

            req = "/%s/storage-pools/%s/volumes/%s/%s" % (self.version,
                                                          pool_id,
                                                          type,
                                                          name)
            response = self.connection.request(req, method="DELETE")

            response_dict = response.parse_body()
            assert_response(response_dict=response_dict, status_code=200)
        except BaseHTTPError as err:
            raise self._get_lxd_api_exception_for_error(err)

        return True

    def ex_list_networks(self):
        """
        Returns a list of networks.
        Implements GET /1.0/networks
        Authentication: trusted
        Operation: sync

        :rtype: list of LXDNetwork objects
        """

        req = "/%s/networks" % (self.version)
        response = self.connection.request(req)

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        nets = response_dict["metadata"]
        networks = []
        for net in nets:
            name = net.split('/')[-1]
            networks.append(self.ex_get_network(name=name))
        return networks

    def ex_get_network(self, name):
        """
        Returns the LXD network with the given name.
        Implements GET /1.0/networks/<name>

        Authentication: trusted
        Operation: sync

        :param name: The name of the network to return
        :type  name: str

        :rtype: LXDNetwork
        """
        req = '/%s/networks/%s' % (self.version, name)
        response = self.connection.request(req)
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        return LXDNetwork.build_from_response(response_dict["metadata"])

    def ex_create_network(self, name, **kwargs):
        """
        Create a new network with the given name and
        and the specified configuration

        Authentication: trusted
        Operation: sync

        :param name: The name of the new network
        :type  name: str
        """

        kwargs['name'] = name
        data = json.dumps(kwargs)
        req = '/%s/networks' % self.version
        # Return: standard return value or standard error
        response = self.connection.request(req, method="POST", data=data)
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)
        return self.ex_get_network(name=name)

    def ex_delete_network(self, name):
        """
        Delete the network with the given name
        Authentication: trusted
        Operation: sync

        :param name: The network name to delete
        :type  name: str

        :return: True is successfully deleted the network
        """

        req = '/%s/networks/%s' % (self.version, name)
        response = self.connection.request(req, method='DELETE')
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

        return True

    def _to_container(self, metadata):
        """
        Returns Container instance built from the given metadata

        :param metadata: dictionary with the container metadata
        :type  metadata: ``dict``

        :rtype :class:`libcloud.container.base.Container
        """

        name = metadata['name']
        state = metadata['status']

        if state == 'Running':
            state = ContainerState.RUNNING
        elif state == 'Frozen':
            state = ContainerState.PAUSED
        else:
            state = ContainerState.STOPPED

        extra = metadata
        img_id = metadata['config'].get('volatile.base_image', None)
        img_version = metadata['config'].get('image.version', None)
        ips = metadata["ips"]

        image = ContainerImage(id=img_id, name=img_id, path=None,
                               version=img_version, driver=self, extra=None)

        container = Container(driver=self, name=name, id=name,
                              state=state, image=image,
                              ip_addresses=ips, extra=extra)

        return container

    def _do_container_action(self, container, action,
                             timeout, force, stateful):
        """
        change the container state by performing the given action
        action may be either stop, start, restart, freeze or unfreeze
        """

        if action not in LXD_API_STATE_ACTIONS:
            raise ValueError("Invalid action specified")

        # cache the state of the container
        state = container.state

        data = {"action": action, "timeout": timeout}
        data = json.dumps(data)

        # checkout this for stateful:
        # https://discuss.linuxcontainers.org/t/error-in-live-migration/1928
        # looks like we are getting "err":"Unable to perform
        # container live migration. CRIU isn't installed"
        # in the response when stateful is True so remove it for now
        req = '/%s/containers/%s/state' % (self.version, container.name)
        response = self.connection.request(req, method='PUT', data=data)

        response_dict = response.parse_body()

        # a background operation is expected to
        # be returned status_code = 100 --> Operation created
        assert_response(response_dict=response_dict,
                        status_code=100)

        if not timeout:
            timeout = LXDContainerDriver.default_time_out

        try:
            id = response_dict['metadata']['id']
            req = '/%s/operations/%s/wait?timeout=%s' % (self.version,
                                                         id, timeout)
            response = self.connection.request(req)

        except BaseHTTPError as err:
            lxd_exception = self._get_lxd_api_exception_for_error(err)
            # if not found assume the operation completed
            if lxd_exception.message != "not found":
                raise lxd_exception

        # if the container is ephemeral and the action is to stop
        # then the container is removed so return sth dummy
        if state == ContainerState.RUNNING and\
                container.extra['ephemeral'] and action == 'stop':
            # return a dummy container otherwise we get 404 error
            container = Container(driver=self, name=container.name,
                                  id=container.name,
                                  state=ContainerState.TERMINATED, image=None,
                                  ip_addresses=[], extra=None)
            return container

        return self.get_container(id=container.name)

    def _to_image(self, metadata):
        """
        Returns a container image from the given metadata

        :param metadata:
        :type  metadata: ``dict``

        :rtype: :class:`.ContainerImage`
        """
        fingerprint = metadata.get('fingerprint')
        aliases = metadata.get('aliases', [])

        if aliases:
            name = metadata.get('aliases')[0].get('name')
        else:
            name = metadata.get('properties', {}).get('description') \
                or fingerprint

        version = metadata.get('update_source', {}).get('alias')
        extra = metadata

        return ContainerImage(id=fingerprint, name=name, path=None,
                              version=version, driver=self, extra=extra)

    def _to_storage_pool(self, data):
        """
        Given a dictionary with the storage pool configuration
        it returns a StoragePool object
        :param data: the storage pool configuration
        :return: :class: .StoragePool
        """

        return LXDStoragePool(name=data['name'], driver=data['driver'],
                              used_by=data['used_by'], config=['config'],
                              managed=False)

    def _deploy_container_from_image(self, name, image, parameters,
                                     cont_params,
                                     timeout=default_time_out):
        """
        Deploy a new container from the given image

        :param name: the name of the container
        :param image: .ContainerImage

        :param parameters: dictionary describing the source attribute
        :type  parameters ``dict``

        :param cont_params: dictionary describing the container configuration
        :type  cont_params: dict

        :param timeout: Time to wait for the operation before timeout
        :type  timeout: int

        :rtype: :class: .Container
        """

        if cont_params is None:
            raise LXDAPIException(message="cont_params "
                                          "must be a valid dict")

        # container without a pre-populated rootfs
        # see https://github.com/lxc/lxd/blob/master/doc/rest-api.md
        # can be "image", "migration", "copy" or "none"
        data = {'name': name, 'source': {'type': 'none'}}

        if parameters:
            data['source'].update(parameters["source"])

        if data['source']['type'] not in LXD_API_IMAGE_SOURCE_TYPE:
            msg = "source type must in " + str(LXD_API_IMAGE_SOURCE_TYPE)
            raise LXDAPIException(message=msg)

        # add also the other container parameters
        data.update(cont_params)

        data = json.dumps(data)

        # Return: background operation or standard error
        response = self.connection.request('/%s/containers' % self.version,
                                           method='POST', data=data)
        response_dict = response.parse_body()

        # a background operation is expected to
        # be returned status_code = 100 --> Operation created
        assert_response(response_dict=response_dict, status_code=100)

        # make sure we don't wait indefinitely
        # until the operation is done
        if not timeout:
            timeout = LXDContainerDriver.default_time_out

        try:
            # wait untitl the timeout...but util getting here the operation
            # may have finished already
            id = response_dict['metadata']['id']
            req_str = '/%s/operations/%s/wait?timeout=%s' % (self.version,
                                                             id,
                                                             timeout)
            response = self.connection.request(req_str)
        except BaseHTTPError as err:
            lxd_exception = self._get_lxd_api_exception_for_error(err)
            # if not found assume the operation completed
            if lxd_exception.message != "not found":
                raise lxd_exception
        return self.get_container(id=name)

    def _to_storage_volume(self, pool_id, metadata):
        """
        Returns StorageVolume object from metadata
        :param metadata: dict representing the volume
        :rtype: StorageVolume
        """

        size = 0
        if "size" in metadata['config'].keys():
            size = LXDContainerDriver._to_gb(metadata['config'].pop('size'))

        extra = {"pool_id": pool_id,
                 "type": metadata["type"],
                 "used_by": metadata["used_by"],
                 "config": metadata['config']}

        return StorageVolume(id=metadata['name'], name=metadata['name'],
                             driver=self, size=size, extra=extra)

    def _get_api_version(self):
        """
        Get the LXD API version
        """
        return LXDContainerDriver.version

    def _ex_connection_class_kwargs(self):
        """
        Return extra connection keyword arguments which are passed to the
        Connection class constructor.
        """

        if hasattr(self, "key_file") and hasattr(self, "cert_file"):
            return {"key_file": self.key_file,
                    "cert_file": self.cert_file,
                    "certificate_validator": self.certificate_validator}
        return super(LXDContainerDriver, self)._ex_connection_class_kwargs()

    @staticmethod
    def _create_exec_configuration(input, **config):
        """
        Prepares the input parameters for executyion API call
        """
        if "environment" in config.keys():
            input["environment"] = config["environment"]

        if "width" in config.keys():
            input["width"] = int(config["width"])
        else:
            input["width"] = 80

        if "height" in config.keys():
            input["height"] = int(config["height"])
        else:
            input["height"] = 25

        if "user" in config.keys():
            input["user"] = config["user"]

        if "group" in config.keys():
            input["group"] = config["group"]

        if "cwd" in config.keys():
            input["cwd"] = config["cwd"]

        if "wait-for-websocket" in config.keys():
            input["wait-for-websocket"] = config["wait-for-websocket"]
        else:
            input["wait-for-websocket"] = False

        if "record-output" in config.keys():
            input["record-output"] = config["record-output"]

        if "interactive" in config.keys():
            input["interactive"] = config["interactive"]

        return input

    @staticmethod
    def _fix_cont_params(architecture, profiles,
                         ephemeral, config,
                         devices, instance_type):
        """
        Returns a dict with the container parameters
        """

        cont_params = {}

        # add also the other container parameters
        if architecture is not None:
            cont_params['architecture'] = architecture

        if profiles is not None:
            cont_params["profiles"] = profiles
        else:
            cont_params["profiles"] = [LXDContainerDriver.default_profiles]

        if ephemeral is not None:
            cont_params["ephemeral"] = ephemeral
        else:
            cont_params["ephemeral"] = LXDContainerDriver.default_ephemeral

        if config is not None:
            cont_params["config"] = config

        if devices is not None:
            cont_params["devices"] = devices

        if instance_type is not None:
            cont_params["instance_type"] = instance_type

        return cont_params

    def _get_lxd_api_exception_for_error(self, error):
        error_dict = json.loads(error.message)
        message = error_dict.get('error')
        return LXDAPIException(message=message,
                               error_type=error_dict.get('type', ''))

    @staticmethod
    def _to_gb(size):
        """
        Convert the given size in bytes to gigabyte
        :param size: in bytes
        :return: int representing the gigabytes
        """
        size = int(size)
        return size // 10**9

    @staticmethod
    def _to_bytes(size, size_type='GB'):
        """
        convert the given size in GB to bytes
        :param size: in GBs
        :return: int representing bytes
        """
        size = int(size)
        if size_type == 'GB':
            return size * 10**9
        elif size_type == 'MB':
            return size * 10 ** 6

Anon7 - 2022
AnonSec Team