当前位置: 首页 > news >正文

15_基于Flink将pulsar数据写入到ClickHouse

3.8.基于Flink将数据写入到ClickHouse

编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作

3.8.1.ClickHouse基本介绍

ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C++语言编写,主要用于在线分析处理查询(OLAP),能够使用SQL查询实时生成分析数据报告。
在这里插入图片描述
结论: ClickHouse像很多OLAP数据库一样,单表查询速度由于关联查询,而且ClickHouse的两者差距更为明显。

3.8.2.ClickHouse安装步骤

本项目中,我们仅需要安装单机测试版本即可使用(node2安装), 在实际生产中, 大家可以直接将分布式集群版本

  • 1-设置yum源
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
  • 2- 直接基于yum安装即可
sudo yum install clickhouse-server clickhouse-client
  • 3-修改配置文件
vim /etc/clickhouse-server/config.xml 
修改178行: 打开这一行的注释 
<listen_host>::</listen_host>

在这里插入图片描述

  • 4-启动clickhouse的server
systemctl start clickhouse-server 
停止:
systemctl stop clickhouse-server 
重启
systemctl restart clickhouse-server
  • 5-进入客户端
    在这里插入图片描述

3.8.3.在ClickHouse中创建目标表

create database itcast_ck; 
use itcast_ck; 
create table itcast_ck.itcast_ck_ems( 
id int, 
sid varchar(128), 
ip varchar(128), 
create_time varchar(128), 
session_id varchar(128), 
yearInfo varchar(128), 
monthInfo varchar(128), 
dayInfo varchar(128), 
hourInfo varchar(128), 
seo_source varchar(128), 
area varchar(128), 
origin_channel varchar(128), 
msg_count int(128), 
from_url varchar(128), 
PRIMARY KEY (`id`) 
) ENGINE=ReplacingMergeTree();

3.8.4.编写Flink代码完成写入到CK操作

import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.types.Row;import java.sql.Types;
import java.util.Properties;// 基于Flink完成读取Pulsar中数据将消息数据写入到clickhouse中
public class ItcastFlinkToClickHouse {public static void main(String[] args) throws Exception {//1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2. 添加Source组件, 从Pulsar中读取消息数据Properties props = new Properties();props.setProperty("topic","persistent://public/default/itcast_ems_tab");props.setProperty("partition.discovery.interval-millis","5000");FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>("pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",JsonDeser.of(PulsarTopicPojo.class),props);//2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费pulsarSource.setStartFromLatest();DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);//2.2  转换数据操作: 将 PulsarTopicPojo 转换为ROW对象SingleOutputStreamOperator<Row> rowDataSteam = dataStreamSource.map(new MapFunction<PulsarTopicPojo, Row>() {@Overridepublic Row map(PulsarTopicPojo pulsarTopicPojo) throws Exception {return Row.of(pulsarTopicPojo.getId(), pulsarTopicPojo.getSid(), pulsarTopicPojo.getIp(), pulsarTopicPojo.getCreate_time(),pulsarTopicPojo.getSession_id(), pulsarTopicPojo.getYearInfo(), pulsarTopicPojo.getMonthInfo(), pulsarTopicPojo.getDayInfo(),pulsarTopicPojo.getHourInfo(), pulsarTopicPojo.getSeo_source(), pulsarTopicPojo.getArea(), pulsarTopicPojo.getOrigin_channel(),pulsarTopicPojo.getMsg_count(), pulsarTopicPojo.getFrom_url());}});//2.3: 设置sink操作写入到CK操作String insertSql = "insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder().setDrivername("ru.yandex.clickhouse.ClickHouseDriver").setDBUrl("jdbc:clickhouse://node2:8123/itcast_ck").setQuery(insertSql).setBatchSize(1).setParameterTypes(Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.VARCHAR).build();tableSink.emitDataStream(rowDataSteam);//3. 提交执行env.execute("itcast_to_ck");}
}

3.9.HBase对接Phoenix实现即席查询

3.9.1.Phoenix安装操作

Phoenix是属于apache旗下的一款基于hbase的工具, 此工具提供一种全新的方式来操作hbase中数据(SQL),
同时Phoenix对hbase进行大量的优化工作, 能够让我们更加有效的操作hbase

整个安装操作, 大家可以参考资料中安装手册, 进行安装即可

3.9.2.在Phoenix中创建表

create view "itcast_h_ems" ( 
"id" integer primary key, 
"f1"."sid" varchar, 
"f1"."ip" varchar, 
"f1"."create_time" varchar, 
"f1"."session_id" varchar, 
"f1"."yearInfo" varchar, 
"f1"."monthInfo" varchar, 
"f1"."dayInfo" varchar, 
"f1"."hourInfo" varchar, 
"f1"."seo_source" varchar, 
"f1"."area" varchar, 
"f1"."origin_channel" varchar, 
"f1"."msg_count" integer, 
"f1"."from_url" varchar 
);

3.9.3.在Phoenix中类型说明

在这里插入图片描述

相关文章:

15_基于Flink将pulsar数据写入到ClickHouse

3.8.基于Flink将数据写入到ClickHouse 编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作 3.8.1.ClickHouse基本介绍 ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库&#xff08;DBMS&#xff09;&#xff0c;使用C语言编写&#xff0c;主要用…...

Pycharm如何打断点进行调试?

断点调试&#xff0c;是编写程序中一个很重要的步骤&#xff0c;有些简单的程序使用print语句就可看出问题&#xff0c;而比较复杂的程序&#xff0c;函数和变量较多的情况下&#xff0c;这时候就需要打断点了&#xff0c;更容易定位问题。 一、添加断点 在代码的行标前面&…...

微服务02-docker

1、Docker架构 1.1 镜像和容器 Docker中有几个重要的概念&#xff1a; 镜像&#xff08;Image&#xff09;&#xff1a;Docker将应用程序及其所需的依赖、函数库、环境、配置等文件打包在一起&#xff0c;称为镜像。Docker镜像是用于创建 Docker 容器的模板 。就像面向对象编…...

CSS:盒子模型 与 多种横向布局方法

目录 盒子模型块级盒子内联级盒子内联块级盒子弹性盒子display 改变模型区域划分text 内容区padding 填充区border 边框区margin 外边距直接设置盒子大小 布局横向布局方法一 float 浮起来方法二 内联块级元素实现方法三 弹性盒子模型 盒子模型 块级盒子 独占一行&#xff0c…...

用node.js搭建一个视频推流服务

由于业务中有不少视频使用的场景&#xff0c;今天来说说如何使用node完成一个视频推流服务。 先看看效果&#xff1a; 这里的播放的视频是一个多个Partial Content组合起来的&#xff0c;每个Partial Content大小是1M。 一&#xff0c;项目搭建 &#xff08;1&#xff09;初…...

【SpringCloud】Feign远程调用

先来看我们以前利用RestTemplate发起远程调用的代码&#xff1a; String url "http://userservice/user/" order.getUserId(); User user restTemplate.getForObject(url, User.class);存在下面的问题&#xff1a; • 代码可读性差&#xff0c;编程体验不统一 • …...

集合Collection-List-ArrayList学习

一、集合 集合是数据容器。相较于数组集合具有以下几个特点&#xff1a; 数组一旦创建&#xff0c;长度不可改变。集合的长度会自动扩容。集合具有很多数组没有的功能函数API数组元素的存储特点单一&#xff0c;不同的集合有不同的存储特点。 1. Collection顶层接口 Collect…...

mybatispuls代码生成器

引入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…...

【设计模式】-代理模式

在软件开发中&#xff0c;经常遇到需要对某个对象进行控制或者监控的场景。而直接修改对象的代码可能使代码变得复杂且难以维护。这时&#xff0c;使用代理模式&#xff08;Proxy Pattern&#xff09;可以很好地解决这个问题。 代理模式是一种结构型设计模式&#xff0c;通过引…...

爬虫ip池越大越好吗?

作为一名资深的程序员&#xff0c;今天我要给大家分享一些关于爬虫ip池的知识。关于ip代理池的问题&#xff0c;答案是肯定的&#xff0c;池子越大越好。下面跟我一起来盘点一下ip池大的好处吧&#xff01; 1、提高稳定性 爬虫ip池越大&#xff0c;意味着拥有更多可用的爬虫ip…...

目标检测常用的数据集格式

在目标检测领域&#xff0c;有三种常用的数据集&#xff1a; 数据集标注文件格式bbox格式vocxmlxmin, ymin, xmax, ymax:bbox左上角(xmin, ymin)和右下角(xmax, ymax)的坐标cocojsonx, y, w, h:bbox左上角坐标(x, y)以及宽(w)和高(h)yolotxtxcenter, ycenter, w, h:bbox的中心…...

chrome插件开发实例03-使用 chrome.storage API永久保存数据

目录 防止数据丢失 使用chrome.storage API 功能 功能演示 源代码 manifest.json popup.html...

Segment Anything(SAM) 计算过程

给定输入图像 I ∈ R 3 H W I \in R^{3 \times H \times W} I∈R3HW。给定需要的prompts&#xff1a; M ∈ R 1 H W M \in R^{1 \times H \times W} M∈R1HW&#xff0c;代表图片的前背景信息。 P ∈ R N 2 P \in R^{N \times 2} P∈RN2&#xff0c;其中 N N N 是点的个数…...

Nacos配置文件读取源码解析

Nacos配置文件读取 本篇文章是探究&#xff0c;springboot启动时nacos是如何将配置中心的配置读取到springboot环境中的 PropertySourceLocator org.springframework.cloud.bootstrap.config.PropertySourceLocator 是 springcloud 定义的一个顶级接口&#xff0c;用来定义所…...

Linux0.11内核源码解析-fcntl.c/iotcl.c/stat.c

fcntl fcntl.c实现了文件控制系统调用fcntl和两个文件句柄描述符的复制系统调用dup()和dup2()。 dup返回当前值最小的未用句柄&#xff0c;dup2返回指定新句柄的数值&#xff0c;句柄的复制操作主要用在文件的标准输入、输出重定向和管道方面。 dupfd 复制文件句柄&#xff…...

OpenStack简介

OpenStack简介 目录 OpenStack简介 1、云计算模式2、云计算 虚拟化 openstack之间的关系&#xff1f;3、OpenStack 中有哪些组件&#xff1f;4、计算节点负责虚拟机运行5、网络节点负责对外网络与内网之间的通信 5.1 网络节点仅包含Neutron服务5.2 网络节点包含三个网络端口6、…...

二分法的应用

文章目录 什么是二分法&#x1f3ae;二分查找的优先级二分查找的步骤&#x1f4a5;图解演示&#x1f9e9; 代码演示&#x1fad5;python程序实现&#x1f408;‍⬛C程序实现&#x1f415;‍&#x1f9ba;C程序实现&#x1f42f;Java程序实现&#x1f433; 非常规类二分查找&…...

ChatGPT在大规模数据处理和信息管理中的应用如何?

ChatGPT作为一种强大的自然语言处理模型&#xff0c;在大规模数据处理和信息管理领域有着广泛的应用潜力。它可以利用其文本生成、文本理解和问答等能力&#xff0c;为数据分析、信息提取、知识管理等任务提供智能化的解决方案。以下将详细介绍ChatGPT在大规模数据处理和信息管…...

【算法篇C++实现】五大常规算法

文章目录 &#x1f680;一、分治法⛳&#xff08;一&#xff09;算法思想⛳&#xff08;二&#xff09;相关代码 &#x1f680;二、动态规划算法⛳&#xff08;一&#xff09;算法思想⛳&#xff08;二&#xff09;相关代码 &#x1f680;三、回溯算法⛳&#xff08;一&#xf…...

MySQL和钉钉单据接口对接

MySQL和钉钉单据接口对接 数据源系统:钉钉 钉钉&#xff08;DingTalk&#xff09;是阿里巴巴集团打造的企业级智能移动办公平台&#xff0c;是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能人事、钉工牌…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

生成xcframework

打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式&#xff0c;可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

(十)学生端搭建

本次旨在将之前的已完成的部分功能进行拼装到学生端&#xff0c;同时完善学生端的构建。本次工作主要包括&#xff1a; 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...

鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/

使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题&#xff1a;docker pull 失败 网络不同&#xff0c;需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南

&#x1f680; C extern 关键字深度解析&#xff1a;跨文件编程的终极指南 &#x1f4c5; 更新时间&#xff1a;2025年6月5日 &#x1f3f7;️ 标签&#xff1a;C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言&#x1f525;一、extern 是什么&#xff1f;&…...

MySQL 知识小结(一)

一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库&#xff0c;分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷&#xff0c;但是文件存放起来数据比较冗余&#xff0c;用二进制能够更好管理咱们M…...

【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案

目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后&#xff0c;迭代器会失效&#xff0c;因为顺序迭代器在内存中是连续存储的&#xff0c;元素删除后&#xff0c;后续元素会前移。 但一些场景中&#xff0c;我们又需要在执行删除操作…...