Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
基于Dubbo 3.1,详细介绍了Dubbo Consumer服务调用源码。
上文我们学习了,Dubbo 发起服务调用的上半部分源码,我们学习到了FailoverClusterInvoker最终会通过服务提供者Invoker#invoke发起RPC调用,下面我们来学习Dubbo 发起服务调用的下半部分源码,也就是真正的RPC调用的源码。
Dubbo 3.x服务调用源码:
- Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
- Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
- Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
- Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
- Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
- Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
- Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
- Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
- Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
- Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布
文章目录
- AbstractInvoker#invoke服务提供者调用
- AbstractInvoker#doInvokeAndReturn执行rpc调用
- DubboInvoker#doInvoke调用remote层远程通信
- getCallbackExecutor用于获取回调线程池。
- ReferenceCountExchangeClient#request发起rpc请求
- HeaderExchangeClient#request发起rpc请求
- HeaderExchangeChannel#request异步发起rpc请求
- Request请求内容
- DefaultFuture#newFuture创建Future并启动超时检测
- new DefaultFuture
- timeoutCheck超时检测
- AbstractPeer#send发起异步请求
- AbstractClient#send发起异步请求
- NettyChannel#send基于netty发起异步请求
- AbstractInvoker#waitForResultIfSync异步转同步等待
- AsyncRpcResult#get阻塞获取结果
- ThreadlessExecutor#waitAndDrain等待返回
- 阻塞优化
- Result#recreate获取结果
- AsyncRpcResult#getAppResponse获取响应结果
- AppResponse#createDefaultValue创建默认返回值
- AppResponse#recreate处理结果
- 总结
AbstractInvoker#invoke服务提供者调用
ListenerInvokerWrapper#invoke方法没有其他操作,因此直接看AbstractInvoker#invoker方法。到这里,调用者变成了服务提供者invoker。
该方法首先调用doInvokeAndReturn方法执行RPC调用并返回异步执行结果,最后调用waitForResultIfSync方法判断如果同步调用,则等待RPC结果。
/*** AbstractInvoker的方法*/
@Override
public Result invoke(Invocation inv) throws RpcException {// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed//如果由于注册表的地址刷新导致调用程序被销毁,则允许当前调用继续进行if (isDestroyed()) {logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");}RpcInvocation invocation = (RpcInvocation) inv;//准备RpcInvocationprepareInvocation(invocation);//执行RPC调用并返回异步执行结果AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);//如果同步调用,则等待RPC结果waitForResultIfSync(asyncResult, invocation);return asyncResult;
}
AbstractInvoker#doInvokeAndReturn执行rpc调用
执行RPC调用并返回异步执行结果AsyncRpcResult。
/*** AbstractInvoker的方法* <p>* 执行RPC调用并返回异步执行结果** @param invocation* @return*/
private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {AsyncRpcResult asyncResult;try {/** 执行调用*/asyncResult = (AsyncRpcResult) doInvoke(invocation);} catch (InvocationTargetException e) {//异常处理Throwable te = e.getTargetException();//asyncResult封装抛出的异常if (te != null) {// if biz exceptionif (te instanceof RpcException) {((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);}asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);} else {asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);}} catch (RpcException e) {// if biz exceptionif (e.isBiz()) {asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);} else {throw e;}} catch (Throwable e) {//asyncResult封装抛出的异常asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);}//当调用模式为异步时,或者setFutureWhenSync=true时if (setFutureWhenSync || invocation.getInvokeMode() != InvokeMode.SYNC) {// set server context 将future设置到线程本地变量中,使用FutureAdapter包装RpcContext.getServiceContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));}return asyncResult;
}
DubboInvoker#doInvoke调用remote层远程通信
该方法由invoker具体的子类实现,以默认dubbo协议为例子,对应的invoker为DubboInvoker。该方法将会调用remote层进行rpc通信。
- 基于轮询的策略选择一个remote层客户端ExchangeClient,用以发起rpc调用,一般来说只有一个client,即共享连接。
- 判断如果是单向发送,那么直接发送异步请求,随后返回默认的AsyncRpcResult,不需要获取真正的请求响应结果。
- 同步、异步发送处理。Dubbo默认都是走的异步发送,发送完毕之后得到一个CompletableFuture,将CompletableFuture包装为一个AsyncRpcResult返回。
- 在外层AbstractInvoker#invoker方法中会判断,如果同步则会调用future.get阻塞等待结果。这就是Dubbo所谓的异步转同步处理。
/*** DubboInvoker的方法* <p>* 执行RPC调用并返回执行结果*/
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;//调用方法名final String methodName = RpcUtils.getMethodName(invocation);//设置附加属性 path={serviceInterface}inv.setAttachment(PATH_KEY, getUrl().getPath());//设置附加属性 version={version}inv.setAttachment(VERSION_KEY, version);/** 基于轮询的策略选择一个remote层客户端,用以发起rpc调用* 一般来说只有一个client,即共享连接*/ExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {//是否是单向通讯,即只管发送不需要返回结果boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//计算rpc调用超时时间,默认3000msint timeout = calculateTimeout(invocation, methodName);invocation.setAttachment(TIMEOUT_KEY, timeout);/** 单向发送处理*/if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);//发送单向请求currentClient.send(inv, isSent);//返回默认的AsyncRpcResult,不需要真正的请求响应结果return AsyncRpcResult.newDefaultAsyncResult(invocation);}/** 同步、异步发送处理* 默认都是走的异步发送*/else {//回调线程池ExecutorService executor = getCallbackExecutor(getUrl(), inv);//异步发送请求,发送完毕之后得到一个CompletableFutureCompletableFuture<AppResponse> appResponseFuture =currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapterFutureContext.getContext().setCompatibleFuture(appResponseFuture);//将CompletableFuture包装为一个AsyncRpcResultAsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);result.setExecutor(executor);return result;}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}
}
getCallbackExecutor用于获取回调线程池。
- 如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步。
- ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程。通过execute(Runnable)方法提交给该Executor的任务并不会被调度到特定的线程执行。这些任务将会被存储在阻塞队列中,只有当线程调用waitAndDrain()方法时才会执行,执行该任务的线程与调用waitAndDrain的线程完全相同。
- 如果是异步调用,则获取对应的多线程的线程池。
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {//如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步//ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {return new ThreadlessExecutor();}//如果是异步调用,则获取对应的多线程的线程池return url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
}
ReferenceCountExchangeClient#request发起rpc请求
接下来我们就来到了remote层,将会真正的发起rpc请求。入口方法是ReferenceCountExchangeClient#request方法,内部默认委托HeaderExchangeClient#request发起请求。
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {//默认通过HeaderExchangeClient发起请求return client.request(request, timeout, executor);
}
HeaderExchangeClient#request发起rpc请求
HeaderExchangeClient#request方法,内部默认委托HeaderExchangeChannel#request发起请求。
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {//默认通过HeaderExchangeChannel发起请求return channel.request(request, timeout, executor);
}
HeaderExchangeChannel#request异步发起rpc请求
该类的职责是负责发送网络请求,Dubbo所有的网络请求最终都会封装为Request对象,并且生成并记录唯一的mId,version,requestBody等信息,创建结果对象DefaultFuture 对象,最终通过NettyClient#send发起异步请求。
/*** HeaderExchangeChannel的方法* <p>* 发起异步请求** @param request 请求,如RpcInvocation* @param timeout 请求超时时间* @param executor 回调执行器* @return 未来执行结果* @throws RemotingException*/
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request. 创建Request,生成唯一请求idRequest req = new Request();//设置Dubbo RPC protocol versionreq.setVersion(Version.getProtocolVersion());//设置双向请求req.setTwoWay(true);//设置请求数据,如RpcInvocationreq.setData(request);//创建 DefaultFuture 对象DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);try {/** 通过NettyClient#send发起异步请求*/channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future;
}
Request请求内容
每次请求都会生成一个Request对象,表示请求的内容。创建Request的时候会生成一个自增的id,表示本次请求的唯一标识。
public Request() {//生成新的自增唯一idmId = newId();
}
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private static long newId() {//通过一个原子变量获取唯一自增id// getAndIncrement() 当它增长到MAX_VALUE时,它将增长到MIN_VALUE,并且负数可以用作IDreturn INVOKE_ID.getAndIncrement();
}
DefaultFuture#newFuture创建Future并启动超时检测
每次请求都会通过该方法创建一个DefaultFuture对象并进行超时检测。
/*** 创建一个DefaultFuture并进行超时检测** @param channel NettyClient* @param request 请求内容Request* @param timeout 请求超时时间* @return DefaultFuture*/
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {//创建一个DefaultFuture,存入缓存final DefaultFuture future = new DefaultFuture(channel, request, timeout);//设置回调执行器future.setExecutor(executor);// ThreadlessExecutor needs to hold the waiting future in case of circuit return.//ThreadlessExecutor需要保存waitingFuture以防响应返回。同步调用的执行器为ThreadlessExecutorif (executor instanceof ThreadlessExecutor) {((ThreadlessExecutor) executor).setWaitingFuture(future);}/** 超时检查*/timeoutCheck(future);return future;
}
new DefaultFuture
在DefaultFuture的构造器中,会将将请求id与当前DefaultFuture实例存入,静态map FUTURES内部,将请求id与当前NettyClient实例存入,静态map CHANNELS内部。
请求id和响应id是同一个值,因此当有响应返回时,就能从此缓存中根据id找到对应的DefaultFuture并唤醒阻塞线程。
/*** 正在处理的channel*/
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();/*** 正在处理的请求*/
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {//保存NettyClientthis.channel = channel;//保存requestthis.request = request;//获取mid,即请求唯一idthis.id = request.getId();//获取请求超时时间this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);//将请求id与当前DefaultFuture实例存入,静态map FUTURES内部FUTURES.put(id, this);//将请求id与当前NettyClient实例存入,静态map CHANNELS内部CHANNELS.put(id, channel);
}
timeoutCheck超时检测
创建超时检查任务TimeoutCheckTask,并且添加到dubbo时间轮中。
当超时时间到了,便会执行TimeoutCheckTask中的检查任务。将会通过id获取到对应的DefaultFuture,然后构建一个超时响应Response,通过DefaultFuture#received处理超时响应。
/*** 检查future的超时*/
private static void timeoutCheck(DefaultFuture future) {//创建超时检查任务TimeoutCheckTask task = new TimeoutCheckTask(future.getId());//添加到dubbo时间轮中future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}private static class TimeoutCheckTask implements TimerTask {//请求idprivate final Long requestID;TimeoutCheckTask(Long requestID) {this.requestID = requestID;}@Overridepublic void run(Timeout timeout) {//从FUTURES中获取请求id对应的DefaultFutureDefaultFuture future = DefaultFuture.getFuture(requestID);if (future == null || future.isDone()) {return;}//通过执行器或者直接通知超时if (future.getExecutor() != null) {future.getExecutor().execute(() -> notifyTimeout(future));} else {notifyTimeout(future);}}private void notifyTimeout(DefaultFuture future) {// create exception response.Response timeoutResponse = new Response(future.getId());// set timeout status.timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));// handle response. 处理超时响应DefaultFuture.received(future.getChannel(), timeoutResponse, true);}
}
AbstractPeer#send发起异步请求
@Override
public void send(Object message) throws RemotingException {//AbstractClient#send发起异步请求send(message, url.getParameter(Constants.SENT_KEY, false));
}
AbstractClient#send发起异步请求
内部通过NettyChannel#send发起请求。
@Override
public void send(Object message, boolean sent) throws RemotingException {//重新连接判断if (needReconnect && !isConnected()) {connect();}//获取NettyChannel,这是dubbo中的类Channel channel = getChannel();//TODO Can the value returned by getChannel() be null? need improvement.if (channel == null || !channel.isConnected()) {throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());}//通过NettyChannel#send发起请求channel.send(message, sent);
}
NettyChannel#send基于netty发起异步请求
此处的NettyChannel是位于dubbo的netty4包下的类。该方法中,将会通过NioSocketChannel#writeAndFlush方法发送请求,NioSocketChannel是netty包的类,到此,真正开始的发送请求,走到netty里面的逻辑。
NettyChannel内部有一个静态属性CHANNEL_MAP,存储着netty的NioSocketChannel到NettyChannel实例的映射关系,而NettyClient内部也持有netty的NioSocketChannel。
基于netty的NioSocketChannel#writeAndFlush方法发起请求的时候,实际上内部调用ChannelPipeline#writeAndFlush方法进行数据出站过程,包括一系列的ChannelOutboundHandler的链式处理,出站处理器执行顺序与处理器添加顺序相反,Dubbo3.1版本中对于consumer内部的NettyClient,在NettyClient#initBootstrap方法中初始化netty客户端的时候,绑定了如下的handler:
//自定义客户端消息的业务处理逻辑Handler
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug//解码.addLast("decoder", adapter.getDecoder())//编码.addLast("encoder", adapter.getEncoder())//心跳检测.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))//最后是此前创建的nettyClientHandler.addLast("handler", nettyClientHandler);
其中NettyClientHandler内部包括的dubbo handler顺序为:NettyClient、MultiMessageHandler、HeartbeatHandler、AllChannelHandler、DecodeHandler、HeaderExchangeHandler、ExchangeHandlerAdapter(DubboProtocol.requestHandler),但是在发送消息的时候这些handler没有太多的功能。关键是消息编码,这一点我们下面单独说。
/*** NettyChannel* 通过netty发送消息以及是否等待发送完成。** @param message 要发送的消息* @param sent 是否ack async-sent,默认false* @throws RemotingException 如果等待到超时或被try-catch包围的方法体抛出的任何异常,则抛出RemotingException。*/
@Override
public void send(Object message, boolean sent) throws RemotingException {//channel是否关闭super.send(message, sent);boolean success = true;int timeout = 0;try {/** 通过NioSocketChannel#writeAndFlush方法发送请求* NioSocketChannel是netty包的类,到此真正的发送请求*/ChannelFuture future = channel.writeAndFlush(message);//是否ack async-sent,默认falseif (sent) {// wait timeout mstimeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);success = future.await(timeout);}//获取异常并抛出Throwable cause = future.cause();if (cause != null) {throw cause;}} catch (Throwable e) {removeChannelIfDisconnected(channel);throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);}if (!success) {throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()+ "in timeout(" + timeout + "ms) limit");}
}
AbstractInvoker#waitForResultIfSync异步转同步等待
当AbstractInvoker#doInvokeAndReturn执行完毕之后,将会返回一个AsyncRpcResult。随后调用waitForResultIfSync方法,判断如果是同步调用,则线程阻塞等待结果返回。如果是异步调用,则不会阻塞直接返回了,在代码中我们可以通过RpcContext.getContext().getFuture().get() 来获取异步调用的结果,而通过 Dubbo上下文获取的是 FutureAdapter。
waitForResultIfSync内部是调用AsyncRpcResult#get方法完成阻塞等待的。
/*** AbstractInvoker的方法* <p>* 阻塞等待获取结果** @param asyncResult 异步执行结果* @param invocation 方法调用抽象*/
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {//如果不是同步调用,则直接返回if (InvokeMode.SYNC != invocation.getInvokeMode()) {return;}try {/** NOTICE!* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.* 必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待* 因为CompletableFuture#get()方法被证明有严重的性能下降*///获取超时时间,默认3000msObject timeout = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);//阻塞等待给定的时间if (timeout instanceof Integer) {asyncResult.get((Integer) timeout, TimeUnit.MILLISECONDS);} else {//阻塞Integer.MAX_VALUE毫秒asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (ExecutionException e) {Throwable rootCause = e.getCause();if (rootCause instanceof TimeoutException) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} else if (rootCause instanceof RemotingException) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} else {throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}} catch (java.util.concurrent.TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (Throwable e) {throw new RpcException(e.getMessage(), e);}
}
AsyncRpcResult#get阻塞获取结果
判断如果执行器属于ThreadlessExecutor,那么调用waitAndDrain方法阻塞当前线程直到响应返回或者超时,在新版本默认都是ThreadlessExecutor,否则通过responseFuture#get进行阻塞等待。
如果使用responseFuture#get,因为responseFuture的类型是CompletableFuture,这是JDK提供的Future实现类,所以他的get方法的阻塞实现就和Dubbo无关了,那么JDK怎么实现的呢?实际上很简单,如果CompletableFuture的内部result还为null,说明还没有结果,那么会对调用get方法的线程通过LockSupport.park阻塞并等待结果,这里可以参考我此前写的关于FutureTask的源码的文章。
/*** AsyncRpcResult的方法* <p>* 阻塞等待获取结果** @param timeout 超时时间* @param unit 单位*/
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {//如果执行器属于ThreadlessExecutor,表示同步调用if (executor != null && executor instanceof ThreadlessExecutor) {ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;//调用waitAndDrain方法阻塞当前线程直到响应返回或者超时threadlessExecutor.waitAndDrain();}//通过responseFuture阻塞等待return responseFuture.get(timeout, unit);
}
ThreadlessExecutor#waitAndDrain等待返回
ThreadlessExecutor是Dubbo2.7.5新增的同步调用时使用的一个执行器,内部没有任何多余的线程。这实际上是一个线程模型的优化,具体的优化见这篇文章:https://blog.csdn.net/weixin_39860915/article/details/103917841,简单的说对于响应结果的序列化的这步操作由业务线程自己来执行,不再需要额外的消费端的线程池线程来执行了。
相比于老的线程池模型,由业务线程自己负责监测并解析返回结果,免去了额外的消费端线程池开销。因为消费端的线程池策略默认是使用cached ,线程池会为每次的消费请求创建一个线程,那么当消费者应用并发较大或者提供者响应的时间较长时,就会出现消费者线程很多的情况。
- waitAndDrain方法将会等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。
- 在另一边,当业务数据返回后,生成一个 Runnable Task 并放入 ThreadlessExecutor 队列。此时等待将会被唤醒。
- 获取到任务之后业务线程将 Task 取出并在本线程中执行:反序列化业务数据并 set 到 Future,随后返回。
所以说,ThreadlessExecutor是通过阻塞队列来实现同步等待的。这也是Dubbo同步调用实现,因为Dubbo底层通信使用的是Netty,Netty的调用都是异步调用,每次发送请求之后都会直接返回而不会等待结果,所以Dubbo所有的调用其实也都是异步调用,因此才需要这么个“异步转同步”的操作,即:发送请求之后,业务线程调用阻塞队列的take方法阻塞,后面如果收到服务端发过来的响应结果之后,将响应数据放到阻塞队列里面,这样当前阻塞的线程就被唤醒了,这样就实现了同步调用。
那么,收到的响应结果是怎么和此前发起的某个请求匹配上的呢?实际上就是通过唯一的请求id来匹配的,响应结果中也携带有发起的请求id。
/*** 等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。*/
public void waitAndDrain() throws InterruptedException {/*** Usually, {@link #waitAndDrain()} will only get called once. It blocks for the response for the first time,* once the response (the task) reached and being executed waitAndDrain will return, the whole request process* then finishes. Subsequent calls on {@link #waitAndDrain()} (if there're any) should return immediately.** There's no need to worry that {@link #finished} is not thread-safe. Checking and updating of* 'finished' only appear in waitAndDrain, since waitAndDrain is binding to one RPC call (one thread), the call* of it is totally sequential.*///如果已经完成,则本次请求则直接返回if (isFinished()) {return;}Runnable runnable;try {//当前线程等待直到任务队列中有一个任务,执行该任务runnable = queue.take();} catch (InterruptedException e) {setWaiting(false);throw e;}synchronized (lock) {setWaiting(false);//当前线程执行该任务runnable.run();}//继续执行全部任务runnable = queue.poll();while (runnable != null) {runnable.run();runnable = queue.poll();}// mark the status of ThreadlessExecutor as finished.setFinished(true);
}
阻塞优化
AbstractInvoker#waitForResultIfSync方法中,当超时时间不是Integer类型的时候,将会调用,asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)方法,将可能会阻塞Integer.MAX_VALUE毫秒,这种情况就类似于CompletableFuture#get不带有超时时间的方法了。那么为什么还要用CompletableFuture#get(timeout, unit)方法呢?
在方法中能够看到这样的注释:必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待。因为CompletableFuture#get()方法被证明有严重的性能下降。
Result#recreate获取结果
在获取到Reuslt之后,将会调用recreate方法处理返回值获取最终结果。3.0.0中引入了AsyncRpcResult来替换RpcResult, RpcResult被AppResponse替换,因此目前无论同步还是异步的Reuslt都统一使用AsyncRpcResult。
- 请求如果是FUTURE模式,那么直接返回Future,不会阻塞。
- 请求如果是异步模式,那么直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞。
- 请求如果是同步模式,那么在获取响应结果之后才会返回。
/*** AsyncRpcResult的方法*/
@Override
public Object recreate() throws Throwable {RpcInvocation rpcInvocation = (RpcInvocation) invocation;//如果是FUTURE模式,那么直接返回Future,不会阻塞if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {return RpcContext.getClientAttachment().getFuture();}//如果是异步else if (InvokeMode.ASYNC == rpcInvocation.getInvokeMode()) {//直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞return createDefaultValue(invocation).recreate();}//同步调用,获取响应结果之后才会返回return getAppResponse().recreate();
}
AsyncRpcResult#getAppResponse获取响应结果
阻塞式的获取结果。当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了。
public Result getAppResponse() {try {//阻塞式的获取结果//当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了if (responseFuture.isDone()) {return responseFuture.get();}} catch (Exception e) {// This should not happen in normal request process;logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");throw new RpcException(e);}//创建默认返回值的AppResponsereturn createDefaultValue(invocation);
}
AppResponse#createDefaultValue创建默认返回值
异步调用的情况,将会创建默认返回值返回。
获取调用的方法,如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回null。
private static Result createDefaultValue(Invocation invocation) {//获取调用的方法ConsumerMethodModel method = (ConsumerMethodModel) invocation.get(Constants.METHOD_MODEL);//如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回nullreturn method != null ? new AppResponse(defaultReturn(method.getReturnClass())) : new AppResponse();
}
AppResponse#recreate处理结果
最终结果的处理,如果有异常则抛出,否则返回结果。
/*** AppResponse的方法*/
@Override
public Object recreate() throws Throwable {//如果有异常,则处理并抛出if (exception != null) {// fix issue#619try {Object stackTrace = exception.getStackTrace();if (stackTrace == null) {exception.setStackTrace(new StackTraceElement[0]);}} catch (Exception e) {// ignore}throw exception;}//返回真实结果return result;
}
总结
现在我们来总结一下Dubbo3.1版本中Dubbo Consumer发起服务调用请求的总体过程。
- 调用某个Dubbo接口的方法,实际上是调用之前服务引入时生成的代理类对象,最终所有方法被转发到InvokerInvocationHandler#invoke方法中,这是请求的通用入口。
- 随后利用服务消费者Invoker从服务引入是生成的一批服务提供者Invoker列表中经过路由过滤、负载均衡机制选择一个服务提供者Invoker,随后利用服务提供者Invoker内部封装的NettyClient序列化消息并发起远程rpc请求。
- 如果调用失败,则在服务消费者ClusterInvoker中使用容错策略,例如失败重试等等,发起调用的时候每次请求都会生成一个唯一id,消费者端会缓存本次请求的id与Future的映射关系,默认每次请求都是异步调用,对于同步调用使用异步转同步机制的阻塞等待直到返回响应。响应id和请求id一致,此时就可以找到对应的Future设置响应结果。
实际上还有很多详细的知识点没说出来,例如消费者MigrationInvoker最开始实际上会进行决策使用接口级还是应用级Invoker。
本次我们学习了Dubbo Consumer发起服务调用请求的过程源码,另外还有Dubbo Provider处理服务调用请求以及Dubbo Consumer接收服务调用响应这两个阶段的源码后续再聊。
相关文章:
Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
基于Dubbo 3.1,详细介绍了Dubbo Consumer服务调用源码。 上文我们学习了,Dubbo 发起服务调用的上半部分源码,我们学习到了FailoverClusterInvoker最终会通过服务提供者Invoker#invoke发起RPC调用,下面我们来学习Dubbo 发起服务调用…...
学习日记-250207
一.论文 1.Prompt Learning for News Recommendation 任务不一致(LLM与实际任务)产生prompt提示。 Prompt Learning for News Recommendation 论文阅读 SIGIR2023-CSDN博客 2.GPT4Rec: A Generative Framework for Personalized Recommendation and…...
Rocky Linux9安装Zabbix7.0(精简版)
Linux 系统版本 Rocky Linux release 9.3 (Blue Onyx) 注意:zabbix 7以上版本不支持CentOS 7系统,需要CentOS 8以上, 本教程支持CentOS9及Rocky Linux 9 在Rocky Linux release 9.3测试通过 Linux环境准备 关闭防火墙和selinux #关闭防…...
网络分析工具—WireShark的安装及使用
Wireshark 是一个广泛使用的网络协议分析工具,常被网络管理员、开发人员和安全专家用来捕获和分析网络数据包。它支持多种网络协议,能够帮助用户深入理解网络流量、诊断网络问题以及进行安全分析。 Wireshark 的主要功能 数据包捕获与分析: …...
C++开发(软件开发)常见面试题
目录 1、C里指针和数组的区别 2、C中空指针请使用nullptr不要使用NULL 3、http/https区别和头部结构? 4、有了mac地址为什么还要ip地址?ip地址的作用 5、有了路由器为什么还要交换机? 6、面向对象三大特性 7、友元函数 8、大端小端 …...
云原生后端|实践?
云原生(Cloud Native)是一种构建和运行应用程序的方法,它充分利用云计算的优势,包括弹性、可扩展性、高可用性和自动化运维。云原生后端开发通常涉及微服务架构、容器化、持续集成/持续部署(CI/CD)、服务网…...
WEB攻防-文件下载文件读取文件删除目录遍历目录穿越
目录 一、文件下载漏洞 1.1 文件下载案例(黑盒角度) 1.2 文件读取案例(黑盒角度) 二、文件删除 三、目录遍历与目录穿越 四、审计分析-文件下载漏洞-XHCMS 五、审计分析-文件读取漏洞-MetInfo-函数搜索 六、审计分析-…...
to_csv保存指定列的方法
df是DataFrame的数据,它的列为[代码, 名称, 最高, 最低] 现在我只想将‘代码’、“名称”两列内容存入csv,实现如下: columns_to_save [代码, 名称] df.代码 df.代码.apply("{}".format)#此行可以防止代码之前的0被忽略掉 d…...
MySQL数据库(七)SQL 优化
一 插入数据 采用方法 1 批量插入 2 手动提交事务 3 主键顺序插入 4* 使用load插入指令数据 二 主键优化 1 数据组织方式 在InnoDB存储引擎中,表中的数据都是根据主键顺序组织存放的,这种存储方式的表称为索引组织表 2 页分裂 页可以为空也可…...
使用EVE-NG实现单臂路由
一、基础知识 1.三层vlan vlan在三层环境中通常用作网关vlan配上ip网关内部接口ip 2.vlan创建步骤 创建vlan将接口划分到不同的vlan给vlan配置ip地址 二、项目案例 1、项目拓扑 2、项目实现 PC1配置 配置PC1IP地址为192.168.1.10/24网关地址为192.168.1.1 ip 192.168.1…...
flask开发的网站,后端服务关闭后,可以找回之前的数据的吗
如果使用 Flask 开发的网页,后端服务关闭后,是否还能找回数据取决于数据的存储方式: 可能找回数据的情况: 数据库存储(MySQL、PostgreSQL、SQLite 等) 如果 Flask 连接的是持久化数据库,即使后…...
疯狂SQL转换系列- SQL for Milvs2.4
鉴于Milvus仍在不停的迭代新版本,推出新功能,其SDK目前并不稳定。目前其2.4版本的SDK接口已与之前的2.2版本有了较大的差别,功能上也有了一定的调整。为此,我们重新提供了针对[Milvus2.4](https://github.com/colorknight/moql-tr…...
本地部署DeepSeek(Mac版本,带图形化操作界面)
一、下载安装:Ollama 官网下载:Download Ollama on macOS 二、安装Ollama 1、直接解压zip压缩包,解压出来就是应用程序 2、直接将Ollama拖到应用程序中即可 3、启动终端命令验证 # 输入 ollama 代表已经安装成功。 4、下载模型 点击模型…...
Linux LED 实验
一、Linux 下 LED 灯驱动原理 其实跟裸机实验很相似,只不过要编写符合 Linux 的驱动框架。 1. 地址映射 MMU全称 Memory Manage Unit,即内存存储单元。 MMU主要功能为: 1)完成虚拟空间到物理空间的映射; 2&#x…...
深入解析:Jsoup 库的多功能应用场景
Jsoup 是一个强大的 Java 库,主要用于解析和操作 HTML 文档。它不仅广泛应用于网络爬虫和数据抓取,还在网页内容分析、数据清洗与处理、自动化测试等多个领域有着广泛的应用。本文将详细介绍 Jsoup 库的多种用途,并提供具体的代码示例。 一、…...
直接抓取网页的爬虫技术:限制与合规挑战
在利用爬虫技术直接抓取网页内容时,尤其是针对像淘宝这样的大型电商平台,开发者可能会面临诸多技术限制和法律风险。这些限制不仅影响爬虫的效率,还可能引发法律问题。因此,了解这些限制并采取合规措施至关重要。 一、直接抓取网…...
docker常用命令及案例
以下是 Docker 的所有常用命令及其案例说明,按功能分类整理: 1. 镜像管理 1.1 拉取镜像 命令: docker pull <镜像名>:<标签>案例: 拉取官方的 nginx 镜像docker pull nginx:latest1.2 列出本地镜像 命令: docker images案例: 查看本地所有…...
【Redis】redis 存储的列表如何分页和检索
博主介绍:✌全网粉丝22W,CSDN博客专家、Java领域优质创作者,掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围:SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...
2025.2.6 数模AI智能体大更新,更专业的比赛辅导,同提示词效果优于gpt-o1/o3mini、deepseek-r1满血
本次更新重新梳理了回复逻辑规则,无任何工作流,一共3.2k字细节描述。具体效果可以看视频,同时也比对了gpt-o1、gpt-o3mini、deepseek-r1-67BI,从数学建模题目解答上来看,目前我的数模AI智能体具有明显优势。 AI智能体优…...
如何在 Linux 中管理自定义脚本:将 ~/bin 目录添加到 $PATH
1. 前言 在 Linux Shell 中,$PATH 是一个非常重要的环境变量,它决定了系统在哪里寻找可执行命令。通过为 $PATH 添加自定义目录,你就能在任意位置方便地调用自己写的脚本或程序。本文将围绕这一主题,结合示例脚本 mycmd 以及 .ba…...
[LUA ERROR] bad light userdata pointer
Cocos2d项目,targetSdkVersion30,在 android 13 设备运行报错: [LUA ERROR] bad light userdata pointer ,导致黑屏。 参考 https://blog.csdn.net/sdsabc2000/article/details/135384162的方法 下载最新的Cocos2dx 4.0,将其中的luajit &am…...
cursor指令工具
Cursor 工具使用指南与实例 工具概览 Cursor 提供了一系列强大的工具来帮助开发者提高工作效率。本指南将通过具体实例来展示这些工具的使用方法。 1. 目录文件操作 1.1 查看目录内容 (list_dir) 使用 list_dir 命令可以查看指定目录下的文件结构: 示例: list_dir log…...
【玩转全栈】----Django模板语法、请求与响应
目录 一、引言 二、模板语法 三、传参 1、视图函数到模板文件 2、模板文件到视图函数 四、引入静态文件 五、请求与响应 ?1、请求 2、响应 六、综合小案例 1、源码展示 2、注意事项以及部分解释 3、展示 一、引言 像之前那个页面,太过简陋,而且一个完整…...
2025年2月9日(数据分析,在最高点和最低点添加注释,添加水印)
要在最高点和最低点添加文本注释,可以使用 plt.annotate() 函数。这个函数允许你在图表中的特定位置添加文本注释,并且可以指定箭头指向特定的数据点。 以下是修改后的代码,添加了在最高点和最低点的文本注释: from matplotlib import pyplot as plt from matplotlib imp…...
C++,设计模式,【单例模式】
文章目录 一、模式定义与核心价值二、模式结构解析三、关键实现技术演进1. 基础版(非线程安全)2. 线程安全版(双重检查锁)3. 现代C++实现(C++11起)四、实战案例:全局日志管理器五、模式优缺点深度分析✅ 核心优势⚠️ 潜在缺陷六、典型应用场景七、高级实现技巧1. 模板化…...
今日AI和商界事件(2025-02-08)
今日AI领域的重大事件主要包括以下几个方面: 一、DeepSeek引发的行业震动 事件概述:DeepSeek作为近期崛起的AI模型,以其低成本、高性能的推理能力引发了广泛关注。其开源策略、独特的出身以及强大的算力表现,使得微软、英伟达等…...
C# LINQ与集合类 数据操作
目录 LINQ语法 过滤数据 投影数据 排序数据 集合操作 聚合操作 分组操作 查找元素 其他操作 常用的集合类 List LinkedList HashSet Dictionary List:动态数组 LinkedList:双向链表 HashSet:唯一无序集合 Dictionary&…...
开源流程引擎对比:compileflow、Turbo、Warm-Flow、 flowable、activiti
文章目录 开源流程引擎对比I 工作流引擎阿里的Compileflowflowableactivitiwarm-flow(国产)Turbo (didiopensource)II 知识扩展开发流程开源流程引擎对比 ActivitiCamundaCompileflowturbo核心表量282205特性 中断可重入√√√支持回滚√√运行模式独立运行和内嵌独立运行和…...
golang使用sqlite3,开启wal模式,并发读写
因为sqlite是基于文件的,所以默认情况下,sqlite是不支持并发读写的,即写操作会阻塞其他操作,同时sqlite也很容易就产生死锁。 但是作为一个使用广泛的离线数据库,从sqlite3.7.0版本开始(SQLite Release 3.…...
基于yolov11的阿尔兹海默症严重程度检测系统python源码+onnx模型+评估指标曲线+精美GUI界面
【算法介绍】 基于YOLOv11的阿尔兹海默症严重程度检测系统是一种创新的医疗辅助工具,旨在通过先进的计算机视觉技术提高阿尔兹海默症的早期诊断和病情监测效率。阿尔兹海默症是一种渐进性的神经退行性疾病,通常表现为认知障碍、记忆丧失和语言障碍等症状…...
