
Spring Boot 2.2.9 + Kafka: how @KafkaListener is implemented

1. Enabling Kafka with the @EnableKafka annotation

@EnableKafka imports KafkaListenerConfigurationSelector, which loads the configuration class KafkaBootstrapConfiguration. That configuration registers the two beans that drive everything else:
KafkaListenerAnnotationBeanPostProcessor and KafkaListenerEndpointRegistry.
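For orientation, here is a minimal sketch of the application-side code this machinery serves. The topic, group id and class names are placeholders chosen for the example, not taken from the article: @EnableKafka pulls in the two beans above, and the @KafkaListener method is what they will discover and turn into a running consumer.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Configuration
    @EnableKafka // imports KafkaListenerConfigurationSelector -> KafkaBootstrapConfiguration
    public class DemoKafkaConfig {
    }

    @Component
    class DemoListener {

        // "demo-topic" and "demo-group" are illustrative values only
        @KafkaListener(topics = "demo-topic", groupId = "demo-group")
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("received: " + record.value());
        }
    }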

2. KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

Prerequisite: understand when BeanPostProcessor callbacks run; postProcessAfterInitialization is invoked once for every bean, right after that bean has been initialized.
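As a quick, hedged illustration of that timing (the class name below is invented for the example), any BeanPostProcessor registered in the context gets the same callback slot that the Kafka post-processor uses in the source shown next:

    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.config.BeanPostProcessor;
    import org.springframework.stereotype.Component;

    @Component
    class TimingDemoPostProcessor implements BeanPostProcessor {

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            // Called once per bean, after its init callbacks; the Kafka post-processor
            // uses this same hook to scan the bean's class for @KafkaListener methods.
            return bean;
        }
    }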

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        // Skip classes already known to carry no @KafkaListener annotations
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            // Class-level @KafkaListener / @KafkaListeners annotations
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            // Collect every method annotated with @KafkaListener / @KafkaListeners (method level)
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            // For class-level annotations, also collect the @KafkaHandler methods
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                // No method-level annotations: remember the class so it is never scanned again
                this.nonAnnotatedClasses.add(bean.getClass());
                this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        // Process each annotated method
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
            // Class-level annotations
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }

    protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        // Wrap the annotation metadata into an endpoint
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setMethod(methodToUse);
        processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
    }

    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {
        String beanRef = kafkaListener.beanRef();
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.addListener(beanRef, bean);
        }
        endpoint.setBean(bean);
        // KafkaHandlerMethodFactoryAdapter: the factory for the message-handling method
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        // The id configured on @KafkaListener; if absent it defaults to
        // "org.springframework.kafka.KafkaListenerEndpointContainer#" + an atomically incremented counter
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        endpoint.setTopics(resolveTopics(kafkaListener));
        endpoint.setTopicPattern(resolvePattern(kafkaListener));
        endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
        String group = kafkaListener.containerGroup();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }
        String concurrency = kafkaListener.concurrency();
        if (StringUtils.hasText(concurrency)) {
            endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
        }
        String autoStartup = kafkaListener.autoStartup();
        if (StringUtils.hasText(autoStartup)) {
            endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
        }
        resolveKafkaProperties(endpoint, kafkaListener.properties());
        endpoint.setSplitIterables(kafkaListener.splitIterables());
        // Resolve the listener container factory
        KafkaListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                        + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                        + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }
        endpoint.setBeanFactory(this.beanFactory);
        String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
        }
        // The key step: hand the endpoint over to the KafkaListenerEndpointRegistrar
        this.registrar.registerEndpoint(endpoint, factory);
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.removeListener(beanRef);
        }
    }
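To make that mapping concrete, here is a hedged example (all ids, topics and bean names are placeholders) of how the @KafkaListener attributes line up with the endpoint setters called in processListener() above; the "ordersErrorHandler" bean is assumed to exist only for illustration.

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    class OrderListener {

        @KafkaListener(
                id = "orders-listener",              // endpoint.setId(...)
                groupId = "orders-group",            // endpoint.setGroupId(...)
                topics = "orders",                   // endpoint.setTopics(...)
                concurrency = "3",                   // endpoint.setConcurrency(...)
                autoStartup = "true",                // endpoint.setAutoStartup(...)
                containerFactory = "kafkaListenerContainerFactory", // resolved from the BeanFactory by name
                errorHandler = "ordersErrorHandler") // a KafkaListenerErrorHandler bean (assumed to exist)
        public void consume(String payload) {
            // business logic
        }
    }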

KafkaListenerEndpointRegistrar#registerEndpoint

    public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        Assert.notNull(endpoint, "Endpoint must be set");
        Assert.hasText(endpoint.getId(), "Endpoint id must be set");
        // Factory may be null, we defer the resolution right before actually creating the container.
        // The endpoint and its container factory are bundled into a descriptor.
        KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
            if (this.startImmediately) { // false at this point: nothing is started yet
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                // This is the branch taken during startup: just collect the descriptor
                this.endpointDescriptors.add(descriptor);
            }
        }
    }

3. KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

Prerequisite: understand when SmartInitializingSingleton#afterSingletonsInstantiated runs. It is invoked at the very end of DefaultListableBeanFactory#preInstantiateSingletons, after all singleton beans have been created.

    public void preInstantiateSingletons() throws BeansException {
        // ... (earlier part omitted)
        // Trigger post-initialization callback for all applicable beans...
        for (String beanName : beanNames) {
            Object singletonInstance = getSingleton(beanName);
            if (singletonInstance instanceof SmartInitializingSingleton) {
                SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
                if (System.getSecurityManager() != null) {
                    AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
                        smartSingleton.afterSingletonsInstantiated();
                        return null;
                    }, getAccessControlContext());
                }
                else {
                    // This is where the callback is invoked
                    smartSingleton.afterSingletonsInstantiated();
                }
            }
        }
    }
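The same contract can be illustrated with a trivial hedged example (the class name is invented): any singleton implementing SmartInitializingSingleton gets this callback after the whole pre-instantiation pass, which is why the Kafka post-processor can safely look up the registry and the container factories at this point.

    import org.springframework.beans.factory.SmartInitializingSingleton;
    import org.springframework.stereotype.Component;

    @Component
    class StartupReporter implements SmartInitializingSingleton {

        @Override
        public void afterSingletonsInstantiated() {
            // Every singleton bean has already been created when this runs.
        }
    }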

KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

    public void afterSingletonsInstantiated() {
        this.registrar.setBeanFactory(this.beanFactory);
        if (this.beanFactory instanceof ListableBeanFactory) { // true, this branch is taken
            Map<String, KafkaListenerConfigurer> instances =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
            for (KafkaListenerConfigurer configurer : instances.values()) {
                // Extension point: a KafkaListenerConfigurer can customize the registrar
                configurer.configureKafkaListeners(this.registrar);
            }
        }
        if (this.registrar.getEndpointRegistry() == null) { // true, this branch is taken
            if (this.endpointRegistry == null) {
                // The KafkaListenerEndpointRegistry bean mentioned in section 1
                this.endpointRegistry = this.beanFactory.getBean(
                        KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                        KafkaListenerEndpointRegistry.class);
            }
            this.registrar.setEndpointRegistry(this.endpointRegistry);
        }
        // Default factory bean name: "kafkaListenerContainerFactory"
        if (this.defaultContainerFactoryBeanName != null) {
            this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
        }
        // Set the custom handler method factory once resolved by the configurer
        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
        if (handlerMethodFactory != null) {
            this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
        }
        else {
            addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
        }
        // Actually register all listeners: the key step
        this.registrar.afterPropertiesSet();
    }
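The KafkaListenerConfigurer loop above is the public extension point of this phase. A hedged sketch (the class name is a placeholder) of what such a configurer might do, for example swapping in a custom MessageHandlerMethodFactory before the endpoints are registered:

    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaListenerConfigurer;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
    import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

    @Configuration
    class CustomKafkaConfigurer implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            // Runs inside afterSingletonsInstantiated(), before registrar.afterPropertiesSet()
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            registrar.setMessageHandlerMethodFactory(factory);
        }
    }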

KafkaListenerEndpointRegistrar#afterPropertiesSet

    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            // Iterate over the descriptors collected earlier by registerEndpoint()
            for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint
                        && this.validator != null) {
                    ((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
                }
                // Register a listener container for this endpoint
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
            this.startImmediately = true;  // trigger immediate startup for endpoints registered later
        }
    }

KafkaListenerEndpointRegistry#registerListenerContainer

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        // startImmediately is passed as false here
        registerListenerContainer(endpoint, factory, false);
    }

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        String id = endpoint.getId();
        synchronized (this.listenerContainers) {
            // Create the listener container
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            // Cache it by endpoint id
            this.listenerContainers.put(id, container);
            // If a containerGroup is configured, register a bean
            // (beanName = group, value = List<MessageListenerContainer>)
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            // false at this point, so the container is not started here
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }

    protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenerContainerFactory<?> factory) {
        // Delegate container creation to the factory
        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
        if (listenerContainer instanceof InitializingBean) { // false for the default containers
            try {
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            }
            catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }
        int containerPhase = listenerContainer.getPhase();
        if (listenerContainer.isAutoStartup() &&
                containerPhase != AbstractMessageListenerContainer.DEFAULT_PHASE) {  // a custom phase value
            if (this.phase != AbstractMessageListenerContainer.DEFAULT_PHASE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container "
                        + "factory definitions: " + this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }
        return listenerContainer;
    }
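The containerGroup branch above is easy to miss: listeners that declare the same containerGroup are collected into a List<MessageListenerContainer> registered as a singleton under the group name. A hedged sketch (group, topic and class names are placeholders) of how that bean can be retrieved to manage related listeners together, mirroring the getBean(group, List.class) call in the source:

    import java.util.List;
    import org.springframework.context.ApplicationContext;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.stereotype.Component;

    @Component
    class AuditListeners {

        @KafkaListener(topics = "audit", containerGroup = "auditGroup", autoStartup = "false")
        public void audit(String payload) {
        }
    }

    @Component
    class AuditGroupControl {

        private final ApplicationContext context;

        AuditGroupControl(ApplicationContext context) {
            this.context = context;
        }

        @SuppressWarnings("unchecked")
        public void startAll() {
            // "auditGroup" is the List bean created in registerListenerContainer() above
            List<MessageListenerContainer> group =
                    (List<MessageListenerContainer>) context.getBean("auditGroup", List.class);
            group.forEach(MessageListenerContainer::start);
        }
    }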

AbstractKafkaListenerContainerFactory#createListenerContainer

    public C createListenerContainer(KafkaListenerEndpoint endpoint) {
        // Create the container instance (see createContainerInstance() below)
        C instance = createContainerInstance(endpoint);
        JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
        if (endpoint instanceof AbstractKafkaListenerEndpoint) {
            configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
        }
        // Copy the endpoint configuration onto the container
        endpoint.setupListenerContainer(instance, this.messageConverter);
        // Call the subclass's initializeContainer
        initializeContainer(instance, endpoint);
        // Customization hook
        customizeContainer(instance);
        return instance;
    }

    // Subclass: ConcurrentKafkaListenerContainerFactory#initializeContainer
    @Override
    protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance,
            KafkaListenerEndpoint endpoint) {
        super.initializeContainer(instance, endpoint);
        // concurrency = how many consumers are started for this listener. Note that many
        // projects set this far too high; multiplied across several nodes, that leaves
        // lots of useless consumers spinning in their while(true) poll loops and wasting CPU.
        if (endpoint.getConcurrency() != null) {
            instance.setConcurrency(endpoint.getConcurrency());
        }
        else if (this.concurrency != null) {
            instance.setConcurrency(this.concurrency);
        }
    }
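A hedged example of where that concurrency value typically comes from (the bean method and values are illustrative): a factory-wide default, which initializeContainer() falls back to whenever the annotation itself does not set concurrency.

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;

    @Configuration
    class ListenerFactoryConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // Factory-wide default; keep it <= the topic's partition count,
            // otherwise the extra consumers just idle in their poll loops.
            factory.setConcurrency(3);
            return factory;
        }
    }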

ConcurrentKafkaListenerContainerFactory#createContainerInstance

    protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
        // Driven by what was configured on the @KafkaListener annotation.
        // Case 1: explicit partition assignment
        TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
        if (topicPartitions != null && topicPartitions.length > 0) {
            ContainerProperties properties = new ContainerProperties(topicPartitions);
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
        else {
            Collection<String> topics = endpoint.getTopics();
            if (!topics.isEmpty()) {
                // Case 2: subscribe by topic names (ContainerProperties extends ConsumerProperties)
                ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
                return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
            }
            else {
                // Case 3: subscribe by topic pattern
                ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
                return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
            }
        }
    }
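A hedged illustration (topic names and partitions are placeholders) of the three branches above, driven purely by which attributes the @KafkaListener annotation sets:

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.TopicPartition;
    import org.springframework.stereotype.Component;

    @Component
    class SubscriptionStyles {

        // Branch 1: explicit partition assignment -> new ContainerProperties(topicPartitions)
        @KafkaListener(id = "assigned", topicPartitions =
                @TopicPartition(topic = "orders", partitions = { "0", "1" }))
        public void fromPartitions(String payload) {
        }

        // Branch 2: subscribe by topic name(s) -> new ContainerProperties(topics)
        @KafkaListener(id = "byTopic", topics = { "orders", "refunds" })
        public void fromTopics(String payload) {
        }

        // Branch 3: subscribe by pattern -> new ContainerProperties(topicPattern)
        @KafkaListener(id = "byPattern", topicPattern = "orders.*")
        public void fromPattern(String payload) {
        }
    }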

4. KafkaListenerEndpointRegistry#start

This class implements SmartLifecycle, so the method to focus on is start().
Prerequisite: understand when that start() is invoked: during AbstractApplicationContext#finishRefresh, via the LifecycleProcessor.

    protected void finishRefresh() {
        // Clear context-level resource caches (such as ASM metadata from scanning).
        clearResourceCaches();

        // Initialize lifecycle processor for this context.
        initLifecycleProcessor();

        // Propagate refresh to lifecycle processor first.
        // LifecycleProcessor is itself an extension of the Lifecycle interface; this calls
        // DefaultLifecycleProcessor#onRefresh, which starts SmartLifecycle beans such as
        // KafkaListenerEndpointRegistry.
        getLifecycleProcessor().onRefresh();

        // Publish the final event.
        publishEvent(new ContextRefreshedEvent(this));

        // Participate in LiveBeansView MBean, if active.
        LiveBeansView.registerApplicationContext(this);
    }

KafkaListenerEndpointRegistry#start

    public void start() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            startIfNecessary(listenerContainer);
        }
        this.running = true;
    }

    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        // listenerContainer.isAutoStartup() is true by default, so the container is started,
        // entering AbstractMessageListenerContainer#start (the parent of ConcurrentMessageListenerContainer)
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }

AbstractMessageListenerContainer#start (the parent class of ConcurrentMessageListenerContainer)

    public final void start() {
        checkGroupId();
        synchronized (this.lifecycleMonitor) {
            if (!isRunning()) {
                Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
                        () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
                // Template method implemented by ConcurrentMessageListenerContainer#doStart
                doStart();
            }
        }
    }

ConcurrentMessageListenerContainer#doStart

    protected void doStart() {
        if (!isRunning()) {
            checkTopics();
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);
            // Start 'concurrency' consumers for this listener
            for (int i = 0; i < this.concurrency; i++) {
                // Build one KafkaMessageListenerContainer per consumer
                KafkaMessageListenerContainer<K, V> container =
                        constructContainer(containerProperties, topicPartitions, i);
                String beanName = getBeanName();
                container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
                container.setApplicationContext(getApplicationContext());
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + i : "");
                container.setGenericErrorHandler(getGenericErrorHandler());
                container.setAfterRollbackProcessor(getAfterRollbackProcessor());
                container.setRecordInterceptor(getRecordInterceptor());
                container.setInterceptBeforeTx(isInterceptBeforeTx());
                container.setEmergencyStop(() -> {
                    stop(() -> {
                        // NOSONAR
                    });
                    publishContainerStoppedEvent();
                });
                if (isPaused()) {
                    container.pause();
                }
                // Start the child container: AbstractMessageListenerContainer#start again,
                // this time dispatching to KafkaMessageListenerContainer#doStart
                container.start();
                this.containers.add(container);
            }
        }
    }

KafkaMessageListenerContainer#doStart

    protected void doStart() {
        if (isRunning()) {
            return;
        }
        if (this.clientIdSuffix == null) { // stand-alone container
            checkTopics();
        }
        ContainerProperties containerProperties = getContainerProperties();
        checkAckMode(containerProperties);
        Object messageListener = containerProperties.getMessageListener();
        // Create a consumer task executor if none was configured
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
        ListenerType listenerType = determineListenerType(listener);
        // ListenerConsumer is a Runnable wrapping the actual poll loop
        this.listenerConsumer = new ListenerConsumer(listener, listenerType);
        setRunning(true);
        this.startLatch = new CountDownLatch(1);
        // Submit the poll loop to the executor
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
        try {
            if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error("Consumer thread failed to start - does the configured task executor "
                        + "have enough threads to support all containers and concurrency?");
                publishConsumerFailedToStart();
            }
        }
        catch (@SuppressWarnings(UNUSED) InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
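One practical takeaway from doStart() above: each child container gets its own thread from the ContainerProperties consumerTaskExecutor, falling back to a SimpleAsyncTaskExecutor named "<beanName>-C-". If you prefer to control those threads yourself, a hedged sketch (bean and thread-name values are placeholders) is to hand the factory's container properties your own executor:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;

    @Configuration
    class ConsumerExecutorConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> namedExecutorContainerFactory(
                ConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // When supplied here, doStart() skips the SimpleAsyncTaskExecutor fallback
            factory.getContainerProperties().setConsumerTaskExecutor(
                    new SimpleAsyncTaskExecutor("kafka-consumer-"));
            return factory;
        }
    }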

KafkaMessageListenerContainer.ListenerConsumer#ListenerConsumer

    ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
        Properties consumerProperties = propertiesFromProperties();
        checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
        this.autoCommit = determineAutoCommit(consumerProperties);
        // Create the actual Kafka consumer
        this.consumer =
                KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                        this.consumerGroupId,
                        this.containerProperties.getClientId(),
                        KafkaMessageListenerContainer.this.clientIdSuffix,
                        consumerProperties);

        this.clientId = determineClientId();
        this.transactionTemplate = determineTransactionTemplate();
        this.genericListener = listener;
        this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
        this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
                KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
        subscribeOrAssignTopics(this.consumer);
        GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
        if (listener instanceof BatchMessageListener) {
            this.listener = null;
            this.batchListener = (BatchMessageListener<K, V>) listener;
            this.isBatchListener = true;
            this.wantsFullRecords = this.batchListener.wantsPollResult();
        }
        else if (listener instanceof MessageListener) {
            this.listener = (MessageListener<K, V>) listener;
            this.batchListener = null;
            this.isBatchListener = false;
            this.wantsFullRecords = false;
        }
        else {
            throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
                    + "'BatchMessageListener', or the variants that are consumer aware and/or "
                    + "Acknowledging"
                    + " not " + listener.getClass().getName());
        }
        this.listenerType = listenerType;
        this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
                || listenerType.equals(ListenerType.CONSUMER_AWARE);
        if (this.isBatchListener) {
            validateErrorHandler(true);
            this.errorHandler = new LoggingErrorHandler();
            this.batchErrorHandler = determineBatchErrorHandler(errHandler);
        }
        else {
            validateErrorHandler(false);
            this.errorHandler = determineErrorHandler(errHandler);
            this.batchErrorHandler = new BatchLoggingErrorHandler();
        }
        Assert.state(!this.isBatchListener || !this.isRecordAck,
                "Cannot use AckMode.RECORD with a batch listener");
        if (this.containerProperties.getScheduler() != null) {
            this.taskScheduler = this.containerProperties.getScheduler();
            this.taskSchedulerExplicitlySet = true;
        }
        else {
            ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
            threadPoolTaskScheduler.initialize();
            this.taskScheduler = threadPoolTaskScheduler;
        }
        this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer,
                Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
        if (this.containerProperties.isLogContainerConfig()) {
            this.logger.info(this.toString());
        }
        Map<String, Object> props =
                KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
        this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, false));
        this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, true));
        this.syncCommitTimeout = determineSyncCommitTimeout();
        if (this.containerProperties.getSyncCommitTimeout() == null) {
            // update the property so we can use it directly from code elsewhere
            this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
            if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
                KafkaMessageListenerContainer.this.thisOrParentContainer
                        .getContainerProperties()
                        .setSyncCommitTimeout(this.syncCommitTimeout);
            }
        }
        this.maxPollInterval = obtainMaxPollInterval(consumerProperties);
        this.micrometerHolder = obtainMicrometerHolder();
        this.deliveryAttemptAware = setupDeliveryAttemptAware();
        this.subBatchPerPartition = setupSubBatchPerPartition();
    }

    @Override
    public void run() { // NOSONAR complexity
        ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
        publishConsumerStartingEvent();
        this.consumerThread = Thread.currentThread();
        setupSeeks();
        KafkaUtils.setConsumerGroupId(this.consumerGroupId);
        this.count = 0;
        this.last = System.currentTimeMillis();
        initAssignedPartitions();
        publishConsumerStartedEvent();
        Throwable exitThrowable = null;
        while (isRunning()) {
            try {
                // Poll for records and invoke the listener
                pollAndInvoke();
            }
            catch (@SuppressWarnings(UNUSED) WakeupException e) {
                // Ignore, we're stopping or applying immediate foreign acks
            }
            catch (NoOffsetForPartitionException nofpe) {
                this.fatalError = true;
                ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
                exitThrowable = nofpe;
                break;
            }
            catch (AuthorizationException ae) {
                if (this.authorizationExceptionRetryInterval == null) {
                    ListenerConsumer.this.logger.error(ae,
                            "Authorization Exception and no authorizationExceptionRetryInterval set");
                    this.fatalError = true;
                    exitThrowable = ae;
                    break;
                }
                else {
                    ListenerConsumer.this.logger.error(ae, "Authorization Exception, retrying in "
                            + this.authorizationExceptionRetryInterval.toMillis() + " ms");
                    // We can't pause/resume here, as KafkaConsumer doesn't take pausing
                    // into account when committing, hence risk of being flooded with
                    // GroupAuthorizationExceptions.
                    // see: https://github.com/spring-projects/spring-kafka/pull/1337
                    sleepFor(this.authorizationExceptionRetryInterval);
                }
            }
            catch (FencedInstanceIdException fie) {
                this.fatalError = true;
                ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
                        + "' has been fenced");
                exitThrowable = fie;
                break;
            }
            catch (StopAfterFenceException e) {
                this.logger.error(e, "Stopping container due to fencing");
                stop(false);
                exitThrowable = e;
            }
            catch (Error e) { // NOSONAR - rethrown
                Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
                if (runnable != null) {
                    runnable.run();
                }
                this.logger.error(e, "Stopping container due to an Error");
                wrapUp(e);
                throw e;
            }
            catch (Exception e) {
                handleConsumerException(e);
            }
        }
        wrapUp(exitThrowable);
    }

    protected void pollAndInvoke() {
        if (!this.autoCommit && !this.isRecordAck) {
            processCommits();
        }
        fixTxOffsetsIfNeeded();
        idleBetweenPollIfNecessary();
        if (this.seeks.size() > 0) {
            processSeeks();
        }
        pauseConsumerIfNecessary();
        this.lastPoll = System.currentTimeMillis();
        if (!isRunning()) {
            return;
        }
        this.polling.set(true);
        // Fetch records from the broker
        ConsumerRecords<K, V> records = doPoll();
        if (!this.polling.compareAndSet(true, false) && records != null) {
            /*
             * There is a small race condition where wakeIfNecessary was called between
             * exiting the poll and before we reset the boolean.
             */
            if (records.count() > 0) {
                this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
            }
            return;
        }
        resumeConsumerIfNeccessary();
        debugRecords(records);
        if (records != null && records.count() > 0) {
            savePositionsIfNeeded(records);
            notIdle();
            invokeListener(records);
        }
        else {
            checkIdle();
        }
    }

    private ConsumerRecords<K, V> doPoll() {
        ConsumerRecords<K, V> records;
        if (this.isBatchListener && this.subBatchPerPartition) {
            if (this.batchIterator == null) {
                this.lastBatch = this.consumer.poll(this.pollTimeout);
                if (this.lastBatch.count() == 0) {
                    return this.lastBatch;
                }
                else {
                    this.batchIterator = this.lastBatch.partitions().iterator();
                }
            }
            TopicPartition next = this.batchIterator.next();
            List<ConsumerRecord<K, V>> subBatch = this.lastBatch.records(next);
            records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch));
            if (!this.batchIterator.hasNext()) {
                this.batchIterator = null;
            }
        }
        else {
            // Fetch records with the plain consumer API
            records = this.consumer.poll(this.pollTimeout);
            checkRebalanceCommits();
        }
        return records;
    }
