当前位置: 首页 > news >正文

Kafka与Flink的整合 -- sink、source

1、首先导入依赖:
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency>
2、 source:Flink从Kafka中读取数据
public class Demo01KafkaSource {public static void main(String[] args) throws Exception{//构建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//构建kafka source 环境KafkaSource<String> source = KafkaSource.<String>builder()//指定broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定topic.setTopics("bigdata")//指定消费组.setGroupId("my-group")//指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据.setStartingOffsets(OffsetsInitializer.earliest())//读取数据格式:.setValueOnlyDeserializer(new SimpleStringSchema()).build();//使用kafka数据源DataStreamSource<String> kafkaSourceDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafkaSourceDS.print();//启动flinkenv.execute();}
}
        启动生产kafka:
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
3、sink:Flink向Kafka中写入数据
public class Demo02KafkaSink {public static void main(String[] args) throws Exception{//构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//读取数据文件:DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");//创建kafka sinkKafkaSink<String> sink = KafkaSink.<String>builder()//指定flink broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定数据的格式:.setRecordSerializer(KafkaRecordSerializationSchema.builder()//指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic.setTopic("student")//指定数据的格式.setValueSerializationSchema(new SimpleStringSchema()).build())//指定数据处理的语义:.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//执行flinkstudentDS.sinkTo(sink);//构建flink环境env.execute();}
}
        启动消费kafka:
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic student

相关文章:

Kafka与Flink的整合 -- sink、source

1、首先导入依赖&#xff1a; <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency> 2、 source&#xff1a;Flink从Kafka中读取数据 p…...

小鱼ROS

git clone git clone https://ghproxy.com/https://github.com/stilleshan/ServerStatus git clone 私有仓库 Clone 私有仓库需要用户在 Personal access tokens 申请 Token 配合使用.git clone https://user:your_tokenghproxy.com/https://github.com/your_name/your_priv…...

简单讲讲RISC-V跳转指令基于具体场景的实现

背景 在 RISC-V指令集中&#xff0c;一共有 6 条有条件跳转指令&#xff0c;分别是 beq、bne、blt、bltu、bge、bgeu。如下是它们的定义与接口 BEQ rs1, rs2, imm ≠ BNE rs1, rs2, imm &#xff1c; BLT rs1, rs2, imm ≥ BGE rs1, rs2, imm < unsigned BLTU rs1…...

第13章 Java IO流处理(一) File类

目录 内容说明 章节内容 一、 File类 内容说明 结合章节内容重点难点,会对重要知识点进行扩展,以及做示例说明等,以便更好理解重点难点 章节内容 一、 File类 1、文件与目录的描述类——File ✔️ File类并不用来进行文件的读/写操作,并未涉及到写入或读取文件内容的…...

测试面试题集锦(四)| Linux 与 Python 编程篇(附答案)

本系列文章总结归纳了一些软件测试工程师常见的面试题&#xff0c;主要来源于个人面试遇到的、网络搜集&#xff08;完善&#xff09;、工作日常讨论等&#xff0c;分为以下十个部分&#xff0c;供大家参考。如有错误的地方&#xff0c;欢迎指正。有更多的面试题或面试中遇到的…...

pytorch中的矩阵乘法

1. 运算符介绍 关于运算&#xff0c;*运算&#xff0c;torch.mul(), torch.mm(), torch.mv(), tensor.t() 和 *代表矩阵的两种相乘方式&#xff1a; 表示常规的数学上定义的矩阵相乘&#xff1b; *表示两个矩阵对应位置处的两个元素相乘。 1.1 矩阵点乘 *和torch.mul()等同…...

Java--Stream流详解

Stream是Java 8 API添加的一个新的抽象&#xff0c;称为流Stream&#xff0c;以一种声明性方式处理数据集合&#xff08;侧重对于源数据计算能力的封装&#xff0c;并且支持序列与并行两种操作方式&#xff09; Stream流是从支持数据处理操作的源生成的元素序列&#xff0c;源可…...

[PHP]ShopXO企业级B2C免费开源商城系统 v2.3.1

ShopXO 企业级B2C免费开源电商系统&#xff01; 求实进取、创新专注、自主研发、国内领先企业级B2C电商系统解决方案。 遵循Apache2开源协议发布&#xff0c;无需授权、可商用、可二次开发、满足99%的电商运营需求。 PCH5、支付宝小程序、微信小程序、百度小程序、头条&抖音…...

Python基础入门系列详解20篇

Python基础入门&#xff08;1&#xff09;----Python简介 Python基础入门&#xff08;2&#xff09;----安装Python环境&#xff08;Windows、MacOS、CentOS、Ubuntu&#xff09; Python基础入门&#xff08;3&#xff09;----Python基础语法&#xff1a;解释器、标识符、关键…...

P02项目(学习)

★ P02项目 项目描述&#xff1a;安全操作项目旨在提高医疗设备的安全性&#xff0c;特别是在医生离开操作屏幕时&#xff0c;以减少非授权人员的误操作风险。为实现这一目标&#xff0c;我们采用多层次的保护措施&#xff0c;包括人脸识别、姿势检测以及二维码识别等技术。这些…...

pandas 笔记:get_dummies分类变量one-hot化

1 函数介绍 pandas.get_dummies 是 pandas 库中的一个函数&#xff0c;它用于将分类变量转换为哑变量/指示变量。所谓的哑变量&#xff0c;就是将分类变量的每一个不同的值转换为一个新的0/1变量。在输出的DataFrame中&#xff0c;每一列都以该值的名称命名 pandas.get_dummi…...

PTE作文练习(一)

目录 65分备考建议 WE模版 范文 Supporting ideas: SWT 65分备考建议 RA重在多听标准的正确的示范&#xff0c;RS重在抓大放小&#xff0c;WFD重在整理错题&#xff0c;以及反反复复的车轮战&#xff0c;FIBRW重在“以对代记” 就是直接看答案&#xff0c;节约时间&#…...

如何做到一套FPGA工程无缝兼容两款不同的板卡?

试想这样一种场景,有两款不同的FPGA板卡,它们的功能代码90%都是一样的,但是两个板卡的管脚分配完全不同,一般情况下,我们需要设计两个工程,两套代码,之后还需要一直维护两个版本。 那么有没有一种自动化的方式,实现一个工程,编译出一个程序文件,下载到这两个不同的板…...

VSCode修改主题为Eclipse 绿色护眼模式

前言 从参加开发以来&#xff0c;一直使用eclipse进行开发&#xff0c;基本官方出新版本&#xff0c;我都会更新。后来出来很多其他的IDE工具&#xff0c;我也尝试了&#xff0c;但他们的主题都把我劝退了&#xff0c;黑色主题是谁想出来&#xff1f;&#x1f602; 字体小的时…...

conan和cmake编译器版本不匹配问题解决

conan和cmake编译器版本不匹配问题解决 1 问题现象2 解决方法2.1 在CMakeLists.txt禁止编译器检查2.1.1 修改方式 2.2 探查问题出现的根本原因2.2.1 安装升级gcc2.2.2 安装升级g 注 执行环境&#xff1a;ubuntu 1 问题现象 conan要求的编译器版本和cmake检测到的当前的编译器…...

float单精度浮点数如何在计算机中存储

文章目录 1 float型数据组成2 实际举例3 代码测试4 写在最后 1 float型数据组成 按照IEEE浮点标准存储浮点数时&#xff0c;一个float型的值由1个符号位&#xff08;最左边的位或最高有效位&#xff09;、8个指数位以及23个小数位依次组成: 符号位为0时表示正数&#xff0c;为1…...

机器视觉在虚拟现实与增强现实中的作用

机器视觉在虚拟现实&#xff08;VR&#xff09;和增强现实&#xff08;AR&#xff09;中发挥着至关重要的作用。这些技术的核心是计算机视觉领域&#xff0c;重点是让计算机具有“看到”和理解周围世界的能力。 在虚拟现实中&#xff0c;计算机视觉用于创建和处理用户所见的虚…...

红黑数原理及存在原因

我红黑树那么牛&#xff0c;你们为什么不用&#xff1f;_哔哩哔哩_bilibili 面试时经常会被问到红黑树&#xff0c;它到底有什么优点呢&#xff1f; 对于查找数据&#xff0c;数组二分查询速度最快&#xff0c;时间复杂度为O(logN)。但是如果增加和删除数据&#xff0c;数组就…...

Ansible入门—安装部署及各个模块应用案例(超详细)

目录 前言 一、环境概况 修改主机名&#xff08;可选项&#xff09; 二、安装部署 1.安装epel扩展源 2.安装Ansible 3.修改Ansible的hosts文件 4.生成密钥 三、Ansible模块使用介绍 Command模块 Shell模块 User模块 Copy模块 File模块 Hostname模块 Yum模块 Se…...

Spring Boot 3系列之-启动类详解

Spring Boot是一个功能强大、灵活且易于使用的框架&#xff0c;它极大地简化了Spring应用程序的开发和部署流程&#xff0c;使得开发人员能够更专注于业务逻辑的实现。在我们的Spring Boot 3系列之一&#xff08;初始化项目&#xff09;文章中&#xff0c;我们使用了Spring官方…...

[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?

&#x1f9e0; 智能合约中的数据是如何在区块链中保持一致的&#xff1f; 为什么所有区块链节点都能得出相同结果&#xff1f;合约调用这么复杂&#xff0c;状态真能保持一致吗&#xff1f;本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里&#xf…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

工业安全零事故的智能守护者:一体化AI智能安防平台

前言&#xff1a; 通过AI视觉技术&#xff0c;为船厂提供全面的安全监控解决方案&#xff0c;涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面&#xff0c;能够实现对应负责人反馈机制&#xff0c;并最终实现数据的统计报表。提升船厂…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案

问题描述&#xff1a;iview使用table 中type: "index",分页之后 &#xff0c;索引还是从1开始&#xff0c;试过绑定后台返回数据的id, 这种方法可行&#xff0c;就是后台返回数据的每个页面id都不完全是按照从1开始的升序&#xff0c;因此百度了下&#xff0c;找到了…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

如何在看板中有效管理突发紧急任务

在看板中有效管理突发紧急任务需要&#xff1a;设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP&#xff08;Work-in-Progress&#xff09;弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中&#xff0c;设立专门的紧急任务通道尤为重要&#xff0c;这能…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

小木的算法日记-多叉树的递归/层序遍历

&#x1f332; 从二叉树到森林&#xff1a;一文彻底搞懂多叉树遍历的艺术 &#x1f680; 引言 你好&#xff0c;未来的算法大神&#xff01; 在数据结构的世界里&#xff0c;“树”无疑是最核心、最迷人的概念之一。我们中的大多数人都是从 二叉树 开始入门的&#xff0c;它…...

聚六亚甲基单胍盐酸盐市场深度解析:现状、挑战与机遇

根据 QYResearch 发布的市场报告显示&#xff0c;全球市场规模预计在 2031 年达到 9848 万美元&#xff0c;2025 - 2031 年期间年复合增长率&#xff08;CAGR&#xff09;为 3.7%。在竞争格局上&#xff0c;市场集中度较高&#xff0c;2024 年全球前十强厂商占据约 74.0% 的市场…...

SQL注入篇-sqlmap的配置和使用

在之前的皮卡丘靶场第五期SQL注入的内容中我们谈到了sqlmap&#xff0c;但是由于很多朋友看不了解命令行格式&#xff0c;所以是纯手动获取数据库信息的 接下来我们就用sqlmap来进行皮卡丘靶场的sql注入学习&#xff0c;链接&#xff1a;https://wwhc.lanzoue.com/ifJY32ybh6vc…...