Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例
相关
《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》
《Linux内存映射函数mmap与匿名内存块》
《Linux共享内存与子进程继承》
0 概念
数据结构含义:
- dsm_segment(动态共享内存段):
- 每个后端进程可以通过使用dsm_segment来访问共享内存。dsm_segment包含了内存分配和释放等操作所需的信息,并提供了一组函数来管理和操作共享内存。每个会话(session)都可以有一个或多个dsm_segment。
- 提供共享内存,dsm_segment表示申请的一个共享内存段,对于dsm api来说这是代表共享内存的最小单位。
- shm_mq_handle(共享内存消息队列句柄)
- shm_mq_handle则是用于在共享内存中进行消息传递的句柄。它提供了一组函数来发送和接收消息,并提供了同步和互斥机制,确保多个进程之间的顺序和一致性。
- 使用dsm_segment提供共享内存段做进程通信。
在实际使用中,通常会将它们组合在一起,以实现共享内存中的消息传递机制。
1 shm_toc初始化一段共享内存,共享内存是从哪来的?
在PG代码中可以看到shm_toc初始化一段内存,在头部放置shm_toc,这块内存叫做一个内存段,shm_toc_create函数接受已经申请好的内存地址,在头部初始化shm_toc(表示内存段头),后面可以存放用户自定义数据,比如message queue所需的结构体、数据等。
初始化一段共享内存,在头上放shm_toc结构,明显是用来记录内存段管理的一些数据。
struct shm_toc
{uint64 toc_magic; /* Magic number identifying this TOC */slock_t toc_mutex; /* Spinlock for mutual exclusion */Size toc_total_bytes; /* Bytes managed by this TOC */Size toc_allocated_bytes; /* Bytes allocated of those managed */uint32 toc_nentry; /* Number of entries in TOC */shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
};
初始化
shm_toc *
shm_toc_create(uint64 magic, void *address, Size nbytes)
{shm_toc *toc = (shm_toc *) address;Assert(nbytes > offsetof(shm_toc, toc_entry));toc->toc_magic = magic;SpinLockInit(&toc->toc_mutex);/** The alignment code in shm_toc_allocate() assumes that the starting* value is buffer-aligned.*/toc->toc_total_bytes = BUFFERALIGN_DOWN(nbytes);toc->toc_allocated_bytes = 0;toc->toc_nentry = 0;return toc;
}
那么shm_toc_create用的内存是从哪来的?
2 动态mmap一段新的共享内存(dsm机制)
- Postgresql能看到很多dsm开头的函数,这类函数属于运行时动态申请共享内存模块( dynamic shared memory)。
- 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》介绍过,PG的共享内存是在启动时,直接用
mmap一次性申请大匿名块,然后自己切分使用的。但如果运行时需要申请新的、不定大小的共享内存块,肯定无法再启动时预先申请,这就引入了dsm模块。
3 dsm申请共享内存应用实例:mq
在源码中有一个非常好的例子,可以用来分析dsm申请内存用作mq的方法:
src/test/modules/test_shm_mq
下面对代码流程做一些分析,主要分析动态申请共享内存的过程,不涉及初始化后的使用流程。
3.1 内存初始化:test_shm_mq_setup
void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq *outq = NULL; /* placate compiler */shm_mq *inq = NULL; /* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);/* Wait for workers to become ready. */wait_for_workers_to_become_ready(wstate, hdr);/** Once we reach this point, all workers are ready. We no longer need to* kill them if we die; they'll die on their own as the message queues* shut down.*/cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}
第一步:申请共享内存setup_dynamic_shared_memory
static void
setup_dynamic_shared_memory(int64 queue_size, int nworkers,dsm_segment **segp, test_shm_mq_header **hdrp,shm_mq **outp, shm_mq **inp)
{shm_toc_estimator e;int i;Size segsize;dsm_segment *seg;shm_toc *toc;test_shm_mq_header *hdr;
用estimator来评估需要多少共享内存(可以不用自己算了:)
评估器的接口:
- shm_toc_initialize_estimator:初始化记录(key的个数和chunk的总大小)。
- shm_toc_estimate_chunk:需要多大的chunk?调用一次记录一个指定大小的chunk,对齐到32字节上。
- shm_toc_estimate_keys:需要几个key?这里记录一下
- shm_toc_estimate:把之前记录的结构换算成大小
评估器的原理:记录key的个数和chunk的总大小即可。
shm_toc_initialize_estimator(&e);shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));for (i = 0; i <= nworkers; ++i)shm_toc_estimate_chunk(&e, (Size) queue_size);shm_toc_estimate_keys(&e, 2 + nworkers);segsize = shm_toc_estimate(&e);
评估完了开始用dsm_create申请新的共享内存了!
dsm_impl_op→dsm_impl_op→dsm_impl_mmap来具体从os申请共享内存,不深入了。
现在拿到seg就是dsm_segment动态共享内存段,代表一段共享内存(段内部有dsm_control一套管理机制,需要深入的话可以看看dsm_create代码),这里只需要指导这是一段按需要大小mmap出来的共享内存就好了。
/* Create the shared memory segment and establish a table of contents. */seg = dsm_create(shm_toc_estimate(&e), 0);
在共享内存其实的位置,放一个shm_toc结构,作为后面数据的表头和索引
---------------- 共享内存起始地址
struct shm_toc
{uint64 toc_magic; /* Magic number identifying this TOC */slock_t toc_mutex; /* Spinlock for mutual exclusion */Size toc_total_bytes; /* Bytes managed by this TOC */Size toc_allocated_bytes; /* Bytes allocated of those managed */uint32 toc_nentry; /* Number of entries in TOC */shm_toc_entry toc_entry[0];
};
---------------- 申请空间是从最高地址开始的用的,索引数组toc_entry从低到高,数据从高到低,和page一样
toc_entry[0] -------------\
---------------- |
toc_entry[1] |
---------------- |
toc_entry[2] ---------\ |
---------------- | |
data <-----------------/ |
... |
data |
... |
data <----------------------/
----------------
toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),segsize);
申请一段用户自定义数据空间,放一个test_shm_mq_header结构。
hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));SpinLockInit(&hdr->mutex);hdr->workers_total = nworkers;hdr->workers_attached = 0;hdr->workers_ready = 0;
初始化好数据结构后,把数据插进去,位置前面shm_toc_allocate已经申请好了,这一步shm_toc_insert主要是配置toc_entry那个索引数组!具体就是toc_entry[0]的key和offset,下次用的时候用0就能找到offset了。
shm_toc_insert(toc, 0, hdr);/* Set up one message queue per worker, plus one. */for (i = 0; i <= nworkers; ++i){shm_mq *mq;
每个worker申请一个queue_size,按上面方法allocate+insert。
mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),(Size) queue_size);shm_toc_insert(toc, i + 1, mq);if (i == 0){
把mq用户需要的数据结构传进去进行需要的配置即可,shm_toc_insert完了也可以用,alloc出来的地址直接指向共享内存,可以在当前进程一直使用。
/* We send messages to the first queue. */shm_mq_set_sender(mq, MyProc);*outp = mq;}if (i == nworkers){/* We receive messages from the last queue. */shm_mq_set_receiver(mq, MyProc);*inp = mq;}}/* Return results to caller. */*segp = seg;*hdrp = hdr;
}
第二步:配置bgworker:setup_background_workers
static worker_state *
setup_background_workers(int nworkers, dsm_segment *seg)
{MemoryContext oldcontext;BackgroundWorker worker;worker_state *wstate;int i;oldcontext = MemoryContextSwitchTo(CurTransactionContext);
setup_background_workers函数目标是构造worker_state,下面是worker_state的内存结构:
------------------------------
typedef struct
{int nworkers;BackgroundWorkerHandle *handle[]; // flex数组,每个位置对应一个worker{int slot;uint64 generation;}
} worker_state;
------------------------------
handle[0]
------------------------------
handle[1]
------------------------------
handle[2]
------------------------------
wstate = MemoryContextAlloc(TopTransactionContext,offsetof(worker_state, handle) +sizeof(BackgroundWorkerHandle *) * nworkers);wstate->nworkers = 0;on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));/* Configure a worker. */
配通用的worker通用部分:
memset(&worker, 0, sizeof(worker));worker.bgw_flags = BGWORKER_SHMEM_ACCESS;worker.bgw_start_time = BgWorkerStart_ConsistentState;worker.bgw_restart_time = BGW_NEVER_RESTART;sprintf(worker.bgw_library_name, "test_shm_mq");sprintf(worker.bgw_function_name, "test_shm_mq_main");snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));/* set bgw_notify_pid, so we can detect if the worker stops */worker.bgw_notify_pid = MyProcPid;
把worker通用的部分传进去,拼每个worker特有的部分
/* Register the workers. */for (i = 0; i < nworkers; ++i){
从BackgroundWorkerData->slot数组(bgworker管理的全局变量记录使用所有使用中未使用的bgworker,每个worker一个slot)里面拿到一个没使用的slot,记录slot的id到handle[i]中并返回即可。
if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))ereport(ERROR,(errcode(ERRCODE_INSUFFICIENT_RESOURCES),errmsg("could not register background process"),errhint("You may need to increase max_worker_processes.")));++wstate->nworkers;}/* All done. */MemoryContextSwitchTo(oldcontext);return wstate;
}
第三步:剩余流程
void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq *outq = NULL; /* placate compiler */shm_mq *inq = NULL; /* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);
第三步到这里了,shm_mq_handle的作用是构建一个shm_mq_handle,记录dsm seg、mq、bgworker的handle
/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
用取一个PID的方式检查worker是不是ready了。
wait_for_workers_to_become_ready(wstate, hdr);cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}
到这里内存初始化的工作做完了。后面可以直接调用mq的api发送信息,本篇不在关注。
相关文章:
Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例
相关 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》 《Linux内存映射函数mmap与匿名内存块》 《Linux共享内存与子进程继承》 0 概念 数据结构含义: dsm_segment(动态共享内存段):…...
51单片机学习--蜂鸣器播放音乐
由原理图可知,蜂鸣器BEEP与P1_5 相关,但其实这个原理图有错,实测接的是P2_5 下面这个代码就是以500HZ的频率响500ms的例子 sbit Buzzer P2^5;unsigned char KeyNum; unsigned int i;void main() {while(1){KeyNum Key();if(KeyNum){for(i …...
【Vue组件eval方法的使用】
Vue页面中条件可以放在当前vue页面中而无需影响到组件 如 这是我的表格操作列按钮,需求是第四个按钮如果表格当前数据的is_execl字段为0则显示否则隐藏 这种条件判断很频繁 如果像之前一样给一个标识,页面多了就难以维护,而且判断条件如果不…...
C++ 多文件结构和编译预处理命令
1. C程序的一般组织结构 C源程序的结构基本上都是由3个部分构成:类的定义、类的成员的实现和主函数。因为所编写的程序比较小,所以这三个部分都写在了同一个文件当中。在规模比较大的项目中,往往需要多个源程序文件,每个源程序文…...
QT实现中英文键盘
使用Qt中实现中英文键盘,支持各种linux嵌入式设备。 实现思路:需要一个中文字体库,将字体库加载到一个Hash容器,字母和拼音作为key值,对应的中文作为value值。 核心代码: #include "UKeyBoard.h"…...
java中并发编程CompletableFuture和supplyAsync的用法
在Java中,并发编程可以使用CompletableFuture类来实现异步操作和并行任务。其中,supplyAsync是CompletableFuture类的一个静态方法,用于执行一个有返回值的异步任务。 supplyAsync方法的语法如下: public static <U> Comp…...
chrony服务器
目录 1.chrony简介 2.chrony安装配置 2.1 chrony安装及配置 非常重要:在配置之前,检查防火墙和selinux是否关闭 3.将chrony配置为其他主机的时间服务器 3.1 修改chrony配置文件 4.chronyc命令 1.chrony简介 chrony 是开源的遵循网络时间协议&…...
春秋云镜 CVE-2021-24762
春秋云镜 CVE-2021-24762 WordPress Plugin Perfect Survey 注入 靶标介绍 WordPress Perfect Survey plugin在1.5.2之前版本存在SQL注入漏洞,该漏洞源于基于数据库的应用缺少对外部输入SQL语句的验证。攻击者可利用该漏洞执行非法SQL命令。 启动场景 漏洞利用 …...
K8s中的Service
Service 1.Service存在的意义? (1)pod注册在service里面,做服务发现使用 (2)定义一组Pod访问策略(负载均衡) 2.Pod和Service关系 通过service实现Pod的负载均衡 3.常用Service类…...
[软件工程] 全局分析规格说明书模板
1 价值需求 描述目标系统的价值需求,可以附上商业模式画布。 1.1 利益相关者 描述目标系统的利益相关者,包括终端用户、企业组织、投资人等。 1.2 系统愿景 描述利益相关者共同达成一致的愿景,该愿景的描述需要对准企业的战略目标。 1.3 系统…...
【JAVASE】封装
⭐ 作者:小胡_不糊涂 🌱 作者主页:小胡_不糊涂的个人主页 📀 收录专栏:浅谈Java 💖 持续更文,关注博主少走弯路,谢谢大家支持 💖 封装 1. 什么是封装2. 访问限定符3. 封装…...
Java多线程(四)
目录 一、线程的状态 1.1 观察线程的所有状态 1.2 线程状态和状态转移的意义 1.2.1 NEW、RUNNABLE、TERMINATED状态转换 1.2.2 WAITING、BLOCKED、TIMED_WAITING状态转换 1.2.3 yield()大公无私让出cpu 一、线程的状态 1.1 观察线程的所有状态 public class Demo9 {public st…...
Linux 文件系统预留空间
在一次磁盘打满的场景下,使用了df -h命令来查看磁盘使用率。 [roottest tmp]# df -h 文件系统 容量 已用 可用 已用% 挂载点 devtmpfs 28G 0 28G 0% /dev tmpfs 28G 0 28G 0% /dev/shm tmpfs 28G 2.8G …...
篇一:单例模式:C++中的独一无二
篇一:“单例模式:C中的独一无二” 设计模式在软件开发中起到了至关重要的作用,其中单例模式是最为常用且基础的设计模式之一。单例模式确保一个类只有一个实例,并提供全局访问点,以保证系统中的某个对象只有唯一的存在…...
JVM之内存结构
1.程序计数器 定义:程序计数器(Program Counter Register)是JVM中一块较小的内存空间。解释器在解释JVM指令为机器码以供CPU执行时,会去程序计数器当中找到jvm指令的执行地址。 作用:记住下一条jvm指令的执行地址 特…...
C#实现结构体与字节流的相互转化
unity项目中,涉及到与C的相互通信,而通信接口为C封好的动态库。所以,传输信息时,需要向C端发送字节流信息。 对此,需将结构体数据转为字节流,其代码如下: public static byte[] StructToBytes(…...
用LangChain开源框架实现知识机器人
前言 Large Language Models (LLMs)在2020年OpenAI 的 GPT-3 的发布而进入世界舞台 。从那时起,他们稳步增长进入公众视野。 众所周知 OpenAI 的 API 无法联网,所以大家如果想通过它的API实现联网搜索并给出回答、总结 PDF 文档、基于某个 Youtube 视频…...
HCIP——前期综合实验
前期综合实验 一、实验拓扑二、实验要求三、实验思路四、实验步骤1、配置接口IP地址2、交换机配置划分vlan10以及vlan203、总部分部,骨干网配置OSPF分部总部骨干网 4、配置BGP建立邻居关系总部骨干网分部 5、发布用户网段6、将下一跳改为本地7、允许AS重复8、重发布…...
【2023年电赛】运动目标控制与自动追踪系统(E 题)最简单实现
本方案的思路是最简单的不涉及复杂算法:识别矩形框,标记矩形框,输出坐标和中心点,计算长度,控制舵机移动固定长度!仅供完成基础功能参考,不喜勿喷! # 实现运动目标控制与自动追踪系…...
【IMX6ULL驱动开发学习】22.IMX6ULL开发板读取ADC(以MQ-135为例)
IMX6ULL一共有两个ADC,每个ADC都有八个通道,但他们共用一个ADC控制器 1.设备树 在imx6ull.dtsi文件中已经帮我们定义好了adc1的节点部分信息 adc1: adc02198000 {compatible "fsl,imx6ul-adc", "fsl,vf610-adc";reg <0x0219…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
C/C++ 中附加包含目录、附加库目录与附加依赖项详解
在 C/C 编程的编译和链接过程中,附加包含目录、附加库目录和附加依赖项是三个至关重要的设置,它们相互配合,确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中,这些概念容易让人混淆,但深入理解它们的作用和联…...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...
【Linux】自动化构建-Make/Makefile
前言 上文我们讲到了Linux中的编译器gcc/g 【Linux】编译器gcc/g及其库的详细介绍-CSDN博客 本来我们将一个对于编译来说很重要的工具:make/makfile 1.背景 在一个工程中源文件不计其数,其按类型、功能、模块分别放在若干个目录中,mak…...
在 Spring Boot 项目里,MYSQL中json类型字段使用
前言: 因为程序特殊需求导致,需要mysql数据库存储json类型数据,因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...
MFE(微前端) Module Federation:Webpack.config.js文件中每个属性的含义解释
以Module Federation 插件详为例,Webpack.config.js它可能的配置和含义如下: 前言 Module Federation 的Webpack.config.js核心配置包括: name filename(定义应用标识) remotes(引用远程模块࿰…...
