Server IP : 85.214.239.14 / Your IP : 18.188.41.251 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/cwd/proc/2/root/usr/lib/python3/dist-packages/libcloud/common/ |
Upload File : |
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Module which contains common Kubernetes related code. """ from typing import Optional import os import base64 import warnings from libcloud.utils.py3 import httplib from libcloud.utils.py3 import b from libcloud.common.base import JsonResponse, ConnectionUserAndKey from libcloud.common.base import KeyCertificateConnection, ConnectionKey from libcloud.common.types import InvalidCredsError __all__ = [ 'KubernetesException', 'KubernetesBasicAuthConnection', 'KubernetesTLSAuthConnection', 'KubernetesTokenAuthConnection', 'KubernetesDriverMixin', 'VALID_RESPONSE_CODES' ] VALID_RESPONSE_CODES = [httplib.OK, httplib.ACCEPTED, httplib.CREATED, httplib.NO_CONTENT] class KubernetesException(Exception): def __init__(self, code, message): self.code = code self.message = message self.args = (code, message) def __str__(self): return "%s %s" % (self.code, self.message) def __repr__(self): return "KubernetesException %s %s" % (self.code, self.message) class KubernetesResponse(JsonResponse): valid_response_codes = [httplib.OK, httplib.ACCEPTED, httplib.CREATED, httplib.NO_CONTENT] def parse_error(self): if self.status == 401: raise InvalidCredsError('Invalid credentials') return self.body def success(self): return self.status in self.valid_response_codes class KubernetesTLSAuthConnection(KeyCertificateConnection): responseCls = KubernetesResponse timeout = 60 def __init__(self, key, secure=True, host='localhost', port='6443', key_file=None, cert_file=None, **kwargs): super(KubernetesTLSAuthConnection, self).__init__( key_file=key_file, cert_file=cert_file, secure=secure, host=host, port=port, url=None, proxy_url=None, timeout=None, backoff=None, retry_delay=None) if key_file: keypath = os.path.expanduser(key_file) is_file_path = os.path.exists(keypath) and os.path.isfile(keypath) if not is_file_path: raise InvalidCredsError( 'You need an key PEM file to authenticate ' 'via tls. For more info please visit:' 'https://kubernetes.io/docs/concepts/' 'cluster-administration/certificates/') self.key_file = key_file certpath = os.path.expanduser(cert_file) is_file_path = os.path.exists( certpath) and os.path.isfile(certpath) if not is_file_path: raise InvalidCredsError( 'You need an certificate PEM file to authenticate ' 'via tls. For more info please visit:' 'https://kubernetes.io/docs/concepts/' 'cluster-administration/certificates/' ) self.cert_file = cert_file def add_default_headers(self, headers): if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' return headers class KubernetesTokenAuthConnection(ConnectionKey): responseCls = KubernetesResponse timeout = 60 def add_default_headers(self, headers): if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' if self.key: headers['Authorization'] = 'Bearer ' + self.key else: raise ValueError("Please provide a valid token in the key param") return headers class KubernetesBasicAuthConnection(ConnectionUserAndKey): responseCls = KubernetesResponse timeout = 60 def add_default_headers(self, headers): """ Add parameters that are necessary for every request If user and password are specified, include a base http auth header """ if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' if self.user_id and self.key: auth_string = b('%s:%s' % (self.user_id, self.key)) user_b64 = base64.b64encode(auth_string) headers['Authorization'] = 'Basic %s' % (user_b64.decode('utf-8')) return headers class KubernetesDriverMixin(object): """ Base driver class to be used with various Kubernetes drivers. NOTE: This base class can be used in different APIs such as container and compute one. """ def __init__(self, key=None, secret=None, secure=False, host='localhost', port=4243, key_file=None, cert_file=None, ca_cert=None, ex_token_bearer_auth=False): """ :param key: API key or username to be used (required) :type key: ``str`` :param secret: Secret password to be used (required) :type secret: ``str`` :param secure: Whether to use HTTPS or HTTP. Note: Some providers only support HTTPS, and it is on by default. :type secure: ``bool`` :param host: Override hostname used for connections. :type host: ``str`` :param port: Override port used for connections. :type port: ``int`` :param key_file: Path to the key file used to authenticate (when using key file auth). :type key_file: ``str`` :param cert_file: Path to the cert file used to authenticate (when using key file auth). :type cert_file: ``str`` :param ex_token_bearer_auth: True to use token bearer auth. :type ex_token_bearer_auth: ``bool`` :return: ``None`` """ if ex_token_bearer_auth: self.connectionCls = KubernetesTokenAuthConnection if not key: msg = 'The token must be a string provided via "key" argument' raise ValueError(msg) secure = True if key_file or cert_file: # Certificate based auth is used if not (key_file and cert_file): raise ValueError("Both key and certificate files are needed") if key_file: self.connectionCls = KubernetesTLSAuthConnection self.key_file = key_file self.cert_file = cert_file secure = True if host and host.startswith('https://'): secure = True host = self._santize_host(host=host) super(KubernetesDriverMixin, self).__init__(key=key, secret=secret, secure=secure, host=host, port=port, key_file=key_file, cert_file=cert_file) if ca_cert: self.connection.connection.ca_cert = ca_cert else: # do not verify SSL certificate warnings.warn("Kubernetes has its own CA, since you didn't supply " "a CA certificate be aware that SSL verification " "will be disabled for this session.") self.connection.connection.ca_cert = False self.connection.secure = secure self.connection.host = host if port is not None: self.connection.port = port def _ex_connection_class_kwargs(self): kwargs = {} if hasattr(self, 'key_file'): kwargs['key_file'] = self.key_file if hasattr(self, 'cert_file'): kwargs['cert_file'] = self.cert_file return kwargs def _santize_host(self, host=None): # type: (Optional[str]) -> Optional[str] """ Sanitize "host" argument any remove any protocol prefix (if specified). """ if not host: return None prefixes = ['http://', 'https://'] for prefix in prefixes: if host.startswith(prefix): host = host.lstrip(prefix) return host