Server IP : 85.214.239.14 / Your IP : 18.118.138.241 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/cwd/proc/2/root/proc/2/cwd/proc/3/cwd/lib/python3/dist-packages/libcloud/common/ |
Upload File : |
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import json
import os
import time

import requests

from libcloud.common.base import ConnectionKey, JsonResponse

# normally we only use itsyou.online but you might want to work
# against staging.itsyou.online for some testing
IYO_URL = os.environ.get("IYO_URL", "https://itsyou.online")


class G8Connection(ConnectionKey):
    """
    Connection class for G8
    """
    responseCls = JsonResponse

    def add_default_headers(self, headers):
        """
        Add headers that are necessary for every request

        The driver's JWT is passed through :func:`maybe_update_jwt` first and
        the (possibly refreshed) token is stored back on ``self.driver.key``
        before it is placed in the ``Authorization`` header.

        :param headers: Headers of the outgoing request; updated in place
        :type headers: dict

        :return: The same ``headers`` object with ``Authorization`` and
                 ``Content-Type`` set
        :rtype: dict
        """
        self.driver.key = maybe_update_jwt(self.driver.key)
        headers['Authorization'] = "bearer {}".format(self.driver.key)
        headers['Content-Type'] = 'application/json'
        return headers


def base64url_decode(input):
    """
    Helper method to base64url_decode a string.

    JWT segments are base64url encoded with the trailing ``=`` padding
    stripped, so the padding is restored before decoding.

    :param input: Input to decode; text is encoded as ASCII first
    :type input: str or bytes

    :return: The decoded payload
    :rtype: bytes
    """
    # The padding below is a bytes literal, so text input has to be turned
    # into bytes first; otherwise ``input += b'='`` raises TypeError.
    if not isinstance(input, bytes):
        input = input.encode('ascii')

    rem = len(input) % 4
    if rem > 0:
        input += b'=' * (4 - rem)

    return base64.urlsafe_b64decode(input)


def is_jwt_expired(jwt):
    """
    Check if jwt is expired

    A token counts as expired when its ``exp`` claim lies less than 60
    seconds in the future, so callers refresh it slightly ahead of time.
    Only the claims segment is decoded; the signature is not verified here.

    :param jwt: jwt token to validate expiration
    :type jwt: str

    :raises ValueError: if the token does not split into the expected
                        dot separated segments
    :raises KeyError: if the claims do not contain ``exp``

    :rtype: bool
    """
    jwt = jwt.encode('utf-8')
    # Drop the signature (last segment), then the header (first segment);
    # what is left is the claims segment.
    signing_input, _ = jwt.rsplit(b'.', 1)
    _, claims_segment = signing_input.split(b'.', 1)

    claimsdata = base64url_decode(claims_segment)
    if isinstance(claimsdata, bytes):
        claimsdata = claimsdata.decode('utf-8')
    data = json.loads(claimsdata)

    # check if it's about to expire in the next minute
    return data['exp'] < time.time() + 60


def maybe_update_jwt(jwt):
    """
    Update jwt if it is expired

    :param jwt: jwt token to validate expiration
    :type jwt: str

    :return: ``jwt`` unchanged when it is still valid, otherwise the token
             returned by :func:`refresh_jwt`
    :rtype: str
    """
    if is_jwt_expired(jwt):
        return refresh_jwt(jwt)
    return jwt


def refresh_jwt(jwt, timeout=30):
    """
    Refresh jwt

    Sends a GET request to ``IYO_URL + "/v1/oauth/jwt/refresh"``
    authenticated with the current token and returns the response body.

    :param jwt: jwt token to refresh
    :type jwt: str

    :param timeout: Seconds to wait for the server before giving up, passed
                    straight to ``requests.get``. Without a timeout an
                    unresponsive server would block the caller forever.
    :type timeout: float or tuple

    :raises requests.exceptions.HTTPError: if the server answers with an
                                           error status
    :raises requests.exceptions.Timeout: if the server does not answer
                                         within ``timeout``

    :return: The body of the refresh response
    :rtype: str
    """
    url = IYO_URL + "/v1/oauth/jwt/refresh"
    headers = {"Authorization": "bearer {}".format(jwt)}
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.text