Postgresql源码(122)Listen / Notify与事务的联动机制
前言
Notify和Listen是Postgresql提供的不同会话间异步消息通信功能,例子:
LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
功能使用PG的基础设施shm_mq + 信号机制拼装实现。
监听、通知的行为也兼容了数据库的事务的功能,事务回滚会删除监听、事务提交会触发通知。
本文对异步消息队列与事务的联动机制做一些分析。
事务提交触发
NOTIFY的功能必须等到事务提交才会触发:
postgres=# listen a1;
LISTEN
postgres=# begin;
BEGIN
postgres=*# notify a1;
NOTIFY
postgres=*# notify a1;
NOTIFY
postgres=*# commit;
COMMIT
Asynchronous notification "a1" received from server process with PID 17111.
流程比较简单,先从pendingActions中注册监听。再发信号触发异步notify。
void
AtCommit_Notify(void)
{...if (pendingActions != NULL){foreach(p, pendingActions->actions){ListenAction *actrec = (ListenAction *) lfirst(p);switch (actrec->action){case LISTEN_LISTEN:Exec_ListenCommit(actrec->channel);break;case LISTEN_UNLISTEN:Exec_UnlistenCommit(actrec->channel);break;case LISTEN_UNLISTEN_ALL:Exec_UnlistenAllCommit();break;}}}...if (pendingNotifies != NULL)SignalBackends();...
}
事务回滚清理
回滚后监听和通知都会清理:
postgres=# begin;
BEGIN
postgres=*# listen k123;
LISTEN
postgres=*# notify k123;
NOTIFY
postgres=*# abort;
ROLLBACK
postgres=# notify k123;
NOTIFY
postgres=#
事务回滚时执行清理动作:
void
AtAbort_Notify(void)
{if (amRegisteredListener && listenChannels == NIL)asyncQueueUnregister();pendingActions = NULL;pendingNotifies = NULL;
}
全部清理干净。
子事务提交不触发,交接给上一层事务
提交的子事务将notify交接给上一层事务。
postgres=# listen k000;
LISTEN
postgres=# begin;
BEGIN
postgres=*# savepoint sp1;
SAVEPOINT
postgres=*# savepoint sp2;
SAVEPOINT
postgres=*# notify k000;
NOTIFY
postgres=*# release sp2;
RELEASE
postgres=*# commit;
COMMIT
Asynchronous notification "k000" received from server process with PID 18902.
实现:
void
AtSubCommit_Notify(void)
{int my_level = GetCurrentTransactionNestLevel();if (pendingActions != NULL &&pendingActions->nestingLevel >= my_level){if (pendingActions->upper == NULL ||pendingActions->upper->nestingLevel < my_level - 1){--pendingActions->nestingLevel;}else{ActionList *childPendingActions = pendingActions;pendingActions = pendingActions->upper;pendingActions->actions =list_concat(pendingActions->actions,childPendingActions->actions);pfree(childPendingActions);}}if (pendingNotifies != NULL &&pendingNotifies->nestingLevel >= my_level){Assert(pendingNotifies->nestingLevel == my_level);if (pendingNotifies->upper == NULL ||pendingNotifies->upper->nestingLevel < my_level - 1){--pendingNotifies->nestingLevel;}else{NotificationList *childPendingNotifies = pendingNotifies;ListCell *l;pendingNotifies = pendingNotifies->upper;foreach(l, childPendingNotifies->events){Notification *childn = (Notification *) lfirst(l);if (!AsyncExistsPendingNotify(childn))AddEventToPendingNotifies(childn);}pfree(childPendingNotifies);}}
}
-
pendingActions:用于保存channel信息(LISTEN命令使用,Async_Listen中配置)
-
pendingNotifies:用于保存channel和payload信息(NOTIFY命令使用,Async_Notify中配置)
子事务提交时,notify并不会真正触发,也是和其他资源一样,将自己绑定的nestingLevel转移到上一层(注意这里是绑的nestingLevel不是xid比较合理)。
整体上会有两种情况:
情况一:子事务有间隔,走这个分支pendingActions->upper->nestingLevel < my_level - 1
begin;
savepoint sp1;
notify ch123;
savepoint sp2;
savepoint sp3;
notify ch789;
release sp3;
情况二:子事务无间隔,走else分支
begin;
savepoint sp1;
notify ch123;
savepoint sp2;
notify ch456;
savepoint sp3;
notify ch789;
release sp3;
pendingActions和pendingNotifies都有自己的upper指针形成链式结构,两种数据结构在子事务提交时的行为都是将信息转移到上一层中,区别是pendingActions直接挂到上一层的actions链表;pendingNotifies调用AddEventToPendingNotifies接口完成同样的动作。
子事务回滚不触发,清理属于子事务的pendings
回滚的子事务会删除监听。
postgres=# begin;
BEGIN
postgres=*# savepoint sp1;
SAVEPOINT
postgres=*# listen k123;
LISTEN
postgres=*# savepoint sp2;
SAVEPOINT
postgres=*# listen k000;
LISTEN
postgres=*# rollback to sp2;
ROLLBACK
postgres=*# notify k123;
NOTIFY
postgres=*# notify k000;
NOTIFY
postgres=*# commit;
COMMIT
Asynchronous notification "k123" received from server process with PID 18098.
postgres=#
void
AtSubAbort_Notify(void)
{int my_level = GetCurrentTransactionNestLevel();...while (pendingActions != NULL &&pendingActions->nestingLevel >= my_level){ActionList *childPendingActions = pendingActions;pendingActions = pendingActions->upper;pfree(childPendingActions);}while (pendingNotifies != NULL &&pendingNotifies->nestingLevel >= my_level){NotificationList *childPendingNotifies = pendingNotifies;pendingNotifies = pendingNotifies->upper;pfree(childPendingNotifies);}
}
子事务回滚的话,全部是直接删除,不在做向上归属。
Listen/Notify的实现原理
(This content is a summary derived from code comments.)
- 同一台机器上有多个后端进程。多个后端进程监听多个通道。(在代码的其他部分,通道也被称为“conditions”。)
- 在基于磁盘的存储中有一个中央队列(目录 pg_notify/),通过 slru.c 模块将活跃使用的页面映射到共享内存中。所有的通知消息都被放置在队列中,稍后由监听的后端进程读取。没有集中的信息知道哪个后端进程监听哪个通道;每个后端进程都有自己感兴趣的通道列表。虽然只有一个队列,但通知被视为数据库本地的;这是通过在每个通知消息中包含发送者的数据库 OID 来实现的。监听的后端进程会忽略不匹配其数据库 OID 的消息。这一点很重要,因为它确保了发送者和接收者有相同的数据库编码,不会错误解释通道名称或有效载荷字符串中的非 ASCII 文本。由于通知不期望在数据库崩溃后存活,我们可以在任何重启时简单地清除 pg_notify 数据,并且不需要 WAL 支持或 fsync。
- 每个至少监听一个频道的后端进程都会通过将其进程ID注册到AsyncQueueControl的数组中来进行注册。然后,它会扫描中央队列中的所有传入通知,首先将通知的数据库OID与自身的数据库OID进行比较,然后将通知的频道与其监听的频道列表进行比较。如果匹配成功,它会将通知事件传递给前端。不匹配的事件将被简单地跳过。
- NOTIFY语句(Async_Notify例程)将通知存储在后端本地列表中,直到事务结束才会处理。来自同一事务的重复通知只发送一次通知。这样做是为了节省工作量,例如,当触发器在一个200万行的表上触发时,会为每一行的更改发送一个通知。如果应用程序需要接收每个已发送的单个通知,可以在额外的有效负载参数中轻松添加一些唯一的字符串。当事务准备提交时,PreCommit_Notify()将待处理的通知添加到队列的头部。队列的头指针始终指向下一个空闲位置,而位置只是一个页号和该页中的偏移量。这是在将事务标记为已提交之前完成的。如果在写入通知时遇到问题,我们仍然可以调用elog(ERROR, …),事务将回滚。一旦我们将所有通知放入队列中,我们将返回到CommitTransaction(),然后执行实际的事务提交。在提交后,我们会再次被调用(AtCommit_Notify())。在这里,我们对有效的监听状态(listenChannels)进行任何实际的更新。然后,我们向可能对我们的消息感兴趣的后端进程发送信号(包括我们自己的后端进程,如果正在监听)。这是通过SignalBackends()完成的,它会扫描监听后端进程的列表,并向每个监听后端进程发送一个PROCSIG_NOTIFY_INTERRUPT信号(我们不知道哪个后端进程在监听哪个频道,因此必须向它们全部发送信号)。但是,我们可以排除那些已经是最新状态的后端进程,并且还可以排除其他数据库中的后端进程(除非它们远远落后,应该被踢出以使其前进指针)。最后,在完全退出事务并即将进入空闲状态时,我们会扫描队列中需要发送到前端的消息(可能是来自其他后端的通知,或者是自己发送的通知)。这一步骤不是CommitTransaction序列的一部分,有两个重要原因。首先,我们在向前端发送数据时可能会出现错误,而在事务提交后进行清理时出现错误是非常糟糕的。其次,在某些情况下,一个过程在单个前端命令中发出多个提交,我们不希望在命令完成之前向前端发送通知;但是对于其他后端来说,每次提交后的通知应该立即发送出去。
- 收到PROCSIG_NOTIFY_INTERRUPT信号后,信号处理程序会设置进程的latch,如果该后端处于空闲状态(即等待前端命令并且不在事务块内),则会立即触发事件处理(参见ProcessClientReadInterrupt())。否则,处理程序可能只设置一个标志,在下次进入空闲状态之前进行处理。入站通知处理包括读取自上次扫描以来到达的所有通知。我们读取每个通知,直到达到未提交事务的通知或者头指针的位置。
- 为了限制磁盘空间的消耗,需要推进尾指针,以便可以截断旧的页面。这是相对昂贵的操作(特别是,它需要一个独占锁),因此我们不希望经常执行。如果发送后端将队列头推进到新页面,则会执行此操作,但每QUEUE_CLEANUP_DELAY页只执行一次。
一个在相同频道上监听的应用程序将会收到自己发送的NOTIFY消息。如果这些消息对应用程序没有用处,可以通过将NOTIFY消息中的be_pid与应用程序自身后端的PID进行比较来忽略它们。(从FE/BE协议2.0开始,在启动期间,后端的PID会提供给前端。)上述设计确保通过忽略自我通知,不会错过来自其他后端的通知。用于通知管理的共享内存使用量(NUM_NOTIFY_BUFFERS)可以根据需要进行调整,而不会影响除性能之外的任何内容。可以同时排队的通知数据的最大量由max_notify_queue_pages GUC确定。
相关文章:
Postgresql源码(122)Listen / Notify与事务的联动机制
前言 Notify和Listen是Postgresql提供的不同会话间异步消息通信功能,例子: LISTEN virtual; NOTIFY virtual; Asynchronous notification "virtual" received from server process with PID 8448. NOTIFY virtual, This is the payload; Asy…...
QT 数据库的增加操作和画图 Win
第一步、先配置CMakeLists.txt 在CMakeLists.txt中添加 find_package(Qt6 REQUIRED COMPONENTS Sql) find_package(Qt6 REQUIRED COMPONENTS Charts)target_link_libraries(${PROJECT_NAME} PRIVATE Qt6::Sql) target_link_libraries(${PROJECT_NAME} PRIVATE Qt6::Charts)避…...
【JS逆向学习】同花顺(q.10jqka)补环境
逆向目标 目标网址:https://q.10jqka.com.cn/ 目标接口: https://q.10jqka.com.cn/index/index/board/all/field/zdf/order/desc/page/3/ajax/1/ 目标参数:cookie 逆向过程 老规矩,先分析网络请求,发现是 cookie 加…...
解决MobaXterm网络错误连接超时问题
报错页面: 报错原因: ①网络断开了 ②网络端口,端口号改变 解决办法: ①重新连接网络按R ②固定端口号 第一步:编辑------>虚拟机网络编辑器(我的Linux在虚拟机里) 第二步:用…...
突发!AI独角兽「竹间智能」被曝停工停产6个月
大家好我是二狗。 今天早上起来刷朋友圈,看到一张截图——AI创企竹间智能,宣称因为公司所处的经营环境艰难,部分部门和岗位将从即日起停工停产6个月。 图源:(企服科学) 下面是文字版: 由于公司…...
Qt应用软件【协议篇】GPIO控制LED灯
GPIO简介 GPIO(General Purpose Input/Output,通用输入输出)是一种通用的端口定义,在各种计算机、嵌入式系统和微控制器中广泛应用。通过GPIO,计算机或微控制器可以与外部世界进行交互,例如读取传感器数据或控制外部设备(如LED灯、电机等)。 GPIO的应用场景 按钮和开…...
vulfocus靶场搭建
vulfocus靶场搭建 什么是vulfocus搭建教程靶场配置场景靶场编排靶场优化 什么是vulfocus Vulfocus 是一个漏洞集成平台,将漏洞环境 docker 镜像,放入即可使用,开箱即用,我们可以通过搭建该靶场,简单方便地复现一些框架…...
Swift基础知识:30.Swift访问控制
在 Swift 中,访问控制(Access Control)是一种用于限制代码模块对其他代码模块的访问权限的机制。通过访问控制,可以控制代码中各个部分的可见性和可访问性,以便于提高代码的安全性、可维护性和可复用性。 访问级别 S…...
ElasticSearch聚合操作
目录 ElasticSearch聚合操作 基本语法 聚合的分类 后续示例数据 Metric Aggregation Bucket Aggregation ES聚合分析不精准原因分析 提高聚合精确度 ElasticSearch聚合操作 Elasticsearch除搜索以外,提供了针对ES 数据进行统计分析的功能。聚合(aggregation…...
普中51单片机学习(定时器和计数器)
定时器和计数器 51单片机有两组定时器/计数器,因为既可以定时,又可以计数,故称之为定时器/计数器。定时器/计数器和单片机的CPU是相互独立的。定时器/计数器工作的过程是自动完成的,不需要CPU的参与。51单片机中的定时器/计数器是…...
having子句
目录 having子句 having和where的区别 Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 现在要求查询出每个职位的名称,职位的平均工资,但是要求显示平均工资高于 200 的职位 按照职位先进行分组,同…...
STM32H7 系列 MCU 内部 SRAM
通过参看《STM32H7 参考手册》“2.4 Embedded SRAM”章节知道 The STM32H743/53xx and STM32H750xB 内存特性: Up to 864 Kbytes of System SRAM 128 Kbytes of data TCM RAM 64 Kbytes of instruction TCM RAM 4 Kbytes of backup SRAM 1.1 TCM SRAM TCM : Tightly-Coupled …...
备战蓝桥杯---动态规划(应用2(一些十分巧妙的优化dp的手段))
好久不见,甚是想念,最近一直在看过河这道题(感觉最近脑子有点宕机QAQ),现在算是有点懂了,打算记录下这道又爱又恨的题。(如有错误欢迎大佬帮忙指出) 话不多说,直接看题&…...
从 git 分支中合并特定文件,而不是整个分支的内容
问题 在git 中,我们可以使用 git merge 命令,合并整个分支,覆盖当前分支的内容,但是有时候我们并不想这么做,而是想 merge 某个文件。那么下面提供两种办法。 方法一 使用 git checkout,从别的分支&#x…...
pycharm 远程运行报错 Failed to prepare environment
什么也没动的情况下,远程连接后运行是没问题的,突然在运行时就运行不了了,解决方案 清理缓存: 有时候 PyCharm 的内部缓存可能出现问题,可以尝试清除缓存(File > Invalidate Caches / Restart࿰…...
(十二)【Jmeter】线程(Threads(Users))之setUp 线程组
简述 操作路径如下: 作用:在正式测试开始前执行预加载或预热操作,为测试做准备。配置:设置预加载或预热操作的采样器、循环次数等参数。使用场景:确保在正式测试开始前应用程序已经达到稳定状态,减少测试结果的偏差。优点:提供预加载或预热操作,确保测试的准确性。缺…...
代码随想录算法训练营第二十五天|216.组合总和III,17.电话号码的字母组合
目录 216.组合总和II 17.电话号码的字母组合 216.组合总和II 如果把 组合问题理解了,本题就容易一些了。 题目链接/文章讲解:代码随想录 视频讲解:和组合问题有啥区别?回溯算法如何剪枝?| LeetCode:216.…...
c#创建安装windows服务
背景:最近在做设备数据对接采集时,遇到一些设备不是标准的Service-Client接口,导致采集的数据不够准确;比如设备如果中途开关机后,加工的数量就会从0开始重新计数,因此需要实时监控设备的数据,进行叠加处理;考略到工厂设备比较多,实时监听接口的数据为每秒3次,因此将…...
【JVM】打破双亲委派机制
📝个人主页:五敷有你 🔥系列专栏:JVM ⛺️稳中求进,晒太阳 打破双亲委派机制 打破双亲委派机制三种方法 自定义类加载器 ClassLoader包含了四个核心方法 //由类加载器子类实现,获取二进制数据调用…...
程序员要了解的AI基本知识
一.AI从业人员的三个层次 AI从业人员的层次是不同的,所以需要的知识面也是不同的。下面大致给出了3个层面。 1.学术研究者 他们的工作是从理论上诠释机器学习的各个方面,试图找出“这样设计模型/参数为什么效果更好”,并且为其他从业者提供…...
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例
文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...
2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 在 GPU 上对图像执行 均值漂移滤波(Mean Shift Filtering),用于图像分割或平滑处理。 该函数将输入图像中的…...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
PHP 8.5 即将发布:管道操作符、强力调试
前不久,PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5!作为 PHP 语言的又一次重要迭代,PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是,借助强大的本地开发环境 ServBay&am…...
Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)
引言 工欲善其事,必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后,我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集,就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...
Vue3 PC端 UI组件库我更推荐Naive UI
一、Vue3生态现状与UI库选择的重要性 随着Vue3的稳定发布和Composition API的广泛采用,前端开发者面临着UI组件库的重新选择。一个好的UI库不仅能提升开发效率,还能确保项目的长期可维护性。本文将对比三大主流Vue3 UI库(Naive UI、Element …...
