kafka小实站
需要先在前面的文章里面照着下载好kafka,并且启动 先启动zookeeper
项目目录
package kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** kafka消费者***/
@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "${kafka.topic.name}")public void listen(ConsumerRecord<?, ?> record) {log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());}}
comsumer代码
定义了一个简单的 Kafka 消费者,它使用 Spring 的 Kafka Listener 来监听指定的主题,并处理接收到的消息
-
@Component
标记这个类为一个 Spring 的组件,使其被 Spring 容器管理并可以被自动扫描和加载。 -
@Slf4j
这是 Lombok 提供的注解,自动为类生成一个名为log
的 日志记录器,可以直接用log.info()
等方法记录日志,而无需手动定义日志实例。 -
@KafkaListener
这是 Spring Kafka 提供的注解,用于标记一个方法为 Kafka 消息的监听器。topics
:指定要监听的 Kafka 主题。这里用占位符${kafka.topic.name}
,说明具体的主题名称是从配置文件(如application.properties
或application.yml
)中读取的。
-
自动监听
当有新的消息发布到指定的 Kafka 主题时,Spring 会自动调用标注了@KafkaListener
的方法来处理消息。 -
参数
ConsumerRecord<?, ?> record
代表 Kafka 中一条消费记录,包含了消息的元数据(如主题名、分区、偏移量)和消息体。record.topic()
:获取消息所属的 Kafka 主题名。record.offset()
:获取消息的偏移量(在 Kafka 中,每条消息都有一个唯一的偏移量)。record.value()
:获取消息的内容。
-
log.info
使用日志记录器将接收到的消息详细信息打印到日志中,包括:- 消息所在的 主题名。
- 偏移量(
offset
),用于定位消息在分区中的位置。 - 消息的 内容。
package kafka; import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** kafka生产者**/
@Component
@Slf4j
public class KafkaProducer implements CommandLineRunner {@ResourceKafkaTemplate kafkaTemplate;@Value("${kafka.topic.name}")private String topic;/*** 发送kafka消息** @param string*/public void send(String string) {ListenableFuture future = kafkaTemplate.send(topic, JSONObject.toJSONString(string));//future.addCallback(o -> log.info("kafka消息发送成功:" + jsonString), throwable -> log.error("kafka消息发送失败:" + jsonString));}/*** 容器启动完成后,生产测试数据*/@Overridepublic void run(String... args) {for (int i = 0; i < 10; i++) {send("hello world" + i);}}
}
producer代码
-
KafkaTemplate
这是 Spring Kafka 提供的核心类,用于向 Kafka 发送消息。通过@Resource
注解实现依赖注入。 -
@Value("${kafka.topic.name}")
通过 Spring 的配置文件(如application.properties
或application.yml
)注入 Kafka 主题名称。 -
kafkaTemplate.send(topic, message)
通过KafkaTemplate
将消息发送到指定主题。 -
JSONObject.toJSONString(string)
使用阿里巴巴的fastjson
库将消息序列化为 JSON 格式。 -
ListenableFuture
Kafka 消息发送是异步的,send()
方法返回一个ListenableFuture
对象,可以用来监听消息的发送结果。 -
日志记录(被注释)
使用addCallback
方法为发送成功和失败分别设置回调逻辑:- 发送成功时记录日志:
log.info("kafka消息发送成功:" + string)
。 - 发送失败时记录错误日志:
log.error("kafka消息发送失败:" + string)
-
run(String... args)
这是CommandLineRunner
接口的方法,Spring 容器启动完成后会自动调用。 -
循环发送消息
启动后向 Kafka 主题发送 10 条测试消息,例如:hello world0
、hello world1
等。
- 发送成功时记录日志:
## kafka ##
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=testspring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic.name=mqtt_location_data
#
#spring.data.mongodb.host=localhost
#spring.data.mongodb.port=27017
#spring.data.mongodb.database=your_database_name
配置文件application.properties
运行结果:
2024-12-26 01:32:30.839 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:00.850 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 01:33:00.855 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
2024-12-26 01:33:00.970 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.971 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:00.980 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:15.874 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.875 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 3: {consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 01:33:15.879 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.880 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 01:33:15.881 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 01:33:15.887 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 01:33:15.888 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=10, message="hello world0"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=11, message="hello world1"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=12, message="hello world2"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=13, message="hello world3"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=14, message="hello world4"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=15, message="hello world5"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=16, message="hello world6"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=17, message="hello world7"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=18, message="hello world8"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=19, message="hello world9"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=20, message="hello world0"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=21, message="hello world1"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=22, message="hello world2"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=23, message="hello world3"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=24, message="hello world4"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=25, message="hello world5"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=26, message="hello world6"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=27, message="hello world7"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=28, message="hello world8"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=29, message="hello world9"
2024-12-26 04:36:36.611 INFO 22936 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-test-1, groupId=test] Error sending fetch request (sessionId=1770952277, epoch=389) to node 0:org.apache.kafka.common.errors.DisconnectException: null2024-12-26 04:36:36.612 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 04:36:36.727 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 04:36:36.731 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-26 04:36:36.731 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}: The coordinator is not aware of this member.
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-26 04:36:36.733 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions lost: [mqtt_location_data-0]
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions revoked: [mqtt_location_data-0]
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:36.734 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with stale Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error
2024-12-26 04:36:36.735 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-26 04:36:36.735 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:37.543 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.543 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 5: {consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 04:36:37.553 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.554 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 04:36:37.554 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 04:36:37.557 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 04:36:37.589 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
2024-12-30 22:44:08.518 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-30 22:44:09.620 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-30 22:44:09.630 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions lost: [mqtt_location_data-0]
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions revoked: [mqtt_location_data-0]
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.636 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-30 22:44:09.636 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}: The coordinator is not aware of this member.
2024-12-30 22:44:09.637 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer member's generation is already stale, meaning it has already participated another rebalance and got a new generation. You can try completing the rebalance by calling poll() and then retry commit again
2024-12-30 22:44:09.640 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-30 22:44:09.640 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.663 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.663 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 7: {consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd=Assignment(partitions=[mqtt_location_data-0])}
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-30 22:44:09.672 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-30 22:44:10.200 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
相关文章:

kafka小实站
需要先在前面的文章里面照着下载好kafka,并且启动 先启动zookeeper 项目目录 package kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import…...

基于Python实现车辆检测、机动车检测、识别位置标记、计数
目录 引言背景与应用场景车辆检测的研究意义相关工作车辆检测概述机动车检测方法分类基于传统计算机视觉的检测方法基于深度学习的检测方法技术与方法车辆检测技术概述基于Python的车辆检测方法图像处理与特征提取深度学习方法(如YOLO、SSD、Faster R-CNN等)数据集与标注常用…...

心理学硕士
心理学硕士的主要研究方向包括基础心理学、发展心理学和应用心理学。 基础心理学研究一般的心理现象与规律,如心理的实质及神经机制、感觉与知觉、意识与注意、学习与记忆、思维与语言、情绪与意识、人格等。发展心理学研究人类个体心理发生发展的特点和规律&a…...
python量化分析学习与实践1:API接口篇
业内比较流行的几款API数据接口,有聚宽、TuShare,yfinance,以及pandas的pandas_datareader等。国内的一般都需要用户认证,才能下载数据。国外的yfinance与pandas_datareader等则不需要,但需要科学上网。 聚宽 测试下…...

【GO基础学习】gin的使用
文章目录 模版使用流程参数传递路由分组数据解析和绑定gin中间件 模版使用流程 package mainimport ("net/http""github.com/gin-gonic/gin" )func main() {// 1.创建路由r : gin.Default()// 2.绑定路由规则,执行的函数// gin.Context&#x…...
网卡状态变更,virtio-net检测
实现方案: 现在在amp模式下linux端有个真实的物理网卡eth0,有一个虚拟网卡virtio-net0后端,此时需要一种机制,将真实物理网卡的状态发送rtos的virtio-net0前端。这里使用register_netdevice_notifier机制,每个virtio-n…...
中华人民共和国保守国家秘密法
中华人民共和国保守国家秘密法 (1988年9月5日第七届全国人民代表大会常务委员会第三次会议通过 2010年4月29日第十一届全国人民代表大会常务委员会第十四次会议第一次修订 2024年2月27日第十四届全国人民代表大会常务委员会第八次会议第二次修订) 目…...

ELK日志收集系统部署
1、 ElasticSearch部署 Elastic — 搜索 AI 公司 | Elastic 系统类型:Centos7.4 节点IP:172.16.246.234 软件版本:jdk-8u191-linux-x64.tar.gz、elasticsearch-6.5.4.tar.gz 示例节点:172.16.246.234 1、安装配置jdk8 ES运行依…...

3D线上艺术展:艺术与技术的完美融合
随着数字技术的飞速发展,未来的艺术展览正逐步迈向线上线下融合的新阶段。其中,3D线上展览以其独特的魅力,成为线下展览的延伸与拓展,为艺术爱好者们开辟了全新的观赏途径。 对于艺术家和策展人而言,3D线上展览不仅打…...

TiDB 的MPP架构概述
MPP架构介绍: 如图,TiDB Server 作为协调者,首先 TiDB Server 会把每个TiFlash 拥有的region 会在TiFlash上做交换,让表连接在一个TiFlash上。另外 TiFlash会作为计算节点,每个TiFlash都负责数据交换,表连接…...

Leetcode 10-正则表达式匹配/ 剑指 Offer 19. 正则表达式匹配
给你一个字符串 s 和一个字符规律 p,请你来实现一个支持 ‘.’ 和 ‘*’ 的正则表达式匹配。 ‘.’ 匹配任意单个字符 ‘*’ 匹配零个或多个前面的那一个元素 所谓匹配,是要涵盖 整个 字符串 s 的,而不是部分字符串。 题解 字符串匹配多…...

FFmpeg 编码和解码
文章目录 音频格式AACADIF音频数据交换格式ADTS音频数据传输流 音频解码音频编码 视频格式H264GOP图像组I帧,P帧,B帧H264压缩技术H264压缩级别H264视频级别H264码流结构SPSPPS 解码视频编码视频 音频格式 AAC AAC全称 Advanced Audio Coding࿰…...
kali当中web扫描工具的用法
1. cadaver 用途:用于与WebDAV服务器交互,可进行文件上传、下载、目录浏览等操作。使用方法 连接到WebDAV服务器:cadaver <WebDAV服务器地址>,例如cadaver https://example.com/dav,然后按提示输入用户名和密码…...
深度剖析 Android Animation 框架
深度剖析 Android Animation 框架 目录 引言Android Animation 框架概述架构设计 3.1 核心类与接口3.2 动画类型3.3 动画执行流程使用指南 4.1 属性动画4.2 视图动画4.3 过渡动画设计模式 5.1 策略模式5.2 观察者模式5.3 工厂模式核心逻辑 6.1 动画插值器6.2 动画估值器6.3 动…...
泰山派GPIO子系统驱动---亮灯
本人linux驱动小白,文章基于B站up主 李Sir______ 视频内容记录,做笔记用。如有错误欢迎指正。本文将以开发板第40引脚GPIO3_B4作为LED灯珠的控制引脚,高电平灯亮,低电平灯灭。 杂话 在linux内核中,芯片厂商已经把所有…...

【C#特性整理】C#特性及语法基础
1. C#特性 1.1 统一的类型系统 C#中, 所有类型都共享一个公共的基类型. 例如,任何类型的实例都可以通过调用ToString方法将自身转换为一个字符串 1.2 类和接口 接口: 用于将标准与实现隔离, 仅仅定义行为,不做实现. 1.3 属性、方法、事件 属性: 封装了一部分对…...
Presence:Colyseus用于管理实时分布式数据的工具
Colyseus Presence 详细介绍 Presence 是 Colyseus 中用于管理实时分布式数据的一种工具。它主要用于在多房间、多服务器或分布式部署中实现玩家的实时在线状态、数据共享和通信。Presence 提供了一套简单的 API 来处理诸如在线玩家跟踪、分布式数据存储和发布/订阅模式等功能…...

Ubuntu 搭建SVN服务
目录 1、安装SVN服务端 2、创建SVN版本库 3、修改SVN配置svnserve.conf 3.1 配置文件介绍 3.2 svnserve.conf配置 3.3 authz配置设置用户读写权限 3.4 passwd配置 用户名密码 4、启动SVN服务 4.1 配置开机启动 1、安装SVN服务端 sudo apt-get install subversion…...
HTML速查
HTML 基本文档 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>文档标题</title></head><body>可见文本...</body> </html>基本标签(Basic Tags) <h1>最大的…...

day-102 二进制矩阵中的最短路径
思路 BFS 解题过程 从起点依次向八个方向尝试(之后也一样),如果某个位置在矩阵内且值为0且没有访问过,将其添加到一个队列中,依次类推,直到到达出口 Code class Solution {public int shortestPathBinar…...

华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...

【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...

回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...