性能文章>Redis 多线程网络模型[译]>

Redis 多线程网络模型[译]原创

373634

说明

今天读取了一篇关于【Redis 多线程网络模型】的国外文章,写的非常不错,深入浅出,从单线程、多线程、io多路复用以及他们实现的思路,所以把他简单翻译下;

大家可以看下原文:
https://www.sobyte.net/post/2022-03/redis-multi-threaded-network-model/

概述

Redis已经成为当前技术选择中高性能缓存解决方案的行业标准,因此Redis成为后端开发人员的基本技能之一。

Redis本质上是一个web服务器,对于web服务器来说,网络模型是它的本质,如果您了解Web服务器的网络模型,就会了解它的本质。

本文分步介绍 Redis 网络模型,并分析其如何从单线程演进到多线程。此外,我们还分析了 Redis 网络模型中做出的许多选择背后的思想,以帮助读者更好地理解 Redis 网络模型的设计。

Redis有多快

根据官方基准测试,在具有平均硬件的 Linux 机器上运行的单个 Redis 实例通常可以达到简单命令(O(N) 或O(log(N)) 的 QPS 8w+,而流水线批处理的 QPS 高达 100w,仅从性能来看,Redis 可以称为高性能缓存解决方案。

Redis为什么这么快

Redis的高性能归功于以下基本要素。

image.png

  • C实现,虽然C有助于Redis的性能,但是语言不是核心因素
  • 内存中的io,RRedis在纯内存操作方面比其他基于磁盘的数据库具有天然的性能优势。
  • I/O 多路复用,用于基于 epoll/select/kqueue 和其他 I/O 多路复用技术的高吞吐量网络 I/O。
  • 单线程模型,单线程不能利用多核的优势,但另一方面,它避免了多线程频繁的上下文切换,以及锁等同步机制的开销。

为什么 Redis 选择单线程?

  • Redis 的核心网模型是单线程的,一开始引起了很多质疑,而 Redis 官方对此的回答是:

CPU 成为 Redis 瓶颈的情况并不常见,因为通常 Redis 要么是内存限制,要么是网络绑定的。例如,使用流水线 在普通 Linux 系统上运行的 Redis 每秒甚至可以交付 100 万个请求,因此如果您的应用程序主要使用 O(N) 或 O(log(N)) 命令,它几乎不会使用太多 CPU。

  • 从本质上讲,这意味着 CPU 通常不是数据库的瓶颈,因为大多数请求不是 CPU 密集型的,而是 I/O 密集型的。
  • 特别是在 Redis 的情况下,如果你不考虑像 RDB/AOF 这样的持久性方案,Redis 是一个完全在内存中的操作,它非常快。
  • Redis 真正的性能瓶颈是网络 I/O,即客户端和服务器之间网络传输的延迟,因此 Redis 选择单线程 I/O 复用来实现其核心网络模型。
  • 以上是比较笼统的官方回答,但其实选择单线程的更具体原因可以总结如下:

避免过多的上下文切换开销

  • 在多线程调度过程中,需要在CPU之间切换线程上下文,上下文切换涉及一系列寄存器替换,程序堆栈复位甚至CPU缓存和TLB快速表报废,如程序计数器,堆栈指针和程序状态字。由于单个进程中的多个线程共享进程地址空间,因此线程上下文比进程上下文小得多,并且在跨进程调度的情况下,需要切换整个进程地址空间。
  • 在单线程调度的情况下,可以避免进程内频繁的线程切换开销,因为程序始终在进程中的单个线程内运行,并且没有多线程切换方案。

避免同步机制的开销

如果 Redis 选择多线程模型,并且因为 Redis 是一个数据库,难免会涉及到底层的数据同步问题,这必然会引入一些同步机制,比如锁,我们知道 Redis 提供的不仅仅是简单的键值数据结构,还有列表、集合、哈希等丰富的数据结构。
不同的数据结构对同步访问的锁定粒度不同,这可能会导致数据操作过程中大量的锁定和解锁开销,增加程序复杂性并降低性能。

### 简单且可维护

Redis 的作者 Salvatore Sanfilippo(别名 antirez)在 Redis 的设计和代码中有着一种近乎偏执的简单哲学,当你阅读 Redis 源代码或向其提交 PR 时,你可以感受到这种偏执。因此,在早期的 Redis 中,简单易维护的代码必然是其核心指导原则之一。而多线程的引入不可避免地增加了代码复杂性并降低了可维护性。

事实上,多线程编程并不完美。首先,多线程编程的引入将不再保持代码逻辑串行,代码执行的顺序将变得不可预测,如果不小心会导致各种并发编程问题;其次,多线程模式也使调试更加复杂和麻烦。网络上有一张有趣的图片,生动地描绘了并发编程面临的困境。

您对多线程编程的期望与实际的多线程编程。

image.png

如果 Redis 使用多线程模型,则必须将所有底层数据结构实现为线程安全。这反过来又使 Redis 实现更加复杂。

简而言之,Redis 选择单线程是在保持代码简单和可维护性与保持足够性能之间的权衡。

Redis 真的是单线程的吗?

在讨论这个问题之前,我们需要澄清“单线程”概念的界限:它是否涵盖核心网络模型或整个 Redis?如果是前者,那么答案是肯定的。网络模型是单线程的,直到 Redis 在 v6.0 中正式引入多线程;如果是后者,那么答案是否定的。Redis 早在 v4.0 中就引入了多线程。

因此,在讨论 Redis 中的多线程时,在 Redis 版本中描述两个要点非常重要。

  • Redis v4.0(为异步任务引入了多线程)
  • Redis v6.0(在网络模型中正式实现 I/O 多线程)

单线程事件循环

  • 我们先来剖析一下 Redis 的核心网络模型。从 Redis v1.0 到 v6.0,Redis 的核心网络模型一直是典型的单 Reactor 模型:使用 epoll/select/kqueue 等多路复用技术在单线程事件循环中处理事件(客户端请求),最后将响应数据写回客户端。

image.png

  • 这里有几个核心概念需要学习:
    • 客户端:客户端对象,Redis 是典型的 CS 架构(客户端 <–> 服务器),其中客户端通过套接字与服务器建立网络通道,然后发送请求的命令,服务器执行请求的命令并回复。Redis 使用结构客户端来存储客户端的所有相关信息,包括但不限于 wrapped socket connection -- *conn 、 currently selected database pointer -- *db 、 read buffer -- querybuf 、 write buffer -- buf 、 write data linked list -- reply 等。
    • aeApiPoll:I/O复用API,是基于epoll_wait/select/kevent等系统调用进行包装,监听要触发的读写事件,然后进行处理,它是Event Loop(Event Loop)的核心功能,是事件驱动运行的基础。
    • acceptTcpHandler:连接应答处理器,底层使用系统调用 accept 接受来自客户端的新连接,并将新连接注册绑定命令读取处理器用于后续处理新客户端TCP连接;除了这个处理器之外,还有一个相应的 acceptUnixHandler 用于处理Unix域套接字和 acceptTLSHandler 用于处理TLS加密连接。
    • readQueryFromClient :一个命令读取处理器,用于解析和执行客户端请求的命令。
    • beforeSleep:在事件循环进入aeApiPoll并等待事件到达之前执行的函数。它包含一些例行任务,例如将来自 client->buf 或 client->reply (此处需要两个缓冲区)的响应写回客户端,将 AOF 缓冲区中的数据保存到磁盘等。还有一个在aeApiPoll之后执行的afterSleep函数。
    • sendReplyToClient:命令回复处理程序,当一个事件循环后写缓冲区中仍有数据时,该处理程序将被注册并绑定到相应的连接,当连接触发写就绪事件时,它会将写缓冲区中的剩余数据写回客户端。
  • Redis 在内部实现了基于 epoll/select/kqueue/evport 的高性能事件库 AE,以实现 Linux/MacOS/FreeBSD/Solaris 的高性能事件循环模型。Redis的核心网络模型正式建立在AE之上,包括I/O复用,以及各种处理器绑定的注册,都是基于AE。

在这一点上,我们可以描述客户端从 Redis 请求命令的工作方式。
- Redis 服务器启动,打开主线程事件循环,将 acceptTcpHandler 连接应答处理器注册到与用户配置的侦听端口对应的文件描述符,并等待新连接到达。
- 在客户端和服务器之间建立网络连接。
- 调用 acceptTcpHandler ,主线程使用 AE 的 API 将 readQueryFromClient 命令读取处理器绑定到新连接对应的文件描述符,并初始化 client 绑定此客户端连接。
- 客户端发送请求命令,触发读取就绪事件,主线程调用 readQueryFromClient 将客户端通过套接字发送的命令读取到 client->querybuf 读取缓冲区中。
- 接下来调用 processInputBuffer ,其中 processInlineBuffer 或 processMultibulkBuffer 用于根据 Redis 协议解析命令,最后调用 processCommand 执行命令。
- 根据所请求命令的类型(SET、GET、DEL、EXEC 等),分配合适的命令执行器来执行它,最后调用 addReply 家族中的一系列函数将响应数据写入对应 client 的写缓冲区: client->buf 或 client->reply , client->buf 是首选写出缓冲区,固定大小为 16KB, 一般可以缓冲足够的响应数据,但如果客户端在时间窗口内需要非常大的响应,那么它会自动切换到 client->reply 链表,理论上可以容纳无**的数据(受机器物理内存限制)最后将 client 添加到 LIFO 队列 clients_pending_write 中。
- 在事件循环中,主线程执行 beforeSleep –> handleClientsWithPendingWrites ,遍历 clients_pending_write 队列,调用 writeToClient 将 client 的写缓冲区中的数据返回给客户端,如果写缓冲区中还有剩余数据,则注册 sendReplyToClient 命令以回复处理器,并为连接提供写就绪事件, 并等待客户端写入,然后再继续写回事件循环中的剩余响应数据。

对于那些想要利用多核性能的人来说,官方的 Redis 解决方案简单而残酷:在同一台机器上运行更多 Redis 实例。事实上,为了确保高可用性,在线业务不太可能处于独立模式。更常见的是使用多节点、多数据分片的 Redis 分布式集群,以提高性能,保证高可用性。

多线程异步任务

以上是 Redis 的核心网络模型,直到 Redis v6.0 才转变为多线程模型。但这并不意味着 Redis 一直是单线程的。

Redis 在 v4.0 中引入了多线程来做一些异步操作,主要是针对非常耗时的命令。通过异步执行这些命令,可以避免阻塞单线程事件循环。

我们知道 Redis DEL 命令用于删除一个或多个存储的键值,它是一个阻塞命令。在大多数情况下,您要删除的键不会存储很多值,最多几十个或几百个对象,因此可以快速执行。但是,如果要删除包含数百万个对象的非常大的键值对,则此命令可能会阻塞至少几秒钟,并且由于事件循环是单线程的,因此它将阻止随后的其他事件,从而导致吞吐量降低。

Redis的作者Antirez对解决这个问题进行了大量思考。起初,他提出了一个增量解决方案:使用计时器和数据光标,他一次删除少量数据,比如 1000 个对象,最终清除所有数据。但是这个解决方案有一个致命的缺陷:如果其他客户端继续将数据写入一个同时被逐步删除的密钥,并且删除率跟不上正在写入的数据,那么内存将被无休止地消耗,这可以通过一个聪明的解决方案来解决,但这个实现使 Redis 更加复杂。多线程似乎是一个滴水不漏的解决方案:简单易懂。因此,最终,antirez 选择引入多线程来实现此类非阻塞命令。 更多Antirez对此的想法可以在他的博客中找到:懒惰的Redis更好Redis。

因此,在 Redis v4.0 之后,添加了一些非阻塞命令,如 UNLINK 、 FLUSHALL ASYNC 、 FLUSHDB ASYNC 。

image.png

UNLINK 命令其实是 DEL 的异步版本,它不会同步删除数据,只是暂时从密钥空间中删除密钥,然后将任务添加到异步队列中,最后后台线程会删除它。但这里我们需要考虑一种情况,如果我们使用 UNLINK 删除一个非常小的键,异步方式做会有更多的开销,所以它会先计算一个开销阈值,只有当这个值大于 64 时我们才会使用异步方式删除密钥, 对于列表、集合和哈希等基本数据类型,阈值是其中存储的对象数存储的对象数。

Redis 多线程网络模型

如前所述,Redis 最初选择单线程网络模型的原因是因为 CPU 通常不是性能瓶颈,瓶颈往往是内存和网络,因此单线程就足够了。那么为什么 Redis 现在引入多线程呢?一个简单的事实是,Redis的网络I/O瓶颈越来越明显。

随着互联网的快速增长,互联网业务系统处理的在线流量越来越多,而 Redis 的单线程模式导致系统在网络 I/O 上消耗大量 CPU 时间,从而降低吞吐量。有两种方法可以提高 Redis 的性能。
- 优化的网络 I/O 模块
- 提高机器内存读写速度

后者取决于硬件的发展,暂时无法解决。因此,我们只能从前者入手,网络I/O的优化可以分为两个方向。
- 零拷贝技术或 DPDK 技术
- 利用多核

零拷贝技术有其局限性,无法完全适应 Redis 等复杂的网络 I/O 场景。(有关 CPU 时间和 Linux 零拷贝技术的网络 I/O 消耗的更多信息,请阅读上一篇文章。通过绕过NIC I/O绕过内核堆栈的DPDK技术过于复杂,需要内核甚至硬件支持。

因此,利用多核是优化网络 I/O 的最经济高效的方法。

在 6.0 版本之后,Redis 正式将多线程引入核心网络模型,也称为 I/O 线程,而 Redis 现在有了真正的多线程模型。在上一节中,我们了解了 Redis 6.0 之前的单线程事件循环模型,它实际上是一个非常经典的 Reactor 模型。

image.png

image.png
Reactor 模式用于 Linux 平台上大多数主流的高性能网络库/框架,如 netty、libevent、libuv、POE(Perl)、Twisted (Python) 等。

反应器模式本质上是指使用 I/O multiplexing (I/O multiplexing) + non-blocking I/O (non-blocking I/O) 模式。

在 6.0 版本之前,Redis 的核心网络模型是单个 Reactor 模型:所有事件都在单个线程中处理,尽管在 4.0 版本中引入了多线程,但它更像是针对特定场景(删除超大键值等)的补丁,不能被视为核心网络模型的多线程。

一般来说,单反应器模型在引入多线程后演变为多反应器模型,基本工作模型如下。

![[Pasted image 20230513094632.png]]

image.png
此模式不是单线程事件循环,而是有多个线程(子反应器),每个线程维护一个单独的事件循环,主反应器接收新连接并将其分发给子反应器进行独立处理,子反应器将响应写回客户端。

多反应器模式通常可以等同于Master-Workers模式,例如Nginx和Memcached,它们使用这种多线程模型,尽管实现细节因项目而异,但模式通常是一致的。

设计思维

Redis 也实现了多线程,但不是在标准的多反应器/主工作线程模式中,原因我们将在后面分析。现在,我们来看一下 Redis 多线程网络模型的总体设计。

image.png

  1. Redis 服务器启动,打开主线程事件循环,将 acceptTcpHandler 连接应答处理器注册到与用户配置的侦听端口对应的文件描述符,并等待新连接到达。
  2. 在客户端和服务器之间建立网络连接。
  3. 调用 acceptTcpHandler ,主线程使用 AE 的 API 将 readQueryFromClient 命令读取处理器绑定到新连接对应的文件描述符,并初始化 client 绑定此客户端连接。
  4. 客户端发送请求命令,触发读取就绪事件。服务器的主线程不是通过套接字读取客户端的请求命令,而是首先将 client 放入 LIFO 队列 clients_pending_read 中。
  5. 在事件循环中,主线程执行 beforeSleep –> handleClientsWithPendingReadsUsingThreads ,使用轮询负载均衡策略在 I/O 线程之间均匀分配 clients_pending_read 队列中的连接 I/O 线程通过套接字读取客户端请求的命令,将其存储在 client->querybuf 中并解析第一个命令,但不执行它, 而主线程正忙于轮询并等待所有 I/O 线程完成读取任务。
  6. 当主线程和所有 I/O 线程完成读取后,主线程完成繁忙轮询,遍历 clients_pending_read 队列,执行所有客户端连接的请求命令,并首先调用 processCommandResetClient 以执行已解析的第一个命令。然后调用 processInputBuffer 解析并执行客户端连接的所有命令,使用 processInlineBuffer 或 processMultibulkBuffer 根据 Redis 协议解析命令,最后调用 processCommand 执行名为 processCommand 的命令来执行命令。
  7. 根据所请求命令的类型(SET、GET、DEL、EXEC 等),分配相应的命令执行器来执行它,最后调用 addReply 家族中的一系列函数将响应数据写入对应的 client 写出缓冲区: client->buf 或 client->reply , client->buf 是首选的写出缓冲区,固定大小为 16KB, 它通常会缓冲足够的响应数据,但如果客户端需要在时间范围内响应大量数据,则会自动切换到 client->reply 链表。理论上,链表可以容纳无**的数据(受机器物理内存的限制),最后将 client 添加到 LIFO 队列 clients_pending_write 中。
  8. 在事件循环中,主线程执行 beforeSleep –> handleClientsWithPendingWritesUsingThreads ,使用轮询负载平衡策略将 clients_pending_write 队列中的连接均匀分布到 I/O 线程和主线程本身。I/O 线程通过调用 writeToClient 将 client 的写缓冲区中的数据写回客户端,主线程忙于轮询所有 I/O 线程以完成其写入任务,主线程忙于轮询并等待所有 I/O 线程完成其写入任务。
  9. 当主线程和所有 I/O 线程完成写出后,主线程完成繁忙轮询并遍历 clients_pending_write 队列。如果 client 写入缓冲区中仍有数据,它将 sendReplyToClient 注册到该连接的写就绪事件,并等待客户端写入,然后再继续写回事件循环中的剩余响应数据。

这里的大多数逻辑与之前的单线程模型中相同,唯一的变化是异步读取客户端请求并将响应数据写回 I/O 线程的逻辑。这里需要特别注意的是:I/O 线程只读取和解析客户端命令,并不实际执行它们,客户端命令的执行最终在主线程上完成。

源代码剖析

以下所有代码均基于 Redis v6.0.10 版本。

多线程初始化

```c
void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */

    // 如果用户只配置了一个 I/O 线程,则不会创建新线程(效率低),直接在主线程里处理 I/O。
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    // 根据用户配置的 I/O 线程数,启动线程。
    for (int i = 0; i < server.io_threads_num; i++) {
        // 初始化 I/O 线程的本地任务队列。
        io_threads_list[i] = listCreate();
        if (i == 0) continue; // 线程 0 是主线程。

        // 初始化 I/O 线程并启动。
        pthread_t tid;
        // 每个 I/O 线程会分配一个本地锁,用来休眠和唤醒线程。
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        // 每个 I/O 线程分配一个原子计数器,用来记录当前遗留的任务数量。
       io_threads_pending[i] = 0;
        // 主线程在启动 I/O 线程的时候会默认先锁住它,直到有 I/O 任务才唤醒它。
        pthread_mutex_lock(&io_threads_mutex[i]);
        // 启动线程,进入 I/O 线程的主逻辑函数 IOThreadMain。
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
  1. initThreadedIO 在 Redis 服务器启动时的初始化工作结束时调用,以初始化 I/O 多线程并启动它。
  2. Redis 多线程模式默认关闭,需要用户在 redis.conf 配置文件中启用。
io-threads 4
io-threads-do-reads yes

读取请求

  • 当客户端发送请求命令时,它会在 Redis 主线程中触发事件循环,并回调命令处理程序 readQueryFromClient 。在以前的单线程模型中,此方法将直接读取和分析客户端命令并执行它。但是,在多线程模式下, client 将添加到 clients_pending_read 任务队列中,然后将主线程分配给 I/O 线程以读取客户端请求的命令。
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    // 检查是否开启了多线程,如果是则把 client 加入异步队列之后返回。
    if (postponeClientRead(c)) return;
    
    // 省略代码,下面的代码逻辑和单线程版本几乎是一样的。
    ... 
}

int postponeClientRead(client *c) {
    // 当多线程 I/O 模式开启、主线程没有在处理阻塞任务时,将 client 加入异步队列。
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        // 给 client 打上 CLIENT_PENDING_READ 标识,表示该 client 需要被多线程处理,
        // 后续在 I/O 线程中会在读取和解析完客户端命令之后判断该标识并放弃执行命令,让主线程去执行。
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
       return 0;
    }
}

然后,主线程在事件循环的 beforeSleep() 方法中调用

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    // 遍历待读取的 client 队列 clients_pending_read,
    // 通过 RR 轮询均匀地分配给 I/O 线程和主线程自己(编号 0)。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置当前 I/O 操作为读取操作,给每个 I/O 线程的计数器设置分配的任务数量,
    // 让 I/O 线程可以开始工作:只读取和解析命令,不执行。
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 忙轮询,累加所有 I/O 线程的原子任务计数器,直到所有计数器的遗留任务数量都是 0,
    // 表示所有任务都已经执行完成,结束轮询。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    // 遍历待读取的 client 队列,清除 CLIENT_PENDING_READ 和 CLIENT_PENDING_COMMAND 标记,
    // 然后解析并执行所有 client 的命令。
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            // client 的第一条命令已经被解析好了,直接尝试执行。
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
        }
        processInputBuffer(c); // 继续解析并执行 client 命令。

        // 命令执行完成之后,如果 client 中有响应数据需要回写到客户端,则将 client 加入到待写出队列 clients_pending_write
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}

这里的核心工作是。

  • 循环访问要读取的 client 队列 clients_pending_read ,并将所有任务分配给 I/O 线程和主线程,以通过 RR 策略读取和分析客户端命令。
  • 忙于轮询,等待所有 I/O 线程完成其任务。
  • 最后循环访问 clients_pending_read 并执行所有 client 命令。

写回响应

读取、解析并执行命令后,客户端命令的响应数据已存储在 client->buf 或 client->reply 中。接下来,您需要将响应数据写回客户端。同样,在 beforeSleep 中,主线程调用 handleClientsWithPendingWritesUsingThreads 。

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    // 如果用户设置的 I/O 线程数等于 1 或者当前 clients_pending_write 队列中待写出的 client
    // 数量不足 I/O 线程数的两倍,则不用多线程的逻辑,让所有 I/O 线程进入休眠,
    // 直接在主线程把所有 client 的相应数据回写到客户端。
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    // 唤醒正在休眠的 I/O 线程(如果有的话)。
    if (!server.io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    // 遍历待写出的 client 队列 clients_pending_write,
    // 通过 RR 轮询均匀地分配给 I/O 线程和主线程自己(编号 0)。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置当前 I/O 操作为写出操作,给每个 I/O 线程的计数器设置分配的任务数量,
    // 让 I/O 线程可以开始工作,把写出缓冲区(client->buf 或 c->reply)中的响应数据回写到客户端。
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    // 忙轮询,累加所有 I/O 线程的原子任务计数器,直到所有计数器的遗留任务数量都是 0。
    // 表示所有任务都已经执行完成,结束轮询。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    // 最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的写出缓冲区中有残留数据,
    // 如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        // 检查 client 的写出缓冲区是否还有遗留数据。
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}

这里的核心工作是。

  1. 检查当前任务负载,如果当前任务数不足以在多线程模式下处理,请休眠 I/O 线程并将响应数据直接同步写回客户端。
  2. 唤醒处于休眠状态的 I/O 线程(如果有)。
  3. 循环访问 client 队列 clients_pending_write 并将所有任务分配给 I/O 线程和主线程,以通过 RR 策略将响应数据写回客户端。
  4. 忙于轮询等待所有 I/O 线程完成其任务。
  5. 最后遍历 clients_pending_write ,为那些仍有响应数据的 clients 注册命令回复处理程序 sendReplyToClient ,并等待客户端可写,然后再继续写回事件循环中的剩余响应数据。

I/O 线程主逻辑

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    // 设置 I/O 线程的 CPU 亲和性,尽可能将 I/O 线程(以及主线程,不在这里设置)绑定到用户配置的
    // CPU 列表上。
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        // 忙轮询,100w 次循环,等待主线程分配 I/O 任务。
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        // 如果 100w 次忙轮询之后如果还是没有任务分配给它,则通过尝试加锁进入休眠,
        // 等待主线程分配任务之后调用 startThreadedIO 解锁,唤醒 I/O 线程去执行。
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));


        // 注意:主线程分配任务给 I/O 线程之时,
        // 会把任务加入每个线程的本地任务队列 io_threads_list[id],
        // 但是当 I/O 线程开始执行任务之后,主线程就不会再去访问这些任务队列,避免数据竞争。
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            // 如果当前是写出操作,则把 client 的写出缓冲区中的数据回写到客户端。
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
              // 如果当前是读取操作,则socket 读取客户端的请求命令并解析第一条命令。
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        // 所有任务执行完之后把自己的计数器置 0,主线程通过累加所有 I/O 线程的计数器
        // 判断是否所有 I/O 线程都已经完成工作。
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
  • 启动 I/O 线程后,它首先进入繁忙轮询以确定原子计数器中的任务数。如果它不为零,则主线程已为其分配了一个任务并开始执行它,否则它会忙于轮询一百万次并等待。如果仍然为 0,则尝试添加本地锁,因为主线程在启动所有 I/O 线程时已经提前锁定了它们的本地锁,因此 I/O 线程将休眠并等待主线程唤醒。
  • 主线程将尝试在每个事件循环中调用 startThreadedIO ,以唤醒 I/O 线程以执行任务。如果收到客户端请求命令,I/O 线程将被唤醒并开始工作,以执行读取和分析命令或根据主线程设置的 io_threads_op 标志写回响应数据的任务。收到主线程的通知后,I/O 线程会遍历自己的本地任务队列 io_threads_list[id] ,取出一个 client 来执行任务。
    • 如果当前操作是写入操作,请调用 writeToClient 将响应数据从 client->buf 或 client->reply 通过套接字写回客户端。
    • 如果当前操作是读取操作,调用 readQueryFromClient 通过套接字读取客户端命令,存储在 client->querybuf 中,然后调用 processInputBuffer 解析命令,只会以第一个命令结束,然后完成而不执行它。
    • 执行所有任务后,将其自己的原子计数器设置为 0,以告知主线程它已完成其工作。
void processInputBuffer(client *c) {

...

    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 判断 client 是否具有 CLIENT_PENDING_READ 标识,如果是处于多线程 I/O 的模式下,
            // 那么此前已经在 readQueryFromClient -> postponeClientRead 中为 client 打上该标识,
            // 则立刻跳出循环结束,此时第一条命令已经解析完成,但是不执行命令。
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            // 执行客户端命令
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

...
}
  • 首次启动 I/O 线程时,应特别注意当前线程的 CPU 关联性,即将当前线程绑定到用户配置的 CPU。启动主 Redis 服务器线程(即 Redis 的核心网络模型)时设置相同的 CPU 关联。Redis 本身是一个对吞吐量和延迟极其敏感的系统,因此用户需要 Redis 对 CPU 资源进行更精细的控制。这里有两个主要注意事项:CPU 缓存和 NUMA 体系结构。
  • 首先是CPU缓存(我们谈论的是将一级缓存和二级缓存集成到CPU中的硬件架构)。想象一下这样的场景:主 Redis 进程在 CPU-1 上运行,向客户端提供数据,Redis 启动一个子进程以实现数据持久性(BGSAVE 或 AOF)。系统调度后,子进程接管主进程的CPU-1,主进程调度在CPU-2上运行,导致CPU-1缓存中的指令和数据被消除,CPU-2将指令和数据重新加载到自己的本地缓存中,浪费CPU资源,降低性能。

image.png

因此,通过设置 CPU 关联,Redis 可以将主进程/线程和子进程/线程隔离,将它们绑定到不同的内核,使它们不会相互干扰,从而可以有效提高系统性能。

第二个考虑因素基于 NUMA 体系结构。在NUMA系统下,内存控制器芯片集成在处理器内部,形成CPU本地内存。对本地内存的访问仅通过内存通道而不是通过系统总线,大大降低了访问延迟,同时多个处理器通过QPI数据链路互连,跨NUMA节点的内存访问开销远高于本地内存访问。

image.png

因此,Redis 还可以通过设置 CPU 关联性来大幅提升性能,使主进程/线程尽可能在 NUMA 节点上的固定 CPU 上运行,使用更多的本地内存,而不是跨节点访问数据。
有关 NUMA 的更多信息,请自行查看。以后有时间我会单独写一篇关于它的文章。

阅读过源代码的读者可能会想知道的最后一点是,Redis 似乎并没有在多线程模式下锁定数据。事实上,Redis 的多线程模型自始至终都是无锁的,这是通过原子操作 + 交错访问实现的,主线程和 I/O 线程之间共享三个变量: io_threads_pending 计数器、 io_threads_op I/O 标识符和 io_threads_list list 的线程本地任务队列。

io_threads_pending 是一个不需要锁保护的原子变量,而 io_threads_op 和 io_threads_list 是两个变量,它们通过控制主线程和 I/O 线程之间的交错访问来规避共享数据竞争问题:I/O 线程通过繁忙的轮询和锁定休眠启动并等待来自主线程的信号。启动后,I/O 线程通过繁忙的轮询和锁定睡眠等待来自主线程的信号。 在唤醒 I/O 线程开始工作之前,它不会访问自己的本地任务队列 io_threads_list[id] ,直到它将所有任务分配给每个 I/O 线程的本地队列,并且主线程只会访问自己的本地任务队列 io_threads_list[ 0] ,不会访问 I/O 线程的本地队列, 这可确保主线程始终在 I/O 线程之前访问 io_threads_list ,并且永远不会再次访问,从而确保交错访问。同样,对于 io_threads_op ,主线程在唤醒 I/O 线程之前设置 io_threads_op 的值,并且在 I/O 线程运行时不会再次访问此变量。

image.png

性能改进

Redis将核心网络模型转换为多线程模式,以追求最终的性能提升,因此基准数据是真实可靠的。

image.png
测试数据表明,使用多线程模式时,Redis的性能可以显著提高两倍。更详细的性能分析数据可以在这篇文章中找到:基于实验的 Redis 多线程 I/O 基准测试。

以下是 Mito 技术团队测算的新旧 Redis 版本性能对比,仅供参考。

image.png

模型缺陷

首先,正如我前面提到的,Redis 的多线程网络模型实际上并不是标准的多反应器/Master-Worker 模型,并且与其他主流开源 Web 服务器模型不同。工人模型,子反应堆/工人将完成 network read -> data parsing -> command execution -> network write 流程,主反应堆/主反应堆只负责分配任务。在 Redis 的多线程场景中,I/O 线程的任务只是通过套接字读取和分析客户端请求,而不是实际执行命令。所有客户端命令最终都需要返回给主线程执行,因此多核的利用率不是很高。此外,每次主线程都必须忙于轮询所有 I/O 线程以完成其任务,然后再继续执行逻辑的其余部分。

我认为 Redis 设计多线程网络模型的主要原因是为了保持兼容性,因为 Redis 以前是单线程的,所有客户端命令都在单线程事件循环中执行,因此 Redis 中的所有数据结构都是非线程安全的。现在有了多线程,如果我们遵循标准的多反应器/主工作线程模型,所有内置的数据结构都必须重构才能实现线程安全,这是大量的工作和麻烦。

因此,在我看来,Redis 目前的多线程解决方案更像是一种折衷:它保持了原始系统的兼容性,同时利用多个内核来提高 I/O 性能。

其次,目前 Redis 的多线程模型在主线程和 I/O 线程之间的通信过于简单和残酷:忙于轮询和锁定,因为通过旋转繁忙轮询等待会导致 Redis 偶尔出现高占用率,导致启动时和运行时短暂的 CPU 空闲。这种通信机制的最终实现看起来非常不直观和简单,我们希望 Redis 稍后会在当前解决方案的基础上进行改进。

总结

Redis 是缓存系统的事实标准,其基本原则值得深入研究。但是作者antirez,一个简单的开发者,对向Redis添加任何新功能都非常谨慎,所以核心Redis网络模型最终在Redis最初发布十年后转换为多线程模型,在此期间,许多Redis多线程替代方案甚至诞生了。尽管Antirez一直在推迟多线程解决方案,但它从未停止思考多线程的可行性。Redis多线程网络模型的转型并非一蹴而就,涉及项目的方方面面,所以我们可以看到最终的Redis解决方案并不完美,没有采用主流的多线程设计。

让我们回顾一下 Redis 多线程网络模型的设计。

  • 使用 I/O 线程的多线程网络 I/O,其中 I/O 线程仅负责网络 I/O 和命令解析,不执行客户端命令。
  • 使用原子操作 + 交错访问实现无锁多线程模型。
  • 通过设置 CPU 关联将主进程与其他子进程隔离开来,以便多线程网络模型可以最大限度地提高性能。

通读本文后,相信读者应该能够了解计算机领域涉及的各种技术,以实现良好的网络系统:设计模式、网络 I/O、并发编程、操作系统底层,甚至计算机硬件。它还需要对项目迭代和重构采取谨慎的方法,并对技术解决方案进行深入思考,而不仅仅是编写好代码的困难部分。

点赞收藏
分类:标签:
零点的架构之路
请先登录,查看3条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

Redis系列:RDB内存快照提供持久化能力

Redis系列:RDB内存快照提供持久化能力

Redis stream 用做消息队列完美吗?

Redis stream 用做消息队列完美吗?

4
3