当前位置: 首页 > news >正文

Spark-Streaming+Kafka+mysql实战示例

文章目录

  • 前言
  • 一、简介
    • 1. Spark-Streaming简介
    • 2. Kafka简介
  • 二、实战演练
    • 1. MySQL数据库部分
    • 2. 导入依赖
    • 3. 编写实体类代码
    • 4. 编写kafka主题管理代码
    • 5. 编写kafka生产者代码
    • 6. 编写Spark-Streaming代码
    • 7. 查看数据库
    • 8. 代码下载
  • 总结


前言

本文将介绍一个使用Spark Streaming和Kafka进行实时数据处理的示例。通过该示例,您将了解到如何使用Spark Streaming和Kafka处理实时数据流,以及如何将处理后的数据保存到MySQL数据库中。示例涵盖了从环境搭建到代码实现的全过程,帮助您快速上手实时数据处理的开发。

zookeeper安装教程:zookeeper安装与配置:使用shell脚本在centos上进行zookeeper自动化下载安装配置(集群搭建版)
kafka安装教程:Kafka安装与配置-shell脚本一键安装配置(集群版)


一、简介

1. Spark-Streaming简介

Spark Streaming是Apache Spark的一个组件,用于实时流数据处理。它提供了高级别的API,可以使用类似于批处理的方式处理实时数据流。Spark Streaming可以与各种消息队列系统集成,包括Kafka、RabbitMQ等。

2. Kafka简介

Kafka是一个分布式流处理平台,具有高吞吐量、可扩展性和可靠性。它提供了一种可持久化、分布式、分区的日志服务,用于处理实时数据流。Kafka使用发布-订阅模型,消息被发布到一个或多个主题,然后由订阅该主题的消费者进行消费。


二、实战演练

开始之前先启动zookeeper集群和kafka集群。

1. MySQL数据库部分

这部分代码用于创建MySQL数据库和数据表,以及将从Kafka获取的数据保存到数据库中。

create database kafkademo;

创建数据表:

CREATE TABLE kafka_tb
(`txid`      varchar(255) PRIMARY KEY,`version`   varchar(255),`connector` varchar(255),`name`      varchar(255),`ts_ms`     varchar(255),`snapshot`  varchar(255),`db`        varchar(255),`sequence`  varchar(255),`schema`    varchar(255),`table`     varchar(255),`lsn`       varchar(255),`xmin`      varchar(255)
);

2. 导入依赖

这部分代码是Maven的依赖配置,用于引入所需的Spark、Kafka和MySQL相关的库。

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version><scope>compile</scope>
</dependency>

3. 编写实体类代码

这部分代码定义了一个Java类EntityMessage,用于将从Kafka获取的JSON数据转换为Java对象。

package com.zcs;import lombok.Data;import java.io.Serializable;/*** @author zcs2312* @date 2023/12/12 20:49:47* @product_name IntelliJ IDEA* @project_name spark-kafka*/
@Data
public class EntityMessage implements Serializable {private String op;private String ts_ms;private String transaction;private DataItem dataItem;@Datapublic static class DataItem {private String version;private String connector;private String name;private String ts_ms;private String snapshot;private String db;private String[] sequence;private String schema;private String table;private String txId;private String lsn;private String xmin;}
}

4. 编写kafka主题管理代码

这部分代码用于创建、删除和修改Kafka主题的一些操作。

package com.zcs;import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;import java.util.*;
import java.util.concurrent.ExecutionException;/*** @author zcs2312* @date 2023/12/12 20:51:34* @product_name IntelliJ IDEA* @project_name spark-kafka*/
public class KafkaTopicManager {

相关文章:

Spark-Streaming+Kafka+mysql实战示例

文章目录 前言一、简介1. Spark-Streaming简介2. Kafka简介二、实战演练1. MySQL数据库部分2. 导入依赖3. 编写实体类代码4. 编写kafka主题管理代码5. 编写kafka生产者代码6. 编写Spark-Streaming代码7. 查看数据库8. 代码下载总结前言 本文将介绍一个使用Spark Streaming和Ka…...

C++改写为C

stm使用中&#xff0c;经常能见到CPP的示例&#xff0c;这些是给arduino&#xff0c;esp32用的&#xff0c;stm32 也支持cpp但是你就想用c怎么办呢&#xff0c;比如我在新手的时候&#xff1a;&#xff1a; 这个双冒号就难住了英雄好汉 比如这是个cpp的 如果类不多的情况下 改写…...

抖去推--短视频剪辑、矩阵无人直播saas营销工具一站式开发

抖去推是一款短视频剪辑和矩阵无人直播SAAS营销工具一站式开发平台。它提供了以下功能和特点&#xff1a; 1. 短视频剪辑&#xff1a;抖去推提供了一系列的剪辑工具&#xff0c;包括自动剪辑、特效制作、配音配乐等&#xff0c;可以帮助用户轻松制作出高质量的短视频。 2. 矩阵…...

HBase 详细图文介绍

目录 一、HBase 定义 二、HBase 数据模型 2.1 HBase 逻辑结构 2.2 HBase 物理存储结构 ​2.3 数据模型 2.3.1 Name Space 2.3.2 Table 2.3.3 Row 2.3.4 Column 2.3.5 Time Stamp 2.3.6 Cell 三、HBase 基本架构 架构角色 3.1 Master 3.2 Region Server 3.3 Zo…...

Hanlp自然语言处理如何再Spring Boot中使用

一、HanLP HanLP (Hankcs NLP) 是一个自然语言处理工具包&#xff0c;具有功能强大、性能高效、易于使用的特点。HanLP 主要支持中文文本处理&#xff0c;包括分词、词性标注、命名实体识别、依存句法分析、关键词提取、文本分类、情感分析等多种功能。 HanLP 可以在 Java、Py…...

MySQL 是什么?

MySQL官方网站&#xff08;http://www.mysql.com/&#xff09;提供关于MySQL软件的最新信息。 MySQL是一个数据库管理系统。 数据库是一种结构化的数据集合。它可以是从简单的购物清单到图片库&#xff0c;再到企业网络中的大量信息等任何形式。要添加、访问和处理存储在计算…...

yarn link使用(npm link)

使用场景 前端开发中&#xff0c;两个项目相互依赖时&#xff0c;使用yarn link(npm link)链接 例如&#xff1a;A项目依赖于本司自己的UI库B&#xff0c;当我们修改了UI库B中的某些代码时&#xff0c;需本地验证后再发布到私服&#xff0c;此时A项目与UI项目B通过yarn link连…...

Docker容器讲解

Docker是一个开源的容器化平台&#xff0c;可以用来在轻量级容器中打包、部署和运行应用程序。Docker的基本概念包括容器、镜像、仓库和服务。 容器是一个独立运行的应用程序包&#xff0c;包括应用程序及其依赖项、运行时环境和配置等。容器相互隔离&#xff0c;可以在不同的…...

three.js模拟太阳系

地球的旋转轨迹目前设置为了圆形&#xff0c;效果&#xff1a; <template><div><el-container><el-main><div class"box-card-left"><div id"threejs" style"border: 1px solid red"></div><div c…...

WPF仿网易云搭建笔记(1):项目搭建

文章目录 前言项目地址动态样式组合样式批量样式覆盖Prism新建UserControler修改Material Design 笔刷收放列表可以滚动的StackPanel列表点击展开或折叠 实现效果 前言 今天接着继续细化代码&#xff0c;把整体框架写出来 项目地址 WPF仿网易云 Gitee仓库 动态样式 【WPF】C#…...

DDOS 攻击是什么?有哪些常见的DDOS攻击?

DDOS简介 DDOS又称为分布式拒绝服务&#xff0c;全称是Distributed Denial of Service。DDOS本是利用合理的请求造成资源过载&#xff0c;导致服务不可用&#xff0c;从而造成服务器拒绝正常流量服务。就如酒店里的房间是有固定的数量的&#xff0c;比如一个酒店有50个房间&am…...

未来应用从何而来:认知力延伸、边界突破、回归云与产业

文 | 智能相对论 作者 | 沈浪 或许&#xff0c;谁也没想到未来应用来的如此之快&#xff0c;现如今传统应用从开发到体验&#xff0c;已经进入了一个前所未有的颠覆性改革阶段。 不久前&#xff0c;美国人工智能公司OpenAI举办开发者大会。在现场&#xff0c;公司创始人Sam …...

vue零基础

vue 与其他框架的对比 框架设计模式数据绑定灵活度文件模式复杂性学习曲线生态VueMVVM双向灵活单文件小缓完善ReactMVC单向较灵活all in js大陡丰富AngularMVC双向固定多文件较大较陡&#xff08;Typescript&#xff09;独立 更多对比细节&#xff1a;vue 官网&#xff1a;ht…...

html中一个div中平均一行分配四个盒子,可展开与收起所有的盒子

html中一个div中平均一行分配四个盒子&#xff0c;可展开与收起所有的盒子 1.截图显示部分 2.代码展示部分 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"wid…...

Python虚拟环境指南:告别依赖地狱

一、背景 在SAAS&#xff08;软件即服务&#xff09;平台中&#xff0c;用户使用自行定制的Python脚本已经成为司空见惯的做法&#xff0c;然而&#xff0c;由于不同用户对Python三方库的需求各不相同&#xff0c;而底层服务器一般只安装了一个Python版本。举例来说&#xff0…...

【Jeecg Boot 3 - 第二天】第2节 前后端docker部署云服务器

更新完成&#xff0c;点击下面章节进入 一、后端部署 1.1、后端 docker-compose 部署 JEECGBOOT 1.2、jar 包和 lib 依赖分离&#xff0c;部署包缩小100倍 二、前端部署 2.1、nginx 部署 JEECGBOOT VUE3 2.2、开启Nginx压缩&#xff0c;解决前端访问慢问题...

2020年第九届数学建模国际赛小美赛A题自由泳解题全过程文档及程序

2020年第九届数学建模国际赛小美赛 A题 自由泳 原题再现&#xff1a; 在所有常见的游泳泳姿中&#xff0c;哪一种最快&#xff1f;哪个冲程推力最大&#xff1f;在自由泳项目中&#xff0c;游泳者可以选择他们的泳姿&#xff0c;他们通常选择前面的爬行。然而&#xff0c;游泳…...

双端队列和优先级队列

文章目录 前言dequedeque底层设计迭代器设计 priority仿函数数组中的第k个最大元素优先级队列模拟实现pushpop调整仿函数存储自定义类型 前言 今天要介绍比较特殊的结构&#xff0c;双端队列。 还有一个适配器&#xff0c;优先级队列。 deque 栈的默认容器用了一个deque的东西…...

c#读取CSV文件跟Excel导入成DataTble

1.读取CSV文件 /// <summary>/// 读取CSV文件/// </summary>/// <param name"fileName">文件路径</param>public static DataTable ReadCSV(string fileName){DataTable dt new DataTable();FileStream fs new FileStream(fileName, FileM…...

Python编程技巧 – 单字符函数

Python编程技巧 – 单字符函数 Python Programming Skills – Single Character Function By JacksonML 0. 前言 Python有其内建(built-in)的一系列函数&#xff0c;其中&#xff0c;有两个函数为长度为一的字符设计。这样的函数是单字符函数&#xff0c;尽管它们操作的对象…...

python打卡day49

知识点回顾&#xff1a; 通道注意力模块复习空间注意力模块CBAM的定义 作业&#xff1a;尝试对今天的模型检查参数数目&#xff0c;并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解&#xff1a;由来、作用与意义**一、知识点核心内容****二、知识点的由来&#xff1a;从生活实践到数学抽象****三、知识的作用&#xff1a;解决实际问题的工具****四、学习的意义&#xff1a;培养核心素养…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表&#xff0c;若其中包含环&#xff0c;则输出环的入口节点。 若其中不包含环&#xff0c;则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

Robots.txt 文件

什么是robots.txt&#xff1f; robots.txt 是一个位于网站根目录下的文本文件&#xff08;如&#xff1a;https://example.com/robots.txt&#xff09;&#xff0c;它用于指导网络爬虫&#xff08;如搜索引擎的蜘蛛程序&#xff09;如何抓取该网站的内容。这个文件遵循 Robots…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

Mobile ALOHA全身模仿学习

一、题目 Mobile ALOHA&#xff1a;通过低成本全身远程操作学习双手移动操作 传统模仿学习&#xff08;Imitation Learning&#xff09;缺点&#xff1a;聚焦与桌面操作&#xff0c;缺乏通用任务所需的移动性和灵活性 本论文优点&#xff1a;&#xff08;1&#xff09;在ALOHA…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制

目录 节点的功能承载层&#xff08;GATT/Adv&#xff09;局限性&#xff1a; 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能&#xff0c;如 Configuration …...

手动给中文分词和 直接用神经网络RNN做有什么区别

手动分词和基于神经网络&#xff08;如 RNN&#xff09;的自动分词在原理、实现方式和效果上有显著差异&#xff0c;以下是核心对比&#xff1a; 1. 实现原理对比 对比维度手动分词&#xff08;规则 / 词典驱动&#xff09;神经网络 RNN 分词&#xff08;数据驱动&#xff09…...