当前位置: 首页 > news >正文

Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例

相关
《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》
《Linux内存映射函数mmap与匿名内存块》
《Linux共享内存与子进程继承》

0 概念

数据结构含义:

  1. dsm_segment(动态共享内存段):
    • 每个后端进程可以通过使用dsm_segment来访问共享内存。dsm_segment包含了内存分配和释放等操作所需的信息,并提供了一组函数来管理和操作共享内存。每个会话(session)都可以有一个或多个dsm_segment。
    • 提供共享内存,dsm_segment表示申请的一个共享内存段,对于dsm api来说这是代表共享内存的最小单位。
  2. shm_mq_handle(共享内存消息队列句柄)
    • shm_mq_handle则是用于在共享内存中进行消息传递的句柄。它提供了一组函数来发送和接收消息,并提供了同步和互斥机制,确保多个进程之间的顺序和一致性。
    • 使用dsm_segment提供共享内存段做进程通信。

在实际使用中,通常会将它们组合在一起,以实现共享内存中的消息传递机制。

1 shm_toc初始化一段共享内存,共享内存是从哪来的?

在PG代码中可以看到shm_toc初始化一段内存,在头部放置shm_toc,这块内存叫做一个内存段,shm_toc_create函数接受已经申请好的内存地址,在头部初始化shm_toc(表示内存段头),后面可以存放用户自定义数据,比如message queue所需的结构体、数据等。

初始化一段共享内存,在头上放shm_toc结构,明显是用来记录内存段管理的一些数据。

struct shm_toc
{uint64		toc_magic;		/* Magic number identifying this TOC */slock_t		toc_mutex;		/* Spinlock for mutual exclusion */Size		toc_total_bytes;	/* Bytes managed by this TOC */Size		toc_allocated_bytes;	/* Bytes allocated of those managed */uint32		toc_nentry;		/* Number of entries in TOC */shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
};

初始化

shm_toc *
shm_toc_create(uint64 magic, void *address, Size nbytes)
{shm_toc    *toc = (shm_toc *) address;Assert(nbytes > offsetof(shm_toc, toc_entry));toc->toc_magic = magic;SpinLockInit(&toc->toc_mutex);/** The alignment code in shm_toc_allocate() assumes that the starting* value is buffer-aligned.*/toc->toc_total_bytes = BUFFERALIGN_DOWN(nbytes);toc->toc_allocated_bytes = 0;toc->toc_nentry = 0;return toc;
}

那么shm_toc_create用的内存是从哪来的?

2 动态mmap一段新的共享内存(dsm机制)

  1. Postgresql能看到很多dsm开头的函数,这类函数属于运行时动态申请共享内存模块( dynamic shared memory)。
  2. 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》介绍过,PG的共享内存是在启动时,直接用mmap 一次性申请大匿名块,然后自己切分使用的。但如果运行时需要申请新的、不定大小的共享内存块,肯定无法再启动时预先申请,这就引入了dsm模块。

3 dsm申请共享内存应用实例:mq

在源码中有一个非常好的例子,可以用来分析dsm申请内存用作mq的方法:

src/test/modules/test_shm_mq

下面对代码流程做一些分析,主要分析动态申请共享内存的过程,不涉及初始化后的使用流程。

3.1 内存初始化:test_shm_mq_setup

void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq	   *outq = NULL;	/* placate compiler */shm_mq	   *inq = NULL;		/* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);/* Wait for workers to become ready. */wait_for_workers_to_become_ready(wstate, hdr);/** Once we reach this point, all workers are ready.  We no longer need to* kill them if we die; they'll die on their own as the message queues* shut down.*/cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}

第一步:申请共享内存setup_dynamic_shared_memory

static void
setup_dynamic_shared_memory(int64 queue_size, int nworkers,dsm_segment **segp, test_shm_mq_header **hdrp,shm_mq **outp, shm_mq **inp)
{shm_toc_estimator e;int			i;Size		segsize;dsm_segment *seg;shm_toc    *toc;test_shm_mq_header *hdr;

用estimator来评估需要多少共享内存(可以不用自己算了:)
评估器的接口:

  • shm_toc_initialize_estimator:初始化记录(key的个数和chunk的总大小)。
  • shm_toc_estimate_chunk:需要多大的chunk?调用一次记录一个指定大小的chunk,对齐到32字节上。
  • shm_toc_estimate_keys:需要几个key?这里记录一下
  • shm_toc_estimate:把之前记录的结构换算成大小

评估器的原理:记录key的个数和chunk的总大小即可。

	shm_toc_initialize_estimator(&e);shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));for (i = 0; i <= nworkers; ++i)shm_toc_estimate_chunk(&e, (Size) queue_size);shm_toc_estimate_keys(&e, 2 + nworkers);segsize = shm_toc_estimate(&e);

评估完了开始用dsm_create申请新的共享内存了!

dsm_impl_op→dsm_impl_op→dsm_impl_mmap来具体从os申请共享内存,不深入了。

现在拿到seg就是dsm_segment动态共享内存段,代表一段共享内存(段内部有dsm_control一套管理机制,需要深入的话可以看看dsm_create代码),这里只需要指导这是一段按需要大小mmap出来的共享内存就好了。

	/* Create the shared memory segment and establish a table of contents. */seg = dsm_create(shm_toc_estimate(&e), 0);

在共享内存其实的位置,放一个shm_toc结构,作为后面数据的表头和索引

---------------- 共享内存起始地址
struct shm_toc
{uint64		toc_magic;				/* Magic number identifying this TOC */slock_t		toc_mutex;				/* Spinlock for mutual exclusion */Size		toc_total_bytes;		/* Bytes managed by this TOC */Size		toc_allocated_bytes;	/* Bytes allocated of those managed */uint32		toc_nentry;				/* Number of entries in TOC */shm_toc_entry toc_entry[0];
};
---------------- 申请空间是从最高地址开始的用的,索引数组toc_entry从低到高,数据从高到低,和page一样
toc_entry[0]   -------------\
----------------             |
toc_entry[1]                 |
----------------             |
toc_entry[2]  ---------\     |
----------------       |     |
data <-----------------/     |
...                          |
data                         |
...                          |
data <----------------------/
----------------
	toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),segsize);

申请一段用户自定义数据空间,放一个test_shm_mq_header结构。

	hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));SpinLockInit(&hdr->mutex);hdr->workers_total = nworkers;hdr->workers_attached = 0;hdr->workers_ready = 0;

初始化好数据结构后,把数据插进去,位置前面shm_toc_allocate已经申请好了,这一步shm_toc_insert主要是配置toc_entry那个索引数组!具体就是toc_entry[0]的key和offset,下次用的时候用0就能找到offset了。

	shm_toc_insert(toc, 0, hdr);/* Set up one message queue per worker, plus one. */for (i = 0; i <= nworkers; ++i){shm_mq	   *mq;

每个worker申请一个queue_size,按上面方法allocate+insert。

		mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),(Size) queue_size);shm_toc_insert(toc, i + 1, mq);if (i == 0){

把mq用户需要的数据结构传进去进行需要的配置即可,shm_toc_insert完了也可以用,alloc出来的地址直接指向共享内存,可以在当前进程一直使用。

			/* We send messages to the first queue. */shm_mq_set_sender(mq, MyProc);*outp = mq;}if (i == nworkers){/* We receive messages from the last queue. */shm_mq_set_receiver(mq, MyProc);*inp = mq;}}/* Return results to caller. */*segp = seg;*hdrp = hdr;
}

第二步:配置bgworker:setup_background_workers

static worker_state *
setup_background_workers(int nworkers, dsm_segment *seg)
{MemoryContext oldcontext;BackgroundWorker worker;worker_state *wstate;int			i;oldcontext = MemoryContextSwitchTo(CurTransactionContext);

setup_background_workers函数目标是构造worker_state,下面是worker_state的内存结构:

------------------------------
typedef struct
{int			nworkers;BackgroundWorkerHandle *handle[];  //  flex数组,每个位置对应一个worker{int			slot;uint64		generation;}
} worker_state;
------------------------------
handle[0]    
------------------------------
handle[1]
------------------------------
handle[2]
------------------------------
	wstate = MemoryContextAlloc(TopTransactionContext,offsetof(worker_state, handle) +sizeof(BackgroundWorkerHandle *) * nworkers);wstate->nworkers = 0;on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));/* Configure a worker. */

配通用的worker通用部分:

	memset(&worker, 0, sizeof(worker));worker.bgw_flags = BGWORKER_SHMEM_ACCESS;worker.bgw_start_time = BgWorkerStart_ConsistentState;worker.bgw_restart_time = BGW_NEVER_RESTART;sprintf(worker.bgw_library_name, "test_shm_mq");sprintf(worker.bgw_function_name, "test_shm_mq_main");snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));/* set bgw_notify_pid, so we can detect if the worker stops */worker.bgw_notify_pid = MyProcPid;

把worker通用的部分传进去,拼每个worker特有的部分

	/* Register the workers. */for (i = 0; i < nworkers; ++i){

BackgroundWorkerData->slot数组(bgworker管理的全局变量记录使用所有使用中未使用的bgworker,每个worker一个slot)里面拿到一个没使用的slot,记录slot的id到handle[i]中并返回即可。

		if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))ereport(ERROR,(errcode(ERRCODE_INSUFFICIENT_RESOURCES),errmsg("could not register background process"),errhint("You may need to increase max_worker_processes.")));++wstate->nworkers;}/* All done. */MemoryContextSwitchTo(oldcontext);return wstate;
}

第三步:剩余流程

void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq	   *outq = NULL;	/* placate compiler */shm_mq	   *inq = NULL;		/* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);

第三步到这里了,shm_mq_handle的作用是构建一个shm_mq_handle,记录dsm seg、mq、bgworker的handle

	/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);

用取一个PID的方式检查worker是不是ready了。

	wait_for_workers_to_become_ready(wstate, hdr);cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}

到这里内存初始化的工作做完了。后面可以直接调用mq的api发送信息,本篇不在关注。

相关文章:

Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例

相关 《Postgresql源码&#xff08;90&#xff09;共享内存申请CreateSharedMemoryAndSemaphores》 《Linux内存映射函数mmap与匿名内存块》 《Linux共享内存与子进程继承》 0 概念 数据结构含义&#xff1a; dsm_segment&#xff08;动态共享内存段&#xff09;&#xff1a;…...

51单片机学习--蜂鸣器播放音乐

由原理图可知&#xff0c;蜂鸣器BEEP与P1_5 相关&#xff0c;但其实这个原理图有错&#xff0c;实测接的是P2_5 下面这个代码就是以500HZ的频率响500ms的例子 sbit Buzzer P2^5;unsigned char KeyNum; unsigned int i;void main() {while(1){KeyNum Key();if(KeyNum){for(i …...

【Vue组件eval方法的使用】

Vue页面中条件可以放在当前vue页面中而无需影响到组件 如 这是我的表格操作列按钮&#xff0c;需求是第四个按钮如果表格当前数据的is_execl字段为0则显示否则隐藏 这种条件判断很频繁 如果像之前一样给一个标识&#xff0c;页面多了就难以维护&#xff0c;而且判断条件如果不…...

C++ 多文件结构和编译预处理命令

1. C程序的一般组织结构 C源程序的结构基本上都是由3个部分构成&#xff1a;类的定义、类的成员的实现和主函数。因为所编写的程序比较小&#xff0c;所以这三个部分都写在了同一个文件当中。在规模比较大的项目中&#xff0c;往往需要多个源程序文件&#xff0c;每个源程序文…...

QT实现中英文键盘

使用Qt中实现中英文键盘&#xff0c;支持各种linux嵌入式设备。 实现思路&#xff1a;需要一个中文字体库&#xff0c;将字体库加载到一个Hash容器&#xff0c;字母和拼音作为key值&#xff0c;对应的中文作为value值。 核心代码&#xff1a; #include "UKeyBoard.h"…...

java中并发编程CompletableFuture和supplyAsync的用法

在Java中&#xff0c;并发编程可以使用CompletableFuture类来实现异步操作和并行任务。其中&#xff0c;supplyAsync是CompletableFuture类的一个静态方法&#xff0c;用于执行一个有返回值的异步任务。 supplyAsync方法的语法如下&#xff1a; public static <U> Comp…...

chrony服务器

目录 1.chrony简介 2.chrony安装配置 2.1 chrony安装及配置 非常重要&#xff1a;在配置之前&#xff0c;检查防火墙和selinux是否关闭 3.将chrony配置为其他主机的时间服务器 3.1 修改chrony配置文件 4.chronyc命令 1.chrony简介 chrony 是开源的遵循网络时间协议&…...

春秋云镜 CVE-2021-24762

春秋云镜 CVE-2021-24762 WordPress Plugin Perfect Survey 注入 靶标介绍 WordPress Perfect Survey plugin在1.5.2之前版本存在SQL注入漏洞&#xff0c;该漏洞源于基于数据库的应用缺少对外部输入SQL语句的验证。攻击者可利用该漏洞执行非法SQL命令。 启动场景 漏洞利用 …...

K8s中的Service

Service 1.Service存在的意义&#xff1f; &#xff08;1&#xff09;pod注册在service里面&#xff0c;做服务发现使用 &#xff08;2&#xff09;定义一组Pod访问策略&#xff08;负载均衡&#xff09; 2.Pod和Service关系 通过service实现Pod的负载均衡 3.常用Service类…...

[软件工程] 全局分析规格说明书模板

1 价值需求 描述目标系统的价值需求&#xff0c;可以附上商业模式画布。 1.1 利益相关者 描述目标系统的利益相关者&#xff0c;包括终端用户、企业组织、投资人等。 1.2 系统愿景 描述利益相关者共同达成一致的愿景&#xff0c;该愿景的描述需要对准企业的战略目标。 1.3 系统…...

【JAVASE】封装

⭐ 作者&#xff1a;小胡_不糊涂 &#x1f331; 作者主页&#xff1a;小胡_不糊涂的个人主页 &#x1f4c0; 收录专栏&#xff1a;浅谈Java &#x1f496; 持续更文&#xff0c;关注博主少走弯路&#xff0c;谢谢大家支持 &#x1f496; 封装 1. 什么是封装2. 访问限定符3. 封装…...

Java多线程(四)

目录 一、线程的状态 1.1 观察线程的所有状态 1.2 线程状态和状态转移的意义 1.2.1 NEW、RUNNABLE、TERMINATED状态转换 1.2.2 WAITING、BLOCKED、TIMED_WAITING状态转换 1.2.3 yield()大公无私让出cpu 一、线程的状态 1.1 观察线程的所有状态 public class Demo9 {public st…...

Linux 文件系统预留空间

在一次磁盘打满的场景下&#xff0c;使用了df -h命令来查看磁盘使用率。 [roottest tmp]# df -h 文件系统 容量 已用 可用 已用% 挂载点 devtmpfs 28G 0 28G 0% /dev tmpfs 28G 0 28G 0% /dev/shm tmpfs 28G 2.8G …...

篇一:单例模式:C++中的独一无二

篇一&#xff1a;“单例模式&#xff1a;C中的独一无二” 设计模式在软件开发中起到了至关重要的作用&#xff0c;其中单例模式是最为常用且基础的设计模式之一。单例模式确保一个类只有一个实例&#xff0c;并提供全局访问点&#xff0c;以保证系统中的某个对象只有唯一的存在…...

JVM之内存结构

1.程序计数器 定义&#xff1a;程序计数器&#xff08;Program Counter Register&#xff09;是JVM中一块较小的内存空间。解释器在解释JVM指令为机器码以供CPU执行时&#xff0c;会去程序计数器当中找到jvm指令的执行地址。 作用&#xff1a;记住下一条jvm指令的执行地址 特…...

C#实现结构体与字节流的相互转化

unity项目中&#xff0c;涉及到与C的相互通信&#xff0c;而通信接口为C封好的动态库。所以&#xff0c;传输信息时&#xff0c;需要向C端发送字节流信息。 对此&#xff0c;需将结构体数据转为字节流&#xff0c;其代码如下&#xff1a; public static byte[] StructToBytes(…...

用LangChain开源框架实现知识机器人

前言 Large Language Models (LLMs)在2020年OpenAI 的 GPT-3 的发布而进入世界舞台 。从那时起&#xff0c;他们稳步增长进入公众视野。 众所周知 OpenAI 的 API 无法联网&#xff0c;所以大家如果想通过它的API实现联网搜索并给出回答、总结 PDF 文档、基于某个 Youtube 视频…...

HCIP——前期综合实验

前期综合实验 一、实验拓扑二、实验要求三、实验思路四、实验步骤1、配置接口IP地址2、交换机配置划分vlan10以及vlan203、总部分部&#xff0c;骨干网配置OSPF分部总部骨干网 4、配置BGP建立邻居关系总部骨干网分部 5、发布用户网段6、将下一跳改为本地7、允许AS重复8、重发布…...

【2023年电赛】运动目标控制与自动追踪系统(E 题)最简单实现

本方案的思路是最简单的不涉及复杂算法&#xff1a;识别矩形框&#xff0c;标记矩形框&#xff0c;输出坐标和中心点&#xff0c;计算长度&#xff0c;控制舵机移动固定长度&#xff01;仅供完成基础功能参考&#xff0c;不喜勿喷&#xff01; # 实现运动目标控制与自动追踪系…...

【IMX6ULL驱动开发学习】22.IMX6ULL开发板读取ADC(以MQ-135为例)

IMX6ULL一共有两个ADC&#xff0c;每个ADC都有八个通道&#xff0c;但他们共用一个ADC控制器 1.设备树 在imx6ull.dtsi文件中已经帮我们定义好了adc1的节点部分信息 adc1: adc02198000 {compatible "fsl,imx6ul-adc", "fsl,vf610-adc";reg <0x0219…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 &#xff08;FL&#xff09; 支持跨分布式客户端进行协作模型训练&#xff0c;而无需共享原始数据&#xff0c;这使其成为在互联和自动驾驶汽车 &#xff08;CAV&#xff09; 等领域保护隐私的机器学习的一种很有前途的方法。然而&#xff0c;最近的研究表明&…...

在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module

1、为什么要修改 CONNECT 报文&#xff1f; 多租户隔离&#xff1a;自动为接入设备追加租户前缀&#xff0c;后端按 ClientID 拆分队列。零代码鉴权&#xff1a;将入站用户名替换为 OAuth Access-Token&#xff0c;后端 Broker 统一校验。灰度发布&#xff1a;根据 IP/地理位写…...

ffmpeg(四):滤镜命令

FFmpeg 的滤镜命令是用于音视频处理中的强大工具&#xff0c;可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下&#xff1a; ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜&#xff1a; ffmpeg…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

AI,如何重构理解、匹配与决策?

AI 时代&#xff0c;我们如何理解消费&#xff1f; 作者&#xff5c;王彬 封面&#xff5c;Unplash 人们通过信息理解世界。 曾几何时&#xff0c;PC 与移动互联网重塑了人们的购物路径&#xff1a;信息变得唾手可得&#xff0c;商品决策变得高度依赖内容。 但 AI 时代的来…...

Fabric V2.5 通用溯源系统——增加图片上传与下载功能

fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...