理解 Python 中的协程

asyncio 是 Python 下开发 I/O 密集型应用的最佳选择,而使用 asyncio 又离不开协程

什么是协程

协程是一种可以中断运行,在某个时间点再恢复运行的程序组件。这个定义和协程的字面意思一点关系没有,刚接触时,理解起来是十分晦涩的。我们先看其英文 coroutine 会更好理解一些,“co-“ 这个前缀有”一起,共同”的意思,比如 co-author 表示合著一本书的作者,coexist 表示共存。类似的我们可以把 coroutine 理解为共同,协作运行的程序组件。

如何理解这个协作运行呢?我们常用的函数不也是在协作运行吗?我们一般所说的函数,也就是 subroutine(子程序)也是一种程序组件。我们已经很熟悉函数的工作方式,函数总是有且只有一个入口,一旦从入口开始执行,便会一直执行到函数结束。现代计算机是以线程为单位来调度任务的,我们执行一个任务,通常先创建一个进程,每个进程会有一个主线程,主线程从 main 函数开始执行。我们也可以为进程创建新线程,每个线程都需要指定一个函数作为入口函数,线程启动后便会开始执行这个函数。一个线程开始执行某个函数后,便会一直执行到它结束 ,如果中间需要调用其它函数,也是在这个函数的Context 之上,再压新栈去执行其它函数。

协程与 subroutine 不同的是,协程在执行过程中可以暂停,把线程让出(yield)给其它代码去执行,然后在某个时间点再恢复执行,这种 yield 不是调用关系而是切换,线程从一个协程的 Context 切换到了另一个协程的 Context。在这种情况下,多个协程便可以在同一线程下并发执行。也就是说,协程的暂停后再恢复机制,是为了能够让出线程的使用权,而让出线程的使用权后,就可以在当前线程去执行其它协程,也就实现了多个协程共同运行。通常也会认为 subroutine 是 coroutine 的一个特例,subroutine 是在执行中不会让出 (yeild) 的 coroutine。

以 Python 中的 sleep 为例,我们让代码睡眠 5 秒钟后继续执行,我们在函数中调用 time.sleep(5) 时,当前函数停止执行了,当前线程也是停住的,不会执行其它代码。而在协程中调用 asyncio.sleep(5) 时,当前协程停止执行了,但是当前线程仍然会继续运行,去执行其它代码,5 秒钟后当前协程自动恢复执行。需要注意的是,我们在函数中调用的是 time.sleep(5),而在协程中调用的是 asyncio.sleep(5),asyncio.sleep 是协程版本的 sleep 函数。如果在协程中调用 time.sleep(5),协程停住的5秒钟中,线程同样也是停住的,必须调用协程版本的 asyncio.sleep 才能实现协程的效果。

协程的优势

协程这种可以执行中让出线程的特性具有十分巨大的优势,特别是在I/O应用中。

绝大部分应用在运行中都离不开 I/O 操作,比如网络I/O,磁盘I/O等。Linux 提供了 5 种 I/O 模型,有阻塞式,非阻塞式,I/O 多路复用,信号驱动 I/O,异步 I/O 等,在此基础上,针对不同的应用场景又有多种不同的开发范式。我们以 Web 服务器为例,简单介绍两种常见的实现方法。

Socket 编程将网络通信的两个端点抽象为 两个 Socket。Socket 可以工作在阻塞模式或非阻塞模式下,阻塞模式的意思是 I/O 函数在调用时是阻塞的,例如 调用Socket 的 read 函数,在读到数据或者异常之前一直不会返回,函数不返回就意味着对应的线程是停住的。为了能支持多个 Socket 同时通信,必须为每个 Socket 创建一个线程。有些 Web 服务器会为每个客户端连接创建一个 Worker 线程,这并没有什么问题,这种模型特别适用于 HTTP 这种 Request/Response 的模式。唯一的问题是,线程会占用一定资源,线程之间频繁的切换会浪费 CPU 时间,线程越多用的内存越多(在内存很便宜的现在,这倒不是什么大问题),所以线程的数量是有限制的,一般的 Web 服务器 worker 线程数量在几百个左右已经很多了。这种方法也被称为多线程的阻塞模式,这种模式是最容易理解和实现的一种模式,但也是最有局限性的一种模式。

既然阻塞模式下的 read 会停住线程,那么非阻塞模式是不是会好一点儿呢?当 Socket 被创建为非阻塞模式时,在非阻塞模式的 Socket 上调用 read 函数,如果当前没有数据可以读取,read 函数并不会阻塞,而是返回 EWOULDBLOCK 错误,这个错误告诉我们现在没有新数据,可以过一段时间再重试,这样当前的线程就可以继续去执行其它代码,过段时间重新调用 read。这里其实有个更大的问题,Socket 什么时候有数据可读是不确定的,如果我们通过轮询去检查有没有新数据,这不仅浪费资源,还没办法在数据到达的第一时间去读取。我们需要一种机制,在 Socket 可读时能通知到我们。select 函数便是为此而来。select 函数可以告诉我们一组 Socket 中,哪些可以读,可以写,或者有异常发生。我们只要新建一个线程,不断地调用 select 就可以监视多个 Socket 的状态变化,这便是 I/O 多路复用模式。

多路复用模式相对于阻塞模式性能会更好(select 模型其实性能一般,linux 下性能更好的是 epoll 模型,运行原理大体一样),也更灵活,但是也更复杂。因为 I/O Ready 的消息需要一个机制才能通知给应用代码。我们一般称之为事件驱动模式(event-driven model),常用的范式是 Reactor,将 I/O Ready 的消息,转换成事件,然后调用事件的 handle 函数,应用代码在 handle 函数中实现自己的逻辑。

对于应用代码的开发者来说,我们很显然更喜欢多线程的阻塞模式,因为 I/O 代码和应用代码是连贯在一起的,read 完以后就可以处理数据,处理完以后就可以 send,send 成功就可以放心地认为数据发送成功了。事件驱动模式下,I/O 代码和应用代码是分离的,假如我们在应用代码中调用 send 函数,这只是告诉 I/O 代码我们要发送这些数据,这些数据有没有发送成功?要等事件通知。大多数情况下,我们都需要使用多线程的事件驱动模式,哪些数据需要保障线程安全,哪些不需要,需要开发者对底层 I/O 代码有精确的了解,才实现出高性能的网络应用。

回过头来看协程的特点,如果用协程来解决这个问题就会非常舒服了,因为协程在执行中可以中断,我们在调用 read 时,把线程 yield 给其它协程就行,等到有数据可读时,原来的协程可以从 read 继续执行。在我们看来就像是调用了阻塞模式的 read,完全不需要考虑这背后事件是怎么工作的。由于这些协程都是在同一个线程中执行,我们也不需要考虑线程安全问题。

对于 Python 来说,协程更加有优势。由于 Global Interpreter Lock 的存在,Python 中的多线程并不能被称为真正的多线程,Global Interpreter Lock 限制一个进程同一时间只执行一个线程,所以即使在 Python 中使用了多线程,实际上同一时间只有一个线程在运行。使用协程后就会充分利用一个线程,而不需要多线程之间无谓的切换损耗。

协程的局限性

因为协程都在同一个线程上执行,如果某个协程在执行上花费了太多的 CPU 时间,就会导致所有协程的延迟。我们在上文提到过,协程中不能调用 time.sleep 去睡眠,因为它会导致整个线程睡眠,这也会导致其它所有协程都被延迟。所以,协程更适用于 I/O bound 的场景,而不适用于 CPU bound 的场景,使用协程时需要特别注意,大量消耗 CPU 的任务,应该放在协程外,以异步任务的形式执行。

协程并不能提高性能,除了你有大量线程(成千上万个?)运行的场景。协程实现异步 I/O 仍然是基于操作系统提供的 I/O 模型,相对于非协程的实现并不会有性能上的提升。但是协程确实让代码更简洁了,让开发更容易了,这会大大减少开发成本。

在 Python 中使用协程

Generator

Python 中首先支持的是 Generator,Generator 是一种可以当成 iterator 来使用的函数。

def firstn(n):
    num = 0
    while num < n:
        yield num
        num += 1

for num in firstn(10):
    print(num)

上面的代码中,我们定义了名为 firstn 的 generator。generator 可以通过 for each 或者 next() 函数读取。如果我们如果查看 firstn(10) 的值,会发现它返回的是一个 generator object。

>>> firstn(10)
<generator object firstn at 0x7fe40d412cd0>

>>> obj = firstn(10)
>>> obj
<generator object firstn at 0x7fe40d412c50>
>>> next(obj)
0
>>> next(obj)
1

起作用的是 yield 关键字,yield 关键字,将这个函数变成了 generator。

Python Coroutine

Python 中的 coroutine 与 generator, 略有不同,可以认为是一种特殊的 generator。

def grep_word(match):
    print("Print sentence match word: {}".format(match))
    while True:
        sentence = (yield)
        if match in sentence:
            print(sentence)

corou = grep_word("Hello")

# grep_word 会执行到 sentence = (yield) 处暂停
next(corou)

corou.send("First Sentence")
corou.send("Hello world")

与 generator 不同的是,coroutine 中的 yield 没有参数。我们查看 corou 的值,会发现它也是个 generator object。调用 next() 时,grep_word 会开始执行,执行到 sentence = (yield) 时暂停。调用 corou.send() 时,把 send 的值 传递给 sentence ,并继续执行。

>>> corou = grep_word("Hello")
>>> corou
<generator object grep_word at 0x7f95b7ce9cd0>
>>> next(corou)
Print sentence match word: Hello
>>> corou.send("First Sentence")
>>> corou.send("Hello world")
Hello world

可以看出,generator 是生成数据的,而 coroutine 是消费数据的。

实现 Echo Server

有了 Coroutine 作为基础,我们已经可以使用 coroutine 来实现网络服务,内容较多,这里不做详细的说明,有兴趣的同学可以参考 Python 大神 David Beazley 的文章 A Curious Course on Coroutines and Concurrency。这篇文章中有 coroutine 的详细介绍,以及 coroutine 很多很奇妙的应用场景。

下面这段代码是从原文摘录的,很容易理解。重点是 server 和 handle_client 这两个 coroutine, 以及 Scheduler 实现的 event loop。

import select
import socket
from queue import Queue


class Task:
    '''所有任务的基类'''
    taskid = 0

    def __init__(self, target) -> None:
        Task.taskid += 1
        self.tid = Task.taskid
        self.target = target
        self.sendval = None

    def run(self):
        return self.target.send(self.sendval)


class SystemCall:
    '''如果 Task.run 返回了 SystemCall, 
       Scheduler 会调用 SystemCall.handle 来执行需要的操作'''

    def handle(self):
        pass


class Scheduler:
    '''运行 event loop, 调度 Task 运行'''

    def __init__(self) -> None:
        self.ready = Queue()
        self.taskmap = {}
        self.read_waiting = {}
        self.write_waiting = {}

    def wait_for_read(self, task, fd):
        self.read_waiting[fd] = task

    def wait_for_write(self, task, fd):
        self.write_waiting[fd] = task

    def new(self, target):
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid

    def iotask(self):
        while True:
            if self.ready.empty():
                self.iopoll(None)
            else:
                self.iopoll(0)
            yield

    def iopoll(self, timeout):
        if self.read_waiting or self.write_waiting:
            r, w, e = select.select(
                self.read_waiting, self.write_waiting, [], timeout)
            for fd in r:
                self.schedule(self.read_waiting.pop(fd))
            for fd in w:
                self.schedule(self.write_waiting.pop(fd))

    def schedule(self, task):
        self.ready.put(task)

    def exit(self, task):
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]

    def mainloop(self):
        '''主循环,调度运行所有的 Task'''
        self.new(self.iotask())
        while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
                if isinstance(result, SystemCall):
                    result.task = task
                    result.sched = self
                    result.handle()
                    continue
            except StopIteration:
                self.exit(task)
                continue
            self.schedule(task)


class ReadWait(SystemCall):
    def __init__(self, f) -> None:
        self.f = f

    def handle(self):
        fd = self.f.fileno()
        self.sched.wait_for_read(self.task, fd)


class WriteWait(SystemCall):
    def __init__(self, f) -> None:
        self.f = f

    def handle(self):
        fd = self.f.fileno()
        self.sched.wait_for_write(self.task, fd)


class NewTask(SystemCall):
    '''创建一个新的 Task'''

    def __init__(self, target) -> None:
        self.target = target

    def handle(self):
        tid = self.sched.new(self.target)
        self.task.sendval = tid
        # 触发自己的 Task 继续执行
        self.sched.schedule(self.task)


def handle_client(client, addr):
    print(f"Connection from {addr}")
    while True:
        # 等待可读
        yield ReadWait(client)
        data = client.recv(65535)
        if not data:
            break
        # 等待可写
        yield WriteWait(client)
        client.send(data)

    client.close()
    print(f"Client closed {addr}")


def server(port):
    print(f'Server starting port: {port}')
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", port))
    sock.listen(5)
    while True:
        # 等待新的连接到达
        yield ReadWait(sock)
        client, addr = sock.accept()
        # 使用 handle_client 创建一个新的 Task, 处理客户端的请求
        yield NewTask(handle_client(client, addr))


sched = Scheduler()
sched.new(server(9998))
sched.mainloop()

asyncio 库

从 Echo Server 这个例子可以看出,如果要使用 coroutine ,必须要实现一个 event loop,也就是例子中的 Scheduler.mainloop,因为必须调用 coroutine 的 send 函数,coroutine 才会持续执行,我们需要 event loop 来完成这个工作,并且在 coroutine 之间进行调度。这是一个不小的轮子。Python 3.4 开始提供了专门用于运行和管理 coroutine 的 asyncio 库,连同 async 和 await 关键字,大大方便了使用 Coroutine 开发应用程序。

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

asyncio.run(main())

上面的例子中,async def 关键字将 main 函数定义为 coroutine。 如果我们查看 main() 的返回值,会发现这是一个 coroutine objectasyncio.run() 会创建一个 event loop,并在 event loop 中执行所有的 coroutine, 直到全部结束。

>>> obj = main()
>>> obj
<coroutine object main at 0x7f24bf44cf80>

使用 asyncio 来实现 Echo Server 变得非常简单。

import asyncio


async def handle_tcp_echo(reader, writer):
    address = writer.get_extra_info('peername')
    print(f'connection accept from {address}')

    while True:
        data = await reader.read(65535)
        if not data:
            print(f'connection closed {address}')
            writer.close()
            return

        writer.write(data)
        await writer.drain()


async def run_server():
    server = await asyncio.start_server(handle_tcp_echo, "", 9998)
    async with server:
        await server.serve_forever()

asyncio.run(run_server())

我们实现了 run_server, handle_tcp_echo 两个 coroutine。除了几个 async 和 await 关键字,和阻塞模式的写法基本一样。