当前位置: 首页 > article >正文

使用 SSE + WebFlux 推送日志信息到前端

为什么使用 SSE 而不使用 WebSocket, 请看 SEE 对比 Websocket 的优缺点。

特性SSEWebSocket
通信方向单向(服务器→客户端)双向(全双工)
协议基于 HTTP独立协议(需 ws:// 前缀)
兼容性现代浏览器(IE 不支持)广泛支持
复杂度简单(服务器只需返回流数据)较复杂(需处理握手、帧协议等)
适用场景实时推送(日志、通知、新闻)双向交互(聊天、实时协作)

我的架构如下:

前端
后端中间件
后端服务

[前端] → 打开模态框 → 发起SSE连接 → [后端中间件] → 转发请求 → [后端服务] → 查询日志信息
← 实时日志推送 ← (SSE流) ← 捕获进程输出流 ← 返回实时日志流

前端html代码:

前端代码使用 bootstrap + jquery

<a href='#' data-bs-toggle='modal' data-bs-target='#logModal'>日志</a><!-- Modal -->
<div class="modal fade" id="logModal" tabindex="-1" aria-labelledby="logModalLabel" aria-hidden="true"><div class="modal-dialog modal-fullscreen"><div class="modal-content"><div class="modal-header"><h1 class="modal-title fs-5" id="logModalLabel">Modal title</h1><button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button></div><div class="modal-body" id="logContent">...</div><div class="modal-footer"><button type="button" class="btn btn-secondary" data-bs-dismiss="modal">Close</button><button type="button" class="btn btn-primary">Save changes</button></div></div></div>
</div>

前端js代码:

$("#logModal").on('shown.bs.modal', function (){const eventSource = new EventSource("http://localhost:9998/middle/logs");const logContent = $('#logContent');eventSource.onmessage = function (event){const logLine = $('<div>').text(event.data)logContent.append(logLine);logContent[0].scrollTop = logContent[0].scrollHeight; // 自动滚动到底部};eventSource.onerror = function (error ) {console.log('日志连接失败: ', error);eventSource.close();logContent.text('日志获取失败,请检查容器常或网络连接')}$("#logModal").on('hide.bs.modal', function (){eventSource.close();logContent.empty(); // 清空日志})
})

后端中间件需要引入 webflux 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

后端中间件代码:

controller:

@GetMapping(value = "/middle/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter getDockerLogs(){return publishServerService.getDockerLogs();
}

service:

private final WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create())).build();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();public SseEmitter getDockerLogs() {String url = "http://127.0.0.1:9999/service/logs";SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {webClient.get().uri(url).retrieve().bodyToFlux(String.class).subscribe(log -> {try {emitter.send(SseEmitter.event().data(log)); // 发送消息到前端} catch (IOException e) {emitter.completeWithError(e);}},error -> emitter.completeWithError(error),emitter::complete);} catch (Exception e) {emitter.completeWithError(e);}});return emitter;
}

后端服务代码:

controller:

    @GetMapping(value = "/service/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter getDockerLogs() {return publishServerService.getDockerLogs();}

service:

private final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();public SseEmitter getDockerLogs() {SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {int i = 0;while (i < 100000000) {try {emitter.send(SseEmitter.event().data(i)); // 发送消息到后端中间件Thread.sleep(1000);i++;} catch (InterruptedException e) {throw new RuntimeException(e);}}emitter.complete();} catch (IOException e) {throw new RuntimeException(e);}});return emitter;
}

提示:你可以把 i 换成真实的应用程序日志,如下:

public SseEmitter getDockerLogs() {SseEmitteremitter = new SseEmitter();executorService.submit(() -> {try {Process process = Runtime.getRuntime().exec("docker logs -f nginx");InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());BufferedReader reader = new BufferedReader(inputStreamReader);String line;while ((line = reader.readLine()) != null) {emitter.send(SseEmitter.event().data(line));}emitter.complete();} catch (IOException e) {throw new RuntimeException(e);}});return emitter;
}

最终前端显示效果如下:

在这里插入图片描述

到此前端就可以实时的获取后端日志在页面中显示了。

但是你会发现后端控制台会时不时报错,错误信息如下:

2025-04-24T11:01:12.488Z  WARN 26616 --- [nio-9990-exec-2] .w.s.m.s.DefaultHandlerExceptionResolver : Ignoring exception, response committed. : org.springframework.web.context.request.async.AsyncRequestTimeoutException
2025-04-24T11:01:12.489Z  WARN 26616 --- [nio-9990-exec-2] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]org.springframework.web.context.request.async.AsyncRequestTimeoutExceptionat org.springframework.web.context.request.async.TimeoutDeferredResultProcessingInterceptor.handleTimeout(TimeoutDeferredResultProcessingInterceptor.java:42)at org.springframework.web.context.request.async.DeferredResultInterceptorChain.triggerAfterTimeout(DeferredResultInterceptorChain.java:81)at org.springframework.web.context.request.async.WebAsyncManager.lambda$startDeferredResultProcessing$5(WebAsyncManager.java:442)at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.onTimeout(StandardServletAsyncWebRequest.java:154)at org.apache.catalina.core.AsyncListenerWrapper.fireOnTimeout(AsyncListenerWrapper.java:44)at org.apache.catalina.core.AsyncContextImpl.timeout(AsyncContextImpl.java:135)at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:135)at org.apache.coyote.AbstractProcessor.dispatch(AbstractProcessor.java:243)at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:57)at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896)at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1744)at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)javaweb 报错如下:
2025-04-24T11:02:41.538Z  WARN 26653 --- [nio-9991-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Ignoring exception, response committed. : org.springframework.web.context.request.async.AsyncRequestTimeoutException
2025-04-24T11:02:41.538Z  WARN 26653 --- [nio-9991-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]
2025-04-24T11:02:42.480Z ERROR 26653 --- [or-http-epoll-2] reactor.core.publisher.Operators         : Operator called default onErrorDroppedjava.lang.IllegalStateException: The response object has been recycled and is no longer associated with this facadeat org.apache.catalina.connector.ResponseFacade.checkFacade(ResponseFacade.java:478) ~[tomcat-embed-core-10.1.16.jar!/:na]at org.apache.catalina.connector.ResponseFacade.isFinished(ResponseFacade.java:154) ~[tomcat-embed-core-10.1.16.jar!/:na]at org.apache.catalina.connector.ResponseFacade.flushBuffer(ResponseFacade.java:240) ~[tomcat-embed-core-10.1.16.jar!/:na]at org.springframework.http.server.ServletServerHttpResponse.flush(ServletServerHttpResponse.java:104) ~[spring-web-6.0.14.jar!/:6.0.14]at org.springframework.http.server.DelegatingServerHttpResponse.flush(DelegatingServerHttpResponse.java:61) ~[spring-web-6.0.14.jar!/:6.0.14]at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.complete(ResponseBodyEmitterReturnValueHandler.java:231) ~[spring-webmvc-6.0.14.jar!/:6.0.14]at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.complete(ResponseBodyEmitter.java:266) ~[spring-webmvc-6.0.14.jar!/:6.0.14]at reactor.core.publisher.LambdaSubscriber.onComplete(LambdaSubscriber.java:132) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onComplete(FluxConcatMapNoPrefetch.java:240) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:183) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onComplete(FluxBufferPredicate.java:356) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onComplete(FluxPeekFuseable.java:595) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onComplete(FluxMapFuseable.java:350) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:230) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:371) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:273) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:483) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:275) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:419) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:768) ~[reactor-netty-http-1.1.13.jar!/:1.1.13]at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

这是因为 SpringMVC 默认设置了 30 秒超时时间,你需要修改的长一点,具体多少由你自己定义。后端中间件和后端服务都需要配置。

错误分析

1. AsyncRequestTimeoutException

此错误表明异步请求超时。在 Spring 里,当一个异步请求在规定的时间内没有完成时,就会抛出该异常。默认情况下,Spring 的异步请求超时时间是 30 秒。在 SSE 场景中,因为要保持长连接以实现实时数据推送,所以很容易超出这个时间限制。

2. IllegalStateException

该错误显示响应对象已被回收,不再和当前的请求关联。这通常是在异步请求超时后,响应对象被关闭或者回收,而代码还尝试对其进行操作时发生的。

解决办法

1. 增加异步请求超时时间

你可以通过配置 WebMvcConfigurer 来增加异步请求的超时时间,避免因超时引发异常。

在 后端中间件 和 后端服务 中添加如下配置类:

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;@Configuration
class LogWebConfig implements WebMvcConfigurer {@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {// 设置异步请求超时时间为 3600 秒(1 小时)configurer.setDefaultTimeout(3600 * 1000); }
}

扩展:

我通过前端传递参数(容器id)给后端执行,我发现了一个奇怪的问题。

我第一次打开模态框,可以正常的实时显示日志。但是我点击 close 的时候,这段代码确实执行了。

$("#logModal").on('hide.bs.modal', function (){eventSource.close();logContent.empty(); // 清空日志
})

但是我再次打开模态框,发现没有日志显示了,我甚至刷新页面,然后再次打开模态框依然没有日志显示,只有我把后端服务重启之后,再次打开模态框,才有日志显示,这是为什么?

这个问题可能是由于EventSource 实例未正确释放或重复创建导致的资源冲突。以下是具体分析和解决方案:

问题原因分析

1. EventSource 作用域问题:

在当前代码中,eventSource变量是在shown.bs.modal事件处理函数的闭包中创建的。当模态框关闭时,虽然通过hide.bs.modal事件调用了eventSource.close(),但闭包中的变量可能未被正确释放,导致再次打开模态框时,旧的eventSource实例残留或引用错误。

2. 事件绑定重复问题:

每次打开模态框时,shown.bs.modal事件处理函数会重复绑定hide.bs.modal事件,导致多个处理函数共存,可能混淆不同实例的eventSource。

3. 后端连接未正确断开:

若后端未正确处理EventSource的断开(如未关闭响应流),可能导致端口或连接资源被占用,后续请求无法建立新连接。

解决方案:

使用模态框数据存储 EventSource 实例

修改代码,将eventSource实例存储在模态框元素的data属性中,确保每次关闭时正确释放当前实例:

// 修改后的模态框显示事件处理函数
$("#logModal").on('shown.bs.modal', function (event) {const trigger = $(event.relatedTarget);const itemStr = trigger.data('item');if (!itemStr) return;const item = JSON.parse(itemStr);const modal = $(this); // 获取当前模态框实例// 先关闭可能存在的旧连接const oldEventSource = modal.data('eventSource');if (oldEventSource) {oldEventSource.close();modal.removeData('eventSource');}// 创建新的EventSource并存储到模态框数据中const eventSource = new EventSource(`http://localhost:9998/middle/logs?containerId=${item.containerId}&ip=${item.ip}`);modal.data('eventSource', eventSource);const logContent = $('#logContent');logContent.empty();eventSource.onmessage = function (event) {const logLine = $('<div>').text(event.data);logContent.append(logLine);logContent[0].scrollTop = logContent[0].scrollHeight;};eventSource.onerror = function (error) {console.log('日志连接失败: ', error);eventSource.close();logContent.text('日志获取失败,请检查容器状态或网络连接');modal.removeData('eventSource'); // 清除错误状态的实例};
});// 修改后的模态框隐藏事件处理函数
$("#logModal").on('hide.bs.modal', function () {const modal = $(this);const eventSource = modal.data('eventSource');if (eventSource) {eventSource.close(); // 关闭当前实例modal.removeData('eventSource'); // 清除数据引用}$('#logContent').empty(); // 清空日志
});

其实不是上面这个问题,经过再次排查,我发现了问题所在,当我的后端服务的代码这样写的时候,就会出现前面说的问题。

public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {Process process = Runtime.getRuntime().exec("docker logs -f " + publishServerDto.getContainerId());InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());BufferedReader reader = new BufferedReader(inputStreamReader);String line;while ((line = reader.readLine()) != null) {emitter.send(SseEmitter.event().data(line));}emitter.complete();} catch (IOException e) {emitter.completeWithError(e);}});return emitter;}

当我的后端服务的代码修改成这样的时候。

public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {int i = 0;while (i < 100000000) {try {emitter.send(SseEmitter.event().data(i)); // 发送消息到后端中间件Thread.sleep(1000);i++;} catch (InterruptedException e) {throw new RuntimeException(e);}}emitter.complete();} catch (IOException e) {emitter.completeWithError(e);}});return emitter;}

前端关闭再打开模态框也能正常显示数据。为什么?

这是因为 docker logs -f 命令会持续阻塞直到进程被终止,而我的后端代码在 EventSource 关闭时未正确终止正在运行的 Process 对象,导致资源占用和连接冲突。

docker logs -f 命令导致线程阻塞,无法响应 SseEmitter 的关闭事件。当前端关闭 EventSource 时,后端的 reader.readLine() 处于阻塞状态,无法感知 SseEmitter 的异常或完成信号,导致 finally 块和异常处理代码无法执行。

另外,

以下是详细分析和解决方案:

问题根源:未终止后台进程

1. docker logs -f 的阻塞特性

  • docker logs -f 会持续读取容器日志并阻塞当前线程,直到容器停止或命令被中断(如按下 Ctrl+C)。

  • 当前端关闭 EventSource 时,后端的 SseEmitter 会触发 completeWithError,但 Process 对象(docker logs 进程)仍在后台运行,其输入流被占用,导致:

    • 再次打开模态框时,新的 Process 无法创建(端口 / 资源被占用)。
    • 旧的 Process 残留数据可能干扰新连接。

2. 模拟数据与真实命令的差异

模拟数据场景: 模拟数据的循环中使用了 Thread.sleep(1000),该方法会响应线程中断(InterruptedException)。当 SseEmitter 关闭时,会抛出异常并中断线程,而 docker logs -f 的阻塞式读取无法响应中断,导致清理逻辑失效。

真实命令场景: docker logs -f 是外部进程,不受 Java 线程控制,EventSource 关闭时未终止该进程,导致资源泄漏。

3. 阻塞式读取的局限性

BufferedReader.readLine() 是阻塞式方法,当 docker logs -f 没有新日志时,线程会一直阻塞在此处,无法处理 SseEmitter 的关闭事件(如 emitter.completeWithError() 或客户端断开连接引发的异常)。可以使用非阻塞式读取或中断机制 解决这个问题。

解决方案:

分离读取逻辑

将日志读取放到独立线程中,避免主线程被 docker logs -f 阻塞,确保能响应 SseEmitter 的关闭事件。

监听 SseEmitter 事件

使用 emitter.onCompletion() 和 emitter.onError() 回调,在连接关闭或出错时执行清理逻辑。

修改后的代码:

service :

// 使用原子引用存储进程对象,确保多线程环境下的可见性和原子性private final AtomicReference<Process> processHolder = new AtomicReference<>();// 使用原子布尔控制日志读取循环,确保线程安全的状态变更private final AtomicBoolean isRunning = new AtomicBoolean(true);public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {SseEmitter emitter = new SseEmitter();// 提交异步任务处理日志流executorService.submit(() -> {try {// 1. 执行docker logs命令,获取容器实时日志String containerId = publishServerDto.getContainerId();Process process = Runtime.getRuntime().exec("docker logs -f " + containerId);processHolder.set(process); // 保存进程引用到原子容器中// 2. 启动独立线程读取日志输入流(避免主线程阻塞)new Thread(() -> {try ( // 使用try-with-resources自动关闭流BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {String line;// 循环读取日志,直到isRunning为false或读取到null(流结束)while (isRunning.get() && (line = reader.readLine()) != null) {// 通过SSE发送日志数据到前端emitter.send(SseEmitter.event().data(line));}} catch (IOException e) {// 读取流时发生异常(如管道关闭),终止进程Process p = processHolder.get();if (p != null) {p.destroy(); // 尝试正常终止进程System.out.println("taskagent - 读取日志流异常: " + e.getMessage());}}}).start();// 3. 监听SSE连接的生命周期事件// 当前端正常关闭连接时触发(如关闭模态框)emitter.onCompletion(() -> {handleShutdown("正常关闭", processHolder.get()); // 统一处理关闭逻辑});// 当SSE连接发生错误时触发(如网络中断)emitter.onError(ex -> {handleShutdown("错误关闭: " + ex.getMessage(), processHolder.get());});// 当SSE连接超时时触发(需配置超时时间,默认30秒)emitter.onTimeout(() -> {handleShutdown("超时关闭", processHolder.get());});} catch (Exception e) {// 初始化进程时发生异常(如命令格式错误)isRunning.set(false);Process p = processHolder.get();if (p != null) p.destroy();System.out.println("taskagent - 初始化docker进程失败: " + e.getMessage());emitter.completeWithError(e); // 通知前端连接失败}});return emitter;}/*** 统一处理进程关闭逻辑* @param reason 关闭原因(用于日志输出)* @param process 待关闭的进程*/private void handleShutdown(String reason, Process process) {isRunning.set(false); // 停止日志读取循环if (process != null) {process.destroy(); // 发送中断信号(等效于Ctrl+C)System.out.println("taskagent - " + reason + ", 已终止docker进程"); // 关键日志输出点}}

现在我发现另一个问题。我的页面有很多容器,首先我打开nginx容器的日志,模态框被打开,实时日志能正常显示在页面上,当我关闭模态框,然后再次打开 nginx的日志,模态框被打开,依然能正常显示实时日志。 关键问题来了,然后我关闭模态框,接着打开 redis 的容器日志,日志没有显示在页面上了。

问题分析:

1. isRunning 标志的重置问题 - 可能

在 handleShutdown 方法里,isRunning 被设置为 false,不过在开启新的日志流时,没有将其重置为 true。这会让后续的日志读取循环无法正常运行。

解决办法:

在开启新的日志流之前,把 isRunning 重置为 true。

2. processHolder 未正确更新 - 可能

当切换容器查看日志时,processHolder 可能还保存着之前的 Process 对象,这会对新的日志读取产生影响。

解决办法:
在开启新的日志流之前,确保 processHolder 被清空。

3. webClient 连接问题(taskweb 端) - 可能

taskweb 端的 webClient 在处理多个连接时,可能会出现连接复用或者资源未正确释放的情况。

解决办法:

确保 webClient 在每次请求结束后都能正确释放资源。可以考虑为每个请求创建独立的 webClient 实例。

4. 前端 EventSource 管理问题 - 可能

前端在切换容器时,EventSource 可能没有正确关闭或者重新创建。

解决办法:

确保在切换容器时,EventSource 被正确关闭并重新创建。你的前端代码已经有了关闭逻辑,不过可以添加一些调试日志来确认是否正常执行。

最终修改的代码如下:

前端js代码:

// 显示实时日志$("#logModal").on('shown.bs.modal', function (event){// 从触发模态框的元素中获取 containerIdconst trigger = $(event.relatedTarget);const itemStr = trigger.data('item');if (!itemStr) {return;}const modal = $(this);let oldEventSource = modal.data('eventSource');if (oldEventSource) {oldEventSource.close();oldEventSource = null;modal.removeData('eventSource');}const item = JSON.parse(itemStr);let eventSource = new EventSource(`http://localhost:9998/middle/logs?containerId=${item.containerId}&ip=${item.ip}`);modal.data('eventSource', eventSource);const logContent = $('#logContent');logContent.empty(); // 清空日志eventSource.onmessage = function (event){const logLine = $('<div>').text(event.data)logContent.append(logLine);logContent[0].scrollTop = logContent[0].scrollHeight;};eventSource.onerror = function (error ) {console.log('日志连接失败: ', error);eventSource.close();eventSource = null; // 确保引用被清除logContent.text('日志获取失败,请检查容器常或网络连接')}})// 监听模态框关闭逻辑$("#logModal").on('hide.bs.modal', function (){const modal = $(this);let eventSource = modal.data('eventSource');if (eventSource) {eventSource.close(); // 关闭当前实例eventSource = null;modal.removeData('eventSource');  // 清除数据引用}$("#logContent").empty(); // 清空日志})

后端中间件代码:

service:

public SseEmitter getDockerLogs(PublishServer publishServer) {String url = taskagentConfig.getPrefixAddress(publishServer.getIp(), taskagentConfig.getGetDockerLogsUrl());SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create())).build();webClient.get().uri(url, publishServer.getContainerId()).retrieve().bodyToFlux(String.class).subscribe(log -> {try {emitter.send(SseEmitter.event().data(log));} catch (IOException e) {emitter.completeWithError(e);}},emitter::completeWithError,emitter::complete);} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}

后端服务代码:

// 使用原子应用存储进程对象,确保多线程环境下的可见性和原子性private final AtomicReference<Process> processHolder = new AtomicReference<>();// 使用原子布尔值控制日志读取循环,确保线程安全的状态变更private final AtomicBoolean isRunning = new AtomicBoolean(true);public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {SseEmitter emitter = new SseEmitter();isRunning.set(true);processHolder.set(null); // 清空进程引用// 提交异步任务处理日志流executorService.submit(() -> {try {// 1. 执行 docker logs 命令,获取容器实时日志Process process = Runtime.getRuntime().exec("docker logs -f "+publishServerDto.getContainerId());processHolder.set(process); // 保存进程引用到原子容器中// 2. 启动独立线程读取日志输入流(避免主线程阻塞)new Thread(() -> {// 使用 try-with-resources 自动关闭流资源try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))){String line;// 循环读取日志,直到isRunning为false或读取到null(流结束)while (isRunning.get() && (line = reader.readLine()) != null) {// 通过 SSE 发送日志数据到taskwebemitter.send(SseEmitter.event().data(line));}} catch (IOException e) {// 读取流时发生异常(如管道关闭),终止进程Process p = processHolder.get();if (p != null) {p.destroy(); // 尝试正常终止进程log.error("taskagent - 读取流时发生异常", e.getMessage());}}}).start();// 3. 监听SSE连接的生命周期事件// 当前端正常关闭连接时触发emitter.onCompletion(() -> {// 统一处理关闭逻辑handleShutdown("正常关闭", processHolder.get());});// 当SSE连接发生错误时触发(如网络中断)emitter.onError(ex -> {handleShutdown("错误关闭: "+ex.getMessage(), processHolder.get());});// 当SSE连接超时时触发emitter.onTimeout(() -> {handleShutdown("超时关闭", processHolder.get());});} catch (IOException e) {// 初始化进程时发生异常isRunning.set(false);Process p = processHolder.get();if (p != null) {p.destroy();}log.error("taskagent - 初始化 docker 进程失败:", e.getMessage());emitter.completeWithError(e); // 通知taskweb连接失败}});return emitter;}/*** 统一处理进程关闭逻辑* @param reason 关闭原因(用于日志输出)* @param process 待关闭的进程*/private void handleShutdown(String reason, Process process) {isRunning.set(false); // 停止日志读取循环if (process != null) {process.destroy(); // 发送中断信号(等效于Ctrl+C)log.info("taskagent - " + reason + ", 已终止docker进程");}}

修改成上面之后,前端实时显示日志的效果就全部正常了。

相关文章:

使用 SSE + WebFlux 推送日志信息到前端

为什么使用 SSE 而不使用 WebSocket, 请看 SEE 对比 Websocket 的优缺点。 特性SSEWebSocket通信方向单向&#xff08;服务器→客户端&#xff09;双向&#xff08;全双工&#xff09;协议基于 HTTP独立协议&#xff08;需 ws:// 前缀&#xff09;兼容性现代浏览器&#xff08…...

Ollama 常见命令速览:本地大模型管理指南

Ollama 常见命令速览&#xff1a;本地大模型管理指南 一、什么是 Ollama&#xff1f; Ollama 是一个轻量级工具&#xff0c;允许用户在本地快速部署和运行大型语言模型&#xff08;LLM&#xff09;&#xff0c;如 Llama、DeepSeek、CodeLlama 等。其命令行工具设计简洁&#…...

C++开发之设计模式

设计模式存在的意义 设计模式提供了经过验证的解决方案&#xff0c;帮助开发者在不同的项目中重用这些解决方法&#xff0c;减少重复劳动&#xff0c;提高代码的复用性。设计模式通常遵循面向对象的设计原则&#xff0c;如单一职责原则、开放封闭原则等&#xff0c;能够帮助开…...

AWS Glue ETL设计与调度最佳实践

一、引言 在AWS Glue中设计和调度ETL过程时&#xff0c;需结合其无服务器架构和托管服务特性&#xff0c;采用系统化方法和最佳实践&#xff0c;以提高效率、可靠性和可维护性。本文将从调度策略和设计方法两大维度详细论述&#xff0c;并辅以实际案例说明。 二、调度策略的最…...

二叉树的遍历(广度优先搜索)

二叉树的第二种遍历方式&#xff0c;层序遍历&#xff0c;本质是运用队列对二叉树进行搜索。 层序遍历是指将二叉树的每一层按顺序遍历&#xff0c;通过队列实现就是先将根节点push入队&#xff0c;统计此时的队列中的元素数量size&#xff0c;将size元素全部pop出去&#xff0…...

JavaScript 里创建对象

咱们来用有趣的方式探索一下 JavaScript 里创建对象的各种“魔法咒语”! 想象一下,你是一位魔法工匠,想要在你的代码世界里创造各种奇妙的“魔法物品”(也就是对象)。你有好几种不同的配方和工具: 1. 随手捏造(对象字面量 {}) 场景:你想快速做一个独一无二的小玩意儿…...

2025年计算机视觉与智能通信国际会议(ICCVIC 2025)

2025 International Conference on Computer Vision and Intelligent Communication 一、大会信息 会议简称&#xff1a;ICCVIC 2025 大会地点&#xff1a;中国杭州 收录检索&#xff1a;提交Ei Compendex,CPCI,CNKI,Google Scholar等 二、会议简介 2025年计算机视觉与智能通…...

手工收集统计信息

有时想对某些表收集统计信息 CREATE OR REPLACE PROCEDURE GATHER_STATS ASDECLAREV_SQL1 VARCHAR(1000);--表游标CURSOR C1 ISSELECT (SELECT USER) AS TABLE_OWNER,TABLE_NAMEFROM USER_TABLES; --可以在这里加过滤条件--索引游标CURSOR C2 ISSELECT TABLE_OWNER,INDEX_NAM…...

flume整合Kafka和spark-streaming核心编程

flume整合Kafka 需求1&#xff1a;利用flume监控某目录中新生成的文件&#xff0c;将监控到的变更数据发送给kafka&#xff0c;kafka将收到的数据打印到控制台&#xff1a; 1.查看topic 2.编辑flume-Kafka.conf&#xff0c;并启动flume 3.启动Kafka消费者 4.新增测试数据 5.查…...

tokenizer的用法

下面介绍下基于 Hugging Face Transformers 库中 tokenizer&#xff08;分词器&#xff09;的主要用法和常用方法&#xff0c;帮助你了解如何在各种场景下处理文本。这里以 AutoTokenizer 为例&#xff0c;但大多数模型对应的 tokenizer 用法大同小异。 ───────────…...

kotlin和MVVM的结合使用总结(二)

MVVM 架构详解 核心组件&#xff1a;ViewModel 和 LiveData 在 Android 中&#xff0c;MVVM 架构主要借助 ViewModel 和 LiveData 来实现。ViewModel 负责处理业务逻辑&#xff0c;而 LiveData 则用于实现数据的响应式更新。 ViewModel 的源码分析 ViewModel 的核心逻辑在 …...

Git 入门知识详解

文章目录 一、Git 是什么1、Git 简介2、Git 的诞生3、集中式 vs 分布式3.1 集中式版本控制系统3.2 分布式版本控制系统 二、GitHub 与 Git 安装1、GitHub2、Git 安装 一、Git 是什么 1、Git 简介 Git 是目前世界上最先进的分布式版本控制系统。版本控制系统能帮助我们更好地管…...

React.memo 和 useMemo

现象 React 中&#xff0c;通常父组件的某个state发生改变&#xff0c;会引起父组件的重新渲染&#xff08;和其他state的重新计算&#xff09;&#xff0c;从而会导致子组件的重新渲染&#xff08;和其他非相关属性的重新计算&#xff09; 问题一&#xff1a;如何避免因为某个…...

EDI 如何与 ERP,CRM,WMS等系统集成

在数字化浪潮下&#xff0c;与制造供应链相关产业正加速向智能化供应链转型。传统人工处理订单、库存和物流的方式已难以满足下单客户对响应速度和数据准确性的严苛要求。EDI技术作为企业间数据交换的核心枢纽&#xff0c;其与ERP、CRM、WMS等业务系统的深度集成&#xff0c;成…...

面试踩过的坑

1、 “”和equals 的区别 “”是运算符&#xff0c;如果是基本数据类型&#xff0c;则比较存储的值&#xff1b;如果是引用数据类型&#xff0c;则比较所指向对象的地址值。equals是Object的方法&#xff0c;比较的是所指向的对象的地址值&#xff0c;一般情况下&#xff0c;重…...

论文阅读:2024 ACL ArtPrompt: ASCII Art-based Jailbreak Attacks against Aligned LLMs

总目录 大模型安全相关研究&#xff1a;https://blog.csdn.net/WhiffeYF/article/details/142132328 Artprompt: Ascii art-based jailbreak attacks against aligned llms https://www.doubao.com/chat/3846685176618754 https://arxiv.org/pdf/2402.11753 https://github…...

多物理场耦合低温等离子体装置求解器PASSKEy2

文章目录 PASSKEy2简介PASSKEY2计算流程PASSKEy2 中求解的物理方程电路模型等离子体模型燃烧模型 PASSKEy2的使用 PASSKEy2简介 PASSKEy2 是在 PASSKEy1 的基础上重新编写的等离子体数值模拟程序。 相较于 PASSKEy1&#xff0c; PASSKEy2 在具备解决低温等离子体模拟问题的能力…...

视频噪点多,如何去除画面噪点?

你是否遇到过这样的困扰&#xff1f;辛辛苦苦拍摄的视频&#xff0c;导出后却满屏 “雪花”&#xff0c;夜景变 “噪点盛宴”&#xff0c;低光环境秒变 “马赛克现场”&#xff1f; 无论是日常拍摄的vlog、珍贵的家庭录像&#xff0c;还是专业制作的影视作品&#xff0c;噪点问…...

09前端项目----分页功能

分页功能 分页器的优点实现分页功能自定义分页器先实现静态分页器调试分页器动态数据/交互 Element UI组件 分页器的优点 电商平台同时展示的数据很多&#xff0c;所以采用分页功能实现分页功能 Element UI已经有封装好的组件&#xff0c;但是也要掌握原理&#xff0c;以及自定…...

第十二届蓝桥杯 2021 C/C++组 直线

目录 题目&#xff1a; 题目描述&#xff1a; 题目链接&#xff1a; 思路&#xff1a; 核心思路&#xff1a; 两点确定一条直线&#xff1a; 思路详解&#xff1a; 代码&#xff1a; 第一种方式代码详解&#xff1a; 第二种方式代码详解&#xff1a; 题目&#xff1a;…...

《Piper》皮克斯技术解析:RIS系统与云渲染如何创造奥斯卡级动画短片

本文由专业专栏作家 Mike Seymour 撰写&#xff0c;内容包含非常有价值的行业资讯。 译者注 《Piper》是皮克斯动画工作室的一部技术突破性的短片&#xff0c;讲述了一只小鸟在海滩上寻找食物并面对自然挑战的故事。它不仅凭借其精美的视觉效果和细腻的情感表达赢得了2017年奥…...

Java在excel中导出动态曲线图DEMO

1、环境 JDK8 POI 5.2.3 Springboot2.7 2、DEMO pom <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.3</version></dependency><dependency><groupId>commons…...

第19章:Multi-Agent多智能体系统介绍

第19章:Multi-Agent多智能体系统介绍 欢迎来到多智能体系统 (Multi-Agent System, MAS) 的世界!在之前的章节中,我们深入探讨了单个 AI Agent 的构建,特别是结合了记忆、上下文和规划能力的 MCP 框架。然而,现实世界中的许多复杂问题往往需要多个智能体协同工作才能有效解…...

Kotlin Multiplatform--02:项目结构进阶

Kotlin Multiplatform--02&#xff1a;项目结构进阶 引言正文 引言 在上一章中&#xff0c;我们对 Kotlin Multiplatform 项目有了基本的了解&#xff0c;已经可以进行开发了。但我们只是使用了系统默认的项目结构。本章介绍了如何进行更复杂的项目结构管理。 正文 在上一章中&…...

Spring Cloud Gateway配置双向SSL认证(完整指南)

本文将详细介绍如何为Spring Cloud Gateway配置双向SSL认证,包括证书生成、配置和使用。 目录结构 /my-gateway-project ├── /certs │ ├── ca.crt # 根证书 │ ├── ca.key # 根私钥 │ ├── gateway.crt # 网关证书 │ ├── …...

Windows同步技术-使用命名对象

在 Windows 系统下使用命名对象&#xff08;如互斥体、事件、信号量、文件映射等内核对象&#xff09;时&#xff0c;需注意以下关键要点&#xff1a; 命名规则 唯一性&#xff1a;名称需全局唯一&#xff0c;避免与其他应用或系统对象冲突&#xff0c;建议使用 GUID 或应用专…...

代码随想录算法训练营第五十八天 | 1.拓扑排序精讲 2.dijkstra(朴素版)精讲 卡码网117.网站构建 卡码网47.参加科学大会

1.拓扑排序精讲 题目链接&#xff1a;117. 软件构建 文章讲解&#xff1a;代码随想录 思路&#xff1a; 把有向无环图进行线性排序的算法都可以叫做拓扑排序。 实现拓扑排序的算法有两种&#xff1a;卡恩算法&#xff08;BFS&#xff09;和DFS&#xff0c;以下BFS的实现思…...

linux ptrace 图文详解(七) gdb、strace跟踪系统调用

目录 一、gdb/strace 跟踪程序系统调用 二、实现原理 三、代码实现 四、总结 &#xff08;代码&#xff1a;linux 6.3.1&#xff0c;架构&#xff1a;arm64&#xff09; One look is worth a thousand words. —— Tess Flanders 相关链接&#xff1a; linux ptrace 图…...

【前端】ES6 引入的异步编程解决方案Promise 详解

Promise 详解 1. 基本概念 定义&#xff1a;Promise 是 ES6 引入的异步编程解决方案&#xff0c;表示一个异步操作的最终完成&#xff08;或失败&#xff09;及其结果值。核心作用&#xff1a;替代回调函数&#xff0c;解决“回调地狱”问题&#xff0c;提供更清晰的异步流程控…...

常见正则表达式整理与Java使用正则表达式的例子

一、常见正则表达式整理 1. 基础验证类 邮箱地址 ^[a-zA-Z0-9._%-][a-zA-Z0-9.-]\\.[a-zA-Z]{2,}$ &#xff08;匹配如 userexample.com&#xff09;手机号 ^1[3-9]\\\\d{9}$ &#xff08;匹配国内11位手机号&#xff0c;如 13812345678&#xff09;中文字符 ^[\u4e00-\u9fa5…...