当前位置: 首页 > news >正文

黑马头条Day11- 实时计算热点文章、KafkaStream

一、今日内容

1. 定时计算与实时计算

2. 今日内容

KafkaStream

  • 什么是流式计算
  • KafkaStream概述
  • KafkaStream入门案例
  • SpringBoot集成KafkaStream

实时计算

  • 用户行为发送消息
  • KafkaStream聚合处理消息
  • 更新文章行为数量
  • 替换热点文章数据

二、实时流式计算

1. 概念

一般流式计算会与批量计算相比较。在流式计算模型这种,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算替代全量计算

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2. 应用场景

日志分析:

对网站的用户访问日志进行实时的分析,计算访问量、用户画像、留存率等,实时进行数据分析,帮助企业进行决策

大屏看板统计:

可以实时的查看网站注册数量、订单数量、购买数量、金额等

公交实时数据:

可以随时更新公交车方位,计算多久到达站牌

实时文章分值计算:

头条类文章的分值计算,通过用户的行为实时计算文章的分值,分值越高就越被推荐。

3. 技术方案选项

Hadoop

Apache Storm

Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

Kafka Stream

可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包、部署和操作工具集成。

三、Kafka Stream

1. 概述

Kafka Stream 是Apache Kafka从1.0版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的State store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提高记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可以处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原理Processor(类似于Storm 的spout和bolt),以及高层抽象的DSL(类似于Spark 的map/group/reduce)

2. Kafka Stream的关键概念

源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个Kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。

3. KStream

(1)数据结构类似于map,如下图,key-value键值对

(2)KStream

KStream数据流(data stream):即是一段顺序的,可以无限长,不断更新的数据集。数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身的topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据

为了说明这一点,让我们想象一下两个数据记录正在发送到流中:

("alice", 1) -> ("alice", 3)

如果您的流处理应用是要总结每个用户的价值,它将返回了4了alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据

4. Kafka Stream入门案例

(1)需求分析,求单词个数(word count)

(2)引入依赖

在之前的kafka-demo工程的pom文件中引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>

(3)创建原生的kafka stream入门案例

package com.heima.kafka.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** 流式处理*/
public class KafkaStreamQuickStart {public static void main(String[] args) {// kafka的配置中心Properties properties = new Properties();properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");// stream构建器StreamsBuilder streamsBuilder = new StreamsBuilder();// 流式处理StreamProcessor(streamsBuilder);// 创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);// 开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容:hello kafka hello itcast* @param streamsBuilder*/private static void StreamProcessor(StreamsBuilder streamsBuilder) {// 1. 创建kstream对象,同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");// 2. 处理消息的valuestream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})// 按照value进行聚合处理.groupBy((key, value) -> value)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 统计单词的个数.count()// 转换为KStream.toStream().map((key, value) -> {System.out.println("key: " + key + ", value: " + value);return new KeyValue<>(key.key().toString(), value.toString());})// 发送消息.to("itcast-topic-out");}
}

(4)测试准备

使用生产者在topic为:itcast_topic_input中发送多条消息

package com.heima.kafka.sample;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.kafka链接配置信息Properties prop = new Properties();//kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//ack配置  消息确认机制prop.put(ProducerConfig.ACKS_CONFIG,"all");//重试次数prop.put(ProducerConfig.RETRIES_CONFIG,10);//数据压缩prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");//2.创建kafka生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);//3.发送消息/*** 第一个参数 :topic* 第二个参数:消息的key* 第三个参数:消息的value*/for (int i = 0; i < 5; i++) {ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("itcast-topic-input","hello kafka");producer.send(kvProducerRecord);}//同步发送消息/*RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();System.out.println(recordMetadata.offset());*///异步消息发送/* producer.send(kvProducerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}});*///4.关闭消息通道  必须要关闭,否则消息发送不成功producer.close();}}

使用消费者接收topic为:itcast_topic_input

package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties prop = new Properties();//链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//手动提交偏移量prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//2.创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);//3.订阅主题consumer.subscribe(Collections.singletonList("itcast-topic-out"));//4.拉取消息//同步提交和异步提交偏移量try {while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());/* System.out.println(consumerRecord.offset());System.out.println(consumerRecord.partition());*/}//异步提交偏移量consumer.commitAsync();}}catch (Exception e){e.printStackTrace();System.out.println("记录错误的信息:"+e);}finally {//同步consumer.commitSync();}/*while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());System.out.println(consumerRecord.offset());System.out.println(consumerRecord.partition());*//* try {//同步提交偏移量consumer.commitSync();}catch (CommitFailedException e){System.out.println("记录提交失败的异常:"+e);}*//*}//异步的方式提交偏移量*//*consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e != null){System.out.println("记录错误的提交偏移量:"+map+",异常信息为:"+e);}}});*//*}*/}}

先启动Consumer,再启动KafkaStream,最后启动Producer发送消息

结果:通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

5. SpringBoot集成Kafka Stream

(1)自定义配置参数

package com.heima.kafka.config;import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

修改application.yml文件,在最下方添加自定义配置

kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

(2)新增配置类,创建KStream对象,进行聚合

package com.heima.kafka.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String, String> kStream(StreamsBuilder streamsBuilder) {// 创建KStream对象,同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}

(3)测试:先启动ConsumerQuickStart -> KafkaDemoApplication -> ProducerQuickStart

启动微服务,正常发送消息,可以正常接收到消息

四、App端热点文章计算

1. 思路说明

2. 功能实现

2.1 用户行为(阅读、评论、点赞、收藏)发送消息,以阅读和点赞为例

(在第9天的实战中,我已经把这部分做了)

步骤①:在heima-leadnews-behavior微服务中集成Kafka生产者配置

在nacos修改leadnews-behavior.yml

spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerautoconfigure:exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfigurationredis:host: 192.168.200.130password: leadnewsport: 6379

步骤②:修改ApLikesBehaviorServiceImpl,新增发送消息

定义消息发送封装类:UpdateArticleMess

package com.heima.model.mess;import lombok.Data;@Data
public class UpdateArticleMess {/*** 修改文章的字段类型*/private UpdateArticleType type;/*** 文章ID*/private Long articleId;/*** 修改数据的增量,可为正负*/private Integer add;public enum UpdateArticleType{COLLECTION,COMMENT,LIKES,VIEWS;}
}

topic常量类:HotArticleConstants

package com.heima.common.constants;public class HotArticleConstants {public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}

ApLikesBehaviorServiceImpl:

package com.heima.behavior.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {@Autowiredprivate CacheService cacheService;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 点赞或取消点赞* @param dto 0:点赞 1:取消点赞* @return*/@Overridepublic ResponseResult like(LikesBehaviorDto dto) {// 1. 检查参数if(dto == null || dto.getArticleId() == null || checkParam(dto)) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}// 2. 是否登录ApUser apUser = AppThreadLocalUtil.getUser();if(apUser == null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);// 3. 点赞,保存数据if(dto.getOperation() == 0) {Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), apUser.getId().toString());if(obj != null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");}// 保存当前keylog.info("保存当前key:{}, {}, {}", dto.getArticleId(), apUser.getId(), dto);cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), apUser.getId().toString(), JSON.toJSONString(dto));mess.setAdd(1);} else {// 取消点赞,删除当前keylog.info("删除当前key:{}, {}", dto.getArticleId(), apUser.getId());cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), apUser.getId().toString());mess.setAdd(-1);}// 4. 发送消息,数据聚合kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));// 5. 结果返回return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}/*** 检查参数* @param dto* @return*/private boolean checkParam(LikesBehaviorDto dto) {// 参数有误if(dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {return true;}return false;}
}

步骤③:修改阅读行为的类ApReadBehaviorServiceImpl发送消息

package com.heima.behavior.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {@Autowiredprivate CacheService cacheService;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 用户行为 - 阅读* @param dto* @return*/@Overridepublic ResponseResult readBehavior(ReadBehaviorDto dto) {// 1. 检查参数if(dto == null || dto.getArticleId() == null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}// 2. 是否登录ApUser user = AppThreadLocalUtil.getUser();if(user == null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}// 3. 更新阅读次数String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());if(StringUtils.isNotBlank(readBehaviorJson)) {ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));}// 4. 保存当前keylog.info("保存当前key: {} {} {}", dto.getArticleId(), user.getId(), dto);cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));// 5. 发送消息,数据聚合UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);mess.setAdd(1);kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));// 6. 结果返回return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}
}

2.2 使用KafkaStream实时接收消息,聚合内容

步骤①:在heima-leadnews-article微服务中集成kafkaStream

在pom.xml(heima-leadnews-article)添加Kafka Stream的依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>

KafkaStreamConfig:

package com.heima.article.config;import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

在nacos配置正常的leadnews-article添加如下

# 。。。 。。。省略
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

步骤②:定义实体类,用于聚合之后的分值封装

package com.heima.model.article.mess;import lombok.Data;@Data
public class ArticleVisitStreamMess {/*** 文章id*/private Long articleId;/*** 阅读*/private int view;/*** 收藏*/private int collect;/*** 评论*/private int comment;/*** 点赞*/private int like;
}

步骤③:定义stream,接收消息聚合

package com.heima.article.stream;import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;@Configuration
@Slf4j
public class HotArticleStreamHandler {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//接收消息KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);//聚合流式处理stream.map((key,value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);//重置消息的key:1234343434   和  value: likes:1return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());})//按照文章id进行聚合.groupBy((key,value)->key)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))/*** 自行的完成聚合的计算*/.aggregate(new Initializer<String>() {/*** 初始方法,返回值是消息的value* @return*/@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}/*** 真正的聚合操作,返回值是消息的value*/}, new Aggregator<String, String, String>() {/**** @param key* @param value  likes:1* @param aggValue   COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0* @return*/@Overridepublic String apply(String key, String value, String aggValue) {System.out.println(value);if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col = 0,com=0,lik=0,vie=0;for (String agg : aggAry) {String[] split = agg.split(":");/*** 获得初始值,也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}/*** 累加操作   likes:1*/String[] valAry = value.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id:"+key);System.out.println("当前时间窗口内的消息处理结果:"+formatStr);return formatStr;}}, Materialized.as("hot-atricle-stream-count-001")).toStream().map((key,value)->{return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));})//发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/public String formatObj(String articleId,String value){ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));return JSON.toJSONString(mess);}
}

2.3 重新计算文章的分值,更新到数据库和缓存中

步骤①:在ApArticleService添加方法,用于更新数据库中的文章分值

/*** 更新文章的分值  同时更新缓存中的热点文章数据* @param mess*/
public void updateScore(ArticleVisitStreamMess mess);

实现类方法:

/*** 更新文章的分值  同时更新缓存中的热点文章数据* @param mess*/
@Override
public void updateScore(ArticleVisitStreamMess mess) {//1.更新文章的阅读、点赞、收藏、评论的数量ApArticle apArticle = updateArticle(mess);//2.计算文章的分值Integer score = computeScore(apArticle);score = score * 3;//3.替换当前文章对应频道的热点数据replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());//4.替换推荐对应的热点数据replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);}/*** 替换数据并且存入到redis* @param apArticle* @param score* @param s*/
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {String articleListStr = cacheService.get(s);if (StringUtils.isNotBlank(articleListStr)) {List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);boolean flag = true;//如果缓存中存在该文章,只更新分值for (HotArticleVo hotArticleVo : hotArticleVoList) {if (hotArticleVo.getId().equals(apArticle.getId())) {hotArticleVo.setScore(score);flag = false;break;}}//如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换if (flag) {if (hotArticleVoList.size() >= 30) {hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);if (lastHot.getScore() < score) {hotArticleVoList.remove(lastHot);HotArticleVo hot = new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}} else {HotArticleVo hot = new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}}//缓存到redishotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());cacheService.set(s, JSON.toJSONString(hotArticleVoList));}
}/*** 更新文章行为数量* @param mess*/
private ApArticle updateArticle(ArticleVisitStreamMess mess) {ApArticle apArticle = getById(mess.getArticleId());apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());updateById(apArticle);return apArticle;}/*** 计算文章的具体分值* @param apArticle* @return*/
private Integer computeScore(ApArticle apArticle) {Integer score = 0;if(apArticle.getLikes() != null){score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;}if(apArticle.getViews() != null){score += apArticle.getViews();}if(apArticle.getComment() != null){score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;}if(apArticle.getCollection() != null){score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;}return score;
}

步骤②:定义监听,接收聚合之后的数据,文章的分值重新进行计算

package com.heima.article.listener;import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ArticleIncrHandleListener {@Autowiredprivate ApArticleService apArticleService;@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)public void onMessage(String mess){if(StringUtils.isNotBlank(mess)){ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);apArticleService.updateScore(articleVisitStreamMess);}}
}

相关文章:

黑马头条Day11- 实时计算热点文章、KafkaStream

一、今日内容 1. 定时计算与实时计算 2. 今日内容 KafkaStream 什么是流式计算KafkaStream概述KafkaStream入门案例SpringBoot集成KafkaStream 实时计算 用户行为发送消息KafkaStream聚合处理消息更新文章行为数量替换热点文章数据 二、实时流式计算 1. 概念 一般流式计…...

pnpm 设置国内源

pnpm config set registry https://registry.npmmirror.com/...

链表分割 C语言

链表分割_牛客题霸_牛客网 (nowcoder.com) ( 点击前面链接即可查看题目) /* struct ListNode {int val;struct ListNode *next;ListNode(int x) : val(x), next(NULL) {} };*/ #include <cstddef> class Partition { public:ListNode* partition(ListNode* pHea…...

python编程,设计一个详细的软件 与SADS 相似

软件功能模块&#xff1a; 用户界面模块&#xff08;UI Module&#xff09; 项目管理界面模型构建界面分析和设计界面结果展示和报告生成界面 数据库模块&#xff08;Database Module&#xff09; 材料数据库结构组件数据库设计标准和规范数据库用户项目数据存储 模型构建模块&…...

META 备受期待的 Llama 3 405B 即将发布

本心、输入输出、结果 文章目录 META 备受期待的 Llama 3 405B 即将发布前言Llama 3 405B或许会彻底改变专用模型的数据质量Llama 3 405B将形成新的模型生态系统:从基础模型到专家组合Llama 3 405B有最高效 API 的竞争Llama 3 405B 基准测试META 备受期待的 Llama 3 405B 即将…...

c# Math.Round()四舍五入取整数

可以使用Math.Round()方法进行四舍五入取整数的操作。 以下是使用Math.Round()方法的实现方法&#xff1a; 将浮点数直接作为参数传递给Math.Round()方法&#xff0c;并指定要保留的小数位数。此方法将返回最接近的整数值。 double number 3.89; int roundedNumber (int)Mat…...

【C++BFS算法】886. 可能的二分法

本文涉及的点 CBFS算法 LeetCod886. 可能的二分法 给定一组 n 人&#xff08;编号为 1, 2, …, n&#xff09;&#xff0c; 我们想把每个人分进任意大小的两组。每个人都可能不喜欢其他人&#xff0c;那么他们不应该属于同一组。 给定整数 n 和数组 dislikes &#xff0c;其…...

【MySQL】记录MySQL加载数据(LOAD DATA)

MySQL LOAD DATA 一、背景二、模拟生成用户信息三、加载到mysql表3.1、建表语句3.2 加载数据3.3、查看结果 一、背景 现在有个需求是将用户信息存入student.data文件中&#xff0c;在现在load到数据库中 二、模拟生成用户信息 假设用户信息&#xff0c;包含姓名&#xff0c;…...

6 网络

6 网络 1、概念2 IP地址3、套接字4、TCP协议4.1 TCP协议的基本特征4.2 建立连接4.4 终止连接4.5 编程模型 5、UDP协议5.1 UDP协议的基本特性5.2 常用函数5.3 UDP通信模型 6、域名解析 1、概念 计算机网络是实现资源共享和信息传递的计算机系统 ISO/OSI网络协议模型 TCP/IP协…...

SQL中CASE WHEN的用法

CASE WHEN的用法 1. CASE WHEN数据转换 说明&#xff1a;使用CASE WHEN我们可以将范围的数据转换成特定的值来表达; 假如&#xff1a;有一个员工表Employee(employee_id,department_id.salary,name,age)&#xff1b; 需求&#xff1a;需要根据薪资情况来评定等级&#xff1a;…...

CTF-Web习题:[GXYCTF2019]Ping Ping Ping

题目链接&#xff1a;[GXYCTF2019]Ping Ping Ping 解题思路 访问靶机&#xff0c;得到如下页面&#xff0c;类似于URL参数 尝试用HackBar构造url传输过去看看 发现返回了ping命令的执行结果&#xff0c;可以猜测php脚本命令是ping -c 4 $ip&#xff0c;暂时不知道执行的函数…...

python+vue3+onlyoffice在线文档系统实战20240725笔记,首页开发

解决遗留问题 内容区域的高度没有生效&#xff0c;会随着菜单的高度自动变化。 解决方案&#xff1a;给侧边加上一个最小高度。 首页设计 另一种设计&#xff1a; 进来以后&#xff0c;是所有的文件夹和最近的文件。 有一张表格&#xff0c;类似于Windows目录详情&…...

映美精彩色相机IFrameQueueBuffer转halcon的HObject

1.之前写了黑白IFrameQueueBuffer转halcon的HObject&#xff0c;下载这边文件写&#xff0c;彩色IFrameQueueBuffer转halcon的HObject 2.相机的部署跟黑白的一样&#xff0c;不同的是取图的格式改变 if (CamerTakeImageOne._camer_take_image_static._camer_is_exit){textbox_m…...

写代码对人的影响

1 代码是需要跑起来的&#xff0c;不能你写了一段代码运行不了 2 代码过程中有大量的bug&#xff0c;经常异常报错&#xff0c;你需要花费时间去解决 对人的影响就是解决问题的态度得到强化&#xff0c;解决问题要比坚持正确困难&#xff0c;坚持正确只是需要自然而然的努力&…...

Hive-基础介绍

简介 Apache Hive是一款数据仓库系统 功能 可以将存储在Hadoop(HDFS)中的数据映射为一张数据库表。核心是将HQL语句转化为MapRece程序&#xff0c;然后提交到Hadoop执行。 组件 用户接口&#xff1a;CLI(shell命令行)、WebGUI、Thrift Server元数据存储(Metastore)&#x…...

网站如何从0-1搭建部署蓝图介绍

第一步&#xff1a;网站规划 确定网站目的&#xff1a;明确网站的目标和预期的受众。内容规划&#xff1a;决定网站将包含哪些内容和功能。技术需求分析&#xff1a;确定所需的技术栈&#xff0c;例如前端和后端技术。 第二步&#xff1a;设计 草图和布局&#xff1a;绘制网…...

面向对象(封装)练习题 巩固一下啦!

# 设计一个类&#xff0c;用来描述手机 class Phone:# 提供私有成员变量&#xff1a;__is_5g_enable__is_5g_enable False # 5g状态# 提供私有成员方法&#xff1a;__check_5gdef __check_5g(self):if self.__is_5g_enable:print("5g开启")else:print("5g关闭…...

一些问题 7/28

get post可以public吗 在Java Servlet中&#xff0c;doGet()和doPost()方法的访问修饰符通常是public&#xff0c;因为这些方法需要被Servlet容器&#xff08;如Tomcat&#xff09;调用。 如果将这些方法声明为private或protected&#xff0c;Servlet容器将无法访问它们&…...

昇思MindSpore 应用学习-基于MobileNetv2的垃圾分类

基于MobileNetv2的垃圾分类 本文档主要介绍垃圾分类代码开发的方法。通过读取本地图像数据作为输入&#xff0c;对图像中的垃圾物体进行检测&#xff0c;并将检测结果图片保存到文件中。 1、实验目的 了解熟悉垃圾分类应用代码的编写&#xff08;Python语言&#xff09;&…...

matlab 常用数据类型的转换

目录 一、数据类型1、整型2、浮点型3、逻辑型4、元胞数组5、结构体 二、数据类型转换三、图像数据类型转换四、参考链接 一、数据类型 1、整型 int和unit都是整型&#xff0c;只是前一个有符号&#xff0c;后一个没有符号&#xff0c;比如在16位系统中&#xff0c;int范围是-3…...

Cocos Creator2D游戏开发(6)-飞机大战(4)-敌机产生

敌机产生&玩家发射子弹 敌机产生: 创建一个空节点 创建一个敌机预制体 把敌机图片拖入预制体内 使用代码生成敌机 让敌机动起来 创建一个预制体enemy_prefab双击预制体enemy_prefab,然后拖入一个敌机图片,设置好方向和尺寸,一定要记得保存然后关闭(场景编辑器里面的保存)…...

Hugo部署到Vercel踩大坑——全是XML文件?

问题描述 部署到Vercel全都是XML文件 Vercel是著名PAAS服务&#xff0c;相比于 Github Pages&#xff0c;其中国大陆可直接访问&#xff0c;因此尝试把Hugo站点发布到vercel中&#xff0c;部署后遇到问题&#xff0c;所有页面都为xml文件&#xff0c;如下所示&#xff1a; Ve…...

2024 暑假友谊赛-热身1

[ABC102D] Equal Cut - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 思路:找在区间[2,n-1]中找到i,j,k三个点,把序列分割成4个区间:[1,i],[i1,j],[j1,k],[k1,n] 暴力的做法是枚举i,j,k加上前缀和是o(n^3)的 key:"考虑枚举处于中间的j&#xff0c;然后用i平衡左两个区间,…...

Nginx系列-11 HTTP消息处理流程

背景 了解Nginx处理HTTP请求的11个阶段&#xff0c;有助于理解和配置nginx、自定义模块、基于lua模块自定义功能。按如下配置&#xff0c;执行"curl http://localhost:8001/query/test.html"&#xff0c;如果读者对结果不是很确定&#xff0c;建议阅读本文。 serve…...

前端知识--前端访问后端技术Ajax及框架Axios

一、异步数据请求技术----Ajax Ajax是前端访问后端的技术&#xff0c;为异步请求&#xff08;不刷新页面&#xff0c;请求数据&#xff0c;只更新局部数据&#xff09;。 例如&#xff1a;在京东网站中搜索电脑&#xff0c;就会出现一些联想搜索&#xff0c;但此时页面并没有…...

【前端/js】使用js读取本地文件(xml、二进制)内容

目录 说在前面FileReaderDOMParser文本文件二进制文件 说在前面 浏览器版本&#xff1a;Microsoft Edge 126.0.2 (正式版本) (64 位) FileReader MDNFileReader 接口允许 Web 应用程序异步读取存储在用户计算机上的文件&#xff08;或原始数据缓冲区&#xff09;的内容&#x…...

初步入门C ++之类的概念

文章目录 0 Hello World!1 编译过程2 类2.1 类的概念2.2 构造函数与析构函数 0 Hello World! #include <iostream> //相当于#include <stdio.h>int main(int argc, char argv[]) {char c;std::cout << "Hello World!\n" <<…...

什么是技术作家风格指南?

技术写作风格指南旨在提供必要的格式风格&#xff0c;以帮助技术作家为读者创建引人入胜且一致的内容。然而&#xff0c;技术写作与普通的自由写作有很大不同。目的是将复杂的技术主题分解为易于理解的内容&#xff0c;以帮助读者了解如何使用产品或服务。 在本文中&#xff0…...

WebGIS学习——Cesium|Javascript

1.Cesium学习什么&#xff1a;Cesium实战项目说明-CSDN博客 2.Cesium绘制图形(箭头等):Cesium 态势标绘 _cesium态势标绘-CSDN博客 3.CesiumThree集成 4.Cesium深度图相关&#xff1a;Cesium离屏渲染深度图实验_cesium 离屏渲染-CSDN博客 5.洪涝&#xff1a;cesium淹没分析…...

Qt,获取其他.exe文件的标准输出流的信息(printf/print的输出信息)

比如&#xff0c;通过Python编写爬虫软件功能是运行程序获取豆瓣电影排行榜信息&#xff0c;并通过print打印出来。将其打包成.exe,通过Qt来调用&#xff0c;并获取到.exe程序运行的结果 简单示例代码&#xff1a; // 创建 QProcess 对象QProcess process;// 连接信号槽以获取…...