当前位置: 首页 > news >正文

flink: 将接收到的tcp文本流写入HBase

一、依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>pulsar-demo2</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><pulsar.version>2.8.0</pulsar.version><jackson.version>2.10.5</jackson.version><!--<jackson.version>2.6.7</jackson.version>--></properties><dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-kafka</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>${pulsar.version}</version><exclusions><exclusion><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId></exclusion></exclusions></dependency><!--<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version></dependency>--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-1.4_2.12</artifactId><version>1.13.6</version></dependency></dependencies></project>

二、HBase中建表:

create 'hbasetable','family1','family2','family3','family4'

三、在一台服务器上开启nc

nc -lk 9999

四、运行,demo程序

package cn.edu.tju;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;import java.util.UUID;public class FlinkHBase3 {
//nc 服务器地址private static String HOST_NAME = "xx.xx.xx.xx";private static int PORT = 9999;private static String DELIMITER ="\n";public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> socketDataInfo =  env.socketTextStream(HOST_NAME, PORT, DELIMITER);SingleOutputStreamOperator<DataInfo> dataInfoStream = socketDataInfo.map(new MapFunction<String, DataInfo>() {@Overridepublic DataInfo map(String value) throws Exception {String[] stringList = value.split(",");DataInfo dataInfo = new DataInfo(UUID.randomUUID().toString(), Long.parseLong(stringList[0]), stringList[1], Double.parseDouble(stringList[2]));return dataInfo;}});Table dataTable = tableEnv.fromDataStream(dataInfoStream,"rowkey,ts,info,val");tableEnv.createTemporaryView("dataTable", dataTable);// 这里要配自己HBase的zookeeper地址tableEnv.executeSql("CREATE TABLE flinkTable (\n" +" rowkey STRING,\n" +" family1 ROW<ts BIGINT, info STRING, val DOUBLE>,\n" +" PRIMARY KEY (rowkey) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'hbase-1.4',\n" +" 'table-name' = 'hbasetable',\n" +" 'zookeeper.quorum' = 'xx.xx.xx.xx:2181'\n" +")");tableEnv.executeSql("INSERT INTO flinkTable " +"SELECT rowkey, ROW(ts,info,val) FROM dataTable");env.execute("HBaseFlinkJob");}public static class DataInfo{private String rowkey;private Long ts;private String info;private double val;public String getRowkey() {return rowkey;}public void setRowkey(String rowkey) {this.rowkey = rowkey;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public String getInfo() {return info;}public void setInfo(String info) {this.info = info;}public double getVal() {return val;}public void setVal(double val) {this.val = val;}@Overridepublic String toString() {return "DataInfo{" +"ts=" + ts +", info='" + info + '\'' +", val='" + val + '\'' +'}';}public DataInfo( String rowkey, Long ts, String info, double val) {this.rowkey = rowkey;this.ts = ts;this.info = info;this.val = val;}public DataInfo() {}}}

五、在nc窗口输入:

1689999832,dong,32.45

六、在HBase检查数据是否已经写入:

scan 'hbasetable'

在这里插入图片描述

相关文章:

flink: 将接收到的tcp文本流写入HBase

一、依赖&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.o…...

SpringBoot集成knife4j

SpringBoot集成knife4j 1、什么是Knife4j2、SpringBoor整合Knife4j2.1、Knife4j配置方式12.2 配置方式二2.3、写注解2.4、效果 1、什么是Knife4j 在日常开发中&#xff0c;写接口文档是我们必不可少的&#xff0c;而Knife4j就是一个接口文档工具&#xff0c;可以看作是Swagger…...

Vue3之setup方法

Vue 3 的 setup 方法是 Vue Composition API 的一部分&#xff0c;用于组织和复用 Vue 组件的逻辑代码。Vue Composition API 允许您以更具响应性和函数式的方式来组织和复用 Vue 组件中的代码&#xff0c;特别是在处理复杂逻辑或跨组件共享逻辑时非常有用。 以下是关于 setup…...

MySQL常见索引及其创建

MySQL索引 在 MySQL 数据库中&#xff0c;常见的索引类型包括以下几种&#xff1a; 普通索引&#xff08;Normal Index&#xff09;&#xff1a;最基本的索引类型&#xff0c;没有任何限制。唯一索引&#xff08;Unique Index&#xff09;&#xff1a;要求索引列的值是唯一的…...

高效测量“芯”搭档 | ACM32激光测距仪应用方案

激光测距仪概述 激光测距仪是利用激光对目标的距离进行准确测定的仪器。激光测距仪在工作时向目标射出一束很细的激光&#xff0c;由光电元件接收目标反射的激光束&#xff0c;计时器测定激光束从发射到接收的时间&#xff0c;计算出从观测者到目标的距离。激光测距仪分为手持激…...

基于Hive大数据分析springboot为后端以及vue为前端的的民宿系

标题基于Hive大数据分析springboot为后端以及vue为前端的的民宿系 本文介绍了如何利用Hive进行大数据分析,并结合Spring Boot和Vue构建了一个民宿管理系统。该民民宿管理系统包含用户和管理员登陆注册的功能,发布下架酒店信息,模糊搜索,酒店详情信息展示,收藏以及对收藏的…...

pnpm、monorepo分包管理、多包管理、npm、vite、前端工程化、保姆级教程

浅尝pnpm monorepo 多包管理方案 &#x1f4a1;tips: 创建pnpm monorope多包管理框架流程 初始化 mkdir taurus & cd taurus pnpm init创建基础文件 创建文件pnpm-workspace.yaml packages:- packages/**创建文件夹packages/ -packages/ -package.json -pnpm-workspace…...

vue3封装Element分页

配置当前页 配置每页条数 页面改变、每页条数改变都触发回调 封装分页 Pagination.vue <template><el-paginationbackgroundv-bind"$attrs":page-sizes"pageSizes"v-model:current-page"page"v-model:page-size"pageSize":t…...

真机 ARM64 架构转模拟器 ARM64 架构

本文字数&#xff1a;2051字 预计阅读时间&#xff1a;15分钟 01 需要转换架构的原因 老版 Mac 使用 Intel 芯片&#xff0c;是x86_64架构&#xff0c;相应地在老版 Mac 上运行的模拟器使用的也就是 x86_64架构。 由于模拟器的 x86_64 架构与真机的 arm64、armv7 等架构不冲突&…...

敏捷教练CSM认证考了有没有用,谁说了算?

敏捷教练CSM证书是近年来备受关注的一项证书&#xff0c;它被认为可以提升敏捷开发团队的管理能力和项目执行效率。然而&#xff0c;对于这个证书的价值和含金量&#xff0c;人们的观点却不尽相同。那么&#xff0c;CSM证书到底有没有用&#xff0c;谁来说了算呢&#xff1f; 首…...

Docker-Container

Docker ①什么是容器②为什么需要容器③容器的生命周期容器 OOM容器异常退出容器暂停 ④容器命令清单总览docker createdocker rundocker psdocker logsdocker attachdocker execdocker startdocker stopdocker restartdocker killdocker topdocker statsdocker container insp…...

下载安装anaconda和pytorch的详细方法,以及遇到的问题和解决办法

下载安装Anaconda 首先需要下载Anaconda&#xff0c;可以到官网Anaconda官网或者这里提供一个镜像网站去下载anaconda镜像网站 安装步骤可参考该文章&#xff1a;Anaconda安装步骤&#xff0c;本篇不再赘述 注意环境变量的配置&#xff0c;安装好Anaconda之后一定要在环境变量…...

2020年天津市二级分类土地利用数据(矢量)

天津市&#xff0c;位于华北平原海河五大支流汇流处&#xff0c;东临渤海&#xff0c;北依燕山。地势以平原和洼地为主&#xff0c;北部有低山丘陵&#xff0c;海拔由北向南逐渐下降&#xff0c;地貌总轮廓为西北高而东南低。天津有山地、丘陵和平原三种地形&#xff0c;平原约…...

设计模式——结构型——外观模式Facade

处理器类 public class Cpu {public void start() {System.out.println("处理器启动了...");} } 内存类 public class Memory {public void start() {System.out.println("内存启动了...");} } 硬盘类 public class Disk {public void start() {Syste…...

OpenGL的MVP矩阵理解

OpenGL的MVP矩阵理解 右手坐标系 右手坐标系与左手坐标系都是三维笛卡尔坐标系&#xff0c;他们唯一的不同在于z轴的方向&#xff0c;如下图&#xff0c;左边是左手坐标系&#xff0c;右边是右手坐标系 OpenGL中一般用的是右手坐标系 1.模型坐标系&#xff08;Local Space&…...

前端超分辨率技术应用:图像质量提升与场景实践探索-设计篇

超分辨率&#xff01; 引言 在数字化时代&#xff0c;图像质量对于用户体验的重要性不言而喻。随着显示技术的飞速发展&#xff0c;尤其是移动终端视网膜屏幕的广泛应用&#xff0c;用户对高分辨率、高质量图像的需求日益增长。然而&#xff0c;受限于网络流量、存储空间和图像…...

C++11入门手册第一节,学完直接上手Qt(共两节)

入门 hello.cpp #include <iostream>int main() { std::cout << "Hello Quick Reference\n"<<endl; return 0;} 编译运行 $ g hello.cpp -o hello$ ./hello​Hello Quick Reference 变量 int number 5; // 整数float f 0.95; //…...

Docker部署MinIO对象存储服务

1. 拉取MinIO镜像 # 下载镜像 docker pull minio/minio#查看镜像 docker images2. 创建目录 # 文件存储目录 mkdir -p /opt/minio/data# 配置文件 mkdir -p /opt/minio/config# 日志文件 mkdir -p /opt/minio/logs3. 创建Minio容器并运行 docker run \ -p 9000:9000 \ -p 90…...

基于Echarts的超市销售可视化分析系统(数据+程序+论文)

本论文旨在研究Python技术和ECharts可视化技术在超市销售数据分析系统中的应用。本系统通过对超市销售数据进行分析和可视化展示&#xff0c;帮助决策层更好地了解销售情况和趋势&#xff0c;进而做出更有针对性的决策。本系统主要包括数据处理、数据可视化和系统测试三个模块。…...

使用ai智能写作场景之gpt整理资料,如何ai智能写作整理资料

Ai智能写作助手&#xff1a;Ai智能整理资料小助手 Ai智能整理资料小助手可试用3天&#xff01; 通俗的解释一下怎么用ChatGPT来进行资料整理&#xff1a; 搜寻并获取指定数量的特定领域文章&#xff1a; 想像你在和我说话一样&#xff0c;告诉我你想要多少篇关于某个话题的文…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

设计模式和设计原则回顾

设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

Web中间件--tomcat学习

Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机&#xff0c;它可以执行Java字节码。Java虚拟机是Java平台的一部分&#xff0c;Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...

day36-多路IO复用

一、基本概念 &#xff08;服务器多客户端模型&#xff09; 定义&#xff1a;单线程或单进程同时监测若干个文件描述符是否可以执行IO操作的能力 作用&#xff1a;应用程序通常需要处理来自多条事件流中的事件&#xff0c;比如我现在用的电脑&#xff0c;需要同时处理键盘鼠标…...