Python/asyncio
< Python
Jump to navigation
Jump to search
functions
https://docs.python.org/3/library/asyncio-task.html#running-an-asyncio-program
| function | arg | return |
|---|---|---|
| asyncio.run() | coroutine | |
| asyncio.gather() | *awaitable | coroutine |
| asyncio.sleep() | float | coroutine |
| asyncio.shield() | awaitable | awaitable |
| asyncio.wait_for() | awaitable | coroutine |
| asyncio.wait() | awaitable | coroutine |
| asyncio.to_thread() | function | coroutine |
| asyncio.run_coroutine_threadsafe() | coroutine | future |
| asyncio.create_task() | coroutine | task |
| asyncio.current_task() | | task |
| asyncio.all_tasks() | | set(task) |
| asyncio.open_connection() | | |
| asyncio.start_server() | | |
| asyncio.open_unix_connection() | | |
| asyncio.start_unix_server() | | |
| asyncio.create_subprocess_shell() | | |
| asyncio.create_subprocess_exec() | | |
Topics
- await
- Lock
- Event
- Condition
- Semaphore
- Queue
- Subprocess
- Cross loop IPC
import asyncio
async def job(n, delay=2):
    """Simulate a slow job and return the square of ``n``.

    Args:
        n: Job number; echoed in the start message and squared for the result.
        delay: Seconds to sleep before returning. Defaults to 2, the value
            that used to be hard-coded.

    Returns:
        ``n * n``.
    """
    print(f'begin job #{n}.')
    # Stand-in for real work: suspends this coroutine for ``delay`` seconds.
    await asyncio.sleep(delay)
    return n * n
async def test_await():
    """Run jobs 1, 2 and 3 one after another, printing each result.

    Each job is awaited to completion before the next one is started.
    """
    # Sequentially
    for n in (1, 2, 3):
        r = await job(n)
        print(r)
async def test_gather():
    """Run jobs 1, 2 and 3 through ``asyncio.gather`` and print the result."""
    # Run all then get all: a single await collects every job's result.
    r = await asyncio.gather(
        job(1),
        job(2),
        job(3),
    )
    print(r)
async def test_create_task():
    """Wrap jobs 1, 2 and 3 in tasks, then await and print each in order."""
    # Run first, get result later: all three tasks are created before the
    # first one is awaited.
    tasks = [asyncio.create_task(job(n)) for n in (1, 2, 3)]
    for task in tasks:
        r = await task
        print(r)
async def main():
    """Run the three demos in order: plain await, gather, then create_task."""
    await test_await()
    await test_gather()
    await test_create_task()
# Script entry point: run main() to completion.
asyncio.run(main())
'''
I/O test for asyncio.Queue
QSIZE = 300000
QFILL = 300000
FORCE_SWITCH = False
enqueue(): Put 100000 items.
enqueue(): Put 200000 items.
enqueue(): Put 300000 items.
dequeue(): Get 100000 items.
dequeue(): Get 200000 items.
dequeue(): Get 300000 items.
asynio.Queue put() and get() 300000 items.
0.67 seconds, 448386.61 items per second.
QSIZE = 100000
QFILL = 300000
FORCE_SWITCH = False
enqueue(): Put 100000 items.
dequeue(): Get 100000 items.
enqueue(): Put 200000 items.
dequeue(): Get 200000 items.
enqueue(): Put 300000 items.
dequeue(): Get 300000 items.
asynio.Queue put() and get() 300000 items.
0.68 seconds, 442956.25 items per second.
QSIZE = 300000
QFILL = 300000
FORCE_SWITCH = True
enqueue(): Put 100000 items.
dequeue(): Get 100000 items.
enqueue(): Put 200000 items.
dequeue(): Get 200000 items.
enqueue(): Put 300000 items.
dequeue(): Get 300000 items.
asynio.Queue put() and get() 300000 items.
0.68 seconds, 440712.40 items per second.
'''
import asyncio
import time
# Capacity handed to asyncio.Queue(maxsize=...) in main().
QSIZE = 300_000
# Number of items enqueue() puts and dequeue() gets.
QFILL = 300_000
# When true, enqueue()/dequeue() also await asyncio.sleep(0).
FORCE_SWITCH = False
# A progress line is printed every this many items.
OUTPUT_INTERVAL = 100_000

# Sample payload; enqueue() puts a fresh copy of it for every item.
TICK = {
    'security_id': '2330.TW',
    'open': 599.05,
    'close': 600.15,
    'high': 612.05,
    'low': 593.75,
    '5d': 595.25,
    '10d': 590.85,
    '20d': 588.65,
    '60d': 550.50,
    '120d': 510.45,
    '240d': 450.15,
}
async def enqueue():
    """Put ``QFILL`` copies of ``TICK`` onto the module-level queue ``q``.

    Prints a progress line every ``OUTPUT_INTERVAL`` items.  When
    ``FORCE_SWITCH`` is true, it also awaits ``asyncio.sleep(0)`` right after
    each progress line.

    ``q`` is only read here (main() assigns it), so no ``global`` statement
    is needed.
    """
    # NOTE(review): the source had lost its indentation.  The FORCE_SWITCH
    # check is nested under the progress check because the recorded sample
    # timings are nearly identical with and without FORCE_SWITCH, which a
    # per-item yield would not produce -- confirm against the original.
    for count in range(1, QFILL + 1):
        # Each item gets its own dict rather than sharing TICK itself.
        await q.put(TICK.copy())
        if count % OUTPUT_INTERVAL == 0:
            print('enqueue(): Put %d items.' % count)
            if FORCE_SWITCH:
                await asyncio.sleep(0)
async def dequeue():
    """Get ``QFILL`` items from the module-level queue ``q`` and discard them.

    Prints a progress line every ``OUTPUT_INTERVAL`` items.  When
    ``FORCE_SWITCH`` is true, it also awaits ``asyncio.sleep(0)`` right after
    each progress line.

    ``q`` is only read here (main() assigns it), so no ``global`` statement
    is needed.
    """
    # NOTE(review): the source had lost its indentation.  The FORCE_SWITCH
    # check is nested under the progress check because the recorded sample
    # timings are nearly identical with and without FORCE_SWITCH, which a
    # per-item yield would not produce -- confirm against the original.
    for count in range(1, QFILL + 1):
        # The item itself is not used; only the count matters here.
        await q.get()
        if count % OUTPUT_INTERVAL == 0:
            print('dequeue(): Get %d items.' % count)
            if FORCE_SWITCH:
                await asyncio.sleep(0)
async def main():
    """Time moving ``QFILL`` items through an ``asyncio.Queue``.

    Creates the module-level queue ``q`` with ``maxsize=QSIZE``, runs
    dequeue() and enqueue() together under ``asyncio.gather``, then prints
    the elapsed time and the resulting items-per-second rate.
    """
    global q
    q = asyncio.Queue(maxsize=QSIZE)
    # perf_counter() is the clock intended for measuring short intervals;
    # unlike time.time() it is not affected by system clock adjustments.
    begin = time.perf_counter()
    await asyncio.gather(dequeue(), enqueue())
    elapsed = time.perf_counter() - begin
    # Message text is kept verbatim so it matches the recorded sample output.
    print('asynio.Queue put() and get() %d items.' % QFILL)
    print('%.2f seconds, %.2f items per second.' % (elapsed, QFILL / elapsed))
if __name__ == '__main__':
    # Run the benchmark only when executed as a script, not when imported.
    asyncio.run(main())