【黑马头条之热点文章kafkaStream】
本笔记内容为黑马头条项目的热点文章-实时计算部分
目录
一、实时流式计算
1、概念
2、应用场景
3、技术方案选型
二、Kafka Stream
1、概述
2、Kafka Streams的关键概念
3、KStream
4、Kafka Stream入门案例编写
5、SpringBoot集成Kafka Stream
三、app端热点文章计算
1、思路说明
2、功能实现
一、实时流式计算
1、概念
一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。
2、应用场景
日志分析
网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
大屏看板统计
可以实时的查看网站注册数量,订单数量,购买数量,金额等。
公交实时数据
可以随时更新公交车方位,计算多久到达站牌等
实时文章分值计算
头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。
3、技术方案选型
-
Hadoop
-
Apche Storm
-
Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。
-
Kafka Stream
可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。
二、Kafka Stream
1、概述
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:
Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
除了Kafka外,无任何外部依赖
充分利用Kafka分区机制实现水平扩展和顺序性保证
通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
支持正好一次处理语义
提供记录级的处理能力,从而实现毫秒级的低延迟
支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
2、Kafka Streams的关键概念
源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。
3、KStream
(1)数据结构类似于map,如下图,key-value键值对
(2)KStream
KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流处理应用是要总结每个用户的价值,它将返回
4
了alice
。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据
4、Kafka Stream入门案例编写
(1)需求分析,求单词个数(word count)
(2)引入依赖
在之前的kafka-demo工程的pom文件中引入
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
(3)创建原生的kafka staream入门案例
package com.heima.kafka.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** 流式处理*/
public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka的配置信心Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream 构建器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建kafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容:hello kafka hello itcast* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//按照value进行聚合处理.groupBy((key,value)->value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}
(4)测试准备
-
使用生产者在topic为:itcast_topic_input中发送多条消息
-
使用消费者接收topic为:itcast_topic_out
结果:
-
通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出
5、SpringBoot集成Kafka Stream
(1)自定配置参数
package com.heima.kafka.config;import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}
修改application.yml文件,在最下方添加自定义配置
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}
(2)新增配置类,创建KStream对象,进行聚合
package com.heima.kafka.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}
测试:
启动微服务,正常发送消息,可以正常接收到消息
三、app端热点文章计算
1、思路说明
2、功能实现
1.用户行为(阅读量,评论,点赞,收藏)发送消息,以阅读和点赞为例
①在heima-leadnews-behavior微服务中集成kafka生产者配置
修改nacos,新增内容
spring:application:name: leadnews-behaviorkafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
②修改ApLikesBehaviorServiceImpl新增发送消息
定义消息发送封装类:UpdateArticleMess
package com.heima.model.mess;import lombok.Data;@Data
public class UpdateArticleMess {/*** 修改文章的字段类型*/private UpdateArticleType type;/*** 文章ID*/private Long articleId;/*** 修改数据的增量,可为正负*/private Integer add;public enum UpdateArticleType{COLLECTION,COMMENT,LIKES,VIEWS;}
}
topic常量类:
package com.heima.common.constants;public class HotArticleConstants {public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";}
完整代码如下:
package com.heima.behavior.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {@Autowiredprivate CacheService cacheService;@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@Overridepublic ResponseResult like(LikesBehaviorDto dto) {//1.检查参数if (dto == null || dto.getArticleId() == null || checkParam(dto)) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.是否登录ApUser user = AppThreadLocalUtil.getUser();if (user == null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);//3.点赞 保存数据if (dto.getOperation() == 0) {Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());if (obj != null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");}// 保存当前keylog.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));mess.setAdd(1);} else {// 删除当前keylog.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());mess.setAdd(-1);}//发送消息,数据聚合kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}/*** 检查参数** @return*/private boolean checkParam(LikesBehaviorDto dto) {if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {return true;}return false;}
}
③修改阅读行为的类ApReadBehaviorServiceImpl发送消息
完整代码:
package com.heima.behavior.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {@Autowiredprivate CacheService cacheService;@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@Overridepublic ResponseResult readBehavior(ReadBehaviorDto dto) {//1.检查参数if (dto == null || dto.getArticleId() == null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.是否登录ApUser user = AppThreadLocalUtil.getUser();if (user == null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}//更新阅读次数String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());if (StringUtils.isNotBlank(readBehaviorJson)) {ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));}// 保存当前keylog.info("保存当前key:{} {} {}", dto.getArticleId(), user.getId(), dto);cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));//发送消息,数据聚合UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);mess.setAdd(1);kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}
}
2.使用kafkaStream实时接收消息,聚合内容
①在leadnews-article微服务中集成kafkaStream (参考kafka-demo)
②定义实体类,用于聚合之后的分值封装
package com.heima.model.article.mess;import lombok.Data;@Data
public class ArticleVisitStreamMess {/*** 文章id*/private Long articleId;/*** 阅读*/private int view;/*** 收藏*/private int collect;/*** 评论*/private int comment;/*** 点赞*/private int like;
}
修改常量类:增加常量
package com.heima.common.constans;public class HotArticleConstants {public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}
③ 定义stream,接收消息并聚合
package com.heima.article.stream;import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;@Configuration
@Slf4j
public class HotArticleStreamHandler {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//接收消息KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);//聚合流式处理stream.map((key,value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);//重置消息的key:1234343434 和 value: likes:1return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());})//按照文章id进行聚合.groupBy((key,value)->key)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))/*** 自行的完成聚合的计算*/.aggregate(new Initializer<String>() {/*** 初始方法,返回值是消息的value* @return*/@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}/*** 真正的聚合操作,返回值是消息的value*/}, new Aggregator<String, String, String>() {@Overridepublic String apply(String key, String value, String aggValue) {if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col = 0,com=0,lik=0,vie=0;for (String agg : aggAry) {String[] split = agg.split(":");/*** 获得初始值,也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}/*** 累加操作*/String[] valAry = value.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id:"+key);System.out.println("当前时间窗口内的消息处理结果:"+formatStr);return formatStr;}}, Materialized.as("hot-atricle-stream-count-001")).toStream().map((key,value)->{return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));})//发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/public String formatObj(String articleId,String value){ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));return JSON.toJSONString(mess);}
}
3.重新计算文章的分值,更新到数据库和缓存中
①在ApArticleService添加方法,用于更新数据库中的文章分值
/*** 更新文章的分值 同时更新缓存中的热点文章数据* @param mess*/
public void updateScore(ArticleVisitStreamMess mess);
实现类方法
/*** 更新文章的分值 同时更新缓存中的热点文章数据* @param mess*/
@Override
public void updateScore(ArticleVisitStreamMess mess) {//1.更新文章的阅读、点赞、收藏、评论的数量ApArticle apArticle = updateArticle(mess);//2.计算文章的分值Integer score = computeScore(apArticle);score = score * 3;//3.替换当前文章对应频道的热点数据replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());//4.替换推荐对应的热点数据replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);}/*** 替换数据并且存入到redis* @param apArticle* @param score* @param s*/
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {String articleListStr = cacheService.get(s);if (StringUtils.isNotBlank(articleListStr)) {List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);boolean flag = true;//如果缓存中存在该文章,只更新分值for (HotArticleVo hotArticleVo : hotArticleVoList) {if (hotArticleVo.getId().equals(apArticle.getId())) {hotArticleVo.setScore(score);flag = false;break;}}//如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换if (flag) {if (hotArticleVoList.size() >= 30) {hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);if (lastHot.getScore() < score) {hotArticleVoList.remove(lastHot);HotArticleVo hot = new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}} else {HotArticleVo hot = new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}}//缓存到redishotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());cacheService.set(s, JSON.toJSONString(hotArticleVoList));}
}/*** 更新文章行为数量* @param mess*/
private ApArticle updateArticle(ArticleVisitStreamMess mess) {ApArticle apArticle = getById(mess.getArticleId());apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());updateById(apArticle);return apArticle;}/*** 计算文章的具体分值* @param apArticle* @return*/
private Integer computeScore(ApArticle apArticle) {Integer score = 0;if(apArticle.getLikes() != null){score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;}if(apArticle.getViews() != null){score += apArticle.getViews();}if(apArticle.getComment() != null){score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;}if(apArticle.getCollection() != null){score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;}return score;
}
②定义监听,接收聚合之后的数据,文章的分值重新进行计算
package com.heima.article.listener;import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ArticleIncrHandleListener {@Autowiredprivate ApArticleService apArticleService;@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)public void onMessage(String mess){if(StringUtils.isNotBlank(mess)){ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);apArticleService.updateScore(articleVisitStreamMess);}}
}
结束!
相关文章:

【黑马头条之热点文章kafkaStream】
本笔记内容为黑马头条项目的热点文章-实时计算部分 目录 一、实时流式计算 1、概念 2、应用场景 3、技术方案选型 二、Kafka Stream 1、概述 2、Kafka Streams的关键概念 3、KStream 4、Kafka Stream入门案例编写 5、SpringBoot集成Kafka Stream 三、app端热点文章…...

【SpringSecurity】三、访问授权
文章目录 1、配置用户权限2、针对URL授权3、针对方法的授权 1、配置用户权限 继续上一章,给在内存中创建两个用户配置权限。配置权限有两种方式: 配置roles配置authorities //哪个写在后面哪个起作用 //角色变成权限后会加一个ROLE_前缀,比…...

你对SPA单页面的理解,它的优缺点分别是什么?如何实现SPA应用呢?
一、什么是SPA SPA(single-page application),翻译过来就是单页应用SPA是一种网络应用程序或网站的模型,它通过动态重写当前页面来与用户交互,这种方法避免了页面之间切换打断用户体验在单页应用中,所有必…...

【LeetCode75】第三十七题 二叉树中的最长交错路径
目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 给我们一棵二叉树,问我们在这棵树里能找到的最长交错路径。最长交错路径就是在二叉树里一左一右一左一右这样走,最…...
百度Apollo学习心得:探索自动驾驶技术的前沿之旅
文章目录 前言一、理论学习与实践结合二、多方资源的整合利用三、团队合作与交流分享四、持续学习与创新思维总结 前言 百度Apollo是一项引领自动驾驶技术发展的开放平台,通过深度学习、感知与决策、定位与控制等关键技术,为开发者提供了丰富的工具和资…...
kafka原理之springboot 集成批量消费
前言 由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。 一、新建一个maven工程,添加kafka依赖 <dependency><groupId>org.springframe…...
【GeoDa实用技巧100例】024:geoda计算全局(局部)莫兰指数Moran‘s I,LISA聚类地图,显著性地图
严重声明:本文及专栏《GeoDa空间计量案例教程100例》为CSDN博客专家刘一哥GIS原创,原文及专栏地址为:https://blog.csdn.net/lucky51222/category_12373659.html,谢绝转载或爬取!!! 文章目录 一、计算全局(或局部)单变量莫兰指数I1. 加载实验数据2. 加载权重矩阵3. 创建…...

Java进阶(7)——手动实现LinkedList 内部node类的实现 增删改查的实现 toString方法 源码的初步理解
目录 引出从ArrayList到Linkedlist手动实现ArrayList从ArrayList到LinkedList 总体设计Node类Node的方法:根据index找node 增删改查的实现增加元素删除元素修改元素查询元素 toString方法完整代码List接口类LinkedList的实现测试类 总结 引出 1.linkedList的节点&am…...
CPU总线的理解
目录 CPU总线CPU总线是什么?CPU总线可以分为前端部分和后端部分吗? CPU总线 CPU总线是什么? CPU总线(Central Processing Unit Bus)是计算机硬件中的一个重要组成部分,它是连接CPU和其他硬件组件的通道。…...

Spring Boot 中的 AOP,到底是 JDK 动态代理还是 Cglib 动态代理
大家都知道,AOP 底层是动态代理,而 Java 中的动态代理有两种实现方式: 基于 JDK 的动态代理 基于 Cglib 的动态代理 这两者最大的区别在于基于 JDK 的动态代理需要被代理的对象有接口,而基于 Cglib 的动态代理并不需要被代理对…...
记录一下在工作中使用 LayUI bug的问题
前言: LayUI是一个很老的框架了,经常会碰到一些 bug。不过由于他的轻量级,仍然有一些项目在使用。解决这些 bug 可能会对大家产生一些意义。 layui中 slect form表单元素 不美化显现的问题 layui中美化的表单元素 在渲染完成要添加 form.re…...

手机自动无人直播,实景无人直播真的有用吗?
继数字人直播之后,手机自动直播开始火热了起来,因为其门槛低,成本低,一部手机一个账号就可以实现直播,一时深受广大商家的好评。那么,手机自动无人直播究竟是如何实现自动直播的呢? 在传统的直…...

python 面试题--2(15题)
目录 1.解释Python中的 GIL(全局解释器锁)是什么,它对多线程编程有什么影响? 2.Python中的装饰器是什么?如何使用装饰器? 3.解释Python中的迭代器和生成器的区别。 4.什么是Python中的列表解析…...

kafka复习:(11)auto.offset.reset的默认值
在ConsumerConfig这个类中定义了这个属性的默认值,如下图 也就是默认值为latest,它的含义是:如果没有客户端提交过offset的话,当新的客户端消费时,把最新的offset设置为当前消费的offset. 默认是自动提交位移的,每5秒…...

【javaweb】学习日记Day7 - Mysql 数据库 DQL 多表设计
之前学习过的SQL语句笔记总结戳这里→【数据库原理与应用 - 第六章】T-SQL 在SQL Server的使用_Roye_ack的博客-CSDN博客 目录 一、DQL 数据查询 1、基本查询 2、条件查询 3、分组查询 (1)聚合函数 ① count函数 ② max min avg sum函数 &…...
线程的生命周期
线程的生命周期 与人有生老病死一样,线程也同样要经历开始(等待)、运行、挂起和停止四种不同的状态。这四种状态都可以通过Thread类中的方法进行控制。下面给出了Thread类中和这四种状态相关的方法。 // 开始线程 public void start( ); …...

GAN | 论文精读 Generative Adversarial Nets
提出一个GAN (Generative Adversarial Nets) 1 方法 (1)生成模型G(Generative),是用来得到分布的,在统计学眼里,整个世界是通过采样不同的分布得到的,生成…...

Yolo系列-yolov2
YOLO-V2 更快!更强! YOLO-V2-BatchNormalization BatchNormalization(批归一化)是一个常用的深度神经网络优化技术,它可以将输入数据进行归一化处理,使得神经网络更容易进行学习。在YOLOv2中,B…...

Linux下的系统编程——vim/gcc编辑(二)
前言: 在Linux操作系统之中有很多使用的工具,我们可以用vim来进行程序的编写,然后用gcc来生成可执行文件,最终运行程序。下面就让我们一起了解一下vim和gcc吧 目录 一、vim编辑 1.vim的三种工作模式 2.基本操作之跳转字符 &a…...

2023年国赛 高教社杯数学建模思路 - 案例:最短时间生产计划安排
文章目录 0 赛题思路1 模型描述2 实例2.1 问题描述2.2 数学模型2.2.1 模型流程2.2.2 符号约定2.2.3 求解模型 2.3 相关代码2.4 模型求解结果 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 最短时…...

Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...

C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

10-Oracle 23 ai Vector Search 概述和参数
一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI,使用客户端或是内部自己搭建集成大模型的终端,加速与大型语言模型(LLM)的结合,同时使用检索增强生成(Retrieval Augmented Generation &#…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...

STM32---外部32.768K晶振(LSE)无法起振问题
晶振是否起振主要就检查两个1、晶振与MCU是否兼容;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容(CL)与匹配电容(CL1、CL2)的关系 2. 如何选择 CL1 和 CL…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制
目录 节点的功能承载层(GATT/Adv)局限性: 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能,如 Configuration …...