【实战-08】flink 消费kafka自定义序列化
目的
让从kafka消费出来的数据,直接就转换成我们的对象
mvn pom
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.boke</groupId><artifactId>Flink1.7.1</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version>
<!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version>
<!-- <scope>provided</scope>--></dependency><!-- table 环境依赖【connectors 和 formats 和driver】 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/configuration/overview/ --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version>
<!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version>
<!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.18</version></dependency><!--idea 运行比西甲这个否则报错:【 Make sure a planner module is on the classpath】--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><!--第三方的包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><!-- Add connector dependencies here. They must be in the default scope (compile). --><!-- Example:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>--><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.boke.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>
核心代码
package com.boke.kafka;
import com.alibaba.fastjson.JSONObject;
public class Student {
public String name;
public Integer age;
public Student(String name, Integer age) {this.name = name;this.age = age;
}public static Student fromJson(String s){JSONObject jsonObject = JSONObject.parseObject(s);String name = jsonObject.getString("name");Integer age = jsonObject.getInteger("age");return new Student(name,age);
}public String getName() {return name;
}public void setName(String name) {this.name = name;
}public Integer getAge() {return age;
}public void setAge(Integer age) {this.age = age;
}
}
//下面是main主函数
package com.boke.kafka;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;
public class kafkaSource {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource source = KafkaSource.builder()
.setBootstrapServers(“brokers”)
.setTopics(“input-topic”)
.setGroupId(“my-group”)
.setStartingOffsets(OffsetsInitializer.earliest())//【无论如何都从最早开始消费】
// .setStartingOffsets(OffsetsInitializer.latest())//【无论如何都从最新开始消费】
// .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))//【groupid 存在offset 则从offset消费,否则从最早开始消费】
// .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))//【groupid 存在offset 则从offset消费,否则从最新开始消费】
// .setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema())))
// .setDeserializer(KafkaRecordDeserializationSchema.of(new SimpleStringSchema());
.setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchema()))
// .setDeserializer(KafkaRecordDeserializationSchema.valueOnly())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
}
}
class MyKafkaDeserializationSchema implements KafkaDeserializationSchema{
@Override
public boolean isEndOfStream(Student nextElement) {return false;
}
//Deserializes the Kafka record.
//Params:
//record – Kafka record to be deserialized.
//Returns:
//The deserialized message as an object (null if the message cannot be deserialized).
@Override
public Student deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
/*
*自定义kafka反序列化
*如果数据异常,可以直接返回nulll即可,源码中有一句英文:null if the message cannot be deserialized
* */
String topic = record.topic();
long KafkaTimeStamp = record.timestamp();
int partitionNum = record.partition();
String value = new String(record.value(), StandardCharsets.UTF_8);
return Student.fromJson(value);
}
@Override
public TypeInformation<Student> getProducedType() {return TypeInformation.of(new TypeHint<Student>() {});
}
}
相关文章:
【实战-08】flink 消费kafka自定义序列化
目的 让从kafka消费出来的数据,直接就转换成我们的对象 mvn pom <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information …...
深入浅出 Django 异步编程
随着 Web 应用对性能的要求日益提高,异步编程成为了提升响应速度、提高系统吞吐量的重要手段。Django 作为一个成熟的 Python Web 框架,自 3.1 版本开始支持了异步编程。在本文中,我们将探讨 Django 异步编程的关键概念,并提供实际…...
力扣 138. 随机链表的复制
文章目录 1.解题思路2.代码实现 1.解题思路 在原先链表的每一个元素后面插入一个与前一个相同val的值的结点,然后由于是在原链表进行的操作,因此找每个random就变得很方便直接访问即可,此题目的精髓是cur1->randomp->random->next,看懂这串代码…...
STM32外部中断大问题
问题:一直进入中断,没有触发信号,也一直进入。 描述:开PA0为外部中断,刚刚很好,一个触发信号一个中断,中断函数没有丢,也没有抢跑,开PA1为外部中断也是,都很好…...
FPGA配置采集AR0135工业相机,提供2套工程源码和技术支持
目录 1、前言免责声明 2、AR0135工业相机简介3、我这里已有的 FPGA 图像处理解决方案4、设计思路框架AR0135配置和采集图像缓存视频输出 5、vivado工程1–>Kintex7开发板工程6、vivado工程1–>Zynq7100开发板工程7、上板调试验证8、福利:工程代码的获取 1、前…...
KubeSphere v3.4.0 部署K8S Docker + Prometheus + grafana
KubeSphere v3.4.0 部署K8S 1、整体思路2、修改linux主机名3、 离线安装3.1 问题列表3.2 执行命令成功列表 1、整体思路 将KubeSphere v3.4.0 安装包传输到其中一台机器修改Linux主机名(选取3台,修改为master01、master02、master03)安装官方…...
Codeforces Round 908 (Div. 2)题解
目录 A. Secret Sport 题目分析: B. Two Out of Three 题目分析: C. Anonymous Informant 题目分析: A. Secret Sport 题目分析: A,B一共打n场比赛,输入一个字符串由A和‘B’组成代表A赢或者B赢(无平局),因为题目说明这个人…...
Redis笔记 Redis主从同步
文章目录 Redis主从搭建主从架构主从数据同步原理全量同步增量同步repl_backlog原理 主从同步优化小结 Redis主从 搭建主从架构 单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离。 主从数据…...
数据结构-Prim算法构造无向图的最小生成树
引子: 无向图如果是一个网,那么它的所有的生成树中必有一颗生成树的边的权值之和是最小的,我们称 这颗权值和最小的树为:“最小生成树”(MST)。 其中,一棵树的代价就是树中所有权值之和。 而…...
MFC串口通信(SerialPort)
目录 1、SerialPort类的介绍和使用: (1)、SerialPort类的功能介绍 (2)、SerialPort类提供接口函数的介绍 1)、InitPort函数 2)、控制串口监视线程函数 3)、获取事件,…...
Vim基本使用操作
前言:作者也是初学Linux,可能总结的还不是很到位 Linux修炼功法:初阶功法 ♈️今日夜电波:美人鱼—林俊杰 0:21━━━━━━️💟──────── 4:14 …...
【深蓝学院】手写VIO第8章--相机与IMU时间戳同步--作业
0. 题目 1. T1 逆深度参数化时的特征匀速模型的重投影误差 参考常鑫助教的答案:思路是将i时刻的观测投到world系,再用j时刻pose和外参投到j时刻camera坐标系下,归一化得到预测的二维坐标(这里忽略了camera的内参,逆深…...
Naocs配置中心配置映射List、Map、Map嵌套List等方式
一、配置映射List 1、常规逐个配置方式,示例如下: 代码: @Data @Configuration @ConfigurationProperties(prefix = "list-json-str") public class ConfListByJsonStr implements Serializable, InitializingBean {@ApiModelProperty("映射结果集")…...
如何通过CRM系统进行销售机会管理?
销售机会管理是在销售过程中对潜在客户的精细化管理,销售机会管理的本质是公司用于管理销售机会通用的工具和方法。对于希望建立长期客户关系的现代销售团队来说,CRM客户管理系统是必不可少的工具。那企业如何通过CRM系统进行销售机会管理? …...
解决idea启动tomcat控制台中文乱码
#1.tomcat日志中文乱码# 如图这种情况,一般在idea用tomcat跑一个web项目启动后tomcat日志在控制台打印出来会出现中文乱码的情况 解决方案1:tomcat的日志配置文件的编码修改,找到tomcat安装目录conf下的logging.properties,encod…...
vscode + cmake + opencv example
nice try on macos CMakeLists.txt cmake_minimum_required(VERSION 3.20) #添加OPENCV库 #指定OpenCV版本,代码如下 #find_package(OpenCV 3.3 REQUIRED) #如果不需要指定OpenCV版本,代码如下 find_package(OpenCV REQUIRED)#添加OpenCV头文件 includ…...
day57【动态规划】647.回文子串 516.最长回文子序列
文章目录 647. 回文子串516.最长回文子序列 647. 回文子串 力扣题目链接 代码随想录讲解 题意:给你一个字符串 s ,请你统计并返回这个字符串中 回文子串 的数目。 回文字符串 是正着读和倒过来读一样的字符串。 子字符串 是字符串中的由连续字符组成的…...
分享vmware和Oracle VM VirtualBox虚拟机的区别,简述哪一个更适合我?
VMware和Oracle VM VirtualBox虚拟机的区别主要体现在以下几个方面: 首先两种软件的安装使用教程如下: 1:VMware ESXI 安装使用教程 2:Oracle VM VirtualBox安装使用教程 商业模式:VMware是一家商业公司,而…...
YOLOV5模型运行
1安装包 如果已经有了torch-cuda环境直接在环境下 pip install -r requirements.txt 2解决报错代码 raise ImportError("Failed to initialize: {0}".format(exc)) from exc ImportError: Failed to initialize: Bad git executable. The git executable must be …...
@Autowired和@Resource注解的区别和联系
直接看原文 原文链接: 【精选】Autowired和Resource注解的区别和联系【精选】 ------------------------------------------------------------------------------------------------------------------------------- 先说联系 联系 Autowired和Resource注解都是作为bean…...
idea大量爆红问题解决
问题描述 在学习和工作中,idea是程序员不可缺少的一个工具,但是突然在有些时候就会出现大量爆红的问题,发现无法跳转,无论是关机重启或者是替换root都无法解决 就是如上所展示的问题,但是程序依然可以启动。 问题解决…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
Unity3D中Gfx.WaitForPresent优化方案
前言 在Unity中,Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染(即CPU被阻塞),这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案: 对惹,这里有一个游戏开发交流小组&…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...
JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
Mysql故障排插与环境优化
前置知识点 最上层是一些客户端和连接服务,包含本 sock 通信和大多数jiyukehuduan/服务端工具实现的TCP/IP通信。主要完成一些简介处理、授权认证、及相关的安全方案等。在该层上引入了线程池的概念,为通过安全认证接入的客户端提供线程。同样在该层上可…...
