当前位置: 首页 > news >正文

五分钟,Docker安装flink,并使用flinksql消费kafka数据

1、拉取flink镜像,创建网络

docker pull flink
docker network create flink-network

2、创建 jobmanager

# 创建 JobManager docker run \-itd \--name=jobmanager \--publish 8081:8081 \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest jobmanager 

3、创建 taskmanager

# 创建 TaskManager docker run \-itd \--name=taskmanager \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest taskmanager 

4、访问 http://localhost:8081/

在这里插入图片描述

4.1 修改Task Slots

默认的Slots num是1,我们可以修改为5:
修改的目录是jobmanager和taskmanager的/opt/flink/confflink-conf.yaml文件:
在这里插入图片描述
在这里插入图片描述
修改taskmanager.numberOfTaskSlots:即可。
注意:默认的docker容器中没有vi/vim命令,可以使用docker cp命令,复制出来修改,然后在复制回去,如下:

docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/

在这里插入图片描述

5、通过flinksql消费Kafka

确保有一个可用的kafka,如果没有,可以五分钟内,Docker搭建一个
Docker安装kafka 3.5
并且通过python,简单写一个生产者
Python生产、消费Kafka

5.1 导入flink-sql-connector-kafka jar包

顾名思义,用于连接flinksql和kafka。
进入flink

docker exec -it jobmanager /bin/bash

进入 flink的bin目录

cd /opt/flink/bin

查看flink版本:

flink --version

可以看出,我的版本是1.18.0
在这里插入图片描述
根据自己的flink版本,下载对应的 flink-sql-connector-kafka jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因为我是1.18.0,所以选择下图的版本包:
在这里插入图片描述
点进去进行下载:
在这里插入图片描述

将下载的jar包,分别在jobmanager,taskmanager /opt/flink/lib目录下,注意,是两个都要放,如下图:
在这里插入图片描述
可以使用docker cp test.txt jobmanager:/opt/flink/lib命令,用户宿主机和docker容器文件传输。把test.txt换成对应的jar包即可

5.2 flinksql消费kafka

进入jobmanager中,执行

cd /opt/flink/bin
sql-client.sh

在这里插入图片描述
在这里插入图片描述
Flink SQL执行以下语句:

CREATE TABLE KafkaTable (`count_num` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'kafka_demo','properties.bootstrap.servers' = '192.168.10.15:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'json'
);show tables;
select * from KafkaTable;

可以看到Flink在消费kafka数据,如下图:
在这里插入图片描述

相关文章:

五分钟,Docker安装flink,并使用flinksql消费kafka数据

1、拉取flink镜像,创建网络 docker pull flink docker network create flink-network2、创建 jobmanager # 创建 JobManager docker run \-itd \--namejobmanager \--publish 8081:8081 \--network flink-network \--env FLINK_PROPERTIES"jobmanager.rpc.ad…...

【小聆送书第一期】让架构师的成神之路温暖你这个不景气的冬天

🌈个人主页:聆风吟 🔥系列专栏:网络奇遇记、数据结构 🔖少年有梦不应止于心动,更要付诸行动。 文章目录 📋前言 书籍一览 ⛳️书籍一⛳️书籍二⛳️书籍三⛳️书籍四⛳️书籍五⛳️书籍六⛳️书…...

网页爬虫反扒措施有哪些?

爬虫之常见的反扒 cookies 一般用requests直接请求网址的时候有时候可能会遇到反扒措施,这时候可以考虑一下加上user-agent伪装成浏览器;也可能有登录限制,这时候cookies就有用处了 浏览器中的cookie是保存我们的账号数据和访问记录&#…...

C#实现批量生成二维码

相信大家都使用过草料二维码生成器&#xff0c;单独生成二维码可以&#xff0c;但是批量生成二维码就需要收费了。既然要收费&#xff0c;那就自己写一个。 接口采用导入Excel文件生成二维码&#xff0c;首先需要读取Excel的数据&#xff0c;方法如下所示&#xff1a; /// <…...

3种在ArcGIS Pro中制作山体阴影的方法

山体阴影可以更直观的展现地貌特点&#xff0c;表达真实的地形&#xff0c;这里为大家介绍一下在ArcGIS Pro中制作山体阴影的方法&#xff0c;希望能对你有所帮助。 数据来源 本教程所使用的数据是从水经微图中下载的DEM数据&#xff0c;除了DEM数据&#xff0c;常见的GIS数据…...

【ChatGLM2-6B】Docker下部署及微调

【ChatGLM2-6B】小白入门及Docker下部署 一、简介1、ChatGLM2是什么2、组成部分3、相关地址 二、基于Docker安装部署1、前提2、CentOS7安装NVIDIA显卡驱动1&#xff09;查看服务器版本及显卡信息2&#xff09;相关依赖安装3&#xff09;显卡驱动安装 2、 CentOS7安装NVIDIA-Doc…...

输入两个整数,输出它们的乘积。 ← Python 及 C++ 代码比较

【题目描述】 输入两个整数&#xff0c;输出它们的乘积。【Python代码】 x,ymap(int,input().split()) print(x*y) 【C代码】 #include<bits/stdc.h> using namespace std;int x,y; int main() {cin>>x>>y;cout<<x*y<<endl;return 0; }/* in:…...

C语言——从键盘输人一个表示年份的整数,判断该年份是否为闰年,并显示判断结果。

#define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int year 0;printf("请输入年份&#xff1a;");scanf("%d",&year);if((year%4 0) && (year%100!0) || (year%400 0)){printf("%d是闰年\n",year);}else{p…...

出于隐私和安全的考虑,有时需要从谷歌删除你的个人数据,有两种方法

如果你是公众人物、企业或拥有个人品牌的人&#xff0c;那么拥有在线形象很重要。然而&#xff0c;你可能会发现&#xff0c;通过谷歌搜索&#xff0c;陌生人可以获得你的个人信息&#xff0c;如联系方式、地址和财务信息&#xff0c;这会让你感到不安。 幸运的是&#xff0c;…...

【同一局域网下】两台电脑之间互ping

两台电脑互ping 首先需要连接同一网咯关闭需要ping的电脑的防火墙 关闭防火墙步骤&#xff08;以win11系统为例&#xff09;&#xff1a; 设置 --> 隐私和安全性 --> Windows 安全中心 打开Windows安全中心 防火墙和网络保护 --> 选择正在使用的网络 关闭 ping其他…...

【精选】Ajax技术知识点合集

Ajax技术详解 Ajax简介 Ajax 即“Asynchronous Javascript And XML”&#xff08;异步 JavaScript 和 XML&#xff09;&#xff0c;是指一种创建 交互式、快速动态应用的网页开发技术&#xff0c;无需重新加载整个网页的情况下&#xff0c;能够更新页面局 部数据的技术。通过在…...

智能优化算法应用:基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.水循环算法4.实验参数设定5.算法结果6.参考文献7.…...

java-netty知识点笔记和注意事项

如何获取ctx的id 使用ctx.ctx.toString()就可以了 public void channelRead(ChannelHandlerContext ctx, Object msg) {//传来的消息包装成字节缓冲区String byteBuf (String) msg; // ByteBuf byteBuf (ByteBuf) msg;//Netty提供了字节缓冲区的toString方法&#xff…...

英伟达不同系列GPU介绍

英伟达有以下几个系列的产品线&#xff0c;并介绍它们的特点和主要应用领域&#xff1a; 1. GeForce系列&#xff08;G系列&#xff09;&#xff1a; - 特点&#xff1a;GeForce系列是英伟达主打的消费级GPU产品线&#xff0c;注重提供高性能的图形处理能力和游戏特性。它们…...

C语言——I /深入理解指针(二)

一、数组名的理解 int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[0];这⾥我们使⽤ &arr[0] 的⽅式拿到了数组第⼀个元素的地址&#xff0c;但是其实数组名本来就是地址&#xff0c;⽽且 是数组⾸元素的地址&#xff0c;我们来做个测试。 #include <stdio.…...

MySQL使用函数和存储过程实现:向数据表快速插入大量测试数据

实现过程 1.创建表 CREATE TABLE user_info (id INT(11) NOT NULL AUTO_INCREMENT,name VARCHAR(20) DEFAULT NULL,age INT(3) DEFAULT NULL,pwd VARCHAR(20) DEFAULT NULL,phone_number VARCHAR(11) DEFAULT NULL,email VARCHAR(255) DEFAULT NULL,address VARCHAR(255) DEF…...

力扣labuladong——一刷day59

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、力扣549. 二叉树中最长的连续序列二、力扣1325. 删除给定值的叶子节点 前言 像求和、求高度这种基本的二叉树函数很容易写&#xff0c;有时候只要在它们的后…...

接口性能测试 —— Jmeter并发与持续性压测

接口压测的方式&#xff1a; 1、同时并发&#xff1a;设置线程组、执行时间、循环次数&#xff0c;这种方式可以控制接口请求的次数 2、持续压测&#xff1a;设置线程组、循环次数&#xff0c;勾选“永远”&#xff0c;调度器&#xff08;持续时间&#xff09;&#xff0c;这种…...

redis报错3

INFO: Initializing SpringDispatcherServletdispatcherServlet...

Proteus的网络标号与总线

Proteus为了减少过多、复杂的连线&#xff0c;可以使用网络标号与总线配合使用。 Proteus的导线上添加了网络标号&#xff0c;意味着在Proteus上相同的网络标号是连在一起的&#xff0c;所说在图纸上看不出来。 如下图是比较好的Proteus中使用总线的绘制的图纸。可以效仿着画…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

华为OD机考-机房布局

import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...

适应性Java用于现代 API:REST、GraphQL 和事件驱动

在快速发展的软件开发领域&#xff0c;REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名&#xff0c;不断适应这些现代范式的需求。随着不断发展的生态系统&#xff0c;Java 在现代 API 方…...

论文阅读笔记——Muffin: Testing Deep Learning Libraries via Neural Architecture Fuzzing

Muffin 论文 现有方法 CRADLE 和 LEMON&#xff0c;依赖模型推理阶段输出进行差分测试&#xff0c;但在训练阶段是不可行的&#xff0c;因为训练阶段直到最后才有固定输出&#xff0c;中间过程是不断变化的。API 库覆盖低&#xff0c;因为各个 API 都是在各种具体场景下使用。…...

Linux中《基础IO》详细介绍

目录 理解"文件"狭义理解广义理解文件操作的归类认知系统角度文件类别 回顾C文件接口打开文件写文件读文件稍作修改&#xff0c;实现简单cat命令 输出信息到显示器&#xff0c;你有哪些方法stdin & stdout & stderr打开文件的方式 系统⽂件I/O⼀种传递标志位…...

js 设置3秒后执行

如何在JavaScript中延迟3秒执行操作 在JavaScript中&#xff0c;要设置一个操作在指定延迟后&#xff08;例如3秒&#xff09;执行&#xff0c;可以使用 setTimeout 函数。setTimeout 是JavaScript的核心计时器方法&#xff0c;它接受两个参数&#xff1a; 要执行的函数&…...

VUE3 ref 和 useTemplateRef

使用ref来绑定和获取 页面 <headerNav ref"headerNavRef"></headerNav><div click"showRef" ref"buttonRef">refbutton</div>使用ref方法const后面的命名需要跟页面的ref值一样 const buttonRef ref(buttonRef) cons…...

暴雨新专利解决服务器噪音与性能悖论

6月1日&#xff0c;我国首部数据中心绿色化评价方面国家标准《绿色数据中心评价》正式实施&#xff0c;为我国数据中心的绿色低碳建设提供了明确指引。《评价》首次将噪音控制纳入国家级绿色评价体系&#xff0c;要求从设计隔声结构到运维定期监测实现闭环管控&#xff0c;加速…...