Kafka与Flink的整合 -- sink、source
1、首先导入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency>
2、 source:Flink从Kafka中读取数据
public class Demo01KafkaSource {public static void main(String[] args) throws Exception{//构建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//构建kafka source 环境KafkaSource<String> source = KafkaSource.<String>builder()//指定broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定topic.setTopics("bigdata")//指定消费组.setGroupId("my-group")//指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据.setStartingOffsets(OffsetsInitializer.earliest())//读取数据格式:.setValueOnlyDeserializer(new SimpleStringSchema()).build();//使用kafka数据源DataStreamSource<String> kafkaSourceDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafkaSourceDS.print();//启动flinkenv.execute();}
}
启动生产kafka:
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
3、sink:Flink向Kafka中写入数据
public class Demo02KafkaSink {public static void main(String[] args) throws Exception{//构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//读取数据文件:DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");//创建kafka sinkKafkaSink<String> sink = KafkaSink.<String>builder()//指定flink broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定数据的格式:.setRecordSerializer(KafkaRecordSerializationSchema.builder()//指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic.setTopic("student")//指定数据的格式.setValueSerializationSchema(new SimpleStringSchema()).build())//指定数据处理的语义:.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//执行flinkstudentDS.sinkTo(sink);//构建flink环境env.execute();}
}
启动消费kafka:
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic student
相关文章:
Kafka与Flink的整合 -- sink、source
1、首先导入依赖: <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency> 2、 source:Flink从Kafka中读取数据 p…...
小鱼ROS
git clone git clone https://ghproxy.com/https://github.com/stilleshan/ServerStatus git clone 私有仓库 Clone 私有仓库需要用户在 Personal access tokens 申请 Token 配合使用.git clone https://user:your_tokenghproxy.com/https://github.com/your_name/your_priv…...
简单讲讲RISC-V跳转指令基于具体场景的实现
背景 在 RISC-V指令集中,一共有 6 条有条件跳转指令,分别是 beq、bne、blt、bltu、bge、bgeu。如下是它们的定义与接口 BEQ rs1, rs2, imm ≠ BNE rs1, rs2, imm < BLT rs1, rs2, imm ≥ BGE rs1, rs2, imm < unsigned BLTU rs1…...
第13章 Java IO流处理(一) File类
目录 内容说明 章节内容 一、 File类 内容说明 结合章节内容重点难点,会对重要知识点进行扩展,以及做示例说明等,以便更好理解重点难点 章节内容 一、 File类 1、文件与目录的描述类——File ✔️ File类并不用来进行文件的读/写操作,并未涉及到写入或读取文件内容的…...
测试面试题集锦(四)| Linux 与 Python 编程篇(附答案)
本系列文章总结归纳了一些软件测试工程师常见的面试题,主要来源于个人面试遇到的、网络搜集(完善)、工作日常讨论等,分为以下十个部分,供大家参考。如有错误的地方,欢迎指正。有更多的面试题或面试中遇到的…...
pytorch中的矩阵乘法
1. 运算符介绍 关于运算,*运算,torch.mul(), torch.mm(), torch.mv(), tensor.t() 和 *代表矩阵的两种相乘方式: 表示常规的数学上定义的矩阵相乘; *表示两个矩阵对应位置处的两个元素相乘。 1.1 矩阵点乘 *和torch.mul()等同…...
Java--Stream流详解
Stream是Java 8 API添加的一个新的抽象,称为流Stream,以一种声明性方式处理数据集合(侧重对于源数据计算能力的封装,并且支持序列与并行两种操作方式) Stream流是从支持数据处理操作的源生成的元素序列,源可…...
[PHP]ShopXO企业级B2C免费开源商城系统 v2.3.1
ShopXO 企业级B2C免费开源电商系统! 求实进取、创新专注、自主研发、国内领先企业级B2C电商系统解决方案。 遵循Apache2开源协议发布,无需授权、可商用、可二次开发、满足99%的电商运营需求。 PCH5、支付宝小程序、微信小程序、百度小程序、头条&抖音…...
Python基础入门系列详解20篇
Python基础入门(1)----Python简介 Python基础入门(2)----安装Python环境(Windows、MacOS、CentOS、Ubuntu) Python基础入门(3)----Python基础语法:解释器、标识符、关键…...
P02项目(学习)
★ P02项目 项目描述:安全操作项目旨在提高医疗设备的安全性,特别是在医生离开操作屏幕时,以减少非授权人员的误操作风险。为实现这一目标,我们采用多层次的保护措施,包括人脸识别、姿势检测以及二维码识别等技术。这些…...
pandas 笔记:get_dummies分类变量one-hot化
1 函数介绍 pandas.get_dummies 是 pandas 库中的一个函数,它用于将分类变量转换为哑变量/指示变量。所谓的哑变量,就是将分类变量的每一个不同的值转换为一个新的0/1变量。在输出的DataFrame中,每一列都以该值的名称命名 pandas.get_dummi…...
PTE作文练习(一)
目录 65分备考建议 WE模版 范文 Supporting ideas: SWT 65分备考建议 RA重在多听标准的正确的示范,RS重在抓大放小,WFD重在整理错题,以及反反复复的车轮战,FIBRW重在“以对代记” 就是直接看答案,节约时间&#…...
如何做到一套FPGA工程无缝兼容两款不同的板卡?
试想这样一种场景,有两款不同的FPGA板卡,它们的功能代码90%都是一样的,但是两个板卡的管脚分配完全不同,一般情况下,我们需要设计两个工程,两套代码,之后还需要一直维护两个版本。 那么有没有一种自动化的方式,实现一个工程,编译出一个程序文件,下载到这两个不同的板…...
VSCode修改主题为Eclipse 绿色护眼模式
前言 从参加开发以来,一直使用eclipse进行开发,基本官方出新版本,我都会更新。后来出来很多其他的IDE工具,我也尝试了,但他们的主题都把我劝退了,黑色主题是谁想出来?😂 字体小的时…...
conan和cmake编译器版本不匹配问题解决
conan和cmake编译器版本不匹配问题解决 1 问题现象2 解决方法2.1 在CMakeLists.txt禁止编译器检查2.1.1 修改方式 2.2 探查问题出现的根本原因2.2.1 安装升级gcc2.2.2 安装升级g 注 执行环境:ubuntu 1 问题现象 conan要求的编译器版本和cmake检测到的当前的编译器…...
float单精度浮点数如何在计算机中存储
文章目录 1 float型数据组成2 实际举例3 代码测试4 写在最后 1 float型数据组成 按照IEEE浮点标准存储浮点数时,一个float型的值由1个符号位(最左边的位或最高有效位)、8个指数位以及23个小数位依次组成: 符号位为0时表示正数,为1…...
机器视觉在虚拟现实与增强现实中的作用
机器视觉在虚拟现实(VR)和增强现实(AR)中发挥着至关重要的作用。这些技术的核心是计算机视觉领域,重点是让计算机具有“看到”和理解周围世界的能力。 在虚拟现实中,计算机视觉用于创建和处理用户所见的虚…...
红黑数原理及存在原因
我红黑树那么牛,你们为什么不用?_哔哩哔哩_bilibili 面试时经常会被问到红黑树,它到底有什么优点呢? 对于查找数据,数组二分查询速度最快,时间复杂度为O(logN)。但是如果增加和删除数据,数组就…...
Ansible入门—安装部署及各个模块应用案例(超详细)
目录 前言 一、环境概况 修改主机名(可选项) 二、安装部署 1.安装epel扩展源 2.安装Ansible 3.修改Ansible的hosts文件 4.生成密钥 三、Ansible模块使用介绍 Command模块 Shell模块 User模块 Copy模块 File模块 Hostname模块 Yum模块 Se…...
Spring Boot 3系列之-启动类详解
Spring Boot是一个功能强大、灵活且易于使用的框架,它极大地简化了Spring应用程序的开发和部署流程,使得开发人员能够更专注于业务逻辑的实现。在我们的Spring Boot 3系列之一(初始化项目)文章中,我们使用了Spring官方…...
Graphormer实战:输入SMILES字符串,5分钟获取分子属性预测结果
Graphormer实战:输入SMILES字符串,5分钟获取分子属性预测结果 1. 为什么选择Graphormer进行分子属性预测 在药物发现和材料科学领域,准确预测分子属性是核心挑战之一。传统方法通常需要复杂的实验或耗时的计算模拟,而Graphormer…...
QWEN-AUDIO功能全解析:声波可视化、情感指令、四种人声,到底怎么用?
QWEN-AUDIO功能全解析:声波可视化、情感指令、四种人声,到底怎么用? 1. 认识QWEN-AUDIO语音合成系统 QWEN-AUDIO是一款基于Qwen3-Audio架构构建的智能语音合成系统,它能够将文字转换成带有情感和温度的自然语音。这个系统最特别…...
Mac窗口置顶终极指南:用Topit解锁你的多任务超能力 [特殊字符]
Mac窗口置顶终极指南:用Topit解锁你的多任务超能力 🚀 【免费下载链接】Topit Pin any window to the top of your screen / 在Mac上将你的任何窗口强制置顶 项目地址: https://gitcode.com/gh_mirrors/to/Topit 还在为频繁切换窗口而烦恼&#x…...
CogVideoX-2b作品集:这些流畅自然的视频都是用文字生成的
CogVideoX-2b作品集:这些流畅自然的视频都是用文字生成的 当文字能够直接转化为流畅自然的视频,创作的门槛将被彻底打破。CogVideoX-2b作为智谱AI开源的文字生成视频工具,正在让这一愿景成为现实。本文将展示一系列由该模型生成的惊艳视频作…...
八大网盘直链下载神器:告别客户端依赖,解锁高速下载新体验
八大网盘直链下载神器:告别客户端依赖,解锁高速下载新体验 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中国…...
如何让QQ音乐下载的加密歌曲在任何设备上自由播放?qmcdump解密工具深度解析
如何让QQ音乐下载的加密歌曲在任何设备上自由播放?qmcdump解密工具深度解析 【免费下载链接】qmcdump 一个简单的QQ音乐解码(qmcflac/qmc0/qmc3 转 flac/mp3),仅为个人学习参考用。 项目地址: https://gitcode.com/gh_mirrors/q…...
从理论到实践:基于MATLAB的ZF、ML、MRC与MMSE信号检测算法性能深度剖析
1. 信号检测算法入门:从通信系统到MATLAB实现 第一次接触信号检测算法时,我被各种缩写搞得晕头转向。直到在MIMO系统项目中真正用MATLAB实现了这些算法,才明白它们就像不同的"翻译官",负责把混乱的接收信号还原成原始信…...
LongCat动物百变秀效果展示:橘猫变布偶、柯基穿毛衣,AI编辑惊艳案例
LongCat动物百变秀效果展示:橘猫变布偶、柯基穿毛衣,AI编辑惊艳案例 1. 开篇:当AI成为宠物造型师 想象一下这样的场景:你拍了一张自家橘猫的照片,突然想看看它变成高贵布偶猫的样子;或者给柯基犬穿上毛衣…...
AI 模型推理框架对比 TensorRT vs ONNX
AI模型推理框架对比:TensorRT与ONNX的深度解析在人工智能技术飞速发展的今天,模型推理框架的选择直接影响着部署效率与性能表现。NVIDIA推出的TensorRT与微软主导的ONNX作为两大主流推理框架,各自拥有独特的优势与适用场景。本文将从多个维度…...
寻音捉影·侠客行从零开始:基于ModelScope FunASR的私有化语音检索实践
寻音捉影侠客行:从零开始基于ModelScope FunASR的私有化语音检索实践 1. 什么是“寻音捉影侠客行”? 在信息爆炸的时代,我们每天面对大量语音内容——会议录音、课程回放、采访素材、客服对话……但想从中快速找到一句关键话,却…...
