kafka小实站
需要先在前面的文章里面照着下载好kafka,并且启动 先启动zookeeper



项目目录
package kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** kafka消费者***/
@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "${kafka.topic.name}")public void listen(ConsumerRecord<?, ?> record) {log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());}}
comsumer代码
定义了一个简单的 Kafka 消费者,它使用 Spring 的 Kafka Listener 来监听指定的主题,并处理接收到的消息
-
@Component
标记这个类为一个 Spring 的组件,使其被 Spring 容器管理并可以被自动扫描和加载。 -
@Slf4j
这是 Lombok 提供的注解,自动为类生成一个名为log的 日志记录器,可以直接用log.info()等方法记录日志,而无需手动定义日志实例。 -
@KafkaListener
这是 Spring Kafka 提供的注解,用于标记一个方法为 Kafka 消息的监听器。topics:指定要监听的 Kafka 主题。这里用占位符${kafka.topic.name},说明具体的主题名称是从配置文件(如application.properties或application.yml)中读取的。
-
自动监听
当有新的消息发布到指定的 Kafka 主题时,Spring 会自动调用标注了@KafkaListener的方法来处理消息。 -
参数
ConsumerRecord<?, ?> record
代表 Kafka 中一条消费记录,包含了消息的元数据(如主题名、分区、偏移量)和消息体。record.topic():获取消息所属的 Kafka 主题名。record.offset():获取消息的偏移量(在 Kafka 中,每条消息都有一个唯一的偏移量)。record.value():获取消息的内容。
-
log.info
使用日志记录器将接收到的消息详细信息打印到日志中,包括:- 消息所在的 主题名。
- 偏移量(
offset),用于定位消息在分区中的位置。 - 消息的 内容。
package kafka; import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** kafka生产者**/
@Component
@Slf4j
public class KafkaProducer implements CommandLineRunner {@ResourceKafkaTemplate kafkaTemplate;@Value("${kafka.topic.name}")private String topic;/*** 发送kafka消息** @param string*/public void send(String string) {ListenableFuture future = kafkaTemplate.send(topic, JSONObject.toJSONString(string));//future.addCallback(o -> log.info("kafka消息发送成功:" + jsonString), throwable -> log.error("kafka消息发送失败:" + jsonString));}/*** 容器启动完成后,生产测试数据*/@Overridepublic void run(String... args) {for (int i = 0; i < 10; i++) {send("hello world" + i);}}
}
producer代码
-
KafkaTemplate
这是 Spring Kafka 提供的核心类,用于向 Kafka 发送消息。通过@Resource注解实现依赖注入。 -
@Value("${kafka.topic.name}")
通过 Spring 的配置文件(如application.properties或application.yml)注入 Kafka 主题名称。 -
kafkaTemplate.send(topic, message)
通过KafkaTemplate将消息发送到指定主题。 -
JSONObject.toJSONString(string)
使用阿里巴巴的fastjson库将消息序列化为 JSON 格式。 -
ListenableFuture
Kafka 消息发送是异步的,send()方法返回一个ListenableFuture对象,可以用来监听消息的发送结果。 -
日志记录(被注释)
使用addCallback方法为发送成功和失败分别设置回调逻辑:- 发送成功时记录日志:
log.info("kafka消息发送成功:" + string)。 - 发送失败时记录错误日志:
log.error("kafka消息发送失败:" + string) -
run(String... args)
这是CommandLineRunner接口的方法,Spring 容器启动完成后会自动调用。 -
循环发送消息
启动后向 Kafka 主题发送 10 条测试消息,例如:hello world0、hello world1等。
- 发送成功时记录日志:
## kafka ##
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=testspring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic.name=mqtt_location_data
#
#spring.data.mongodb.host=localhost
#spring.data.mongodb.port=27017
#spring.data.mongodb.database=your_database_name
配置文件application.properties
运行结果:
2024-12-26 01:32:30.839 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:00.850 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 01:33:00.855 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
2024-12-26 01:33:00.970 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.971 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:00.980 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:15.874 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.875 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 3: {consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 01:33:15.879 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.880 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 01:33:15.881 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 01:33:15.887 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 01:33:15.888 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=10, message="hello world0"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=11, message="hello world1"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=12, message="hello world2"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=13, message="hello world3"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=14, message="hello world4"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=15, message="hello world5"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=16, message="hello world6"
2024-12-26 01:33:15.900 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=17, message="hello world7"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=18, message="hello world8"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=19, message="hello world9"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=20, message="hello world0"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=21, message="hello world1"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=22, message="hello world2"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=23, message="hello world3"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=24, message="hello world4"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=25, message="hello world5"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=26, message="hello world6"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=27, message="hello world7"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=28, message="hello world8"
2024-12-26 01:33:15.901 INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer : topic=mqtt_location_data, offset=29, message="hello world9"
2024-12-26 04:36:36.611 INFO 22936 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-test-1, groupId=test] Error sending fetch request (sessionId=1770952277, epoch=389) to node 0:org.apache.kafka.common.errors.DisconnectException: null2024-12-26 04:36:36.612 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 04:36:36.727 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 04:36:36.731 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-26 04:36:36.731 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}: The coordinator is not aware of this member.
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2024-12-26 04:36:36.732 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-26 04:36:36.733 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions lost: [mqtt_location_data-0]
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions revoked: [mqtt_location_data-0]
2024-12-26 04:36:36.733 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:36.734 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with stale Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error
2024-12-26 04:36:36.735 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-26 04:36:36.735 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:37.543 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.543 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 5: {consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 04:36:37.553 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.554 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 04:36:37.554 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 04:36:37.557 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 04:36:37.589 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
2024-12-30 22:44:08.518 INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-30 22:44:09.620 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-30 22:44:09.630 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions lost: [mqtt_location_data-0]
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions revoked: [mqtt_location_data-0]
2024-12-30 22:44:09.630 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.636 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-30 22:44:09.636 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}: The coordinator is not aware of this member.
2024-12-30 22:44:09.637 WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer member's generation is already stale, meaning it has already participated another rebalance and got a new generation. You can try completing the rebalance by calling poll() and then retry commit again
2024-12-30 22:44:09.640 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-30 22:44:09.640 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.663 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.663 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 7: {consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd=Assignment(partitions=[mqtt_location_data-0])}
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-30 22:44:09.667 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-30 22:44:09.672 INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-30 22:44:10.200 INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test: partitions assigned: [mqtt_location_data-0]
相关文章:
kafka小实站
需要先在前面的文章里面照着下载好kafka,并且启动 先启动zookeeper 项目目录 package kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import…...
基于Python实现车辆检测、机动车检测、识别位置标记、计数
目录 引言背景与应用场景车辆检测的研究意义相关工作车辆检测概述机动车检测方法分类基于传统计算机视觉的检测方法基于深度学习的检测方法技术与方法车辆检测技术概述基于Python的车辆检测方法图像处理与特征提取深度学习方法(如YOLO、SSD、Faster R-CNN等)数据集与标注常用…...
心理学硕士
心理学硕士的主要研究方向包括基础心理学、发展心理学和应用心理学。 基础心理学研究一般的心理现象与规律,如心理的实质及神经机制、感觉与知觉、意识与注意、学习与记忆、思维与语言、情绪与意识、人格等。发展心理学研究人类个体心理发生发展的特点和规律&a…...
python量化分析学习与实践1:API接口篇
业内比较流行的几款API数据接口,有聚宽、TuShare,yfinance,以及pandas的pandas_datareader等。国内的一般都需要用户认证,才能下载数据。国外的yfinance与pandas_datareader等则不需要,但需要科学上网。 聚宽 测试下…...
【GO基础学习】gin的使用
文章目录 模版使用流程参数传递路由分组数据解析和绑定gin中间件 模版使用流程 package mainimport ("net/http""github.com/gin-gonic/gin" )func main() {// 1.创建路由r : gin.Default()// 2.绑定路由规则,执行的函数// gin.Context&#x…...
网卡状态变更,virtio-net检测
实现方案: 现在在amp模式下linux端有个真实的物理网卡eth0,有一个虚拟网卡virtio-net0后端,此时需要一种机制,将真实物理网卡的状态发送rtos的virtio-net0前端。这里使用register_netdevice_notifier机制,每个virtio-n…...
中华人民共和国保守国家秘密法
中华人民共和国保守国家秘密法 (1988年9月5日第七届全国人民代表大会常务委员会第三次会议通过 2010年4月29日第十一届全国人民代表大会常务委员会第十四次会议第一次修订 2024年2月27日第十四届全国人民代表大会常务委员会第八次会议第二次修订) 目…...
ELK日志收集系统部署
1、 ElasticSearch部署 Elastic — 搜索 AI 公司 | Elastic 系统类型:Centos7.4 节点IP:172.16.246.234 软件版本:jdk-8u191-linux-x64.tar.gz、elasticsearch-6.5.4.tar.gz 示例节点:172.16.246.234 1、安装配置jdk8 ES运行依…...
3D线上艺术展:艺术与技术的完美融合
随着数字技术的飞速发展,未来的艺术展览正逐步迈向线上线下融合的新阶段。其中,3D线上展览以其独特的魅力,成为线下展览的延伸与拓展,为艺术爱好者们开辟了全新的观赏途径。 对于艺术家和策展人而言,3D线上展览不仅打…...
TiDB 的MPP架构概述
MPP架构介绍: 如图,TiDB Server 作为协调者,首先 TiDB Server 会把每个TiFlash 拥有的region 会在TiFlash上做交换,让表连接在一个TiFlash上。另外 TiFlash会作为计算节点,每个TiFlash都负责数据交换,表连接…...
Leetcode 10-正则表达式匹配/ 剑指 Offer 19. 正则表达式匹配
给你一个字符串 s 和一个字符规律 p,请你来实现一个支持 ‘.’ 和 ‘*’ 的正则表达式匹配。 ‘.’ 匹配任意单个字符 ‘*’ 匹配零个或多个前面的那一个元素 所谓匹配,是要涵盖 整个 字符串 s 的,而不是部分字符串。 题解 字符串匹配多…...
FFmpeg 编码和解码
文章目录 音频格式AACADIF音频数据交换格式ADTS音频数据传输流 音频解码音频编码 视频格式H264GOP图像组I帧,P帧,B帧H264压缩技术H264压缩级别H264视频级别H264码流结构SPSPPS 解码视频编码视频 音频格式 AAC AAC全称 Advanced Audio Coding࿰…...
kali当中web扫描工具的用法
1. cadaver 用途:用于与WebDAV服务器交互,可进行文件上传、下载、目录浏览等操作。使用方法 连接到WebDAV服务器:cadaver <WebDAV服务器地址>,例如cadaver https://example.com/dav,然后按提示输入用户名和密码…...
深度剖析 Android Animation 框架
深度剖析 Android Animation 框架 目录 引言Android Animation 框架概述架构设计 3.1 核心类与接口3.2 动画类型3.3 动画执行流程使用指南 4.1 属性动画4.2 视图动画4.3 过渡动画设计模式 5.1 策略模式5.2 观察者模式5.3 工厂模式核心逻辑 6.1 动画插值器6.2 动画估值器6.3 动…...
泰山派GPIO子系统驱动---亮灯
本人linux驱动小白,文章基于B站up主 李Sir______ 视频内容记录,做笔记用。如有错误欢迎指正。本文将以开发板第40引脚GPIO3_B4作为LED灯珠的控制引脚,高电平灯亮,低电平灯灭。 杂话 在linux内核中,芯片厂商已经把所有…...
【C#特性整理】C#特性及语法基础
1. C#特性 1.1 统一的类型系统 C#中, 所有类型都共享一个公共的基类型. 例如,任何类型的实例都可以通过调用ToString方法将自身转换为一个字符串 1.2 类和接口 接口: 用于将标准与实现隔离, 仅仅定义行为,不做实现. 1.3 属性、方法、事件 属性: 封装了一部分对…...
Presence:Colyseus用于管理实时分布式数据的工具
Colyseus Presence 详细介绍 Presence 是 Colyseus 中用于管理实时分布式数据的一种工具。它主要用于在多房间、多服务器或分布式部署中实现玩家的实时在线状态、数据共享和通信。Presence 提供了一套简单的 API 来处理诸如在线玩家跟踪、分布式数据存储和发布/订阅模式等功能…...
Ubuntu 搭建SVN服务
目录 1、安装SVN服务端 2、创建SVN版本库 3、修改SVN配置svnserve.conf 3.1 配置文件介绍 3.2 svnserve.conf配置 3.3 authz配置设置用户读写权限 3.4 passwd配置 用户名密码 4、启动SVN服务 4.1 配置开机启动 1、安装SVN服务端 sudo apt-get install subversion…...
HTML速查
HTML 基本文档 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>文档标题</title></head><body>可见文本...</body> </html>基本标签(Basic Tags) <h1>最大的…...
day-102 二进制矩阵中的最短路径
思路 BFS 解题过程 从起点依次向八个方向尝试(之后也一样),如果某个位置在矩阵内且值为0且没有访问过,将其添加到一个队列中,依次类推,直到到达出口 Code class Solution {public int shortestPathBinar…...
基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...
工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配
AI3D视觉的工业赋能者 迁移科技成立于2017年,作为行业领先的3D工业相机及视觉系统供应商,累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成,通过稳定、易用、高回报的AI3D视觉系统,为汽车、新能源、金属制造等行…...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...
Golang——6、指针和结构体
指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...
【Linux系统】Linux环境变量:系统配置的隐形指挥官
。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量:setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...
9-Oracle 23 ai Vector Search 特性 知识准备
很多小伙伴是不是参加了 免费认证课程(限时至2025/5/15) Oracle AI Vector Search 1Z0-184-25考试,都顺利拿到certified了没。 各行各业的AI 大模型的到来,传统的数据库中的SQL还能不能打,结构化和非结构的话数据如何和…...
