kafka小实站
需要先在前面的文章里面照着下载好kafka,并且启动 先启动zookeeper
项目目录
package kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** kafka消费者***/
@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "${kafka.topic.name}")public void listen(ConsumerRecord<?, ?> record) {log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());}}
comsumer代码
定义了一个简单的 Kafka 消费者,它使用 Spring 的 Kafka Listener 来监听指定的主题,并处理接收到的消息
-
@Component
标记这个类为一个 Spring 的组件,使其被 Spring 容器管理并可以被自动扫描和加载。 -
@Slf4j
这是 Lombok 提供的注解,自动为类生成一个名为log
的 日志记录器,可以直接用log.info()
等方法记录日志,而无需手动定义日志实例。 -
@KafkaListener
这是 Spring Kafka 提供的注解,用于标记一个方法为 Kafka 消息的监听器。topics
:指定要监听的 Kafka 主题。这里用占位符${kafka.topic.name}
,说明具体的主题名称是从配置文件(如application.properties
或application.yml
)中读取的。
-
自动监听
当有新的消息发布到指定的 Kafka 主题时,Spring 会自动调用标注了@KafkaListener
的方法来处理消息。 -
参数
ConsumerRecord<?, ?> record
代表 Kafka 中一条消费记录,包含了消息的元数据(如主题名、分区、偏移量)和消息体。record.topic()
:获取消息所属的 Kafka 主题名。record.offset()
:获取消息的偏移量(在 Kafka 中,每条消息都有一个唯一的偏移量)。record.value()
:获取消息的内容。
-
log.info
使用日志记录器将接收到的消息详细信息打印到日志中,包括:- 消息所在的 主题名。
- 偏移量(
offset
),用于定位消息在分区中的位置。 - 消息的 内容。
package kafka; import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** kafka生产者**/
@Component
@Slf4j
public class KafkaProducer implements CommandLineRunner {@ResourceKafkaTemplate kafkaTemplate;@Value("${kafka.topic.name}")private String topic;/*** 发送kafka消息** @param string*/public void send(String string) {ListenableFuture future = kafkaTemplate.send(topic, JSONObject.toJSONString(string));//future.addCallback(o -> log.info("kafka消息发送成功:" + jsonString), throwable -> log.error("kafka消息发送失败:" + jsonString));}/*** 容器启动完成后,生产测试数据*/@Overridepublic void run(String... args) {for (int i = 0; i < 10; i++) {send("hello world" + i);}}
}
producer代码
-
KafkaTemplate
这是 Spring Kafka 提供的核心类,用于向 Kafka 发送消息。通过@Resource
注解实现依赖注入。 -
@Value("${kafka.topic.name}")
通过 Spring 的配置文件(如application.properties
或application.yml
)注入 Kafka 主题名称。 -
kafkaTemplate.send(topic, message)
通过KafkaTemplate
将消息发送到指定主题。 -
JSONObject.toJSONString(string)
使用阿里巴巴的fastjson
库将消息序列化为 JSON 格式。 -
ListenableFuture
Kafka 消息发送是异步的,send()
方法返回一个ListenableFuture
对象,可以用来监听消息的发送结果。 -
日志记录(被注释)
使用addCallback
方法为发送成功和失败分别设置回调逻辑:- 发送成功时记录日志:
log.info("kafka消息发送成功:" + string)
。 - 发送失败时记录错误日志:
log.error("kafka消息发送失败:" + string)
-
run(String... args)
这是CommandLineRunner
接口的方法,Spring 容器启动完成后会自动调用。 -
循环发送消息
启动后向 Kafka 主题发送 10 条测试消息,例如:hello world0
、hello world1
等。
- 发送成功时记录日志:
## kafka ##
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=testspring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic.name=mqtt_location_data
#
#spring.data.mongodb.host=localhost
#spring.data.mongodb.port=27017
#spring.data.mongodb.database=your_database_name
配置文件application.properties
运行结果:
2024-12-26 01:32:30.839 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:00.850 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 01:33:00.855 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
2024-12-26 01:33:00.970 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.971 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:00.980 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:15.874 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.875 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 3: {consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 01:33:15.879 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.880 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 01:33:15.881 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 01:33:15.887 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 01:33:15.888 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=10, message="hello world0"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=11, message="hello world1"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=12, message="hello world2"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=13, message="hello world3"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=14, message="hello world4"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=15, message="hello world5"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=16, message="hello world6"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=17, message="hello world7"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=18, message="hello world8"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=19, message="hello world9"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=20, message="hello world0"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=21, message="hello world1"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=22, message="hello world2"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=23, message="hello world3"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=24, message="hello world4"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=25, message="hello world5"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=26, message="hello world6"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=27, message="hello world7"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=28, message="hello world8"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=29, message="hello world9"
2024-12-26 04:36:36.611 INFO 22936 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-test-1, groupId=test] Error sending fetch request (sessionId=1770952277, epoch=389) to node 0:org.apache.kafka.common.errors.DisconnectException: null2024-12-26 04:36:36.612 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 04:36:36.727 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 04:36:36.731 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-26 04:36:36.731 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}: The coordinator is not aware of this member.
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-26 04:36:36.733 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions lost: [mqtt_location_data-0]
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions revoked: [mqtt_location_data-0]
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:36.734 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with stale Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error
2024-12-26 04:36:36.735 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-26 04:36:36.735 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:37.543 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.543 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 5: {consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 04:36:37.553 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.554 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 04:36:37.554 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 04:36:37.557 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 04:36:37.589 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
2024-12-30 22:44:08.518 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-30 22:44:09.620 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-30 22:44:09.630 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions lost: [mqtt_location_data-0]
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions revoked: [mqtt_location_data-0]
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.636 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-30 22:44:09.636 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}: The coordinator is not aware of this member.
2024-12-30 22:44:09.637 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer member's generation is already stale, meaning it has already participated another rebalance and got a new generation. You can try completing the rebalance by calling poll() and then retry commit again
2024-12-30 22:44:09.640 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-30 22:44:09.640 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.663 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.663 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 7: {consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd=Assignment(partitions=[mqtt_location_data-0])}
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-30 22:44:09.672 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-30 22:44:10.200 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
相关文章:

kafka小实站
需要先在前面的文章里面照着下载好kafka,并且启动 先启动zookeeper 项目目录 package kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import…...

基于Python实现车辆检测、机动车检测、识别位置标记、计数
目录 引言背景与应用场景车辆检测的研究意义相关工作车辆检测概述机动车检测方法分类基于传统计算机视觉的检测方法基于深度学习的检测方法技术与方法车辆检测技术概述基于Python的车辆检测方法图像处理与特征提取深度学习方法(如YOLO、SSD、Faster R-CNN等)数据集与标注常用…...

心理学硕士
心理学硕士的主要研究方向包括基础心理学、发展心理学和应用心理学。 基础心理学研究一般的心理现象与规律,如心理的实质及神经机制、感觉与知觉、意识与注意、学习与记忆、思维与语言、情绪与意识、人格等。发展心理学研究人类个体心理发生发展的特点和规律&a…...

python量化分析学习与实践1:API接口篇
业内比较流行的几款API数据接口,有聚宽、TuShare,yfinance,以及pandas的pandas_datareader等。国内的一般都需要用户认证,才能下载数据。国外的yfinance与pandas_datareader等则不需要,但需要科学上网。 聚宽 测试下…...

【GO基础学习】gin的使用
文章目录 模版使用流程参数传递路由分组数据解析和绑定gin中间件 模版使用流程 package mainimport ("net/http""github.com/gin-gonic/gin" )func main() {// 1.创建路由r : gin.Default()// 2.绑定路由规则,执行的函数// gin.Context&#x…...

网卡状态变更,virtio-net检测
实现方案: 现在在amp模式下linux端有个真实的物理网卡eth0,有一个虚拟网卡virtio-net0后端,此时需要一种机制,将真实物理网卡的状态发送rtos的virtio-net0前端。这里使用register_netdevice_notifier机制,每个virtio-n…...

中华人民共和国保守国家秘密法
中华人民共和国保守国家秘密法 (1988年9月5日第七届全国人民代表大会常务委员会第三次会议通过 2010年4月29日第十一届全国人民代表大会常务委员会第十四次会议第一次修订 2024年2月27日第十四届全国人民代表大会常务委员会第八次会议第二次修订) 目…...

ELK日志收集系统部署
1、 ElasticSearch部署 Elastic — 搜索 AI 公司 | Elastic 系统类型:Centos7.4 节点IP:172.16.246.234 软件版本:jdk-8u191-linux-x64.tar.gz、elasticsearch-6.5.4.tar.gz 示例节点:172.16.246.234 1、安装配置jdk8 ES运行依…...

3D线上艺术展:艺术与技术的完美融合
随着数字技术的飞速发展,未来的艺术展览正逐步迈向线上线下融合的新阶段。其中,3D线上展览以其独特的魅力,成为线下展览的延伸与拓展,为艺术爱好者们开辟了全新的观赏途径。 对于艺术家和策展人而言,3D线上展览不仅打…...

TiDB 的MPP架构概述
MPP架构介绍: 如图,TiDB Server 作为协调者,首先 TiDB Server 会把每个TiFlash 拥有的region 会在TiFlash上做交换,让表连接在一个TiFlash上。另外 TiFlash会作为计算节点,每个TiFlash都负责数据交换,表连接…...

Leetcode 10-正则表达式匹配/ 剑指 Offer 19. 正则表达式匹配
给你一个字符串 s 和一个字符规律 p,请你来实现一个支持 ‘.’ 和 ‘*’ 的正则表达式匹配。 ‘.’ 匹配任意单个字符 ‘*’ 匹配零个或多个前面的那一个元素 所谓匹配,是要涵盖 整个 字符串 s 的,而不是部分字符串。 题解 字符串匹配多…...

FFmpeg 编码和解码
文章目录 音频格式AACADIF音频数据交换格式ADTS音频数据传输流 音频解码音频编码 视频格式H264GOP图像组I帧,P帧,B帧H264压缩技术H264压缩级别H264视频级别H264码流结构SPSPPS 解码视频编码视频 音频格式 AAC AAC全称 Advanced Audio Coding࿰…...

kali当中web扫描工具的用法
1. cadaver 用途:用于与WebDAV服务器交互,可进行文件上传、下载、目录浏览等操作。使用方法 连接到WebDAV服务器:cadaver <WebDAV服务器地址>,例如cadaver https://example.com/dav,然后按提示输入用户名和密码…...

深度剖析 Android Animation 框架
深度剖析 Android Animation 框架 目录 引言Android Animation 框架概述架构设计 3.1 核心类与接口3.2 动画类型3.3 动画执行流程使用指南 4.1 属性动画4.2 视图动画4.3 过渡动画设计模式 5.1 策略模式5.2 观察者模式5.3 工厂模式核心逻辑 6.1 动画插值器6.2 动画估值器6.3 动…...

泰山派GPIO子系统驱动---亮灯
本人linux驱动小白,文章基于B站up主 李Sir______ 视频内容记录,做笔记用。如有错误欢迎指正。本文将以开发板第40引脚GPIO3_B4作为LED灯珠的控制引脚,高电平灯亮,低电平灯灭。 杂话 在linux内核中,芯片厂商已经把所有…...

【C#特性整理】C#特性及语法基础
1. C#特性 1.1 统一的类型系统 C#中, 所有类型都共享一个公共的基类型. 例如,任何类型的实例都可以通过调用ToString方法将自身转换为一个字符串 1.2 类和接口 接口: 用于将标准与实现隔离, 仅仅定义行为,不做实现. 1.3 属性、方法、事件 属性: 封装了一部分对…...

Presence:Colyseus用于管理实时分布式数据的工具
Colyseus Presence 详细介绍 Presence 是 Colyseus 中用于管理实时分布式数据的一种工具。它主要用于在多房间、多服务器或分布式部署中实现玩家的实时在线状态、数据共享和通信。Presence 提供了一套简单的 API 来处理诸如在线玩家跟踪、分布式数据存储和发布/订阅模式等功能…...

Ubuntu 搭建SVN服务
目录 1、安装SVN服务端 2、创建SVN版本库 3、修改SVN配置svnserve.conf 3.1 配置文件介绍 3.2 svnserve.conf配置 3.3 authz配置设置用户读写权限 3.4 passwd配置 用户名密码 4、启动SVN服务 4.1 配置开机启动 1、安装SVN服务端 sudo apt-get install subversion…...

HTML速查
HTML 基本文档 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>文档标题</title></head><body>可见文本...</body> </html>基本标签(Basic Tags) <h1>最大的…...

day-102 二进制矩阵中的最短路径
思路 BFS 解题过程 从起点依次向八个方向尝试(之后也一样),如果某个位置在矩阵内且值为0且没有访问过,将其添加到一个队列中,依次类推,直到到达出口 Code class Solution {public int shortestPathBinar…...

SQL Server大批量数据插入
数据库连接及相关操作 public class DataBase {/*** 驱动*/private static final String DRIVER PropertiesUtil.getString("spring.datasource.driver-class-name");/*** 数据库地址*/private static final String URL PropertiesUtil.getString("spring.da…...

在 Ubuntu 下通过 Docker 部署 Caddy 服务器
嘿,伙伴们!今天我们来聊聊如何在 Ubuntu 系统下通过 Docker 部署 Caddy 服务器。Caddy 是一个现代的 Web 服务器,支持自动 HTTPS,简单易用,特别适合快速搭建网站。而 Docker 则是一个让你可以隔离和管理应用的神器。结…...

ZooKeeper注册中心实现
具体步骤 安装ZooKeeper(启动端口占用,2181:客户端,8080:管理端)引入客户端依赖实现注册中心接口SPI补充ZooKeeper注册中心 引入依赖 <!-- zookeeper --> <dependency><groupId>org.a…...

数仓建模:如何进行实体建模?
目录 1 如何进行实体建模? 业务建模 领域建模 逻辑建模 2 实体建模具体步骤 需求分析...

Python编程技术
设计目的 该项目框架Scrapy可以让我们平时所学的技术整合旨在帮助学习者提高Python编程技能并熟悉基本概念: 1. 学习基本概念:介绍Python的基本概念,如变量、数据类型、条件语句、循环等。 2. 掌握基本编程技巧:教授学生如何使…...

「Mac玩转仓颉内测版55」应用篇2 - 使用函数实现更复杂的计算
本篇教程基于仓颉编程语言扩展了计算器功能,支持加减乘除的基础运算,以及幂运算和开平方等高级功能。代码经过简化后,移除了对输入的复杂校验,提升了程序的可维护性和交互效率。 关键词 仓颉编程语言函数封装高级运算 一、功能说…...

map参数详解
const items new Array(15).fill(null).map((_, index) > ({key: index 1,label: nav ${index 1}, })); $.map() 函数用于使用指定函数处理数组中的每个元素(或对象的每个属性),并将处理结果封装为新的数组返回。 注意:1. 在jQuery 1.6 之前&#…...

OSI 七层模型 | TCP/IP 四层模型
注:本文为 “OSI 七层模型 | TCP/IP 四层模型” 相关文章合辑。 未整理去重。 OSI 参考模型(七层模型) BeretSEC 于 2020-04-02 15:54:37 发布 OSI 的概念 七层模型,亦称 OSI(Open System Interconnection…...

高转速风扇|无刷暴力风扇方案设计
在当今科技高速发展的时代,电子设备的性能不断提升,散热问题也日益成为关注的焦点。而 13w 高转速暴力风扇方案的出现,为解决各种设备的散热难题提供了强大的技术支持。 一、高转速暴力风扇的重要性 随着电子设备的不断升级,其功率…...

GPU 进阶笔记(三):华为 NPU/GPU 演进
大家读完觉得有意义记得关注和点赞!!! 1 术语 1.1 CPU1.2 GPU1.3 NPU / TPU1.4 小结2 华为 DaVinci 架构:一种方案覆盖所有算力场景 2.1 场景、算力需求和解决方案2.2 Ascend NPU 设计3 路线一:NPU 用在手机芯片&…...