当前位置: 首页 > news >正文

【Kafka源码走读】Admin接口的客户端与服务端的连接流程

注:本文对应的kafka的源码的版本是trunk分支。写这篇文章的主要目的是当作自己阅读源码之后的笔记,写的有点凌乱,还望大佬们海涵,多谢!

最近在写一个Web版的kafka客户端工具,然后查看Kafka官网,发现想要与Server端建立连接,只需要执行

	Admin.create(Properties props)

方法即可,但其内部是如何工作的,不得而知。鉴于此,该死的好奇心又萌动了起来,那我们今天就来看看,当执行Admin.create(Properties props)方法之后,client是如何与Server端建立连接的。

首先,我们看下Admin.create(Properties props)方法的实现:

    static Admin create(Properties props) {return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), null);}

Admin是一个接口,create()是其静态方法,该方法内部又调用的是KafkaAdminClient.createInternal()方法,createInternal()源码如下:

    static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {return createInternal(config, timeoutProcessorFactory, null);}

上述代码又调用了KafkaAdminClient类的另一个createInternal()方法

    static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory,HostResolver hostResolver) {Metrics metrics = null;NetworkClient networkClient = null;Time time = Time.SYSTEM;String clientId = generateClientId(config);ChannelBuilder channelBuilder = null;Selector selector = null;ApiVersions apiVersions = new ApiVersions();LogContext logContext = createLogContext(clientId);try {// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it// simplifies communication with older brokers)AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,MetricsReporter.class,Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG)).timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG))).tags(metricTags);JmxReporter jmxReporter = new JmxReporter();jmxReporter.configure(config.originals());reporters.add(jmxReporter);MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));metrics = new Metrics(metricConfig, reporters, time, metricsContext);String metricGrpPrefix = "admin-client";channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),metrics, time, metricGrpPrefix, channelBuilder, logContext);networkClient = new NetworkClient(metadataManager.updater(),null,selector,clientId,1,config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),(int) TimeUnit.HOURS.toMillis(1),config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),time,true,apiVersions,null,logContext,(hostResolver == null) ? new DefaultHostResolver() : hostResolver);return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,timeoutProcessorFactory, logContext);} catch (Throwable exc) {closeQuietly(metrics, "Metrics");closeQuietly(networkClient, "NetworkClient");closeQuietly(selector, "Selector");closeQuietly(channelBuilder, "ChannelBuilder");throw new KafkaException("Failed to create new KafkaAdminClient", exc);}}

前面的都是构造参数,关注以下这行代码:

	return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,timeoutProcessorFactory, logContext);

KafkaAdminClient的构造方法如下:

    private KafkaAdminClient(AdminClientConfig config,String clientId,Time time,AdminMetadataManager metadataManager,Metrics metrics,KafkaClient client,TimeoutProcessorFactory timeoutProcessorFactory,LogContext logContext) {this.clientId = clientId;this.log = logContext.logger(KafkaAdminClient.class);this.logContext = logContext;this.requestTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);this.defaultApiTimeoutMs = configureDefaultApiTimeoutMs(config);this.time = time;this.metadataManager = metadataManager;this.metrics = metrics;this.client = client;this.runnable = new AdminClientRunnable();String threadName = NETWORK_THREAD_PREFIX + " | " + clientId;this.thread = new KafkaThread(threadName, runnable, true);this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?new TimeoutProcessorFactory() : timeoutProcessorFactory;this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());log.debug("Kafka admin client initialized");thread.start();}

上面的代码,大部分都是传递参数,但里面有个细节,不能忽略。最后一行代码是thread.start(),这里启动了一个线程,根据thread对象往前找,看看该对象是如何初始化的:

    this.thread = new KafkaThread(threadName, runnable, true);

由此可知,threadKafkaThread构造的对象,KafkaThread继承于Thread类。同时,上述代码中KafkaThread的构造方法中的第二个参数是runnable,该参数的定义如下:

    this.runnable = new AdminClientRunnable();

既然runnable是类AdminClientRunnable构造的对象,那么,当thread.start()代码执行之后,类AdminClientRunnablerun()方法就开始执行了,我们看下run()方法的源码:

    @Overridepublic void run() {log.debug("Thread starting");try {processRequests();} finally {closing = true;AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);int numTimedOut = 0;TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);synchronized (this) {numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The AdminClient thread has exited.");}numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited.");numTimedOut += timeoutCallsToSend(timeoutProcessor);numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),"The AdminClient thread has exited.");if (numTimedOut > 0) {log.info("Timed out {} remaining operation(s) during close.", numTimedOut);}closeQuietly(client, "KafkaClient");closeQuietly(metrics, "Metrics");log.debug("Exiting AdminClientRunnable thread.");}}

在上述代码中,只需关注processRequests()方法,源码如下:

    private void processRequests() {long now = time.milliseconds();while (true) {// Copy newCalls into pendingCalls.drainNewCalls();// Check if the AdminClient thread should shut down.long curHardShutdownTimeMs = hardShutdownTimeMs.get();if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs))break;// Handle timeouts.TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now);timeoutPendingCalls(timeoutProcessor);timeoutCallsToSend(timeoutProcessor);timeoutCallsInFlight(timeoutProcessor);long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);}// Choose nodes for our pending calls.pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now));long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);if (metadataFetchDelayMs == 0) {metadataManager.transitionToUpdatePending(now);Call metadataCall = makeMetadataCall(now);// Create a new metadata fetch call and add it to the end of pendingCalls.// Assign a node for just the new call (we handled the other pending nodes above).if (!maybeDrainPendingCall(metadataCall, now))pendingCalls.add(metadataCall);}pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));if (metadataFetchDelayMs > 0) {pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);}// Ensure that we use a small poll timeout if there are pending calls which need to be sentif (!pendingCalls.isEmpty())pollTimeout = Math.min(pollTimeout, retryBackoffMs);// Wait for network responses.log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout);List<ClientResponse> responses = client.poll(Math.max(0L, pollTimeout), now);log.trace("KafkaClient#poll retrieved {} response(s)", responses.size());// unassign calls to disconnected nodesunassignUnsentCalls(client::connectionFailed);// Update the current time and handle the latest responses.now = time.milliseconds();handleResponses(now, responses);}}

额,上面的代码,此时并未发现连接Server的过程,同时,我发现上述代码通过poll()方法在获取Server端的消息:

    List<ClientResponse> responses = client.poll(Math.max(0L, pollTimeout), now);

按照我当时看这段代码的思路,由于这部分代码没有连接的过程,所以,我也就不进入poll()方法了,从方法名上看,它里面也应该没有连接的过程,所以转而回头看下client对象是如何定义的,在KafkaAdminClient.createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory, HostResolver hostResolver)方法中,定义如下:

    networkClient = new NetworkClient(metadataManager.updater(),null,selector,clientId,1,config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),(int) TimeUnit.HOURS.toMillis(1),config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),time,true,apiVersions,null,logContext,(hostResolver == null) ? new DefaultHostResolver() : hostResolver);

再看下NetworkClient的构造函数:

    public NetworkClient(MetadataUpdater metadataUpdater,Metadata metadata,Selectable selector,String clientId,int maxInFlightRequestsPerConnection,long reconnectBackoffMs,long reconnectBackoffMax,int socketSendBuffer,int socketReceiveBuffer,int defaultRequestTimeoutMs,long connectionSetupTimeoutMs,long connectionSetupTimeoutMaxMs,Time time,boolean discoverBrokerVersions,ApiVersions apiVersions,Sensor throttleTimeSensor,LogContext logContext,HostResolver hostResolver) {/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the* super constructor is invoked.*/if (metadataUpdater == null) {if (metadata == null)throw new IllegalArgumentException("`metadata` must not be null");this.metadataUpdater = new DefaultMetadataUpdater(metadata);} else {this.metadataUpdater = metadataUpdater;}this.selector = selector;this.clientId = clientId;this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext, hostResolver);this.socketSendBuffer = socketSendBuffer;this.socketReceiveBuffer = socketReceiveBuffer;this.correlation = 0;this.randOffset = new Random();this.defaultRequestTimeoutMs = defaultRequestTimeoutMs;this.reconnectBackoffMs = reconnectBackoffMs;this.time = time;this.discoverBrokerVersions = discoverBrokerVersions;this.apiVersions = apiVersions;this.throttleTimeSensor = throttleTimeSensor;this.log = logContext.logger(NetworkClient.class);this.state = new AtomicReference<>(State.ACTIVE);}

事与愿违,有点尴尬,从NetworkClient的构造方法来看,也不涉及连接Server端的代码,那连接是在什么时候发生的呢?我想到快速了解NetworkClient类中都有哪些方法,以寻找是否有建立连接的方法。可喜的是,我找到了initiateConnect(Node node, long now)方法,见下图:

这个方法像是连接Server的,然后顺着这个方法,去查看是谁在调用它的,如下图所示:

调用栈显示,有两个方法调用了initiateConnect()方法,他们分别是ready()maybeUpdate()方法,然后分别对ready()maybeUpdate()方法又进行反向跟踪,看他们又分别被谁调用,中间的反向调用过程在这里就省略了,感兴趣的可以自己去研究下。

我们先从maybeUpdate()方法着手吧,通过该方法,最后可追踪到maybeUpdate()方法最终被poll()所调用。嗯?是不是前面我们也跟踪到poll()方法了。难道就是在调用poll方法之后,才实现连接Server的过程?下面是poll()方法的实现:

    /*** Do actual reads and writes to sockets.** @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,*                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and*                metadata timeout* @param now The current time in milliseconds* @return The list of responses received*/@Overridepublic List<ClientResponse> poll(long timeout, long now) {ensureActive();if (!abortedSends.isEmpty()) {// If there are aborted sends because of unsupported version exceptions or disconnects,// handle them immediately without waiting for Selector#poll.List<ClientResponse> responses = new ArrayList<>();handleAbortedSends(responses);completeResponses(responses);return responses;}long metadataTimeout = metadataUpdater.maybeUpdate(now);try {this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));} catch (IOException e) {log.error("Unexpected error during I/O", e);}// process completed actionslong updatedNow = this.time.milliseconds();List<ClientResponse> responses = new ArrayList<>();handleCompletedSends(responses, updatedNow);handleCompletedReceives(responses, updatedNow);handleDisconnections(responses, updatedNow);handleConnections();handleInitiateApiVersionRequests(updatedNow);handleTimedOutConnections(responses, updatedNow);handleTimedOutRequests(responses, updatedNow);completeResponses(responses);return responses;}

由上述代码可知,maybeUpdate()方法是被metadataUpdater对象所调用,接下来我们就需要了解metadataUpdater对象属于哪个类。
回到NetworkClient的构造方法可看到这段代码:

    if (metadataUpdater == null) {if (metadata == null)throw new IllegalArgumentException("`metadata` must not be null");this.metadataUpdater = new DefaultMetadataUpdater(metadata);} else {this.metadataUpdater = metadataUpdater;}

注意这里,如果metadataUpdater的值为null,则metadataUpdater = new DefaultMetadataUpdater(metadata),也就是说metadataUpdater对象属于DefaultMetadataUpdater类;

如果metadataUpdater的值不为null,则其值保持不变,也就是说,这个值是由调用者传入的。
现在我们需要跟踪调用者传入该值时是否为null,则需要回到KafkaAdminClient.createInternal()方法,下面对代码进行了精简,仅关注重点:

    AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));.......部分代码省略......networkClient = new NetworkClient(metadataManager.updater(),null,selector,clientId,1,config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),(int) TimeUnit.HOURS.toMillis(1),config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),time,true,apiVersions,null,logContext,(hostResolver == null) ? new DefaultHostResolver() : hostResolver);

由上述代码可知,在传入NetworkClient的构造方法时,metadataManager.updater()=AdminMetadataManager.updater(),而AdminMetadataManager的源码如下:

    public AdminMetadataManager(LogContext logContext, long refreshBackoffMs, long metadataExpireMs) {this.log = logContext.logger(AdminMetadataManager.class);this.refreshBackoffMs = refreshBackoffMs;this.metadataExpireMs = metadataExpireMs;this.updater = new AdminMetadataUpdater();}public AdminMetadataUpdater updater() {return updater;}

由此可知,传入NetworkClient的构造方法时的metadataUpdater对象并不为null,且该对象属于AdminMetadataUpdater类。

好了,到这里我们已经把metadataUpdater的值搞清楚了,其值并不为null。但如果通过IDE的代码默认跟踪方式,会将metadataUpdater的值定位为DefaultMetadataUpdater类,如果是这样,那会有什么影响呢?

前面我们提到,NetworkClient.poll()方法会调用maybeUpdate()方法,即如下这行代码:

    long metadataTimeout = metadataUpdater.maybeUpdate(now);

metadataUpdater对象如果为DefaultMetadataUpdater类,则调用上述maybeUpdate(now)方法时,会执行连接Server的过程,源码如下:

    @Overridepublic long maybeUpdate(long now) {// should we update our metadata?long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);if (metadataTimeout > 0) {return metadataTimeout;}// Beware that the behavior of this method and the computation of timeouts for poll() are// highly dependent on the behavior of leastLoadedNode.Node node = leastLoadedNode(now);if (node == null) {log.debug("Give up sending metadata request since no node is available");return reconnectBackoffMs;}return maybeUpdate(now, node);}# maybeUpdate(now)再调用maybeUpdate(now, node)方法,代码如下:private long maybeUpdate(long now, Node node) {String nodeConnectionId = node.idString();if (canSendRequest(nodeConnectionId, now)) {Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(now);MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;log.debug("Sending metadata request {} to node {}", metadataRequest, node);sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);return defaultRequestTimeoutMs;}// If there's any connection establishment underway, wait until it completes. This prevents// the client from unnecessarily connecting to additional nodes while a previous connection// attempt has not been completed.if (isAnyNodeConnecting()) {// Strictly the timeout we should return here is "connect timeout", but as we don't// have such application level configuration, using reconnect backoff instead.return reconnectBackoffMs;}if (connectionStates.canConnect(nodeConnectionId, now)) {// We don't have a connection to this node right now, make onelog.debug("Initialize connection to node {} for sending metadata request", node);# 这里就是连接Server端的入口了initiateConnect(node, now);return reconnectBackoffMs;}// connected, but can't send more OR connecting// In either case, we just need to wait for a network event to let us know the selected// connection might be usable again.return Long.MAX_VALUE;}

注意上述代码的中文注释部分,initiateConnect(node, now)方法就是连接Server端的入口,该方法的实现如下:

    /*** Initiate a connection to the given node* @param node the node to connect to* @param now current time in epoch milliseconds*/private void initiateConnect(Node node, long now) {String nodeConnectionId = node.idString();try {connectionStates.connecting(nodeConnectionId, now, node.host());InetAddress address = connectionStates.currentAddress(nodeConnectionId);log.debug("Initiating connection to node {} using address {}", node, address);selector.connect(nodeConnectionId,new InetSocketAddress(address, node.port()),this.socketSendBuffer,this.socketReceiveBuffer);} catch (IOException e) {log.warn("Error connecting to node {}", node, e);// Attempt failed, we'll try again after the backoffconnectionStates.disconnected(nodeConnectionId, now);// Notify metadata updater of the connection failuremetadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());}}

所以,metadataUpdater对象如果为DefaultMetadataUpdater类,就会在调用poll()方法时,初始化连接Server的过程。但前面已知,metadataUpdater对象属于AdminMetadataUpdater类,他又是在哪里与Server进行连接的呢?

我们再回到之前已知悉的内容,有两个方法调用了initiateConnect()方法,他们分别是ready()maybeUpdate()方法。通过上面的跟踪,目前可以排除maybeUpdate()方法了。接下来,通过ready()方法,我们再反向跟踪一下,哪些地方都调用了ready()方法。

通过层层筛选,发现KafkaAdminClient.sendEligibleCalls()方法调用了ready()方法,如下图所示:

通过sendEligibleCalls()方法又反向查找是谁在调用该方法,如下图所示:

由图可知,是KafkaAdminClient.processRequests()方法调用了sendEligibleCalls()方法,而processRequests()方法正是我们前面跟踪代码时,发现无法继续跟踪的地方。精简之后的代码如下:

    private void processRequests(){long now=time.milliseconds();while(true){// Copy newCalls into pendingCalls.drainNewCalls();......部分代码省略......pollTimeout=Math.min(pollTimeout,sendEligibleCalls(now));......部分代码省略......// Wait for network responses.log.trace("Entering KafkaClient#poll(timeout={})",pollTimeout);List<ClientResponse> responses=client.poll(Math.max(0L,pollTimeout),now);log.trace("KafkaClient#poll retrieved {} response(s)",responses.size());......部分代码省略......}}

由上述代码可知,与Server端的连接是在poll()方法执行之前,隐藏在pollTimeout=Math.min(pollTimeout,sendEligibleCalls(now));代码中。如果想要验证自己的理解是否正确,则可以通过调试源码,增加断点来验证,这里就略过了。

现在回过头来,就会发现,为什么我之前读到这个processRequests()方法时,没有发现这个方法呢?因为没有注意到一些细节,所以忽略了这个方法,误以为连接发生在其他地方。

当然,这可能也和我的惯性思维有关,总是通过类名和方法名来猜测这个方法的大概意图,然后当找不到流程的时候,就通过反向查找调用栈的方式去梳理执行流程,也算是殊途同归吧。

最后,用一张时序图来总结下上面的内容:

相关文章:

【Kafka源码走读】Admin接口的客户端与服务端的连接流程

注&#xff1a;本文对应的kafka的源码的版本是trunk分支。写这篇文章的主要目的是当作自己阅读源码之后的笔记&#xff0c;写的有点凌乱&#xff0c;还望大佬们海涵&#xff0c;多谢&#xff01; 最近在写一个Web版的kafka客户端工具&#xff0c;然后查看Kafka官网&#xff0c;…...

Windows API遍历桌面上所有文件

要获取桌面上的图标&#xff0c;可以使用Windows API中的Shell API。以下是遍历桌面上所有文件的示例代码&#xff1a; #include <Windows.h> #include <ShlObj.h> #include <iostream> #include <vector> using namespace std;int main() {// 获取桌…...

【MySQL】基本查询(插入查询结果、聚合函数、分组查询)

目录 一、插入查询结果二、聚合函数三、分组查询&#xff08;group by & having&#xff09;四、SQL查询的执行顺序五、OJ练习 一、插入查询结果 语法&#xff1a; INSERT INTO table_name [(column [, column ...])] SELECT ...案例&#xff1a;删除表中重复数据 --创建…...

【Go语言】Golang保姆级入门教程 Go初学者介绍chapter1

Golang 开山篇 Golang的学习方向 区块链研发工程师&#xff1a; 去中心化 虚拟货币 金融 Go服务器端、游戏软件工程师 &#xff1a; C C 处理日志 数据打包 文件系统 数据处理 很厉害 处理大并发 Golang分布式、云计算软件工程师&#xff1a;盛大云 cdn 京东 消息推送 分布式文…...

mysql 自增长键值增量设置

参考文章 MySQL中auto_increment的初值和增量值设置_auto_increment怎么设置_linda公馆的博客-CSDN博客 其中关键语句 show VARIABLES like %auto_increment% set auto_increment_increment4; set auto_increment_offset2;...

【pytho】request五种种请求处理为空和非空处理以及上传excel,上传图片处理

一、python中请求处理 request.args获取的是个字典&#xff0c;所以可以通过get方式获取请求参数和值 request.form获取的也是个字典&#xff0c;所以也可以通过get方式获取请求的form参数和值 request.data&#xff0c;使用过JavaScript&#xff0c;api调用方式进行掺入jso…...

【全面解析】Windows 如何使用 SSH 密钥远程连接 Linux 服务器

创建密钥 创建 linux 服务器端的终端中执行命令 ssh-keygen&#xff0c;之后一直按Enter即可&#xff0c;这样会在将在 ~/.ssh/ 路径下生成公钥(id_rsa.pub)和私钥(id_rsa) 注意&#xff1a;也可以在 windows 端生成密钥&#xff0c;只需要保证公钥在服务器端&#xff0c;私钥…...

解锁新技能《基于logback的纯java版本SDK实现》

开源SDK&#xff1a; <!--Java通用日志组件SDK--> <dependency><groupId>io.github.mingyang66</groupId><artifactId>oceansky-logger</artifactId><version>4.3.6</version> </dependency> <!-- Java基于logback的…...

你需要知道的云原生架构体系内容

云原生&#xff08;Cloud-Native&#xff09;的概念在国内提及的越来越多&#xff0c;但大部分人对云原生的认识仅限于容器、微服务、DevOps等内容&#xff0c;把容器、微服务、 DevOps就等同于云原生&#xff0c;这显然是不对的。CNCF从其自身的角度定义了云原生技术&#xff…...

安全渗透--正则表达式

什么是正则表达式&#xff1f; 正则表达式是一组由字母和符号组成的特殊文本&#xff0c;它可以用来从文本中找出满足你想要的格式的句子。 一个正则表达式是一种从左到右匹配主体字符串的模式。 “Regular expression”这个词比较拗口&#xff0c;我们常使用缩写的术语“regex…...

git如何撤销commit(未push)

文章目录 前言undo commitreset current branch to here Undo Commit&#xff0c;Revert Commit&#xff0c;Drop Commit的区别 是否删除对代码的修改是否删除Commit记录是否会新增Commit记录Undo Commit不会未Push会&#xff0c;已Push不会不会Revert Commit会不会会Drop Com…...

Vue数组与字符串互转

一、数组转换成字符串的方法 join() var arr [A, B, C]; var str arr.join(、); console.log(str); // 输出 A、B、C toString() var arr [A, B, C]; var str arr.toString(); console.log(str); // 输出 A, B, C JSON.stringify() var arr [A, B, C]; var str JSO…...

Java编程实现遍历两个MAC地址之间所有MAC的方法

Java编程实现遍历两个MAC地址之间所有MAC的方法 本文实例讲述了java编程实现遍历两个MAC地址之间所有MAC的方法。分享给大家供大http://家参考&#xff0c;具体如下&#xff1a; 在对发放的设备进行后台管理时,很多时候会用到设备MAC这个字段,它可以标识唯一一个设备。然而在数…...

用AXIS2发布WebService的方法

Axis2+tomcat6.0 实现webService 服务端发布与客户端的调用。 Aixs2开发webService的方法有很多,在此只介绍一种比较简单的实现方法。 第一步:首先要下载开发所需要的jar包 下载:axis2-1.6.1-war.zip http://www.apache.org/dist//axis/axis2/java/core/1.6.1/ 下载…...

嵌入式学习_Day 003

程序功能介绍 c #include <stdio.h> int main() {char c,ll;printf("please enter a capital letter:");cgetchar();getchar();if (c>A&& c<Z) {llc32;printf("Lowercase letter output:%c\n",ll);printf("ASCII value:%d\n"…...

常用的数据结构 JAVA

目录 1、线性表2、栈&#xff1a;3、队列&#xff1a; 1、线性表 List<Object> narnat new ArrayList<>();ArrayList&#xff1a;动态数组 1、可以嵌套使用 2、add(x)添加元素x&#xff0c;remove(index)删除某个位置的元素 3、注意list是指向性的&#xff0c…...

基于机器视觉工具箱和形态学处理的视频中目标形状检测算法matlab仿真

目录 1.算法理论概述 2.部分核心程序 3.算法运行软件版本 4.算法运行效果图预览 5.算法完整程序工程 1.算法理论概述 目标形状检测是计算机视觉领域的重要任务之一&#xff0c;旨在从视频序列中自动检测和识别特定目标的形状。本文介绍一种基于机器视觉工具箱和形态学处理…...

小白入门:sentence-transformer 提取embedding模型转onnx

文章目录 序言原理讲解哪些部分可转onnx 代码区0. 安装依赖1. 路径配置2. 测试数据3. 准备工作3.1迁移保存目标文件 4. model转onnx-gpu5. 测试一下是否出错以及速度5.1 测试速度是否OK5.2测试结果是否OK 6. tar 这些文件 序言 本文适合小白入门&#xff0c;以自己训练的句子e…...

数据库应用:Redis持久化

目录 一、理论 1.Redis 高可用 2.Redis持久化 3.RDB持久化 4.AOF持久化&#xff08;支持秒级写入&#xff09; 5.RDB和AOF的优缺点 6.RDB和AOF对比 7.Redis性能管理 8.Redis的优化 二、实验 1.RDB持久化 2.AOF持久化 3.Redis性能管理 4.Redis的优化 三、总结 一、…...

js版计算比亚迪行驶里程连续12个月计算不超3万公里改进版带echar

<!DOCTYPE html> <html lang"zh-CN" style"height: 100%"> <head> <meta charset"utf-8" /> <title>连续12个月不超3万公里计算LIGUANGHUA</title> <style> .clocks { …...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

JVM 内存结构 详解

内存结构 运行时数据区&#xff1a; Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器&#xff1a; ​ 线程私有&#xff0c;程序控制流的指示器&#xff0c;分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 ​ 每个线程都有一个程序计数…...

AGain DB和倍数增益的关系

我在设置一款索尼CMOS芯片时&#xff0c;Again增益0db变化为6DB&#xff0c;画面的变化只有2倍DN的增益&#xff0c;比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析&#xff1a; 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲

文章目录 前言第一部分&#xff1a;体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分&#xff1a;体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

GitFlow 工作模式(详解)

今天再学项目的过程中遇到使用gitflow模式管理代码&#xff0c;因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存&#xff0c;无论是github还是gittee&#xff0c;都是一种基于git去保存代码的形式&#xff0c;这样保存代码…...

NPOI Excel用OLE对象的形式插入文件附件以及插入图片

static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

PHP 8.5 即将发布:管道操作符、强力调试

前不久&#xff0c;PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5&#xff01;作为 PHP 语言的又一次重要迭代&#xff0c;PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是&#xff0c;借助强大的本地开发环境 ServBay&am…...