Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.117.145.41
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/libcloud/common/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/libcloud/common/kubernetes.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Module which contains common Kubernetes related code.
"""

from typing import Optional

import os
import base64
import warnings

from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import b

from libcloud.common.base import JsonResponse, ConnectionUserAndKey
from libcloud.common.base import KeyCertificateConnection, ConnectionKey
from libcloud.common.types import InvalidCredsError

__all__ = [
    'KubernetesException',

    'KubernetesBasicAuthConnection',
    'KubernetesTLSAuthConnection',
    'KubernetesTokenAuthConnection',

    'KubernetesDriverMixin',

    'VALID_RESPONSE_CODES'
]

# HTTP status codes exported by this module: 200 OK, 202 Accepted,
# 201 Created and 204 No Content.
VALID_RESPONSE_CODES = [
    httplib.OK,
    httplib.ACCEPTED,
    httplib.CREATED,
    httplib.NO_CONTENT,
]


class KubernetesException(Exception):
    """
    Exception carrying an error ``code`` together with a ``message``.

    Both values are available as attributes and, in that order, as
    ``args``.
    """

    def __init__(self, code, message):
        """
        :param    code: Error code to store on the exception.

        :param    message: Error message to store on the exception.
        """
        # The base constructor stores its positional arguments as ``args``.
        super(KubernetesException, self).__init__(code, message)
        self.code = code
        self.message = message

    def __str__(self):
        """Return ``"<code> <message>"``."""
        return " ".join((str(self.code), str(self.message)))

    def __repr__(self):
        """Return ``"KubernetesException <code> <message>"``."""
        parts = ("KubernetesException", str(self.code), str(self.message))
        return " ".join(parts)


class KubernetesResponse(JsonResponse):
    """
    JSON response class used by the Kubernetes connection classes in this
    module.
    """

    # Status codes for which success() reports True.
    valid_response_codes = [
        httplib.OK,
        httplib.ACCEPTED,
        httplib.CREATED,
        httplib.NO_CONTENT,
    ]

    def parse_error(self):
        """
        Return the response body for an error response.

        :raises InvalidCredsError: If the response status is 401.
        """
        if self.status != 401:
            return self.body
        raise InvalidCredsError('Invalid credentials')

    def success(self):
        """
        Return ``True`` if the response status is one of
        ``valid_response_codes``, ``False`` otherwise.
        """
        accepted = self.valid_response_codes
        return self.status in accepted


class KubernetesTLSAuthConnection(KeyCertificateConnection):
    """
    Connection class which authenticates using a client key and certificate
    PEM file pair.
    """
    responseCls = KubernetesResponse
    timeout = 60

    def __init__(self, key, secure=True, host='localhost',
                 port='6443', key_file=None, cert_file=None,
                 **kwargs):
        """
        :param    key: Not used by this constructor.

        :param    secure: Forwarded to the parent constructor.
        :type     secure: ``bool``

        :param    host: Forwarded to the parent constructor.
        :type     host: ``str``

        :param    port: Forwarded to the parent constructor.

        :param    key_file: Path to the key PEM file. ``~`` is expanded.
        :type     key_file: ``str``

        :param    cert_file: Path to the certificate PEM file. ``~`` is
                             expanded. Required when ``key_file`` is given.
        :type     cert_file: ``str``

        :param    kwargs: Accepted but not forwarded to the parent
                          constructor.

        :raises InvalidCredsError: If ``key_file`` is given and either the
                                   key file or the certificate file is
                                   missing or is not a regular file.
        """
        super(KubernetesTLSAuthConnection, self).__init__(
            key_file=key_file,
            cert_file=cert_file,
            secure=secure, host=host,
            port=port, url=None,
            proxy_url=None,
            timeout=None,
            backoff=None,
            retry_delay=None)

        if key_file:
            # Store the expanded paths, i.e. exactly the paths which have
            # just been validated, instead of the raw user input.
            self.key_file = self._expand_pem_path(key_file, 'a key')
            self.cert_file = self._expand_pem_path(cert_file,
                                                   'a certificate')

    @staticmethod
    def _expand_pem_path(path, description):
        """
        Expand ``~`` in ``path`` and verify that it points to a regular file.

        :param    path: Path supplied by the user (may be ``None``).

        :param    description: Noun phrase used in the error message (e.g.
                               ``'a key'``).
        :type     description: ``str``

        :return:  The expanded path.

        :raises InvalidCredsError: If ``path`` is empty or does not point to
                                   an existing regular file.
        """
        # A missing path is reported as a credentials error rather than
        # letting os.path.expanduser(None) fail with a TypeError.
        expanded = os.path.expanduser(path) if path else None

        if not expanded or not os.path.isfile(expanded):
            raise InvalidCredsError(
                'You need %s PEM file to authenticate '
                'via tls. For more info please visit: '
                'https://kubernetes.io/docs/concepts/'
                'cluster-administration/certificates/' % description)

        return expanded

    def add_default_headers(self, headers):
        """
        Set ``Content-Type`` to ``application/json`` unless the caller has
        already provided one, and return the (mutated) ``headers`` dict.
        """
        if 'Content-Type' not in headers:
            headers['Content-Type'] = 'application/json'
        return headers


class KubernetesTokenAuthConnection(ConnectionKey):
    """
    Connection class which authenticates using a bearer token stored in
    ``key``.
    """
    responseCls = KubernetesResponse
    timeout = 60

    def add_default_headers(self, headers):
        """
        Add the default ``Content-Type`` (unless already present) and the
        ``Authorization: Bearer <key>`` header, then return ``headers``.

        :raises ValueError: If ``key`` is empty.
        """
        # The content type is filled in before the token check, so it is
        # present on ``headers`` even when the check below raises.
        headers.setdefault('Content-Type', 'application/json')

        token = self.key
        if not token:
            raise ValueError("Please provide a valid token in the key param")

        headers['Authorization'] = 'Bearer ' + token
        return headers


class KubernetesBasicAuthConnection(ConnectionUserAndKey):
    """
    Connection class which authenticates using HTTP basic auth built from
    ``user_id`` and ``key``.
    """
    responseCls = KubernetesResponse
    timeout = 60

    def add_default_headers(self, headers):
        """
        Add the headers which are needed for every request.

        ``Content-Type`` defaults to ``application/json`` when not already
        set. When both ``user_id`` and ``key`` are set, a basic auth
        ``Authorization`` header is included as well.
        """
        headers.setdefault('Content-Type', 'application/json')

        if not (self.user_id and self.key):
            return headers

        credentials = '%s:%s' % (self.user_id, self.key)
        encoded = base64.b64encode(b(credentials)).decode('utf-8')
        headers['Authorization'] = 'Basic ' + encoded
        return headers


class KubernetesDriverMixin(object):
    """
    Base driver class to be used with various Kubernetes drivers.

    NOTE: This base class can be used in different APIs such as container and
    compute one.
    """

    def __init__(self, key=None, secret=None, secure=False, host='localhost',
                 port=4243, key_file=None, cert_file=None, ca_cert=None,
                 ex_token_bearer_auth=False):
        """
        :param    key: API key, username or (with ``ex_token_bearer_auth``)
                       the bearer token to be used.
        :type     key: ``str``

        :param    secret: Secret password to be used.
        :type     secret: ``str``

        :param    secure: Whether to use HTTPS or HTTP. Defaults to
                          ``False``, but is forced to ``True`` when token
                          bearer auth or key file auth is used, or when
                          ``host`` starts with ``https://``.
        :type     secure: ``bool``

        :param    host: Override hostname used for connections. A leading
                        ``http://`` or ``https://`` is removed.
        :type     host: ``str``

        :param    port: Override port used for connections.
        :type     port: ``int``

        :param    key_file: Path to the key file used to authenticate (when
                            using key file auth).
        :type     key_file: ``str``

        :param    cert_file: Path to the cert file used to authenticate (when
                             using key file auth).
        :type     cert_file: ``str``

        :param    ca_cert: CA certificate assigned to the underlying
                           connection. When not supplied, a warning is
                           emitted and SSL verification is disabled.

        :param    ex_token_bearer_auth: True to use token bearer auth.
        :type     ex_token_bearer_auth: ``bool``

        :raises ValueError: If token bearer auth is requested without a
                            ``key``, or if only one of ``key_file`` and
                            ``cert_file`` is supplied.

        :return: ``None``
        """
        if ex_token_bearer_auth:
            self.connectionCls = KubernetesTokenAuthConnection
            if not key:
                msg = 'The token must be a string provided via "key" argument'
                raise ValueError(msg)
            secure = True

        if key_file or cert_file:
            # Certificate based auth is used
            if not (key_file and cert_file):
                raise ValueError("Both key and certificate files are needed")

        if key_file:
            # NOTE: This takes precedence over token bearer auth when both
            # are requested.
            self.connectionCls = KubernetesTLSAuthConnection
            self.key_file = key_file
            self.cert_file = cert_file
            secure = True

        if host and host.startswith('https://'):
            secure = True

        host = self._santize_host(host=host)

        super(KubernetesDriverMixin, self).__init__(key=key, secret=secret,
                                                    secure=secure,
                                                    host=host,
                                                    port=port,
                                                    key_file=key_file,
                                                    cert_file=cert_file)

        if ca_cert:
            self.connection.connection.ca_cert = ca_cert
        else:
            # do not verify SSL certificate
            warnings.warn("Kubernetes has its own CA, since you didn't supply "
                          "a CA certificate be aware that SSL verification "
                          "will be disabled for this session.")
            self.connection.connection.ca_cert = False

        self.connection.secure = secure
        self.connection.host = host

        if port is not None:
            self.connection.port = port

    def _ex_connection_class_kwargs(self):
        """
        Return a dict with the ``key_file`` / ``cert_file`` attributes which
        are set on this instance (empty when neither is set).
        """
        kwargs = {}
        if hasattr(self, 'key_file'):
            kwargs['key_file'] = self.key_file
        if hasattr(self, 'cert_file'):
            kwargs['cert_file'] = self.cert_file
        return kwargs

    def _santize_host(self, host=None):
        # type: (Optional[str]) -> Optional[str]
        """
        Sanitize "host" argument and remove any protocol prefix (if
        specified).

        :param    host: Hostname, optionally prefixed with ``http://`` or
                        ``https://``.
        :type     host: ``str``

        :return:  Host without the protocol prefix, or ``None`` if ``host``
                  is empty.
        """
        if not host:
            return None

        for prefix in ('http://', 'https://'):
            if host.startswith(prefix):
                # Slice the prefix off. str.lstrip() treats its argument as
                # a set of characters and would also eat leading host
                # characters such as "t", "p" or "s".
                host = host[len(prefix):]
                break

        return host

Anon7 - 2022
AnonSec Team