kafka小实站
需要先在前面的文章里面照着下载好kafka,并且启动 先启动zookeeper
项目目录
package kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** kafka消费者***/
@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "${kafka.topic.name}")public void listen(ConsumerRecord<?, ?> record) {log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());}}
comsumer代码
定义了一个简单的 Kafka 消费者,它使用 Spring 的 Kafka Listener 来监听指定的主题,并处理接收到的消息
-
@Component
标记这个类为一个 Spring 的组件,使其被 Spring 容器管理并可以被自动扫描和加载。 -
@Slf4j
这是 Lombok 提供的注解,自动为类生成一个名为log
的 日志记录器,可以直接用log.info()
等方法记录日志,而无需手动定义日志实例。 -
@KafkaListener
这是 Spring Kafka 提供的注解,用于标记一个方法为 Kafka 消息的监听器。topics
:指定要监听的 Kafka 主题。这里用占位符${kafka.topic.name}
,说明具体的主题名称是从配置文件(如application.properties
或application.yml
)中读取的。
-
自动监听
当有新的消息发布到指定的 Kafka 主题时,Spring 会自动调用标注了@KafkaListener
的方法来处理消息。 -
参数
ConsumerRecord<?, ?> record
代表 Kafka 中一条消费记录,包含了消息的元数据(如主题名、分区、偏移量)和消息体。record.topic()
:获取消息所属的 Kafka 主题名。record.offset()
:获取消息的偏移量(在 Kafka 中,每条消息都有一个唯一的偏移量)。record.value()
:获取消息的内容。
-
log.info
使用日志记录器将接收到的消息详细信息打印到日志中,包括:- 消息所在的 主题名。
- 偏移量(
offset
),用于定位消息在分区中的位置。 - 消息的 内容。
package kafka; import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** kafka生产者**/
@Component
@Slf4j
public class KafkaProducer implements CommandLineRunner {@ResourceKafkaTemplate kafkaTemplate;@Value("${kafka.topic.name}")private String topic;/*** 发送kafka消息** @param string*/public void send(String string) {ListenableFuture future = kafkaTemplate.send(topic, JSONObject.toJSONString(string));//future.addCallback(o -> log.info("kafka消息发送成功:" + jsonString), throwable -> log.error("kafka消息发送失败:" + jsonString));}/*** 容器启动完成后,生产测试数据*/@Overridepublic void run(String... args) {for (int i = 0; i < 10; i++) {send("hello world" + i);}}
}
producer代码
-
KafkaTemplate
这是 Spring Kafka 提供的核心类,用于向 Kafka 发送消息。通过@Resource
注解实现依赖注入。 -
@Value("${kafka.topic.name}")
通过 Spring 的配置文件(如application.properties
或application.yml
)注入 Kafka 主题名称。 -
kafkaTemplate.send(topic, message)
通过KafkaTemplate
将消息发送到指定主题。 -
JSONObject.toJSONString(string)
使用阿里巴巴的fastjson
库将消息序列化为 JSON 格式。 -
ListenableFuture
Kafka 消息发送是异步的,send()
方法返回一个ListenableFuture
对象,可以用来监听消息的发送结果。 -
日志记录(被注释)
使用addCallback
方法为发送成功和失败分别设置回调逻辑:- 发送成功时记录日志:
log.info("kafka消息发送成功:" + string)
。 - 发送失败时记录错误日志:
log.error("kafka消息发送失败:" + string)
-
run(String... args)
这是CommandLineRunner
接口的方法,Spring 容器启动完成后会自动调用。 -
循环发送消息
启动后向 Kafka 主题发送 10 条测试消息,例如:hello world0
、hello world1
等。
- 发送成功时记录日志:
## kafka ##
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=testspring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic.name=mqtt_location_data
#
#spring.data.mongodb.host=localhost
#spring.data.mongodb.port=27017
#spring.data.mongodb.database=your_database_name
配置文件application.properties
运行结果:
2024-12-26 01:32:30.839 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:00.850 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 01:33:00.855 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
2024-12-26 01:33:00.970 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.971 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:00.980 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:15.874 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.875 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 3: {consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 01:33:15.879 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.880 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 01:33:15.881 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 01:33:15.887 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 01:33:15.888 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=10, message="hello world0"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=11, message="hello world1"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=12, message="hello world2"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=13, message="hello world3"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=14, message="hello world4"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=15, message="hello world5"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=16, message="hello world6"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=17, message="hello world7"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=18, message="hello world8"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=19, message="hello world9"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=20, message="hello world0"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=21, message="hello world1"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=22, message="hello world2"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=23, message="hello world3"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=24, message="hello world4"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=25, message="hello world5"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=26, message="hello world6"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=27, message="hello world7"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=28, message="hello world8"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=29, message="hello world9"
2024-12-26 04:36:36.611 INFO 22936 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-test-1, groupId=test] Error sending fetch request (sessionId=1770952277, epoch=389) to node 0:org.apache.kafka.common.errors.DisconnectException: null2024-12-26 04:36:36.612 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 04:36:36.727 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 04:36:36.731 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-26 04:36:36.731 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}: The coordinator is not aware of this member.
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-26 04:36:36.733 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions lost: [mqtt_location_data-0]
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions revoked: [mqtt_location_data-0]
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:36.734 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with stale Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error
2024-12-26 04:36:36.735 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-26 04:36:36.735 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:37.543 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.543 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 5: {consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 04:36:37.553 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.554 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 04:36:37.554 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 04:36:37.557 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 04:36:37.589 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
2024-12-30 22:44:08.518 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-30 22:44:09.620 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-30 22:44:09.630 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions lost: [mqtt_location_data-0]
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions revoked: [mqtt_location_data-0]
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.636 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-30 22:44:09.636 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}: The coordinator is not aware of this member.
2024-12-30 22:44:09.637 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer member's generation is already stale, meaning it has already participated another rebalance and got a new generation. You can try completing the rebalance by calling poll() and then retry commit again
2024-12-30 22:44:09.640 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-30 22:44:09.640 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.663 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.663 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 7: {consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd=Assignment(partitions=[mqtt_location_data-0])}
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-30 22:44:09.672 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-30 22:44:10.200 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
相关文章:

kafka小实站
需要先在前面的文章里面照着下载好kafka,并且启动 先启动zookeeper 项目目录 package kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import…...

基于Python实现车辆检测、机动车检测、识别位置标记、计数
目录 引言背景与应用场景车辆检测的研究意义相关工作车辆检测概述机动车检测方法分类基于传统计算机视觉的检测方法基于深度学习的检测方法技术与方法车辆检测技术概述基于Python的车辆检测方法图像处理与特征提取深度学习方法(如YOLO、SSD、Faster R-CNN等)数据集与标注常用…...

心理学硕士
心理学硕士的主要研究方向包括基础心理学、发展心理学和应用心理学。 基础心理学研究一般的心理现象与规律,如心理的实质及神经机制、感觉与知觉、意识与注意、学习与记忆、思维与语言、情绪与意识、人格等。发展心理学研究人类个体心理发生发展的特点和规律&a…...
python量化分析学习与实践1:API接口篇
业内比较流行的几款API数据接口,有聚宽、TuShare,yfinance,以及pandas的pandas_datareader等。国内的一般都需要用户认证,才能下载数据。国外的yfinance与pandas_datareader等则不需要,但需要科学上网。 聚宽 测试下…...

【GO基础学习】gin的使用
文章目录 模版使用流程参数传递路由分组数据解析和绑定gin中间件 模版使用流程 package mainimport ("net/http""github.com/gin-gonic/gin" )func main() {// 1.创建路由r : gin.Default()// 2.绑定路由规则,执行的函数// gin.Context&#x…...
网卡状态变更,virtio-net检测
实现方案: 现在在amp模式下linux端有个真实的物理网卡eth0,有一个虚拟网卡virtio-net0后端,此时需要一种机制,将真实物理网卡的状态发送rtos的virtio-net0前端。这里使用register_netdevice_notifier机制,每个virtio-n…...
中华人民共和国保守国家秘密法
中华人民共和国保守国家秘密法 (1988年9月5日第七届全国人民代表大会常务委员会第三次会议通过 2010年4月29日第十一届全国人民代表大会常务委员会第十四次会议第一次修订 2024年2月27日第十四届全国人民代表大会常务委员会第八次会议第二次修订) 目…...

ELK日志收集系统部署
1、 ElasticSearch部署 Elastic — 搜索 AI 公司 | Elastic 系统类型:Centos7.4 节点IP:172.16.246.234 软件版本:jdk-8u191-linux-x64.tar.gz、elasticsearch-6.5.4.tar.gz 示例节点:172.16.246.234 1、安装配置jdk8 ES运行依…...

3D线上艺术展:艺术与技术的完美融合
随着数字技术的飞速发展,未来的艺术展览正逐步迈向线上线下融合的新阶段。其中,3D线上展览以其独特的魅力,成为线下展览的延伸与拓展,为艺术爱好者们开辟了全新的观赏途径。 对于艺术家和策展人而言,3D线上展览不仅打…...

TiDB 的MPP架构概述
MPP架构介绍: 如图,TiDB Server 作为协调者,首先 TiDB Server 会把每个TiFlash 拥有的region 会在TiFlash上做交换,让表连接在一个TiFlash上。另外 TiFlash会作为计算节点,每个TiFlash都负责数据交换,表连接…...

Leetcode 10-正则表达式匹配/ 剑指 Offer 19. 正则表达式匹配
给你一个字符串 s 和一个字符规律 p,请你来实现一个支持 ‘.’ 和 ‘*’ 的正则表达式匹配。 ‘.’ 匹配任意单个字符 ‘*’ 匹配零个或多个前面的那一个元素 所谓匹配,是要涵盖 整个 字符串 s 的,而不是部分字符串。 题解 字符串匹配多…...

FFmpeg 编码和解码
文章目录 音频格式AACADIF音频数据交换格式ADTS音频数据传输流 音频解码音频编码 视频格式H264GOP图像组I帧,P帧,B帧H264压缩技术H264压缩级别H264视频级别H264码流结构SPSPPS 解码视频编码视频 音频格式 AAC AAC全称 Advanced Audio Coding࿰…...
kali当中web扫描工具的用法
1. cadaver 用途:用于与WebDAV服务器交互,可进行文件上传、下载、目录浏览等操作。使用方法 连接到WebDAV服务器:cadaver <WebDAV服务器地址>,例如cadaver https://example.com/dav,然后按提示输入用户名和密码…...
深度剖析 Android Animation 框架
深度剖析 Android Animation 框架 目录 引言Android Animation 框架概述架构设计 3.1 核心类与接口3.2 动画类型3.3 动画执行流程使用指南 4.1 属性动画4.2 视图动画4.3 过渡动画设计模式 5.1 策略模式5.2 观察者模式5.3 工厂模式核心逻辑 6.1 动画插值器6.2 动画估值器6.3 动…...
泰山派GPIO子系统驱动---亮灯
本人linux驱动小白,文章基于B站up主 李Sir______ 视频内容记录,做笔记用。如有错误欢迎指正。本文将以开发板第40引脚GPIO3_B4作为LED灯珠的控制引脚,高电平灯亮,低电平灯灭。 杂话 在linux内核中,芯片厂商已经把所有…...

【C#特性整理】C#特性及语法基础
1. C#特性 1.1 统一的类型系统 C#中, 所有类型都共享一个公共的基类型. 例如,任何类型的实例都可以通过调用ToString方法将自身转换为一个字符串 1.2 类和接口 接口: 用于将标准与实现隔离, 仅仅定义行为,不做实现. 1.3 属性、方法、事件 属性: 封装了一部分对…...
Presence:Colyseus用于管理实时分布式数据的工具
Colyseus Presence 详细介绍 Presence 是 Colyseus 中用于管理实时分布式数据的一种工具。它主要用于在多房间、多服务器或分布式部署中实现玩家的实时在线状态、数据共享和通信。Presence 提供了一套简单的 API 来处理诸如在线玩家跟踪、分布式数据存储和发布/订阅模式等功能…...

Ubuntu 搭建SVN服务
目录 1、安装SVN服务端 2、创建SVN版本库 3、修改SVN配置svnserve.conf 3.1 配置文件介绍 3.2 svnserve.conf配置 3.3 authz配置设置用户读写权限 3.4 passwd配置 用户名密码 4、启动SVN服务 4.1 配置开机启动 1、安装SVN服务端 sudo apt-get install subversion…...
HTML速查
HTML 基本文档 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>文档标题</title></head><body>可见文本...</body> </html>基本标签(Basic Tags) <h1>最大的…...

day-102 二进制矩阵中的最短路径
思路 BFS 解题过程 从起点依次向八个方向尝试(之后也一样),如果某个位置在矩阵内且值为0且没有访问过,将其添加到一个队列中,依次类推,直到到达出口 Code class Solution {public int shortestPathBinar…...

【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...

无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...

376. Wiggle Subsequence
376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比
在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...
Go 语言并发编程基础:无缓冲与有缓冲通道
在上一章节中,我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道,它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好࿰…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...
Vue 模板语句的数据来源
🧩 Vue 模板语句的数据来源:全方位解析 Vue 模板(<template> 部分)中的表达式、指令绑定(如 v-bind, v-on)和插值({{ }})都在一个特定的作用域内求值。这个作用域由当前 组件…...