15_基于Flink将pulsar数据写入到ClickHouse
3.8.基于Flink将数据写入到ClickHouse
编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作
3.8.1.ClickHouse基本介绍
ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C++语言编写,主要用于在线分析处理查询(OLAP),能够使用SQL查询实时生成分析数据报告。

结论: ClickHouse像很多OLAP数据库一样,单表查询速度由于关联查询,而且ClickHouse的两者差距更为明显。
3.8.2.ClickHouse安装步骤
本项目中,我们仅需要安装单机测试版本即可使用(node2安装), 在实际生产中, 大家可以直接将分布式集群版本
- 1-设置yum源
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
- 2- 直接基于yum安装即可
sudo yum install clickhouse-server clickhouse-client
- 3-修改配置文件
vim /etc/clickhouse-server/config.xml
修改178行: 打开这一行的注释
<listen_host>::</listen_host>

- 4-启动clickhouse的server
systemctl start clickhouse-server
停止:
systemctl stop clickhouse-server
重启
systemctl restart clickhouse-server
- 5-进入客户端

3.8.3.在ClickHouse中创建目标表
create database itcast_ck;
use itcast_ck;
create table itcast_ck.itcast_ck_ems(
id int,
sid varchar(128),
ip varchar(128),
create_time varchar(128),
session_id varchar(128),
yearInfo varchar(128),
monthInfo varchar(128),
dayInfo varchar(128),
hourInfo varchar(128),
seo_source varchar(128),
area varchar(128),
origin_channel varchar(128),
msg_count int(128),
from_url varchar(128),
PRIMARY KEY (`id`)
) ENGINE=ReplacingMergeTree();
3.8.4.编写Flink代码完成写入到CK操作
import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.types.Row;import java.sql.Types;
import java.util.Properties;// 基于Flink完成读取Pulsar中数据将消息数据写入到clickhouse中
public class ItcastFlinkToClickHouse {public static void main(String[] args) throws Exception {//1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2. 添加Source组件, 从Pulsar中读取消息数据Properties props = new Properties();props.setProperty("topic","persistent://public/default/itcast_ems_tab");props.setProperty("partition.discovery.interval-millis","5000");FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>("pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",JsonDeser.of(PulsarTopicPojo.class),props);//2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费pulsarSource.setStartFromLatest();DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);//2.2 转换数据操作: 将 PulsarTopicPojo 转换为ROW对象SingleOutputStreamOperator<Row> rowDataSteam = dataStreamSource.map(new MapFunction<PulsarTopicPojo, Row>() {@Overridepublic Row map(PulsarTopicPojo pulsarTopicPojo) throws Exception {return Row.of(pulsarTopicPojo.getId(), pulsarTopicPojo.getSid(), pulsarTopicPojo.getIp(), pulsarTopicPojo.getCreate_time(),pulsarTopicPojo.getSession_id(), pulsarTopicPojo.getYearInfo(), pulsarTopicPojo.getMonthInfo(), pulsarTopicPojo.getDayInfo(),pulsarTopicPojo.getHourInfo(), pulsarTopicPojo.getSeo_source(), pulsarTopicPojo.getArea(), pulsarTopicPojo.getOrigin_channel(),pulsarTopicPojo.getMsg_count(), pulsarTopicPojo.getFrom_url());}});//2.3: 设置sink操作写入到CK操作String insertSql = "insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder().setDrivername("ru.yandex.clickhouse.ClickHouseDriver").setDBUrl("jdbc:clickhouse://node2:8123/itcast_ck").setQuery(insertSql).setBatchSize(1).setParameterTypes(Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.VARCHAR).build();tableSink.emitDataStream(rowDataSteam);//3. 提交执行env.execute("itcast_to_ck");}
}
3.9.HBase对接Phoenix实现即席查询
3.9.1.Phoenix安装操作
Phoenix是属于apache旗下的一款基于hbase的工具, 此工具提供一种全新的方式来操作hbase中数据(SQL),
同时Phoenix对hbase进行大量的优化工作, 能够让我们更加有效的操作hbase
整个安装操作, 大家可以参考资料中安装手册, 进行安装即可
3.9.2.在Phoenix中创建表
create view "itcast_h_ems" (
"id" integer primary key,
"f1"."sid" varchar,
"f1"."ip" varchar,
"f1"."create_time" varchar,
"f1"."session_id" varchar,
"f1"."yearInfo" varchar,
"f1"."monthInfo" varchar,
"f1"."dayInfo" varchar,
"f1"."hourInfo" varchar,
"f1"."seo_source" varchar,
"f1"."area" varchar,
"f1"."origin_channel" varchar,
"f1"."msg_count" integer,
"f1"."from_url" varchar
);
3.9.3.在Phoenix中类型说明

相关文章:
15_基于Flink将pulsar数据写入到ClickHouse
3.8.基于Flink将数据写入到ClickHouse 编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作 3.8.1.ClickHouse基本介绍 ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C语言编写,主要用…...
Pycharm如何打断点进行调试?
断点调试,是编写程序中一个很重要的步骤,有些简单的程序使用print语句就可看出问题,而比较复杂的程序,函数和变量较多的情况下,这时候就需要打断点了,更容易定位问题。 一、添加断点 在代码的行标前面&…...
微服务02-docker
1、Docker架构 1.1 镜像和容器 Docker中有几个重要的概念: 镜像(Image):Docker将应用程序及其所需的依赖、函数库、环境、配置等文件打包在一起,称为镜像。Docker镜像是用于创建 Docker 容器的模板 。就像面向对象编…...
CSS:盒子模型 与 多种横向布局方法
目录 盒子模型块级盒子内联级盒子内联块级盒子弹性盒子display 改变模型区域划分text 内容区padding 填充区border 边框区margin 外边距直接设置盒子大小 布局横向布局方法一 float 浮起来方法二 内联块级元素实现方法三 弹性盒子模型 盒子模型 块级盒子 独占一行,…...
用node.js搭建一个视频推流服务
由于业务中有不少视频使用的场景,今天来说说如何使用node完成一个视频推流服务。 先看看效果: 这里的播放的视频是一个多个Partial Content组合起来的,每个Partial Content大小是1M。 一,项目搭建 (1)初…...
【SpringCloud】Feign远程调用
先来看我们以前利用RestTemplate发起远程调用的代码: String url "http://userservice/user/" order.getUserId(); User user restTemplate.getForObject(url, User.class);存在下面的问题: • 代码可读性差,编程体验不统一 • …...
集合Collection-List-ArrayList学习
一、集合 集合是数据容器。相较于数组集合具有以下几个特点: 数组一旦创建,长度不可改变。集合的长度会自动扩容。集合具有很多数组没有的功能函数API数组元素的存储特点单一,不同的集合有不同的存储特点。 1. Collection顶层接口 Collect…...
mybatispuls代码生成器
引入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…...
【设计模式】-代理模式
在软件开发中,经常遇到需要对某个对象进行控制或者监控的场景。而直接修改对象的代码可能使代码变得复杂且难以维护。这时,使用代理模式(Proxy Pattern)可以很好地解决这个问题。 代理模式是一种结构型设计模式,通过引…...
爬虫ip池越大越好吗?
作为一名资深的程序员,今天我要给大家分享一些关于爬虫ip池的知识。关于ip代理池的问题,答案是肯定的,池子越大越好。下面跟我一起来盘点一下ip池大的好处吧! 1、提高稳定性 爬虫ip池越大,意味着拥有更多可用的爬虫ip…...
目标检测常用的数据集格式
在目标检测领域,有三种常用的数据集: 数据集标注文件格式bbox格式vocxmlxmin, ymin, xmax, ymax:bbox左上角(xmin, ymin)和右下角(xmax, ymax)的坐标cocojsonx, y, w, h:bbox左上角坐标(x, y)以及宽(w)和高(h)yolotxtxcenter, ycenter, w, h:bbox的中心…...
chrome插件开发实例03-使用 chrome.storage API永久保存数据
目录 防止数据丢失 使用chrome.storage API 功能 功能演示 源代码 manifest.json popup.html...
Segment Anything(SAM) 计算过程
给定输入图像 I ∈ R 3 H W I \in R^{3 \times H \times W} I∈R3HW。给定需要的prompts: M ∈ R 1 H W M \in R^{1 \times H \times W} M∈R1HW,代表图片的前背景信息。 P ∈ R N 2 P \in R^{N \times 2} P∈RN2,其中 N N N 是点的个数…...
Nacos配置文件读取源码解析
Nacos配置文件读取 本篇文章是探究,springboot启动时nacos是如何将配置中心的配置读取到springboot环境中的 PropertySourceLocator org.springframework.cloud.bootstrap.config.PropertySourceLocator 是 springcloud 定义的一个顶级接口,用来定义所…...
Linux0.11内核源码解析-fcntl.c/iotcl.c/stat.c
fcntl fcntl.c实现了文件控制系统调用fcntl和两个文件句柄描述符的复制系统调用dup()和dup2()。 dup返回当前值最小的未用句柄,dup2返回指定新句柄的数值,句柄的复制操作主要用在文件的标准输入、输出重定向和管道方面。 dupfd 复制文件句柄ÿ…...
OpenStack简介
OpenStack简介 目录 OpenStack简介 1、云计算模式2、云计算 虚拟化 openstack之间的关系?3、OpenStack 中有哪些组件?4、计算节点负责虚拟机运行5、网络节点负责对外网络与内网之间的通信 5.1 网络节点仅包含Neutron服务5.2 网络节点包含三个网络端口6、…...
二分法的应用
文章目录 什么是二分法🎮二分查找的优先级二分查找的步骤💥图解演示🧩 代码演示🫕python程序实现🐈⬛C程序实现🐕🦺C程序实现🐯Java程序实现🐳 非常规类二分查找&…...
ChatGPT在大规模数据处理和信息管理中的应用如何?
ChatGPT作为一种强大的自然语言处理模型,在大规模数据处理和信息管理领域有着广泛的应用潜力。它可以利用其文本生成、文本理解和问答等能力,为数据分析、信息提取、知识管理等任务提供智能化的解决方案。以下将详细介绍ChatGPT在大规模数据处理和信息管…...
【算法篇C++实现】五大常规算法
文章目录 🚀一、分治法⛳(一)算法思想⛳(二)相关代码 🚀二、动态规划算法⛳(一)算法思想⛳(二)相关代码 🚀三、回溯算法⛳(一…...
MySQL和钉钉单据接口对接
MySQL和钉钉单据接口对接 数据源系统:钉钉 钉钉(DingTalk)是阿里巴巴集团打造的企业级智能移动办公平台,是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能人事、钉工牌…...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...
2024年赣州旅游投资集团社会招聘笔试真
2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
无人机侦测与反制技术的进展与应用
国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机(无人驾驶飞行器,UAV)技术的快速发展,其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统,无人机的“黑飞”&…...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...
打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
