Python/asyncio

From Fundamental Ramen
Jump to navigation Jump to search

functions

https://docs.python.org/3/library/asyncio-task.html#running-an-asyncio-program

function arg return
asyncio.run() coroutine
asyncio.gather() *awaitable coroutine
asyncio.sleep() float coroutine
asyncio.shield() awaitable awaitable
asyncio.wait_for() awaitable coroutine
asyncio.wait() awaitable coroutine
asyncio.to_thread() function coroutine
asyncio.run_coroutine_threadsafe() coroutine future
asyncio.create_task() coroutine task
asyncio.current_task() task
asyncio.all_tasks() set(task)
asyncio.open_connection()
asyncio.start_server()
asyncio.open_unix_connection()
asyncio.start_unix_server()
asyncio.create_subprocess_shell()
asyncio.create_subprocess_exec()

Topics

  • await
  • Lock
  • Event
  • Condition
  • Semaphore
  • Queue
  • Subprocess
  • Cross loop IPC
import asyncio

async def job(n, delay=2):
    """Simulate a slow job and return the square of ``n``.

    Prints a start message, suspends for ``delay`` seconds with
    ``asyncio.sleep`` so other tasks can run, then returns ``n * n``.

    Args:
        n: Job number shown in the start message; also the value squared.
        delay: Seconds to sleep before returning. Defaults to 2, the
            duration this example previously hard-coded.

    Returns:
        ``n * n``.
    """
    print('begin job #{}.'.format(n))
    await asyncio.sleep(delay)
    return n * n

async def test_await():
    """Run jobs 1, 2 and 3 strictly one after another, printing each result.

    Each ``await`` finishes before the next job is even created, so the
    jobs do not overlap.
    """
    for number in (1, 2, 3):
        outcome = await job(number)
        print(outcome)

async def test_gather():
    """Run jobs 1, 2 and 3 together and print all results at once.

    ``asyncio.gather`` awaits every job and returns their results as a
    single list, which is printed in one go.
    """
    pending = (job(number) for number in (1, 2, 3))
    results = await asyncio.gather(*pending)
    print(results)

async def test_create_task():
    """Schedule jobs 1, 2 and 3 as tasks first, then collect each result.

    All three tasks are created before any of them is awaited; the
    results are then awaited and printed in creation order.
    """
    tasks = [asyncio.create_task(job(number)) for number in (1, 2, 3)]
    for task in tasks:
        outcome = await task
        print(outcome)

async def main():
    """Run the three demos in order: plain await, gather, then create_task."""
    for demo in (test_await, test_gather, test_create_task):
        await demo()

# Script entry point: run the main() coroutine until it completes.
asyncio.run(main())
'''
I/O test for asyncio.Queue

QSIZE = 300000
QFILL = 300000
FORCE_SWITCH = False

enqueue(): Put 100000 items.
enqueue(): Put 200000 items.
enqueue(): Put 300000 items.
dequeue(): Get 100000 items.
dequeue(): Get 200000 items.
dequeue(): Get 300000 items.
asynio.Queue put() and get() 300000 items.
0.67 seconds, 448386.61 items per second.

QSIZE = 100000
QFILL = 300000
FORCE_SWITCH = False

enqueue(): Put 100000 items.
dequeue(): Get 100000 items.
enqueue(): Put 200000 items.
dequeue(): Get 200000 items.
enqueue(): Put 300000 items.
dequeue(): Get 300000 items.
asynio.Queue put() and get() 300000 items.
0.68 seconds, 442956.25 items per second.

QSIZE = 300000
QFILL = 300000
FORCE_SWITCH = True

enqueue(): Put 100000 items.
dequeue(): Get 100000 items.
enqueue(): Put 200000 items.
dequeue(): Get 200000 items.
enqueue(): Put 300000 items.
dequeue(): Get 300000 items.
asynio.Queue put() and get() 300000 items.
0.68 seconds, 440712.40 items per second.
'''

import asyncio
import time

# Capacity passed to asyncio.Queue(maxsize=...) in main().
QSIZE = 300_000
# Total number of items enqueue() puts and dequeue() gets.
QFILL = 300_000
# When True, enqueue()/dequeue() call asyncio.sleep(0) after each
# progress message.
FORCE_SWITCH = False
# A progress line is printed every OUTPUT_INTERVAL items.
OUTPUT_INTERVAL = 100_000
# Sample record used as the queue payload; enqueue() puts a copy per item.
TICK = {
    'security_id': '2330.TW',
    'open': 599.05,
    'close': 600.15,
    'high': 612.05,
    'low': 593.75,
    '5d': 595.25,
    '10d': 590.85,
    '20d': 588.65,
    '60d': 550.50,
    '120d': 510.45,
    '240d': 450.15,
}

async def enqueue():
    """Producer: put ``QFILL`` copies of ``TICK`` onto the module-level queue.

    Prints a progress line every ``OUTPUT_INTERVAL`` items. When
    ``FORCE_SWITCH`` is set, it also awaits ``asyncio.sleep(0)`` right
    after each progress line.
    """
    for sent in range(1, QFILL + 1):
        await q.put(TICK.copy())
        if sent % OUTPUT_INTERVAL:
            continue
        print('enqueue(): Put %d items.' % sent)
        if FORCE_SWITCH:
            await asyncio.sleep(0)

async def dequeue():
    """Consumer: take ``QFILL`` items off the module-level queue.

    The items themselves are discarded. Prints a progress line every
    ``OUTPUT_INTERVAL`` items. When ``FORCE_SWITCH`` is set, it also
    awaits ``asyncio.sleep(0)`` right after each progress line.
    """
    for received in range(1, QFILL + 1):
        await q.get()
        if received % OUTPUT_INTERVAL:
            continue
        print('dequeue(): Get %d items.' % received)
        if FORCE_SWITCH:
            await asyncio.sleep(0)

async def main():
    """Benchmark asyncio.Queue with one producer and one consumer.

    Creates the module-level queue ``q`` bounded at ``QSIZE``, runs
    ``dequeue()`` and ``enqueue()`` concurrently, then prints the elapsed
    time and the items-per-second rate for ``QFILL`` items.
    """
    # q is published as a global because enqueue() and dequeue() read it
    # from module scope rather than taking it as an argument.
    global q
    q = asyncio.Queue(maxsize=QSIZE)
    # perf_counter() is a monotonic, high-resolution clock intended for
    # measuring intervals; time.time() can jump if the wall clock changes.
    begin = time.perf_counter()
    await asyncio.gather(dequeue(), enqueue())
    elapsed = time.perf_counter() - begin
    print('asynio.Queue put() and get() %d items.' % QFILL)
    # Guard the division in case the run was too short to measure.
    rate = QFILL / elapsed if elapsed > 0 else float('inf')
    print('%.2f seconds, %.2f items per second.' % (elapsed, rate))

if __name__ == '__main__':
    asyncio.run(main())