Kafka集成Flume/Spark/Flink(大数据)/SpringBoot
Kafka集成Flume
Flume生产者
③、安装Flume,上传apache-flume的压缩包.tar.gz到Linux系统的software,并解压到/opt/module目录下,并修改其名称为flume
Flume消费者
Kafka集成Spark
生产者
object SparkKafkaProducer{def main(args:Array[String]):Unit = {//配置信息val properties = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])//创建一个生产者var producer = new KafkaProducer[String,String](properties)//发送数据for(i <- 1 to 5){producer.send(new ProducerRecord[String,String]("first","atguigu"+i))}//关闭资源producer.close()}
}
消费者
Object SparkKafkaConsumer{def main(args:Array[String]):Unit = {//初始化上下文环境val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")val ssc = new StreamingContext(conf,Seconds(3))//消费数据val kafkapara = Map[String,Object](ConsumerConfig.BOOT_STRAP_SERVERS_CONFIG->"hadoop102:9092,hadoop103:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG->"test")val kafkaDStream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreFerConsistent,ConsumerStrategies.Subscribe[String,String](Set("first"),kafkapara))val valueDStream = kafkaDStream.map(record=>record.value())valueDStream.print()//执行代码,并阻塞ssc.start()ssc.awaitTermination()}
}
Kafka集成Flink
创建maven项目,导入以下依赖
resources里面添加log4j.properties文件,可以更改打印日志的级别为error
Flink生产者
public class FlinkafkaProducer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//准备数据源ArrayList<String> wordList = new ArrayList<>();wordList.add("hello");wordList.add("atguigu");DataStreamSource<String> stream = env.fromCollection();//创建一个kafka生产者Properties properteis = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(),properties);//添加数据源Kafka生产者stream.addSink(kafkaProducer);//执行env.execute();}
}
Flink消费者
public class FlinkafkaConsumer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//创建一个消费者Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleSStringSchema(),properties);//关联消费者和flink流env.addSource(kafkaConsumer).print();//执行env.execute();}
}
Kafka集成SpringBoot
生产者
通过浏览器发送
消费者
相关文章:

Kafka集成Flume/Spark/Flink(大数据)/SpringBoot
Kafka集成Flume Flume生产者 ③、安装Flume,上传apache-flume的压缩包.tar.gz到Linux系统的software,并解压到/opt/module目录下,并修改其名称为flume Flume消费者 Kafka集成Spark 生产者 object SparkKafkaProducer{def main(args:Array[S…...

Scratch节日 | 拯救屈原 | 端午节
端午节快乐! 这款特别为端午节打造的Scratch游戏 《拯救屈原》,将带你走进古代中国,感受历史与文化的魅力! 🏮 游戏介绍 扮演勇敢的探险者,穿越时空回到古代,解锁谜题,完成任务&…...

rabbitmq Direct交换机简介
在实际开发中,需求可能变得复杂,如消息的收发和处理。以支付系统为例,成功支付后需要改变订单状态并通知用户,而失败则不需要。为处理这种情况,提出了使用Direct交换机,它可以根据规则将消息路由到指定队列…...

Git实战--基于已有分支克隆进行项目开发的完整流程
Git克隆项目开发流程 ✅ 一、完整流程概述✅ 二、详细操作步骤Step 1:克隆仓库(如果尚未克隆)Step 2:获取远程分支信息并切换到 feature/ 获取所有远程分支Step 3:创建并切换到你的新分支Step 4:开始开发新…...
MapReduce(期末速成版)
起初在B站看3分钟的速成视频,感觉很多细节没听懂。 具体例子解析(文件内容去重) 对于两个输入文件,即文件A 和文件B,请编写MapReduce 程序,对两个文件进行合并,并剔除 其中重复的内容,得到一个新的输出文件…...
鸿蒙OSUniApp 移动端直播流播放实战:打造符合鸿蒙设计风格的播放器#三方框架 #Uniapp
UniApp 移动端直播流播放实战:打造符合鸿蒙设计风格的播放器 在移动互联网时代,直播已经成为一种主流的内容形式。本文将详细介绍如何使用 UniApp 框架开发一个优雅的直播流播放器,并融入鸿蒙系统的设计理念,实现一个既美观又实用…...

C3、C2f、C3K2、C2PSA的具体结构
YOLOV5 C3 Bottleneck C2f...

2_MCU开发环境搭建-配置MDK兼容Keil4和C51
MCU开发环境搭建-配置MDK兼容Keil4和C51 一、概述 本文以MDK-ARM V5.36版本基础介绍DMK-ARM工程兼容Keil4和C51的配置。 注:在阅读本文前,请先安装和配置完成MDK-ARM(Keil5)。 二、工具包下载 链接: https://pan.baidu.com/s/1Tu2tDD6zRra4xb_PuA1Wsw 提取码: 81pp 三、…...

通过远程桌面连接Windows实例提示“出现身份验证错误,无法连接到本地安全机构”错误怎么办?
本文介绍通过远程桌面连接Windows实例提示“出现身份验证错误无法连接到本地安全机构”错误的解决方案。 问题现象 通过本地电脑内的远程桌面连接Windows实例提示“出现身份验证错误,无法连接到本地安全机构”错误。 问题原因 导致该问题的可能原因如下&#x…...

百度golang研发一面面经
输入一个网址,到显示界面,中间的过程是怎样的 IP 报文段的结构是什么 Innodb 的底层结构 知道几种设计模式 工厂模式 简单工厂模式:根据传入类型参数判断创建哪种类型对象工厂方法模式:由子类决定实例化哪个类抽象工厂模式&#…...

TC3xx学习笔记-启动过程详解(一)
文章目录 前言Firmware启动过程BMHD Check流程ABM启动Internal Flash启动Bootloader ModeProcessing in case no valid BMHD foundProcessing in case no Boot Mode configured by SSW 总结 前言 之前介绍过UCB BMHD的使用,它在启动过程中起着重要的作用࿰…...

Scratch节日 | 六一儿童节抓糖果
六一儿童节怎么能没有糖果?这款 六一儿童节抓糖果 小游戏,让你变身小猫,开启一场甜蜜大作战! 🎮 游戏玩法 帮助小猫收集所有丢失的糖果,收集越多分数越高! 小心虫子一样的“坏糖果”ÿ…...
系统调用与程序接口的关系
程序接口类型 系统调用:是操作系统提供给应用程序的接口 ,允许应用程序请求操作系统执行特定操作,像文件操作(打开、读写、关闭文件 )、进程管理(创建、终止进程 )、设备管理(操作磁…...
从线性方程组角度理解公式 s=n−r(3E−A)
从线性方程组角度理解公式 sn−r(3E−A) 这个公式本质上是 齐次线性方程组解空间维度 的直接体现。下面通过三个关键步骤解释其在线性方程组中的含义: 1. 公式对应的线性方程组 考虑矩阵方程: (3E−A)x0 其中: x 是 n 维未知向量3E−…...

通信算法之280:无人机侦测模块知识框架思维导图
1. 无人机侦测模块知识框架思维导图, 见文末章节。 2. OFDM参数估计,基于循环自相关特性。 3. 无人机其它参数估计...

【Doris基础】Apache Doris中的Coordinator节点作用详解
目录 1 Doris架构概述 2 Coordinator节点的核心作用 2.1 查询协调与调度 2.2 执行计划生成与优化 2.3 资源管理与负载均衡 2.4 容错与故障恢复 3 Coordinator节点的关键实现机制 3.1 两阶段执行模型 3.2 流水线执行引擎 3.3 分布式事务管理 4 Coordinator节点的高可…...
软考 系统架构设计师之考试感悟3
接前一篇文章:软考 系统架构设计师之考试感悟2 上周六(2025年5月24日),本人第三次参加了软考系统架构师的考试。和前两次一样,考了一天,身心俱疲。不过这次感觉比上一次要稍微好点,可能也是考的…...

【Kubernetes-1.30】--containerd部署
文章目录 一、环境准备1.1 三台服务器1.2 基础配置(三台机通用)1.3 关闭 Swap(必须)1.4 关闭防火墙(可选)1.5 加载必要模块 & 配置内核参数 二、安装容器运行时(containerd 推荐)…...
Flutter 嵌套H5 传参数
你可以通过在加载 H5 页面时,将 token 作为 URL 参数拼接,或者通过 WebView 的 JavaScript 通信功能(如 runJavaScript 或 addJavaScriptChannel)传递 token。常用方式如下: 方式一:URL 拼接参数 假设你的…...
什么是线程上下文切换?
导语: 线程上下文切换(Context Switch)是Java并发编程中一个常见但容易被忽视的概念。在高并发场景下,它直接影响系统性能。本文将从面试官角度深入剖析这个话题,帮你理解底层原理、掌握优化思路、规避项目中的常见陷阱…...
Jvm 元空间大小分配原则
JVM元空间(Metaspace)的大小分配原则与系统物理内存密切相关,但并不是直接等比例分配,而是通过一系列参数和JVM的动态管理机制来确定。下面从原理和实际行为两方面详细说明: 1. 元空间(Metaspace࿰…...

相机--相机标定
教程 内外参公式及讲解 相机标定分类 相机标定分为内参标定和外参标定。 相机成像原理 相机成像畸变 链接 四个坐标系的转变 内参标定 内参 相机内参通常用一个 33 矩阵(内参矩阵,KK)表示,形式如下: (1)焦距&…...

MongoDB(七) - MongoDB副本集安装与配置
文章目录 前言一、下载MongoDB1. 下载MongoDB2. 上传安装包3. 创建相关目录 二、安装配置MongoDB1. 解压MongoDB安装包2. 重命名MongoDB文件夹名称3. 修改配置文件4. 分发MongoDB文件夹5. 配置环境变量6. 启动副本集7. 进入MongoDB客户端8. 初始化副本集8.1 初始化副本集8.2 添…...

131. 分割回文串-两种回溯思路
我们可以将字符串分割成若干回文子串,返回所有可能的方案。如果将问题分解,可以表示为分割长度为n-1的子字符串,这与原问题性质相同,因此可以采用递归方法解决。 为什么回溯与递归存在联系?在解决这个问题时࿰…...

[Java恶补day13] 53. 最大子数组和
休息了一天,开始补上! 给你一个整数数组 nums ,请你找出一个具有最大和的连续子数组(子数组最少包含一个元素),返回其最大和。 子数组是数组中的一个连续部分。 示例 1: 输入:nums …...

摩尔投票算法原理实现一文剖析
摩尔投票算法原理&实现一文剖析 一、算法原理1.1 基本思想1.2 数学原理 二、算法实现2.1 Python实现2.2 Java实现2.3 C实现 三、复杂度分析四、应用场景4.1 多数元素问题4.2 扩展应用:寻找出现次数超过n/3的元素 五、算法优势与注意事项5.1 优势5.2 注意事项 总…...
springboot项目下面的单元测试注入的RedisConnectionFactory类redisConnectionFactory值为什么为空呢?
你遇到的问题是: RedisConnectionFactory redisConnectionFactory 在单元测试中为 null 这是 Spring Boot 单元测试中非常常见的问题,根本原因是你的测试类没有启用 Spring 容器上下文,导致 Resource 注解无法注入 Bean。 ✅ 正确做法&…...

MyBatis操作数据库(2)
1.#{}和${}使用 Interger类型的参数可以看到这里显示的语句是:select username,password,age,gender,phone from userinfo where id? 输入的参数并没有在后面进行拼接,,id的值是使用?进行占位,这种sql称之为"预编译sql".这里,把#{}改成${}观察情况:这里可以看到…...

C++面向对象(二)
面向对象基础内容参考: C面向对象(一)-CSDN博客 友元函数 类的友元函数是定义在类外部,但有权访问类的所有私有(private)成员和保护(protected)成员。尽管友元函数的原型有在类的定…...

【C语言入门级教学】冒泡排序和指针数组
文章目录 1.冒泡排序2.⼆级指针3.指针数组4.指针数组模拟⼆维数组 1.冒泡排序 冒泡排序的核⼼思想:两两相邻的元素进⾏⽐较。 //⽅法1 void bubble_sort(int arr[], int sz)//参数接收数组元素个数 { int i 0;for(i0; i-1; i) { int j 0; for(j0; j-1; j) { …...