[Python進階]Python的coroutine

Coroutine(協程)和我們常聽到的 multithread(多執行緒)、multiprocess(多進程)一樣,都是一種併發(concurrent)方式。需要注意的是,併發和併行(parallel)不同,併發是快速的上下文切換,讓使用者感覺程式同時在執行,而併行是指不同 CPU 同時執行多個任務。

隨著互聯網的快速發展,我們遇到了所謂的 C10k 問題,也就是同時連接到伺服器的客戶端數量達到一萬個,這會造成進程上下文切換佔用了大量資源,即使是執行緒也難以承受如此巨大的壓力。於是 Coroutine 登場了。

很多人說 Coroutine 是輕量化的 Thread,那麼 Coroutine 和 Thread 有什麼不同呢?最大的差別在於 Thread 是搶佔式多工,而 Coroutine 是協同式多工。

搶佔式多工

程式有各自的優先權,作業系統會根據程式的優先權安排當下哪個程式能擁有 CPU 資源去執行,另外作業系統有權中斷任何正在執行中的程式。

協同式多工

程式會定時放棄已佔有的執行資源讓其他程式執行。由於是由程式自己讓出執行資源,不需要由底層的作業系統來處理,所以 Coroutine 交替時所產生的上下文切換負擔比 Thread 小。

接下來,我們給個範例,看看 Python 的 Coroutine 怎麼寫。Python 的 Coroutine library 是 asyncio,需要跑在 Python 3.7 以上的環境。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
import time

async def loading(sec):
print(f'loading... needs {sec} secs to load')
await asyncio.sleep(sec)
print(f'OK {sec}')

async def main(secs):
for sec in secs:
await loading(sec)

asyncio.run(main([5, 3]))
# Output
# loading... needs 5 secs to load
# OK 5
# loading... needs 3 secs to load
# OK 3

async 代表這個函數是非同步函數,意思是會順序執行的。然後來說說執行的方法,執行有三種方式:

  1. 透過 await 來呼叫
    • await 的執行效果和 Python 平常運行程式的效果一樣,會阻塞在這裡,執行完後再繼續。
  2. 透過 asyncio.create_task() 來創建任務
  3. 透過 asyncio.run() 來執行,不用去理會 Event loop 怎麼運行的

上面的程式碼順序的跑了 10 秒是正常的,因為 await 會阻塞。接下來我們要來講 Coroutine 一個重要的概念——Task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
async def loading(sec):
print(f'loading... needs {sec} secs to load')
await asyncio.sleep(sec)
print(f'OK {sec}')

async def main(secs):
tasks = [asyncio.create_task(loading(sec)) for sec in secs]
for task in tasks:
await task

asyncio.run(main([5, 3]))
# Output:
# loading... needs 5 secs to load
# loading... needs 3 secs to load
# OK 3
# OK 5

我們可以看到,當 Task 被 create 之後就會開始執行,然後我們對每個 Task 加了 await,等所有 Task 都執行完後才能繼續下一步,或是可以看我註解掉的那一行 await asyncio.gather(*tasks),也能達到同樣的目的。

Coroutine runtime

接下來我們來解析 Coroutine 的 runtime 和背後的邏輯,我們一樣用上面的例子,步驟有點多,我們慢慢來:

  1. asyncio.run(main()),程式進入 main() 函數,同時也開啟了 Event loop
  2. 兩個 Task 都被創建,進入 Event loop 等待被執行,執行到 print,輸出 loading...
  3. await 五秒的 Task,Event loop 開始調度 worker1
  4. worker1 開始執行,遇到 await.sleep,從現在的 Task 切出,Event loop 開始調度 worker2
  5. worker2 開始執行,一樣遇到 await.sleep,從現在的 Task 切出
  6. 以上的時間都非常快,由於兩個都在 sleep,所以 Event loop 暫停調度
  7. 3 秒鐘後 worker2 的 sleep 完成,Event loop 將控制權交給三秒的 Task,printOK 3,Task 完成,從 Event loop 退出
  8. 再兩秒鐘後,同理 worker1 的 sleep 完成,print 完後,從 Event loop 退出
  9. Coroutine 全部結束

接下來我們進階一下,如果想給 Coroutine 任務限定執行時間,一但超時就取消,該怎麼做呢?以及如果 runtime 發生錯誤,又該怎麼處理?我們來看看下面的程式碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio

async def worker1():
await asyncio.sleep(1)
return 1

async def worker2():
await asyncio.sleep(2)
return 2 / 0

async def worker3():
await asyncio.sleep(3)
return 3

async def main():
task_1 = asyncio.create_task(worker1())
task_2 = asyncio.create_task(worker2())
task_3 = asyncio.create_task(worker3())

await asyncio.sleep(2)
task_3.cancel()

res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
print(res)

asyncio.run(main())
# Output
# [1, ZeroDivisionError('division by zero'), CancelledError()]

我們可以看到,worker1 正常執行,worker2 執行中遇到錯誤,worker3 執行太久被我們取消了。到這裡,其實 Thread 能做的 Coroutine 也能做。

總結

本篇講解了 Python Coroutine 的基本概念和用法,這裡簡單總結一下:

  • Coroutine 和 Thread 的差別主要是 Coroutine 只能用一個 CPU core,以及 Coroutine 是程式決定什麼時候要切換任務。
  • Coroutine 的寫法更加簡潔清晰,滿足中小級別的併發需求。
  • 寫 Coroutine 的時候,腦海要有清晰的 Event loop 概念,知道什麼時候需要暫停、等待 I/O,什麼時候可以執行到底。