redis 使用Lettuce 当redis挂掉重启之后 网络是怎么重新连接
Lettuce是一个高性能的Java Redis客户端,支持同步、异步和反应式编程模式
Lettuce的核心功能包括:
- 高性能:通过使用Netty作为底层网络通信框架,实现了非阻塞IO,提高了性能。
- 丰富的API:提供了丰富的Redis命令API,支持多种Redis数据类型和操作。
- 高级特性:支持命令批处理、事务、发布订阅等功能,并且可以适应不同的Redis数据类型和应用场景。
- 灵活性:支持多种Redis序列化器和编解码器,方便在不同场景下使用。
Lettuce的这些特性使得它成为了一个受欢迎的Redis客户端,广泛应用于各种需要高性能Redis交互的场景中。
Lettuce使用了Connection Watchdog(连接看门狗),用于管理和监控与远程服务器的连接。在网络通信中,Channel
代表了与远程服务的连接,当连接丢失或关闭时,Connection Watchdog 会自动尝试重新连接。它并不直接依赖于 channelActive()
来实现自动重连,而是负责在连接丢失时主动检测并安排重新连接的任务。
主要作用
-
监控连接状态:
ConnectionWatchdog
继承自ChannelInboundHandlerAdapter
,实现了 Netty 的ChannelHandler
接口。它在channelActive()
和channelInactive()
事件中插入了额外的逻辑。- 当连接激活时(
channelActive()
),它会初始化连接,清除之前的重连状态。 - 当连接关闭时(
channelInactive()
),它会检测连接是否已经关闭,并尝试重新连接。
-
自动重连:
- 自动调度重连:如果连接丢失,
ConnectionWatchdog
会在适当的时间间隔后安排一个新的重连尝试。 - 重连机制:它会基于预定义的重连延迟、尝试次数和其他条件,调度新的重连任务。重连操作是异步执行的,使用
reconnectWorkers
线程池来处理。 - 延迟处理:重连延迟使用
Delay
和StatefulDelay
管理,以确保每次重连尝试之间有适当的间隔,防止过于频繁的重连尝试。
- 自动调度重连:如果连接丢失,
-
连接恢复:
- 如果
channelInactive()
事件触发(即连接丢失),ConnectionWatchdog
会在重连条件满足时重新启动连接。它通过reconnectionHandler.reconnect()
来尝试重新建立连接。 - 重连失败事件:如果重连尝试失败,它会触发
ReconnectFailedEvent
事件,将失败信息发布到事件总线eventBus
,供其他组件处理。
- 如果
-
支持可配置的重连逻辑:
ConnectionWatchdog
提供了多种配置项来控制重连行为,如:- 重连延迟:使用
Delay
来管理每次重连之间的延迟。 - 重连调度:在连接丢失时,自动触发重连,且支持延迟、间隔等参数。
- 重连暂停:通过
setReconnectSuspended(true)
方法可以暂停重连尝试,避免在某些情况下自动重连。
- 重连延迟:使用
关键方法
-
channelActive(ChannelHandlerContext ctx)
:- 这个方法在
Channel
激活时调用。当连接建立成功时,会被触发。它会初始化一些内部状态,并清除之前的重连调度。
- 这个方法在
-
channelInactive(ChannelHandlerContext ctx)
:- 当
Channel
关闭时调用。如果连接丢失,ConnectionWatchdog
会根据当前配置来判断是否需要进行重连。 - 如果启用了重连监听(
listenOnChannelInactive
为true
),并且重连没有被暂停,它会调用scheduleReconnect()
来触发重连。
- 当
-
scheduleReconnect()
:- 用于调度下一次的重连尝试。它会检查当前连接是否有效,如果没有有效的连接(即连接丢失),则会安排在适当的延迟后尝试重新连接。
- 这个方法会使用
reconnectDelay
来计算每次重连之间的延迟时间。 - 重连尝试会通过
reconnectionHandler.reconnect()
来实际执行重连逻辑。
-
run(int attempt)
:- 这是执行实际重连的代码。如果
scheduleReconnect()
被调用,run()
会尝试重新建立连接。 - 如果重连成功,它会停止重连操作;如果失败,则发布
ReconnectFailedEvent
事件,并根据情况决定是否继续重连。
- 这是执行实际重连的代码。如果
为什么 channelActive()
不会自动重连?
在 Netty
中,channelActive()
只是一个通道激活事件。当一个连接成功建立时,channelActive()
会被触发,通常表示连接已经准备好进行数据传输。然而,channelActive()
事件本身并不处理连接丢失后的自动重连。
ConnectionWatchdog
的作用就是在连接丢失或关闭时自动安排重连任务。- 自动重连的原因:因为一旦连接丢失,
channelInactive()
会被触发。ConnectionWatchdog
会在channelInactive()
中判断是否启用重连逻辑,然后调度一个新的重连任务,确保在连接失败后能够尝试重新连接。 channelActive()
只关心通道的初始化,不能保证在通道关闭或掉线后自动恢复连接。ConnectionWatchdog
负责在通道断开后,通过一定的重连策略来确保连接恢复。
总结
ConnectionWatchdog
的作用是:
- 监控连接的生命周期,当连接丢失时触发重连。
- 管理重连过程,通过延迟和重连尝试次数来合理安排重连。
- 确保连接恢复,并在重连失败时通过事件总线通知其他组件。
因此,channelActive()
只是连接建立时的一个简单事件,而 ConnectionWatchdog
是负责监控连接丢失后自动重连的核心组件。
##看门狗源码
/** Copyright 2011-2019 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package io.lettuce.core.protocol;import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ReconnectFailedEvent;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.resource.Delay;
import io.lettuce.core.resource.Delay.StatefulDelay;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;/*** A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost.** @author Will Glozer* @author Mark Paluch* @author Koji Lin*/
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {private static final long LOGGING_QUIET_TIME_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);private static final InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionWatchdog.class);private final Delay reconnectDelay;private final Bootstrap bootstrap;private final EventExecutorGroup reconnectWorkers;private final ReconnectionHandler reconnectionHandler;private final ReconnectionListener reconnectionListener;private final Timer timer;private final EventBus eventBus;private Channel channel;private SocketAddress remoteAddress;private long lastReconnectionLogging = -1;private String logPrefix;private final AtomicBoolean reconnectSchedulerSync;private volatile int attempts;private volatile boolean armed;private volatile boolean listenOnChannelInactive;private volatile Timeout reconnectScheduleTimeout;/*** Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new* {@link Channel} when disconnected, while reconnect is true. The socketAddressSupplier can supply the reconnect address.** @param reconnectDelay reconnect delay, must not be {@literal null}* @param clientOptions client options for the current connection, must not be {@literal null}* @param bootstrap Configuration for new channels, must not be {@literal null}* @param timer Timer used for delayed reconnect, must not be {@literal null}* @param reconnectWorkers executor group for reconnect tasks, must not be {@literal null}* @param socketAddressSupplier the socket address supplier to obtain an address for reconnection, may be {@literal null}* @param reconnectionListener the reconnection listener, must not be {@literal null}* @param connectionFacade the connection facade, must not be {@literal null}* @param eventBus Event bus to emit reconnect events.*/public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap, Timer timer,EventExecutorGroup reconnectWorkers, Mono<SocketAddress> socketAddressSupplier,ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus) {LettuceAssert.notNull(reconnectDelay, "Delay must not be null");LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");LettuceAssert.notNull(bootstrap, "Bootstrap must not be null");LettuceAssert.notNull(timer, "Timer must not be null");LettuceAssert.notNull(reconnectWorkers, "ReconnectWorkers must not be null");LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");LettuceAssert.notNull(reconnectionListener, "ReconnectionListener must not be null");LettuceAssert.notNull(connectionFacade, "ConnectionFacade must not be null");LettuceAssert.notNull(eventBus, "EventBus must not be null");this.reconnectDelay = reconnectDelay;this.bootstrap = bootstrap;this.timer = timer;this.reconnectWorkers = reconnectWorkers;this.reconnectionListener = reconnectionListener;this.reconnectSchedulerSync = new AtomicBoolean(false);this.eventBus = eventBus;Mono<SocketAddress> wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr).onErrorResume(t -> {if (logger.isDebugEnabled()) {logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()+ ", reusing cached address " + remoteAddress, t);} else {logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()+ ", reusing cached address " + remoteAddress);}return Mono.just(remoteAddress);});this.reconnectionHandler = new ReconnectionHandler(clientOptions, bootstrap, wrappedSocketAddressSupplier, timer,reconnectWorkers, connectionFacade);resetReconnectDelay();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {logger.debug("{} userEventTriggered(ctx, {})", logPrefix(), evt);if (evt instanceof ConnectionEvents.Activated) {attempts = 0;resetReconnectDelay();}super.userEventTriggered(ctx, evt);}void prepareClose() {setListenOnChannelInactive(false);setReconnectSuspended(true);Timeout reconnectScheduleTimeout = this.reconnectScheduleTimeout;if (reconnectScheduleTimeout != null && !reconnectScheduleTimeout.isCancelled()) {reconnectScheduleTimeout.cancel();}reconnectionHandler.prepareClose();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {reconnectSchedulerSync.set(false);channel = ctx.channel();reconnectScheduleTimeout = null;logPrefix = null;remoteAddress = channel.remoteAddress();logPrefix = null;logger.debug("{} channelActive()", logPrefix());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.debug("{} channelInactive()", logPrefix());if (!armed) {logger.debug("{} ConnectionWatchdog not armed", logPrefix());return;}channel = null;if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {scheduleReconnect();} else {logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);}super.channelInactive(ctx);}/*** Enable {@link ConnectionWatchdog} to listen for disconnected events.*/void arm() {this.armed = true;setListenOnChannelInactive(true);}/*** Schedule reconnect if channel is not available/not active.*/public void scheduleReconnect() {logger.debug("{} scheduleReconnect()", logPrefix());if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {attempts++;final int attempt = attempts;int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);this.reconnectScheduleTimeout = timer.newTimeout(it -> {reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");return;}reconnectWorkers.submit(() -> {ConnectionWatchdog.this.run(attempt);return null;});}, timeout, TimeUnit.MILLISECONDS);// Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.if (!reconnectSchedulerSync.get()) {reconnectScheduleTimeout = null;}} else {logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());}}/*** Reconnect to the remote address that the closed channel was connected to. This creates a new {@link ChannelPipeline} with* the same handler instances contained in the old channel's pipeline.** @param attempt attempt counter** @throws Exception when reconnection fails.*/public void run(int attempt) throws Exception {reconnectSchedulerSync.set(false);reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if (isReconnectSuspended()) {logger.debug("Skip reconnect scheduling, reconnect is suspended");return;}boolean shouldLog = shouldLog();InternalLogLevel infoLevel = InternalLogLevel.INFO;InternalLogLevel warnLevel = InternalLogLevel.WARN;if (shouldLog) {lastReconnectionLogging = System.currentTimeMillis();} else {warnLevel = InternalLogLevel.DEBUG;infoLevel = InternalLogLevel.DEBUG;}InternalLogLevel warnLevelToUse = warnLevel;try {reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = reconnectionHandler.reconnect();CompletableFuture<Channel> future = tuple.getT1();future.whenComplete((c, t) -> {if (c != null && t == null) {return;}CompletableFuture<SocketAddress> remoteAddressFuture = tuple.getT2();SocketAddress remote = remoteAddress;if (remoteAddressFuture.isDone() && !remoteAddressFuture.isCompletedExceptionally()&& !remoteAddressFuture.isCancelled()) {remote = remoteAddressFuture.join();}String message = String.format("Cannot reconnect to [%s]: %s", remote,t.getMessage() != null ? t.getMessage() : t.toString());if (ReconnectionHandler.isExecutionException(t)) {if (logger.isDebugEnabled()) {logger.debug(message, t);} else {logger.log(warnLevelToUse, message);}} else {logger.log(warnLevelToUse, message, t);}eventBus.publish(new ReconnectFailedEvent(LocalAddress.ANY, remote, t, attempt));if (!isReconnectSuspended()) {scheduleReconnect();}});} catch (Exception e) {logger.log(warnLevel, "Cannot reconnect: {}", e.toString());eventBus.publish(new ReconnectFailedEvent(LocalAddress.ANY, remoteAddress, e, attempt));}}private boolean isEventLoopGroupActive() {if (!isEventLoopGroupActive(bootstrap.group()) || !isEventLoopGroupActive(reconnectWorkers)) {return false;}return true;}private static boolean isEventLoopGroupActive(EventExecutorGroup executorService) {return !(executorService.isShuttingDown());}private boolean shouldLog() {long quietUntil = lastReconnectionLogging + LOGGING_QUIET_TIME_MS;return quietUntil <= System.currentTimeMillis();}/*** Enable event listener for disconnected events.** @param listenOnChannelInactive {@literal true} to listen for disconnected events.*/public void setListenOnChannelInactive(boolean listenOnChannelInactive) {this.listenOnChannelInactive = listenOnChannelInactive;}public boolean isListenOnChannelInactive() {return listenOnChannelInactive;}/*** Suspend reconnection temporarily. Reconnect suspension will interrupt reconnection attempts.** @param reconnectSuspended {@literal true} to suspend reconnection*/public void setReconnectSuspended(boolean reconnectSuspended) {reconnectionHandler.setReconnectSuspended(reconnectSuspended);}public boolean isReconnectSuspended() {return reconnectionHandler.isReconnectSuspended();}ReconnectionHandler getReconnectionHandler() {return reconnectionHandler;}private void resetReconnectDelay() {if (reconnectDelay instanceof StatefulDelay) {((StatefulDelay) reconnectDelay).reset();}}private String logPrefix() {if (logPrefix != null) {return logPrefix;}String buffer = "[" + ChannelLogDescriptor.logDescriptor(channel) + ", last known addr=" + remoteAddress + ']';return logPrefix = buffer;}
}
##redis lettuce重新连接代码
-
重连处理 (
reconnect
和reconnect0
):reconnect()
方法会尝试重新连接 Redis 服务器。当远程地址发生变化时,它会尝试通过socketAddressSupplier
获取新的地址并发起连接。reconnect0()
方法执行实际的重连逻辑。它通过 Netty 的bootstrap.connect(remoteAddress)
发起连接,并通过ChannelFuture
来管理连接的异步状态。- 如果连接失败,会通过
ChannelFutureListener
监听连接结果,执行相关的失败处理(如关闭通道、记录异常等)。
-
连接逻辑(Netty 的
bootstrap.connect()
):- 您的代码中也展示了与 Netty 的连接过程密切相关的部分,特别是
ChannelFuture
和ChannelPromise
的使用,这些都是 Netty 中用于异步连接、处理连接结果的关键工具。 bootstrap.connect(remoteAddress)
是用来发起连接的核心方法。它返回一个ChannelFuture
,通过这个ChannelFuture
可以监听连接成功与否的结果。
- 您的代码中也展示了与 Netty 的连接过程密切相关的部分,特别是
-
连接超时处理:
- 在重连的过程中,代码实现了一个超时机制(
TimeoutException
)。如果重连操作超时,它会取消连接操作,并触发异常。 - 通过
eventLoop().schedule()
来设定连接超时。
- 在重连的过程中,代码实现了一个超时机制(
-
Channel
的初始化与配置:- 在连接成功之后,会通过
RedisChannelInitializer
初始化通道的处理流水线(ChannelPipeline
)。如果初始化失败,会进行相应的失败处理,包括重置连接、关闭连接等。 - 如果连接成功,则会执行一些调试输出和状态更新。
- 在连接成功之后,会通过
-
错误处理与异常捕获:
- 对于连接过程中的各种异常(如 DNS 解析失败、连接失败等),代码进行了详细的异常捕获和处理。在
reconnect0
中,如果连接失败或初始化失败,都会通过completeExceptionally()
完成CompletableFuture
,确保连接错误能够被外部捕获。
- 对于连接过程中的各种异常(如 DNS 解析失败、连接失败等),代码进行了详细的异常捕获和处理。在
-
使用
CompletableFuture
:- 重连操作通过
CompletableFuture
来管理异步结果。CompletableFuture<Channel>
用来表示连接是否成功,CompletableFuture<SocketAddress>
用来表示地址解析的结果。
- 重连操作通过
关键部分的 Netty 连接代码:
-
连接过程中的异步操作:
bootstrap.connect(remoteAddress)
返回一个ChannelFuture
,表示异步连接操作,addListener()
用来监听连接的结果。
-
异常处理与重试机制:
- 如果连接失败,代码会尝试关闭连接并报告异常。如果连接成功,则会初始化相关的
ChannelPipeline
,并进行后续的操作。
- 如果连接失败,代码会尝试关闭连接并报告异常。如果连接成功,则会初始化相关的
-
超时处理:
TimeoutException
用来在连接超时后进行错误处理。
##网络连接源码如下
Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = reconnectionHandler.reconnect();
protected Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> reconnect() {CompletableFuture<Channel> future = new CompletableFuture<>();CompletableFuture<SocketAddress> address = new CompletableFuture<>();socketAddressSupplier.subscribe(remoteAddress -> {address.complete(remoteAddress);if (future.isCancelled()) {return;}reconnect0(future, remoteAddress);}, ex -> {if (!address.isDone()) {address.completeExceptionally(ex);}future.completeExceptionally(ex);});this.currentFuture = future;return Tuples.of(future, address);}
##reconnect0重连
private void reconnect0(CompletableFuture<Channel> result, SocketAddress remoteAddress) {ChannelFuture connectFuture = bootstrap.connect(remoteAddress);ChannelPromise initFuture = connectFuture.channel().newPromise();logger.debug("Reconnecting to Redis at {}", remoteAddress);result.whenComplete((c, t) -> {if (t instanceof CancellationException) {connectFuture.cancel(true);initFuture.cancel(true);}});initFuture.addListener((ChannelFuture it) -> {if (it.cause() != null) {connectFuture.cancel(true);close(it.channel());result.completeExceptionally(it.cause());} else {result.complete(connectFuture.channel());}});connectFuture.addListener((ChannelFuture it) -> {if (it.cause() != null) {initFuture.tryFailure(it.cause());return;}ChannelPipeline pipeline = it.channel().pipeline();RedisChannelInitializer channelInitializer = pipeline.get(RedisChannelInitializer.class);if (channelInitializer == null) {initFuture.tryFailure(new IllegalStateException("Reconnection attempt without a RedisChannelInitializer in the channel pipeline"));return;}channelInitializer.channelInitialized().whenComplete((state, throwable) -> {if (throwable != null) {if (isExecutionException(throwable)) {initFuture.tryFailure(throwable);return;}if (clientOptions.isCancelCommandsOnReconnectFailure()) {connectionFacade.reset();}if (clientOptions.isSuspendReconnectOnProtocolFailure()) {logger.error("Disabling autoReconnect due to initialization failure", throwable);setReconnectSuspended(true);}initFuture.tryFailure(throwable);return;}if (logger.isDebugEnabled()) {logger.info("Reconnected to {}, Channel {}", remoteAddress,ChannelLogDescriptor.logDescriptor(it.channel()));} else {logger.info("Reconnected to {}", remoteAddress);}initFuture.trySuccess();});});Runnable timeoutAction = () -> {initFuture.tryFailure(new TimeoutException(String.format("Reconnection attempt exceeded timeout of %d %s ",timeout, timeoutUnit)));};Timeout timeoutHandle = timer.newTimeout(it -> {if (connectFuture.isDone() && initFuture.isDone()) {return;}if (reconnectWorkers.isShutdown()) {timeoutAction.run();return;}reconnectWorkers.submit(timeoutAction);}, this.timeout, timeoutUnit);initFuture.addListener(it -> timeoutHandle.cancel());}
##netty Bootstrap网络连接
public ChannelFuture connect(SocketAddress remoteAddress) {ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");validate();return doResolveAndConnect(remoteAddress, config.localAddress());}
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.isDone()) {if (!regFuture.isSuccess()) {return regFuture;}return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// Directly obtain the cause and do a null check so we only need one volatile read in case of a// failure.Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doResolveAndConnect0(channel, remoteAddress, localAddress, promise);}}});return promise;}}
##
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,final SocketAddress localAddress, final ChannelPromise promise) {try {final EventLoop eventLoop = channel.eventLoop();final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {// Resolver has no idea about what to do with the specified remote address or it's resolved already.doConnect(remoteAddress, localAddress, promise);return promise;}final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);if (resolveFuture.isDone()) {final Throwable resolveFailureCause = resolveFuture.cause();if (resolveFailureCause != null) {// Failed to resolve immediatelychannel.close();promise.setFailure(resolveFailureCause);} else {// Succeeded to resolve immediately; cached? (or did a blocking lookup)doConnect(resolveFuture.getNow(), localAddress, promise);}return promise;}// Wait until the name resolution is finished.resolveFuture.addListener(new FutureListener<SocketAddress>() {@Overridepublic void operationComplete(Future<SocketAddress> future) throws Exception {if (future.cause() != null) {channel.close();promise.setFailure(future.cause());} else {doConnect(future.getNow(), localAddress, promise);}}});} catch (Throwable cause) {promise.tryFailure(cause);}return promise;}
##
private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.final Channel channel = connectPromise.channel();channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (localAddress == null) {channel.connect(remoteAddress, connectPromise);} else {channel.connect(remoteAddress, localAddress, connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}});}
##
@Overridepublic final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {return tail.connect(remoteAddress, promise);}
##
@Overridepublic ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (remoteAddress == null) {throw new NullPointerException("remoteAddress");}if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeConnect(remoteAddress, localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {next.invokeConnect(remoteAddress, localAddress, promise);}}, promise, null);}return promise;}
##
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {connect(remoteAddress, localAddress, promise);}}
##
@Overridepublic void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) {unsafe.connect(remoteAddress, localAddress, promise);}
##
@Overridepublic final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}try {if (connectPromise != null) {// Already a connect in process.throw new ConnectionPendingException();}boolean wasActive = isActive();if (doConnect(remoteAddress, localAddress)) {fulfillConnectPromise(promise, wasActive);} else {connectPromise = promise;requestedRemoteAddress = remoteAddress;// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) {connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() {ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause =new ConnectTimeoutException("connection timed out: " + remoteAddress);if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}promise.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isCancelled()) {if (connectTimeoutFuture != null) {connectTimeoutFuture.cancel(false);}connectPromise = null;close(voidPromise());}}});}} catch (Throwable t) {promise.tryFailure(annotateConnectException(t, remoteAddress));closeIfClosed();}}
##socket连接工具
@Overrideprotected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {if (localAddress != null) {doBind0(localAddress);}boolean success = false;try {boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT);}success = true;return connected;} finally {if (!success) {doClose();}}}
##socketChannel连接远程地址 socketChannel.connect(remoteAddress)
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)throws IOException {try {return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {@Overridepublic Boolean run() throws IOException {return socketChannel.connect(remoteAddress);}});} catch (PrivilegedActionException e) {throw (IOException) e.getCause();}}
相关文章:
redis 使用Lettuce 当redis挂掉重启之后 网络是怎么重新连接
Lettuce是一个高性能的Java Redis客户端,支持同步、异步和反应式编程模式 Lettuce的核心功能包括: 高性能:通过使用Netty作为底层网络通信框架,实现了非阻塞IO,提高了性能。丰富的API:提供了丰富…...

【IntelliJ IDEA 集成工具】TalkX - AI编程助手
前言 在数字化时代,技术的迅猛发展给软件开发者带来了更多的挑战和机遇。为了提高技术开发群体在繁多项目中的编码效率和质量,他们需要一个强大而专业的工具来辅助开发过程,而正是为了满足这一需求,TalkX 应运而生。 一、概述 1…...
二叉搜索树Ⅲ【东北大学oj数据结构8-3】C++
二叉搜索树 III B:在二叉搜索树II中加入delete指令,创建程序对二叉搜索树T执行如下指令。 插入 k:将key k 插入到 T 中。 find k:报告T中是否存在key k。 delete k:删除key为 k 的节点。 打印:使用中序树遍…...

【面试笔记】CPU 缓存机制
CPU 缓存机制 1. CPU Cache 与 MMU1.1 MMU 是什么?TLB 又是什么?他们是怎么工作的?2.2 简述 Cache 与 MMU 的协作关系?2.3 简述 Cache 与 MMU 的协作工作流程? 2. CPU 多层次缓存2.1 什么是 CPU 的多层次缓存结构&…...
MySQL基础函数使用
目录 简介 1. 单行函数 1.1 字符串函数 1.2 日期函数 1.3 数值函数 1.4 转换函数 1.5 其他函数 2. 多行函数 示例: 3. 数据分组 示例: 4. DQL单表关键字执行顺序 示例: 5. 多表查询 示例: 6. 表与表的外连接 示例…...
解决docker环境下aspose-words转换word成pdf后乱码问题
描述 环境:docker 部署工具:Jenkins 需求:本地上传的word文档需要转换成pdf 问题:转换之后的pdf文档出现小框框(乱码) 转换成PDF的操作 pom: <dependency><groupId>org.apach…...

C# 生成随机数的方法
C# 提供了一种强大而方便的工具类 Random ,用于生成随机数。这里将分类讨论如何通过 C# 实现随机数生成,以及应用于实际情况中的一些具体方案。 一、Random 类概述 Random 类表示一个伪随机数生成器,用于生成满足随机性统计要求的数字序列。…...

ip_done
文章目录 路由结论 IP分片 数据链路层重谈Mac地址MAC帧报头局域网的通信原理MSS,以及MAC帧对上层的影响ARP协议 1.公司是不是这样呢? 类似的要给运营商交钱,构建公司的子网,具有公司级别的入口路由器 2.为什么要这样呢?? IP地…...

3D可视化引擎HOOPS Visualize与HOOPS Luminate Bridge的功能与应用
HOOPS Visualize HPS / HOOPS Luminate Bridge为开发者提供了强大的工具,用于在CAD应用中集成逼真的渲染能力。本文旨在梳理该桥接产品的核心功能、使用方法及应用场景,为用户快速上手并充分利用产品特性提供指导。 桥接产品的核心功能概述 HOOPS Lumi…...

Docder 搭建Redis分片集群 散片插槽 数据分片 故障转移 Java连接
介绍 使多个 Redis 实例共同工作,实现数据的水平扩展。通过将数据分片到多个节点上,Redis 集群能够在不牺牲性能的前提下扩展存储容量和处理能力,从而支持更高并发的请求。Redis 集群不仅支持数据分片,还提供了自动故障转移和高可…...

校园交友app/校园资源共享小程序/校园圈子集合二手物品交易论坛、交友等综合型生活服务社交论坛
多客校园社交圈子系统搭建 校园交友多功能系统源码: 1、更改学校为独立的模块。整体UI改为绿色,青春色,更贴近校园风格。2、圈子归纳到学校去进行运营。每个学校可建立多个圈子。和其他学校圈子互不干扰。3、增加用户绑定学校,以后进入将默认…...

Chaos Mesh云原生的混沌测试平台搭建
Chaos Mesh云原生的混沌测试平台搭建 一.环境准备 确认已经安装helm,如要查看 Helm 是否已经安装,请执行如下命令: helm version二.使用helm安装 1.添加 Chaos Mesh 仓库 在 Helm 仓库中添加 Chaos Mesh 仓库: helm re…...
Vue3之组合式API详解
Vue 3引入了一种新的API风格——组合式API(Composition API),旨在提升组件的逻辑复用性和可维护性。本文将详细阐述Vue 3中的组合式API,包括其定义、特点、使用场景、优势等,并给出具体的示例代码。 一、定义 组合式…...

大模型的构建与部署(3)——数据标注
版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl1. 数据标注的重要性 1.1 增强数据可解释性 数据标注通过为原始数据添加标签或注释,显著增强了数据的可解释性。在机器学习和深度学习领域,模型的训练依赖于大量带标签的数据。这些标签不仅帮助…...

AI发展与LabVIEW程序员就业
人工智能(AI)技术的快速发展确实对许多行业带来了变革,包括自动化、数据分析、软件开发等领域。对于LabVIEW程序员来说,AI的崛起确实引发了一个值得关注的问题:AI会不会取代他们的工作,导致大量失业&#x…...
本地事务 + 消息队列事务方案设计
Spring Boot 和 RocketMQ 在Spring Boot项目中实现“本地事务 消息队列事务”的方案,可以按照以下步骤实现: 先执行MySQL本地事务操作(未提交)随后发送消息到消息队列(如RocketMQ事务消息)等待消息队列确…...

pinctrl子系统学习笔记
一、背景 cpu的gpio引脚可以复用成多个功能,如可以配置成I2C或者普通GPIO模式。配置方式一般是通过写引脚复用的配置寄存器,但是不同芯片厂商配置寄存器格式内容各不相同,设置引脚复用无法做到通用且自由的配置,只能在启动初始化…...
使用vue-element 的计数器inputNumber,传第三个参数
使用vue-element 的计数器inputNumber。 其中的change 事件中,默认自带两个参数,currentValue和oldValue,分别代表改变后的数和改变前的数, 如果想要传第三个参数, change"(currentValue, oldValue) > numCha…...

如何从0构建一个flask项目,直接上实操!!!
项目结构 首先,创建一个项目目录,结构如下: flask_app/ │ ├── app.py # Flask 应用代码 ├── static/ # 存放静态文件(如CSS、JS、图片等) │ └── style.css # 示例…...
Mongoose连接数据库操作实践
文章目录 介绍特点:Mongoose 使用:创建项目并安装:连接到 MongoDB:定义 Schema:创建模型并操作数据库:创建文档:查询文档:更新文档:删除文档:使用钩子&#x…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...

Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

USB Over IP专用硬件的5个特点
USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中,从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备(如专用硬件设备),从而消除了直接物理连接的需要。USB over IP的…...

听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...