| Server IP : 85.214.239.14 / Your IP : 216.73.216.26 Web Server : Apache/2.4.65 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 8.2.29 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/cwd/proc/3/cwd/srv/modoboa/env/lib64/python3.5/site-packages/django/db/ |
Upload File : |
from django.core import signals
from django.db.utils import (
DEFAULT_DB_ALIAS, DJANGO_VERSION_PICKLE_KEY, ConnectionHandler,
ConnectionRouter, DatabaseError, DataError, Error, IntegrityError,
InterfaceError, InternalError, NotSupportedError, OperationalError,
ProgrammingError,
)
__all__ = [
'connection', 'connections', 'router', 'DatabaseError', 'IntegrityError',
'InternalError', 'ProgrammingError', 'DataError', 'NotSupportedError',
'Error', 'InterfaceError', 'OperationalError', 'DEFAULT_DB_ALIAS',
'DJANGO_VERSION_PICKLE_KEY',
]
connections = ConnectionHandler()
router = ConnectionRouter()
class DefaultConnectionProxy:
"""
Proxy for accessing the default DatabaseWrapper object's attributes. If you
need to access the DatabaseWrapper object itself, use
connections[DEFAULT_DB_ALIAS] instead.
"""
def __getattr__(self, item):
return getattr(connections[DEFAULT_DB_ALIAS], item)
def __setattr__(self, name, value):
return setattr(connections[DEFAULT_DB_ALIAS], name, value)
def __delattr__(self, name):
return delattr(connections[DEFAULT_DB_ALIAS], name)
def __eq__(self, other):
return connections[DEFAULT_DB_ALIAS] == other
# For backwards compatibility. Prefer connections['default'] instead.
connection = DefaultConnectionProxy()
# Register an event to reset saved queries when a Django request is started.
def reset_queries(**kwargs):
for conn in connections.all():
conn.queries_log.clear()
signals.request_started.connect(reset_queries)
# Register an event to reset transaction state and close connections past
# their lifetime.
def close_old_connections(**kwargs):
for conn in connections.all():
conn.close_if_unusable_or_obsolete()
signals.request_started.connect(close_old_connections)
signals.request_finished.connect(close_old_connections)