尚硅谷大数据项目《在线教育之实时数仓》笔记003
视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili
目录
第7章 数仓开发之ODS层
P015
第8章 数仓开发之DIM层
P016
P017
P018
P019
01、node001节点Linux命令
02、KafkaUtil.java
03、DimSinkApp.java
P020
P021
P022
P023
第7章 数仓开发之ODS层
P015
第7章 数仓开发之ODS层
采集到 Kafka 的 topic_log 和 topic_db 主题的数据即为实时数仓的 ODS 层,这一层的作用是对数据做原样展示和备份。
8.2.2 动态拆分维度表功能
由于Maxwell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个维度表拆开处理。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张维度表表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。这种可以有三个方案实现:
一种是用Zookeeper存储,通过Watch感知数据变化;
另一种是用mysql数据库存储,周期性的同步;
再一种是用mysql数据库存储,使用广播流。
这里选择第三种方案,主要是MySQL对于配置数据初始化和维护管理,使用FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接。
第8章 数仓开发之DIM层
P016
8.1.1 Flink CDC
- 基于 Flink SQL CDC的实时数据同步方案
- https://github.com/ververica/flink-cdc-connectors
P017
8.2 主要任务
package com.atguigu.edu.realtime.app.dim;import com.atguigu.edu.realtime.util.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DimSinkApp {public static void main(String[] args) {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据//env.fromSource();// TODO 3 对主流数据进行ETL// TODO 4 使用flinkCDC读取配置表数据// TODO 5 将配置表数据创建为广播流// TODO 6 合并主流和广播流// TODO 7 对合并流进行分别处理// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务}
}
package com.atguigu.edu.realtime.util;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class EnvUtil {/*** 环境准备及状态后端设置,获取对应的环境** @param parallelism Flink 程序的并行度* @return Flink 流处理环境对象*/public static StreamExecutionEnvironment getExecutionEnvironment(Integer parallelism) {//TODO 1 环境创建准备StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并发env.setParallelism(parallelism);//TODO 2 设置状态后端env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);//设置超时时间env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);//设置最小间隔时间env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("hdfs://node001:8020/edu/ck");System.setProperty("HADOOP_USER_NAME", "atguigu");return env;}
}
P018
package com.atguigu.edu.realtime.util;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;import java.io.IOException;public class KafkaUtil {public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {return KafkaSource.<String>builder()// 必要参数
// .setBootstrapServers(EduConfig.KAFKA_BOOTSTRAPS)//“node001:9092”.setTopics(topic).setGroupId(groupId).setValueOnlyDeserializer(new DeserializationSchema<String>() {@Overridepublic String deserialize(byte[] message) throws IOException {if (message != null && message.length != 0) {return new String(message);}return null;}@Overridepublic boolean isEndOfStream(String nextElement) {return false;}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}})// 不必要的参数,设置offset重置的时候读取数据的位置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)).build();}
}
P019
01、node001节点Linux命令
[atguigu@node001 bin]$ jpsall
================ node001 ================
4803 QuorumPeerMain
5236 Kafka
7941 Maxwell
5350 Application
6726 ConsoleConsumer
4458 NodeManager
8810 Jps
4043 DataNode
3869 NameNode
4654 JobHistoryServer
================ node002 ================
3505 ResourceManager
4066 QuorumPeerMain
4490 Kafka
5179 Jps
3660 NodeManager
3263 DataNode
================ node003 ================
3505 SecondaryNameNode
5777 Jps
4369 Application
4279 Kafka
4569 Application
3354 DataNode
3851 QuorumPeerMain
3659 NodeManager
[atguigu@node001 bin]$
启动hadoop、maxwell、kafka。
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_db
[atguigu@node001 ~]$ cd ~/bin
[atguigu@node001 bin]$ mysql_to_kafka_init.sh al
{"database":"edu","table":"video_info","type":"bootstrap-insert","ts":1645429973,"data":{"id":5410,"video_name":"day20_11复习_总结.avi","during_sec":900,"video_status":"1","video_size":12003100,"video_url":"file://xxx/xxx","video_source_id":null,"version_id":1,"chapter_id":26305,"course_id":39,"publisher_id":99,"create_time":"2021-11-14 04:15:01","update_time":null,"deleted":"0"}}
{"database":"edu","table":"video_info","type":"bootstrap-insert","ts":1645429973,"data":{"id":5410,"video_name":"day20_11复习_总结.avi","during_sec":900,"video_status":"1","video_size":12003100,"video_url":"file://xxx/xxx","video_source_id":null,"version_id":1,"chapter_id":26305,"course_id":39,"publisher_id":99,"create_time":"2021-11-14 04:15:01","update_time":null,"deleted":"0"}
}
02、KafkaUtil.java
package com.atguigu.edu.realtime.util;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;import java.io.IOException;public class KafkaUtil {public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {return KafkaSource.<String>builder()// 必要参数.setBootstrapServers("node001:9092").setTopics(topic).setGroupId(groupId).setValueOnlyDeserializer(new DeserializationSchema<String>() {@Overridepublic String deserialize(byte[] message) throws IOException {if (message != null && message.length != 0) {return new String(message);}return null;}@Overridepublic boolean isEndOfStream(String nextElement) {return false;}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}})// 不必要的参数,设置offset重置的时候读取数据的位置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)).build();}
}
03、DimSinkApp.java
package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSource<String> eduDS = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),WatermarkStrategy.noWatermarks(),"kafka_source");// TODO 3 对主流数据进行ETL
// eduDS.map(new MapFunction<String, JSONObject>() {
// @Override
// public JSONObject map(String value) throws Exception {
// return JSONObject.parseObject(value);
// }
// }).filter(new FilterFunction<JSONObject>() {
// @Override
// public boolean filter(JSONObject jsonObject) throws Exception {
// String type = jsonObject.getString("type");
// if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
// return false;
// }
// return true;
// }
// });SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println("数据转换json错误...");}}});jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// TODO 5 将配置表数据创建为广播流// TODO 6 合并主流和广播流// TODO 7 对合并流进行分别处理// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();}
}
P020
flinkCDC监控mysql中的binlog。
{"before":null,"after":{"source_table":"base_category_info","sink_table":"dim_base_category_info","sink_columns":"id,category_name,create_time,update_time,deleted","sink_pk":"id","sink_extend":null},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"edu_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1695262804254,"transaction":null}
{"before":null, # 被修改之前的数据"after":{ # 被修改之后的数据"source_table":"base_category_info","sink_table":"dim_base_category_info","sink_columns":"id,category_name,create_time,update_time,deleted","sink_pk":"id","sink_extend":null},"source":{ # 数据来源"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"edu_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r", # option,r修改"ts_ms":1695262804254,"transaction":null
}
package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSource<String> eduDS = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),WatermarkStrategy.noWatermarks(),"kafka_source");// TODO 3 对主流数据进行ETL
// eduDS.map(new MapFunction<String, JSONObject>() {
// @Override
// public JSONObject map(String value) throws Exception {
// return JSONObject.parseObject(value);
// }
// }).filter(new FilterFunction<JSONObject>() {
// @Override
// public boolean filter(JSONObject jsonObject) throws Exception {
// String type = jsonObject.getString("type");
// if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
// return false;
// }
// return true;
// }
// });SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println("数据转换json错误...");}}});// jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// 4.1 FlinkCDC 读取配置表信息MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node001").port(3306).databaseList("edu_config") // set captured database.tableList("edu_config.table_process") // set captured table.username("root").password("123456")//定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String//设置读取数据的模式.startupOptions(StartupOptions.initial()).build();// 4.2 封装为流DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");configDS.print();// TODO 5 将配置表数据创建为广播流// TODO 6 连接流,合并主流和广播流// TODO 7 对合并流进行分别处理// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();}
}
P021
package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSource<String> eduDS = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),WatermarkStrategy.noWatermarks(),"kafka_source");// TODO 3 对主流数据进行ETL
// eduDS.map(new MapFunction<String, JSONObject>() {
// @Override
// public JSONObject map(String value) throws Exception {
// return JSONObject.parseObject(value);
// }
// }).filter(new FilterFunction<JSONObject>() {
// @Override
// public boolean filter(JSONObject jsonObject) throws Exception {
// String type = jsonObject.getString("type");
// if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
// return false;
// }
// return true;
// }
// });SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println("数据转换json错误...");}}});// jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// 4.1 FlinkCDC 读取配置表信息MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node001").port(3306).databaseList("edu_config") // set captured database.tableList("edu_config.table_process") // set captured table.username("root").password("123456")//定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String//设置读取数据的模式.startupOptions(StartupOptions.initial()).build();// 4.2 封装为流DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");configDS.print();// TODO 5 将配置表数据创建为广播流// key-> 维度表名称,value-> mysql单行数据 使用javaBeanMapStateDescriptor<String, DimTableProcess> tableProcessState = new MapStateDescriptor<>("table_process_state", String.class, DimTableProcess.class);BroadcastStream<String> broadcastStream = configDS.broadcast(tableProcessState);// TODO 6 连接流,合并主流和广播流BroadcastConnectedStream<JSONObject, String> connectCS = jsonDS.connect(broadcastStream);// TODO 7 对合并流进行分别处理connectCS.process(new BroadcastProcessFunction<JSONObject, String, Object>() {//处理主流@Overridepublic void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, Object>.ReadOnlyContext readOnlyContext, Collector<Object> collector) throws Exception {}//处理广播流@Overridepublic void processBroadcastElement(String s, BroadcastProcessFunction<JSONObject, String, Object>.Context context, Collector<Object> collector) throws Exception {}});// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();}
}
package com.atguigu.edu.realtime.bean;import lombok.Data;@Data
public class DimTableProcess {//来源表String sourceTable;//输出表String sinkTable;//输出字段String sinkColumns;//主键字段String sinkPk;//建表扩展String sinkExtend;
}
P022
8.3.2 根据MySQL的配置表,动态进行分流
7)自定义函数DimBroadcastFunction
package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSource<String> eduDS = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),WatermarkStrategy.noWatermarks(),"kafka_source");// TODO 3 对主流数据进行ETL
// eduDS.map(new MapFunction<String, JSONObject>() {
// @Override
// public JSONObject map(String value) throws Exception {
// return JSONObject.parseObject(value);
// }
// }).filter(new FilterFunction<JSONObject>() {
// @Override
// public boolean filter(JSONObject jsonObject) throws Exception {
// String type = jsonObject.getString("type");
// if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
// return false;
// }
// return true;
// }
// });SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println("数据转换json错误...");}}});// jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// 4.1 FlinkCDC 读取配置表信息MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node001").port(3306).databaseList("edu_config") // set captured database.tableList("edu_config.table_process") // set captured table.username("root").password("123456")//定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String//设置读取数据的模式.startupOptions(StartupOptions.initial()).build();// 4.2 封装为流DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");configDS.print();// TODO 5 将配置表数据创建为广播流// key-> 维度表名称,value-> mysql单行数据 使用javaBeanMapStateDescriptor<String, DimTableProcess> tableProcessState = new MapStateDescriptor<>("table_process_state", String.class, DimTableProcess.class);BroadcastStream<String> broadcastStream = configDS.broadcast(tableProcessState);// TODO 6 连接流,合并主流和广播流BroadcastConnectedStream<JSONObject, String> connectCS = jsonDS.connect(broadcastStream);// TODO 7 对合并流进行分别处理connectCS.process(new DimBroadcastProcessFunction(tableProcessState));// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();}
}
package com.atguigu.edu.realtime.app.func;import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.sql.*;
import java.util.*;public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {private MapStateDescriptor<String, DimTableProcess> tableProcessState;// 初始化配置表数据private HashMap<String, DimTableProcess> configMap = new HashMap<>();public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {this.tableProcessState = tableProcessState;}/*** @param value flinkCDC直接输入的json* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {//TODO 1 获取配置表数据解析格式//TODO 2 检查phoenix中是否存在表 不存在创建//TODO 3 将数据写入到状态 广播出去}/*** @param value kafka中maxwell生成的json数据* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {//TODO 1 获取广播的配置数据//TODO 2 过滤出需要的维度字段//TODO 3 补充输出字段}
}
P023
package com.atguigu.edu.realtime.app.func;import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.sql.*;
import java.util.*;public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {private MapStateDescriptor<String, DimTableProcess> tableProcessState;// 初始化配置表数据private HashMap<String, DimTableProcess> configMap = new HashMap<>();public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {this.tableProcessState = tableProcessState;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +"user=root&password=123456&useUnicode=true&" +"characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false");PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");ResultSet resultSet = preparedStatement.executeQuery();ResultSetMetaData metaData = resultSet.getMetaData();while (resultSet.next()) {JSONObject jsonObject = new JSONObject();for (int i = 1; i <= metaData.getColumnCount(); i++) {String columnName = metaData.getColumnName(i);String columnValue = resultSet.getString(i);jsonObject.put(columnName, columnValue);}DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);}resultSet.close();preparedStatement.close();connection.close();}/*** @param value flinkCDC直接输入的json* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {//TODO 1 获取配置表数据解析格式//TODO 2 检查phoenix中是否存在表 不存在创建//TODO 3 将数据写入到状态 广播出去}/*** @param value kafka中maxwell生成的json数据* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {//TODO 1 获取广播的配置数据//TODO 2 过滤出需要的维度字段//TODO 3 补充输出字段}
}
😘
相关文章:

尚硅谷大数据项目《在线教育之实时数仓》笔记003
视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili 目录 第7章 数仓开发之ODS层 P015 第8章 数仓开发之DIM层 P016 P017 P018 P019 01、node001节点Linux命令 02、KafkaUtil.java 03、DimSinkApp.java P020 P021 P022 P023 第7章 数…...

【Linux】部署单体项目以及前后端分离项目(项目部署)
一、简介 以下就是Linux部署单机项目和前后端分离项目的优缺点,希望对你有所帮助。 1、Linux部署单机项目: 优点: 1.简化了系统管理:由于所有服务都在同一台机器上运行,因此可以简化系统管理和维护。 2.提高了性能&a…...

设计模式之门面模式
前言 什么是门面模式 门面模式是一种结构型设计模式,它提供了一个统一的接口,用来访问子系统中的一群接口。它定义了一个高层接口,让子系统更容易使用。这种模式常用于将一个复杂的子系统封装成一个简单的接口,使得客户端可以方…...

Postman的使用
Postman的使用 Postman断言Postman常用断言1、断言响应状态码2、断言包含某个字符串3、断言JSON数据4、Postman断言工作原理 Postman关联Postman自动关联创建环境 3、Postman参数化CSV文件JSON文件1、用例集的导入导出2、环境导出 Postman断言 让Postman工具代替人自动判断预期…...

QGIS008:QGIS拓扑检查、修改及验证
摘要:本文介绍使用QGIS拓扑检查器和几何图形检查器检查图层的拓扑错误,修改拓扑错误,并对修改后的图层进行错误验证。 实验数据: 链接:https://pan.baidu.com/s/1Vy2s-KYS-XJevqHNdavv9A?pwdf06o 提取码:…...
安装DBD-Oracle报错处理
cd DBD-Oracle-1.83 perl Makefile.PL make && make install make编译报错如下: /bin/ld: 找不到 -lnsl collect2: 错误:ld 返回 1 make: *** [Makefile:524:blib/arch/auto/DBD/Oracle/Oracle.so] 错误 1 [rootlocalhost DBD-Ora…...
【机器学习】KNN算法-鸢尾花种类预测
KNN算法-鸢尾花种类预测 文章目录 KNN算法-鸢尾花种类预测1. 数据集介绍2. KNN优缺点: K最近邻(K-Nearest Neighbors,KNN)算法是一种用于模式识别和分类的简单但强大的机器学习算法。它的工作原理非常直观:给定一个新数…...
LuatOS-SOC接口文档(air780E)--lora - lora驱动模块
常量 常量 类型 解释 lora.SLEEP number SLEEP模式 lora.STANDBY number STANDBY模式 lora.init(ic, loraconfig,spiconfig) lora初始化 参数 传入值类型 解释 string lora 型号,当前支持: llcc68 sx1268 table lora配置参数,与具体设备…...

Compose 自定义 - 绘制 Draw
一、概念 所有的绘制操作都是通过调整像素大小来执行的。若要确保项目在不同的设备密度和屏幕尺寸上都能采用一致的尺寸,请务必使用 .toPx() 对 dp 进行转换或者采用小数尺寸。 二、Modifier 修饰符绘制 官方页面 在修饰的可组合项之上或之下绘制。 .drawWithCon…...

c#学习相关系列之构造函数
目录 一、构造函数的作用 二、构造函数的特征 三、三种构造函数介绍 1、实例构造函数 2、静态构造函数 3、私有构造函数 一、构造函数的作用 构造函数用来创建对象,并且可以在构造函数中对此对象进行初始化。构造函数具有与类相同的名称,它通常用来…...

CS224W1.3——图表示的选择
文章目录 1. 图网络构成2. 选择一个合适的表示3. 图结构实例3.1 二部图3.2 图的表示 4. 节点和边的属性 这小节主要讲图表示的选择。 1. 图网络构成 对于每个实体,我们创建节点 N N N,对于每个关系,我们创建边 E E E,对于整体而言…...
rust学习——插件rust-analyzer安装与配置
插件rust-analyzer安装与配置 rust-analyzer有一个中文版本。安装前请先卸载其他rust插件。 首次安装会下载语言服务。 您可能是首次安装Rust中文标准库插件 现在还需要安装Rust语言服务(约25MB单文件)就全部安装完成啦~正在后台自动安装请稍后... 下载完成...OK配置 "…...

Spring Boot简介
Spring Boot帮助你创建可以运行的独立的、基于Spring的生产级应用程序。 我们对Spring平台和第三方库采取了有主见的观点,这样你就能以最少的麻烦开始工作。 大多数Spring Boot应用程序只需要很少的Spring配置。 你可以使用Spring Boot来创建Java应用程序ÿ…...

Linux下protobuf和 protobuf-c安装使用
如果在 C语言中使用 protobuf,就需要使用 protobuf-c这个库。 protobuf使用详解:https://blog.csdn.net/qq_42402854/article/details/134066566 下面在 Linux下安装 protobuf和 protobuf-c。 一、下载 protobuf和 protobuf-c 官方的 Protocol Buffer提…...

FastAPI 快速学习之 Flask 框架对比
目录 一、前言二、FastAPI 优势三、Hello World四、HTTP 方法五、URL 变量六、查询字符串七、POST 请求八、文件上传九、表单提交十、Cookies十一、模块化视图十二、数据校验十三、自动化文档Swagger 风格ReDoc 风格 十四、CORS跨域 一、前言 本文主要对 FastAPI 与 Flask 框架…...

Spring Boot和XXL-Job:高效定时任务管理
Spring Boot和XXL-Job:高效定时任务管理 前言第一:XXL-Job简介什么是XXL-job对比别的任务调度 第二: springboot整合XXL-job配置XXL-Job Admin拉取XXL-Job代码修改拉取的配置 配置执行器自己的项目如何整合maven依赖properties文件配置执行器…...

3、QtCharts 动态曲线图
文章目录 效果声明变量构建静态图表创建计时器连接信号与槽槽函数核心代码 效果 声明变量 构建静态图表 //构建曲线系列m_splineSerisenew QSplineSeries(this);//为折线添加数据qreal x0.f;for (size_t i0;i<c_MaxSize;i){xqreal(i1)/c_MaxSize;m_splineSerise->append(…...

Linux下自动挂载U盘或者USB移动硬盘
最近在折腾用树莓派(实际上是平替香橙派orangepi zero3)搭建共享文件服务器,有一个问题很重要,如何在系统启动时自动挂载USB移动硬盘。 1 使用/etc/fstab 最开始尝试了用/etc/fstab文件下增加:"/dev/sda1 /home/orangepi/s…...

一文通透位置编码:从标准位置编码到旋转位置编码RoPE
前言 关于位置编码和RoPE 我之前在本博客中的另外两篇文章中有阐述过(一篇是关于LLaMA解读的,一篇是关于transformer从零实现的),但自觉写的不是特别透彻好懂再后来在我参与主讲的类ChatGPT微调实战课中也有讲过,但有些学员依然反馈RoPE不是…...

八皇后问题
1、问题描述 在棋盘上放置 8 个皇后,使得它们互不攻击,此时每个皇后的攻击范围为同行同列和同对角线,要求找出所有解,如下图所示。 左图为皇后的攻击范围,右图为一个可行解。 2、分析 最简单的思路是把问题转化为 “…...

华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...

网络六边形受到攻击
大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...

7.4.分块查找
一.分块查找的算法思想: 1.实例: 以上述图片的顺序表为例, 该顺序表的数据元素从整体来看是乱序的,但如果把这些数据元素分成一块一块的小区间, 第一个区间[0,1]索引上的数据元素都是小于等于10的, 第二…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
Java入门学习详细版(一)
大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...