Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.223.210.196
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/share/doc/python3-dnspython/examples/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/share/doc/python3-dnspython/examples/doq.py
import asyncio
import threading

import dns.asyncbackend
import dns.asyncquery
import dns.message
import dns.query
import dns.quic
import dns.rdatatype

try:
    import trio

    have_trio = True
except ImportError:
    have_trio = False

# This demo assumes you have the aioquic example doq_server.py running on localhost
# on port 4784 on localhost.
peer_address = "127.0.0.1"
peer_port = 4784
query_name = "www.dnspython.org"
tls_verify_mode = False


def squery(rdtype="A", connection=None):
    q = dns.message.make_query(query_name, rdtype)
    r = dns.query.quic(
        q, peer_address, port=peer_port, connection=connection, verify=tls_verify_mode
    )
    print(r)


def srun():
    squery()


def smultirun():
    with dns.quic.SyncQuicManager(verify_mode=tls_verify_mode) as manager:
        connection = manager.connect(peer_address, peer_port)
        t1 = threading.Thread(target=squery, args=["A", connection])
        t1.start()
        t2 = threading.Thread(target=squery, args=["AAAA", connection])
        t2.start()
        t1.join()
        t2.join()


async def aquery(rdtype="A", connection=None):
    q = dns.message.make_query(query_name, rdtype)
    r = await dns.asyncquery.quic(
        q, peer_address, port=peer_port, connection=connection, verify=tls_verify_mode
    )
    print(r)


def arun():
    asyncio.run(aquery())


async def amulti():
    async with dns.quic.AsyncioQuicManager(verify_mode=tls_verify_mode) as manager:
        connection = manager.connect(peer_address, peer_port)
        t1 = asyncio.Task(aquery("A", connection))
        t2 = asyncio.Task(aquery("AAAA", connection))
        await t1
        await t2


def amultirun():
    asyncio.run(amulti())


if have_trio:

    def trun():
        trio.run(aquery)

    async def tmulti():
        async with trio.open_nursery() as nursery:
            async with dns.quic.TrioQuicManager(
                nursery, verify_mode=tls_verify_mode
            ) as manager:
                async with trio.open_nursery() as query_nursery:
                    # We run queries in a separate nursery so we can demonstrate
                    # waiting for them all to exit without waiting for the manager to
                    # exit as well.
                    connection = manager.connect(peer_address, peer_port)
                    query_nursery.start_soon(aquery, "A", connection)
                    query_nursery.start_soon(aquery, "AAAA", connection)

    def tmultirun():
        trio.run(tmulti)


def main():
    print("*** Single Queries ***")
    print("--- Sync ---")
    srun()
    print("--- Asyncio ---")
    dns.asyncbackend.set_default_backend("asyncio")
    arun()
    if have_trio:
        print("--- Trio ---")
        dns.asyncbackend.set_default_backend("trio")
        trun()
    print("*** Multi-connection Queries ***")
    print("--- Sync ---")
    smultirun()
    print("--- Asyncio ---")
    dns.asyncbackend.set_default_backend("asyncio")
    amultirun()
    if have_trio:
        print("--- Trio ---")
        dns.asyncbackend.set_default_backend("trio")
        tmultirun()


if __name__ == "__main__":
    main()

Anon7 - 2022
AnonSec Team