基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
文章目录
- 22:FineBI配置数据集
- 23:FineBI构建报表
- 24:FineBI实时配置测试
- 附录二:离线消费者完整代码
22:FineBI配置数据集
-
目标:实现FineBI访问MySQL结果数据集的配置
-
实施
-
安装FineBI
-
参考《FineBI Windows版本安装手册.docx》安装FineBI
-
-
配置连接
数据连接名称:Momo 用户名:root 密码:自己MySQL的密码 数据连接URL:jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8
-
数据准备
SELECT id, momo_totalcount,momo_province,momo_username,momo_msgcount,CASE momo_grouptype WHEN '1' THEN '总消息量' WHEN '2' THEN '各省份发送量' WHEN '3' THEN '各省份接收量'WHEN '4' THEN '各用户发送量' WHEN '5' THEN '各用户接收量' END AS momo_grouptype FROM momo_count
-
-
小结
- 实现FineBI访问MySQL结果数据集的配置
23:FineBI构建报表
-
目标:实现FineBI实时报表构建
-
路径
- step1:实时报表构建
- step2:实时报表配置
- step3:实时刷新测试
-
实施
-
实时报表构建
-
新建仪表盘
-
添加标题
-
实时总消息数
-
发送消息最多的Top10用户
-
接受消息最多的Top10用户
-
各省份发送消息Top10
-
各省份接收消息Top10
-
各省份总消息量
-
-
-
小结
- 实现FineBI实时报表构建
24:FineBI实时配置测试
-
目标:实现实时报表测试
-
实施
-
实时报表配置
-
官方文档:https://help.fanruan.com/finebi/doc-view-363.html
-
添加jar包:将jar包放入FineBI安装目录的 webapps\webroot\WEB-INF\lib目录下
- 注意:如果提示已存在,就选择覆盖
-
添加JS文件
-
创建js文件:refresh.js
setTimeout(function () {var b =document.title;var a =BI.designConfigure.reportId;//获取仪表板id//这里要指定自己仪表盘的idif (a=="d574631848bd4e33acae54f986d34e69") {setInterval(function () {BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());//Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());BI.Utils.broadcastAllWidgets2Refresh(true);}, 3000);//5000000为定时刷新的频率,单位ms} }, 2000)
-
将创建好的refresh.js文件放至 FineBI 安装目录%FineBI%/webapps/webroot中
-
关闭FineBI缓存,然后关闭FineBI
-
修改jar包,添加js
<!-- 增加刷新功能 --> <script type="text/javascript" src="/webroot/refresh.js"></script>
-
-
重启FineBI
-
-
-
实时刷新测试
-
清空MySQL结果表
-
启动Flink程序:运行MoMoFlinkCount
-
启动Flume程序
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
-
启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 10
- 观察报表
-
-
小结
- 实现FineBI实时测试
## 附录一:Maven依赖```xml<!--远程仓库--><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots></repository></repositories><dependencies><!--Hbase 客户端--><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.1.0</version></dependency><!--kafka 客户端--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><!--JSON解析工具包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--Flink依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>1.10.0</version></dependency><!-- flink操作hdfs、Kafka、MySQL、Redis,所需要导入该包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><!--HTTP请求的的依赖--><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.4</version></dependency><!--MySQL连接驱动--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><target>1.8</target><source>1.8</source></configuration></plugin></plugins></build>
附录二:离线消费者完整代码
package bigdata.itcast.cn.momo.offline;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;/*** @ClassName MomoKafkaToHbase* @Description TODO 离线场景:消费Kafka的数据写入Hbase* @Create By Maynor*/
public class MomoKafkaToHbase {private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static Connection conn;private static Table table;private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名private static byte[] family = Bytes.toBytes("C1");//列族//todo:2-构建Hbase连接//静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能static{try {//构建配置对象Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");//构建连接conn = ConnectionFactory.createConnection(conf);//获取表对象table = conn.getTable(tableName);} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {//todo:1-构建消费者,获取数据consumerKafkaToHbase();
// String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
// System.out.println(momoRowkey);}/*** 用于消费Kafka的数据,将合法数据写入Hbase*/private static void consumerKafkaToHbase() throws Exception {//构建配置对象Properties props = new Properties();//指定服务端地址props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定消费者组的idprops.setProperty("group.id", "momo1");//关闭自动提交props.setProperty("enable.auto.commit", "false");//指定K和V反序列化的类型props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//构建消费者的连接KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//指定订阅哪些Topicconsumer.subscribe(Arrays.asList("MOMO_MSG"));//持续拉取数据while (true) {//向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间//拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//todo:3-处理拉取到的数据:打印//取出每个分区的数据进行处理Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区//对每个分区的数据做处理for (TopicPartition partition : partitions) {List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据//处理这个分区的数据long offset = 0;for (ConsumerRecord<String, String> record : partRecords) {//获取TopicString topic = record.topic();//获取分区int part = record.partition();//获取offsetoffset = record.offset();//获取KeyString key = record.key();//获取ValueString value = record.value();System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);//将Value数据写入Hbaseif(value != null && !"".equals(value) && value.split("\001").length == 20 ){writeToHbase(value);}}//手动提交分区的commit offsetMap<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));consumer.commitSync(offsets);}}}/*** 用于实现具体的写入Hbase的方法* @param value*/private static void writeToHbase(String value) throws Exception {//todo:3-写入Hbase//切分数据String[] items = value.split("\001");String stime = items[0];String sender_accounter = items[2];String receiver_accounter = items[11];//构建rowkeyString rowkey = getMomoRowkey(stime,sender_accounter,receiver_accounter);//构建PutPut put = new Put(Bytes.toBytes(rowkey));//添加列put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));//执行写入table.put(put);}/*** 基于消息时间、发送人id、接受人id构建rowkey* @param stime* @param sender_accounter* @param receiver_accounter* @return* @throws Exception*/private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {//转换时间戳long time = format.parse(stime).getTime();String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;//构建MD5String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);//合并返回return prefix+"_"+suffix;}
}
相关文章:

基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
文章目录 22:FineBI配置数据集23:FineBI构建报表24:FineBI实时配置测试附录二:离线消费者完整代码 22:FineBI配置数据集 目标:实现FineBI访问MySQL结果数据集的配置 实施 安装FineBI 参考《FineBI Windows…...

Python逆向爬虫案例: 某网站AES逆向解密
前言 嗨喽,大家好呀~这里是爱看美女的茜茜呐 环境使用: Python 3.8 Pycharm 👇 👇 👇 更多精彩机密、教程,尽在下方,赶紧点击了解吧~ python源码、视频教程、插件安装教程、资料我都准备好了࿰…...
ONNX runtime本地终端部署
1、class_index.csv文件: ID,English,Chinese 0,A,你 1,B,我 2,C,他 3,D,她2、classification.onnx 3、单张图像处理代码如下: import onnxruntime import torch import torch.nn.functional as F import pandas as pd from PIL import Image from tor…...

Linux性能优化--性能工具:特定进程CPU
4.0 概述 在用系统级性能工具找出是哪个进程降低了系统速度之后,你需要用特定进程性能工具来发现这个进程的行为。对此,Linux提供了丰富的工具用于追踪一个进程和应用程序性能的重要统计信息。 阅读本章后,你将能够: 确定应用程…...
技术人员转岗产品经理,有优势吗?
产品经理是一个非技术型的岗位,但是懂一些技术相关的知识会更好的和技术部门沟通,能更好的从技术部门的角度理解需求的可行性。所以这么说来,技术转产品经理相对来说更加有优势。 任何事情不可能都是只有好处没有坏处的,同样的&a…...

使用IDEA2022.1创建Maven工程出现卡死问题
使用IDEA创建Maven工程出现卡死问题,这个是一个bug 这里是别人和官方提供这个bug,大家可以参考一下 话不多说,上教程 解决方案: 方案1:更新idea版本 方案2:关闭工程,再新建,看图...
Nuttx Syscall
在Nuttx系统中,mksyscall工具用于根据syscall/syscall.csv文件生成供用户调用的接口和内核中对应的接口。具体来说,mksyscall -p system.csv生成供用户调用的接口,而mksyscall -s system.csv生成内核中调用的接口。 在syscall/syscall.csv文…...
HTTP协议中GET请求和POST请求的区别
1. 形式上: GET请求:参数包含在URL中,意味着参数的长度是有限的,并且参数只能是ASCII码的形式。 POST请求:参数包含在请求体中,参数的长度是不受限,并且参数支持多种数据类型。 2.安全性 GET请…...

【广州华锐互动】利用VR开展施工现场安全培训,提高员工安全意识水平
随着科技的不断发展,虚拟现实(VR)技术已经逐渐渗透到各个领域,为我们带来了前所未有的沉浸式体验。在建筑施工行业,VR技术的应用也日益广泛,从设计、施工到管理,都可以看到VR技术的身影。而在这…...

Cornerstone for Mac:高效SVN管理的黄金标准
在当今的软件开发领域,版本控制系统是不可或缺的一部分。其中,Subversion(SVN)是一个广泛使用的版本控制系统,有助于团队协同工作,实现代码的版本管理和追踪。对于Mac用户来说,Cornerstone是一款…...

数据结构之顺序表的模拟实现
💕"世事犹如书籍,一页页被翻过去。人要向前看,少翻历史旧账。"💕 作者:Mylvzi 文章主要内容:数据结构之顺序表的模拟实现 /*** Created with IntelliJ IDEA.* Description:* User: 绿字* Date:…...

R6G azide, 5-isomer具有良好的水溶性,2135330-71-9
试剂 | 基础知识概述(部分): 英文名称:R6G azide, 5-isomer CAS:2135330-71-9 分子式:C30H32N6O4 分子量:540.61 规格标准:10mg,25mg,50mg,可提供mg级以…...

Canvas系列绘制图片学习:绘制图片和渐变效果
我们现在已经可以绘制好多东西了,不过在实际开发中,绘制最多的当然是图片了,这章我们就讲讲图片的绘制。 绘制图片 绘制图片的API是drawImage,它的参数有三种情况: // 将图片绘制在canvas的(dX, dY)坐标处 context.…...
AJAX为什么叫AJAX
AJAX(Asynchronous JavaScript and XML)这个名字是由美国程序员Jesse James Garrett在2005年提出的,用来描述一种用于创建交互式Web应用程序的技术组合。它之所以被称为"AJAX",有以下原因: Asynchronous&…...

自动化测试中如何编写配置文件 ? 该使用什么工具 ? 一文详解使用ConfigParser读写配置文件
1. 配置文件说明 只要是用编写项目,你就肯定离不开配置文件 。就以测试人员编写的自动化测试项目为例 ,如果你做连接数据库 、访问一些第三方接口、或者访问登录接口的用户名和密码。这些输入的信息最大特点就是都可能是变量,比如访问数据库…...

文件批量管理:轻松复制备份并删除原文件
在日常生活和工作中,我们经常需要处理大量的文件。为了确保文件的安全性和完整性,您需要一种高效的文件批量管理方法。本文将向您介绍如何一一复制备份并删除原文件里的文件,让您的文件管理变得轻松便捷。 首先,我们要进入文件批…...

Linux高性能服务器编程 学习笔记 第十七章 系统监测工具
tcpdump是一款经典的抓包工具,即使今天我们已经有了像Wireshark这样更易于使用和掌握的抓包工具,tcpdump仍是网络程序员的必备利器。 tcpdump提供了一些选项用以过滤数据包或定制输出格式,常见的选项如下: 1.-n:使用I…...
rabbitmq 消费者报错 ListenerExecutionFailedException NullPointerException
报错信息: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method private void com.xxx.service.impl.xxxServiceImpl.xxx(com.xxx.dto.XXX) threw exception at org.springframework.amqp.rabbit.listener.adapter.Mes…...

Java面试题:链表-合并两个排序的链表
描述 输入两个递增的链表,单个链表的长度为n,合并这两个链表并使新链表中的节点仍然是递增排序的。 示例 输入: {1,3,5}, {2,4,6}返回值: {1,2,3,4,5,6}原题地址:https://www.nowcoder.com/practice/d8b6b4358f7742…...

Springboot结合Mockito写单元测试实践和原理
文章目录 前言一、使用最佳实践使用场景SpyBean失效场景解决Mock失效的问题避免FactoryBean的实现方式使用MockBean,但是要指定name 个人推荐 二、原理1. MockBean2.SpyBean方法调用 总结 前言 相信看我博客的都是javaer,工作中一般都是使用Springboot框…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...

iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...

《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...

P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...

AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...

中医有效性探讨
文章目录 西医是如何发展到以生物化学为药理基础的现代医学?传统医学奠基期(远古 - 17 世纪)近代医学转型期(17 世纪 - 19 世纪末)现代医学成熟期(20世纪至今) 中医的源远流长和一脉相承远古至…...