尚硅谷大数据项目《在线教育之实时数仓》笔记003
视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili
目录
第7章 数仓开发之ODS层
P015
第8章 数仓开发之DIM层
P016
P017
P018
P019
01、node001节点Linux命令
02、KafkaUtil.java
03、DimSinkApp.java
P020
P021
P022
P023
第7章 数仓开发之ODS层
P015
第7章 数仓开发之ODS层
采集到 Kafka 的 topic_log 和 topic_db 主题的数据即为实时数仓的 ODS 层,这一层的作用是对数据做原样展示和备份。
8.2.2 动态拆分维度表功能
由于Maxwell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个维度表拆开处理。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张维度表表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。这种可以有三个方案实现:
一种是用Zookeeper存储,通过Watch感知数据变化;
另一种是用mysql数据库存储,周期性的同步;
再一种是用mysql数据库存储,使用广播流。
这里选择第三种方案,主要是MySQL对于配置数据初始化和维护管理,使用FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接。
第8章 数仓开发之DIM层
P016
8.1.1 Flink CDC
- 基于 Flink SQL CDC的实时数据同步方案
- https://github.com/ververica/flink-cdc-connectors
P017
8.2 主要任务
package com.atguigu.edu.realtime.app.dim;import com.atguigu.edu.realtime.util.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DimSinkApp {public static void main(String[] args) {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据//env.fromSource();// TODO 3 对主流数据进行ETL// TODO 4 使用flinkCDC读取配置表数据// TODO 5 将配置表数据创建为广播流// TODO 6 合并主流和广播流// TODO 7 对合并流进行分别处理// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务}
}
package com.atguigu.edu.realtime.util;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class EnvUtil {/*** 环境准备及状态后端设置,获取对应的环境** @param parallelism Flink 程序的并行度* @return Flink 流处理环境对象*/public static StreamExecutionEnvironment getExecutionEnvironment(Integer parallelism) {//TODO 1 环境创建准备StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并发env.setParallelism(parallelism);//TODO 2 设置状态后端env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);//设置超时时间env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);//设置最小间隔时间env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("hdfs://node001:8020/edu/ck");System.setProperty("HADOOP_USER_NAME", "atguigu");return env;}
}
P018
package com.atguigu.edu.realtime.util;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;import java.io.IOException;public class KafkaUtil {public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {return KafkaSource.<String>builder()// 必要参数
// .setBootstrapServers(EduConfig.KAFKA_BOOTSTRAPS)//“node001:9092”.setTopics(topic).setGroupId(groupId).setValueOnlyDeserializer(new DeserializationSchema<String>() {@Overridepublic String deserialize(byte[] message) throws IOException {if (message != null && message.length != 0) {return new String(message);}return null;}@Overridepublic boolean isEndOfStream(String nextElement) {return false;}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}})// 不必要的参数,设置offset重置的时候读取数据的位置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)).build();}
}
P019
01、node001节点Linux命令
[atguigu@node001 bin]$ jpsall
================ node001 ================
4803 QuorumPeerMain
5236 Kafka
7941 Maxwell
5350 Application
6726 ConsoleConsumer
4458 NodeManager
8810 Jps
4043 DataNode
3869 NameNode
4654 JobHistoryServer
================ node002 ================
3505 ResourceManager
4066 QuorumPeerMain
4490 Kafka
5179 Jps
3660 NodeManager
3263 DataNode
================ node003 ================
3505 SecondaryNameNode
5777 Jps
4369 Application
4279 Kafka
4569 Application
3354 DataNode
3851 QuorumPeerMain
3659 NodeManager
[atguigu@node001 bin]$
启动hadoop、maxwell、kafka。
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_db
[atguigu@node001 ~]$ cd ~/bin
[atguigu@node001 bin]$ mysql_to_kafka_init.sh al
{"database":"edu","table":"video_info","type":"bootstrap-insert","ts":1645429973,"data":{"id":5410,"video_name":"day20_11复习_总结.avi","during_sec":900,"video_status":"1","video_size":12003100,"video_url":"file://xxx/xxx","video_source_id":null,"version_id":1,"chapter_id":26305,"course_id":39,"publisher_id":99,"create_time":"2021-11-14 04:15:01","update_time":null,"deleted":"0"}}
{"database":"edu","table":"video_info","type":"bootstrap-insert","ts":1645429973,"data":{"id":5410,"video_name":"day20_11复习_总结.avi","during_sec":900,"video_status":"1","video_size":12003100,"video_url":"file://xxx/xxx","video_source_id":null,"version_id":1,"chapter_id":26305,"course_id":39,"publisher_id":99,"create_time":"2021-11-14 04:15:01","update_time":null,"deleted":"0"}
}
02、KafkaUtil.java
package com.atguigu.edu.realtime.util;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;import java.io.IOException;public class KafkaUtil {public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {return KafkaSource.<String>builder()// 必要参数.setBootstrapServers("node001:9092").setTopics(topic).setGroupId(groupId).setValueOnlyDeserializer(new DeserializationSchema<String>() {@Overridepublic String deserialize(byte[] message) throws IOException {if (message != null && message.length != 0) {return new String(message);}return null;}@Overridepublic boolean isEndOfStream(String nextElement) {return false;}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}})// 不必要的参数,设置offset重置的时候读取数据的位置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)).build();}
}
03、DimSinkApp.java
package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSource<String> eduDS = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),WatermarkStrategy.noWatermarks(),"kafka_source");// TODO 3 对主流数据进行ETL
// eduDS.map(new MapFunction<String, JSONObject>() {
// @Override
// public JSONObject map(String value) throws Exception {
// return JSONObject.parseObject(value);
// }
// }).filter(new FilterFunction<JSONObject>() {
// @Override
// public boolean filter(JSONObject jsonObject) throws Exception {
// String type = jsonObject.getString("type");
// if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
// return false;
// }
// return true;
// }
// });SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println("数据转换json错误...");}}});jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// TODO 5 将配置表数据创建为广播流// TODO 6 合并主流和广播流// TODO 7 对合并流进行分别处理// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();}
}
P020
flinkCDC监控mysql中的binlog。
{"before":null,"after":{"source_table":"base_category_info","sink_table":"dim_base_category_info","sink_columns":"id,category_name,create_time,update_time,deleted","sink_pk":"id","sink_extend":null},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"edu_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1695262804254,"transaction":null}
{"before":null, # 被修改之前的数据"after":{ # 被修改之后的数据"source_table":"base_category_info","sink_table":"dim_base_category_info","sink_columns":"id,category_name,create_time,update_time,deleted","sink_pk":"id","sink_extend":null},"source":{ # 数据来源"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"edu_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r", # option,r修改"ts_ms":1695262804254,"transaction":null
}
package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSource<String> eduDS = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),WatermarkStrategy.noWatermarks(),"kafka_source");// TODO 3 对主流数据进行ETL
// eduDS.map(new MapFunction<String, JSONObject>() {
// @Override
// public JSONObject map(String value) throws Exception {
// return JSONObject.parseObject(value);
// }
// }).filter(new FilterFunction<JSONObject>() {
// @Override
// public boolean filter(JSONObject jsonObject) throws Exception {
// String type = jsonObject.getString("type");
// if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
// return false;
// }
// return true;
// }
// });SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println("数据转换json错误...");}}});// jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// 4.1 FlinkCDC 读取配置表信息MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node001").port(3306).databaseList("edu_config") // set captured database.tableList("edu_config.table_process") // set captured table.username("root").password("123456")//定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String//设置读取数据的模式.startupOptions(StartupOptions.initial()).build();// 4.2 封装为流DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");configDS.print();// TODO 5 将配置表数据创建为广播流// TODO 6 连接流,合并主流和广播流// TODO 7 对合并流进行分别处理// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();}
}
P021
package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSource<String> eduDS = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),WatermarkStrategy.noWatermarks(),"kafka_source");// TODO 3 对主流数据进行ETL
// eduDS.map(new MapFunction<String, JSONObject>() {
// @Override
// public JSONObject map(String value) throws Exception {
// return JSONObject.parseObject(value);
// }
// }).filter(new FilterFunction<JSONObject>() {
// @Override
// public boolean filter(JSONObject jsonObject) throws Exception {
// String type = jsonObject.getString("type");
// if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
// return false;
// }
// return true;
// }
// });SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println("数据转换json错误...");}}});// jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// 4.1 FlinkCDC 读取配置表信息MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node001").port(3306).databaseList("edu_config") // set captured database.tableList("edu_config.table_process") // set captured table.username("root").password("123456")//定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String//设置读取数据的模式.startupOptions(StartupOptions.initial()).build();// 4.2 封装为流DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");configDS.print();// TODO 5 将配置表数据创建为广播流// key-> 维度表名称,value-> mysql单行数据 使用javaBeanMapStateDescriptor<String, DimTableProcess> tableProcessState = new MapStateDescriptor<>("table_process_state", String.class, DimTableProcess.class);BroadcastStream<String> broadcastStream = configDS.broadcast(tableProcessState);// TODO 6 连接流,合并主流和广播流BroadcastConnectedStream<JSONObject, String> connectCS = jsonDS.connect(broadcastStream);// TODO 7 对合并流进行分别处理connectCS.process(new BroadcastProcessFunction<JSONObject, String, Object>() {//处理主流@Overridepublic void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, Object>.ReadOnlyContext readOnlyContext, Collector<Object> collector) throws Exception {}//处理广播流@Overridepublic void processBroadcastElement(String s, BroadcastProcessFunction<JSONObject, String, Object>.Context context, Collector<Object> collector) throws Exception {}});// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();}
}
package com.atguigu.edu.realtime.bean;import lombok.Data;@Data
public class DimTableProcess {//来源表String sourceTable;//输出表String sinkTable;//输出字段String sinkColumns;//主键字段String sinkPk;//建表扩展String sinkExtend;
}
P022
8.3.2 根据MySQL的配置表,动态进行分流
7)自定义函数DimBroadcastFunction
package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSource<String> eduDS = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),WatermarkStrategy.noWatermarks(),"kafka_source");// TODO 3 对主流数据进行ETL
// eduDS.map(new MapFunction<String, JSONObject>() {
// @Override
// public JSONObject map(String value) throws Exception {
// return JSONObject.parseObject(value);
// }
// }).filter(new FilterFunction<JSONObject>() {
// @Override
// public boolean filter(JSONObject jsonObject) throws Exception {
// String type = jsonObject.getString("type");
// if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
// return false;
// }
// return true;
// }
// });SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println("数据转换json错误...");}}});// jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// 4.1 FlinkCDC 读取配置表信息MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node001").port(3306).databaseList("edu_config") // set captured database.tableList("edu_config.table_process") // set captured table.username("root").password("123456")//定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String//设置读取数据的模式.startupOptions(StartupOptions.initial()).build();// 4.2 封装为流DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");configDS.print();// TODO 5 将配置表数据创建为广播流// key-> 维度表名称,value-> mysql单行数据 使用javaBeanMapStateDescriptor<String, DimTableProcess> tableProcessState = new MapStateDescriptor<>("table_process_state", String.class, DimTableProcess.class);BroadcastStream<String> broadcastStream = configDS.broadcast(tableProcessState);// TODO 6 连接流,合并主流和广播流BroadcastConnectedStream<JSONObject, String> connectCS = jsonDS.connect(broadcastStream);// TODO 7 对合并流进行分别处理connectCS.process(new DimBroadcastProcessFunction(tableProcessState));// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();}
}
package com.atguigu.edu.realtime.app.func;import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.sql.*;
import java.util.*;public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {private MapStateDescriptor<String, DimTableProcess> tableProcessState;// 初始化配置表数据private HashMap<String, DimTableProcess> configMap = new HashMap<>();public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {this.tableProcessState = tableProcessState;}/*** @param value flinkCDC直接输入的json* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {//TODO 1 获取配置表数据解析格式//TODO 2 检查phoenix中是否存在表 不存在创建//TODO 3 将数据写入到状态 广播出去}/*** @param value kafka中maxwell生成的json数据* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {//TODO 1 获取广播的配置数据//TODO 2 过滤出需要的维度字段//TODO 3 补充输出字段}
}
P023
package com.atguigu.edu.realtime.app.func;import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.sql.*;
import java.util.*;public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {private MapStateDescriptor<String, DimTableProcess> tableProcessState;// 初始化配置表数据private HashMap<String, DimTableProcess> configMap = new HashMap<>();public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {this.tableProcessState = tableProcessState;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +"user=root&password=123456&useUnicode=true&" +"characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false");PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");ResultSet resultSet = preparedStatement.executeQuery();ResultSetMetaData metaData = resultSet.getMetaData();while (resultSet.next()) {JSONObject jsonObject = new JSONObject();for (int i = 1; i <= metaData.getColumnCount(); i++) {String columnName = metaData.getColumnName(i);String columnValue = resultSet.getString(i);jsonObject.put(columnName, columnValue);}DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);}resultSet.close();preparedStatement.close();connection.close();}/*** @param value flinkCDC直接输入的json* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {//TODO 1 获取配置表数据解析格式//TODO 2 检查phoenix中是否存在表 不存在创建//TODO 3 将数据写入到状态 广播出去}/*** @param value kafka中maxwell生成的json数据* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {//TODO 1 获取广播的配置数据//TODO 2 过滤出需要的维度字段//TODO 3 补充输出字段}
}
😘
相关文章:

尚硅谷大数据项目《在线教育之实时数仓》笔记003
视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili 目录 第7章 数仓开发之ODS层 P015 第8章 数仓开发之DIM层 P016 P017 P018 P019 01、node001节点Linux命令 02、KafkaUtil.java 03、DimSinkApp.java P020 P021 P022 P023 第7章 数…...

【Linux】部署单体项目以及前后端分离项目(项目部署)
一、简介 以下就是Linux部署单机项目和前后端分离项目的优缺点,希望对你有所帮助。 1、Linux部署单机项目: 优点: 1.简化了系统管理:由于所有服务都在同一台机器上运行,因此可以简化系统管理和维护。 2.提高了性能&a…...

设计模式之门面模式
前言 什么是门面模式 门面模式是一种结构型设计模式,它提供了一个统一的接口,用来访问子系统中的一群接口。它定义了一个高层接口,让子系统更容易使用。这种模式常用于将一个复杂的子系统封装成一个简单的接口,使得客户端可以方…...

Postman的使用
Postman的使用 Postman断言Postman常用断言1、断言响应状态码2、断言包含某个字符串3、断言JSON数据4、Postman断言工作原理 Postman关联Postman自动关联创建环境 3、Postman参数化CSV文件JSON文件1、用例集的导入导出2、环境导出 Postman断言 让Postman工具代替人自动判断预期…...

QGIS008:QGIS拓扑检查、修改及验证
摘要:本文介绍使用QGIS拓扑检查器和几何图形检查器检查图层的拓扑错误,修改拓扑错误,并对修改后的图层进行错误验证。 实验数据: 链接:https://pan.baidu.com/s/1Vy2s-KYS-XJevqHNdavv9A?pwdf06o 提取码:…...

安装DBD-Oracle报错处理
cd DBD-Oracle-1.83 perl Makefile.PL make && make install make编译报错如下: /bin/ld: 找不到 -lnsl collect2: 错误:ld 返回 1 make: *** [Makefile:524:blib/arch/auto/DBD/Oracle/Oracle.so] 错误 1 [rootlocalhost DBD-Ora…...

【机器学习】KNN算法-鸢尾花种类预测
KNN算法-鸢尾花种类预测 文章目录 KNN算法-鸢尾花种类预测1. 数据集介绍2. KNN优缺点: K最近邻(K-Nearest Neighbors,KNN)算法是一种用于模式识别和分类的简单但强大的机器学习算法。它的工作原理非常直观:给定一个新数…...

LuatOS-SOC接口文档(air780E)--lora - lora驱动模块
常量 常量 类型 解释 lora.SLEEP number SLEEP模式 lora.STANDBY number STANDBY模式 lora.init(ic, loraconfig,spiconfig) lora初始化 参数 传入值类型 解释 string lora 型号,当前支持: llcc68 sx1268 table lora配置参数,与具体设备…...

Compose 自定义 - 绘制 Draw
一、概念 所有的绘制操作都是通过调整像素大小来执行的。若要确保项目在不同的设备密度和屏幕尺寸上都能采用一致的尺寸,请务必使用 .toPx() 对 dp 进行转换或者采用小数尺寸。 二、Modifier 修饰符绘制 官方页面 在修饰的可组合项之上或之下绘制。 .drawWithCon…...

c#学习相关系列之构造函数
目录 一、构造函数的作用 二、构造函数的特征 三、三种构造函数介绍 1、实例构造函数 2、静态构造函数 3、私有构造函数 一、构造函数的作用 构造函数用来创建对象,并且可以在构造函数中对此对象进行初始化。构造函数具有与类相同的名称,它通常用来…...

CS224W1.3——图表示的选择
文章目录 1. 图网络构成2. 选择一个合适的表示3. 图结构实例3.1 二部图3.2 图的表示 4. 节点和边的属性 这小节主要讲图表示的选择。 1. 图网络构成 对于每个实体,我们创建节点 N N N,对于每个关系,我们创建边 E E E,对于整体而言…...

rust学习——插件rust-analyzer安装与配置
插件rust-analyzer安装与配置 rust-analyzer有一个中文版本。安装前请先卸载其他rust插件。 首次安装会下载语言服务。 您可能是首次安装Rust中文标准库插件 现在还需要安装Rust语言服务(约25MB单文件)就全部安装完成啦~正在后台自动安装请稍后... 下载完成...OK配置 "…...

Spring Boot简介
Spring Boot帮助你创建可以运行的独立的、基于Spring的生产级应用程序。 我们对Spring平台和第三方库采取了有主见的观点,这样你就能以最少的麻烦开始工作。 大多数Spring Boot应用程序只需要很少的Spring配置。 你可以使用Spring Boot来创建Java应用程序ÿ…...

Linux下protobuf和 protobuf-c安装使用
如果在 C语言中使用 protobuf,就需要使用 protobuf-c这个库。 protobuf使用详解:https://blog.csdn.net/qq_42402854/article/details/134066566 下面在 Linux下安装 protobuf和 protobuf-c。 一、下载 protobuf和 protobuf-c 官方的 Protocol Buffer提…...

FastAPI 快速学习之 Flask 框架对比
目录 一、前言二、FastAPI 优势三、Hello World四、HTTP 方法五、URL 变量六、查询字符串七、POST 请求八、文件上传九、表单提交十、Cookies十一、模块化视图十二、数据校验十三、自动化文档Swagger 风格ReDoc 风格 十四、CORS跨域 一、前言 本文主要对 FastAPI 与 Flask 框架…...

Spring Boot和XXL-Job:高效定时任务管理
Spring Boot和XXL-Job:高效定时任务管理 前言第一:XXL-Job简介什么是XXL-job对比别的任务调度 第二: springboot整合XXL-job配置XXL-Job Admin拉取XXL-Job代码修改拉取的配置 配置执行器自己的项目如何整合maven依赖properties文件配置执行器…...

3、QtCharts 动态曲线图
文章目录 效果声明变量构建静态图表创建计时器连接信号与槽槽函数核心代码 效果 声明变量 构建静态图表 //构建曲线系列m_splineSerisenew QSplineSeries(this);//为折线添加数据qreal x0.f;for (size_t i0;i<c_MaxSize;i){xqreal(i1)/c_MaxSize;m_splineSerise->append(…...

Linux下自动挂载U盘或者USB移动硬盘
最近在折腾用树莓派(实际上是平替香橙派orangepi zero3)搭建共享文件服务器,有一个问题很重要,如何在系统启动时自动挂载USB移动硬盘。 1 使用/etc/fstab 最开始尝试了用/etc/fstab文件下增加:"/dev/sda1 /home/orangepi/s…...

一文通透位置编码:从标准位置编码到旋转位置编码RoPE
前言 关于位置编码和RoPE 我之前在本博客中的另外两篇文章中有阐述过(一篇是关于LLaMA解读的,一篇是关于transformer从零实现的),但自觉写的不是特别透彻好懂再后来在我参与主讲的类ChatGPT微调实战课中也有讲过,但有些学员依然反馈RoPE不是…...

八皇后问题
1、问题描述 在棋盘上放置 8 个皇后,使得它们互不攻击,此时每个皇后的攻击范围为同行同列和同对角线,要求找出所有解,如下图所示。 左图为皇后的攻击范围,右图为一个可行解。 2、分析 最简单的思路是把问题转化为 “…...

UE4/UE5 设置widget中text的字体Outline
想要在蓝图中控制Widget 中的 text字体,对字体outline参数进行设置。 但是蓝图中无法直接获取设置outline参数的方法: 没有outline相关的蓝图函数 该参数本身是在Font类别下的扩展,所以只要获取设置Font参数即可进行outline的设置 text连出…...

漏洞复现-phpmyadmin_SQL注入 (CVE-2020-5504)
phpmyadmin SQL注入 _(CVE-2020-5504) 漏洞信息 CVE-2020-5504sql注入漏洞Phpmyadmin 5.00以下 描述 phpMyAdmin是Phpmyadmin团队的一套免费的、基于Web的MySQL数据库管理工具。该工具能够创建和删除数据库,创建、删除、修改数据库表&…...

安装虚拟机(VMware)保姆级教程及配置虚拟网络编辑器和安装WindowsServer以及宿主机访问虚拟机和配置服务器环境
目录 一、操作系统 1.1.什么是操作系统 1.2.常见操作系统 1.3.个人版本和服务器版本的区别 1.4.Linux的各个版本 二、VMware Wworkstation Pro虚拟机的安装 1.下载与安装 注意:VMWare虚拟网卡 2.配置虚拟网络编辑器 三、安装配置 WindowsServer 1.创建虚拟…...

vue表格列表导出excel
你可以通过下面的步骤使用Vue导出Excel表格: 安装依赖 安装两个依赖包: npm install --save xlsx file-saver创建Excel导出方法 //导出 Excel exportExcel() {// 表格数据let data this.tableData;// 转化为工作簿对象const workbook XLSX.utils.bo…...

CSS基础入门03
目录 1.圆角矩形 1.1基本用法 1.2生成圆形 1.3生成圆角矩形 1.4展开写法 2.Chrome 调试工具--查看 CSS 属性 2.1打开浏览器 2.2标签页含义 2.3elements 标签页使用 3.元素的显示模式 3.1块级元素 3.2行内元素/内联元素 3.3行内元素和块级元素的区别 3.4改变显示模…...

大数据架构设计理论与实践
大数据架构设计理论与实践 大数据处理系统概述 传统数据处理系统存在的问题 大数据处理系统面临的挑战 大数据处理系统的属性/特征 典型的大数据架构 Lambda架构 Lambda定义 优缺点 应用场景 Lambda的体系结构( Batch Layer (批处理层)、Speed Layer (加速层)、Serving Lay…...

2024级199管理类联考之英语二2200核心词汇(第三天)
abstract 抽象的,非具体的 n-摘要ideal adj -理想的 n-理想idealized 理想化的ideology 意识形态,思想体系concept 观念,概念 conception n-构想,怀孕,观念awareness 意识,认识significant 重要的,有意义的 significance n-意义,重要性major v-主修 adj-主要的,成年的 n-成年人…...

SQL中:语法总结(group by,having ,distinct,top,order by,like等等)
语法总结:group by,distinct ...... 1.group by2.聚集函数count 3.order by4.增insert、删(drop、delete)、改(update、alter)5.查select嵌套查询不相关子查询相关子查询使用的谓词使用的谓词子查询的相关谓…...

13.计算机视觉
#pic_center R 1 R_1 R1 R 2 R^2 R2 目录 知识框架No.1 数据增广一、数据增广二、D2L代码注意点三、QA No.2 微调一、微调二、D2L代码注意点三、QA No.3 第二次竞赛 树叶分类结果No.4 实战 Kaggle 比赛:图像分类(CIFAR-10)一、Kaggle Cifar…...

关于Java中的运算符
文章目录 前言一、什么是运算符二、算术运算符1.基本四则运算符:加减乘除模( - * / %)2.增量运算符( - * /*)3.自增/自减运算符( --) 三、关系运算符四、逻辑运算符1.逻辑&&2.逻辑||3.逻辑非!4.短路求值 五、位运算六、移位运算七、条件运算符八…...