搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf
系列文章目录
文章目录
- 系列文章目录
- 前言
- 一、本文要点
- 二、开发环境
- 三、原项目
- 四、修改项目
- 五、测试一下
- 五、小结
前言
本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。
<dependency><groupId>io.github.vipjoey</groupId><artifactId>multi-kafka-consumer-starter</artifactId><version>最新版本号</version>
</dependency>
例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessor和com.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。
## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## pb 消息消费者
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
国籍惯例,先上源码:Github源码
一、本文要点
本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录
- SpringBoot 整合多个kafka数据源
- SpringBoot 批量消费kafka消息
- SpringBoot 优雅地启动或停止消费kafka
- SpringBoot kafka本地单元测试(免集群)
- SpringBoot 利用map注入多份配置
- SpringBoot BeanPostProcessor 后置处理器使用方式
- SpringBoot 将自定义类注册到IOC容器
- SpringBoot 注入bean到自定义类成员变量
- Springboot 取消限定符
- Springboot 支持消费protobuf类型的kafka消息
二、开发环境
- jdk 1.8
- maven 3.6.2
- springboot 2.4.3
- kafka-client 2.6.6
- idea 2020
三、原项目
1、接前文,我们开发了一个kafka插件,但在使用过程中发现有些不方便的地方,在公共接口MmcKafkaStringInputer 显示地继承了BatchMessageListener<String, String>,导致我们没办法去指定消费protobuf类型的message。
public interface MmcKafkaStringInputer extends MmcInputer, BatchMessageListener<String, String> {}/*** 消费kafka消息.*/@Overridepublic void onMessage(List<ConsumerRecord<String, String>> records) {if (null == records || CollectionUtils.isEmpty(records)) {log.warn("{} records is null or records.value is empty.", name);return;}Assert.hasText(name, "You must pass the field `name` to the Constructor or invoke the setName() after the class was created.");Assert.notNull(properties, "You must pass the field `properties` to the Constructor or invoke the setProperties() after the class was created.");try {Stream<T> dataStream = records.stream().map(ConsumerRecord::value).flatMap(this::doParse).filter(Objects::nonNull).filter(this::isRightRecord);// 支持配置强制去重或实现了接口能力去重if (properties.isDuplicate() || isSubtypeOfInterface(MmcKafkaMsg.class)) {// 检查是否实现了去重接口if (!isSubtypeOfInterface(MmcKafkaMsg.class)) {throw new RuntimeException("The interface "+ MmcKafkaMsg.class.getName() + " is not implemented if you set the config `spring.kafka.xxx.duplicate=true` .");}dataStream = dataStream.collect(Collectors.groupingBy(this::buildRoutekey)).entrySet().stream().map(this::findLasted).filter(Objects::nonNull);}List<T> datas = dataStream.collect(Collectors.toList());if (CommonUtil.isNotEmpty(datas)) {this.dealMessage(datas);}} catch (Exception e) {log.error(name + "-dealMessage error ", e);}}
2、由于实现了BatchMessageListener<String, String>接口,抽象父类必须实现onMessage(List<ConsumerRecord<String, String>> records)方法,这样会导致子类局限性很大,没办法去实现其它kafka的xxxListener接口,例如手工提交offset,单条消息消费等。
因此、所以我们要升级和优化。
四、修改项目
1、新增KafkaAbastrctProcessor抽象父类,直接实现MmcInputer接口,要求所有子类都需要继承本类,子类通过调用{@link #receiveMessage(List)} 模板方法来实现通用功能;
@Slf4j
@Setter
abstract class KafkaAbstractProcessor<T> implements MmcInputer {// 类的内容基本和MmcKafkaKafkaAbastrctProcessor保持一致// 主要修改了doParse方法,目的是让子类可以自定义解析protobuf/*** 将kafka消息解析为实体,支持json对象或者json数组.** @param msg kafka消息* @return 实体类*/protected Stream<T> doParse(ConsumerRecord<String, Object> msg) {// 消息对象Object record = msg.value();// 如果是pb格式if (record instanceof byte[]) {return doParseProtobuf((byte[]) record);} else if (record instanceof String) {// 普通kafka消息String json = record.toString();if (json.startsWith("[")) {// 数组List<T> datas = doParseJsonArray(json);if (CommonUtil.isEmpty(datas)) {log.warn("{} doParse error, json={} is error.", name, json);return Stream.empty();}// 反序列对象后,做一些初始化操作datas = datas.stream().peek(this::doAfterParse).collect(Collectors.toList());return datas.stream();} else {// 对象T data = doParseJsonObject(json);if (null == data) {log.warn("{} doParse error, json={} is error.", name, json);return Stream.empty();}// 反序列对象后,做一些初始化操作doAfterParse(data);return Stream.of(data);}} else if (record instanceof MmcKafkaMsg) {// 如果本身就是PandoKafkaMsg对象,直接返回//noinspection uncheckedreturn Stream.of((T) record);} else {throw new UnsupportedForMessageFormatException("not support message type");}}/*** 将json消息解析为实体.** @param json kafka消息* @return 实体类*/protected T doParseJsonObject(String json) {if (properties.isSnakeCase()) {return JsonUtil.parseSnackJson(json, getEntityClass());} else {return JsonUtil.parseJsonObject(json, getEntityClass());}}/*** 将json消息解析为数组.** @param json kafka消息* @return 数组*/protected List<T> doParseJsonArray(String json) {if (properties.isSnakeCase()) {try {return JsonUtil.parseSnackJsonArray(json, getEntityClass());} catch (Exception e) {throw new RuntimeException(e);}} else {return JsonUtil.parseJsonArray(json, getEntityClass());}}/*** 序列化为pb格式,假设你消费的是pb消息,需要自行实现这个类.** @param record pb字节数组* @return pb实体类流*/protected Stream<T> doParseProtobuf(byte[] record) {throw new NotImplementedException();}
}
2、修改MmcKafkaBeanPostProcessor类,暂存KafkaAbastrctProcessor的子类。
public class MmcKafkaBeanPostProcessor implements BeanPostProcessor {@Getterprivate final Map<String, KafkaAbstractProcessor<?>> suitableClass = new ConcurrentHashMap<>();@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof KafkaAbstractProcessor) {KafkaAbstractProcessor<?> target = (KafkaAbstractProcessor<?>) bean;suitableClass.putIfAbsent(beanName, target);suitableClass.putIfAbsent(bean.getClass().getName(), target);}return bean;}
}
3、修改MmcKafkaProcessorFactory,更换构造的目标类为KafkaAbstractProcessor。
public class MmcKafkaProcessorFactory {@Resourceprivate DefaultListableBeanFactory defaultListableBeanFactory;public KafkaAbstractProcessor<? > buildInputer(String name, MmcMultiKafkaProperties.MmcKafkaProperties properties,Map<String, KafkaAbstractProcessor<? >> suitableClass) throws Exception {// 如果没有配置process,则直接从注册的Bean里查找if (!StringUtils.hasText(properties.getProcessor())) {return findProcessorByName(name, properties.getProcessor(), suitableClass);}// 如果配置了process,则从指定配置中生成实例// 判断给定的配置是类,还是bean名称if (!isClassName(properties.getProcessor())) {throw new IllegalArgumentException("It's not a class, wrong value of ${spring.kafka." + name + ".processor}.");}// 如果ioc容器已经存在该处理实例,则直接使用,避免既配置了process,又使用了@Service等注解KafkaAbstractProcessor<? > inc = findProcessorByClass(name, properties.getProcessor(), suitableClass);if (null != inc) {return inc;}// 指定的processor处理类必须继承KafkaAbstractProcessorClass<?> clazz = Class.forName(properties.getProcessor());boolean isSubclass = KafkaAbstractProcessor.class.isAssignableFrom(clazz);if (!isSubclass) {throw new IllegalStateException(clazz.getName() + " is not subClass of KafkaAbstractProcessor.");}// 创建实例Constructor<?> constructor = clazz.getConstructor();KafkaAbstractProcessor<? > ins = (KafkaAbstractProcessor<? >) constructor.newInstance();// 注入依赖的变量defaultListableBeanFactory.autowireBean(ins);return ins;}private KafkaAbstractProcessor<? > findProcessorByName(String name, String processor, Map<String,KafkaAbstractProcessor<? >> suitableClass) {return suitableClass.entrySet().stream().filter(e -> e.getKey().startsWith(name) || e.getKey().equalsIgnoreCase(processor)).map(Map.Entry::getValue).findFirst().orElseThrow(() -> new RuntimeException("Can't found any suitable processor class for the consumer which name is " + name+ ", please use the config ${spring.kafka." + name + ".processor} or set name of Bean like @Service(\"" + name + "Processor\") "));}private KafkaAbstractProcessor<? > findProcessorByClass(String name, String processor, Map<String,KafkaAbstractProcessor<? >> suitableClass) {return suitableClass.entrySet().stream().filter(e -> e.getKey().startsWith(name) || e.getKey().equalsIgnoreCase(processor)).map(Map.Entry::getValue).findFirst().orElse(null);}private boolean isClassName(String processor) {// 使用正则表达式验证类名格式String regex = "^[a-zA-Z_$][a-zA-Z\\d_$]*([.][a-zA-Z_$][a-zA-Z\\d_$]*)*$";return Pattern.matches(regex, processor);}}
4、修改MmcMultiConsumerAutoConfiguration,更换构造的目标类的父类为KafkaAbstractProcessor。
@Beanpublic MmcKafkaInputerContainer mmcKafkaInputerContainer(MmcKafkaProcessorFactory factory,MmcKafkaBeanPostProcessor beanPostProcessor) throws Exception {Map<String, MmcInputer> inputers = new HashMap<>();Map<String, MmcMultiKafkaProperties.MmcKafkaProperties> kafkas = mmcMultiKafkaProperties.getKafka();// 逐个遍历,并生成consumerfor (Map.Entry<String, MmcMultiKafkaProperties.MmcKafkaProperties> entry : kafkas.entrySet()) {// 唯一消费者名称String name = entry.getKey();// 消费者配置MmcMultiKafkaProperties.MmcKafkaProperties properties = entry.getValue();// 是否开启if (properties.isEnabled()) {// 生成消费者KafkaAbstractProcessor inputer = factory.buildInputer(name, properties, beanPostProcessor.getSuitableClass());// 输入源容器ConcurrentMessageListenerContainer<Object, Object> container = concurrentMessageListenerContainer(properties);// 设置容器inputer.setContainer(container);inputer.setName(name);inputer.setProperties(properties);// 设置消费者container.setupMessageListener(inputer);// 关闭时候停止消费Runtime.getRuntime().addShutdownHook(new Thread(inputer::stop));// 直接启动container.start();// 加入集合inputers.put(name, inputer);}}return new MmcKafkaInputerContainer(inputers);}
5、修改MmcKafkaKafkaAbastrctProcessor,用于实现kafka的BatchMessageListener 接口,当然你也可以实现其它Listener接口,或者在这基础上扩展。
public abstract class MmcKafkaKafkaAbastrctProcessor<T> extends KafkaAbstractProcessor<T> implements BatchMessageListener<String, Object> {@Overridepublic void onMessage(List<ConsumerRecord<String, Object>> records) {if (null == records || CollectionUtils.isEmpty(records)) {log.warn("{} records is null or records.value is empty.", name);return;}receiveMessage(records);}
}
五、测试一下
1、引入kafka测试需要的jar。参考文章:kafka单元测试
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.18.0</version><scope>test</scope></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java-util</artifactId><version>3.18.0</version><scope>test</scope></dependency>
2、定义一个pb类型消息和业务处理类。
(1) 定义pb,然后通过命令生成对应的实体类;
syntax = "proto2";package com.mmc.multi.kafka;option java_package = "com.mmc.multi.kafka.starter.proto";
option java_outer_classname = "DemoPb";message PbMsg {optional string routekey = 1;optional string cosImgUrl = 2;optional string base64str = 3;}
(2)创建PbProcessor消息处理类,用于消费protobuf类型的消息;
@Slf4j
@Service("pbProcessor")
public class PbProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Overrideprotected Stream<DemoMsg> doParseProtobuf(byte[] record) {try {DemoPb.PbMsg msg = DemoPb.PbMsg.parseFrom(record);DemoMsg demo = new DemoMsg();BeanUtils.copyProperties(msg, demo);return Stream.of(demo);} catch (InvalidProtocolBufferException e) {log.error("parssPbError", e);return Stream.empty();}}@Overrideprotected void dealMessage(List<DemoMsg> datas) {System.out.println("PBdatas: " + datas);}
}
3、配置kafka地址和指定业务处理类。
## pb 消息消费者
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
4、编写测试类。
@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiConsumerAutoConfiguration.class, DemoService.class, PbProcessor.class})
@TestPropertySource(value = "classpath:application-pb.properties")
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"},topics = {"${spring.kafka.pb.topic}"})
class KafkaPbMessageTest {@Resourceprivate EmbeddedKafkaBroker embeddedKafkaBroker;@Value("${spring.kafka.pb.topic}")private String topicPb;@Testvoid testDealMessage() throws Exception {Thread.sleep(2 * 1000);// 模拟生产数据produceMessage();Thread.sleep(10 * 1000);}void produceMessage() {Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));Producer<String, byte[]> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new ByteArraySerializer()).createProducer();for (int i = 0; i < 10; i++) {DemoPb.PbMsg msg = DemoPb.PbMsg.newBuilder().setCosImgUrl("http://google.com").setRoutekey("routekey-" + i).build();producer.send(new ProducerRecord<>(topicPb, "my-aggregate-id", msg.toByteArray()));producer.flush();}}
}
5、运行一下,测试通过。

五、小结
将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。
《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你
![]() | ![]() |
相关文章:
搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf
系列文章目录 文章目录 系列文章目录前言一、本文要点二、开发环境三、原项目四、修改项目五、测试一下五、小结 前言 本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。 <dependency><groupId>io.github.vipjo…...
SpringBoot如何使用日志Logback,及日志等级详解
Spring Boot默认已经集成了SLF4J(Simple Logging Facade for Java)作为日志的接口,以及Logback作为日志的实现。这意味着在大多数情况下,你无需做额外的配置即可开始记录日志。 下面是一个简要的指南,包括如何在Spring…...
若依启动run-modules-system.bat报错问题解决方案
在启动run-modules-system.bat时遇到了一些问题,在网上搜索无果后,排查解决完毕 1.启动nacos时,报错如下 Error creating bean with name grpcClusterServer: Invocation of init method failed; nested exception is java.io.IOException: Failed to bind to address 0.0.0.0…...
Aws CodeCommit代码仓储库
1 创建IAM用户 IAM创建admin用户,增加AWSCodeCommitFullAccess权限 2 创建存储库 CodePipeline -> CodeCommit -> 存储库 创建存储库 3 SSH 1) window环境 3.1.1 上载SSH公有秘钥 生成SSH秘钥ID 3.1.2 编辑本地 ~/.ssh 目录中名为“config”的 SSH 配置文…...
PostgreSQL的内存参数
PostgreSQL的内存参数 基础信息 OS版本:Red Hat Enterprise Linux Server release 7.9 (Maipo) DB版本:16.2 pg软件目录:/home/pg16/soft pg数据目录:/home/pg16/data 端口:5777PostgreSQL 提供了多种内存参数&#x…...
【教程】在CentOS上使用Docker部署前后端分离项目的完整指南
当在CentOS上使用Docker部署前后端分离项目时,需要遵循一系列步骤来实现这一目标。以下是每个步骤的详细内容: 步骤1:安装Docker和Docker Compose 1.1 安装Docker 在CentOS上安装Docker,可以按照以下步骤进行: sudo yum install -y yum-utils device-mapper-persistent…...
某公司新招了个牛逼的架构师后.....
网友评论: 架构师一个响指之后。第二天,老板不见了走走停停 回头已是数月图片是我的故事没错了,本来我们组有10个人,我把代码重构之后,只要半个人维护,于是老板要裁掉9个>人,于是我被搞走了图…...
云计算和雾计算
雾计算作为传统集中式数据存储系统(云)和边缘设备之间的中间层。雾扩展了云,使计算和数据存储更接近边缘。雾由多个节点(雾节点)组成,并创建一个本地网络,使其成为一个去中心化的生态系统——雾…...
正缘画像 api数据接口
测测正缘画像,相貌特征,高矮胖瘦,黑白美丑,对方何许人也,远嫁近娶,何方定居,家庭观,持家爱家,生活质量,富裕贫穷,健康情况,测算结果仅…...
Java 基础面试300题 (171- 200)
Java 基础面试300题 (171- 200) 171.什么是同步? 当多个线程试图同时访问共享资源时,那么他们需要以某种方式让资源一次只能由一个线程访问。实现这一目标的过程被称为同步。Java提供了一个名为synchronized的关键字实现这一目标…...
0基础学习Elasticsearch-使用Java操作ES
文章目录 1 背景2 前言3 Java如何操作ES3.1 引入依赖3.2 依赖介绍3.3 隐藏依赖3.4 初始化客户端(获取ES连接)3.5 发送请求给ES 1 背景 上篇学习了0基础学习Elasticsearch-Quick start,随后本篇研究如何使用Java操作ES 2 前言 建议通篇阅读再回…...
【Linux】GNU编译器基础
文章目录 GCCMakefile、make GCC 常见的GNU编译器是GCC其包含gcc以及g等,适用于C/C中,在Windows系统中通常使用IDE进行程序的编写和编译、链接等操作,但在Linux系统中通常使用GNU编译器来进行,对于C/C等高级语言需要进行预编译、编…...
Linux 软件安装:从源码编译到包管理器安装
Linux 软件安装:从源码编译到包管理器安装 在 Linux 操作系统中,软件安装是一个非常重要的任务。不同的软件安装方式有不同的优缺点,本篇博客将介绍 Linux 软件安装的几种方式,包括从源码编译安装、使用包管理器安装和使用第三方…...
Python3 match-case 语句
前言 本文主要介绍match-case语句与switch-case的区别,及match-case语句的基本用法。 文章目录 前言一、switch-case 和match-case的区别二、match-case的基本用法1、可匹配的数据类型2、多条件匹配3、通配符匹配 一、switch-case 和match-case的区别 C语言里面s…...
图论第三天
似乎要团建了,我再猫会。我必须参与上团建再走。 130.被围绕的区域 先把外围的O变成A,再把飞地的O变成X,再把外围A变回O class Solution { public:int neighbor[4][2] {1,0,0,-1,-1,0,0,1};void solve(vector<vector<char>>&a…...
计算机网络学习2
文章目录 信道复用技术 第三章数据链路层概述数据链路层的三个重要问题封装成帧和透明传输差错检测可靠传输的相关基本概念可靠传输的实现机制停止等待协议回退N帧协议选择重传协议 点对点协议PPP共享式以太网网络适配器和MAC地址CSMA_CD协议的基本原理共享式以太网的争用期共享…...
unittest框架
目录 框架: unittest框架: 使用的原因: 核心要素(组成): TestCase测试用例: 可能出现的错误: TestSuite(测试套件): TestRunner(测试执行): 整体步骤: 查看执行结果: TestLoader测试加载: 方法级别Fixture: 类级别Fixture: 模块级别Fixture: 用例脚本…...
Python中的__str__和__repr__:揭示字符串表示的奥秘
标题:Python中的__str__和__repr__:揭示字符串表示的奥秘 摘要 在Python中,对象的字符串表示对于调试和日志记录至关重要。__str__和__repr__是两个特殊的方法,用于定义对象的字符串表示形式。尽管它们在功能上相似,…...
gazebo插入一个图片
在下面的目录下添加文件夹 my_ground_plane 文件夹内容如下 model.sdf <?xml version"1.0" encoding"UTF-8"?> <sdf version"1.4"><model name"my_ground_plane"><static>true</static><link na…...
(已解决)Bootstrap精美弹出框模态框modal,实现js向modal传递数据
Modals是Bootstrap中用户弹框用的组件,使用时不需要额外引入其他插件,在引入了boostrap.js或者boostrap.min.js前提下就可以使用。 官方的示例: <!-- Button trigger modal --> <button type="button" class="btn btn-primary" data-bs-…...
JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力
引言: 在人工智能快速发展的浪潮中,快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型(LLM)。该模型代表着该领域的重大突破,通过独特方式融合思考与非思考…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...

