当前位置: 首页 > news >正文

Redis多线程模型源码解析

1. 配置启用多线程

默认情况下多线程是默认关闭的,如果想要启动多线程,需要在配置文件中做适当的修改。
修改redis.conf 文件如下

io-threads 4 #启用的 io 线程数量
io-threads-do-reads yes #读请求也使用io线程

2 源码解析

进入到Redis的main入口函数

int main(int argc, char **argv) {// ...// 初始化服务initServer();// ...// InitServerLast-》启动 io 线程InitServerLast();// ...// 事件循环aeMain(server.el);// ...
}

2.1 对initServer()源码解析

其中 initServer()主要做如下几件事

  • 初始化读任务队列、写任务队列(存放client对象)
  • 创建一个epoll对象
  • 对配置的监听端口进行监听
  • 把监听到的socket连接让epoll管理起来
//server.c
void initServer(void) {// 1 初始化 server 对象//1.1 将来主线程的任务都会放到这两个队列中server.clients_pending_write = listCreate();server.clients_pending_read = listCreate();......// 2 初始化回调 events,创建 epoll(通过epoll_create函数创建)server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);// 3 绑定监听服务端口。(监听连接请求)listenToPort(server.port,server.ipfd,&server.ipfd_count);// 4 注册accept事件到epoll(通过epoll_ctl函数)for (j = 0; j < server.ipfd_count; j++) {aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL);}...
}

2.2 对InitServerLast()源码解析

该函数在server.c文件中,它调用了initThreadedIO函数来对IO线程初始化

void InitServerLast() {bioInit();initThreadedIO();//初始化IO线程set_jemalloc_bg_thread(server.jemalloc_bg_thread);server.initial_memory_usage = zmalloc_used_memory();
}

initThreadedIO()networking.c文件中:

  1. 初始化全局变量 server.io_threads_active线程活跃状态为0,表示未激活IO多线程
  2. server.io_threads_num(在配置文件中是否修改,标识是否启用)的值进行判断,io_threads_num表示设置的IO线程数量
    如果线程数设置为1,表示不开启多线程直接返回即可;如果线程数超过了IO_THREADS_MAX_NUM设置的最大值(128),则报错并停止redis服务。
  3. 根据线程数的设置创建线程
    • 初始化io_thread_list[i]io_threads_list是一个数组,数组中的每一个元素都是一个list,里面存储每个线程要处理的客户端列表,下标是0的即io_threads_lis[0]存储的是主线程要处理的客户端列表
    • 初始化io_threads_pending[i]为0,io_threads_pending数组存储每个线程等待处理的客户端个数
    • 调用pthread_create(库函数)创建线程,并且注册线程回调函数IOThreadMain
/* 初始化线程 */
void initThreadedIO(void) {server.io_threads_active = 0; /* 初始化线程活跃状态为0,表示未激活IO多线程 *//* 如果IO线程数为1,直接返回即可 */if (server.io_threads_num == 1) return;/* 如果IO线程数超过了最大限制,打印错误,停止redis服务 */if (server.io_threads_num > IO_THREADS_MAX_NUM) {serverLog(LL_WARNING,"Fatal: too many I/O threads configured. ""The maximum number is %d.", IO_THREADS_MAX_NUM);exit(1);}/* 根据线程数设置创建线程 */for (int i = 0; i < server.io_threads_num; i++) {/* 创建List */io_threads_list[i] = listCreate();if (i == 0) continue; /* 下标为0的存储的是主线程 */pthread_t tid;pthread_mutex_init(&io_threads_mutex[i],NULL);// 初始化待处理的客户端数量为0setIOPendingCount(i, 0);pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */// 创建线程, 并且注册线程回调函数`IOThreadMain`if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");exit(1);}/* 将创建的线程加入io_threads线程组中*/io_threads[i] = tid;}
}// setIOPendingCount在networking.c文件
static inline void setIOPendingCount(int i, unsigned long count) {// 设置io_threads_pending[i]的值为countatomicSetWithSync(io_threads_pending[i], count);
}

IOThreadMain函数,入参是一个线程id,开启死循环,主要逻辑如下:

  1. 从io_thread_list数组中获取当前线程id要处理的客户端列表,放入列表迭代器中
  2. 开始迭代(遍历),取出每一个待处理的客户端client,断读写状态 。
    • 可写状态,调用writeToClient处理
    • 可写装填,调用readQueryFromClient处理
void *IOThreadMain(void *myid) {/* The ID is the thread number (from 0 to server.iothreads_num-1), and is* used by the thread to just manipulate a single sub-array of clients. *///myid是线程ID,从0开始,到到 server.iothreads_num-1,0是主线程long id = (unsigned long)myid;char thdname[16];//.....//循环while(1) {/* Wait for start */for (int j = 0; j < 1000000; j++) {if (getIOPendingCount(id) != 0) break;//跳出来}//....../* Process: note that the main thread will never touch our list* before we drop the pending count to 0. */listIter li;listNode *ln;//获取每一个io线程要处理的客户端,放入迭代器listRewind(io_threads_list[id],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);if (io_threads_op == IO_THREADS_OP_WRITE) {//做写操作writeToClient(c,0);} else if (io_threads_op == IO_THREADS_OP_READ) {//做读操作。只会读取数据,解析,并不会执行命令(在主线程完成)readQueryFromClient(c->conn);} else {serverPanic("io_threads_op value is unknown");}}listEmpty(io_threads_list[id]);//处理完毕之后,io_threads_pending数组设置为0,表示当前这一次列表中所有的客户端对象已经处理完毕// 注意这个是在while循环里面setIOPendingCount(id, 0);}
}

2.2.2 读写操作

Redis在处理客户端读事件和写事件时会根据一定条件推迟客户端的读取操作或者往客户端写数据操作,将待处理的读客户端和待处理的写客户端分别加入到全局变量server的clients_pending_readclients_pending_write列表中。

2.2.2.1 读操作

readQueryFromClient主要从客户端读取数据。里面调用postponeClientRead函数判断是否需要推迟客户端的读取操作 ,如果满足条件,会将客户端状态设置为会将客户端标识置为CLIENT_PENDING_READ延迟读状态,并将待读取数据的客户端client加入到server.clients_pending_read中

2.2.2.2 写操作

writeToClient。经过一些列的判断,将客户端的标识置为延迟写CLIENT_PENDING_WRITE状态,并将客户端加入到待写回的列表server.clients_pending_write中。

2.3 aeMain(server.el)

进入事件循环

void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);}
}====int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{// 2.3 事件循环处理3:epoll_wait 前进行读写任务队列处理if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)eventLoop->beforesleep(eventLoop);//epoll_wait发现事件并进行处理numevents = aeApiPoll(eventLoop, tvp);// 从已就绪数组中获取事件aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];//如果是读事件,并且有读回调函数//2.1 如果是 listen socket 读事件,则处理新连接请求//2.2 如果是客户连接socket 读事件,处理客户连接上的读请求fe->rfileProc()//如果是写事件,并且有写回调函数fe->wfileProc()......
}

2.3.1 IO线程分配

beforesleep(eventLoop)。上面我们已经知道了IO线程的初始化、IO线程的运行函数IOThreadMain主要处理逻辑,以及延迟读写的客户端是何时分别加入到server全局变量的clients_pending_read和clients_pending_write中的,接下来去看下时何时为客户端分配线程。

void beforeSleep(struct aeEventLoop *eventLoop) {UNUSED(eventLoop);// 省略...handleBlockedClientsTimeout();/* 调用了handleClientsWithPendingReadsUsingThreads为延迟读客户端分配线程 */handleClientsWithPendingReadsUsingThreads(); // 省略..   /* 调用了handleClientsWithPendingWritesUsingThreads为延迟写客户端分配线程 */handleClientsWithPendingWritesUsingThreads();   // 省略...
}
2.3.1.1 延迟读的客户端分配线程

handleClientsWithPendingReadsUsingThreads(),主要逻辑如下:

  1. 从server.cleints_pending_read获取延迟读取的客户端,将其加入到迭代列表
  2. 遍历延迟操作的客户端列表,获取每一个待处理的客户端,根据取模的方式,将客户端分配到对应线程(io_threads_list[target_id])的列表中
  3. 将io_threads_op线程操作状态置为读操作
  4. 遍历线程数,获取每一个线程要处理的客户端个数,将其设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程等待处理的客户端个数得到要处理的客户端总数
  5. 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程(当IO线程来使用)的数据,因为当前执行handleClientsWithPendingReadsUsingThreads函数的线程正是主线程,所以让主线程来处理io_threads_list[0]中存放的待处理客户端
  6. 主线程遍历io_threads_list[0]中每一个待处理的客户端,调用readQueryFromClient处理,从客户端读取数据
  7. 主线程开启while循环准备执行客户端命令(注意这里才开始执行命令,多线程只负责解析不负责执行
int handleClientsWithPendingReadsUsingThreads(void) {if (!server.io_threads_active || !server.io_threads_do_reads) return 0;int processed = listLength(server.clients_pending_read);if (processed == 0) return 0;listIter li;listNode *ln;// 获取待读取的客户端列表clients_pending_read加入到迭代链表中listRewind(server.clients_pending_read,&li);int item_id = 0;// 遍历待读取的客户端。将客户端加到指定线程的任务队列里 io_threads_list[target_id]while((ln = listNext(&li))) {// 获取客户端client *c = listNodeValue(ln);// 根据线程数取模,轮询分配线程int target_id = item_id % server.io_threads_num;// 分配线程,加入到线程对应的io_threads_listlistAddNodeTail(io_threads_list[target_id],c);item_id++;}/* 将线程的操作状态置为读操作*/io_threads_op = IO_THREADS_OP_READ;// 遍历线程数。开始workerfor (int j = 1; j < server.io_threads_num; j++) {// 获取每个线程待处理客户端的个数int count = listLength(io_threads_list[j]);// 将待处理客户端的个数设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程要处理的客户端个数setIOPendingCount(j, count);}/* 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据*//* handleClientsWithPendingReadsUsingThreads函数的执行者刚好就是主线程,所以让主线程处理io_threads_list[0]中的数据*/listRewind(io_threads_list[0],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);// 调用readQueryFromClientreadQueryFromClient(c->conn);}listEmpty(io_threads_list[0]);/* 等待其他线程处理完毕 */while(1) {unsigned long pending = 0;for (int j = 1; j < server.io_threads_num; j++)// 获取每一个客户端处理的客户端个数pending += getIOPendingCount(j);// 如果为0表示所有线程对应的客户端都处理完毕if (pending == 0) break;}/* 再次判断server.clients_pending_read是否有待处理的客户端*/while(listLength(server.clients_pending_read)) {// 获取列表第一个元素ln = listFirst(server.clients_pending_read);// 获取客户端client *c = listNodeValue(ln);c->flags &= ~CLIENT_PENDING_READ;// 删除节点listDelNode(server.clients_pending_read,ln);serverAssert(!(c->flags & CLIENT_BLOCKED));// processPendingCommandsAndResetClient函数中会判断客户端标识是否是CLIENT_PENDING_COMMAND状态,如果是调用processCommandAndResetClient函数处理请求命令。执行命令,将结果写到缓冲区if (processPendingCommandsAndResetClient(c) == C_ERR) {continue;}// 由于客户端输入缓冲区可能有其他的命令未读取,这里解析命令并执行processInputBuffer(c);if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))clientInstallWriteHandler(c);}/* Update processed count on server */server.stat_io_reads_processed += processed;return processed;
}
2.3.1.2 延迟写的客户端分配线程

handleClientsWithPendingWritesUsingThreads 和上面类似

参考网络IO-事件驱动框架源码分析(多线程)

相关文章:

Redis多线程模型源码解析

1. 配置启用多线程 默认情况下多线程是默认关闭的&#xff0c;如果想要启动多线程&#xff0c;需要在配置文件中做适当的修改。 修改redis.conf 文件如下 io-threads 4 #启用的 io 线程数量 io-threads-do-reads yes #读请求也使用io线程2 源码解析 进入到Redis的main入口函…...

搭建zabbix4.0监控服务实例

一.Zabbix服务介绍 1.1服务介绍 Zabbix是基于WEB界面的分布式系统监控的开源解决方案&#xff0c;Zabbix能够监控各种网络参数&#xff0c;保证服务器系统安全稳定的运行&#xff0c;并提供灵活的通知机制让SA快速定位并解决存在的各种问题。 1.2 Zabbix优点 Zabbix分布式监…...

Xcode 系统崩溃问题01

参考链接&#xff1a;https://www.5axxw.com/questions/content/x2zlpx 问题&#xff1a;崩溃提示&#xff1a; Message from debugger: The LLDB RPC server has crashed. You may need to manually terminate your process. The crash log is located in ~/Library/Logs/Dia…...

SpringMVC文件上传、下载、国际化配置

Java知识点总结&#xff1a;想看的可以从这里进入 目录3.6、文件上传、下载3.6.1、文件上传3.6.2、文件下载3.7、国际化配置3.6、文件上传、下载 3.6.1、文件上传 form 表单想要具有文件上传功能&#xff0c;其必须满足以下 3 个条件。 form 表单的 method 属性必须设置为 p…...

计算机图形学07:有效边表法的多边形扫描转换

作者&#xff1a;非妃是公主 专栏&#xff1a;《计算机图形学》 博客地址&#xff1a;https://blog.csdn.net/myf_666 个性签&#xff1a;顺境不惰&#xff0c;逆境不馁&#xff0c;以心制境&#xff0c;万事可成。——曾国藩 文章目录专栏推荐专栏系列文章序一、算法原理二、…...

UNIX编程--Makefile入门

Makefile 文件命名和规则 文件命名 makefile 或者 Makefile Makefile 规则 一个 Makefile 文件中可以有一个或者多个规则目标 ... &#xff1a; 依赖 ...命令 (shell 命令)...目标&#xff1a;最终要生成的文件&#xff0c;伪目标除外依赖&#xff1a;生成目标所需的文件或是目…...

【数据结构初阶】手撕单链表

目录一.链表概念和结构二.单链表功能的实现1.打印单链表内容2.申请单链表节点3.头插和尾插4.头删和尾删5.单链表查找6.pos位置前后插入7.pos位置删除三.链表面试题剖析一.链表概念和结构 概念&#xff1a;链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素…...

angular中http请求和传值

有关angular传值的相关内容 <number-info[subTitle]"customTitle"[total]"item.ENERGY_RATE %"[subTotal]"item.ENERGY_RATE_DIFF %"[status]"item.ENERGY_RATE_DIFF > 0 ? up : down">在number-info上面,会是一个delon/c…...

VSCode问题记录

20230304 - 0. 引言 这几年的编程方式还真是各种变化&#xff0c;从一开始直接VIM&#xff0c;到后面使用jupyter进行机器学习相关&#xff0c;然后再过渡到vim的形式并加以tmux批量化&#xff0c;最后去年使用了vscode作为IDE。随着工具的变化&#xff0c;那么很多习惯也都随…...

html基础学习

初识HTML HTML: 超文本标记语言 一.HTML的基本结构 根控制标记(头) ​ 头控制标记(头) ​ 标题 标题标记 ​ 头控制标记(尾) ​ 网页显示区域(一般要实现的代码都在这里写) </body> 根控制标记(尾) 二.网页的基本标签 标题标签 <h1> 一级标题</h1> <…...

leetcode_贪心算法

贪心算法相关题简单题目455.分发饼干1005.K次取反后最大化的数组和860.柠檬水找零序列问题376.摆动序列法一&#xff1a;贪心法法二&#xff1a;动态规划单调递增的数字简化版本有点难度53.最大子序和贪心算法动态规划134.加油站968.监控二叉树两个维度权衡问题分发糖果406.根据…...

C语言每日一题】——杨氏矩阵

【C语言每日一题】——倒置字符串&#x1f60e;前言&#x1f64c;杨氏矩阵&#x1f64c;总结撒花&#x1f49e;&#x1f60e;博客昵称&#xff1a;博客小梦 &#x1f60a;最喜欢的座右铭&#xff1a;全神贯注的上吧&#xff01;&#xff01;&#xff01; &#x1f60a;作者简介…...

最佳iOS设备管理器imazing 2.16.9官网Mac/Windows下载电脑版怎么下载安装

imazing 2.16.9官网Mac/Windows下载电脑版是款针对苹果设备所打造的管理工具。iMazing为用户提供多种设备管理功能&#xff0c;每一位用户都能以自己的形式管理苹果设备。iMazing与苹果设备连接后&#xff0c;用户就可以轻松传输文件&#xff0c;浏览保存信息等。 应用介绍 iM…...

八大排序算法之堆排序的实现+经典TopK问题

目录 一.堆元素的上下调整接口 1.前言 2.堆元素向上调整算法接口 3.堆元素向下调整算法接口 二.堆排序的实现 1.空间复杂度为O(N)的堆排序(以排升序为例) 思路分析: 代码实现: 排序测试: ​时空复杂度分析: 2. 空间复杂度为O(1)的堆排序(以排降序为例) 将数组arr调…...

使用AppSmith(PagePlug )低代码平台快速构建小程序应用实践

文章目录一、入门&#xff08;一&#xff09;介绍&#xff08;二&#xff09;功能特性&#xff08;三&#xff09;体验一下&#xff08;四&#xff09;参考教程二、使用Appsmith构建商城微信小程序&#xff08;一&#xff09;说明&#xff08;二&#xff09;应用配置&#xff0…...

第52章 短信验证服务和登录的后端定义实现

1 Services.Messages.SmsValidate using Core.Domain.Messages; using Data; using Microsoft.EntityFrameworkCore; namespace Services.Messages { /// <summary> /// 【短信验证服务--类】 /// <remarks> /// 摘要&#xff1a; /// 通过类中的方法成员实…...

谷歌验证码的使用

1. 表单重复提交之验证码 1.1 表单重复提交三种常见情况 提交完表单。服务器使用请求转来进行页面跳转。这个时候&#xff0c;用户按下功能键 F5&#xff0c;就会发起最后一次的请求。造成表单重复提交问题。解决方法&#xff1a;使用重定向来进行跳转用户正常提交服务器&…...

Git学习入门(1)- git的安装与配置

title: git学习&#xff08;1&#xff09; - git的安装与配置CSDN: https://blog.csdn.net/jj6666djdbbd?typeblogBlog: https://helloylh.comGithub: https://github.com/luumodtags: gitabbrlink: 12001description: 本文主要讲解了git的安装&#xff0c;配置基本工作date: …...

【Python】使用Playwright断言方法验证网页和Web应用程序状态

作为测试框架&#xff0c;Playwright 提供了一系列断言方法&#xff0c;您可以使用它们来验证网页和 Web 应用程序的状态。在这篇博客中&#xff0c;田辛老师将介绍 Playwright 中可用的各种断言方法&#xff0c;并为每种方法提供示例。 assert page.url() expected_url &…...

libgdx导入blender模型

具体就是参考 官网 https://libgdx.com/wiki/graphics/3d/importing-blender-models-in-libgdx blender 教程可以看八个案例教程带你从0到1入门blender【已完结】 这里贴一下过程图。 1.初始环境搭建略过。 2.打开blender 选中摄像机和灯光&#xff0c;右键进行删除。 3.选中…...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

重启Eureka集群中的节点,对已经注册的服务有什么影响

先看答案&#xff0c;如果正确地操作&#xff0c;重启Eureka集群中的节点&#xff0c;对已经注册的服务影响非常小&#xff0c;甚至可以做到无感知。 但如果操作不当&#xff0c;可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 &#xff0c;不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源&#xff08;最常用&#xff09; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...

人工智能 - 在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型

在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型。这些平台各有侧重&#xff0c;适用场景差异显著。下面我将从核心功能定位、典型应用场景、真实体验痛点、选型决策关键点进行拆解&#xff0c;并提供具体场景下的推荐方案。 一、核心功能定位速览 平台核心定位技术栈亮…...

webpack面试题

面试题&#xff1a;webpack介绍和简单使用 一、webpack&#xff08;模块化打包工具&#xff09;1. webpack是把项目当作一个整体&#xff0c;通过给定的一个主文件&#xff0c;webpack将从这个主文件开始找到你项目当中的所有依赖文件&#xff0c;使用loaders来处理它们&#x…...