从零开始实现 C++ TinyWebServer 阻塞队列 BlockQueue类详解
文章目录
- 阻塞队列是什么?
- 为什么需要阻塞队列?
- BlockQueue 成员变量
- 实现 push() 函数
- 实现 pop() 函数
- 实现 close() 函数
- BlockQueue 代码
- BlockQueue 测试
从零开始实现 C++ TinyWebServer 项目总览
项目源码
阻塞队列是什么?

阻塞队列是一种线程安全的数据结构,支持多线程环境中的生产者-消费者模型。其核心特点在于,当队列为空时,消费者线程会进入阻塞状态,直到有新的数据可供消费;而当队列已满时,生产者线程会被阻塞,直至队列中有空闲空间可供使用。
-
线程安全:借助同步机制,有效避免了多个线程同时操作队列时可能出现的数据竞争问题,确保数据的一致性和完整性。
-
容量限制:可以根据实际需求灵活设置队列的容量上限,当队列达到最大容量时,生产者线程会被阻塞,避免数据溢出。
-
阻塞操作:当队列为空时,消费者线程会自动等待;当队列满时,生产者线程也会进入等待状态,实现了线程间的协调与同步。
为什么需要阻塞队列?
-
解耦生产消费:将日志信息的产生和存储过程进行分离,应用程序只需将日志快速放入队列,无需等待写入操作完成,从而提高了系统的可维护性。
-
平衡速度差异:有效应对日志生产速度不稳定和消费速度受存储设备性能限制的问题,队列能够缓存多余的日志信息,实现动态平衡,确保系统的稳定运行。
-
提升并发性能:支持多线程协作,生产者和消费者线程可以同时工作,充分利用多核处理器的性能优势,同时队列的同步机制避免了线程竞争,提高了系统的并发处理能力。
-
保证日志顺序:阻塞队列的先进先出特性确保日志按产生顺序进行存储,便于后续的问题排查和分析。
BlockQueue 成员变量
bool is_close; // 是否关闭
size_t capacity_; // 容量
std::deque<T> deque_; // 双向队列 std::mutex mtx_; // 锁
std::condition_variable condition_producer; // 生产者条件变量
std::condition_variable condition_consumer; // 消费者条件变量
实现 push() 函数
获取互斥锁mtx_保证线程安全;检查队列是否已满,满了就等待;向队列添加元素;通知一个等待的消费者线程。
void push_back(const T& item);
向阻塞队列的尾部添加一个元素。
template<class T>
void BlockQueue<T>::push_back(const T& item) {std::unique_lock<std::mutex> locker(mtx_);// 队列满了,暂停生产while (deque_.size() >= capacity_)condition_producer.wait(locker); // 防止虚假唤醒deque_.push_back(item);condition_consumer.notify_one();
}
向阻塞队列的头部添加一个元素。
template<class T>
void BlockQueue<T>::push_front(const T& item) {std::unique_lock<std::mutex> locker(mtx_);while (deque_.size() >= capacity_)condition_producer.wait(locker);deque_.push_front(item);condition_consumer.notify_one();
}
实现 pop() 函数
获取互斥锁mtx_;检查队列是否为空,如果为空且队列未关闭,就等待;如果队列关闭,返回false;取出队列头部元素,存储到item中,并移除队列头部元素;通知一个等待的生产者线程,队列中有空间了。
bool pop(T& item);
从阻塞队列的头部取出一个元素。
template<class T>
bool BlockQueue<T>::pop(T& item) {std::unique_lock<std::mutex> locker(mtx_);// 队列空了,暂停消费while (deque_.empty()) {if (is_close)return false;condition_consumer.wait(locker);}item = deque_.front();deque_.pop_front();condition_producer.notify_one();return true;
}
bool pop(T& item, int timeout);
从阻塞队列的头部取出一个元素,但是有超时机制。
template<class T>
bool BlockQueue<T>::pop(T& item, int timeout) {std::unique_lock<std::mutex> locker(mtx_);const std::cv_status TIMEOUT_STATUS = std::cv_status::timeout;while (deque_.empty()) {if (is_close)return false;if (condition_consumer.wait_for(locker, std::chrono::seconds(timeout)) == TIMEOUT_STATUS)return false;}item = deque_.front();deque_.pop_front();condition_producer.notify_one();return true;
}
实现 close() 函数
获取互斥锁mtx_,清空队列,设置关闭标志is_close为true;通知所有等待的生产者和消费者线程。
void close();
关闭阻塞队列,释放所有等待的生产者和消费者线程。
template<class T>
void BlockQueue<T>::close() {{std::lock_guard<std::mutex> locker(mtx_);deque_.clear();is_close = true;}condition_producer.notify_all();condition_consumer.notify_all();
}
BlockQueue 代码
模板的定义和实现要放在同一个头文件中,因为模板的代码需要在编译时实例化。
block
#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H#include <iostream>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <assert.h>template<class T>
class BlockQueue {
public:BlockQueue(size_t max_size = 1000);~BlockQueue();bool empty();bool full();void clear();size_t size();size_t capacity();void push_front(const T& item);void push_back(const T& item);bool pop(T& item);bool pop(T& item, int timeout);T front();T back();void flush();void close();private:bool is_close; // 是否关闭size_t capacity_; // 容量std::deque<T> deque_; // 双向队列 std::mutex mtx_; // 锁std::condition_variable condition_producer; // 生产者条件变量std::condition_variable condition_consumer; // 消费者条件变量
};// 模板的定义和实现要放在同一个头文件中
// 因为模板的代码需要在编译时实例化template<class T>
BlockQueue<T>::BlockQueue(size_t max_size) : capacity_(max_size) {assert(max_size > 0);is_close = false;
}template<class T>
BlockQueue<T>::~BlockQueue() {close();
} template<class T>
void BlockQueue<T>::push_back(const T& item) {std::unique_lock<std::mutex> locker(mtx_);// 队列满了,暂停生产while (deque_.size() >= capacity_)condition_producer.wait(locker); // 防止虚假唤醒deque_.push_back(item);condition_consumer.notify_one();
}template<class T>
void BlockQueue<T>::push_front(const T& item) {std::unique_lock<std::mutex> locker(mtx_);while (deque_.size() >= capacity_)condition_producer.wait(locker);deque_.push_front(item);condition_consumer.notify_one();
}template<class T>
bool BlockQueue<T>::pop(T& item) {std::unique_lock<std::mutex> locker(mtx_);// 队列空了,暂停消费while (deque_.empty()) {if (is_close)return false;condition_consumer.wait(locker);}item = deque_.front();deque_.pop_front();condition_producer.notify_one();return true;
}template<class T>
bool BlockQueue<T>::pop(T& item, int timeout) {std::unique_lock<std::mutex> locker(mtx_);const std::cv_status TIMEOUT_STATUS = std::cv_status::timeout;while (deque_.empty()) {if (is_close)return false;if (condition_consumer.wait_for(locker, std::chrono::seconds(timeout)) == TIMEOUT_STATUS)return false;}item = deque_.front();deque_.pop_front();condition_producer.notify_one();return true;
}// 关闭阻塞队列,唤醒所有生产者和消费者
template<class T>
void BlockQueue<T>::close() {{std::lock_guard<std::mutex> locker(mtx_);deque_.clear();is_close = true;}condition_producer.notify_all();condition_consumer.notify_all();
}// 唤醒消费者
template<class T>
void BlockQueue<T>::flush() {condition_consumer.notify_one();
}template<class T>
T BlockQueue<T>::front() {std::lock_guard<std::mutex> locker(mtx_);return deque_.front();
}template<class T>
T BlockQueue<T>::back() {std::lock_guard<std::mutex> locker(mtx_);return deque_.back();
}template<class T>
bool BlockQueue<T>::empty() {std::lock_guard<std::mutex> locker(mtx_);return deque_.empty();
}template<class T>
bool BlockQueue<T>::full() {std::lock_guard<std::mutex> locker(mtx_);return deque_.size() >= capacity_;
}template<class T>
void BlockQueue<T>::clear() {std::lock_guard<std::mutex> locker(mtx_);deque_.clear();
}template<class T>
size_t BlockQueue<T>::size() {std::lock_guard<std::mutex> locker(mtx_);return deque_.size();
}template<class T>
size_t BlockQueue<T>::capacity() {std::lock_guard<std::mutex> locker(mtx_);return capacity_;
}#endif // BLOCKQUEUE_H
BlockQueue 测试
利用Google Test对BlockQueue类进行单元测试,测试对Buffer的基本功能,延时出队,多线程下的生产者-消费者模型,关闭操作。
#include "../code/log/blockqueue.h"
#include <thread>
#include <chrono>
#include <gtest/gtest.h>// 测试 BlockQueue 的基本功能
TEST(BlockQueueTest, TestBasicFunctionality) {BlockQueue<int> queue(5);EXPECT_TRUE(queue.empty());EXPECT_EQ(queue.capacity(), 5);for (int i = 0; i < 5; ++i)queue.push_back(i);EXPECT_TRUE(queue.full());EXPECT_EQ(queue.size(), 5);int item;EXPECT_TRUE(queue.pop(item));EXPECT_EQ(item, 0);EXPECT_EQ(queue.front(), 1);EXPECT_EQ(queue.back(), 4);queue.push_front(5);EXPECT_EQ(queue.front(), 5);queue.clear();EXPECT_TRUE(queue.empty());
}// 测试带有超时的 pop 操作
TEST(BlockQueueTest, TestPopWithTimeout) {BlockQueue<int> queue(5);int item;// 等待失败,花费1sEXPECT_FALSE(queue.pop(item, 1));queue.push_back(2);// 等待成功,不耗时EXPECT_TRUE(queue.pop(item, 1));EXPECT_EQ(item, 2);
}// 测试多线程下的 生产者-消费者模式
TEST(BlockQueueTest, TestProducerConsumer) {BlockQueue<int> queue(5);std::thread producer([&queue]() {for (int i = 0; i < 10; ++i) {queue.push_back(i);// 模拟生产过程std::this_thread::sleep_for(std::chrono::milliseconds(200));}});std::thread consumer([&queue]() {int item;for (int i = 0; i < 10; ++i) {if (queue.pop(item))EXPECT_EQ(item, i);elseFAIL() << "Failed to consume an item.";// 模拟消费过程std::this_thread::sleep_for(std::chrono::milliseconds(100));}});producer.join();consumer.join();
}// 测试关闭操作
TEST(BlockQueueTest, TestClose) {BlockQueue<int> queue(5);std::thread producer([&queue]() {for (int i = 0; i < 10; ++i) {queue.push_back(i);std::this_thread::sleep_for(std::chrono::milliseconds(100));}queue.close();});std::thread consumer([&queue]() {int item;while (queue.pop(item))continue;EXPECT_TRUE(queue.empty());});producer.join();consumer.join();
}int main(int argc, char* argv[]) {::testing::InitGoogleTest(&argc, argv);return RUN_ALL_TESTS();
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(tests)# 设置 C++ 标准和编译器选项
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra")# 查找 Google Test 包
find_package(GTest REQUIRED)
# 包含 Google Test 头文件目录
include_directories(${GTEST_INCLUDE_DIRS})
# 添加可执行文件
add_executable(blockqueue_unit_test blockqueue_unit_test.cc)
# 链接 Google Test 库
target_link_libraries(blockqueue_unit_test ${GTEST_LIBRARIES} pthread)
# 启用测试
enable_testing()
# 添加测试
add_test(NAME blockqueue_unit_test COMMAND blockqueue_unit_test)
相关文章:
从零开始实现 C++ TinyWebServer 阻塞队列 BlockQueue类详解
文章目录 阻塞队列是什么?为什么需要阻塞队列?BlockQueue 成员变量实现 push() 函数实现 pop() 函数实现 close() 函数BlockQueue 代码BlockQueue 测试 从零开始实现 C TinyWebServer 项目总览 项目源码 阻塞队列是什么? 阻塞队列是一种线程…...
Linux驱动开发基础(can)
目录 1.can的介绍 2.can的硬件连接 2.1 CPU自带can控制器 2.2 CPU没有can控制器 3.电气属性 4.can的特点 5.can协议 5.1 can的种类 5.2 数据帧 5.2.1 标准数据帧格式 5.3.1 扩展数据帧格式 5.3 遥控帧 5.4 错误帧 5.5 过载帧 5.6 帧间隔 5.7 位填充 5.8 位时…...
5.2《生活中的透镜》——5.3《凸透镜成像规律》讲后再上
教会什么:照相机、投影仪、放大镜的原理 培养什么:(再说) 课标: (二)运动和相互作用 2.3 声和光 2.3.5了解凸透镜成像规律的应用。 例7 了解凸透镜成像规律在放大镜、照相机中的应用。 一、导入 提问:生活中有哪些透镜?(放大镜、照相机、投影仪/幻灯机) ——直接提出…...
【LangChain入门 3 Prompts组件】聊天提示词模板 ChatPromptTemplate
文章目录 一、 聊天信息提示词模板1.1 使用关键字1.2 使用SystemMessage, HumanMessage, AIMessage来定义消息1.3 使用MessagesPlaceholder 在特定未知添加消息列表 二、关键类介绍2.1 ChatPromptTemplate 类2.1.1 from_messages()2.1.2 format_messages()2.1.3 format_prompt(…...
fastadmin后台管理员日志指定方法不记录
做的订单提醒,只要在线会把日志自动存储进去,这个又是每30s执行一次,数据库没多久就爆掉了,最终找到一个处理方法,可能不是最好的,仅供大家参考 具体位置: application/admin/model/AdminLog.php里面的$ignoreRegex方法 protected static $ignoreRegex [/^(.*)\/(selectpage…...
leetcode热题100道——字母异位词分组
给你一个字符串数组,请你将 字母异位词 组合在一起。可以按任意顺序返回结果列表。 字母异位词 是由重新排列源单词的所有字母得到的一个新单词。 示例 1: 输入: strs ["eat", "tea", "tan", "ate", "nat", &…...
MCU-芯片时钟与总线和定时器关系,举例QSPI
时钟源: 时钟源为系统时钟提供原始频率信号,系统时钟则通过(分频、倍频、选择器)成为整个芯片的“主时钟”,驱动 CPU 内核、总线(AHB、APB)及外设的运行。 内部时钟源: HSI&#x…...
力扣热题100(方便自己复习,自用)
力扣热题100 1. 两数之和 - 力扣(LeetCode) 查找两数之和是不是等于target也就是我们找到一个数之后,用target将其减掉,再寻找应当对应的元素是什么每找到一个数,我们就将其放在集合中,因为集合中可以去重…...
暂存合并分支
合并分支代码,冲突过多,没解决完 想切换分支,可以把合并暂存 先先 git add . 再git stash 恢复搁置: 查看当前的搁置列表: git stash list恢复特定的搁置 如果你想恢复特定的搁置更改,可以指定索引&a…...
力扣hot100——三数之和(双指针)
题目:三数之和 排序 双指针 本题的难点在于如何去除重复解。 算法流程: 1、特判,对于数组长度 n,如果数组为 null 或者数组长度小于 3,返回 []。 2、对数组进行排序。 3、遍历排序后数组: (…...
技术分享 | MySQL内存使用率高问题排查
本文为墨天轮数据库管理服务团队第51期技术分享,内容原创,如需转载请联系小墨(VX:modb666)并注明来源。 一、问题现象 问题实例mysql进程实际内存使用率过高 二、问题排查 2.1 参数检查 mysql版本 :8.0.…...
分享一个精灵图生成和拆分的实现
概述 精灵图(Sprite)是一种将多个小图像合并到单个图像文件中的技术,广泛应用于网页开发、游戏开发和UI设计中。在MapboxGL中,跟之配套的还有一个json文件用来记录图标的大小和位置。本文分享基于Node和sharp库实现精灵图的合并与…...
AI日报 - 2025年3月21日
🌟 今日概览(60秒速览) ▎🤖 AGI突破 | OpenAI成立安全委员会,加速AGI治理框架构建 ▎💼 商业动向 | 微软发布医疗大模型DAX Copilot 3.0,覆盖全球临床场景 ▎📜 政策追踪 | 中国发布…...
MongoDB 配合python使用的入门教程
MongoDB 入门教程 1. 安装 MongoDB 首先,你需要在你的机器上安装MongoDB。你可以从 MongoDB官网 下载并安装 Community 版本。安装完成后,启动MongoDB服务。 # 在Linux/Mac上启动MongoDB mongod# 在Windows上,你可以通过Windows服务启动Mo…...
函数:形参和实参
在函数的使用过程中分为实参和形参,实参是主函数实际调用的值而形参则是给实参调用的值,如果函数没被调用则函式不会向内存申请空间,先用一段代码演示 形参: int test(int x ,int y ) {int z 0;z x y;return z; } 为何会叫做…...
【C#知识点详解】ExcelDataReader介绍
今天来给大家介绍一下ExcelDataReader,ExcelDataReader是一个轻量级的可快速读取Excel文件中数据的工具。话不多说直接开始。 ExcelDataReader简介 ExcelDataReader支持.xlsx、.xlsb、.xls、.csv格式文件的读取,版本基本在2007及以上版本,支…...
Vala编程语言教程-控制结构
控制结构 while (a > b) { a--; } 会重复递减a,每次迭代前检查a是否大于b。 do { a--; } while (a > b); 会重复递减a,每次迭代后检查a是否大于b。 for (int a 0; a < 10; a) { stdout.printf("%d\n", a); } 会先将a初始化为0…...
《视觉SLAM十四讲》ch13 设计SLAM系统 相机轨迹实现
前言 相信大家在slam学习中,一定会遇到slam系统的性能评估问题。虽然有EVO这样的开源评估工具,我们也需要自己了解系统生成的trajectory.txt的含义,方便我们更好的理解相机的运行跟踪过程。 项目配置如下: 数据解读: …...
服务的拆分数据的迁移
参考: 数据迁移调研...
在类Unix终端中如何实现快速进入新建目录
🚪 前言 相信喜欢使用终端工作的小伙伴或多或少会被一个小地方给膈应,那就是每次想要新建一个文件夹并且进入之,那么就需要两条指令:mkdir DIR和cd DIR,有些人可能要杠了,我一条指令也能,mkdir…...
01分数规划,二分法,题目练习
一、01分数规划 1.1 01分数规划 01分数规划用来求一个分式的极值。模型如下: 给出 a i a_i ai 和 b i b_i bi,求一组 w i ∈ { 0 , 1 } w_i \in \{ 0, 1 \} wi∈{0,1}最小化或最大化 ∑ i 1 n a i w i ∑ i 1 n b i w i \frac{\sum_{i1…...
TG电报群管理机器人定制开发的重要性
在Telegram(电报)用户突破20亿、中文社群规模持续扩张的背景下,定制化群管理机器人的开发已成为社群运营的战略刚需。这种技术工具不仅解决了海量用户管理的效率难题,更通过智能化功能重构了数字社群的治理范式。本文从管理效能、…...
VNA操作使用学习-01 界面说明
以我手里面的liteVNA为例。也可以参考其他的nanoVNA的操作说明。我先了解一下具体的菜单意思。 今天我想做一个天调,居然发现我连一颗基本的50欧姆插件电阻和50欧姆的smt电阻的幅频特性都没有去测试过,那买来这个nva有什么用途呢,束之高阁求…...
mysql-DELETE、DROP 和 TRUNCATE区别
在MySQL中,DELETE、DROP 和 TRUNCATE 是用于不同目的的SQL命令,它们各自执行不同的数据库操作。以下是它们的区别和适用场景: DELETE 用途:用于从表中删除满足特定条件的行。语法示例:DELETE FROM table_name WHERE …...
耘想Docker版Linux NAS的安装说明
耘想LinNAS(Linux NAS)可以通过Docker部署,支持x86和arm64两种硬件架构。下面讲解LinNAS的部署过程。 1. 安装Docker CentOS系统:yum install docker –y Ubuntu系统:apt install docker.io –y 2. 下载LinNas镜像…...
OpenCV图像拼接(4)图像拼接模块的一个匹配器类cv::detail::BestOf2NearestRangeMatcher
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::detail::BestOf2NearestRangeMatcher 是 OpenCV 库中用于图像拼接模块的一个匹配器类,专门用于寻找两幅图像之间的最佳特征点匹配…...
k8s 配置imagePullSecrets仓库认证
在 Kubernetes (K8s) 中,imagePullSecrets 允许 Pod 访问私有镜像仓库,例如 Docker Hub、Harbor、阿里云镜像仓库、腾讯云 TCR 或自建的 registry.flow.cn。以下是完整的 imagePullSecrets 配置步骤: 步骤 1:创建 imagePullSecret…...
不用 Tomcat?SpringBoot 项目用啥代替?
在SpringBoot框架中,我们使用最多的是Tomcat,这是SpringBoot默认的容器技术,而且是内嵌式的Tomcat。 同时,SpringBoot也支持Undertow容器,我们可以很方便的用Undertow替换Tomcat,而Undertow的性能和内存使…...
Zabbix安装(保姆级教程)
Zabbix 是一款开源的企业级监控解决方案,能够监控网络的多个参数以及服务器、虚拟机、应用程序、服务、数据库、网站和云的健康状况和完整性。它提供了灵活的通知机制,允许用户为几乎任何事件配置基于电子邮件的告警,从而能够快速响应服务器问…...
鸿蒙开发真机调试:无线调试和USB调试
前言 在鸿蒙开发的旅程中,真机调试堪称至关重要的环节,其意义不容小觑。虽说模拟器能够为我们提供初步的测试环境,方便我们在开发过程中快速预览应用的基本效果,但它与真机环境相比,仍存在诸多差异。就好比在模拟器中…...
