Redis多线程模型源码解析
1. 配置启用多线程
默认情况下多线程是默认关闭的,如果想要启动多线程,需要在配置文件中做适当的修改。
修改redis.conf 文件如下
io-threads 4 #启用的 io 线程数量
io-threads-do-reads yes #读请求也使用io线程
2 源码解析
进入到Redis的main入口函数
int main(int argc, char **argv) {// ...// 初始化服务initServer();// ...// InitServerLast-》启动 io 线程InitServerLast();// ...// 事件循环aeMain(server.el);// ...
}
2.1 对initServer()
源码解析
其中 initServer()
主要做如下几件事
- 初始化读任务队列、写任务队列(存放client对象)
- 创建一个epoll对象
- 对配置的监听端口进行监听
- 把监听到的socket连接让epoll管理起来
//server.c
void initServer(void) {// 1 初始化 server 对象//1.1 将来主线程的任务都会放到这两个队列中server.clients_pending_write = listCreate();server.clients_pending_read = listCreate();......// 2 初始化回调 events,创建 epoll(通过epoll_create函数创建)server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);// 3 绑定监听服务端口。(监听连接请求)listenToPort(server.port,server.ipfd,&server.ipfd_count);// 4 注册accept事件到epoll(通过epoll_ctl函数)for (j = 0; j < server.ipfd_count; j++) {aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL);}...
}
2.2 对InitServerLast()
源码解析
该函数在server.c文件中,它调用了initThreadedIO函数来对IO线程初始化
void InitServerLast() {bioInit();initThreadedIO();//初始化IO线程set_jemalloc_bg_thread(server.jemalloc_bg_thread);server.initial_memory_usage = zmalloc_used_memory();
}
initThreadedIO()
在networking.c
文件中:
- 初始化全局变量 server.io_threads_active线程活跃状态为0,表示未激活IO多线程
- 对
server.io_threads_num
(在配置文件中是否修改,标识是否启用)的值进行判断,io_threads_num
表示设置的IO线程数量
如果线程数设置为1,表示不开启多线程直接返回即可;如果线程数超过了IO_THREADS_MAX_NUM
设置的最大值(128),则报错并停止redis服务。 - 根据线程数的设置创建线程
- 初始化
io_thread_list[i]
。io_threads_list是一个数组,数组中的每一个元素都是一个list,里面存储每个线程要处理的客户端列表,下标是0的即io_threads_lis[0]存储的是主线程要处理的客户端列表 - 初始化
io_threads_pending[i]
为0,io_threads_pending数组存储每个线程等待处理的客户端个数 - 调用
pthread_create
(库函数)创建线程,并且注册线程回调函数IOThreadMain
- 初始化
/* 初始化线程 */
void initThreadedIO(void) {server.io_threads_active = 0; /* 初始化线程活跃状态为0,表示未激活IO多线程 *//* 如果IO线程数为1,直接返回即可 */if (server.io_threads_num == 1) return;/* 如果IO线程数超过了最大限制,打印错误,停止redis服务 */if (server.io_threads_num > IO_THREADS_MAX_NUM) {serverLog(LL_WARNING,"Fatal: too many I/O threads configured. ""The maximum number is %d.", IO_THREADS_MAX_NUM);exit(1);}/* 根据线程数设置创建线程 */for (int i = 0; i < server.io_threads_num; i++) {/* 创建List */io_threads_list[i] = listCreate();if (i == 0) continue; /* 下标为0的存储的是主线程 */pthread_t tid;pthread_mutex_init(&io_threads_mutex[i],NULL);// 初始化待处理的客户端数量为0setIOPendingCount(i, 0);pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */// 创建线程, 并且注册线程回调函数`IOThreadMain`if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");exit(1);}/* 将创建的线程加入io_threads线程组中*/io_threads[i] = tid;}
}// setIOPendingCount在networking.c文件
static inline void setIOPendingCount(int i, unsigned long count) {// 设置io_threads_pending[i]的值为countatomicSetWithSync(io_threads_pending[i], count);
}
IOThreadMain函数,入参是一个线程id,开启死循环,主要逻辑如下:
- 从io_thread_list数组中获取当前线程id要处理的客户端列表,放入列表迭代器中
- 开始迭代(遍历),取出每一个待处理的客户端client,断读写状态 。
- 可写状态,调用writeToClient处理
- 可写装填,调用readQueryFromClient处理
void *IOThreadMain(void *myid) {/* The ID is the thread number (from 0 to server.iothreads_num-1), and is* used by the thread to just manipulate a single sub-array of clients. *///myid是线程ID,从0开始,到到 server.iothreads_num-1,0是主线程long id = (unsigned long)myid;char thdname[16];//.....//循环while(1) {/* Wait for start */for (int j = 0; j < 1000000; j++) {if (getIOPendingCount(id) != 0) break;//跳出来}//....../* Process: note that the main thread will never touch our list* before we drop the pending count to 0. */listIter li;listNode *ln;//获取每一个io线程要处理的客户端,放入迭代器listRewind(io_threads_list[id],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);if (io_threads_op == IO_THREADS_OP_WRITE) {//做写操作writeToClient(c,0);} else if (io_threads_op == IO_THREADS_OP_READ) {//做读操作。只会读取数据,解析,并不会执行命令(在主线程完成)readQueryFromClient(c->conn);} else {serverPanic("io_threads_op value is unknown");}}listEmpty(io_threads_list[id]);//处理完毕之后,io_threads_pending数组设置为0,表示当前这一次列表中所有的客户端对象已经处理完毕// 注意这个是在while循环里面setIOPendingCount(id, 0);}
}
2.2.2 读写操作
Redis在处理客户端读事件和写事件时会根据一定条件推迟客户端的读取操作或者往客户端写数据操作,将待处理的读客户端和待处理的写客户端分别加入到全局变量server的clients_pending_read
和clients_pending_write
列表中。
2.2.2.1 读操作
readQueryFromClient主要从客户端读取数据。里面调用postponeClientRead
函数判断是否需要推迟客户端的读取操作 ,如果满足条件,会将客户端状态设置为会将客户端标识置为CLIENT_PENDING_READ延迟读状态,并将待读取数据的客户端client加入到server.clients_pending_read中。
2.2.2.2 写操作
writeToClient。经过一些列的判断,将客户端的标识置为延迟写CLIENT_PENDING_WRITE状态,并将客户端加入到待写回的列表server.clients_pending_write中。
2.3 aeMain(server.el)
进入事件循环
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);}
}====》
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{// 2.3 事件循环处理3:epoll_wait 前进行读写任务队列处理if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)eventLoop->beforesleep(eventLoop);//epoll_wait发现事件并进行处理numevents = aeApiPoll(eventLoop, tvp);// 从已就绪数组中获取事件aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];//如果是读事件,并且有读回调函数//2.1 如果是 listen socket 读事件,则处理新连接请求//2.2 如果是客户连接socket 读事件,处理客户连接上的读请求fe->rfileProc()//如果是写事件,并且有写回调函数fe->wfileProc()......
}
2.3.1 IO线程分配
beforesleep(eventLoop)
。上面我们已经知道了IO线程的初始化、IO线程的运行函数IOThreadMain主要处理逻辑,以及延迟读写的客户端是何时分别加入到server全局变量的clients_pending_read和clients_pending_write中的,接下来去看下时何时为客户端分配线程。
void beforeSleep(struct aeEventLoop *eventLoop) {UNUSED(eventLoop);// 省略...handleBlockedClientsTimeout();/* 调用了handleClientsWithPendingReadsUsingThreads为延迟读客户端分配线程 */handleClientsWithPendingReadsUsingThreads(); // 省略.. /* 调用了handleClientsWithPendingWritesUsingThreads为延迟写客户端分配线程 */handleClientsWithPendingWritesUsingThreads(); // 省略...
}
2.3.1.1 延迟读的客户端分配线程
handleClientsWithPendingReadsUsingThreads()
,主要逻辑如下:
- 从server.cleints_pending_read获取延迟读取的客户端,将其加入到迭代列表
- 遍历延迟操作的客户端列表,获取每一个待处理的客户端,根据取模的方式,将客户端分配到对应线程(io_threads_list[target_id])的列表中。
- 将io_threads_op线程操作状态置为读操作
- 遍历线程数,获取每一个线程要处理的客户端个数,将其设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程等待处理的客户端个数(得到要处理的客户端总数)
- 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程(当IO线程来使用)的数据,因为当前执行handleClientsWithPendingReadsUsingThreads函数的线程正是主线程,所以让主线程来处理io_threads_list[0]中存放的待处理客户端
- 主线程遍历io_threads_list[0]中每一个待处理的客户端,调用readQueryFromClient处理,从客户端读取数据
- 主线程开启while循环准备执行客户端命令(注意这里才开始执行命令,多线程只负责解析不负责执行)
int handleClientsWithPendingReadsUsingThreads(void) {if (!server.io_threads_active || !server.io_threads_do_reads) return 0;int processed = listLength(server.clients_pending_read);if (processed == 0) return 0;listIter li;listNode *ln;// 获取待读取的客户端列表clients_pending_read加入到迭代链表中listRewind(server.clients_pending_read,&li);int item_id = 0;// 遍历待读取的客户端。将客户端加到指定线程的任务队列里 io_threads_list[target_id]while((ln = listNext(&li))) {// 获取客户端client *c = listNodeValue(ln);// 根据线程数取模,轮询分配线程int target_id = item_id % server.io_threads_num;// 分配线程,加入到线程对应的io_threads_listlistAddNodeTail(io_threads_list[target_id],c);item_id++;}/* 将线程的操作状态置为读操作*/io_threads_op = IO_THREADS_OP_READ;// 遍历线程数。开始workerfor (int j = 1; j < server.io_threads_num; j++) {// 获取每个线程待处理客户端的个数int count = listLength(io_threads_list[j]);// 将待处理客户端的个数设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程要处理的客户端个数setIOPendingCount(j, count);}/* 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据*//* handleClientsWithPendingReadsUsingThreads函数的执行者刚好就是主线程,所以让主线程处理io_threads_list[0]中的数据*/listRewind(io_threads_list[0],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);// 调用readQueryFromClientreadQueryFromClient(c->conn);}listEmpty(io_threads_list[0]);/* 等待其他线程处理完毕 */while(1) {unsigned long pending = 0;for (int j = 1; j < server.io_threads_num; j++)// 获取每一个客户端处理的客户端个数pending += getIOPendingCount(j);// 如果为0表示所有线程对应的客户端都处理完毕if (pending == 0) break;}/* 再次判断server.clients_pending_read是否有待处理的客户端*/while(listLength(server.clients_pending_read)) {// 获取列表第一个元素ln = listFirst(server.clients_pending_read);// 获取客户端client *c = listNodeValue(ln);c->flags &= ~CLIENT_PENDING_READ;// 删除节点listDelNode(server.clients_pending_read,ln);serverAssert(!(c->flags & CLIENT_BLOCKED));// processPendingCommandsAndResetClient函数中会判断客户端标识是否是CLIENT_PENDING_COMMAND状态,如果是调用processCommandAndResetClient函数处理请求命令。执行命令,将结果写到缓冲区if (processPendingCommandsAndResetClient(c) == C_ERR) {continue;}// 由于客户端输入缓冲区可能有其他的命令未读取,这里解析命令并执行processInputBuffer(c);if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))clientInstallWriteHandler(c);}/* Update processed count on server */server.stat_io_reads_processed += processed;return processed;
}
2.3.1.2 延迟写的客户端分配线程
handleClientsWithPendingWritesUsingThreads 和上面类似
参考网络IO-事件驱动框架源码分析(多线程)
相关文章:

Redis多线程模型源码解析
1. 配置启用多线程 默认情况下多线程是默认关闭的,如果想要启动多线程,需要在配置文件中做适当的修改。 修改redis.conf 文件如下 io-threads 4 #启用的 io 线程数量 io-threads-do-reads yes #读请求也使用io线程2 源码解析 进入到Redis的main入口函…...

搭建zabbix4.0监控服务实例
一.Zabbix服务介绍 1.1服务介绍 Zabbix是基于WEB界面的分布式系统监控的开源解决方案,Zabbix能够监控各种网络参数,保证服务器系统安全稳定的运行,并提供灵活的通知机制让SA快速定位并解决存在的各种问题。 1.2 Zabbix优点 Zabbix分布式监…...

Xcode 系统崩溃问题01
参考链接:https://www.5axxw.com/questions/content/x2zlpx 问题:崩溃提示: Message from debugger: The LLDB RPC server has crashed. You may need to manually terminate your process. The crash log is located in ~/Library/Logs/Dia…...

SpringMVC文件上传、下载、国际化配置
Java知识点总结:想看的可以从这里进入 目录3.6、文件上传、下载3.6.1、文件上传3.6.2、文件下载3.7、国际化配置3.6、文件上传、下载 3.6.1、文件上传 form 表单想要具有文件上传功能,其必须满足以下 3 个条件。 form 表单的 method 属性必须设置为 p…...

计算机图形学07:有效边表法的多边形扫描转换
作者:非妃是公主 专栏:《计算机图形学》 博客地址:https://blog.csdn.net/myf_666 个性签:顺境不惰,逆境不馁,以心制境,万事可成。——曾国藩 文章目录专栏推荐专栏系列文章序一、算法原理二、…...

UNIX编程--Makefile入门
Makefile 文件命名和规则 文件命名 makefile 或者 Makefile Makefile 规则 一个 Makefile 文件中可以有一个或者多个规则目标 ... : 依赖 ...命令 (shell 命令)...目标:最终要生成的文件,伪目标除外依赖:生成目标所需的文件或是目…...

【数据结构初阶】手撕单链表
目录一.链表概念和结构二.单链表功能的实现1.打印单链表内容2.申请单链表节点3.头插和尾插4.头删和尾删5.单链表查找6.pos位置前后插入7.pos位置删除三.链表面试题剖析一.链表概念和结构 概念:链表是一种物理存储结构上非连续、非顺序的存储结构,数据元素…...

angular中http请求和传值
有关angular传值的相关内容 <number-info[subTitle]"customTitle"[total]"item.ENERGY_RATE %"[subTotal]"item.ENERGY_RATE_DIFF %"[status]"item.ENERGY_RATE_DIFF > 0 ? up : down">在number-info上面,会是一个delon/c…...

VSCode问题记录
20230304 - 0. 引言 这几年的编程方式还真是各种变化,从一开始直接VIM,到后面使用jupyter进行机器学习相关,然后再过渡到vim的形式并加以tmux批量化,最后去年使用了vscode作为IDE。随着工具的变化,那么很多习惯也都随…...

html基础学习
初识HTML HTML: 超文本标记语言 一.HTML的基本结构 根控制标记(头) 头控制标记(头) 标题 标题标记 头控制标记(尾) 网页显示区域(一般要实现的代码都在这里写) </body> 根控制标记(尾) 二.网页的基本标签 标题标签 <h1> 一级标题</h1> <…...

leetcode_贪心算法
贪心算法相关题简单题目455.分发饼干1005.K次取反后最大化的数组和860.柠檬水找零序列问题376.摆动序列法一:贪心法法二:动态规划单调递增的数字简化版本有点难度53.最大子序和贪心算法动态规划134.加油站968.监控二叉树两个维度权衡问题分发糖果406.根据…...

C语言每日一题】——杨氏矩阵
【C语言每日一题】——倒置字符串😎前言🙌杨氏矩阵🙌总结撒花💞😎博客昵称:博客小梦 😊最喜欢的座右铭:全神贯注的上吧!!! 😊作者简介…...

最佳iOS设备管理器imazing 2.16.9官网Mac/Windows下载电脑版怎么下载安装
imazing 2.16.9官网Mac/Windows下载电脑版是款针对苹果设备所打造的管理工具。iMazing为用户提供多种设备管理功能,每一位用户都能以自己的形式管理苹果设备。iMazing与苹果设备连接后,用户就可以轻松传输文件,浏览保存信息等。 应用介绍 iM…...

八大排序算法之堆排序的实现+经典TopK问题
目录 一.堆元素的上下调整接口 1.前言 2.堆元素向上调整算法接口 3.堆元素向下调整算法接口 二.堆排序的实现 1.空间复杂度为O(N)的堆排序(以排升序为例) 思路分析: 代码实现: 排序测试: 时空复杂度分析: 2. 空间复杂度为O(1)的堆排序(以排降序为例) 将数组arr调…...

使用AppSmith(PagePlug )低代码平台快速构建小程序应用实践
文章目录一、入门(一)介绍(二)功能特性(三)体验一下(四)参考教程二、使用Appsmith构建商城微信小程序(一)说明(二)应用配置࿰…...

第52章 短信验证服务和登录的后端定义实现
1 Services.Messages.SmsValidate using Core.Domain.Messages; using Data; using Microsoft.EntityFrameworkCore; namespace Services.Messages { /// <summary> /// 【短信验证服务--类】 /// <remarks> /// 摘要: /// 通过类中的方法成员实…...

谷歌验证码的使用
1. 表单重复提交之验证码 1.1 表单重复提交三种常见情况 提交完表单。服务器使用请求转来进行页面跳转。这个时候,用户按下功能键 F5,就会发起最后一次的请求。造成表单重复提交问题。解决方法:使用重定向来进行跳转用户正常提交服务器&…...

Git学习入门(1)- git的安装与配置
title: git学习(1) - git的安装与配置CSDN: https://blog.csdn.net/jj6666djdbbd?typeblogBlog: https://helloylh.comGithub: https://github.com/luumodtags: gitabbrlink: 12001description: 本文主要讲解了git的安装,配置基本工作date: …...

【Python】使用Playwright断言方法验证网页和Web应用程序状态
作为测试框架,Playwright 提供了一系列断言方法,您可以使用它们来验证网页和 Web 应用程序的状态。在这篇博客中,田辛老师将介绍 Playwright 中可用的各种断言方法,并为每种方法提供示例。 assert page.url() expected_url &…...

libgdx导入blender模型
具体就是参考 官网 https://libgdx.com/wiki/graphics/3d/importing-blender-models-in-libgdx blender 教程可以看八个案例教程带你从0到1入门blender【已完结】 这里贴一下过程图。 1.初始环境搭建略过。 2.打开blender 选中摄像机和灯光,右键进行删除。 3.选中…...

【20230227】回溯算法小结
回溯法又叫回溯搜索法,是搜索的一种方式。回溯法本质是穷举所有可能。如果想让回溯法高效一些,可以加一些剪枝操作。回溯算法解决的经典问题:组合问题切割问题子集问题排列问题棋盘问题如何去理解回溯法?回溯法解决的问题都可以抽…...

centos安装rocketmq
centos安装rocketmq1 下载rocketmq二进制包2 解压二进制包3 修改broker.conf4 修改runbroker.sh和runserver.sh的JVM参数5 启动NameServer和Broker6 安装rockermq dashboard(可视化控制台)1 下载rocketmq二进制包 点击rocketmq二进制包下载地址,下载完成之后通过ft…...

汇编语言程序设计(二)之寄存器
系列文章 汇编语言程序设计(一) 寄存器 在学习汇编的过程中,我们经常需要操作寄存器,那么寄存器又是什么呢?它是用来干什么的? 它有什么分类?又该如何操作?… 你可能会有许多的…...

华为OD机试Golang解题 - 单词接龙 | 独家
华为Od必看系列 华为OD机试 全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典文章目录 华为Od必看系列使用说明本期题目…...

Elasticsearch的搜索命令
Elasticsearch的搜索命令 文章目录Elasticsearch的搜索命令数据准备URI Searchq(查询字符串)analyzer(指定查询字符串时使用的分析器)df(指定查询字段)_source(指定返回文档的字段)s…...

为什么人们宁可用Lombok,也不把成员设为public?
目录专栏导读一、从零了解JavaBean1、基本概念2、JavaBean的特征3、JavaBean的优点二、定义最简单的JavaBean三、思考一个问题,为何属性是private,然后用get/set方法?四、下面系统的分析以下,why?五、不和谐的声音,禁…...

【Redis】Redis 如何实现分布式锁
Redis 如何实现分布式锁1. 什么是分布式锁1.1 分布式锁的特点1.2 分布式锁的场景1.3 分布式锁的实现方式2. Redis 实现分布式锁2.1 setnx expire2.2 set ex px nx2.3 set ex px nx 校验唯一随机值,再删除2.4 Redisson 实现分布式锁1. 什么是分布式锁 分布式锁其实…...

C++ 断言
文章目录前言assertstatic_assert前言 断言(Assertion)是一种常用的编程手段,用于排除程序中不应该出现的逻辑错误。它是一种很好的Debug工具。其作用是判断表达式是否为真。C提供了assert和static_assert来进行断言。在C库中也有断言,其中断言与C的相同…...

C++修炼之练气期第五层——引用
目录 1.引用的概念 2.引用的性质 3.常量引用 4.使用场景 1.作参数 2.作返回值 5.传值与传引用的效率比较 6.值和引用作为返回值的性能比较 7.引用与指针 指针与引用的不同点 要说C语言中哪个知识点最难学难懂,大部分人可能和我一样的答案——指针。C既然…...

从企业数字化发展的四个阶段,看数字化创新战略
《Edge: Value-Driven Digital Transformation》一书根据信息技术与企业业务发展的关系把企业的数字化分为了四个阶段: 技术与业务无关技术作为服务提供者开始合作科技引领差异化优势以技术为业务核心 下图展示了这四个阶段的特点: 通过了解和分析各个…...