基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
文章目录
- 22:FineBI配置数据集
- 23:FineBI构建报表
- 24:FineBI实时配置测试
- 附录二:离线消费者完整代码
22:FineBI配置数据集
-
目标:实现FineBI访问MySQL结果数据集的配置
-
实施
-
安装FineBI
-
参考《FineBI Windows版本安装手册.docx》安装FineBI

-
-
配置连接



数据连接名称:Momo 用户名:root 密码:自己MySQL的密码 数据连接URL:jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8

-
数据准备



SELECT id, momo_totalcount,momo_province,momo_username,momo_msgcount,CASE momo_grouptype WHEN '1' THEN '总消息量' WHEN '2' THEN '各省份发送量' WHEN '3' THEN '各省份接收量'WHEN '4' THEN '各用户发送量' WHEN '5' THEN '各用户接收量' END AS momo_grouptype FROM momo_count
-
-
小结
- 实现FineBI访问MySQL结果数据集的配置
23:FineBI构建报表
-
目标:实现FineBI实时报表构建
-
路径
- step1:实时报表构建
- step2:实时报表配置
- step3:实时刷新测试
-
实施
-
实时报表构建
-
新建仪表盘


-
添加标题


-
实时总消息数

-
发送消息最多的Top10用户








-
接受消息最多的Top10用户




-
各省份发送消息Top10



-
各省份接收消息Top10






-
各省份总消息量




-
-
-
小结
- 实现FineBI实时报表构建
24:FineBI实时配置测试
-
目标:实现实时报表测试
-
实施
-
实时报表配置
-
官方文档:https://help.fanruan.com/finebi/doc-view-363.html
-
添加jar包:将jar包放入FineBI安装目录的 webapps\webroot\WEB-INF\lib目录下

- 注意:如果提示已存在,就选择覆盖
-
添加JS文件
-
创建js文件:refresh.js
setTimeout(function () {var b =document.title;var a =BI.designConfigure.reportId;//获取仪表板id//这里要指定自己仪表盘的idif (a=="d574631848bd4e33acae54f986d34e69") {setInterval(function () {BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());//Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());BI.Utils.broadcastAllWidgets2Refresh(true);}, 3000);//5000000为定时刷新的频率,单位ms} }, 2000) -
将创建好的refresh.js文件放至 FineBI 安装目录%FineBI%/webapps/webroot中

-
关闭FineBI缓存,然后关闭FineBI

-
修改jar包,添加js




<!-- 增加刷新功能 --> <script type="text/javascript" src="/webroot/refresh.js"></script>
-
-
重启FineBI
-
-
-
实时刷新测试
-
清空MySQL结果表
-
启动Flink程序:运行MoMoFlinkCount
-
启动Flume程序
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console -
启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 10
- 观察报表 -


-
小结
- 实现FineBI实时测试
## 附录一:Maven依赖```xml<!--远程仓库--><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots></repository></repositories><dependencies><!--Hbase 客户端--><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.1.0</version></dependency><!--kafka 客户端--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><!--JSON解析工具包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--Flink依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>1.10.0</version></dependency><!-- flink操作hdfs、Kafka、MySQL、Redis,所需要导入该包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><!--HTTP请求的的依赖--><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.4</version></dependency><!--MySQL连接驱动--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><target>1.8</target><source>1.8</source></configuration></plugin></plugins></build>
附录二:离线消费者完整代码
package bigdata.itcast.cn.momo.offline;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;/*** @ClassName MomoKafkaToHbase* @Description TODO 离线场景:消费Kafka的数据写入Hbase* @Create By Maynor*/
public class MomoKafkaToHbase {private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static Connection conn;private static Table table;private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名private static byte[] family = Bytes.toBytes("C1");//列族//todo:2-构建Hbase连接//静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能static{try {//构建配置对象Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");//构建连接conn = ConnectionFactory.createConnection(conf);//获取表对象table = conn.getTable(tableName);} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {//todo:1-构建消费者,获取数据consumerKafkaToHbase();
// String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
// System.out.println(momoRowkey);}/*** 用于消费Kafka的数据,将合法数据写入Hbase*/private static void consumerKafkaToHbase() throws Exception {//构建配置对象Properties props = new Properties();//指定服务端地址props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定消费者组的idprops.setProperty("group.id", "momo1");//关闭自动提交props.setProperty("enable.auto.commit", "false");//指定K和V反序列化的类型props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//构建消费者的连接KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//指定订阅哪些Topicconsumer.subscribe(Arrays.asList("MOMO_MSG"));//持续拉取数据while (true) {//向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间//拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//todo:3-处理拉取到的数据:打印//取出每个分区的数据进行处理Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区//对每个分区的数据做处理for (TopicPartition partition : partitions) {List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据//处理这个分区的数据long offset = 0;for (ConsumerRecord<String, String> record : partRecords) {//获取TopicString topic = record.topic();//获取分区int part = record.partition();//获取offsetoffset = record.offset();//获取KeyString key = record.key();//获取ValueString value = record.value();System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);//将Value数据写入Hbaseif(value != null && !"".equals(value) && value.split("\001").length == 20 ){writeToHbase(value);}}//手动提交分区的commit offsetMap<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));consumer.commitSync(offsets);}}}/*** 用于实现具体的写入Hbase的方法* @param value*/private static void writeToHbase(String value) throws Exception {//todo:3-写入Hbase//切分数据String[] items = value.split("\001");String stime = items[0];String sender_accounter = items[2];String receiver_accounter = items[11];//构建rowkeyString rowkey = getMomoRowkey(stime,sender_accounter,receiver_accounter);//构建PutPut put = new Put(Bytes.toBytes(rowkey));//添加列put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));//执行写入table.put(put);}/*** 基于消息时间、发送人id、接受人id构建rowkey* @param stime* @param sender_accounter* @param receiver_accounter* @return* @throws Exception*/private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {//转换时间戳long time = format.parse(stime).getTime();String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;//构建MD5String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);//合并返回return prefix+"_"+suffix;}
}相关文章:
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
文章目录 22:FineBI配置数据集23:FineBI构建报表24:FineBI实时配置测试附录二:离线消费者完整代码 22:FineBI配置数据集 目标:实现FineBI访问MySQL结果数据集的配置 实施 安装FineBI 参考《FineBI Windows…...
Python逆向爬虫案例: 某网站AES逆向解密
前言 嗨喽,大家好呀~这里是爱看美女的茜茜呐 环境使用: Python 3.8 Pycharm 👇 👇 👇 更多精彩机密、教程,尽在下方,赶紧点击了解吧~ python源码、视频教程、插件安装教程、资料我都准备好了࿰…...
ONNX runtime本地终端部署
1、class_index.csv文件: ID,English,Chinese 0,A,你 1,B,我 2,C,他 3,D,她2、classification.onnx 3、单张图像处理代码如下: import onnxruntime import torch import torch.nn.functional as F import pandas as pd from PIL import Image from tor…...
Linux性能优化--性能工具:特定进程CPU
4.0 概述 在用系统级性能工具找出是哪个进程降低了系统速度之后,你需要用特定进程性能工具来发现这个进程的行为。对此,Linux提供了丰富的工具用于追踪一个进程和应用程序性能的重要统计信息。 阅读本章后,你将能够: 确定应用程…...
技术人员转岗产品经理,有优势吗?
产品经理是一个非技术型的岗位,但是懂一些技术相关的知识会更好的和技术部门沟通,能更好的从技术部门的角度理解需求的可行性。所以这么说来,技术转产品经理相对来说更加有优势。 任何事情不可能都是只有好处没有坏处的,同样的&a…...
使用IDEA2022.1创建Maven工程出现卡死问题
使用IDEA创建Maven工程出现卡死问题,这个是一个bug 这里是别人和官方提供这个bug,大家可以参考一下 话不多说,上教程 解决方案: 方案1:更新idea版本 方案2:关闭工程,再新建,看图...
Nuttx Syscall
在Nuttx系统中,mksyscall工具用于根据syscall/syscall.csv文件生成供用户调用的接口和内核中对应的接口。具体来说,mksyscall -p system.csv生成供用户调用的接口,而mksyscall -s system.csv生成内核中调用的接口。 在syscall/syscall.csv文…...
HTTP协议中GET请求和POST请求的区别
1. 形式上: GET请求:参数包含在URL中,意味着参数的长度是有限的,并且参数只能是ASCII码的形式。 POST请求:参数包含在请求体中,参数的长度是不受限,并且参数支持多种数据类型。 2.安全性 GET请…...
【广州华锐互动】利用VR开展施工现场安全培训,提高员工安全意识水平
随着科技的不断发展,虚拟现实(VR)技术已经逐渐渗透到各个领域,为我们带来了前所未有的沉浸式体验。在建筑施工行业,VR技术的应用也日益广泛,从设计、施工到管理,都可以看到VR技术的身影。而在这…...
Cornerstone for Mac:高效SVN管理的黄金标准
在当今的软件开发领域,版本控制系统是不可或缺的一部分。其中,Subversion(SVN)是一个广泛使用的版本控制系统,有助于团队协同工作,实现代码的版本管理和追踪。对于Mac用户来说,Cornerstone是一款…...
数据结构之顺序表的模拟实现
💕"世事犹如书籍,一页页被翻过去。人要向前看,少翻历史旧账。"💕 作者:Mylvzi 文章主要内容:数据结构之顺序表的模拟实现 /*** Created with IntelliJ IDEA.* Description:* User: 绿字* Date:…...
R6G azide, 5-isomer具有良好的水溶性,2135330-71-9
试剂 | 基础知识概述(部分): 英文名称:R6G azide, 5-isomer CAS:2135330-71-9 分子式:C30H32N6O4 分子量:540.61 规格标准:10mg,25mg,50mg,可提供mg级以…...
Canvas系列绘制图片学习:绘制图片和渐变效果
我们现在已经可以绘制好多东西了,不过在实际开发中,绘制最多的当然是图片了,这章我们就讲讲图片的绘制。 绘制图片 绘制图片的API是drawImage,它的参数有三种情况: // 将图片绘制在canvas的(dX, dY)坐标处 context.…...
AJAX为什么叫AJAX
AJAX(Asynchronous JavaScript and XML)这个名字是由美国程序员Jesse James Garrett在2005年提出的,用来描述一种用于创建交互式Web应用程序的技术组合。它之所以被称为"AJAX",有以下原因: Asynchronous&…...
自动化测试中如何编写配置文件 ? 该使用什么工具 ? 一文详解使用ConfigParser读写配置文件
1. 配置文件说明 只要是用编写项目,你就肯定离不开配置文件 。就以测试人员编写的自动化测试项目为例 ,如果你做连接数据库 、访问一些第三方接口、或者访问登录接口的用户名和密码。这些输入的信息最大特点就是都可能是变量,比如访问数据库…...
文件批量管理:轻松复制备份并删除原文件
在日常生活和工作中,我们经常需要处理大量的文件。为了确保文件的安全性和完整性,您需要一种高效的文件批量管理方法。本文将向您介绍如何一一复制备份并删除原文件里的文件,让您的文件管理变得轻松便捷。 首先,我们要进入文件批…...
Linux高性能服务器编程 学习笔记 第十七章 系统监测工具
tcpdump是一款经典的抓包工具,即使今天我们已经有了像Wireshark这样更易于使用和掌握的抓包工具,tcpdump仍是网络程序员的必备利器。 tcpdump提供了一些选项用以过滤数据包或定制输出格式,常见的选项如下: 1.-n:使用I…...
rabbitmq 消费者报错 ListenerExecutionFailedException NullPointerException
报错信息: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method private void com.xxx.service.impl.xxxServiceImpl.xxx(com.xxx.dto.XXX) threw exception at org.springframework.amqp.rabbit.listener.adapter.Mes…...
Java面试题:链表-合并两个排序的链表
描述 输入两个递增的链表,单个链表的长度为n,合并这两个链表并使新链表中的节点仍然是递增排序的。 示例 输入: {1,3,5}, {2,4,6}返回值: {1,2,3,4,5,6}原题地址:https://www.nowcoder.com/practice/d8b6b4358f7742…...
Springboot结合Mockito写单元测试实践和原理
文章目录 前言一、使用最佳实践使用场景SpyBean失效场景解决Mock失效的问题避免FactoryBean的实现方式使用MockBean,但是要指定name 个人推荐 二、原理1. MockBean2.SpyBean方法调用 总结 前言 相信看我博客的都是javaer,工作中一般都是使用Springboot框…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...
label-studio的使用教程(导入本地路径)
文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...
VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...
Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)
目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 (1)输入单引号 (2)万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...
mac:大模型系列测试
0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何,是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试,是可以跑通文章里面的代码。训练速度也是很快的。 注意…...
Sklearn 机器学习 缺失值处理 获取填充失值的统计值
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...
