flink: 将接收到的tcp文本流写入HBase
一、依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>pulsar-demo2</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><pulsar.version>2.8.0</pulsar.version><jackson.version>2.10.5</jackson.version><!--<jackson.version>2.6.7</jackson.version>--></properties><dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-kafka</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>${pulsar.version}</version><exclusions><exclusion><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId></exclusion></exclusions></dependency><!--<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version></dependency>--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-1.4_2.12</artifactId><version>1.13.6</version></dependency></dependencies></project>
二、HBase中建表:
create 'hbasetable','family1','family2','family3','family4'
三、在一台服务器上开启nc
nc -lk 9999
四、运行,demo程序
package cn.edu.tju;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;import java.util.UUID;public class FlinkHBase3 {
//nc 服务器地址private static String HOST_NAME = "xx.xx.xx.xx";private static int PORT = 9999;private static String DELIMITER ="\n";public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> socketDataInfo = env.socketTextStream(HOST_NAME, PORT, DELIMITER);SingleOutputStreamOperator<DataInfo> dataInfoStream = socketDataInfo.map(new MapFunction<String, DataInfo>() {@Overridepublic DataInfo map(String value) throws Exception {String[] stringList = value.split(",");DataInfo dataInfo = new DataInfo(UUID.randomUUID().toString(), Long.parseLong(stringList[0]), stringList[1], Double.parseDouble(stringList[2]));return dataInfo;}});Table dataTable = tableEnv.fromDataStream(dataInfoStream,"rowkey,ts,info,val");tableEnv.createTemporaryView("dataTable", dataTable);// 这里要配自己HBase的zookeeper地址tableEnv.executeSql("CREATE TABLE flinkTable (\n" +" rowkey STRING,\n" +" family1 ROW<ts BIGINT, info STRING, val DOUBLE>,\n" +" PRIMARY KEY (rowkey) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'hbase-1.4',\n" +" 'table-name' = 'hbasetable',\n" +" 'zookeeper.quorum' = 'xx.xx.xx.xx:2181'\n" +")");tableEnv.executeSql("INSERT INTO flinkTable " +"SELECT rowkey, ROW(ts,info,val) FROM dataTable");env.execute("HBaseFlinkJob");}public static class DataInfo{private String rowkey;private Long ts;private String info;private double val;public String getRowkey() {return rowkey;}public void setRowkey(String rowkey) {this.rowkey = rowkey;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public String getInfo() {return info;}public void setInfo(String info) {this.info = info;}public double getVal() {return val;}public void setVal(double val) {this.val = val;}@Overridepublic String toString() {return "DataInfo{" +"ts=" + ts +", info='" + info + '\'' +", val='" + val + '\'' +'}';}public DataInfo( String rowkey, Long ts, String info, double val) {this.rowkey = rowkey;this.ts = ts;this.info = info;this.val = val;}public DataInfo() {}}}
五、在nc窗口输入:
1689999832,dong,32.45
六、在HBase检查数据是否已经写入:
scan 'hbasetable'

相关文章:
flink: 将接收到的tcp文本流写入HBase
一、依赖: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.o…...
SpringBoot集成knife4j
SpringBoot集成knife4j 1、什么是Knife4j2、SpringBoor整合Knife4j2.1、Knife4j配置方式12.2 配置方式二2.3、写注解2.4、效果 1、什么是Knife4j 在日常开发中,写接口文档是我们必不可少的,而Knife4j就是一个接口文档工具,可以看作是Swagger…...
Vue3之setup方法
Vue 3 的 setup 方法是 Vue Composition API 的一部分,用于组织和复用 Vue 组件的逻辑代码。Vue Composition API 允许您以更具响应性和函数式的方式来组织和复用 Vue 组件中的代码,特别是在处理复杂逻辑或跨组件共享逻辑时非常有用。 以下是关于 setup…...
MySQL常见索引及其创建
MySQL索引 在 MySQL 数据库中,常见的索引类型包括以下几种: 普通索引(Normal Index):最基本的索引类型,没有任何限制。唯一索引(Unique Index):要求索引列的值是唯一的…...
高效测量“芯”搭档 | ACM32激光测距仪应用方案
激光测距仪概述 激光测距仪是利用激光对目标的距离进行准确测定的仪器。激光测距仪在工作时向目标射出一束很细的激光,由光电元件接收目标反射的激光束,计时器测定激光束从发射到接收的时间,计算出从观测者到目标的距离。激光测距仪分为手持激…...
基于Hive大数据分析springboot为后端以及vue为前端的的民宿系
标题基于Hive大数据分析springboot为后端以及vue为前端的的民宿系 本文介绍了如何利用Hive进行大数据分析,并结合Spring Boot和Vue构建了一个民宿管理系统。该民民宿管理系统包含用户和管理员登陆注册的功能,发布下架酒店信息,模糊搜索,酒店详情信息展示,收藏以及对收藏的…...
pnpm、monorepo分包管理、多包管理、npm、vite、前端工程化、保姆级教程
浅尝pnpm monorepo 多包管理方案 💡tips: 创建pnpm monorope多包管理框架流程 初始化 mkdir taurus & cd taurus pnpm init创建基础文件 创建文件pnpm-workspace.yaml packages:- packages/**创建文件夹packages/ -packages/ -package.json -pnpm-workspace…...
vue3封装Element分页
配置当前页 配置每页条数 页面改变、每页条数改变都触发回调 封装分页 Pagination.vue <template><el-paginationbackgroundv-bind"$attrs":page-sizes"pageSizes"v-model:current-page"page"v-model:page-size"pageSize":t…...
真机 ARM64 架构转模拟器 ARM64 架构
本文字数:2051字 预计阅读时间:15分钟 01 需要转换架构的原因 老版 Mac 使用 Intel 芯片,是x86_64架构,相应地在老版 Mac 上运行的模拟器使用的也就是 x86_64架构。 由于模拟器的 x86_64 架构与真机的 arm64、armv7 等架构不冲突&…...
敏捷教练CSM认证考了有没有用,谁说了算?
敏捷教练CSM证书是近年来备受关注的一项证书,它被认为可以提升敏捷开发团队的管理能力和项目执行效率。然而,对于这个证书的价值和含金量,人们的观点却不尽相同。那么,CSM证书到底有没有用,谁来说了算呢? 首…...
Docker-Container
Docker ①什么是容器②为什么需要容器③容器的生命周期容器 OOM容器异常退出容器暂停 ④容器命令清单总览docker createdocker rundocker psdocker logsdocker attachdocker execdocker startdocker stopdocker restartdocker killdocker topdocker statsdocker container insp…...
下载安装anaconda和pytorch的详细方法,以及遇到的问题和解决办法
下载安装Anaconda 首先需要下载Anaconda,可以到官网Anaconda官网或者这里提供一个镜像网站去下载anaconda镜像网站 安装步骤可参考该文章:Anaconda安装步骤,本篇不再赘述 注意环境变量的配置,安装好Anaconda之后一定要在环境变量…...
2020年天津市二级分类土地利用数据(矢量)
天津市,位于华北平原海河五大支流汇流处,东临渤海,北依燕山。地势以平原和洼地为主,北部有低山丘陵,海拔由北向南逐渐下降,地貌总轮廓为西北高而东南低。天津有山地、丘陵和平原三种地形,平原约…...
设计模式——结构型——外观模式Facade
处理器类 public class Cpu {public void start() {System.out.println("处理器启动了...");} } 内存类 public class Memory {public void start() {System.out.println("内存启动了...");} } 硬盘类 public class Disk {public void start() {Syste…...
OpenGL的MVP矩阵理解
OpenGL的MVP矩阵理解 右手坐标系 右手坐标系与左手坐标系都是三维笛卡尔坐标系,他们唯一的不同在于z轴的方向,如下图,左边是左手坐标系,右边是右手坐标系 OpenGL中一般用的是右手坐标系 1.模型坐标系(Local Space&…...
前端超分辨率技术应用:图像质量提升与场景实践探索-设计篇
超分辨率! 引言 在数字化时代,图像质量对于用户体验的重要性不言而喻。随着显示技术的飞速发展,尤其是移动终端视网膜屏幕的广泛应用,用户对高分辨率、高质量图像的需求日益增长。然而,受限于网络流量、存储空间和图像…...
C++11入门手册第一节,学完直接上手Qt(共两节)
入门 hello.cpp #include <iostream>int main() { std::cout << "Hello Quick Reference\n"<<endl; return 0;} 编译运行 $ g hello.cpp -o hello$ ./helloHello Quick Reference 变量 int number 5; // 整数float f 0.95; //…...
Docker部署MinIO对象存储服务
1. 拉取MinIO镜像 # 下载镜像 docker pull minio/minio#查看镜像 docker images2. 创建目录 # 文件存储目录 mkdir -p /opt/minio/data# 配置文件 mkdir -p /opt/minio/config# 日志文件 mkdir -p /opt/minio/logs3. 创建Minio容器并运行 docker run \ -p 9000:9000 \ -p 90…...
基于Echarts的超市销售可视化分析系统(数据+程序+论文)
本论文旨在研究Python技术和ECharts可视化技术在超市销售数据分析系统中的应用。本系统通过对超市销售数据进行分析和可视化展示,帮助决策层更好地了解销售情况和趋势,进而做出更有针对性的决策。本系统主要包括数据处理、数据可视化和系统测试三个模块。…...
使用ai智能写作场景之gpt整理资料,如何ai智能写作整理资料
Ai智能写作助手:Ai智能整理资料小助手 Ai智能整理资料小助手可试用3天! 通俗的解释一下怎么用ChatGPT来进行资料整理: 搜寻并获取指定数量的特定领域文章: 想像你在和我说话一样,告诉我你想要多少篇关于某个话题的文…...
黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...
EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
python执行测试用例,allure报乱码且未成功生成报告
allure执行测试用例时显示乱码:‘allure’ �����ڲ����ⲿ���Ҳ���ǿ�&am…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
认识CMake并使用CMake构建自己的第一个项目
1.CMake的作用和优势 跨平台支持:CMake支持多种操作系统和编译器,使用同一份构建配置可以在不同的环境中使用 简化配置:通过CMakeLists.txt文件,用户可以定义项目结构、依赖项、编译选项等,无需手动编写复杂的构建脚本…...
前端调试HTTP状态码
1xx(信息类状态码) 这类状态码表示临时响应,需要客户端继续处理请求。 100 Continue 服务器已收到请求的初始部分,客户端应继续发送剩余部分。 2xx(成功类状态码) 表示请求已成功被服务器接收、理解并处…...
轻量级Docker管理工具Docker Switchboard
简介 什么是 Docker Switchboard ? Docker Switchboard 是一个轻量级的 Web 应用程序,用于管理 Docker 容器。它提供了一个干净、用户友好的界面来启动、停止和监控主机上运行的容器,使其成为本地开发、家庭实验室或小型服务器设置的理想选择…...
