当前位置: 首页 > news >正文

python 自建kafka消息生成和消费小工具

要将 Kafka 的消息生产和消费转换为 API 接口,我们可以使用 Python 的 Web 框架。其中 Flask 是一个轻量级且易于使用的选择。下面是一个简单的例子,使用 Flask 创建 API 来生成和消费 Kafka 消息。

1. 安装所需的库

pip install kafka-python flask

2. 创建 Flask API

from flask import Flask, request, jsonify
from kafka import KafkaProducer, KafkaConsumerapp = Flask(__name__)# 配置 Kafka
KAFKA_BROKER_URL = 'localhost:9092'
TOPIC = 'test_topic'
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)@app.route('/send', methods=['POST'])
def send_message():message = request.json.get('message')if message:producer.send(TOPIC, value=message.encode('utf-8'))return jsonify({"status": "success", "message": "Message sent!"}), 200else:return jsonify({"status": "error", "message": "Message cannot be empty!"}), 400@app.route('/receive', methods=['GET'])
def receive_message():consumer = KafkaConsumer(TOPIC, bootstrap_servers=KAFKA_BROKER_URL, auto_offset_reset='earliest')messages = []for message in consumer:messages.append(message.value.decode('utf-8'))if len(messages) > 5:  # 只收集最近的5条消息,可以根据需要调整breakreturn jsonify(messages)if __name__ == '__main__':app.run(debug=True, port=5000)

这个 Flask 应用程序定义了两个端点:

  • /send: 它接受 POST 请求并发送消息到 Kafka。
  • /receive: 它返回 Kafka 主题中的最近消息。

3. 使用 API

  • 发送消息:
curl -X POST http://localhost:5000/send -H "Content-Type: application/json" -d '{"message": "Hello, Kafka!"}'
  • 接收消息:
curl http://localhost:5000/receive

这只是一个简单的示例,您可能需要添加错误处理、日志记录、认证、消息序列化和反序列化等功能,以满足更复杂的需求。

相关文章:

python 自建kafka消息生成和消费小工具

要将 Kafka 的消息生产和消费转换为 API 接口,我们可以使用 Python 的 Web 框架。其中 Flask 是一个轻量级且易于使用的选择。下面是一个简单的例子,使用 Flask 创建 API 来生成和消费 Kafka 消息。 1. 安装所需的库: pip install kafka-py…...

Prim算法:经过图中所有节点的最短路径

题目链接&#xff1a;53. 寻宝&#xff08;第七期模拟笔试&#xff09; #include<bits/stdc.h> using namespace std;// v为节点数量&#xff0c;e为边数量 int v, e;// 最小生成树 void prim(vector<vector<int>>& adj) {vector<int> dist(v1, I…...

Linux 信号捕捉函数 signal sigaction

signal函数 #include <signal.h> typedef void (*sighandler_t)(int); sighandler_t signal(int signum, sighandler_t handler); 功能&#xff1a;设置某个信号的捕捉行为 参数&#xff1a; -signum&#xff1a;要捕捉的信号 handler&#xff1a;对捕捉到的信号怎么处理…...

StarRocks操作笔记

最近在使用starRocks&#xff0c;记录一些临时的操作技巧&#xff0c;防止遗忘。 1. 创建表 CREATE TABLE IF NOT EXISTS ODS.T_TEST( pk_day date, pool_address string, code string comment 唯一主键, test1 string, test2 string, test3 string, pk_year varchar(4), pk_m…...

Linux的ls -ld命令产生的信息怎么看

2023年9月24日&#xff0c;周日上午 目录 ls -ld列出的目录或文件的信息含义文件硬链接什么是文件硬链接为什么新建目录的文件硬链接为2举例说明例一例二例三 ls -ld列出的目录或文件的信息含义 第一个字符表示文件类型: d: 目录 -: 普通文件 l: 软链接 b: 块设备文件 c:…...

Linux- 内存映射文件(Memory-Mapped File)

内存映射文件&#xff08;Memory-Mapped File&#xff09;是⼀种将文件内容映射到内存中的机制&#xff0c;允许程序直接访问文件数据&#xff0c;就好像这些数据已经被加载到了内存⼀样。这个机制允许文件的内容被映射到⼀个进程的地址空间&#xff0c;从而允许程序以⼀种更高…...

李航老师《统计学习方法》第五章阅读笔记

决策树&#xff08;decision tree&#xff09;是一种基本的分类与回归方法。本章主要讨论用于分类的决策树。决策树模型呈树形结构&#xff0c;在分类问题中&#xff0c;表示基于特征对实例进行分类的过程。 以下是关于分类决策树的一些基本概念和特点&#xff1a; 树形结构&am…...

iOS16新特性:实时活动-在锁屏界面实时更新APP消息 | 京东云技术团队

简介 之前在 《iOS16新特性:灵动岛适配开发与到家业务场景结合的探索实践》 里介绍了iOS16新的特性&#xff1a;实时更新&#xff08;Live Activity&#xff09;中灵动岛的适配流程&#xff0c;但其实除了灵动岛的展示样式&#xff0c;Live Activity还有一种非常实用的应用场景…...

使用 Elasticsearch、OpenAI 和 LangChain 进行语义搜索

在本教程中&#xff0c;我将引导您使用 Elasticsearch、OpenAI、LangChain 和 FastAPI 构建语义搜索服务。 LangChain 是这个领域的新酷孩子。 它是一个旨在帮助你与大型语言模型 (LLM) 交互的库。 LangChain 简化了与 LLMs 相关的许多日常任务&#xff0c;例如从文档中提取文本…...

NIFI集群_队列Queue中数据无法清空_清除队列数据报错_无法删除queue_解决_集群中机器交替重启删除---大数据之Nifi工作笔记0061

今天发现,有两个处理器,启动以后,数据流不过去,后来,锁定问题在,queue队列上面,因为别的队列都可以通过,右键,empty queue清空,就是 这个队列不行,这个队列无法被删除,至于为什么导致这样的, 猜测是因为之前,流程设计好以后,队列没有设置背压,也没有设置队列中的内容大小和fl…...

leetcode20. 有效的括号 [简单题]

题目 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。左括号必须以正确的顺序闭合。每个右括号都有一个对应的相同类型…...

ubuntu20.04下源码编译colmap

由于稠密重建需要CUDA&#xff0c;因此先安装CUDA&#xff0c;我使用的是3050GPU&#xff0c;nvidia-smi显示最高支持CUDA11.4。 不要用sudo apt安装&#xff0c;版本较低&#xff0c;30系显卡建议安装CUDA11.0以上&#xff0c;这里安装了11.1版本。 下载&#xff1a; cuda_1…...

Jumpserver堡垒机

一、堡垒机概述 1、堡垒机的基本概念 堡垒机也是一台服务器&#xff0c;在一个特定的网络环境下&#xff0c;为了保障网络和数据不受来自外部和内部用户的入侵和破坏&#xff0c;而运用各种技术手段实时收集、监控网络环境中每一个组成部分&#xff08;服务器&#xff09;的系…...

第一百五十三回 如何实现滑动窗口

文章目录 概念介绍实现方法示例代码 我们在上一章回中介绍了自定义组件实现游戏摇杆相关的内容&#xff0c;本章回中将介绍 如何实现滑动窗口.闲话休提&#xff0c;让我们一起Talk Flutter吧。 概念介绍 我们在本章回中介绍的滑动窗口表示在屏幕底部向上滑动时弹出一个窗口&a…...

Oracle 12c自动化管理特性的新进展:自动备份、自动恢复和自动维护功能的优势|oracle 12c相对oralce 11g的新特性(3)

一、前言: 前面几期讲解了oracle 12c多租户的使用、In-Memory列存储来提高查询性能以及数据库的克隆、全局数据字典和共享数据库资源的使用 今天我们讲讲oracle 12c的另外的一个自动化管理功能新特性:自动备份、自动恢复、自动维护的功能 二、自动备份、自动恢复、自动维护…...

Redis——Jedis中hash类型使用

hset 和 hget hset可以逐一添加key和value&#xff0c;也可以通过map类型来直接添加多组fields 而hget则返回string类型&#xff0c;如果元素不存在则返回null private static void hsetAndHget(Jedis jedis) {jedis.flushAll();jedis.hset("key", "f1"…...

肖sir__项目实战讲解__004

项目实战讲解 一、项目的类型 金融类&#xff1a; 保险(健康险理财险)、证券、基金(股票型基金、混合型基金、指数型基金、债券型基金、 天天基金网&#xff08;ETF基金、货币型基金、量化基金)、银行、贷款、信用卡、外汇、二元期权、期货原油、blockchain、 数字货币、黄金白…...

数据库数据恢复-ORACLE常见故障有哪些?恢复数据的可能性高吗?

ORACLE数据库常见故障&#xff1a; 1、ORACLE数据库无法启动或无法正常工作。 2、ORACLE数据库ASM存储破坏。 3、ORACLE数据库数据文件丢失。 4、ORACLE数据库数据文件部分损坏。 5、ORACLE数据库DUMP文件损坏。 ORACLE数据库数据恢复可能性分析&#xff1a; 1、ORACLE数据库无…...

合规性管理如何帮助产品团队按时交付?

成功的产品和产品发布背后通常需要经过一个涉及多个监督机构、多功能团队和利益相关者的复杂流程。在组织的治理、风险管理和合规性&#xff08;GRC&#xff09;框架下&#xff0c;产品团队不仅需要追求市场创新&#xff0c;还需要确保符合所有适用的法规、标准和合同要求。由于…...

从平均数到排名算法

平均数用更少的数字&#xff0c;概括一组数字。属于概述统计量、集中趋势测度、位置测度。中位数是第二常见的概述统计量。许多情况下比均值更合适。算术平均数是3中毕达哥拉斯平均数之一&#xff0c;另外两种毕达哥拉斯平均数是几何平均数和调和平均数。 算术平均 A M 1 n ∑…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

3.3.1_1 检错编码(奇偶校验码)

从这节课开始&#xff0c;我们会探讨数据链路层的差错控制功能&#xff0c;差错控制功能的主要目标是要发现并且解决一个帧内部的位错误&#xff0c;我们需要使用特殊的编码技术去发现帧内部的位错误&#xff0c;当我们发现位错误之后&#xff0c;通常来说有两种解决方案。第一…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

回溯算法学习

一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

全面解析数据库:从基础概念到前沿应用​

在数字化时代&#xff0c;数据已成为企业和社会发展的核心资产&#xff0c;而数据库作为存储、管理和处理数据的关键工具&#xff0c;在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理&#xff0c;到社交网络的用户数据存储&#xff0c;再到金融行业的交易记录处理&a…...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践

在 Kubernetes 集群中&#xff0c;如何在保障应用高可用的同时有效地管理资源&#xff0c;一直是运维人员和开发者关注的重点。随着微服务架构的普及&#xff0c;集群内各个服务的负载波动日趋明显&#xff0c;传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...

解析两阶段提交与三阶段提交的核心差异及MySQL实现方案

引言 在分布式系统的事务处理中&#xff0c;如何保障跨节点数据操作的一致性始终是核心挑战。经典的两阶段提交协议&#xff08;2PC&#xff09;通过准备阶段与提交阶段的协调机制&#xff0c;以同步决策模式确保事务原子性。其改进版本三阶段提交协议&#xff08;3PC&#xf…...