当前位置: 首页 > news >正文

redis 使用Lettuce 当redis挂掉重启之后 网络是怎么重新连接

Lettuce是一个高性能的Java Redis客户端,支持同步、异步和反应式编程模式

Lettuce的核心功能包括:

  • 高性能‌:通过使用Netty作为底层网络通信框架,实现了非阻塞IO,提高了性能。
  • 丰富的API‌:提供了丰富的Redis命令API,支持多种Redis数据类型和操作。
  • 高级特性‌:支持命令批处理、事务、发布订阅等功能,并且可以适应不同的Redis数据类型和应用场景。
  • 灵活性‌:支持多种Redis序列化器和编解码器,方便在不同场景下使用。

Lettuce的这些特性使得它成为了一个受欢迎的Redis客户端,广泛应用于各种需要高性能Redis交互的场景中‌。

Lettuce使用了Connection Watchdog(连接看门狗),用于管理和监控与远程服务器的连接。在网络通信中,Channel 代表了与远程服务的连接,当连接丢失或关闭时,Connection Watchdog 会自动尝试重新连接。它并不直接依赖于 channelActive() 来实现自动重连,而是负责在连接丢失时主动检测并安排重新连接的任务。

主要作用

  1. 监控连接状态

    • ConnectionWatchdog 继承自 ChannelInboundHandlerAdapter,实现了 Netty 的 ChannelHandler 接口。它在 channelActive()channelInactive() 事件中插入了额外的逻辑。
    • 当连接激活时(channelActive()),它会初始化连接,清除之前的重连状态。
    • 当连接关闭时(channelInactive()),它会检测连接是否已经关闭,并尝试重新连接。
  2. 自动重连

    • 自动调度重连:如果连接丢失,ConnectionWatchdog 会在适当的时间间隔后安排一个新的重连尝试。
    • 重连机制:它会基于预定义的重连延迟、尝试次数和其他条件,调度新的重连任务。重连操作是异步执行的,使用 reconnectWorkers 线程池来处理。
    • 延迟处理:重连延迟使用 DelayStatefulDelay 管理,以确保每次重连尝试之间有适当的间隔,防止过于频繁的重连尝试。
  3. 连接恢复

    • 如果 channelInactive() 事件触发(即连接丢失),ConnectionWatchdog 会在重连条件满足时重新启动连接。它通过 reconnectionHandler.reconnect() 来尝试重新建立连接。
    • 重连失败事件:如果重连尝试失败,它会触发 ReconnectFailedEvent 事件,将失败信息发布到事件总线 eventBus,供其他组件处理。
  4. 支持可配置的重连逻辑

    • ConnectionWatchdog 提供了多种配置项来控制重连行为,如:
      • 重连延迟:使用 Delay 来管理每次重连之间的延迟。
      • 重连调度:在连接丢失时,自动触发重连,且支持延迟、间隔等参数。
      • 重连暂停:通过 setReconnectSuspended(true) 方法可以暂停重连尝试,避免在某些情况下自动重连。

关键方法

  1. channelActive(ChannelHandlerContext ctx):

    • 这个方法在 Channel 激活时调用。当连接建立成功时,会被触发。它会初始化一些内部状态,并清除之前的重连调度。
  2. channelInactive(ChannelHandlerContext ctx):

    • Channel 关闭时调用。如果连接丢失,ConnectionWatchdog 会根据当前配置来判断是否需要进行重连。
    • 如果启用了重连监听(listenOnChannelInactivetrue),并且重连没有被暂停,它会调用 scheduleReconnect() 来触发重连。
  3. scheduleReconnect():

    • 用于调度下一次的重连尝试。它会检查当前连接是否有效,如果没有有效的连接(即连接丢失),则会安排在适当的延迟后尝试重新连接。
    • 这个方法会使用 reconnectDelay 来计算每次重连之间的延迟时间。
    • 重连尝试会通过 reconnectionHandler.reconnect() 来实际执行重连逻辑。
  4. run(int attempt):

    • 这是执行实际重连的代码。如果 scheduleReconnect() 被调用,run() 会尝试重新建立连接。
    • 如果重连成功,它会停止重连操作;如果失败,则发布 ReconnectFailedEvent 事件,并根据情况决定是否继续重连。

为什么 channelActive() 不会自动重连?

Netty 中,channelActive() 只是一个通道激活事件。当一个连接成功建立时,channelActive() 会被触发,通常表示连接已经准备好进行数据传输。然而,channelActive() 事件本身并不处理连接丢失后的自动重连

  • ConnectionWatchdog 的作用就是在连接丢失或关闭时自动安排重连任务。
  • 自动重连的原因:因为一旦连接丢失,channelInactive() 会被触发。ConnectionWatchdog 会在 channelInactive() 中判断是否启用重连逻辑,然后调度一个新的重连任务,确保在连接失败后能够尝试重新连接。
  • channelActive() 只关心通道的初始化,不能保证在通道关闭或掉线后自动恢复连接。ConnectionWatchdog 负责在通道断开后,通过一定的重连策略来确保连接恢复。

总结

ConnectionWatchdog 的作用是:

  • 监控连接的生命周期,当连接丢失时触发重连。
  • 管理重连过程,通过延迟和重连尝试次数来合理安排重连。
  • 确保连接恢复,并在重连失败时通过事件总线通知其他组件。

因此,channelActive() 只是连接建立时的一个简单事件,而 ConnectionWatchdog 是负责监控连接丢失后自动重连的核心组件。

##看门狗源码

/** Copyright 2011-2019 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**      http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package io.lettuce.core.protocol;import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ReconnectFailedEvent;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.resource.Delay;
import io.lettuce.core.resource.Delay.StatefulDelay;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;/*** A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost.** @author Will Glozer* @author Mark Paluch* @author Koji Lin*/
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {private static final long LOGGING_QUIET_TIME_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);private static final InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionWatchdog.class);private final Delay reconnectDelay;private final Bootstrap bootstrap;private final EventExecutorGroup reconnectWorkers;private final ReconnectionHandler reconnectionHandler;private final ReconnectionListener reconnectionListener;private final Timer timer;private final EventBus eventBus;private Channel channel;private SocketAddress remoteAddress;private long lastReconnectionLogging = -1;private String logPrefix;private final AtomicBoolean reconnectSchedulerSync;private volatile int attempts;private volatile boolean armed;private volatile boolean listenOnChannelInactive;private volatile Timeout reconnectScheduleTimeout;/*** Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new* {@link Channel} when disconnected, while reconnect is true. The socketAddressSupplier can supply the reconnect address.** @param reconnectDelay reconnect delay, must not be {@literal null}* @param clientOptions client options for the current connection, must not be {@literal null}* @param bootstrap Configuration for new channels, must not be {@literal null}* @param timer Timer used for delayed reconnect, must not be {@literal null}* @param reconnectWorkers executor group for reconnect tasks, must not be {@literal null}* @param socketAddressSupplier the socket address supplier to obtain an address for reconnection, may be {@literal null}* @param reconnectionListener the reconnection listener, must not be {@literal null}* @param connectionFacade the connection facade, must not be {@literal null}* @param eventBus Event bus to emit reconnect events.*/public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap, Timer timer,EventExecutorGroup reconnectWorkers, Mono<SocketAddress> socketAddressSupplier,ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus) {LettuceAssert.notNull(reconnectDelay, "Delay must not be null");LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");LettuceAssert.notNull(bootstrap, "Bootstrap must not be null");LettuceAssert.notNull(timer, "Timer must not be null");LettuceAssert.notNull(reconnectWorkers, "ReconnectWorkers must not be null");LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");LettuceAssert.notNull(reconnectionListener, "ReconnectionListener must not be null");LettuceAssert.notNull(connectionFacade, "ConnectionFacade must not be null");LettuceAssert.notNull(eventBus, "EventBus must not be null");this.reconnectDelay = reconnectDelay;this.bootstrap = bootstrap;this.timer = timer;this.reconnectWorkers = reconnectWorkers;this.reconnectionListener = reconnectionListener;this.reconnectSchedulerSync = new AtomicBoolean(false);this.eventBus = eventBus;Mono<SocketAddress> wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr).onErrorResume(t -> {if (logger.isDebugEnabled()) {logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()+ ", reusing cached address " + remoteAddress, t);} else {logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()+ ", reusing cached address " + remoteAddress);}return Mono.just(remoteAddress);});this.reconnectionHandler = new ReconnectionHandler(clientOptions, bootstrap, wrappedSocketAddressSupplier, timer,reconnectWorkers, connectionFacade);resetReconnectDelay();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {logger.debug("{} userEventTriggered(ctx, {})", logPrefix(), evt);if (evt instanceof ConnectionEvents.Activated) {attempts = 0;resetReconnectDelay();}super.userEventTriggered(ctx, evt);}void prepareClose() {setListenOnChannelInactive(false);setReconnectSuspended(true);Timeout reconnectScheduleTimeout = this.reconnectScheduleTimeout;if (reconnectScheduleTimeout != null && !reconnectScheduleTimeout.isCancelled()) {reconnectScheduleTimeout.cancel();}reconnectionHandler.prepareClose();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {reconnectSchedulerSync.set(false);channel = ctx.channel();reconnectScheduleTimeout = null;logPrefix = null;remoteAddress = channel.remoteAddress();logPrefix = null;logger.debug("{} channelActive()", logPrefix());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.debug("{} channelInactive()", logPrefix());if (!armed) {logger.debug("{} ConnectionWatchdog not armed", logPrefix());return;}channel = null;if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {scheduleReconnect();} else {logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);}super.channelInactive(ctx);}/*** Enable {@link ConnectionWatchdog} to listen for disconnected events.*/void arm() {this.armed = true;setListenOnChannelInactive(true);}/*** Schedule reconnect if channel is not available/not active.*/public void scheduleReconnect() {logger.debug("{} scheduleReconnect()", logPrefix());if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {attempts++;final int attempt = attempts;int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);this.reconnectScheduleTimeout = timer.newTimeout(it -> {reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");return;}reconnectWorkers.submit(() -> {ConnectionWatchdog.this.run(attempt);return null;});}, timeout, TimeUnit.MILLISECONDS);// Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.if (!reconnectSchedulerSync.get()) {reconnectScheduleTimeout = null;}} else {logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());}}/*** Reconnect to the remote address that the closed channel was connected to. This creates a new {@link ChannelPipeline} with* the same handler instances contained in the old channel's pipeline.** @param attempt attempt counter** @throws Exception when reconnection fails.*/public void run(int attempt) throws Exception {reconnectSchedulerSync.set(false);reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if (isReconnectSuspended()) {logger.debug("Skip reconnect scheduling, reconnect is suspended");return;}boolean shouldLog = shouldLog();InternalLogLevel infoLevel = InternalLogLevel.INFO;InternalLogLevel warnLevel = InternalLogLevel.WARN;if (shouldLog) {lastReconnectionLogging = System.currentTimeMillis();} else {warnLevel = InternalLogLevel.DEBUG;infoLevel = InternalLogLevel.DEBUG;}InternalLogLevel warnLevelToUse = warnLevel;try {reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = reconnectionHandler.reconnect();CompletableFuture<Channel> future = tuple.getT1();future.whenComplete((c, t) -> {if (c != null && t == null) {return;}CompletableFuture<SocketAddress> remoteAddressFuture = tuple.getT2();SocketAddress remote = remoteAddress;if (remoteAddressFuture.isDone() && !remoteAddressFuture.isCompletedExceptionally()&& !remoteAddressFuture.isCancelled()) {remote = remoteAddressFuture.join();}String message = String.format("Cannot reconnect to [%s]: %s", remote,t.getMessage() != null ? t.getMessage() : t.toString());if (ReconnectionHandler.isExecutionException(t)) {if (logger.isDebugEnabled()) {logger.debug(message, t);} else {logger.log(warnLevelToUse, message);}} else {logger.log(warnLevelToUse, message, t);}eventBus.publish(new ReconnectFailedEvent(LocalAddress.ANY, remote, t, attempt));if (!isReconnectSuspended()) {scheduleReconnect();}});} catch (Exception e) {logger.log(warnLevel, "Cannot reconnect: {}", e.toString());eventBus.publish(new ReconnectFailedEvent(LocalAddress.ANY, remoteAddress, e, attempt));}}private boolean isEventLoopGroupActive() {if (!isEventLoopGroupActive(bootstrap.group()) || !isEventLoopGroupActive(reconnectWorkers)) {return false;}return true;}private static boolean isEventLoopGroupActive(EventExecutorGroup executorService) {return !(executorService.isShuttingDown());}private boolean shouldLog() {long quietUntil = lastReconnectionLogging + LOGGING_QUIET_TIME_MS;return quietUntil <= System.currentTimeMillis();}/*** Enable event listener for disconnected events.** @param listenOnChannelInactive {@literal true} to listen for disconnected events.*/public void setListenOnChannelInactive(boolean listenOnChannelInactive) {this.listenOnChannelInactive = listenOnChannelInactive;}public boolean isListenOnChannelInactive() {return listenOnChannelInactive;}/*** Suspend reconnection temporarily. Reconnect suspension will interrupt reconnection attempts.** @param reconnectSuspended {@literal true} to suspend reconnection*/public void setReconnectSuspended(boolean reconnectSuspended) {reconnectionHandler.setReconnectSuspended(reconnectSuspended);}public boolean isReconnectSuspended() {return reconnectionHandler.isReconnectSuspended();}ReconnectionHandler getReconnectionHandler() {return reconnectionHandler;}private void resetReconnectDelay() {if (reconnectDelay instanceof StatefulDelay) {((StatefulDelay) reconnectDelay).reset();}}private String logPrefix() {if (logPrefix != null) {return logPrefix;}String buffer = "[" + ChannelLogDescriptor.logDescriptor(channel) + ", last known addr=" + remoteAddress + ']';return logPrefix = buffer;}
}

##redis lettuce重新连接代码

  1. 重连处理 (reconnectreconnect0)

    • reconnect() 方法会尝试重新连接 Redis 服务器。当远程地址发生变化时,它会尝试通过 socketAddressSupplier 获取新的地址并发起连接。
    • reconnect0() 方法执行实际的重连逻辑。它通过 Netty 的 bootstrap.connect(remoteAddress) 发起连接,并通过 ChannelFuture 来管理连接的异步状态。
    • 如果连接失败,会通过 ChannelFutureListener 监听连接结果,执行相关的失败处理(如关闭通道、记录异常等)。
  2. 连接逻辑(Netty 的 bootstrap.connect()

    • 您的代码中也展示了与 Netty 的连接过程密切相关的部分,特别是 ChannelFutureChannelPromise 的使用,这些都是 Netty 中用于异步连接、处理连接结果的关键工具。
    • bootstrap.connect(remoteAddress) 是用来发起连接的核心方法。它返回一个 ChannelFuture,通过这个 ChannelFuture 可以监听连接成功与否的结果。
  3. 连接超时处理

    • 在重连的过程中,代码实现了一个超时机制(TimeoutException)。如果重连操作超时,它会取消连接操作,并触发异常。
    • 通过 eventLoop().schedule() 来设定连接超时。
  4. Channel 的初始化与配置

    • 在连接成功之后,会通过 RedisChannelInitializer 初始化通道的处理流水线(ChannelPipeline)。如果初始化失败,会进行相应的失败处理,包括重置连接、关闭连接等。
    • 如果连接成功,则会执行一些调试输出和状态更新。
  5. 错误处理与异常捕获

    • 对于连接过程中的各种异常(如 DNS 解析失败、连接失败等),代码进行了详细的异常捕获和处理。在 reconnect0 中,如果连接失败或初始化失败,都会通过 completeExceptionally() 完成 CompletableFuture,确保连接错误能够被外部捕获。
  6. 使用 CompletableFuture

    • 重连操作通过 CompletableFuture 来管理异步结果。CompletableFuture<Channel> 用来表示连接是否成功,CompletableFuture<SocketAddress> 用来表示地址解析的结果。

关键部分的 Netty 连接代码:

  • 连接过程中的异步操作:

    • bootstrap.connect(remoteAddress) 返回一个 ChannelFuture,表示异步连接操作,addListener() 用来监听连接的结果。
  • 异常处理与重试机制:

    • 如果连接失败,代码会尝试关闭连接并报告异常。如果连接成功,则会初始化相关的 ChannelPipeline,并进行后续的操作。
  • 超时处理:

    • TimeoutException 用来在连接超时后进行错误处理。

##网络连接源码如下

Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = reconnectionHandler.reconnect(); 

protected Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> reconnect() {CompletableFuture<Channel> future = new CompletableFuture<>();CompletableFuture<SocketAddress> address = new CompletableFuture<>();socketAddressSupplier.subscribe(remoteAddress -> {address.complete(remoteAddress);if (future.isCancelled()) {return;}reconnect0(future, remoteAddress);}, ex -> {if (!address.isDone()) {address.completeExceptionally(ex);}future.completeExceptionally(ex);});this.currentFuture = future;return Tuples.of(future, address);}

##reconnect0重连

private void reconnect0(CompletableFuture<Channel> result, SocketAddress remoteAddress) {ChannelFuture connectFuture = bootstrap.connect(remoteAddress);ChannelPromise initFuture = connectFuture.channel().newPromise();logger.debug("Reconnecting to Redis at {}", remoteAddress);result.whenComplete((c, t) -> {if (t instanceof CancellationException) {connectFuture.cancel(true);initFuture.cancel(true);}});initFuture.addListener((ChannelFuture it) -> {if (it.cause() != null) {connectFuture.cancel(true);close(it.channel());result.completeExceptionally(it.cause());} else {result.complete(connectFuture.channel());}});connectFuture.addListener((ChannelFuture it) -> {if (it.cause() != null) {initFuture.tryFailure(it.cause());return;}ChannelPipeline pipeline = it.channel().pipeline();RedisChannelInitializer channelInitializer = pipeline.get(RedisChannelInitializer.class);if (channelInitializer == null) {initFuture.tryFailure(new IllegalStateException("Reconnection attempt without a RedisChannelInitializer in the channel pipeline"));return;}channelInitializer.channelInitialized().whenComplete((state, throwable) -> {if (throwable != null) {if (isExecutionException(throwable)) {initFuture.tryFailure(throwable);return;}if (clientOptions.isCancelCommandsOnReconnectFailure()) {connectionFacade.reset();}if (clientOptions.isSuspendReconnectOnProtocolFailure()) {logger.error("Disabling autoReconnect due to initialization failure", throwable);setReconnectSuspended(true);}initFuture.tryFailure(throwable);return;}if (logger.isDebugEnabled()) {logger.info("Reconnected to {}, Channel {}", remoteAddress,ChannelLogDescriptor.logDescriptor(it.channel()));} else {logger.info("Reconnected to {}", remoteAddress);}initFuture.trySuccess();});});Runnable timeoutAction = () -> {initFuture.tryFailure(new TimeoutException(String.format("Reconnection attempt exceeded timeout of %d %s ",timeout, timeoutUnit)));};Timeout timeoutHandle = timer.newTimeout(it -> {if (connectFuture.isDone() && initFuture.isDone()) {return;}if (reconnectWorkers.isShutdown()) {timeoutAction.run();return;}reconnectWorkers.submit(timeoutAction);}, this.timeout, timeoutUnit);initFuture.addListener(it -> timeoutHandle.cancel());}

##netty Bootstrap网络连接

public ChannelFuture connect(SocketAddress remoteAddress) {ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");validate();return doResolveAndConnect(remoteAddress, config.localAddress());}
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.isDone()) {if (!regFuture.isSuccess()) {return regFuture;}return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// Directly obtain the cause and do a null check so we only need one volatile read in case of a// failure.Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doResolveAndConnect0(channel, remoteAddress, localAddress, promise);}}});return promise;}}

##

private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,final SocketAddress localAddress, final ChannelPromise promise) {try {final EventLoop eventLoop = channel.eventLoop();final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {// Resolver has no idea about what to do with the specified remote address or it's resolved already.doConnect(remoteAddress, localAddress, promise);return promise;}final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);if (resolveFuture.isDone()) {final Throwable resolveFailureCause = resolveFuture.cause();if (resolveFailureCause != null) {// Failed to resolve immediatelychannel.close();promise.setFailure(resolveFailureCause);} else {// Succeeded to resolve immediately; cached? (or did a blocking lookup)doConnect(resolveFuture.getNow(), localAddress, promise);}return promise;}// Wait until the name resolution is finished.resolveFuture.addListener(new FutureListener<SocketAddress>() {@Overridepublic void operationComplete(Future<SocketAddress> future) throws Exception {if (future.cause() != null) {channel.close();promise.setFailure(future.cause());} else {doConnect(future.getNow(), localAddress, promise);}}});} catch (Throwable cause) {promise.tryFailure(cause);}return promise;}

##

private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.final Channel channel = connectPromise.channel();channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (localAddress == null) {channel.connect(remoteAddress, connectPromise);} else {channel.connect(remoteAddress, localAddress, connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}});}

##

@Overridepublic final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {return tail.connect(remoteAddress, promise);}

##

@Overridepublic ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (remoteAddress == null) {throw new NullPointerException("remoteAddress");}if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeConnect(remoteAddress, localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {next.invokeConnect(remoteAddress, localAddress, promise);}}, promise, null);}return promise;}

##

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {connect(remoteAddress, localAddress, promise);}}

##

@Overridepublic void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) {unsafe.connect(remoteAddress, localAddress, promise);}

##

@Overridepublic final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}try {if (connectPromise != null) {// Already a connect in process.throw new ConnectionPendingException();}boolean wasActive = isActive();if (doConnect(remoteAddress, localAddress)) {fulfillConnectPromise(promise, wasActive);} else {connectPromise = promise;requestedRemoteAddress = remoteAddress;// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) {connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() {ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause =new ConnectTimeoutException("connection timed out: " + remoteAddress);if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}promise.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isCancelled()) {if (connectTimeoutFuture != null) {connectTimeoutFuture.cancel(false);}connectPromise = null;close(voidPromise());}}});}} catch (Throwable t) {promise.tryFailure(annotateConnectException(t, remoteAddress));closeIfClosed();}}

##socket连接工具

@Overrideprotected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {if (localAddress != null) {doBind0(localAddress);}boolean success = false;try {boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT);}success = true;return connected;} finally {if (!success) {doClose();}}}

##socketChannel连接远程地址 socketChannel.connect(remoteAddress)

public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)throws IOException {try {return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {@Overridepublic Boolean run() throws IOException {return socketChannel.connect(remoteAddress);}});} catch (PrivilegedActionException e) {throw (IOException) e.getCause();}}

相关文章:

redis 使用Lettuce 当redis挂掉重启之后 网络是怎么重新连接

Lettuce是一个高性能的Java Redis客户端&#xff0c;支持同步、异步和反应式编程模式 Lettuce的核心功能包括&#xff1a; ‌高性能‌&#xff1a;通过使用Netty作为底层网络通信框架&#xff0c;实现了非阻塞IO&#xff0c;提高了性能。‌丰富的API‌&#xff1a;提供了丰富…...

【IntelliJ IDEA 集成工具】TalkX - AI编程助手

前言 在数字化时代&#xff0c;技术的迅猛发展给软件开发者带来了更多的挑战和机遇。为了提高技术开发群体在繁多项目中的编码效率和质量&#xff0c;他们需要一个强大而专业的工具来辅助开发过程&#xff0c;而正是为了满足这一需求&#xff0c;TalkX 应运而生。 一、概述 1…...

二叉搜索树Ⅲ【东北大学oj数据结构8-3】C++

二叉搜索树 III B&#xff1a;在二叉搜索树II中加入delete指令&#xff0c;创建程序对二叉搜索树T执行如下指令。 插入 k&#xff1a;将key k 插入到 T 中。 find k&#xff1a;报告T中是否存在key k。 delete k&#xff1a;删除key为 k 的节点。 打印&#xff1a;使用中序树遍…...

【面试笔记】CPU 缓存机制

CPU 缓存机制 1. CPU Cache 与 MMU1.1 MMU 是什么&#xff1f;TLB 又是什么&#xff1f;他们是怎么工作的&#xff1f;2.2 简述 Cache 与 MMU 的协作关系&#xff1f;2.3 简述 Cache 与 MMU 的协作工作流程&#xff1f; 2. CPU 多层次缓存2.1 什么是 CPU 的多层次缓存结构&…...

MySQL基础函数使用

目录 简介 1. 单行函数 1.1 字符串函数 1.2 日期函数 1.3 数值函数 1.4 转换函数 1.5 其他函数 2. 多行函数 示例&#xff1a; 3. 数据分组 示例&#xff1a; 4. DQL单表关键字执行顺序 示例&#xff1a; 5. 多表查询 示例&#xff1a; 6. 表与表的外连接 示例…...

解决docker环境下aspose-words转换word成pdf后乱码问题

描述 环境&#xff1a;docker 部署工具&#xff1a;Jenkins 需求&#xff1a;本地上传的word文档需要转换成pdf 问题&#xff1a;转换之后的pdf文档出现小框框&#xff08;乱码&#xff09; 转换成PDF的操作 pom&#xff1a; <dependency><groupId>org.apach…...

C# 生成随机数的方法

C# 提供了一种强大而方便的工具类 Random &#xff0c;用于生成随机数。这里将分类讨论如何通过 C# 实现随机数生成&#xff0c;以及应用于实际情况中的一些具体方案。 一、Random 类概述 Random 类表示一个伪随机数生成器&#xff0c;用于生成满足随机性统计要求的数字序列。…...

ip_done

文章目录 路由结论 IP分片 数据链路层重谈Mac地址MAC帧报头局域网的通信原理MSS&#xff0c;以及MAC帧对上层的影响ARP协议 1.公司是不是这样呢? 类似的要给运营商交钱&#xff0c;构建公司的子网&#xff0c;具有公司级别的入口路由器 2&#xff0e;为什么要这样呢?? IP地…...

3D可视化引擎HOOPS Visualize与HOOPS Luminate Bridge的功能与应用

HOOPS Visualize HPS / HOOPS Luminate Bridge为开发者提供了强大的工具&#xff0c;用于在CAD应用中集成逼真的渲染能力。本文旨在梳理该桥接产品的核心功能、使用方法及应用场景&#xff0c;为用户快速上手并充分利用产品特性提供指导。 桥接产品的核心功能概述 HOOPS Lumi…...

Docder 搭建Redis分片集群 散片插槽 数据分片 故障转移 Java连接

介绍 使多个 Redis 实例共同工作&#xff0c;实现数据的水平扩展。通过将数据分片到多个节点上&#xff0c;Redis 集群能够在不牺牲性能的前提下扩展存储容量和处理能力&#xff0c;从而支持更高并发的请求。Redis 集群不仅支持数据分片&#xff0c;还提供了自动故障转移和高可…...

校园交友app/校园资源共享小程序/校园圈子集合二手物品交易论坛、交友等综合型生活服务社交论坛

多客校园社交圈子系统搭建 校园交友多功能系统源码: 1、更改学校为独立的模块。整体UI改为绿色&#xff0c;青春色&#xff0c;更贴近校园风格。2、圈子归纳到学校去进行运营。每个学校可建立多个圈子。和其他学校圈子互不干扰。3、增加用户绑定学校&#xff0c;以后进入将默认…...

Chaos Mesh云原生的混沌测试平台搭建

Chaos Mesh云原生的混沌测试平台搭建 一.环境准备 ​ 确认已经安装helm&#xff0c;如要查看 Helm 是否已经安装&#xff0c;请执行如下命令&#xff1a; helm version二.使用helm安装 1.添加 Chaos Mesh 仓库 ​ 在 Helm 仓库中添加 Chaos Mesh 仓库&#xff1a; helm re…...

Vue3之组合式API详解

Vue 3引入了一种新的API风格——组合式API&#xff08;Composition API&#xff09;&#xff0c;旨在提升组件的逻辑复用性和可维护性。本文将详细阐述Vue 3中的组合式API&#xff0c;包括其定义、特点、使用场景、优势等&#xff0c;并给出具体的示例代码。 一、定义 组合式…...

大模型的构建与部署(3)——数据标注

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl1. 数据标注的重要性 1.1 增强数据可解释性 数据标注通过为原始数据添加标签或注释,显著增强了数据的可解释性。在机器学习和深度学习领域,模型的训练依赖于大量带标签的数据。这些标签不仅帮助…...

AI发展与LabVIEW程序员就业

人工智能&#xff08;AI&#xff09;技术的快速发展确实对许多行业带来了变革&#xff0c;包括自动化、数据分析、软件开发等领域。对于LabVIEW程序员来说&#xff0c;AI的崛起确实引发了一个值得关注的问题&#xff1a;AI会不会取代他们的工作&#xff0c;导致大量失业&#x…...

本地事务 + 消息队列事务方案设计

Spring Boot 和 RocketMQ 在Spring Boot项目中实现“本地事务 消息队列事务”的方案&#xff0c;可以按照以下步骤实现&#xff1a; 先执行MySQL本地事务操作&#xff08;未提交&#xff09;随后发送消息到消息队列&#xff08;如RocketMQ事务消息&#xff09;等待消息队列确…...

pinctrl子系统学习笔记

一、背景 cpu的gpio引脚可以复用成多个功能&#xff0c;如可以配置成I2C或者普通GPIO模式。配置方式一般是通过写引脚复用的配置寄存器&#xff0c;但是不同芯片厂商配置寄存器格式内容各不相同&#xff0c;设置引脚复用无法做到通用且自由的配置&#xff0c;只能在启动初始化…...

使用vue-element 的计数器inputNumber,传第三个参数

使用vue-element 的计数器inputNumber。 其中的change 事件中&#xff0c;默认自带两个参数&#xff0c;currentValue和oldValue&#xff0c;分别代表改变后的数和改变前的数&#xff0c; 如果想要传第三个参数&#xff0c; change"(currentValue, oldValue) > numCha…...

如何从0构建一个flask项目,直接上实操!!!

项目结构 首先&#xff0c;创建一个项目目录&#xff0c;结构如下&#xff1a; flask_app/ │ ├── app.py # Flask 应用代码 ├── static/ # 存放静态文件&#xff08;如CSS、JS、图片等&#xff09; │ └── style.css # 示例…...

Mongoose连接数据库操作实践

文章目录 介绍特点&#xff1a;Mongoose 使用&#xff1a;创建项目并安装&#xff1a;连接到 MongoDB&#xff1a;定义 Schema&#xff1a;创建模型并操作数据库&#xff1a;创建文档&#xff1a;查询文档&#xff1a;更新文档&#xff1a;删除文档&#xff1a;使用钩子&#x…...

centos 7.9 freeswitch1.10.9环境搭建

亲测版本centos 7.9系统–》 freeswitch1.10.9 一、下载插件 yum install -y git alsa-lib-devel autoconf automake bison broadvoice-devel bzip2 curl-devel libdb4-devel e2fsprogs-devel erlang flite-devel g722_1-devel gcc-c++ gdbm-devel gnutls-devel ilbc2...

Gitlab服务管理和仓库项目权限管理

Gitlab服务管理 gitlab-ctl start # 启动所有 gitlab 组件&#xff1b; gitlab-ctl stop # 停止所有 gitlab 组件&#xff1b; gitlab-ctl restart # 重启所有 gitlab 组件&#xff1b; gitlab-ctl status …...

LLMs之Llama-3:Llama-3.3的简介、安装和使用方法、案例应用之详细攻略

LLMs之Llama-3&#xff1a;Llama-3.3的简介、安装和使用方法、案例应用之详细攻略 目录 相关文章 LLMs之LLaMA&#xff1a;LLaMA的简介、安装和使用方法、案例应用之详细攻略 LLMs之LLaMA-2&#xff1a;LLaMA 2的简介(技术细节)、安装、使用方法(开源-免费用于研究和商业用途…...

OpenCV函数及其应用

1. 梯度处理的Sobel算子函数 功能 Sobel算子是一种用于边缘检测的离散微分算子&#xff0c;它结合了高斯平滑和微分求导&#xff0c;用于计算图像亮度的空间梯度。 参数 src&#xff1a;输入图像。 dst&#xff1a;输出图像。 ddepth&#xff1a;输出图像的深度。 dx&#xff…...

vulnhub靶场【DriftingBlues】之3

前言 靶机&#xff1a;DriftingBlues-3&#xff0c;IP地址192.168.1.60 攻击&#xff1a;kali&#xff0c;IP地址192.168.1.16 都采用虚拟机&#xff0c;网卡为桥接模式 主机发现 使用arp-scan -l或netdiscover -r 192.168.1.1/24 信息收集 使用nmap扫描端口 网站探测 访…...

文件上传—阿里云OSS对象存储

目录 一、OSS简介 二、OSS基本使用 1. 注册账号 2. 基本配置 (1) 开通OSS (2) 创建存储空间 (3) 修改权限 (4) 配置完成&#xff0c;上传一张图片&#xff0c;检验是否成功。 (5) 创建AccessKey 三、Java项目集成OSS 1. 导入依赖 2. Result.java代码&#xff1a; …...

mybatis-plus超详细讲解

mybatis-plus &#xff08;简化代码神器&#xff09; 地址&#xff1a;https://mp.baomidou.com/ 目录 mybatis-plus 简介 特性 支持数据库 参与贡献 快速指南 1、创建数据库 mybatis_plus 2、导入相关的依赖 3、创建对应的文件夹 4、编写配置文件 5、编写代码 …...

【Linux】--- 进程的概念

【Linux】--- 进程的概念 一、进程概念二、PCB1.什么是PCB2.什么是task_struct&#xff08;重点&#xff01;&#xff09;3.task_struct包含内容 三、task_struct内容详解1.查看进程&#xff08;1&#xff09;通过系统目录查看&#xff08;2&#xff09;通过ps命令查看&#xf…...

Unity NTPComponent应用, 实现一个无后端高效获取网络时间的组件

无后端高效获取网络时间的组件 废话不多说&#xff0c;直接上源码m_NowSerivceTime 一个基于你发行游戏地区的时间偏移&#xff0c; 比如北京时区就是 8, 巴西就是-3&#xff0c;美国就是-5using Newtonsoft.Json; 如果这里报错&#xff0c; 就说明项目没有 NewtonsoftJson插件…...

go语言使用zlib压缩[]byte

在Go语言中&#xff0c;可以使用compress/flate和compress/zlib包来实现对[]byte数据的Zlib压缩。下面是一个简单的示例&#xff0c;展示如何使用这些包来压缩一个字节切片&#xff1a; go package main import ( "bytes" "compress/zlib" "fmt"…...