五分钟,Docker安装flink,并使用flinksql消费kafka数据
1、拉取flink镜像,创建网络
docker pull flink
docker network create flink-network
2、创建 jobmanager
# 创建 JobManager docker run \-itd \--name=jobmanager \--publish 8081:8081 \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest jobmanager
3、创建 taskmanager
# 创建 TaskManager docker run \-itd \--name=taskmanager \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest taskmanager
4、访问 http://localhost:8081/

4.1 修改Task Slots
默认的Slots num是1,我们可以修改为5:
修改的目录是jobmanager和taskmanager的/opt/flink/conf的flink-conf.yaml文件:


修改taskmanager.numberOfTaskSlots:即可。
注意:默认的docker容器中没有vi/vim命令,可以使用docker cp命令,复制出来修改,然后在复制回去,如下:
docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/

5、通过flinksql消费Kafka
确保有一个可用的kafka,如果没有,可以五分钟内,Docker搭建一个
Docker安装kafka 3.5
并且通过python,简单写一个生产者
Python生产、消费Kafka
5.1 导入flink-sql-connector-kafka jar包
顾名思义,用于连接flinksql和kafka。
进入flink
docker exec -it jobmanager /bin/bash
进入 flink的bin目录
cd /opt/flink/bin
查看flink版本:
flink --version
可以看出,我的版本是1.18.0

根据自己的flink版本,下载对应的 flink-sql-connector-kafka jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因为我是1.18.0,所以选择下图的版本包:

点进去进行下载:

将下载的jar包,分别在jobmanager,taskmanager /opt/flink/lib目录下,注意,是两个都要放,如下图:

可以使用docker cp test.txt jobmanager:/opt/flink/lib命令,用户宿主机和docker容器文件传输。把test.txt换成对应的jar包即可
5.2 flinksql消费kafka
进入jobmanager中,执行
cd /opt/flink/bin
sql-client.sh


Flink SQL执行以下语句:
CREATE TABLE KafkaTable (`count_num` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'kafka_demo','properties.bootstrap.servers' = '192.168.10.15:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'json'
);show tables;
select * from KafkaTable;
可以看到Flink在消费kafka数据,如下图:

相关文章:
五分钟,Docker安装flink,并使用flinksql消费kafka数据
1、拉取flink镜像,创建网络 docker pull flink docker network create flink-network2、创建 jobmanager # 创建 JobManager docker run \-itd \--namejobmanager \--publish 8081:8081 \--network flink-network \--env FLINK_PROPERTIES"jobmanager.rpc.ad…...
【小聆送书第一期】让架构师的成神之路温暖你这个不景气的冬天
🌈个人主页:聆风吟 🔥系列专栏:网络奇遇记、数据结构 🔖少年有梦不应止于心动,更要付诸行动。 文章目录 📋前言 书籍一览 ⛳️书籍一⛳️书籍二⛳️书籍三⛳️书籍四⛳️书籍五⛳️书籍六⛳️书…...
网页爬虫反扒措施有哪些?
爬虫之常见的反扒 cookies 一般用requests直接请求网址的时候有时候可能会遇到反扒措施,这时候可以考虑一下加上user-agent伪装成浏览器;也可能有登录限制,这时候cookies就有用处了 浏览器中的cookie是保存我们的账号数据和访问记录&#…...
C#实现批量生成二维码
相信大家都使用过草料二维码生成器,单独生成二维码可以,但是批量生成二维码就需要收费了。既然要收费,那就自己写一个。 接口采用导入Excel文件生成二维码,首先需要读取Excel的数据,方法如下所示: /// <…...
3种在ArcGIS Pro中制作山体阴影的方法
山体阴影可以更直观的展现地貌特点,表达真实的地形,这里为大家介绍一下在ArcGIS Pro中制作山体阴影的方法,希望能对你有所帮助。 数据来源 本教程所使用的数据是从水经微图中下载的DEM数据,除了DEM数据,常见的GIS数据…...
【ChatGLM2-6B】Docker下部署及微调
【ChatGLM2-6B】小白入门及Docker下部署 一、简介1、ChatGLM2是什么2、组成部分3、相关地址 二、基于Docker安装部署1、前提2、CentOS7安装NVIDIA显卡驱动1)查看服务器版本及显卡信息2)相关依赖安装3)显卡驱动安装 2、 CentOS7安装NVIDIA-Doc…...
输入两个整数,输出它们的乘积。 ← Python 及 C++ 代码比较
【题目描述】 输入两个整数,输出它们的乘积。【Python代码】 x,ymap(int,input().split()) print(x*y) 【C代码】 #include<bits/stdc.h> using namespace std;int x,y; int main() {cin>>x>>y;cout<<x*y<<endl;return 0; }/* in:…...
C语言——从键盘输人一个表示年份的整数,判断该年份是否为闰年,并显示判断结果。
#define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int year 0;printf("请输入年份:");scanf("%d",&year);if((year%4 0) && (year%100!0) || (year%400 0)){printf("%d是闰年\n",year);}else{p…...
出于隐私和安全的考虑,有时需要从谷歌删除你的个人数据,有两种方法
如果你是公众人物、企业或拥有个人品牌的人,那么拥有在线形象很重要。然而,你可能会发现,通过谷歌搜索,陌生人可以获得你的个人信息,如联系方式、地址和财务信息,这会让你感到不安。 幸运的是,…...
【同一局域网下】两台电脑之间互ping
两台电脑互ping 首先需要连接同一网咯关闭需要ping的电脑的防火墙 关闭防火墙步骤(以win11系统为例): 设置 --> 隐私和安全性 --> Windows 安全中心 打开Windows安全中心 防火墙和网络保护 --> 选择正在使用的网络 关闭 ping其他…...
【精选】Ajax技术知识点合集
Ajax技术详解 Ajax简介 Ajax 即“Asynchronous Javascript And XML”(异步 JavaScript 和 XML),是指一种创建 交互式、快速动态应用的网页开发技术,无需重新加载整个网页的情况下,能够更新页面局 部数据的技术。通过在…...
智能优化算法应用:基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码
智能优化算法应用:基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.水循环算法4.实验参数设定5.算法结果6.参考文献7.…...
java-netty知识点笔记和注意事项
如何获取ctx的id 使用ctx.ctx.toString()就可以了 public void channelRead(ChannelHandlerContext ctx, Object msg) {//传来的消息包装成字节缓冲区String byteBuf (String) msg; // ByteBuf byteBuf (ByteBuf) msg;//Netty提供了字节缓冲区的toString方法ÿ…...
英伟达不同系列GPU介绍
英伟达有以下几个系列的产品线,并介绍它们的特点和主要应用领域: 1. GeForce系列(G系列): - 特点:GeForce系列是英伟达主打的消费级GPU产品线,注重提供高性能的图形处理能力和游戏特性。它们…...
C语言——I /深入理解指针(二)
一、数组名的理解 int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[0];这⾥我们使⽤ &arr[0] 的⽅式拿到了数组第⼀个元素的地址,但是其实数组名本来就是地址,⽽且 是数组⾸元素的地址,我们来做个测试。 #include <stdio.…...
MySQL使用函数和存储过程实现:向数据表快速插入大量测试数据
实现过程 1.创建表 CREATE TABLE user_info (id INT(11) NOT NULL AUTO_INCREMENT,name VARCHAR(20) DEFAULT NULL,age INT(3) DEFAULT NULL,pwd VARCHAR(20) DEFAULT NULL,phone_number VARCHAR(11) DEFAULT NULL,email VARCHAR(255) DEFAULT NULL,address VARCHAR(255) DEF…...
力扣labuladong——一刷day59
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、力扣549. 二叉树中最长的连续序列二、力扣1325. 删除给定值的叶子节点 前言 像求和、求高度这种基本的二叉树函数很容易写,有时候只要在它们的后…...
接口性能测试 —— Jmeter并发与持续性压测
接口压测的方式: 1、同时并发:设置线程组、执行时间、循环次数,这种方式可以控制接口请求的次数 2、持续压测:设置线程组、循环次数,勾选“永远”,调度器(持续时间),这种…...
redis报错3
INFO: Initializing SpringDispatcherServletdispatcherServlet...
Proteus的网络标号与总线
Proteus为了减少过多、复杂的连线,可以使用网络标号与总线配合使用。 Proteus的导线上添加了网络标号,意味着在Proteus上相同的网络标号是连在一起的,所说在图纸上看不出来。 如下图是比较好的Proteus中使用总线的绘制的图纸。可以效仿着画…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...
Go 语言并发编程基础:无缓冲与有缓冲通道
在上一章节中,我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道,它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好࿰…...
【笔记】WSL 中 Rust 安装与测试完整记录
#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统:Ubuntu 24.04 LTS (WSL2)架构:x86_64 (GNU/Linux)Rust 版本:rustc 1.87.0 (2025-05-09)Cargo 版本:cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...
