Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例
相关
《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》
《Linux内存映射函数mmap与匿名内存块》
《Linux共享内存与子进程继承》
0 概念
数据结构含义:
- dsm_segment(动态共享内存段):
- 每个后端进程可以通过使用dsm_segment来访问共享内存。dsm_segment包含了内存分配和释放等操作所需的信息,并提供了一组函数来管理和操作共享内存。每个会话(session)都可以有一个或多个dsm_segment。
- 提供共享内存,dsm_segment表示申请的一个共享内存段,对于dsm api来说这是代表共享内存的最小单位。
- shm_mq_handle(共享内存消息队列句柄)
- shm_mq_handle则是用于在共享内存中进行消息传递的句柄。它提供了一组函数来发送和接收消息,并提供了同步和互斥机制,确保多个进程之间的顺序和一致性。
- 使用dsm_segment提供共享内存段做进程通信。
在实际使用中,通常会将它们组合在一起,以实现共享内存中的消息传递机制。
1 shm_toc初始化一段共享内存,共享内存是从哪来的?
在PG代码中可以看到shm_toc初始化一段内存,在头部放置shm_toc,这块内存叫做一个内存段,shm_toc_create函数接受已经申请好的内存地址,在头部初始化shm_toc(表示内存段头),后面可以存放用户自定义数据,比如message queue所需的结构体、数据等。
初始化一段共享内存,在头上放shm_toc结构,明显是用来记录内存段管理的一些数据。
struct shm_toc
{uint64 toc_magic; /* Magic number identifying this TOC */slock_t toc_mutex; /* Spinlock for mutual exclusion */Size toc_total_bytes; /* Bytes managed by this TOC */Size toc_allocated_bytes; /* Bytes allocated of those managed */uint32 toc_nentry; /* Number of entries in TOC */shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
};
初始化
shm_toc *
shm_toc_create(uint64 magic, void *address, Size nbytes)
{shm_toc *toc = (shm_toc *) address;Assert(nbytes > offsetof(shm_toc, toc_entry));toc->toc_magic = magic;SpinLockInit(&toc->toc_mutex);/** The alignment code in shm_toc_allocate() assumes that the starting* value is buffer-aligned.*/toc->toc_total_bytes = BUFFERALIGN_DOWN(nbytes);toc->toc_allocated_bytes = 0;toc->toc_nentry = 0;return toc;
}
那么shm_toc_create用的内存是从哪来的?
2 动态mmap一段新的共享内存(dsm机制)
- Postgresql能看到很多dsm开头的函数,这类函数属于运行时动态申请共享内存模块( dynamic shared memory)。
- 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》介绍过,PG的共享内存是在启动时,直接用
mmap
一次性申请大匿名块,然后自己切分使用的。但如果运行时需要申请新的、不定大小的共享内存块,肯定无法再启动时预先申请,这就引入了dsm模块。
3 dsm申请共享内存应用实例:mq
在源码中有一个非常好的例子,可以用来分析dsm申请内存用作mq的方法:
src/test/modules/test_shm_mq
下面对代码流程做一些分析,主要分析动态申请共享内存的过程,不涉及初始化后的使用流程。
3.1 内存初始化:test_shm_mq_setup
void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq *outq = NULL; /* placate compiler */shm_mq *inq = NULL; /* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);/* Wait for workers to become ready. */wait_for_workers_to_become_ready(wstate, hdr);/** Once we reach this point, all workers are ready. We no longer need to* kill them if we die; they'll die on their own as the message queues* shut down.*/cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}
第一步:申请共享内存setup_dynamic_shared_memory
static void
setup_dynamic_shared_memory(int64 queue_size, int nworkers,dsm_segment **segp, test_shm_mq_header **hdrp,shm_mq **outp, shm_mq **inp)
{shm_toc_estimator e;int i;Size segsize;dsm_segment *seg;shm_toc *toc;test_shm_mq_header *hdr;
用estimator来评估需要多少共享内存(可以不用自己算了:)
评估器的接口:
- shm_toc_initialize_estimator:初始化记录(key的个数和chunk的总大小)。
- shm_toc_estimate_chunk:需要多大的chunk?调用一次记录一个指定大小的chunk,对齐到32字节上。
- shm_toc_estimate_keys:需要几个key?这里记录一下
- shm_toc_estimate:把之前记录的结构换算成大小
评估器的原理:记录key的个数和chunk的总大小即可。
shm_toc_initialize_estimator(&e);shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));for (i = 0; i <= nworkers; ++i)shm_toc_estimate_chunk(&e, (Size) queue_size);shm_toc_estimate_keys(&e, 2 + nworkers);segsize = shm_toc_estimate(&e);
评估完了开始用dsm_create申请新的共享内存了!
dsm_impl_op→dsm_impl_op→dsm_impl_mmap来具体从os申请共享内存,不深入了。
现在拿到seg就是dsm_segment动态共享内存段,代表一段共享内存(段内部有dsm_control一套管理机制,需要深入的话可以看看dsm_create代码),这里只需要指导这是一段按需要大小mmap出来的共享内存就好了。
/* Create the shared memory segment and establish a table of contents. */seg = dsm_create(shm_toc_estimate(&e), 0);
在共享内存其实的位置,放一个shm_toc结构,作为后面数据的表头和索引
---------------- 共享内存起始地址
struct shm_toc
{uint64 toc_magic; /* Magic number identifying this TOC */slock_t toc_mutex; /* Spinlock for mutual exclusion */Size toc_total_bytes; /* Bytes managed by this TOC */Size toc_allocated_bytes; /* Bytes allocated of those managed */uint32 toc_nentry; /* Number of entries in TOC */shm_toc_entry toc_entry[0];
};
---------------- 申请空间是从最高地址开始的用的,索引数组toc_entry从低到高,数据从高到低,和page一样
toc_entry[0] -------------\
---------------- |
toc_entry[1] |
---------------- |
toc_entry[2] ---------\ |
---------------- | |
data <-----------------/ |
... |
data |
... |
data <----------------------/
----------------
toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),segsize);
申请一段用户自定义数据空间,放一个test_shm_mq_header结构。
hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));SpinLockInit(&hdr->mutex);hdr->workers_total = nworkers;hdr->workers_attached = 0;hdr->workers_ready = 0;
初始化好数据结构后,把数据插进去,位置前面shm_toc_allocate已经申请好了,这一步shm_toc_insert主要是配置toc_entry那个索引数组!具体就是toc_entry[0]的key和offset,下次用的时候用0就能找到offset了。
shm_toc_insert(toc, 0, hdr);/* Set up one message queue per worker, plus one. */for (i = 0; i <= nworkers; ++i){shm_mq *mq;
每个worker申请一个queue_size,按上面方法allocate+insert。
mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),(Size) queue_size);shm_toc_insert(toc, i + 1, mq);if (i == 0){
把mq用户需要的数据结构传进去进行需要的配置即可,shm_toc_insert完了也可以用,alloc出来的地址直接指向共享内存,可以在当前进程一直使用。
/* We send messages to the first queue. */shm_mq_set_sender(mq, MyProc);*outp = mq;}if (i == nworkers){/* We receive messages from the last queue. */shm_mq_set_receiver(mq, MyProc);*inp = mq;}}/* Return results to caller. */*segp = seg;*hdrp = hdr;
}
第二步:配置bgworker:setup_background_workers
static worker_state *
setup_background_workers(int nworkers, dsm_segment *seg)
{MemoryContext oldcontext;BackgroundWorker worker;worker_state *wstate;int i;oldcontext = MemoryContextSwitchTo(CurTransactionContext);
setup_background_workers函数目标是构造worker_state,下面是worker_state的内存结构:
------------------------------
typedef struct
{int nworkers;BackgroundWorkerHandle *handle[]; // flex数组,每个位置对应一个worker{int slot;uint64 generation;}
} worker_state;
------------------------------
handle[0]
------------------------------
handle[1]
------------------------------
handle[2]
------------------------------
wstate = MemoryContextAlloc(TopTransactionContext,offsetof(worker_state, handle) +sizeof(BackgroundWorkerHandle *) * nworkers);wstate->nworkers = 0;on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));/* Configure a worker. */
配通用的worker通用部分:
memset(&worker, 0, sizeof(worker));worker.bgw_flags = BGWORKER_SHMEM_ACCESS;worker.bgw_start_time = BgWorkerStart_ConsistentState;worker.bgw_restart_time = BGW_NEVER_RESTART;sprintf(worker.bgw_library_name, "test_shm_mq");sprintf(worker.bgw_function_name, "test_shm_mq_main");snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));/* set bgw_notify_pid, so we can detect if the worker stops */worker.bgw_notify_pid = MyProcPid;
把worker通用的部分传进去,拼每个worker特有的部分
/* Register the workers. */for (i = 0; i < nworkers; ++i){
从BackgroundWorkerData->slot
数组(bgworker管理的全局变量记录使用所有使用中未使用的bgworker,每个worker一个slot)里面拿到一个没使用的slot,记录slot的id到handle[i]
中并返回即可。
if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))ereport(ERROR,(errcode(ERRCODE_INSUFFICIENT_RESOURCES),errmsg("could not register background process"),errhint("You may need to increase max_worker_processes.")));++wstate->nworkers;}/* All done. */MemoryContextSwitchTo(oldcontext);return wstate;
}
第三步:剩余流程
void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq *outq = NULL; /* placate compiler */shm_mq *inq = NULL; /* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);
第三步到这里了,shm_mq_handle的作用是构建一个shm_mq_handle,记录dsm seg、mq、bgworker的handle
/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
用取一个PID的方式检查worker是不是ready了。
wait_for_workers_to_become_ready(wstate, hdr);cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}
到这里内存初始化的工作做完了。后面可以直接调用mq的api发送信息,本篇不在关注。
相关文章:
Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例
相关 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》 《Linux内存映射函数mmap与匿名内存块》 《Linux共享内存与子进程继承》 0 概念 数据结构含义: dsm_segment(动态共享内存段):…...

51单片机学习--蜂鸣器播放音乐
由原理图可知,蜂鸣器BEEP与P1_5 相关,但其实这个原理图有错,实测接的是P2_5 下面这个代码就是以500HZ的频率响500ms的例子 sbit Buzzer P2^5;unsigned char KeyNum; unsigned int i;void main() {while(1){KeyNum Key();if(KeyNum){for(i …...

【Vue组件eval方法的使用】
Vue页面中条件可以放在当前vue页面中而无需影响到组件 如 这是我的表格操作列按钮,需求是第四个按钮如果表格当前数据的is_execl字段为0则显示否则隐藏 这种条件判断很频繁 如果像之前一样给一个标识,页面多了就难以维护,而且判断条件如果不…...

C++ 多文件结构和编译预处理命令
1. C程序的一般组织结构 C源程序的结构基本上都是由3个部分构成:类的定义、类的成员的实现和主函数。因为所编写的程序比较小,所以这三个部分都写在了同一个文件当中。在规模比较大的项目中,往往需要多个源程序文件,每个源程序文…...

QT实现中英文键盘
使用Qt中实现中英文键盘,支持各种linux嵌入式设备。 实现思路:需要一个中文字体库,将字体库加载到一个Hash容器,字母和拼音作为key值,对应的中文作为value值。 核心代码: #include "UKeyBoard.h"…...
java中并发编程CompletableFuture和supplyAsync的用法
在Java中,并发编程可以使用CompletableFuture类来实现异步操作和并行任务。其中,supplyAsync是CompletableFuture类的一个静态方法,用于执行一个有返回值的异步任务。 supplyAsync方法的语法如下: public static <U> Comp…...
chrony服务器
目录 1.chrony简介 2.chrony安装配置 2.1 chrony安装及配置 非常重要:在配置之前,检查防火墙和selinux是否关闭 3.将chrony配置为其他主机的时间服务器 3.1 修改chrony配置文件 4.chronyc命令 1.chrony简介 chrony 是开源的遵循网络时间协议&…...

春秋云镜 CVE-2021-24762
春秋云镜 CVE-2021-24762 WordPress Plugin Perfect Survey 注入 靶标介绍 WordPress Perfect Survey plugin在1.5.2之前版本存在SQL注入漏洞,该漏洞源于基于数据库的应用缺少对外部输入SQL语句的验证。攻击者可利用该漏洞执行非法SQL命令。 启动场景 漏洞利用 …...

K8s中的Service
Service 1.Service存在的意义? (1)pod注册在service里面,做服务发现使用 (2)定义一组Pod访问策略(负载均衡) 2.Pod和Service关系 通过service实现Pod的负载均衡 3.常用Service类…...
[软件工程] 全局分析规格说明书模板
1 价值需求 描述目标系统的价值需求,可以附上商业模式画布。 1.1 利益相关者 描述目标系统的利益相关者,包括终端用户、企业组织、投资人等。 1.2 系统愿景 描述利益相关者共同达成一致的愿景,该愿景的描述需要对准企业的战略目标。 1.3 系统…...

【JAVASE】封装
⭐ 作者:小胡_不糊涂 🌱 作者主页:小胡_不糊涂的个人主页 📀 收录专栏:浅谈Java 💖 持续更文,关注博主少走弯路,谢谢大家支持 💖 封装 1. 什么是封装2. 访问限定符3. 封装…...

Java多线程(四)
目录 一、线程的状态 1.1 观察线程的所有状态 1.2 线程状态和状态转移的意义 1.2.1 NEW、RUNNABLE、TERMINATED状态转换 1.2.2 WAITING、BLOCKED、TIMED_WAITING状态转换 1.2.3 yield()大公无私让出cpu 一、线程的状态 1.1 观察线程的所有状态 public class Demo9 {public st…...
Linux 文件系统预留空间
在一次磁盘打满的场景下,使用了df -h命令来查看磁盘使用率。 [roottest tmp]# df -h 文件系统 容量 已用 可用 已用% 挂载点 devtmpfs 28G 0 28G 0% /dev tmpfs 28G 0 28G 0% /dev/shm tmpfs 28G 2.8G …...
篇一:单例模式:C++中的独一无二
篇一:“单例模式:C中的独一无二” 设计模式在软件开发中起到了至关重要的作用,其中单例模式是最为常用且基础的设计模式之一。单例模式确保一个类只有一个实例,并提供全局访问点,以保证系统中的某个对象只有唯一的存在…...

JVM之内存结构
1.程序计数器 定义:程序计数器(Program Counter Register)是JVM中一块较小的内存空间。解释器在解释JVM指令为机器码以供CPU执行时,会去程序计数器当中找到jvm指令的执行地址。 作用:记住下一条jvm指令的执行地址 特…...
C#实现结构体与字节流的相互转化
unity项目中,涉及到与C的相互通信,而通信接口为C封好的动态库。所以,传输信息时,需要向C端发送字节流信息。 对此,需将结构体数据转为字节流,其代码如下: public static byte[] StructToBytes(…...

用LangChain开源框架实现知识机器人
前言 Large Language Models (LLMs)在2020年OpenAI 的 GPT-3 的发布而进入世界舞台 。从那时起,他们稳步增长进入公众视野。 众所周知 OpenAI 的 API 无法联网,所以大家如果想通过它的API实现联网搜索并给出回答、总结 PDF 文档、基于某个 Youtube 视频…...

HCIP——前期综合实验
前期综合实验 一、实验拓扑二、实验要求三、实验思路四、实验步骤1、配置接口IP地址2、交换机配置划分vlan10以及vlan203、总部分部,骨干网配置OSPF分部总部骨干网 4、配置BGP建立邻居关系总部骨干网分部 5、发布用户网段6、将下一跳改为本地7、允许AS重复8、重发布…...

【2023年电赛】运动目标控制与自动追踪系统(E 题)最简单实现
本方案的思路是最简单的不涉及复杂算法:识别矩形框,标记矩形框,输出坐标和中心点,计算长度,控制舵机移动固定长度!仅供完成基础功能参考,不喜勿喷! # 实现运动目标控制与自动追踪系…...

【IMX6ULL驱动开发学习】22.IMX6ULL开发板读取ADC(以MQ-135为例)
IMX6ULL一共有两个ADC,每个ADC都有八个通道,但他们共用一个ADC控制器 1.设备树 在imx6ull.dtsi文件中已经帮我们定义好了adc1的节点部分信息 adc1: adc02198000 {compatible "fsl,imx6ul-adc", "fsl,vf610-adc";reg <0x0219…...

K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...

Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...

CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...

Razor编程中@Html的方法使用大全
文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...