如何在Linux环境中的Qt项目中使用ActiveMQ-CPP
文章目录
- 代码1:消费者
- 代码2:生成者
之前在Linux下的qt程序中使用activeMQ的时候也是用了很多时间去研究,本来想的是好好记录一下,但是当时顾着写代码。很多细节也不想再去走一遍了。大概写一下怎么使用就行了。注意:一定要先开启服务器。
代码1:消费者
代码;这个代码其实是官网的源码里面提供的例子
我来对这个代码做个解释,也是作为自己的笔记。
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleAsyncConsumer : public ExceptionListener,public MessageListener,public DefaultTransportListener {
private:Connection* connection;Session* session;Destination* destination;MessageConsumer* consumer;bool useTopic;std::string brokerURI;std::string destURI;bool clientAck;private:SimpleAsyncConsumer(const SimpleAsyncConsumer&);SimpleAsyncConsumer& operator=(const SimpleAsyncConsumer&);public:SimpleAsyncConsumer(const std::string& brokerURI,const std::string& destURI,bool useTopic = false,bool clientAck = false) :connection(NULL),session(NULL),destination(NULL),consumer(NULL),useTopic(useTopic),brokerURI(brokerURI),destURI(destURI),clientAck(clientAck) {}virtual ~SimpleAsyncConsumer() {this->cleanup();}void close() {this->cleanup();}void runConsumer() {try {// Create a ConnectionFactoryActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(brokerURI);// Create a Connectionconnection = connectionFactory->createConnection();delete connectionFactory;ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection);if (amqConnection != NULL) {amqConnection->addTransportListener(this);}//如果不是使用的failover连接的话。如果服务器没有开启,执行这一步的时候会抛出错误//如果是使用failover连接,执行到这一步的时候,就不会往下继续执行了,就一直卡在这里,直到和服务器建立连接//failover连接是一个什么模式我也不是很清楚,我只知道在使用这个模式连接的时候,它是会自动重连的。即使服务器中途断了,服务器再 次开启的时候,它是能自动连接上的connection->start();//监听异常。有异常抛出的时候会执行相关的函数connection->setExceptionListener(this);// Create a Sessionif (clientAck) {session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);} else {session = connection->createSession(Session::AUTO_ACKNOWLEDGE);}// Create the destination (Topic or Queue)//使用queue模式还是topic模式//queue模式是一对一连接:一个生成者一个消费者。//topic模式是多对多连接;可以有多个生成者和多个消费者//如果你需要既发送有接收的消息的话,这个情况大概率是应该使用topic模式的//在使用topic模式的时候,你自己通过生成者发出去的消息,你自己的消费者也是会收到这个消息的//这个时候,你可以设置消息过滤,或者使用消息属性识别。if (useTopic) {destination = session->createTopic(destURI);} else {destination = session->createQueue(destURI);}// Create a MessageConsumer from the Session to the Topic or Queueconsumer = session->createConsumer(destination);consumer->setMessageListener(this);} catch (CMSException& e) {e.printStackTrace();}}// Called from the consumer since this class is a registered MessageListener.virtual void onMessage(const Message* message) {static int count = 0;try {count++;const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);string text = "";if (textMessage != NULL) {text = textMessage->getText();} else {text = "NOT A TEXTMESSAGE!";}if (clientAck) {message->acknowledge();}printf("Message #%d Received: %s\n", count, text.c_str());} catch (CMSException& e) {e.printStackTrace();}}// If something bad happens you see it here as this class is also been// registered as an ExceptionListener with the connection.virtual void onException(const CMSException& ex AMQCPP_UNUSED) {printf("CMS Exception occurred. Shutting down client.\n");exit(1);}virtual void onException(const decaf::lang::Exception& ex) {printf("Transport Exception occurred: %s \n", ex.getMessage().c_str());}virtual void transportInterrupted() {std::cout << "The Connection's Transport has been Interrupted." << std::endl;}virtual void transportResumed() {std::cout << "The Connection's Transport has been Restored." << std::endl;}private:void cleanup(){//*************************************************// Always close destination, consumers and producers before// you destroy their sessions and connection.//*************************************************// Destroy resources.try{if( destination != NULL ) delete destination;}catch (CMSException& e) {}destination = NULL;try{if( consumer != NULL ) delete consumer;}catch (CMSException& e) {}consumer = NULL;// Close open resources.try{if( session != NULL ) session->close();if( connection != NULL ) connection->close();}catch (CMSException& e) {}// Now Destroy themtry{if( session != NULL ) delete session;}catch (CMSException& e) {}session = NULL;try{if( connection != NULL ) delete connection;}catch (CMSException& e) {}connection = NULL;}
};
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {activemq::library::ActiveMQCPP::initializeLibrary();std::cout << "=====================================================\n";std::cout << "Starting the example:" << std::endl;std::cout << "-----------------------------------------------------\n";// Set the URI to point to the IPAddress of your broker.// add any optional params to the url to enable things like// tightMarshalling or tcp logging etc. See the CMS web site for// a full list of configuration options.//// http://activemq.apache.org/cms///// Wire Format Options:// =====================// Use either stomp or openwire, the default ports are different for each//// Examples:// tcp://127.0.0.1:61616 default to openwire// tcp://127.0.0.1:61616?wireFormat=openwire same as above// tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead//std::string brokerURI ="failover:(tcp://127.0.0.1:61616"
// "?wireFormat=openwire"
// "&connection.useAsyncSend=true"
// "&transport.commandTracingEnabled=true"
// "&transport.tcpTracingEnabled=true"
// "&wireFormat.tightEncodingEnabled=true"")";//============================================================// This is the Destination Name and URI options. Use this to// customize where the consumer listens, to have the consumer// use a topic or queue set the 'useTopics' flag.//============================================================std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";//============================================================// set to true to use topics instead of queues// Note in the code above that this causes createTopic or// createQueue to be used in the consumer.//============================================================bool useTopics = false;//============================================================// set to true if you want the consumer to use client ack mode// instead of the default auto ack mode.//============================================================bool clientAck = false;// Create the consumerSimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );// Start it up and it will listen forever.consumer.runConsumer();// Wait to exit.std::cout << "Press 'q' to quit" << std::endl;while( std::cin.get() != 'q') {}// All CMS resources should be closed before the library is shutdown.consumer.close();std::cout << "-----------------------------------------------------\n";std::cout << "Finished with the example." << std::endl;std::cout << "=====================================================\n";activemq::library::ActiveMQCPP::shutdownLibrary();
}
代码2:生成者
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleProducer : public Runnable {
private:Connection* connection;Session* session;Destination* destination;MessageProducer* producer;bool useTopic;bool clientAck;unsigned int numMessages;std::string brokerURI;std::string destURI;private:SimpleProducer( const SimpleProducer& );SimpleProducer& operator= ( const SimpleProducer& );public:SimpleProducer( const std::string& brokerURI, unsigned int numMessages,const std::string& destURI, bool useTopic = false, bool clientAck = false ) :connection(NULL),session(NULL),destination(NULL),producer(NULL),useTopic(useTopic),clientAck(clientAck),numMessages(numMessages),brokerURI(brokerURI),destURI(destURI) {}virtual ~SimpleProducer(){cleanup();}void close() {this->cleanup();}virtual void run() {try {// Create a ConnectionFactoryauto_ptr<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory( brokerURI ) );// Create a Connectiontry{connection = connectionFactory->createConnection();connection->start();} catch( CMSException& e ) {e.printStackTrace();throw e;}// Create a Sessionif( clientAck ) {session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );} else {session = connection->createSession( Session::AUTO_ACKNOWLEDGE );}// Create the destination (Topic or Queue)if( useTopic ) {destination = session->createTopic( destURI );} else {destination = session->createQueue( destURI );}// Create a MessageProducer from the Session to the Topic or Queueproducer = session->createProducer( destination );producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );// Create the Thread Id Stringstring threadIdStr = Long::toString( Thread::currentThread()->getId() );// Create a messagesstring text = (string)"Hello world! from thread " + threadIdStr;for( unsigned int ix=0; ix<numMessages; ++ix ){TextMessage* message = session->createTextMessage( text );//这里有一个setIntProperty,应该也会有一个对应的getIntProperty(我没有去看有没有)。你可以在消费者获取消息的时候调用这个函数,函数属性是否一致,如果一致说明是自己发送的消息,就可以不去处理它。//可以确认的是TextMessage是有一个setPropertyString和getPropertyString函数的,这个函数可以用了区别是否是自己的消息。message->setIntProperty( "Integer", ix );// Tell the producer to send the messageprintf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );producer->send( message );delete message;}}catch ( CMSException& e ) {e.printStackTrace();}}private:void cleanup(){// Destroy resources.try{if( destination != NULL ) delete destination;}catch ( CMSException& e ) { e.printStackTrace(); }destination = NULL;try{if( producer != NULL ) delete producer;}catch ( CMSException& e ) { e.printStackTrace(); }producer = NULL;// Close open resources.try{if( session != NULL ) session->close();if( connection != NULL ) connection->close();}catch ( CMSException& e ) { e.printStackTrace(); }try{if( session != NULL ) delete session;}catch ( CMSException& e ) { e.printStackTrace(); }session = NULL;try{if( connection != NULL ) delete connection;}catch ( CMSException& e ) { e.printStackTrace(); }connection = NULL;}
};
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {//这个函数只调用一次activemq::library::ActiveMQCPP::initializeLibrary();std::cout << "=====================================================\n";std::cout << "Starting the example:" << std::endl;std::cout << "-----------------------------------------------------\n";// Set the URI to point to the IPAddress of your broker.// add any optional params to the url to enable things like// tightMarshalling or tcp logging etc. See the CMS web site for// a full list of configuration options.//// http://activemq.apache.org/cms///// Wire Format Options:// =====================// Use either stomp or openwire, the default ports are different for each//// Examples:// tcp://127.0.0.1:61616 default to openwire// tcp://127.0.0.1:61616?wireFormat=openwire same as above// tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead//std::string brokerURI ="failover://(tcp://127.0.0.1:61616"
// "?wireFormat=openwire"
// "&connection.useAsyncSend=true"
// "&transport.commandTracingEnabled=true"
// "&transport.tcpTracingEnabled=true"
// "&wireFormat.tightEncodingEnabled=true"")";//============================================================// Total number of messages for this producer to send.//============================================================unsigned int numMessages = 2000;//============================================================// This is the Destination Name and URI options. Use this to// customize where the Producer produces, to have the producer// use a topic or queue set the 'useTopics' flag.//============================================================std::string destURI = "TEST.FOO";//============================================================// set to true to use topics instead of queues// Note in the code above that this causes createTopic or// createQueue to be used in the producer.//============================================================bool useTopics = false;// Create the producer and run it.SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );// Publish the given number of Messagesproducer.run();// Before exiting we ensure that all CMS resources are closed.producer.close();std::cout << "-----------------------------------------------------\n";std::cout << "Finished with the example." << std::endl;std::cout << "=====================================================\n";activemq::library::ActiveMQCPP::shutdownLibrary();
}
相关文章:
如何在Linux环境中的Qt项目中使用ActiveMQ-CPP
文章目录 代码1:消费者代码2:生成者 之前在Linux下的qt程序中使用activeMQ的时候也是用了很多时间去研究,本来想的是好好记录一下,但是当时顾着写代码。很多细节也不想再去走一遍了。大概写一下怎么使用就行了。注意:一…...
HTML字符实体详解
HTML 字符实体是在 HTML 文档中用来表示特定字符的特殊编码。这些字符可能因为直接输入而引发解析错误,或某些字符在 HTML 中具有特殊含义(例如,< 和 > 用于标签)。因此,使用字符实体可以确保文本的准确呈现。 1. 什么是字符实体? 字符实体由一个 & 符号开始,…...
Netty学习——NIO基础与IO模型
导学 Socket和NIO的区别 Socket和NIO是Java中用于网络编程的两个不同的API,具有不同的设计理念和用途。以下是它们的主要区别: 1. 定义 Socket: Socket是Java中用于实现网络通信的传统API,通常被称为Java I/O(输入/输出&#…...
ZYNQ7045之YOLO部署——FPGA-ZYNQ Soc实战笔记1
一、简介 1、目标检测概念 2、目标检测应用 3、目标检测发展历程 二、YOLO V1 1、输入 必须为448x448分辨率 2、网络结构 卷积 池化 卷积 池化 3、输出 最终7x7x30表示,7x7个各自,每个格子有30个数据,30个数据包含两个部分 1:…...
Spring中的资源以及分类
Spring中的资源都被封装成 Resource 对象 以上是我测试代码的项目编译后的目录结构,target 所在的目录是 D:\\IdeaProjects\\study-spring\\ public void printStream(InputStream inputStream) throws IOException {Reader reader new InputStreamReader(input…...
初步认识Java,及使用
JAVA 特点 简单性 面向对象 分布式 健壮性 安全性 体系结构中立(平台无关) 可移植性 解释执行 高性能 多线程 动态 发展史 JDK,Eclipse下载,…...
C,C++被static标记的变量和函数分别是什么意思
被static关键字标记的变量和函数的含义 在C中,static关键字可以用于变量和函数的声明,它具有不同的语义和用途:static变量 1.全局静态变量:当全局变量被声明为static时,其作用域被限制为声明它的文件,即使使…...
Map 不常用方法介绍
getOrDefault 尝试获取key对应的值,如果未获取到,就返回默认值。 例子: private static void testGetOrDefault() {Map<String, String> map new HashMap<>(4);map.put("123", "123");String key "…...
论文翻译:ICLR 2024.DETECTING PRETRAINING DATA FROM LARGE LANGUAGE MODELS
文章目录 检测大型语言模型的预训练数据摘要1 引言2 预训练数据检测问题2.1 问题定义和挑战2.2 WIKIMIA:动态评估基准 3 MIN-K% PROB:简单的无参考预训练数据检测方法4 实验4.1 数据集和指标4.2 基线检测方法4.3 实现和结果4.4 分析 5 案例研究ÿ…...
Spring 框架精髓:从基础到分布式架构的进阶之路
一、概述 (一)Spring框架概念 1.概念: Spring框架是一个用于简化Java企业级应用开发的开源应用程序框架。 2.Spring框架的核心与提供的技术支持: 核心: IoC控制反转|反转控制:利用框架创建类的对象的…...
深入理解C++ Lambda表达式:语法、用法与原理及其包装器的使用
深入理解C Lambda表达式:语法、用法与原理及其包装器的使用 lambda表达式C98中的一个例子lambda表达式语法lambda表达式各部分说明捕获列表说明 函数对象与lambda表达式 包装器function包装器 bind 🌏个人博客主页: 个人主页 本文深入介绍了…...
C# 编程语言:跨时代的革命
C# 是一种由微软开发的现代、类型安全、面向对象的编程语言,自2000年推出以来,它已经成为.NET平台的核心组成部分。在本文中,我们将探讨C#语言的特点、优势以及它在软件开发领域中的应用。 C# 语言特点 类型安全和自动垃圾回收 C# 是一种类…...
恋爱脑学Rust之Box与RC的对比
在遥远的某个小镇,住着一对年轻的恋人:阿丽和小明。他们的爱情故事就像 Rust 中的 Rc 和 Box 智能指针那样,有着各自不同的「所有权」和「共享」的理解。 故事背景 阿丽和小明准备共同养一株非常珍贵的花(我们称之为“心之花”&…...
Rust 力扣 - 1423. 可获得的最大点数
文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 题目所求结果存在下述等式 可获得的最大点数 所有卡牌的点数之和 - 长度为(卡牌数量 - k)的窗口的点数之和的最小值 我们遍历长度为(卡牌数量 - k)的窗口&#…...
Android15音频进阶之Cuttlefish搭建音频开发环境(九十二)
简介: CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【原创干货持续更新中……】🚀 优质视频课程:AAOS车载系统+…...
发现不为人知的AI宝藏:发现AI新天地! —— 《第八期》
在人工智能(AI)领域,尽管ChatGPT、Midjourney等知名产品广为人知,但还有许多小众而有趣的AI工具等待你的探索。本文将推荐五款实用的AI工具,它们不仅功能强大,而且使用简单,帮助你在各种场景中提…...
基于物联网设计的地下煤矿安全监测与预警
文章目录 一、前言1.1 项目介绍【1】项目开发背景【2】设计实现的功能【3】项目硬件模块组成 1.2 设计思路1.3 系统功能总结1.4 开发工具的选择【1】设备端开发【2】上位机开发 1.5 模块的技术详情介绍【1】NBIOT-BC26模块【2】MQ5传感器【4】DHT11传感器【5】红外热释电人体检…...
Java 23 的12 个新特性!!
Java 23 来啦!和 Java 22 一样,这也是一个非 LTS(长期支持)版本,Oracle 仅提供六个月的支持。下一个长期支持版是 Java 25,预计明年 9 月份发布。 Java 23 一共有 12 个新特性! 有同学表示&…...
.NET 8 中 Entity Framework Core 的使用
本文代码:https://download.csdn.net/download/hefeng_aspnet/89935738 概述 Entity Framework Core (EF Core) 已成为 .NET 开发中数据访问的基石工具,为开发人员提供了强大而多功能的解决方案。随着 .NET 8 和 C# 10 中引入的改进,开发人…...
ai数字人分身123口播克隆数字人小程序源码_博纳软云
功能配置 一、用户 用户管理小黑屋用户反馈登录设置短信参数 二、作品 视频作品背景音乐库背景音乐分类 三、形象分身 上传记录视频要求参数配置 四、声音克隆 克隆记录参数配置声音要求文案示例 五、AI文案 生成记录创作模型模型分类Al配置 六、充值 充值订单积分套…...
Cursor实现用excel数据填充word模版的方法
cursor主页:https://www.cursor.com/ 任务目标:把excel格式的数据里的单元格,按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例,…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
五年级数学知识边界总结思考-下册
目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
Go语言多线程问题
打印零与奇偶数(leetcode 1116) 方法1:使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...
Golang——6、指针和结构体
指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...
适应性Java用于现代 API:REST、GraphQL 和事件驱动
在快速发展的软件开发领域,REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名,不断适应这些现代范式的需求。随着不断发展的生态系统,Java 在现代 API 方…...
在 Spring Boot 中使用 JSP
jsp? 好多年没用了。重新整一下 还费了点时间,记录一下。 项目结构: pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...
