Kafka与MySQL的组合使用
- 根据上面给出的student表,编写Python程序完成如下操作:
(1)读取student表的数据内容,将其转为JSON格式,发送给Kafka;
创建Student表的SQL语句如下:
create table student(
sno char(5),
sname char(10),
ssex char(2),
sage int
);


向student表中插入两条记录的SQL语句如下:
insert into student values(‘95001’,’John’,’M’,23);
insert into student values(‘95002’,’Tom’,’M’,23);

启动zookeeper和kafka的服务


编写一个生产者程序mysql_producer.py:
from kafka import KafkaProducer
import json
import pymysql.cursorsproducer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))connect=pymysql.Connect(host='localhost',port=3306,user='root',passwd='123456',db='zhangna',charset='utf8'
)
cursor=connect.cursor()
sql="select sno,sname,ssex,sage from student;"
cursor.execute(sql)
data=cursor.fetchall()
connect.commit()for message in data:zn={}zn['sno']=message[0]zn['sname']=message[1]zn['sex']=message[2]zn['age']=message[3]producer.send('mysql_topic',zn)connect.close()
producer.close()

(2)再从Kafka中获取到JSON格式数据,打印出来;
编写一个消费者程序mysql_consumer.py:
from kafka import KafkaConsumer
import json
import pymysql.cursorsconsumer = KafkaConsumer('mysql_topic',bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest')
for msg in consumer:msg1=str(msg.value,encoding="utf-8")data=json.loads(msg1)print(data)

终于出来了,出错的原因是encoding,我写成了encodings的缘故
为什么我会出现两条重复记录,原因是我生产者程序运行了多次,生产者多运行一次,消费者程序就会多一次查询
相关文章:
Kafka与MySQL的组合使用
根据上面给出的student表,编写Python程序完成如下操作: (1)读取student表的数据内容,将其转为JSON格式,发送给Kafka; 创建Student表的SQL语句如下: create table student( sno ch…...
2018年亚太杯APMCM数学建模大赛A题老年人平衡能力的实时训练模型求解全过程文档及程序
2018年亚太杯APMCM数学建模大赛 A题 老年人平衡能力的实时训练模型 原题再现 跌倒在老年人中很常见。跌倒可能会导致老年人出现许多并发症,因为他们的康复能力通常较差,因此副作用可能会使人衰弱,从而加速身体衰竭。此外,对跌倒…...
华盛顿特区选举委员会:黑客可能已侵入整个选民名册
导语 近日,华盛顿特区选举委员会(DCBOE)传来了一条令人担忧的消息:黑客可能已经侵入了整个选民名册。这一事件引发了公众的广泛关注和担忧。本文将为大家详细介绍这一事件的经过以及可能带来的后果,并探讨选民数据的保…...
kali安装nodejs、npm失败
更新apt-get再安装,更新时间比较久,看网速,中间有一些确认步骤 22 apt-get update23 apt-get upgrade24 apt-get install nodejs25 node26 npm27 apt-get install npm...
插入排序(学习笔记)
插入排序 每一轮插入排序后的结果与打扑克牌取牌原理相似,将取到的牌插入到合适的位置,但在程序实现方面还是基于交换的算法。 它的基本思想是将一个记录插入到已经排好序的有序表中,从而一个新的、记录数增1的有序表。 import java.util.…...
wps excel js编程
定义全局变量 const a "dota" function test() {Debug.Print(a) }获取表格中单元格内容 function test() {Debug.Print("第一行第二列",Cells(1,2).Text)Debug.Print("A1:",Range("A1").Text) }写单元格 Range("C1").Val…...
Python 类继承解释
一、说明 类继承是Python中数据科学家和机器学习工程师需要了解的一个重要概念。在这里,我们的专家解释了它的工作原理。 在Python中,类包含属性和方法。属性是存储数据的变量。类方法是属于类的函数,通常对类属性执行一些逻辑。在本文中&…...
Reactor反应器模式
文章目录 一、单线程Reactor反应器模式二、多线程Reactor反应器模式 在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网…...
alibaba.fastjson的使用(六) -- JavaBean==》Json字符串、JSONObject、JSONArray
目录 1. JavaBean转 Json字符串 2. JavaBean转 JSONObject 3. List转JSONArray 在pom文件中引入依赖: <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.14</version></dependency&…...
uniapp 自定义导航栏
自定义导航栏 修改 pages.json 在 pages.json 中将 navigateionStyle 设为 custom 新建 systemInfo.js systemInfo.js 用来获取当前设备的机型系统信息,放在 common 目录下 /*** 此 js 文件管理关于当前设备的机型系统信息*/ const systemInfo function() {/***…...
查分小程序:一键查询成绩,班主任和家长的得力助手
作为一名老师,是否曾经为了让学生能够方便地查询成绩而烦恼?担心学生忘记密码?还是手动输入成绩太繁琐?今天,给大家分享一个超级实用的查分小程序,让成绩查询变得更轻松! 什么是成绩查询系统&am…...
Linux内核驱动开发的步骤
Linux操作系统的内核是一个强大的、开源的操作系统内核,它为各种硬件设备提供支持。为了让硬件设备能够与Linux系统无缝协作,需要编写相应的内核驱动程序。本文将介绍Linux内核驱动开发的一般步骤,以帮助开发者了解如何创建自己的内核驱动。 …...
【Java 进阶篇】HTML DOM 事件详解
当用户在网页上点击按钮、输入文本、鼠标移动到某个区域或执行其他互动操作时,这些动作都可以触发事件。HTML DOM(文档对象模型)允许我们使用JavaScript来捕获、处理和响应这些事件,以实现网页的交互和动态性。本篇博客将围绕HTML…...
redis 从小白到大师系列
字符串 Redis 字符串数据类型 set 字符串 /*** 设置字符串*/ $t $redis->set(o1,o1); //返回true or false var_dump($t);get字符串 /*** 获取字符串*/ $t $redis->get(o1); //返回true or false var_dump($t);结果: string(2) “o1” 返回 key 中字符串…...
vue使用.filter方法检索数组中指定时间段内的数据
假设你有一个名为dataArray的数组,其中包含了你要筛选的数据。那么,你可以按照以下步骤进行筛选: 创建一个名为filteredArray的新数组,用于存储筛选后的结果。使用数组的filter方法遍历dataArray,并对每个元素应用筛选…...
Ubuntu 安装 npm 和 node
前言 最近学习VUE,在ubuntu 2204 上配置开发环境,涉及到npm node nodejs vue-Cli脚手架等内容,做以记录。 一、node nodejs npm nvm 区别 ? node 是框架,类似python的解释器。nodejs 是编程语言,是js语言的…...
Matlab论文插图绘制模板第122期—函数折线图(fplot)
本期分享的是函数折线图的绘制模板。 所谓函数折线图,就是将自定义线函数进行可视化表达。 先来看一下成品效果: 特别提示:本期内容『数据代码』已上传资源群中,加群的朋友请自行下载。有需要的朋友可以关注同名公号【阿昆的…...
IK分词器如何修改支持跨版本ES
一、问题描述:IK分词器版本和ES版本不一致,无法找到和自己ES版本匹配的分词器。 IK分词器,提供的插件版本,远赶不上ES的更新版本,在使用过程中,不一定能顺利的找到与自己使用的ES版本相对应。在ES集群中使用…...
Spring MVC常用十大注解
Spring MVC常用十大注解 一,什么要使用注解 使用注解可以简化配置,提高代码的可读性和可维护性。通过注解可以实现依赖注入,减少手动管理对象的代码量。注解还支持面向切面编程,实现切面、切入点和通知等。此外,注解提…...
二、【MyBatis】 MyBatis入门与简单使用
二、【MyBatis】 MyBatis入门与简单使用 二、【MyBatis】 MyBatis入门与简单使用一、什么是ORM二、为什么mybatis是半自动的ORM框架2.1 Hibernate优点2.2 Hibernate缺点2.3 MyBatis与Hibernate区别三、Mybatis快速入门3.1 项目引入Maven相关依赖3.2 创建测试数据库3.3 编写数据…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 ` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...
华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

