基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
文章目录
- 22:FineBI配置数据集
- 23:FineBI构建报表
- 24:FineBI实时配置测试
- 附录二:离线消费者完整代码
22:FineBI配置数据集
-
目标:实现FineBI访问MySQL结果数据集的配置
-
实施
-
安装FineBI
-
参考《FineBI Windows版本安装手册.docx》安装FineBI
-
-
配置连接
数据连接名称:Momo 用户名:root 密码:自己MySQL的密码 数据连接URL:jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8
-
数据准备
SELECT id, momo_totalcount,momo_province,momo_username,momo_msgcount,CASE momo_grouptype WHEN '1' THEN '总消息量' WHEN '2' THEN '各省份发送量' WHEN '3' THEN '各省份接收量'WHEN '4' THEN '各用户发送量' WHEN '5' THEN '各用户接收量' END AS momo_grouptype FROM momo_count
-
-
小结
- 实现FineBI访问MySQL结果数据集的配置
23:FineBI构建报表
-
目标:实现FineBI实时报表构建
-
路径
- step1:实时报表构建
- step2:实时报表配置
- step3:实时刷新测试
-
实施
-
实时报表构建
-
新建仪表盘
-
添加标题
-
实时总消息数
-
发送消息最多的Top10用户
-
接受消息最多的Top10用户
-
各省份发送消息Top10
-
各省份接收消息Top10
-
各省份总消息量
-
-
-
小结
- 实现FineBI实时报表构建
24:FineBI实时配置测试
-
目标:实现实时报表测试
-
实施
-
实时报表配置
-
官方文档:https://help.fanruan.com/finebi/doc-view-363.html
-
添加jar包:将jar包放入FineBI安装目录的 webapps\webroot\WEB-INF\lib目录下
- 注意:如果提示已存在,就选择覆盖
-
添加JS文件
-
创建js文件:refresh.js
setTimeout(function () {var b =document.title;var a =BI.designConfigure.reportId;//获取仪表板id//这里要指定自己仪表盘的idif (a=="d574631848bd4e33acae54f986d34e69") {setInterval(function () {BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());//Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());BI.Utils.broadcastAllWidgets2Refresh(true);}, 3000);//5000000为定时刷新的频率,单位ms} }, 2000)
-
将创建好的refresh.js文件放至 FineBI 安装目录%FineBI%/webapps/webroot中
-
关闭FineBI缓存,然后关闭FineBI
-
修改jar包,添加js
<!-- 增加刷新功能 --> <script type="text/javascript" src="/webroot/refresh.js"></script>
-
-
重启FineBI
-
-
-
实时刷新测试
-
清空MySQL结果表
-
启动Flink程序:运行MoMoFlinkCount
-
启动Flume程序
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
-
启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 10
- 观察报表
-
-
小结
- 实现FineBI实时测试
## 附录一:Maven依赖```xml<!--远程仓库--><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots></repository></repositories><dependencies><!--Hbase 客户端--><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.1.0</version></dependency><!--kafka 客户端--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><!--JSON解析工具包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--Flink依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>1.10.0</version></dependency><!-- flink操作hdfs、Kafka、MySQL、Redis,所需要导入该包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><!--HTTP请求的的依赖--><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.4</version></dependency><!--MySQL连接驱动--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><target>1.8</target><source>1.8</source></configuration></plugin></plugins></build>
附录二:离线消费者完整代码
package bigdata.itcast.cn.momo.offline;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;/*** @ClassName MomoKafkaToHbase* @Description TODO 离线场景:消费Kafka的数据写入Hbase* @Create By Maynor*/
public class MomoKafkaToHbase {private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static Connection conn;private static Table table;private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名private static byte[] family = Bytes.toBytes("C1");//列族//todo:2-构建Hbase连接//静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能static{try {//构建配置对象Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");//构建连接conn = ConnectionFactory.createConnection(conf);//获取表对象table = conn.getTable(tableName);} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {//todo:1-构建消费者,获取数据consumerKafkaToHbase();
// String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
// System.out.println(momoRowkey);}/*** 用于消费Kafka的数据,将合法数据写入Hbase*/private static void consumerKafkaToHbase() throws Exception {//构建配置对象Properties props = new Properties();//指定服务端地址props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定消费者组的idprops.setProperty("group.id", "momo1");//关闭自动提交props.setProperty("enable.auto.commit", "false");//指定K和V反序列化的类型props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//构建消费者的连接KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//指定订阅哪些Topicconsumer.subscribe(Arrays.asList("MOMO_MSG"));//持续拉取数据while (true) {//向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间//拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//todo:3-处理拉取到的数据:打印//取出每个分区的数据进行处理Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区//对每个分区的数据做处理for (TopicPartition partition : partitions) {List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据//处理这个分区的数据long offset = 0;for (ConsumerRecord<String, String> record : partRecords) {//获取TopicString topic = record.topic();//获取分区int part = record.partition();//获取offsetoffset = record.offset();//获取KeyString key = record.key();//获取ValueString value = record.value();System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);//将Value数据写入Hbaseif(value != null && !"".equals(value) && value.split("\001").length == 20 ){writeToHbase(value);}}//手动提交分区的commit offsetMap<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));consumer.commitSync(offsets);}}}/*** 用于实现具体的写入Hbase的方法* @param value*/private static void writeToHbase(String value) throws Exception {//todo:3-写入Hbase//切分数据String[] items = value.split("\001");String stime = items[0];String sender_accounter = items[2];String receiver_accounter = items[11];//构建rowkeyString rowkey = getMomoRowkey(stime,sender_accounter,receiver_accounter);//构建PutPut put = new Put(Bytes.toBytes(rowkey));//添加列put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));//执行写入table.put(put);}/*** 基于消息时间、发送人id、接受人id构建rowkey* @param stime* @param sender_accounter* @param receiver_accounter* @return* @throws Exception*/private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {//转换时间戳long time = format.parse(stime).getTime();String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;//构建MD5String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);//合并返回return prefix+"_"+suffix;}
}
相关文章:

基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
文章目录 22:FineBI配置数据集23:FineBI构建报表24:FineBI实时配置测试附录二:离线消费者完整代码 22:FineBI配置数据集 目标:实现FineBI访问MySQL结果数据集的配置 实施 安装FineBI 参考《FineBI Windows…...

Python逆向爬虫案例: 某网站AES逆向解密
前言 嗨喽,大家好呀~这里是爱看美女的茜茜呐 环境使用: Python 3.8 Pycharm 👇 👇 👇 更多精彩机密、教程,尽在下方,赶紧点击了解吧~ python源码、视频教程、插件安装教程、资料我都准备好了࿰…...

ONNX runtime本地终端部署
1、class_index.csv文件: ID,English,Chinese 0,A,你 1,B,我 2,C,他 3,D,她2、classification.onnx 3、单张图像处理代码如下: import onnxruntime import torch import torch.nn.functional as F import pandas as pd from PIL import Image from tor…...

Linux性能优化--性能工具:特定进程CPU
4.0 概述 在用系统级性能工具找出是哪个进程降低了系统速度之后,你需要用特定进程性能工具来发现这个进程的行为。对此,Linux提供了丰富的工具用于追踪一个进程和应用程序性能的重要统计信息。 阅读本章后,你将能够: 确定应用程…...

技术人员转岗产品经理,有优势吗?
产品经理是一个非技术型的岗位,但是懂一些技术相关的知识会更好的和技术部门沟通,能更好的从技术部门的角度理解需求的可行性。所以这么说来,技术转产品经理相对来说更加有优势。 任何事情不可能都是只有好处没有坏处的,同样的&a…...

使用IDEA2022.1创建Maven工程出现卡死问题
使用IDEA创建Maven工程出现卡死问题,这个是一个bug 这里是别人和官方提供这个bug,大家可以参考一下 话不多说,上教程 解决方案: 方案1:更新idea版本 方案2:关闭工程,再新建,看图...

Nuttx Syscall
在Nuttx系统中,mksyscall工具用于根据syscall/syscall.csv文件生成供用户调用的接口和内核中对应的接口。具体来说,mksyscall -p system.csv生成供用户调用的接口,而mksyscall -s system.csv生成内核中调用的接口。 在syscall/syscall.csv文…...

HTTP协议中GET请求和POST请求的区别
1. 形式上: GET请求:参数包含在URL中,意味着参数的长度是有限的,并且参数只能是ASCII码的形式。 POST请求:参数包含在请求体中,参数的长度是不受限,并且参数支持多种数据类型。 2.安全性 GET请…...

【广州华锐互动】利用VR开展施工现场安全培训,提高员工安全意识水平
随着科技的不断发展,虚拟现实(VR)技术已经逐渐渗透到各个领域,为我们带来了前所未有的沉浸式体验。在建筑施工行业,VR技术的应用也日益广泛,从设计、施工到管理,都可以看到VR技术的身影。而在这…...

Cornerstone for Mac:高效SVN管理的黄金标准
在当今的软件开发领域,版本控制系统是不可或缺的一部分。其中,Subversion(SVN)是一个广泛使用的版本控制系统,有助于团队协同工作,实现代码的版本管理和追踪。对于Mac用户来说,Cornerstone是一款…...

数据结构之顺序表的模拟实现
💕"世事犹如书籍,一页页被翻过去。人要向前看,少翻历史旧账。"💕 作者:Mylvzi 文章主要内容:数据结构之顺序表的模拟实现 /*** Created with IntelliJ IDEA.* Description:* User: 绿字* Date:…...

R6G azide, 5-isomer具有良好的水溶性,2135330-71-9
试剂 | 基础知识概述(部分): 英文名称:R6G azide, 5-isomer CAS:2135330-71-9 分子式:C30H32N6O4 分子量:540.61 规格标准:10mg,25mg,50mg,可提供mg级以…...

Canvas系列绘制图片学习:绘制图片和渐变效果
我们现在已经可以绘制好多东西了,不过在实际开发中,绘制最多的当然是图片了,这章我们就讲讲图片的绘制。 绘制图片 绘制图片的API是drawImage,它的参数有三种情况: // 将图片绘制在canvas的(dX, dY)坐标处 context.…...

AJAX为什么叫AJAX
AJAX(Asynchronous JavaScript and XML)这个名字是由美国程序员Jesse James Garrett在2005年提出的,用来描述一种用于创建交互式Web应用程序的技术组合。它之所以被称为"AJAX",有以下原因: Asynchronous&…...

自动化测试中如何编写配置文件 ? 该使用什么工具 ? 一文详解使用ConfigParser读写配置文件
1. 配置文件说明 只要是用编写项目,你就肯定离不开配置文件 。就以测试人员编写的自动化测试项目为例 ,如果你做连接数据库 、访问一些第三方接口、或者访问登录接口的用户名和密码。这些输入的信息最大特点就是都可能是变量,比如访问数据库…...

文件批量管理:轻松复制备份并删除原文件
在日常生活和工作中,我们经常需要处理大量的文件。为了确保文件的安全性和完整性,您需要一种高效的文件批量管理方法。本文将向您介绍如何一一复制备份并删除原文件里的文件,让您的文件管理变得轻松便捷。 首先,我们要进入文件批…...

Linux高性能服务器编程 学习笔记 第十七章 系统监测工具
tcpdump是一款经典的抓包工具,即使今天我们已经有了像Wireshark这样更易于使用和掌握的抓包工具,tcpdump仍是网络程序员的必备利器。 tcpdump提供了一些选项用以过滤数据包或定制输出格式,常见的选项如下: 1.-n:使用I…...

rabbitmq 消费者报错 ListenerExecutionFailedException NullPointerException
报错信息: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method private void com.xxx.service.impl.xxxServiceImpl.xxx(com.xxx.dto.XXX) threw exception at org.springframework.amqp.rabbit.listener.adapter.Mes…...

Java面试题:链表-合并两个排序的链表
描述 输入两个递增的链表,单个链表的长度为n,合并这两个链表并使新链表中的节点仍然是递增排序的。 示例 输入: {1,3,5}, {2,4,6}返回值: {1,2,3,4,5,6}原题地址:https://www.nowcoder.com/practice/d8b6b4358f7742…...

Springboot结合Mockito写单元测试实践和原理
文章目录 前言一、使用最佳实践使用场景SpyBean失效场景解决Mock失效的问题避免FactoryBean的实现方式使用MockBean,但是要指定name 个人推荐 二、原理1. MockBean2.SpyBean方法调用 总结 前言 相信看我博客的都是javaer,工作中一般都是使用Springboot框…...

操作系统之微内核架构
宏内核相反,微内核架构提倡功能尽可能少,只提供进程调度、处理中断、内存映射、进程间通信等功能。微内核架构是不能够提供什么实际功能的,而内存管理、进程管理、设备管理和文件管理服务等,都被做成一个个服务进程,它…...

24---WPF缓存
一、什么是缓存: 1.缓存指的是将需要频繁访问的网络内容存放在离用户较近、访问速度更快的系统中,以提高内容访问速度的一种技术。缓存服务器就是存放频繁访问内容的服务器。 2.缓存就是一个临时存放区域--离用户比较近 二、作用--意义---如果系统出现故…...

vite+vue3.0 使用tailwindcss
参考资料: 安装 - TailwindCSS中文文档 | TailwindCSS中文网 npm install -D tailwindcss npm install postcss npm install autoprefixer npx tailwindcss init -p 生成/src/tailwind.config.js和/src/postcss.config.js配置文件 在/src/tailwind.config.js配置文件…...

C++QT---QT-day3
使用手动连接,将登录框中的取消按钮使用qt4版本的连接到自定义的槽函数中,在自定义的槽函数中调用关闭函数将登录按钮使用qt5版本的连接到自定义的槽函数中,在槽函数中判断ui界面上输入的账号是否为"admin",密码是否为&…...

小程序如何搭建在服务器上
小程序可以通过搭建在服务器上,来实现跨平台的访问和使用。以下是搭建小程序在服务器上的步骤: 安装Node.js:首先,你需要在服务器上安装Node.js。你可以从Node.js的官方网站下载并安装。 安装微信开发者工具:然后&…...

JavaEE初阶学习:Servlet
1.Servlet 是什么 Servlet 是一种 Java 程序,用于在 Web 服务器上处理客户端请求和响应。Servlet 可以接收来自客户端(浏览器、移动应用等)的 HTTP 请求,并生成 HTML 页面或其他格式的数据,然后将响应发送回客户端。S…...

黑白二维码不好看,那么快学习改色的方法吧
现在经常会看到很多的二维码不是黑白图案,可以是其他纯色或者渐变色等样式的,那么怎么将黑白二维码改成其他鲜艳好看的颜色呢?一般想要修改普通样式的二维码可以用二维码美化生成器来处理,只需要上传二维码图片,就可以…...

coreldraw2024版本有哪些新增功能?
有小伙伴在用电脑查找软件程序的时候,看到了一款叫cdr软件的应用,自己之前没接触过,不知道cdr是什么软件?cdr软件是干什么的?十分好奇。其实它是一款平面设计软件,下面就给大家介绍下相关的cdr软件的知识。…...

2023最新Office2021专业增强版安装使用教程
Microsoft Office专业增强版2021是一套办公软件套装,包含了Word、Excel、PowerPoint、Outlook、Access、Publisher、OneNote、Teams等应用程序。这个版本是在Office 365的基础上推出的新版本,与之前的Office版本相比,增强了许多功能。也是目前…...

实时配送跟踪功能的实现:外卖跑腿小程序的技术挑战
在当今数字化时代,外卖和跑腿服务已经成为了生活中不可或缺的一部分。为了提供更好的用户体验,外卖跑腿小程序越来越注重实时配送跟踪功能的实现。这项技术挑战旨在确保顾客可以方便地跟踪他们的订单,以及配送员可以高效地完成送货任务。本文…...