基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
文章目录
- 22:FineBI配置数据集
- 23:FineBI构建报表
- 24:FineBI实时配置测试
- 附录二:离线消费者完整代码
22:FineBI配置数据集
-
目标:实现FineBI访问MySQL结果数据集的配置
-
实施
-
安装FineBI
-
参考《FineBI Windows版本安装手册.docx》安装FineBI
-
-
配置连接
数据连接名称:Momo 用户名:root 密码:自己MySQL的密码 数据连接URL:jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8
-
数据准备
SELECT id, momo_totalcount,momo_province,momo_username,momo_msgcount,CASE momo_grouptype WHEN '1' THEN '总消息量' WHEN '2' THEN '各省份发送量' WHEN '3' THEN '各省份接收量'WHEN '4' THEN '各用户发送量' WHEN '5' THEN '各用户接收量' END AS momo_grouptype FROM momo_count
-
-
小结
- 实现FineBI访问MySQL结果数据集的配置
23:FineBI构建报表
-
目标:实现FineBI实时报表构建
-
路径
- step1:实时报表构建
- step2:实时报表配置
- step3:实时刷新测试
-
实施
-
实时报表构建
-
新建仪表盘
-
添加标题
-
实时总消息数
-
发送消息最多的Top10用户
-
接受消息最多的Top10用户
-
各省份发送消息Top10
-
各省份接收消息Top10
-
各省份总消息量
-
-
-
小结
- 实现FineBI实时报表构建
24:FineBI实时配置测试
-
目标:实现实时报表测试
-
实施
-
实时报表配置
-
官方文档:https://help.fanruan.com/finebi/doc-view-363.html
-
添加jar包:将jar包放入FineBI安装目录的 webapps\webroot\WEB-INF\lib目录下
- 注意:如果提示已存在,就选择覆盖
-
添加JS文件
-
创建js文件:refresh.js
setTimeout(function () {var b =document.title;var a =BI.designConfigure.reportId;//获取仪表板id//这里要指定自己仪表盘的idif (a=="d574631848bd4e33acae54f986d34e69") {setInterval(function () {BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());//Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());BI.Utils.broadcastAllWidgets2Refresh(true);}, 3000);//5000000为定时刷新的频率,单位ms} }, 2000)
-
将创建好的refresh.js文件放至 FineBI 安装目录%FineBI%/webapps/webroot中
-
关闭FineBI缓存,然后关闭FineBI
-
修改jar包,添加js
<!-- 增加刷新功能 --> <script type="text/javascript" src="/webroot/refresh.js"></script>
-
-
重启FineBI
-
-
-
实时刷新测试
-
清空MySQL结果表
-
启动Flink程序:运行MoMoFlinkCount
-
启动Flume程序
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
-
启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 10
- 观察报表
-
-
小结
- 实现FineBI实时测试
## 附录一:Maven依赖```xml<!--远程仓库--><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots></repository></repositories><dependencies><!--Hbase 客户端--><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.1.0</version></dependency><!--kafka 客户端--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><!--JSON解析工具包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--Flink依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>1.10.0</version></dependency><!-- flink操作hdfs、Kafka、MySQL、Redis,所需要导入该包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><!--HTTP请求的的依赖--><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.4</version></dependency><!--MySQL连接驱动--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><target>1.8</target><source>1.8</source></configuration></plugin></plugins></build>
附录二:离线消费者完整代码
package bigdata.itcast.cn.momo.offline;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;/*** @ClassName MomoKafkaToHbase* @Description TODO 离线场景:消费Kafka的数据写入Hbase* @Create By Maynor*/
public class MomoKafkaToHbase {private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static Connection conn;private static Table table;private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名private static byte[] family = Bytes.toBytes("C1");//列族//todo:2-构建Hbase连接//静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能static{try {//构建配置对象Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");//构建连接conn = ConnectionFactory.createConnection(conf);//获取表对象table = conn.getTable(tableName);} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {//todo:1-构建消费者,获取数据consumerKafkaToHbase();
// String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
// System.out.println(momoRowkey);}/*** 用于消费Kafka的数据,将合法数据写入Hbase*/private static void consumerKafkaToHbase() throws Exception {//构建配置对象Properties props = new Properties();//指定服务端地址props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定消费者组的idprops.setProperty("group.id", "momo1");//关闭自动提交props.setProperty("enable.auto.commit", "false");//指定K和V反序列化的类型props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//构建消费者的连接KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//指定订阅哪些Topicconsumer.subscribe(Arrays.asList("MOMO_MSG"));//持续拉取数据while (true) {//向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间//拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//todo:3-处理拉取到的数据:打印//取出每个分区的数据进行处理Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区//对每个分区的数据做处理for (TopicPartition partition : partitions) {List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据//处理这个分区的数据long offset = 0;for (ConsumerRecord<String, String> record : partRecords) {//获取TopicString topic = record.topic();//获取分区int part = record.partition();//获取offsetoffset = record.offset();//获取KeyString key = record.key();//获取ValueString value = record.value();System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);//将Value数据写入Hbaseif(value != null && !"".equals(value) && value.split("\001").length == 20 ){writeToHbase(value);}}//手动提交分区的commit offsetMap<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));consumer.commitSync(offsets);}}}/*** 用于实现具体的写入Hbase的方法* @param value*/private static void writeToHbase(String value) throws Exception {//todo:3-写入Hbase//切分数据String[] items = value.split("\001");String stime = items[0];String sender_accounter = items[2];String receiver_accounter = items[11];//构建rowkeyString rowkey = getMomoRowkey(stime,sender_accounter,receiver_accounter);//构建PutPut put = new Put(Bytes.toBytes(rowkey));//添加列put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));//执行写入table.put(put);}/*** 基于消息时间、发送人id、接受人id构建rowkey* @param stime* @param sender_accounter* @param receiver_accounter* @return* @throws Exception*/private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {//转换时间戳long time = format.parse(stime).getTime();String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;//构建MD5String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);//合并返回return prefix+"_"+suffix;}
}
相关文章:

基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
文章目录 22:FineBI配置数据集23:FineBI构建报表24:FineBI实时配置测试附录二:离线消费者完整代码 22:FineBI配置数据集 目标:实现FineBI访问MySQL结果数据集的配置 实施 安装FineBI 参考《FineBI Windows…...

Python逆向爬虫案例: 某网站AES逆向解密
前言 嗨喽,大家好呀~这里是爱看美女的茜茜呐 环境使用: Python 3.8 Pycharm 👇 👇 👇 更多精彩机密、教程,尽在下方,赶紧点击了解吧~ python源码、视频教程、插件安装教程、资料我都准备好了࿰…...
ONNX runtime本地终端部署
1、class_index.csv文件: ID,English,Chinese 0,A,你 1,B,我 2,C,他 3,D,她2、classification.onnx 3、单张图像处理代码如下: import onnxruntime import torch import torch.nn.functional as F import pandas as pd from PIL import Image from tor…...

Linux性能优化--性能工具:特定进程CPU
4.0 概述 在用系统级性能工具找出是哪个进程降低了系统速度之后,你需要用特定进程性能工具来发现这个进程的行为。对此,Linux提供了丰富的工具用于追踪一个进程和应用程序性能的重要统计信息。 阅读本章后,你将能够: 确定应用程…...
技术人员转岗产品经理,有优势吗?
产品经理是一个非技术型的岗位,但是懂一些技术相关的知识会更好的和技术部门沟通,能更好的从技术部门的角度理解需求的可行性。所以这么说来,技术转产品经理相对来说更加有优势。 任何事情不可能都是只有好处没有坏处的,同样的&a…...

使用IDEA2022.1创建Maven工程出现卡死问题
使用IDEA创建Maven工程出现卡死问题,这个是一个bug 这里是别人和官方提供这个bug,大家可以参考一下 话不多说,上教程 解决方案: 方案1:更新idea版本 方案2:关闭工程,再新建,看图...
Nuttx Syscall
在Nuttx系统中,mksyscall工具用于根据syscall/syscall.csv文件生成供用户调用的接口和内核中对应的接口。具体来说,mksyscall -p system.csv生成供用户调用的接口,而mksyscall -s system.csv生成内核中调用的接口。 在syscall/syscall.csv文…...
HTTP协议中GET请求和POST请求的区别
1. 形式上: GET请求:参数包含在URL中,意味着参数的长度是有限的,并且参数只能是ASCII码的形式。 POST请求:参数包含在请求体中,参数的长度是不受限,并且参数支持多种数据类型。 2.安全性 GET请…...

【广州华锐互动】利用VR开展施工现场安全培训,提高员工安全意识水平
随着科技的不断发展,虚拟现实(VR)技术已经逐渐渗透到各个领域,为我们带来了前所未有的沉浸式体验。在建筑施工行业,VR技术的应用也日益广泛,从设计、施工到管理,都可以看到VR技术的身影。而在这…...

Cornerstone for Mac:高效SVN管理的黄金标准
在当今的软件开发领域,版本控制系统是不可或缺的一部分。其中,Subversion(SVN)是一个广泛使用的版本控制系统,有助于团队协同工作,实现代码的版本管理和追踪。对于Mac用户来说,Cornerstone是一款…...

数据结构之顺序表的模拟实现
💕"世事犹如书籍,一页页被翻过去。人要向前看,少翻历史旧账。"💕 作者:Mylvzi 文章主要内容:数据结构之顺序表的模拟实现 /*** Created with IntelliJ IDEA.* Description:* User: 绿字* Date:…...

R6G azide, 5-isomer具有良好的水溶性,2135330-71-9
试剂 | 基础知识概述(部分): 英文名称:R6G azide, 5-isomer CAS:2135330-71-9 分子式:C30H32N6O4 分子量:540.61 规格标准:10mg,25mg,50mg,可提供mg级以…...

Canvas系列绘制图片学习:绘制图片和渐变效果
我们现在已经可以绘制好多东西了,不过在实际开发中,绘制最多的当然是图片了,这章我们就讲讲图片的绘制。 绘制图片 绘制图片的API是drawImage,它的参数有三种情况: // 将图片绘制在canvas的(dX, dY)坐标处 context.…...
AJAX为什么叫AJAX
AJAX(Asynchronous JavaScript and XML)这个名字是由美国程序员Jesse James Garrett在2005年提出的,用来描述一种用于创建交互式Web应用程序的技术组合。它之所以被称为"AJAX",有以下原因: Asynchronous&…...

自动化测试中如何编写配置文件 ? 该使用什么工具 ? 一文详解使用ConfigParser读写配置文件
1. 配置文件说明 只要是用编写项目,你就肯定离不开配置文件 。就以测试人员编写的自动化测试项目为例 ,如果你做连接数据库 、访问一些第三方接口、或者访问登录接口的用户名和密码。这些输入的信息最大特点就是都可能是变量,比如访问数据库…...

文件批量管理:轻松复制备份并删除原文件
在日常生活和工作中,我们经常需要处理大量的文件。为了确保文件的安全性和完整性,您需要一种高效的文件批量管理方法。本文将向您介绍如何一一复制备份并删除原文件里的文件,让您的文件管理变得轻松便捷。 首先,我们要进入文件批…...

Linux高性能服务器编程 学习笔记 第十七章 系统监测工具
tcpdump是一款经典的抓包工具,即使今天我们已经有了像Wireshark这样更易于使用和掌握的抓包工具,tcpdump仍是网络程序员的必备利器。 tcpdump提供了一些选项用以过滤数据包或定制输出格式,常见的选项如下: 1.-n:使用I…...
rabbitmq 消费者报错 ListenerExecutionFailedException NullPointerException
报错信息: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method private void com.xxx.service.impl.xxxServiceImpl.xxx(com.xxx.dto.XXX) threw exception at org.springframework.amqp.rabbit.listener.adapter.Mes…...

Java面试题:链表-合并两个排序的链表
描述 输入两个递增的链表,单个链表的长度为n,合并这两个链表并使新链表中的节点仍然是递增排序的。 示例 输入: {1,3,5}, {2,4,6}返回值: {1,2,3,4,5,6}原题地址:https://www.nowcoder.com/practice/d8b6b4358f7742…...

Springboot结合Mockito写单元测试实践和原理
文章目录 前言一、使用最佳实践使用场景SpyBean失效场景解决Mock失效的问题避免FactoryBean的实现方式使用MockBean,但是要指定name 个人推荐 二、原理1. MockBean2.SpyBean方法调用 总结 前言 相信看我博客的都是javaer,工作中一般都是使用Springboot框…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
音视频——I2S 协议详解
I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议,专门用于在数字音频设备之间传输数字音频数据。它由飞利浦(Philips)公司开发,以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
Redis:现代应用开发的高效内存数据存储利器
一、Redis的起源与发展 Redis最初由意大利程序员Salvatore Sanfilippo在2009年开发,其初衷是为了满足他自己的一个项目需求,即需要一个高性能的键值存储系统来解决传统数据库在高并发场景下的性能瓶颈。随着项目的开源,Redis凭借其简单易用、…...

淘宝扭蛋机小程序系统开发:打造互动性强的购物平台
淘宝扭蛋机小程序系统的开发,旨在打造一个互动性强的购物平台,让用户在购物的同时,能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机,实现旋转、抽拉等动作,增…...
小木的算法日记-多叉树的递归/层序遍历
🌲 从二叉树到森林:一文彻底搞懂多叉树遍历的艺术 🚀 引言 你好,未来的算法大神! 在数据结构的世界里,“树”无疑是最核心、最迷人的概念之一。我们中的大多数人都是从 二叉树 开始入门的,它…...
鸿蒙(HarmonyOS5)实现跳一跳小游戏
下面我将介绍如何使用鸿蒙的ArkUI框架,实现一个简单的跳一跳小游戏。 1. 项目结构 src/main/ets/ ├── MainAbility │ ├── pages │ │ ├── Index.ets // 主页面 │ │ └── GamePage.ets // 游戏页面 │ └── model │ …...
ThreadLocal 源码
ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物,因为每个访问一个线程局部变量的线程(通过其 get 或 set 方法)都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段,这些类希望将…...