Python/asyncio: Difference between revisions
< Python
Jump to navigation
Jump to search
No edit summary |
No edit summary |
||
| (9 intermediate revisions by the same user not shown) | |||
| Line 1: | Line 1: | ||
== functions == | |||
https://docs.python.org/3/library/asyncio-task.html#running-an-asyncio-program | |||
{| class="wikitable" | |||
! function || arg || return | |||
|- | |||
| asyncio.run() || coroutine || | |||
|- | |||
| asyncio.gather() || *awaitable || coroutine | |||
|- | |||
| asyncio.sleep() || float || coroutine | |||
|- | |||
| asyncio.shield() || awaitable || awaitable | |||
|- | |||
| asyncio.wait_for() || awaitable || coroutine | |||
|- | |||
| asyncio.wait() || awaitable || coroutine | |||
|- | |||
| asyncio.to_thread() || function || coroutine | |||
|- | |||
| asyncio.run_coroutine_threadsafe() || coroutine || future | |||
|- | |||
| asyncio.create_task() || coroutine || task | |||
|- | |||
| asyncio.current_task() || || task | |||
|- | |||
| asyncio.all_tasks() || || set(task) | |||
|- | |||
| asyncio.open_connection() || || | |||
|- | |||
| asyncio.start_server() || || | |||
|- | |||
| asyncio.open_unix_connection() || || | |||
|- | |||
| asyncio.start_unix_server() || || | |||
|- | |||
| async.create_subprocess_shell() || || | |||
|- | |||
| async.create_subprocess_exec() || || | |||
|} | |||
== Topics == | |||
* await | |||
* Lock | |||
* Event | |||
* Condition | |||
* Semaphore | |||
* Queue | |||
* Subprocess | |||
* Cross loop IPC | |||
<source lang="python3"> | <source lang="python3"> | ||
import asyncio | import asyncio | ||
| Line 43: | Line 96: | ||
asyncio.run(main()) | asyncio.run(main()) | ||
</source> | |||
<source lang="python3"> | |||
''' | |||
I/O test for asyncio.Queue | |||
QSIZE = 300000 | |||
QFILL = 300000 | |||
FORCE_SWITCH = False | |||
enqueue(): Put 100000 items. | |||
enqueue(): Put 200000 items. | |||
enqueue(): Put 300000 items. | |||
dequeue(): Get 100000 items. | |||
dequeue(): Get 200000 items. | |||
dequeue(): Get 300000 items. | |||
asynio.Queue put() and get() 300000 items. | |||
0.67 seconds, 448386.61 items per second. | |||
QSIZE = 100000 | |||
QFILL = 300000 | |||
FORCE_SWITCH = False | |||
enqueue(): Put 100000 items. | |||
dequeue(): Get 100000 items. | |||
enqueue(): Put 200000 items. | |||
dequeue(): Get 200000 items. | |||
enqueue(): Put 300000 items. | |||
dequeue(): Get 300000 items. | |||
asynio.Queue put() and get() 300000 items. | |||
0.68 seconds, 442956.25 items per second. | |||
QSIZE = 300000 | |||
QFILL = 300000 | |||
FORCE_SWITCH = True | |||
enqueue(): Put 100000 items. | |||
dequeue(): Get 100000 items. | |||
enqueue(): Put 200000 items. | |||
dequeue(): Get 200000 items. | |||
enqueue(): Put 300000 items. | |||
dequeue(): Get 300000 items. | |||
asynio.Queue put() and get() 300000 items. | |||
0.68 seconds, 440712.40 items per second. | |||
''' | |||
import asyncio | |||
import time | |||
QSIZE = 300000 | |||
QFILL = 300000 | |||
FORCE_SWITCH = False | |||
OUTPUT_INTERVAL = 100000 | |||
TICK = { | |||
'security_id': '2330.TW', | |||
'open': 599.05, | |||
'close': 600.15, | |||
'high': 612.05, | |||
'low': 593.75, | |||
'5d': 595.25, | |||
'10d': 590.85, | |||
'20d': 588.65, | |||
'60d': 550.50, | |||
'120d': 510.45, | |||
'240d': 450.15 | |||
} | |||
async def enqueue(): | |||
global q | |||
i = 0 | |||
while i < QFILL: | |||
await q.put(TICK.copy()) | |||
i += 1 | |||
if i % OUTPUT_INTERVAL == 0: | |||
print('enqueue(): Put %d items.' % i) | |||
if FORCE_SWITCH: | |||
await asyncio.sleep(0) | |||
async def dequeue(): | |||
global q | |||
i = 0 | |||
while i < QFILL: | |||
await q.get() | |||
i += 1 | |||
if i % OUTPUT_INTERVAL == 0: | |||
print('dequeue(): Get %d items.' % i) | |||
if FORCE_SWITCH: | |||
await asyncio.sleep(0) | |||
async def main(): | |||
global q | |||
q = asyncio.Queue(maxsize=QSIZE) | |||
begin = time.time() | |||
await asyncio.gather(dequeue(), enqueue()) | |||
elapsed = time.time() - begin | |||
print('asynio.Queue put() and get() %d items.' % QFILL) | |||
print('%.2f seconds, %.2f items per second.' % (elapsed, QFILL / elapsed)) | |||
if __name__ == '__main__': | |||
asyncio.run(main()) | |||
</source> | </source> | ||
Latest revision as of 08:40, 21 June 2021
functions
https://docs.python.org/3/library/asyncio-task.html#running-an-asyncio-program
| function | arg | return |
|---|---|---|
| asyncio.run() | coroutine | |
| asyncio.gather() | *awaitable | coroutine |
| asyncio.sleep() | float | coroutine |
| asyncio.shield() | awaitable | awaitable |
| asyncio.wait_for() | awaitable | coroutine |
| asyncio.wait() | awaitable | coroutine |
| asyncio.to_thread() | function | coroutine |
| asyncio.run_coroutine_threadsafe() | coroutine | future |
| asyncio.create_task() | coroutine | task |
| asyncio.current_task() | task | |
| asyncio.all_tasks() | set(task) | |
| asyncio.open_connection() | ||
| asyncio.start_server() | ||
| asyncio.open_unix_connection() | ||
| asyncio.start_unix_server() | ||
| async.create_subprocess_shell() | ||
| async.create_subprocess_exec() |
Topics
- await
- Lock
- Event
- Condition
- Semaphore
- Queue
- Subprocess
- Cross loop IPC
import asyncio
async def job(n):
print('begin job #{}.'.format(n))
await asyncio.sleep(2)
return n * n
async def test_await():
# Sequentially
r = await job(1)
print(r)
r = await job(2)
print(r)
r = await job(3)
print(r)
async def test_gather():
# Run all then get all
r = await asyncio.gather(
job(1),
job(2),
job(3)
)
print(r)
async def test_create_task():
# Run first, get result later
a1 = asyncio.create_task(job(1))
a2 = asyncio.create_task(job(2))
a3 = asyncio.create_task(job(3))
r1 = await a1
print(r1)
r2 = await a2
print(r2)
r3 = await a3
print(r3)
async def main():
await test_await()
await test_gather()
await test_create_task()
asyncio.run(main())
'''
I/O test for asyncio.Queue
QSIZE = 300000
QFILL = 300000
FORCE_SWITCH = False
enqueue(): Put 100000 items.
enqueue(): Put 200000 items.
enqueue(): Put 300000 items.
dequeue(): Get 100000 items.
dequeue(): Get 200000 items.
dequeue(): Get 300000 items.
asynio.Queue put() and get() 300000 items.
0.67 seconds, 448386.61 items per second.
QSIZE = 100000
QFILL = 300000
FORCE_SWITCH = False
enqueue(): Put 100000 items.
dequeue(): Get 100000 items.
enqueue(): Put 200000 items.
dequeue(): Get 200000 items.
enqueue(): Put 300000 items.
dequeue(): Get 300000 items.
asynio.Queue put() and get() 300000 items.
0.68 seconds, 442956.25 items per second.
QSIZE = 300000
QFILL = 300000
FORCE_SWITCH = True
enqueue(): Put 100000 items.
dequeue(): Get 100000 items.
enqueue(): Put 200000 items.
dequeue(): Get 200000 items.
enqueue(): Put 300000 items.
dequeue(): Get 300000 items.
asynio.Queue put() and get() 300000 items.
0.68 seconds, 440712.40 items per second.
'''
import asyncio
import time
QSIZE = 300000
QFILL = 300000
FORCE_SWITCH = False
OUTPUT_INTERVAL = 100000
TICK = {
'security_id': '2330.TW',
'open': 599.05,
'close': 600.15,
'high': 612.05,
'low': 593.75,
'5d': 595.25,
'10d': 590.85,
'20d': 588.65,
'60d': 550.50,
'120d': 510.45,
'240d': 450.15
}
async def enqueue():
global q
i = 0
while i < QFILL:
await q.put(TICK.copy())
i += 1
if i % OUTPUT_INTERVAL == 0:
print('enqueue(): Put %d items.' % i)
if FORCE_SWITCH:
await asyncio.sleep(0)
async def dequeue():
global q
i = 0
while i < QFILL:
await q.get()
i += 1
if i % OUTPUT_INTERVAL == 0:
print('dequeue(): Get %d items.' % i)
if FORCE_SWITCH:
await asyncio.sleep(0)
async def main():
global q
q = asyncio.Queue(maxsize=QSIZE)
begin = time.time()
await asyncio.gather(dequeue(), enqueue())
elapsed = time.time() - begin
print('asynio.Queue put() and get() %d items.' % QFILL)
print('%.2f seconds, %.2f items per second.' % (elapsed, QFILL / elapsed))
if __name__ == '__main__':
asyncio.run(main())