当前位置: 首页 > news >正文

聊聊KafkaListener的实现机制

本文只要研究一下KafkaListener的实现机制

KafkaListener

org/springframework/kafka/annotation/KafkaListener.java

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {String id() default "";String containerFactory() default "";String[] topics() default {};String topicPattern() default "";TopicPartition[] topicPartitions() default {};String containerGroup() default "";String errorHandler() default "";String groupId() default "";boolean idIsGroup() default true;String clientIdPrefix() default "";String beanRef() default "__listener";String concurrency() default "";String autoStartup() default "";String[] properties() default {};
}

KafkaListener注解定义了id、topics、groupId等属性

KafkaListenerAnnotationBeanPostProcessor

org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

public class KafkaListenerAnnotationBeanPostProcessor<K, V>implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();	@Overridepublic int getOrder() {return LOWEST_PRECEDENCE;}@Overridepublic void setBeanFactory(BeanFactory beanFactory) {this.beanFactory = beanFactory;if (beanFactory instanceof ConfigurableListableBeanFactory) {this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,this.listenerScope);}}@Overridepublic void afterSingletonsInstantiated() {this.registrar.setBeanFactory(this.beanFactory);if (this.beanFactory instanceof ListableBeanFactory) {Map<String, KafkaListenerConfigurer> instances =((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);for (KafkaListenerConfigurer configurer : instances.values()) {configurer.configureKafkaListeners(this.registrar);}}if (this.registrar.getEndpointRegistry() == null) {if (this.endpointRegistry == null) {Assert.state(this.beanFactory != null,"BeanFactory must be set to find endpoint registry by bean name");this.endpointRegistry = this.beanFactory.getBean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,KafkaListenerEndpointRegistry.class);}this.registrar.setEndpointRegistry(this.endpointRegistry);}if (this.defaultContainerFactoryBeanName != null) {this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);}// Set the custom handler method factory once resolved by the configurerMessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();if (handlerMethodFactory != null) {this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);}else {addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);}// Actually register all listenersthis.registrar.afterPropertiesSet();}@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {if (!this.nonAnnotatedClasses.contains(bean.getClass())) {Class<?> targetClass = AopUtils.getTargetClass(bean);Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);final boolean hasClassLevelListeners = classLevelListeners.size() > 0;final List<Method> multiMethods = new ArrayList<>();Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {Set<KafkaListener> listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (hasClassLevelListeners) {Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,(ReflectionUtils.MethodFilter) method ->AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);multiMethods.addAll(methodsWithHandler);}if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(bean.getClass());if (this.logger.isTraceEnabled()) {this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());}}else {// Non-empty set of methodsfor (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {Method method = entry.getKey();for (KafkaListener listener : entry.getValue()) {processKafkaListener(listener, method, bean, beanName);}}if (this.logger.isDebugEnabled()) {this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"+ beanName + "': " + annotatedMethods);}}if (hasClassLevelListeners) {processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);}}return bean;}		
}		

KafkaListenerAnnotationBeanPostProcessor实现了BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton接口,其getOrder返回LOWEST_PRECEDENCE
其afterSingletonsInstantiated方法(SmartInitializingSingleton接口)首先获取KafkaListenerConfigurer,然后设置configureKafkaListeners为registrar,最后是执行registrar.afterPropertiesSet()
其postProcessAfterInitialization方法(BeanPostProcessor接口)会收集标注KafkaListener的bean的方法,然后针对每个方法执行processKafkaListener

processKafkaListener

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {Method methodToUse = checkProxy(method, bean);MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();endpoint.setMethod(methodToUse);processListener(endpoint, kafkaListener, bean, methodToUse, beanName);}protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,Object bean, Object adminTarget, String beanName) {String beanRef = kafkaListener.beanRef();if (StringUtils.hasText(beanRef)) {this.listenerScope.addListener(beanRef, bean);}endpoint.setBean(bean);endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);endpoint.setId(getEndpointId(kafkaListener));endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));endpoint.setTopics(resolveTopics(kafkaListener));endpoint.setTopicPattern(resolvePattern(kafkaListener));endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));String group = kafkaListener.containerGroup();if (StringUtils.hasText(group)) {Object resolvedGroup = resolveExpression(group);if (resolvedGroup instanceof String) {endpoint.setGroup((String) resolvedGroup);}}String concurrency = kafkaListener.concurrency();if (StringUtils.hasText(concurrency)) {endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));}String autoStartup = kafkaListener.autoStartup();if (StringUtils.hasText(autoStartup)) {endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));}resolveKafkaProperties(endpoint, kafkaListener.properties());KafkaListenerContainerFactory<?> factory = null;String containerFactoryBeanName = resolve(kafkaListener.containerFactory());if (StringUtils.hasText(containerFactoryBeanName)) {Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");try {factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);}catch (NoSuchBeanDefinitionException ex) {throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);}}endpoint.setBeanFactory(this.beanFactory);String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");if (StringUtils.hasText(errorHandlerBeanName)) {endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));}this.registrar.registerEndpoint(endpoint, factory);if (StringUtils.hasText(beanRef)) {this.listenerScope.removeListener(beanRef);}}	

processKafkaListener方法将method转换为MethodKafkaListenerEndpoint,然后执行processListener方法,它主要是将KafkaListener注解的信息填充到MethodKafkaListenerEndpoint上,确定KafkaListenerContainerFactory,最后执行registrar.registerEndpoint(endpoint, factory)

registrar.registerEndpoint

org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

	/*** Register a new {@link KafkaListenerEndpoint} alongside the* {@link KafkaListenerContainerFactory} to use to create the underlying container.* <p>The {@code factory} may be {@code null} if the default factory has to be* used for that endpoint.* @param endpoint the {@link KafkaListenerEndpoint} instance to register.* @param factory the {@link KafkaListenerContainerFactory} to use.*/public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {Assert.notNull(endpoint, "Endpoint must be set");Assert.hasText(endpoint.getId(), "Endpoint id must be set");// Factory may be null, we defer the resolution right before actually creating the containerKafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);synchronized (this.endpointDescriptors) {if (this.startImmediately) { // Register and start immediatelythis.endpointRegistry.registerListenerContainer(descriptor.endpoint,resolveContainerFactory(descriptor), true);}else {this.endpointDescriptors.add(descriptor);}}}

KafkaListenerEndpointRegistrar的registerEndpoint会创建KafkaListenerEndpointDescriptor,然后执行endpointRegistry.registerListenerContainer

endpointRegistry.registerListenerContainer

org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,boolean startImmediately) {Assert.notNull(endpoint, "Endpoint must not be null");Assert.notNull(factory, "Factory must not be null");String id = endpoint.getId();Assert.hasText(id, "Endpoint id must not be empty");synchronized (this.listenerContainers) {Assert.state(!this.listenerContainers.containsKey(id),"Another endpoint is already registered with id '" + id + "'");MessageListenerContainer container = createListenerContainer(endpoint, factory);this.listenerContainers.put(id, container);if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {List<MessageListenerContainer> containerGroup;if (this.applicationContext.containsBean(endpoint.getGroup())) {containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);}else {containerGroup = new ArrayList<MessageListenerContainer>();this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);}containerGroup.add(container);}if (startImmediately) {startIfNecessary(container);}}}/*** Start the specified {@link MessageListenerContainer} if it should be started* on startup.* @param listenerContainer the listener container to start.* @see MessageListenerContainer#isAutoStartup()*/private void startIfNecessary(MessageListenerContainer listenerContainer) {if (this.contextRefreshed || listenerContainer.isAutoStartup()) {listenerContainer.start();}}	

KafkaListenerEndpointRegistry的registerListenerContainer方法会根据endpoint和factory来创建MessageListenerContainer,然后放入到listenerContainers中,对于startImmediately的会执行startIfNecessary,它主要是执行listenerContainer.start()

MessageListenerContainer

org/springframework/kafka/listener/MessageListenerContainer.java

public interface MessageListenerContainer extends SmartLifecycle {void setupMessageListener(Object messageListener);Map<String, Map<MetricName, ? extends Metric>> metrics();default ContainerProperties getContainerProperties() {throw new UnsupportedOperationException("This container doesn't support retrieving its properties");}default Collection<TopicPartition> getAssignedPartitions() {throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");}default void pause() {throw new UnsupportedOperationException("This container doesn't support pause");}default void resume() {throw new UnsupportedOperationException("This container doesn't support resume");}default boolean isPauseRequested() {throw new UnsupportedOperationException("This container doesn't support pause/resume");}default boolean isContainerPaused() {throw new UnsupportedOperationException("This container doesn't support pause/resume");}default void setAutoStartup(boolean autoStartup) {// empty}default String getGroupId() {throw new UnsupportedOperationException("This container does not support retrieving the group id");}@Nullabledefault String getListenerId() {throw new UnsupportedOperationException("This container does not support retrieving the listener id");}
}

MessageListenerContainer继承了SmartLifecycle接口,它有一个泛型接口为GenericMessageListenerContainer,后者有一个抽象类为AbstractMessageListenerContainer,然后它有两个子类,分别是KafkaMessageListenerContainer与ConcurrentMessageListenerContainer

AbstractMessageListenerContainer

public abstract class AbstractMessageListenerContainer<K, V>implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware {@Overridepublic final void start() {checkGroupId();synchronized (this.lifecycleMonitor) {if (!isRunning()) {Assert.isTrue(this.containerProperties.getMessageListener() instanceof GenericMessageListener,() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");doStart();}}}@Overridepublic final void stop() {synchronized (this.lifecycleMonitor) {if (isRunning()) {final CountDownLatch latch = new CountDownLatch(1);doStop(latch::countDown);try {latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONARpublishContainerStoppedEvent();}catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}//......
}		

AbstractMessageListenerContainer的start方法会回调子类的doStart方法,其stop方法会回调子类的doStop方法

KafkaMessageListenerContainer

org/springframework/kafka/listener/KafkaMessageListenerContainer.java

public class KafkaMessageListenerContainer<K, V> // NOSONAR comment densityextends AbstractMessageListenerContainer<K, V> {@Overrideprotected void doStart() {if (isRunning()) {return;}if (this.clientIdSuffix == null) { // stand-alone containercheckTopics();}ContainerProperties containerProperties = getContainerProperties();checkAckMode(containerProperties);Object messageListener = containerProperties.getMessageListener();Assert.state(messageListener != null, "A MessageListener is required");if (containerProperties.getConsumerTaskExecutor() == null) {SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = deteremineListenerType(listener);this.listenerConsumer = new ListenerConsumer(listener, listenerType);setRunning(true);this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);}//......
}		

KafkaMessageListenerContainer的doStart方法会获取到messageListener,然后创建ListenerConsumer,最后提交到线程池中执行

ConcurrentMessageListenerContainer

org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {@Overrideprotected void doStart() {if (!isRunning()) {checkTopics();ContainerProperties containerProperties = getContainerProperties();TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null && this.concurrency > topicPartitions.length) {this.logger.warn("When specific partitions are provided, the concurrency must be less than or "+ "equal to the number of partitions; reduced from " + this.concurrency + " to "+ topicPartitions.length);this.concurrency = topicPartitions.length;}setRunning(true);for (int i = 0; i < this.concurrency; i++) {KafkaMessageListenerContainer<K, V> container;if (topicPartitions == null) {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties);}else {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties, partitionSubset(containerProperties, i));}String beanName = getBeanName();container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);if (getApplicationEventPublisher() != null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix("-" + i);container.setGenericErrorHandler(getGenericErrorHandler());container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.setRecordInterceptor(getRecordInterceptor());container.setEmergencyStop(() -> {stop(() -> {// NOSONAR});publishContainerStoppedEvent();});if (isPaused()) {container.pause();}container.start();this.containers.add(container);}}}//......	
}

ConcurrentMessageListenerContainer的doStart会根据concurrency值来创建对应的KafkaMessageListenerContainer,然后执行其start方法

ListenerConsumer

org/springframework/kafka/listener/KafkaMessageListenerContainer.java

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {@Overridepublic void run() {this.consumerThread = Thread.currentThread();if (this.genericListener instanceof ConsumerSeekAware) {((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);}if (this.transactionManager != null) {ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);}this.count = 0;this.last = System.currentTimeMillis();initAsignedPartitions();while (isRunning()) {try {pollAndInvoke();}catch (@SuppressWarnings(UNUSED) WakeupException e) {// Ignore, we're stopping or applying immediate foreign acks}catch (NoOffsetForPartitionException nofpe) {this.fatalError = true;ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);break;}catch (Exception e) {handleConsumerException(e);}catch (Error e) { // NOSONAR - rethrownRunnable runnable = KafkaMessageListenerContainer.this.emergencyStop;if (runnable != null) {runnable.run();}this.logger.error("Stopping container due to an Error", e);wrapUp();throw e;}}wrapUp();}protected void pollAndInvoke() {if (!this.autoCommit && !this.isRecordAck) {processCommits();}processSeeks();checkPaused();ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);this.lastPoll = System.currentTimeMillis();checkResumed();debugRecords(records);if (records != null && records.count() > 0) {if (this.containerProperties.getIdleEventInterval() != null) {this.lastReceive = System.currentTimeMillis();}invokeListener(records);}else {checkIdle();}}private void invokeListener(final ConsumerRecords<K, V> records) {if (this.isBatchListener) {invokeBatchListener(records);}else {invokeRecordListener(records);}}private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,List<ConsumerRecord<K, V>> recordList) {switch (this.listenerType) {case ACKNOWLEDGING_CONSUMER_AWARE:this.batchListener.onMessage(recordList,this.isAnyManualAck? new ConsumerBatchAcknowledgment(records): null, this.consumer);break;case ACKNOWLEDGING:this.batchListener.onMessage(recordList,this.isAnyManualAck? new ConsumerBatchAcknowledgment(records): null);break;case CONSUMER_AWARE:this.batchListener.onMessage(recordList, this.consumer);break;case SIMPLE:this.batchListener.onMessage(recordList);break;}}private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {ConsumerRecord<K, V> record = recordArg;if (this.recordInterceptor != null) {record = this.recordInterceptor.intercept(record);}if (record == null) {if (this.logger.isDebugEnabled()) {this.logger.debug("RecordInterceptor returned null, skipping: " + recordArg);}}else {switch (this.listenerType) {case ACKNOWLEDGING_CONSUMER_AWARE:this.listener.onMessage(record,this.isAnyManualAck? new ConsumerAcknowledgment(record): null, this.consumer);break;case CONSUMER_AWARE:this.listener.onMessage(record, this.consumer);break;case ACKNOWLEDGING:this.listener.onMessage(record,this.isAnyManualAck? new ConsumerAcknowledgment(record): null);break;case SIMPLE:this.listener.onMessage(record);break;}}}									//......	
}

ListenerConsumer实现了org.springframework.scheduling.SchedulingAwareRunnable接口(它继承了Runnable接口)以及org.springframework.kafka.listener.ConsumerSeekCallback接口
其run方法主要是执行initAsignedPartitions,然后循环执行pollAndInvoke,对于NoOffsetForPartitionException则跳出异常,对于其他Exception则执行handleConsumerException,对于Error执行emergencyStop与wrapUp方法
pollAndInvoke方法主要是执行consumer.poll(),然后通过invokeListener(records)回调,最后是通过doInvokeBatchOnMessage、doInvokeOnMessage去回调listener.onMessage方法

小结

KafkaListenerAnnotationBeanPostProcessor主要是收集标注KafkaListener的bean的方法,然后针对每个方法执行processKafkaListener,processKafkaListener方法将method转换为MethodKafkaListenerEndpoint,执行registrar.registerEndpoint(endpoint, factory)
KafkaListenerEndpointRegistry的registerListenerContainer方法会根据endpoint和factory来创建MessageListenerContainer,然后放入到listenerContainers中,对于startImmediately的会执行startIfNecessary,它主要是执行listenerContainer.start()
MessageListenerContainer有两个主要的实现类分别是KafkaMessageListenerContainer与ConcurrentMessageListenerContainer,后者的start方法主要是根据concurrency创建对应数量的KafkaMessageListenerContainer,最后都是执行KafkaMessageListenerContainer的start方法,它会创建ListenerConsumer,最后提交到线程池中执行;ListenerConsumer主要是执行pollAndInvoke,拉取消息,然后回到listener的onMessage方法
整体的链路就是KafkaListenerAnnotationBeanPostProcessor --> KafkaListenerEndpointRegistry --> MessageListenerContainer --> GenericMessageListener.onMessage

相关文章:

聊聊KafkaListener的实现机制

序 本文只要研究一下KafkaListener的实现机制 KafkaListener org/springframework/kafka/annotation/KafkaListener.java Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) Retention(RetentionPolicy.RUNTIME) MessageMapping Documented …...

Golang洗牌算法(Golang乱序算法)

Golang 洗牌算法&#xff08;乱序算法&#xff09;&#xff1b;需求背景&#xff1a;从一个文件下下读取所有文件&#xff0c;获取他们的名字&#xff0c; 将名字乱序排序&#xff0c;按着乱序后的序列&#xff0c;通过名字去找到文件&#xff0c;再上传&#xff0c;以达到上传…...

SpringBoot 源码分析(三) 监听器分析以及属性文件加载分析

前言 在创建SpringBoot项目的时候会在对应的application.properties或者application.yml文件中添加对应的属性信息&#xff0c;这些属性文件是什么时候被加载的&#xff1f;如果要实现自定义的属性文件怎么来实现&#xff1f;在讲属性加载之前先讲下监听器分析。 一、监听器分…...

记录nvm use node.js版本失败,出现报错: exit status 1: ��û���㹻��Ȩ��ִ�д˲�����

使用管理员权限运行cmd&#xff0c;再使用nvm use node.js版本号 参考&#xff1a; nvm use (node版本号)时报错&#xff1a; exit status 1: &#xfffd;&#xfffd;&#xfffd;&#xfffd;&#xfffd;㹻&#xfffd;&#xfffd;Ȩ&#xfffd;&#xfffd;ִ&#xf…...

【蓝牙协议】简介:蓝牙芯片、蓝牙协议架构

文章目录 蓝牙芯片架构另一个视角由下到上看&#xff1a;Controller-->Host由上到下看&#xff1a;Host-->Controller 蓝牙协议架构视角HW层——蓝牙芯片层Transport——数据传输层HOST——协议层 总结 参考&#xff1a;https://zhuanlan.zhihu.com/p/585248998 参考&…...

【深度学习】

什么是深度学习&#xff1f; 感知器 为了实现模拟人类的学习&#xff0c;科学家们首先设计了构成神经网络的基本结构神经元&#xff08;感知器模型&#xff09;&#xff0c;然后再由大量的神经元构成复杂的&#xff0c;能够实现各种功能的神经网络。 这种模式和超能陆战队中的…...

centos启动tomcat 并指定jdk 版本

在tomcat的catalina.sh文件手动设置JAVA_HOME变量即可 例如&#xff1a; 前提是文件存在 保存配置重新启动tomcat...

day37(事件轮询机制 ajaxGet执行步骤与案例(五个步骤) ajax属性 PHP返回JSON对象(两种))

一.事件轮询机制 1. 无论同步还是异步代码都要经过主线程编译&#xff0c;同步代码开始排在执行栈(主线程)上&#xff0c;异步代码开 始存放在任务队列中 2. 主线程优先执行同步代码&#xff0c;同步代码必须前一行执行完&#xff0c;后一行才能执行&#xff1b;当异步代码…...

Flume基本使用--mysql数据输出

MySQL数据输出 在MySQL中建立数据库school&#xff0c;在数据库中建立表student。SQL语句如下&#xff1a; create database school; use school; create table student(id int not null,name varchar(40),age int,grade int,primary key(id) ); 请使用Flume实时捕…...

MySQL——EXPLAIN用法详解

EXPLAIN是MySQL官方提供的sql分析的工具之一&#xff0c;可以用于模拟优化器执行sql查询语句&#xff0c;从而知道MySQL是如何处理sql语句。EXPLAIN主要用于分析查询语句或表结构的性能瓶颈。 以下是基于MySQL5.7.19版本进行分析的&#xff0c;不同版本之间略有差异。 1、EXP…...

69 划分字母区间

划分字母区间 题解1 贪心1&#xff08;方法略笨&#xff0c;性能很差&#xff09;题解2 贪心2&#xff08;参考标答&#xff09; 给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。 注意&#xff0c;划分结果需要满足&am…...

文件上传漏洞(1), 文件上传绕过原理

文件上传漏洞 一, 前端校验上传文件 添加 Javascript 代码&#xff0c;然后在 form 表单中 添加 onsubmit"returb checkFile()" <script>function checkFile() {// var file document.getElementsByName(photo)[0].value;var file document.getElementByI…...

【ARM 嵌入式 C 入门及渐进 10 -- 冒泡排序 选择排序 插入排序 快速排序 归并排序 堆排序 比较介绍】

文章目录 排序算法小结排序算法C实现 排序算法小结 C语言中常用的排序算法包括冒泡排序、选择排序、插入排序、快速排序、归并排序、堆排序。下面我们来一一介绍&#xff1a; 冒泡排序&#xff08;Bubble Sort&#xff09;&#xff1a;冒泡排序是通过比较相邻元素的大小进行排…...

虹科 | 解决方案 | 汽车示波器 学校教学方案

虹科Pico汽车示波器是基于PC的设备&#xff0c;特别适用于大课堂的教学、备课以及与师生的互动交流。老师展现讲解波形数据&#xff0c;让学生直观形象地理解汽车的工作原理 高效备课 课前实测&#xff0c;采集波形数据&#xff0c;轻松截图与标注&#xff0c;制作优美的课件&…...

广播和组播(多播)

广播 概述 广播&#xff08;broadcast&#xff09;是指封包在计算机网络中传输时&#xff0c;目的地址为网络中所有设备的一种传输方式。实际上&#xff0c;这里所说的“所有设备”也是限定在一个范围之中&#xff0c;称为“广播域”。并非所有的计算机网络都支持广播&#xf…...

【Linux】gdb调试

目录 进入调试查看代码运行代码断点打断点查断点删断点从一个断点转跳至下一个断点保留断点但不会运行该断点 退出调试逐过程逐语句监视跳转至指定行运行结束当前函数 进入调试 指令&#xff1a;gdb 【可执行文件】&#xff1a; 查看代码 &#xff1a;l 【第几行】如果输入指…...

MySQL创建函数及其使用

MySQL创建函数及其使用 一、MySQL 创建函数二、示例 一、MySQL 创建函数 MySQL 函数是一种可重用的代码块&#xff0c;可以接受输入参数并返回值。你可以在 MySQL 中创建各种类型的函数&#xff0c;包括系统函数、用户定义函数和存储过程。在此处&#xff0c;我们将重点关注用…...

大数据-Storm流式框架(四)---storm容错机制

1、集群节点宕机 Nimbus服务器 硬件 单点故障&#xff1f;可以搭建HA jStorm搭建 nimbus的HA nimbus的信息存储到zookeeper中&#xff0c;只要下游没问题&#xff08;进程退出&#xff09;nimbus退出就不会有问题&#xff0c; 如果在nimbus宕机&#xff0c;也不能提交…...

SpringBoot项目把Mysql从5.7升级到8.0

首先你需要把之前的库导入到mysql库导入到8.0的新库中。&#xff08;导入的时候会报错我是通过navcat备份恢复的&#xff09; 1、项目中需要修改pom文件的依赖 mysql 和 jdbc <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java&…...

RK3568-适配at24c04模块

将at24c04模块连接到开发板i2c2总线上 i2ctool查看i2c2总线上都有哪些设备 UU表示设备地址的从设备被驱动占用,卸载对应的驱动后,UU就会变成从设备地址。at24c04模块设备地址 0x50和0x51是at24c04模块i2c芯片的设备地址。这个从芯片手册上也可以得知。A0 A1 A2表示的是模块对…...

Banana Pi BPI-W3 ArmSoM-W3之RK3588-MIPI-DSI屏幕调试笔记

一. 简介 本文是基于RK3588平台&#xff0c;MIPI屏调试总结。 二. 环境介绍 硬件环境&#xff1a; ArmSoM-W3 RK3588开发板、MIPI-DSI显示屏( ArmSoM官方配件 )软件版本&#xff1a; OS&#xff1a;ArmSoM-W3 Debian11 三. MIPI屏幕调试 3.1 调试总览&#xff0c;调试步骤分…...

Git的远程仓库

Git的远程仓库 添加远程仓库从远程库克隆 添加远程仓库 你在本地创建了一个Git仓库后&#xff0c;又想在GitHub创建一个Git仓库&#xff0c;并且让这两个仓库进行远程同步&#xff0c;这样&#xff0c;GitHub上的仓库既可以作为备份&#xff0c;又可以让其他人通过该仓库来协作…...

Linux虚拟网络设备—Veth Pair

veth是Virtual Ethernet Device的缩写&#xff0c;是一种成对出现的Linux虚拟网络接口设备。它最常用的功能是用于将不同的Linux network namespaces 命名空间网络连接起来&#xff0c;让二个namespaces之间可以进行通信。我们可以简单的把veth pair理解为用一根网线&#xff0…...

Parcelable protocol requires the CREATOR object to be static on class com.test

对于 Parcelable 协议&#xff0c;确实要求 CREATOR 对象必须是静态的。这是因为在反序列化过程中&#xff0c;需要通过 CREATOR 对象来创建 Parcelable 对象的实例。 根据错误信息&#xff0c;涉及到了com.test类中的问题。通常情况下&#xff0c;如果一个内部类需要实现 Par…...

Python的Matplotlib库:数据可视化的利器

引言&#xff1a; Matplotlib是一款强大的Python库&#xff0c;专为数据可视化而设计。无论是绘制折线图、散点图、柱状图还是饼图&#xff0c;Matplotlib都能提供灵活且易于操作的绘图方法。 1. Matplotlib简介 Matplotlib是Python中最流行的绘图库之一&#xff0c;被广泛应…...

普通人做抖店,需要具备什么条件?一篇详解!

我是电商珠珠 抖音小店的热度一直很高&#xff0c;对于想开店的新手来说&#xff0c;不知道需要什么条件&#xff0c;今天我就来给大家详细的讲一下。 一、营业执照 在入驻抖音小店之前&#xff0c;需要准备一张营业执照。 营业执照一共有两种类型&#xff0c;一种为个体工…...

Django分页功能的使用和自定义分装

1. 在settings中进行注册 # drf配置 REST_FRAMEWORK {DEFAULT_AUTHENTICATION_CLASSES: (# rest_framework_jwt.authentication.JSONWebTokenAuthentication,rest_framework_simplejwt.authentication.JWTAuthentication,rest_framework.authentication.SessionAuthenticatio…...

React-hooks有哪些用法?

React Hooks 是 React 16.8 引入的一种新的特性,用于在函数组件中使用状态和其他 React 特性。下面列举了一些常见的 React Hooks 的用法: 1:useState:用于在函数组件中添加状态。: import React, { useState } from react;function MyComponent() {const [count, setCou…...

2024年CFA一级公示表,一级quicksheet(内附分享链接)

随着金融行业的迅速发展&#xff0c;CFA&#xff08;特许金融分析师&#xff09;认证成为了许多金融从业者追求的目标。2024年CFA一级公示表资料的自学&#xff0c;为那些渴望在金融领域取得突破的人们提供了宝贵的机会。 通过自学CFA一级公示表资料&#xff0c;我们可以深入了…...

【Kubernetes】 Kubernetes 了解云原生的原理

Kubernetes 了解云原生的原理 云原生是一种软件设计、实施和部署方法&#xff0c;旨在充分利用基于云的服务和交付模型。云原生[1]应用程序通常也使用分布式架构运行。这意味着应用程序功能被分解为多个服务&#xff0c;然后分布在托管环境中&#xff0c;而不是整合到单个服务…...