Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例
相关
《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》
《Linux内存映射函数mmap与匿名内存块》
《Linux共享内存与子进程继承》
0 概念
数据结构含义:
- dsm_segment(动态共享内存段):
- 每个后端进程可以通过使用dsm_segment来访问共享内存。dsm_segment包含了内存分配和释放等操作所需的信息,并提供了一组函数来管理和操作共享内存。每个会话(session)都可以有一个或多个dsm_segment。
- 提供共享内存,dsm_segment表示申请的一个共享内存段,对于dsm api来说这是代表共享内存的最小单位。
- shm_mq_handle(共享内存消息队列句柄)
- shm_mq_handle则是用于在共享内存中进行消息传递的句柄。它提供了一组函数来发送和接收消息,并提供了同步和互斥机制,确保多个进程之间的顺序和一致性。
- 使用dsm_segment提供共享内存段做进程通信。
在实际使用中,通常会将它们组合在一起,以实现共享内存中的消息传递机制。
1 shm_toc初始化一段共享内存,共享内存是从哪来的?
在PG代码中可以看到shm_toc初始化一段内存,在头部放置shm_toc,这块内存叫做一个内存段,shm_toc_create函数接受已经申请好的内存地址,在头部初始化shm_toc(表示内存段头),后面可以存放用户自定义数据,比如message queue所需的结构体、数据等。
初始化一段共享内存,在头上放shm_toc结构,明显是用来记录内存段管理的一些数据。
struct shm_toc
{uint64 toc_magic; /* Magic number identifying this TOC */slock_t toc_mutex; /* Spinlock for mutual exclusion */Size toc_total_bytes; /* Bytes managed by this TOC */Size toc_allocated_bytes; /* Bytes allocated of those managed */uint32 toc_nentry; /* Number of entries in TOC */shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
};
初始化
shm_toc *
shm_toc_create(uint64 magic, void *address, Size nbytes)
{shm_toc *toc = (shm_toc *) address;Assert(nbytes > offsetof(shm_toc, toc_entry));toc->toc_magic = magic;SpinLockInit(&toc->toc_mutex);/** The alignment code in shm_toc_allocate() assumes that the starting* value is buffer-aligned.*/toc->toc_total_bytes = BUFFERALIGN_DOWN(nbytes);toc->toc_allocated_bytes = 0;toc->toc_nentry = 0;return toc;
}
那么shm_toc_create用的内存是从哪来的?
2 动态mmap一段新的共享内存(dsm机制)
- Postgresql能看到很多dsm开头的函数,这类函数属于运行时动态申请共享内存模块( dynamic shared memory)。
- 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》介绍过,PG的共享内存是在启动时,直接用
mmap一次性申请大匿名块,然后自己切分使用的。但如果运行时需要申请新的、不定大小的共享内存块,肯定无法再启动时预先申请,这就引入了dsm模块。
3 dsm申请共享内存应用实例:mq
在源码中有一个非常好的例子,可以用来分析dsm申请内存用作mq的方法:
src/test/modules/test_shm_mq
下面对代码流程做一些分析,主要分析动态申请共享内存的过程,不涉及初始化后的使用流程。
3.1 内存初始化:test_shm_mq_setup
void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq *outq = NULL; /* placate compiler */shm_mq *inq = NULL; /* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);/* Wait for workers to become ready. */wait_for_workers_to_become_ready(wstate, hdr);/** Once we reach this point, all workers are ready. We no longer need to* kill them if we die; they'll die on their own as the message queues* shut down.*/cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}
第一步:申请共享内存setup_dynamic_shared_memory
static void
setup_dynamic_shared_memory(int64 queue_size, int nworkers,dsm_segment **segp, test_shm_mq_header **hdrp,shm_mq **outp, shm_mq **inp)
{shm_toc_estimator e;int i;Size segsize;dsm_segment *seg;shm_toc *toc;test_shm_mq_header *hdr;
用estimator来评估需要多少共享内存(可以不用自己算了:)
评估器的接口:
- shm_toc_initialize_estimator:初始化记录(key的个数和chunk的总大小)。
- shm_toc_estimate_chunk:需要多大的chunk?调用一次记录一个指定大小的chunk,对齐到32字节上。
- shm_toc_estimate_keys:需要几个key?这里记录一下
- shm_toc_estimate:把之前记录的结构换算成大小
评估器的原理:记录key的个数和chunk的总大小即可。
shm_toc_initialize_estimator(&e);shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));for (i = 0; i <= nworkers; ++i)shm_toc_estimate_chunk(&e, (Size) queue_size);shm_toc_estimate_keys(&e, 2 + nworkers);segsize = shm_toc_estimate(&e);
评估完了开始用dsm_create申请新的共享内存了!
dsm_impl_op→dsm_impl_op→dsm_impl_mmap来具体从os申请共享内存,不深入了。
现在拿到seg就是dsm_segment动态共享内存段,代表一段共享内存(段内部有dsm_control一套管理机制,需要深入的话可以看看dsm_create代码),这里只需要指导这是一段按需要大小mmap出来的共享内存就好了。
/* Create the shared memory segment and establish a table of contents. */seg = dsm_create(shm_toc_estimate(&e), 0);
在共享内存其实的位置,放一个shm_toc结构,作为后面数据的表头和索引
---------------- 共享内存起始地址
struct shm_toc
{uint64 toc_magic; /* Magic number identifying this TOC */slock_t toc_mutex; /* Spinlock for mutual exclusion */Size toc_total_bytes; /* Bytes managed by this TOC */Size toc_allocated_bytes; /* Bytes allocated of those managed */uint32 toc_nentry; /* Number of entries in TOC */shm_toc_entry toc_entry[0];
};
---------------- 申请空间是从最高地址开始的用的,索引数组toc_entry从低到高,数据从高到低,和page一样
toc_entry[0] -------------\
---------------- |
toc_entry[1] |
---------------- |
toc_entry[2] ---------\ |
---------------- | |
data <-----------------/ |
... |
data |
... |
data <----------------------/
----------------
toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),segsize);
申请一段用户自定义数据空间,放一个test_shm_mq_header结构。
hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));SpinLockInit(&hdr->mutex);hdr->workers_total = nworkers;hdr->workers_attached = 0;hdr->workers_ready = 0;
初始化好数据结构后,把数据插进去,位置前面shm_toc_allocate已经申请好了,这一步shm_toc_insert主要是配置toc_entry那个索引数组!具体就是toc_entry[0]的key和offset,下次用的时候用0就能找到offset了。
shm_toc_insert(toc, 0, hdr);/* Set up one message queue per worker, plus one. */for (i = 0; i <= nworkers; ++i){shm_mq *mq;
每个worker申请一个queue_size,按上面方法allocate+insert。
mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),(Size) queue_size);shm_toc_insert(toc, i + 1, mq);if (i == 0){
把mq用户需要的数据结构传进去进行需要的配置即可,shm_toc_insert完了也可以用,alloc出来的地址直接指向共享内存,可以在当前进程一直使用。
/* We send messages to the first queue. */shm_mq_set_sender(mq, MyProc);*outp = mq;}if (i == nworkers){/* We receive messages from the last queue. */shm_mq_set_receiver(mq, MyProc);*inp = mq;}}/* Return results to caller. */*segp = seg;*hdrp = hdr;
}
第二步:配置bgworker:setup_background_workers
static worker_state *
setup_background_workers(int nworkers, dsm_segment *seg)
{MemoryContext oldcontext;BackgroundWorker worker;worker_state *wstate;int i;oldcontext = MemoryContextSwitchTo(CurTransactionContext);
setup_background_workers函数目标是构造worker_state,下面是worker_state的内存结构:
------------------------------
typedef struct
{int nworkers;BackgroundWorkerHandle *handle[]; // flex数组,每个位置对应一个worker{int slot;uint64 generation;}
} worker_state;
------------------------------
handle[0]
------------------------------
handle[1]
------------------------------
handle[2]
------------------------------
wstate = MemoryContextAlloc(TopTransactionContext,offsetof(worker_state, handle) +sizeof(BackgroundWorkerHandle *) * nworkers);wstate->nworkers = 0;on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));/* Configure a worker. */
配通用的worker通用部分:
memset(&worker, 0, sizeof(worker));worker.bgw_flags = BGWORKER_SHMEM_ACCESS;worker.bgw_start_time = BgWorkerStart_ConsistentState;worker.bgw_restart_time = BGW_NEVER_RESTART;sprintf(worker.bgw_library_name, "test_shm_mq");sprintf(worker.bgw_function_name, "test_shm_mq_main");snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));/* set bgw_notify_pid, so we can detect if the worker stops */worker.bgw_notify_pid = MyProcPid;
把worker通用的部分传进去,拼每个worker特有的部分
/* Register the workers. */for (i = 0; i < nworkers; ++i){
从BackgroundWorkerData->slot数组(bgworker管理的全局变量记录使用所有使用中未使用的bgworker,每个worker一个slot)里面拿到一个没使用的slot,记录slot的id到handle[i]中并返回即可。
if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))ereport(ERROR,(errcode(ERRCODE_INSUFFICIENT_RESOURCES),errmsg("could not register background process"),errhint("You may need to increase max_worker_processes.")));++wstate->nworkers;}/* All done. */MemoryContextSwitchTo(oldcontext);return wstate;
}
第三步:剩余流程
void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq *outq = NULL; /* placate compiler */shm_mq *inq = NULL; /* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);
第三步到这里了,shm_mq_handle的作用是构建一个shm_mq_handle,记录dsm seg、mq、bgworker的handle
/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
用取一个PID的方式检查worker是不是ready了。
wait_for_workers_to_become_ready(wstate, hdr);cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}
到这里内存初始化的工作做完了。后面可以直接调用mq的api发送信息,本篇不在关注。
相关文章:
Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例
相关 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》 《Linux内存映射函数mmap与匿名内存块》 《Linux共享内存与子进程继承》 0 概念 数据结构含义: dsm_segment(动态共享内存段):…...
51单片机学习--蜂鸣器播放音乐
由原理图可知,蜂鸣器BEEP与P1_5 相关,但其实这个原理图有错,实测接的是P2_5 下面这个代码就是以500HZ的频率响500ms的例子 sbit Buzzer P2^5;unsigned char KeyNum; unsigned int i;void main() {while(1){KeyNum Key();if(KeyNum){for(i …...
【Vue组件eval方法的使用】
Vue页面中条件可以放在当前vue页面中而无需影响到组件 如 这是我的表格操作列按钮,需求是第四个按钮如果表格当前数据的is_execl字段为0则显示否则隐藏 这种条件判断很频繁 如果像之前一样给一个标识,页面多了就难以维护,而且判断条件如果不…...
C++ 多文件结构和编译预处理命令
1. C程序的一般组织结构 C源程序的结构基本上都是由3个部分构成:类的定义、类的成员的实现和主函数。因为所编写的程序比较小,所以这三个部分都写在了同一个文件当中。在规模比较大的项目中,往往需要多个源程序文件,每个源程序文…...
QT实现中英文键盘
使用Qt中实现中英文键盘,支持各种linux嵌入式设备。 实现思路:需要一个中文字体库,将字体库加载到一个Hash容器,字母和拼音作为key值,对应的中文作为value值。 核心代码: #include "UKeyBoard.h"…...
java中并发编程CompletableFuture和supplyAsync的用法
在Java中,并发编程可以使用CompletableFuture类来实现异步操作和并行任务。其中,supplyAsync是CompletableFuture类的一个静态方法,用于执行一个有返回值的异步任务。 supplyAsync方法的语法如下: public static <U> Comp…...
chrony服务器
目录 1.chrony简介 2.chrony安装配置 2.1 chrony安装及配置 非常重要:在配置之前,检查防火墙和selinux是否关闭 3.将chrony配置为其他主机的时间服务器 3.1 修改chrony配置文件 4.chronyc命令 1.chrony简介 chrony 是开源的遵循网络时间协议&…...
春秋云镜 CVE-2021-24762
春秋云镜 CVE-2021-24762 WordPress Plugin Perfect Survey 注入 靶标介绍 WordPress Perfect Survey plugin在1.5.2之前版本存在SQL注入漏洞,该漏洞源于基于数据库的应用缺少对外部输入SQL语句的验证。攻击者可利用该漏洞执行非法SQL命令。 启动场景 漏洞利用 …...
K8s中的Service
Service 1.Service存在的意义? (1)pod注册在service里面,做服务发现使用 (2)定义一组Pod访问策略(负载均衡) 2.Pod和Service关系 通过service实现Pod的负载均衡 3.常用Service类…...
[软件工程] 全局分析规格说明书模板
1 价值需求 描述目标系统的价值需求,可以附上商业模式画布。 1.1 利益相关者 描述目标系统的利益相关者,包括终端用户、企业组织、投资人等。 1.2 系统愿景 描述利益相关者共同达成一致的愿景,该愿景的描述需要对准企业的战略目标。 1.3 系统…...
【JAVASE】封装
⭐ 作者:小胡_不糊涂 🌱 作者主页:小胡_不糊涂的个人主页 📀 收录专栏:浅谈Java 💖 持续更文,关注博主少走弯路,谢谢大家支持 💖 封装 1. 什么是封装2. 访问限定符3. 封装…...
Java多线程(四)
目录 一、线程的状态 1.1 观察线程的所有状态 1.2 线程状态和状态转移的意义 1.2.1 NEW、RUNNABLE、TERMINATED状态转换 1.2.2 WAITING、BLOCKED、TIMED_WAITING状态转换 1.2.3 yield()大公无私让出cpu 一、线程的状态 1.1 观察线程的所有状态 public class Demo9 {public st…...
Linux 文件系统预留空间
在一次磁盘打满的场景下,使用了df -h命令来查看磁盘使用率。 [roottest tmp]# df -h 文件系统 容量 已用 可用 已用% 挂载点 devtmpfs 28G 0 28G 0% /dev tmpfs 28G 0 28G 0% /dev/shm tmpfs 28G 2.8G …...
篇一:单例模式:C++中的独一无二
篇一:“单例模式:C中的独一无二” 设计模式在软件开发中起到了至关重要的作用,其中单例模式是最为常用且基础的设计模式之一。单例模式确保一个类只有一个实例,并提供全局访问点,以保证系统中的某个对象只有唯一的存在…...
JVM之内存结构
1.程序计数器 定义:程序计数器(Program Counter Register)是JVM中一块较小的内存空间。解释器在解释JVM指令为机器码以供CPU执行时,会去程序计数器当中找到jvm指令的执行地址。 作用:记住下一条jvm指令的执行地址 特…...
C#实现结构体与字节流的相互转化
unity项目中,涉及到与C的相互通信,而通信接口为C封好的动态库。所以,传输信息时,需要向C端发送字节流信息。 对此,需将结构体数据转为字节流,其代码如下: public static byte[] StructToBytes(…...
用LangChain开源框架实现知识机器人
前言 Large Language Models (LLMs)在2020年OpenAI 的 GPT-3 的发布而进入世界舞台 。从那时起,他们稳步增长进入公众视野。 众所周知 OpenAI 的 API 无法联网,所以大家如果想通过它的API实现联网搜索并给出回答、总结 PDF 文档、基于某个 Youtube 视频…...
HCIP——前期综合实验
前期综合实验 一、实验拓扑二、实验要求三、实验思路四、实验步骤1、配置接口IP地址2、交换机配置划分vlan10以及vlan203、总部分部,骨干网配置OSPF分部总部骨干网 4、配置BGP建立邻居关系总部骨干网分部 5、发布用户网段6、将下一跳改为本地7、允许AS重复8、重发布…...
【2023年电赛】运动目标控制与自动追踪系统(E 题)最简单实现
本方案的思路是最简单的不涉及复杂算法:识别矩形框,标记矩形框,输出坐标和中心点,计算长度,控制舵机移动固定长度!仅供完成基础功能参考,不喜勿喷! # 实现运动目标控制与自动追踪系…...
【IMX6ULL驱动开发学习】22.IMX6ULL开发板读取ADC(以MQ-135为例)
IMX6ULL一共有两个ADC,每个ADC都有八个通道,但他们共用一个ADC控制器 1.设备树 在imx6ull.dtsi文件中已经帮我们定义好了adc1的节点部分信息 adc1: adc02198000 {compatible "fsl,imx6ul-adc", "fsl,vf610-adc";reg <0x0219…...
【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet,点击确认后如下提示 最终上报fail 解决方法 内核升级导致,需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...
今日科技热点速览
🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
Java线上CPU飙高问题排查全指南
一、引言 在Java应用的线上运行环境中,CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时,通常会导致应用响应缓慢,甚至服务不可用,严重影响用户体验和业务运行。因此,掌握一套科学有效的CPU飙高问题排查方法&…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合
在汽车智能化的汹涌浪潮中,车辆不再仅仅是传统的交通工具,而是逐步演变为高度智能的移动终端。这一转变的核心支撑,来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒(T-Box)方案:NXP S32K146 与…...
