Postgresql源码(122)Listen / Notify与事务的联动机制
前言
Notify和Listen是Postgresql提供的不同会话间异步消息通信功能,例子:
LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
功能使用PG的基础设施shm_mq + 信号机制拼装实现。
监听、通知的行为也兼容了数据库的事务的功能,事务回滚会删除监听、事务提交会触发通知。
本文对异步消息队列与事务的联动机制做一些分析。
事务提交触发
NOTIFY的功能必须等到事务提交才会触发:
postgres=# listen a1;
LISTEN
postgres=# begin;
BEGIN
postgres=*# notify a1;
NOTIFY
postgres=*# notify a1;
NOTIFY
postgres=*# commit;
COMMIT
Asynchronous notification "a1" received from server process with PID 17111.
流程比较简单,先从pendingActions中注册监听。再发信号触发异步notify。
void
AtCommit_Notify(void)
{...if (pendingActions != NULL){foreach(p, pendingActions->actions){ListenAction *actrec = (ListenAction *) lfirst(p);switch (actrec->action){case LISTEN_LISTEN:Exec_ListenCommit(actrec->channel);break;case LISTEN_UNLISTEN:Exec_UnlistenCommit(actrec->channel);break;case LISTEN_UNLISTEN_ALL:Exec_UnlistenAllCommit();break;}}}...if (pendingNotifies != NULL)SignalBackends();...
}
事务回滚清理
回滚后监听和通知都会清理:
postgres=# begin;
BEGIN
postgres=*# listen k123;
LISTEN
postgres=*# notify k123;
NOTIFY
postgres=*# abort;
ROLLBACK
postgres=# notify k123;
NOTIFY
postgres=#
事务回滚时执行清理动作:
void
AtAbort_Notify(void)
{if (amRegisteredListener && listenChannels == NIL)asyncQueueUnregister();pendingActions = NULL;pendingNotifies = NULL;
}
全部清理干净。
子事务提交不触发,交接给上一层事务
提交的子事务将notify交接给上一层事务。
postgres=# listen k000;
LISTEN
postgres=# begin;
BEGIN
postgres=*# savepoint sp1;
SAVEPOINT
postgres=*# savepoint sp2;
SAVEPOINT
postgres=*# notify k000;
NOTIFY
postgres=*# release sp2;
RELEASE
postgres=*# commit;
COMMIT
Asynchronous notification "k000" received from server process with PID 18902.
实现:
void
AtSubCommit_Notify(void)
{int my_level = GetCurrentTransactionNestLevel();if (pendingActions != NULL &&pendingActions->nestingLevel >= my_level){if (pendingActions->upper == NULL ||pendingActions->upper->nestingLevel < my_level - 1){--pendingActions->nestingLevel;}else{ActionList *childPendingActions = pendingActions;pendingActions = pendingActions->upper;pendingActions->actions =list_concat(pendingActions->actions,childPendingActions->actions);pfree(childPendingActions);}}if (pendingNotifies != NULL &&pendingNotifies->nestingLevel >= my_level){Assert(pendingNotifies->nestingLevel == my_level);if (pendingNotifies->upper == NULL ||pendingNotifies->upper->nestingLevel < my_level - 1){--pendingNotifies->nestingLevel;}else{NotificationList *childPendingNotifies = pendingNotifies;ListCell *l;pendingNotifies = pendingNotifies->upper;foreach(l, childPendingNotifies->events){Notification *childn = (Notification *) lfirst(l);if (!AsyncExistsPendingNotify(childn))AddEventToPendingNotifies(childn);}pfree(childPendingNotifies);}}
}
-
pendingActions:用于保存channel信息(LISTEN命令使用,Async_Listen中配置)
-
pendingNotifies:用于保存channel和payload信息(NOTIFY命令使用,Async_Notify中配置)
子事务提交时,notify并不会真正触发,也是和其他资源一样,将自己绑定的nestingLevel转移到上一层(注意这里是绑的nestingLevel不是xid比较合理)。
整体上会有两种情况:
情况一:子事务有间隔,走这个分支pendingActions->upper->nestingLevel < my_level - 1
begin;
savepoint sp1;
notify ch123;
savepoint sp2;
savepoint sp3;
notify ch789;
release sp3;
情况二:子事务无间隔,走else
分支
begin;
savepoint sp1;
notify ch123;
savepoint sp2;
notify ch456;
savepoint sp3;
notify ch789;
release sp3;
pendingActions和pendingNotifies都有自己的upper指针形成链式结构,两种数据结构在子事务提交时的行为都是将信息转移到上一层中,区别是pendingActions直接挂到上一层的actions链表;pendingNotifies调用AddEventToPendingNotifies接口完成同样的动作。
子事务回滚不触发,清理属于子事务的pendings
回滚的子事务会删除监听。
postgres=# begin;
BEGIN
postgres=*# savepoint sp1;
SAVEPOINT
postgres=*# listen k123;
LISTEN
postgres=*# savepoint sp2;
SAVEPOINT
postgres=*# listen k000;
LISTEN
postgres=*# rollback to sp2;
ROLLBACK
postgres=*# notify k123;
NOTIFY
postgres=*# notify k000;
NOTIFY
postgres=*# commit;
COMMIT
Asynchronous notification "k123" received from server process with PID 18098.
postgres=#
void
AtSubAbort_Notify(void)
{int my_level = GetCurrentTransactionNestLevel();...while (pendingActions != NULL &&pendingActions->nestingLevel >= my_level){ActionList *childPendingActions = pendingActions;pendingActions = pendingActions->upper;pfree(childPendingActions);}while (pendingNotifies != NULL &&pendingNotifies->nestingLevel >= my_level){NotificationList *childPendingNotifies = pendingNotifies;pendingNotifies = pendingNotifies->upper;pfree(childPendingNotifies);}
}
子事务回滚的话,全部是直接删除,不在做向上归属。
Listen/Notify的实现原理
(This content is a summary derived from code comments.)
- 同一台机器上有多个后端进程。多个后端进程监听多个通道。(在代码的其他部分,通道也被称为“conditions”。)
- 在基于磁盘的存储中有一个中央队列(目录 pg_notify/),通过 slru.c 模块将活跃使用的页面映射到共享内存中。所有的通知消息都被放置在队列中,稍后由监听的后端进程读取。没有集中的信息知道哪个后端进程监听哪个通道;每个后端进程都有自己感兴趣的通道列表。虽然只有一个队列,但通知被视为数据库本地的;这是通过在每个通知消息中包含发送者的数据库 OID 来实现的。监听的后端进程会忽略不匹配其数据库 OID 的消息。这一点很重要,因为它确保了发送者和接收者有相同的数据库编码,不会错误解释通道名称或有效载荷字符串中的非 ASCII 文本。由于通知不期望在数据库崩溃后存活,我们可以在任何重启时简单地清除 pg_notify 数据,并且不需要 WAL 支持或 fsync。
- 每个至少监听一个频道的后端进程都会通过将其进程ID注册到AsyncQueueControl的数组中来进行注册。然后,它会扫描中央队列中的所有传入通知,首先将通知的数据库OID与自身的数据库OID进行比较,然后将通知的频道与其监听的频道列表进行比较。如果匹配成功,它会将通知事件传递给前端。不匹配的事件将被简单地跳过。
- NOTIFY语句(Async_Notify例程)将通知存储在后端本地列表中,直到事务结束才会处理。来自同一事务的重复通知只发送一次通知。这样做是为了节省工作量,例如,当触发器在一个200万行的表上触发时,会为每一行的更改发送一个通知。如果应用程序需要接收每个已发送的单个通知,可以在额外的有效负载参数中轻松添加一些唯一的字符串。当事务准备提交时,PreCommit_Notify()将待处理的通知添加到队列的头部。队列的头指针始终指向下一个空闲位置,而位置只是一个页号和该页中的偏移量。这是在将事务标记为已提交之前完成的。如果在写入通知时遇到问题,我们仍然可以调用elog(ERROR, …),事务将回滚。一旦我们将所有通知放入队列中,我们将返回到CommitTransaction(),然后执行实际的事务提交。在提交后,我们会再次被调用(AtCommit_Notify())。在这里,我们对有效的监听状态(listenChannels)进行任何实际的更新。然后,我们向可能对我们的消息感兴趣的后端进程发送信号(包括我们自己的后端进程,如果正在监听)。这是通过SignalBackends()完成的,它会扫描监听后端进程的列表,并向每个监听后端进程发送一个PROCSIG_NOTIFY_INTERRUPT信号(我们不知道哪个后端进程在监听哪个频道,因此必须向它们全部发送信号)。但是,我们可以排除那些已经是最新状态的后端进程,并且还可以排除其他数据库中的后端进程(除非它们远远落后,应该被踢出以使其前进指针)。最后,在完全退出事务并即将进入空闲状态时,我们会扫描队列中需要发送到前端的消息(可能是来自其他后端的通知,或者是自己发送的通知)。这一步骤不是CommitTransaction序列的一部分,有两个重要原因。首先,我们在向前端发送数据时可能会出现错误,而在事务提交后进行清理时出现错误是非常糟糕的。其次,在某些情况下,一个过程在单个前端命令中发出多个提交,我们不希望在命令完成之前向前端发送通知;但是对于其他后端来说,每次提交后的通知应该立即发送出去。
- 收到PROCSIG_NOTIFY_INTERRUPT信号后,信号处理程序会设置进程的latch,如果该后端处于空闲状态(即等待前端命令并且不在事务块内),则会立即触发事件处理(参见ProcessClientReadInterrupt())。否则,处理程序可能只设置一个标志,在下次进入空闲状态之前进行处理。入站通知处理包括读取自上次扫描以来到达的所有通知。我们读取每个通知,直到达到未提交事务的通知或者头指针的位置。
- 为了限制磁盘空间的消耗,需要推进尾指针,以便可以截断旧的页面。这是相对昂贵的操作(特别是,它需要一个独占锁),因此我们不希望经常执行。如果发送后端将队列头推进到新页面,则会执行此操作,但每QUEUE_CLEANUP_DELAY页只执行一次。
一个在相同频道上监听的应用程序将会收到自己发送的NOTIFY消息。如果这些消息对应用程序没有用处,可以通过将NOTIFY消息中的be_pid与应用程序自身后端的PID进行比较来忽略它们。(从FE/BE协议2.0开始,在启动期间,后端的PID会提供给前端。)上述设计确保通过忽略自我通知,不会错过来自其他后端的通知。用于通知管理的共享内存使用量(NUM_NOTIFY_BUFFERS)可以根据需要进行调整,而不会影响除性能之外的任何内容。可以同时排队的通知数据的最大量由max_notify_queue_pages GUC确定。
相关文章:
Postgresql源码(122)Listen / Notify与事务的联动机制
前言 Notify和Listen是Postgresql提供的不同会话间异步消息通信功能,例子: LISTEN virtual; NOTIFY virtual; Asynchronous notification "virtual" received from server process with PID 8448. NOTIFY virtual, This is the payload; Asy…...

QT 数据库的增加操作和画图 Win
第一步、先配置CMakeLists.txt 在CMakeLists.txt中添加 find_package(Qt6 REQUIRED COMPONENTS Sql) find_package(Qt6 REQUIRED COMPONENTS Charts)target_link_libraries(${PROJECT_NAME} PRIVATE Qt6::Sql) target_link_libraries(${PROJECT_NAME} PRIVATE Qt6::Charts)避…...

【JS逆向学习】同花顺(q.10jqka)补环境
逆向目标 目标网址:https://q.10jqka.com.cn/ 目标接口: https://q.10jqka.com.cn/index/index/board/all/field/zdf/order/desc/page/3/ajax/1/ 目标参数:cookie 逆向过程 老规矩,先分析网络请求,发现是 cookie 加…...

解决MobaXterm网络错误连接超时问题
报错页面: 报错原因: ①网络断开了 ②网络端口,端口号改变 解决办法: ①重新连接网络按R ②固定端口号 第一步:编辑------>虚拟机网络编辑器(我的Linux在虚拟机里) 第二步:用…...

突发!AI独角兽「竹间智能」被曝停工停产6个月
大家好我是二狗。 今天早上起来刷朋友圈,看到一张截图——AI创企竹间智能,宣称因为公司所处的经营环境艰难,部分部门和岗位将从即日起停工停产6个月。 图源:(企服科学) 下面是文字版: 由于公司…...
Qt应用软件【协议篇】GPIO控制LED灯
GPIO简介 GPIO(General Purpose Input/Output,通用输入输出)是一种通用的端口定义,在各种计算机、嵌入式系统和微控制器中广泛应用。通过GPIO,计算机或微控制器可以与外部世界进行交互,例如读取传感器数据或控制外部设备(如LED灯、电机等)。 GPIO的应用场景 按钮和开…...

vulfocus靶场搭建
vulfocus靶场搭建 什么是vulfocus搭建教程靶场配置场景靶场编排靶场优化 什么是vulfocus Vulfocus 是一个漏洞集成平台,将漏洞环境 docker 镜像,放入即可使用,开箱即用,我们可以通过搭建该靶场,简单方便地复现一些框架…...
Swift基础知识:30.Swift访问控制
在 Swift 中,访问控制(Access Control)是一种用于限制代码模块对其他代码模块的访问权限的机制。通过访问控制,可以控制代码中各个部分的可见性和可访问性,以便于提高代码的安全性、可维护性和可复用性。 访问级别 S…...

ElasticSearch聚合操作
目录 ElasticSearch聚合操作 基本语法 聚合的分类 后续示例数据 Metric Aggregation Bucket Aggregation ES聚合分析不精准原因分析 提高聚合精确度 ElasticSearch聚合操作 Elasticsearch除搜索以外,提供了针对ES 数据进行统计分析的功能。聚合(aggregation…...

普中51单片机学习(定时器和计数器)
定时器和计数器 51单片机有两组定时器/计数器,因为既可以定时,又可以计数,故称之为定时器/计数器。定时器/计数器和单片机的CPU是相互独立的。定时器/计数器工作的过程是自动完成的,不需要CPU的参与。51单片机中的定时器/计数器是…...

having子句
目录 having子句 having和where的区别 Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 现在要求查询出每个职位的名称,职位的平均工资,但是要求显示平均工资高于 200 的职位 按照职位先进行分组,同…...
STM32H7 系列 MCU 内部 SRAM
通过参看《STM32H7 参考手册》“2.4 Embedded SRAM”章节知道 The STM32H743/53xx and STM32H750xB 内存特性: Up to 864 Kbytes of System SRAM 128 Kbytes of data TCM RAM 64 Kbytes of instruction TCM RAM 4 Kbytes of backup SRAM 1.1 TCM SRAM TCM : Tightly-Coupled …...

备战蓝桥杯---动态规划(应用2(一些十分巧妙的优化dp的手段))
好久不见,甚是想念,最近一直在看过河这道题(感觉最近脑子有点宕机QAQ),现在算是有点懂了,打算记录下这道又爱又恨的题。(如有错误欢迎大佬帮忙指出) 话不多说,直接看题&…...
从 git 分支中合并特定文件,而不是整个分支的内容
问题 在git 中,我们可以使用 git merge 命令,合并整个分支,覆盖当前分支的内容,但是有时候我们并不想这么做,而是想 merge 某个文件。那么下面提供两种办法。 方法一 使用 git checkout,从别的分支&#x…...

pycharm 远程运行报错 Failed to prepare environment
什么也没动的情况下,远程连接后运行是没问题的,突然在运行时就运行不了了,解决方案 清理缓存: 有时候 PyCharm 的内部缓存可能出现问题,可以尝试清除缓存(File > Invalidate Caches / Restart࿰…...

(十二)【Jmeter】线程(Threads(Users))之setUp 线程组
简述 操作路径如下: 作用:在正式测试开始前执行预加载或预热操作,为测试做准备。配置:设置预加载或预热操作的采样器、循环次数等参数。使用场景:确保在正式测试开始前应用程序已经达到稳定状态,减少测试结果的偏差。优点:提供预加载或预热操作,确保测试的准确性。缺…...

代码随想录算法训练营第二十五天|216.组合总和III,17.电话号码的字母组合
目录 216.组合总和II 17.电话号码的字母组合 216.组合总和II 如果把 组合问题理解了,本题就容易一些了。 题目链接/文章讲解:代码随想录 视频讲解:和组合问题有啥区别?回溯算法如何剪枝?| LeetCode:216.…...

c#创建安装windows服务
背景:最近在做设备数据对接采集时,遇到一些设备不是标准的Service-Client接口,导致采集的数据不够准确;比如设备如果中途开关机后,加工的数量就会从0开始重新计数,因此需要实时监控设备的数据,进行叠加处理;考略到工厂设备比较多,实时监听接口的数据为每秒3次,因此将…...

【JVM】打破双亲委派机制
📝个人主页:五敷有你 🔥系列专栏:JVM ⛺️稳中求进,晒太阳 打破双亲委派机制 打破双亲委派机制三种方法 自定义类加载器 ClassLoader包含了四个核心方法 //由类加载器子类实现,获取二进制数据调用…...
程序员要了解的AI基本知识
一.AI从业人员的三个层次 AI从业人员的层次是不同的,所以需要的知识面也是不同的。下面大致给出了3个层面。 1.学术研究者 他们的工作是从理论上诠释机器学习的各个方面,试图找出“这样设计模型/参数为什么效果更好”,并且为其他从业者提供…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
如何配置一个sql server使得其它用户可以通过excel odbc获取数据
要让其他用户通过 Excel 使用 ODBC 连接到 SQL Server 获取数据,你需要完成以下配置步骤: ✅ 一、在 SQL Server 端配置(服务器设置) 1. 启用 TCP/IP 协议 打开 “SQL Server 配置管理器”。导航到:SQL Server 网络配…...

【51单片机】4. 模块化编程与LCD1602Debug
1. 什么是模块化编程 传统编程会将所有函数放在main.c中,如果使用的模块多,一个文件内会有很多代码,不利于组织和管理 模块化编程则是将各个模块的代码放在不同的.c文件里,在.h文件里提供外部可调用函数声明,其他.c文…...
GeoServer发布PostgreSQL图层后WFS查询无主键字段
在使用 GeoServer(版本 2.22.2) 发布 PostgreSQL(PostGIS)中的表为地图服务时,常常会遇到一个小问题: WFS 查询中,主键字段(如 id)莫名其妙地消失了! 即使你在…...
el-amap-bezier-curve运用及线弧度设置
文章目录 简介示例线弧度属性主要弧度相关属性其他相关样式属性完整示例链接简介 el-amap-bezier-curve 是 Vue-Amap 组件库中的一个组件,用于在 高德地图 上绘制贝塞尔曲线。 基本用法属性path定义曲线的路径,可以是多个弧线段的组合。stroke-weight线条的宽度。stroke…...

多模态学习路线(2)——DL基础系列
目录 前言 一、归一化 1. Layer Normalization (LN) 2. Batch Normalization (BN) 3. Instance Normalization (IN) 4. Group Normalization (GN) 5. Root Mean Square Normalization(RMSNorm) 二、激活函数 1. Sigmoid激活函数(二分类&…...