【Easylive】视频在线人数统计系统实现详解 WebSocket 及其在在线人数统计中的应用
【Easylive】项目常见问题解答(自用&持续更新中…) 汇总版
视频在线人数统计系统实现详解
1. 系统架构概述
您实现的是一个基于Redis的视频在线人数统计系统,主要包含以下组件:
- 心跳上报接口:客户端定期调用以维持在线状态
- Redis存储结构:使用两种键存储在线信息
- 过期监听机制:通过Redis的键过期事件自动减少在线人数
- 计数维护逻辑:确保在线人数的准确性
2. 核心实现细节
2.1 数据结构设计
系统使用了两种Redis键:
-
用户播放键 (userPlayOnlineKey)
• 格式:video:play:user:{fileId}:{deviceId}
• 作用:标记特定设备是否在线
• 过期时间:8秒 -
在线计数键 (playOnlineCountKey)
• 格式:video:play:online:{fileId}
• 作用:存储当前视频的在线人数
• 过期时间:10秒
2.2 心跳上报流程 (reportVideoPlayOnline)
public Integer reportVideoPlayOnline(String fileId, String deviceId) {// 构造Redis键String userPlayOnlineKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER, fileId, deviceId);String playOnlineCountKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId);// 新用户上线处理if (!redisUtils.keyExists(userPlayOnlineKey)) {// 设置用户键(8秒过期)redisUtils.setex(userPlayOnlineKey, fileId, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 增加在线计数(10秒过期)return redisUtils.incrementex(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10).intValue();}// 已有用户续期处理redisUtils.expire(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10);redisUtils.expire(userPlayOnlineKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 返回当前在线人数Integer count = (Integer) redisUtils.get(playOnlineCountKey);return count == null ? 1 : count;
}
工作流程:
- 客户端每5-7秒调用一次
/reportVideoPlayOnline接口 - 服务端检查用户键是否存在:
• 不存在:创建用户键(8秒过期),增加计数键(10秒过期)
• 存在:续期两个键的过期时间 - 返回当前在线人数
2.3 过期监听机制 (RedisKeyExpirationListener)
@Override
public void onMessage(Message message, byte[] pattern) {String key = message.toString();// 只处理用户播放键的过期事件if (!key.startsWith(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE_PREIFX + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX)) {return;}// 从key中提取fileIdInteger userKeyIndex = key.indexOf(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX) + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX.length();String fileId = key.substring(userKeyIndex, userKeyIndex + Constants.LENGTH_20);// 减少对应视频的在线计数redisComponent.decrementPlayOnlineCount(String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId));
}
工作流程:
- Redis在用户键(8秒)过期时发送通知
- 监听器收到通知后:
• 验证是否为用户播放键
• 从键名中提取视频ID(fileId)
• 减少对应视频的在线计数
2.4 计数递减逻辑 (decrementPlayOnlineCount)
public void decrementPlayOnlineCount(String key) {redisUtils.decrement(key);
}
作用:简单地减少指定键的计数值
3. 关键设计原理
3.1 双键设计的意义
-
用户播放键:
• 作为"心跳"存在的证据
• 过期时间(8秒)短于计数键(10秒),确保先检测到用户离线 -
在线计数键:
• 集中存储当前在线人数
• 稍长的过期时间防止误删
3.2 时间参数设计
• 8秒用户键过期:假设客户端每5-7秒上报一次,8秒确保能检测到中断
• 10秒计数键过期:比用户键多2秒,防止竞态条件
• 客户端上报频率:建议5-7秒一次,平衡准确性和服务器压力
3.3 容错机制
- 计数键续期:每次心跳都会延长计数键的过期时间
- 空值处理:当计数键不存在时返回1作为默认值
- 精确递减:只在用户键过期时才减少计数,避免重复递减
4. 工作流程图
5. 系统优势
- 实时性高:秒级检测用户离线
- 性能优异:完全基于Redis内存操作
- 扩展性强:轻松支持大量并发用户
- 准确可靠:双重验证机制防止误计数
- 资源节约:自动清理不活跃用户的记录
6. 潜在优化方向
- 批量上报:允许客户端一次上报多个视频的状态
- 分布式锁:在极高并发下保证计数准确
- 异常处理:增加Redis操作失败的重试机制
- 监控指标:添加在线人数变化的监控和报警
- 动态过期:根据系统负载动态调整过期时间
这个实现很好地平衡了准确性、实时性和性能要求,是一个非常典型的在线人数统计解决方案。
WebSocket 及其在在线人数统计中的应用
WebSocket 基础介绍
什么是 WebSocket?
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。
与传统 HTTP 轮询的区别
| 特性 | WebSocket | HTTP 轮询 |
|---|---|---|
| 连接方式 | 持久化连接 | 每次请求新建连接 |
| 通信方向 | 全双工 | 半双工 |
| 实时性 | 毫秒级 | 依赖轮询间隔(通常秒级) |
| 服务器推送 | 支持 | 不支持 |
| 资源消耗 | 连接初期开销大,后期开销小 | 每次请求都有完整HTTP开销 |
| 适用场景 | 高实时性应用 | 实时性要求不高的应用 |
基于 WebSocket 的在线人数统计实现
系统架构设计
客户端A ──┐├─── WebSocket 服务器 ─── Redis 集群
客户端B ──┘ ││数据库(持久化)
核心实现代码
1. WebSocket 服务端实现 (Spring Boot)
@ServerEndpoint("/online/{videoId}")
@Component
public class VideoOnlineEndpoint {private static ConcurrentMap<String, Set<Session>> videoSessions = new ConcurrentHashMap<>();private static RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {VideoOnlineEndpoint.redisTemplate = redisTemplate;}@OnOpenpublic void onOpen(Session session, @PathParam("videoId") String videoId) {// 添加会话到视频组videoSessions.computeIfAbsent(videoId, k -> ConcurrentHashMap.newKeySet()).add(session);// 更新Redis计数String redisKey = "video:online:" + videoId;redisTemplate.opsForValue().increment(redisKey);redisTemplate.expire(redisKey, 10, TimeUnit.MINUTES);// 广播更新后的在线人数broadcastOnlineCount(videoId);}@OnClosepublic void onClose(Session session, @PathParam("videoId") String videoId) {// 从视频组移除会话Set<Session> sessions = videoSessions.get(videoId);if (sessions != null) {sessions.remove(session);// 更新Redis计数String redisKey = "video:online:" + videoId;redisTemplate.opsForValue().decrement(redisKey);// 广播更新后的在线人数broadcastOnlineCount(videoId);}}@OnErrorpublic void onError(Session session, Throwable error) {error.printStackTrace();}private void broadcastOnlineCount(String videoId) {String count = redisTemplate.opsForValue().get("video:online:" + videoId);String message = "ONLINE_COUNT:" + (count != null ? count : "0");Set<Session> sessions = videoSessions.get(videoId);if (sessions != null) {sessions.forEach(session -> {try {session.getBasicRemote().sendText(message);} catch (IOException e) {e.printStackTrace();}});}}
}
2. 客户端实现 (JavaScript)
const videoId = '12345'; // 当前观看的视频ID
const socket = new WebSocket(`wss://yourdomain.com/online/${videoId}`);// 连接建立时
socket.onopen = function(e) {console.log("WebSocket连接已建立");
};// 接收消息
socket.onmessage = function(event) {if(event.data.startsWith("ONLINE_COUNT:")) {const count = event.data.split(":")[1];updateOnlineCountDisplay(count);}
};// 连接关闭时
socket.onclose = function(event) {if (event.wasClean) {console.log(`连接正常关闭,code=${event.code} reason=${event.reason}`);} else {console.log('连接异常断开');// 尝试重新连接setTimeout(() => connectWebSocket(), 5000);}
};// 错误处理
socket.onerror = function(error) {console.log(`WebSocket错误: ${error.message}`);
};function updateOnlineCountDisplay(count) {document.getElementById('online-count').innerText = count;
}
3. 心跳机制实现
// 客户端心跳
setInterval(() => {if(socket.readyState === WebSocket.OPEN) {socket.send("HEARTBEAT");}
}, 30000); // 30秒发送一次心跳// 服务端心跳检测 (Java)
@ServerEndpoint配置中添加:
@OnMessage
public void onMessage(Session session, String message) {if("HEARTBEAT".equals(message)) {session.getAsyncRemote().sendText("HEARTBEAT_ACK");}
}
方案优势分析
-
实时性极佳
• 在线人数变化可实时推送到所有客户端
• 无轮询延迟,通常达到毫秒级更新 -
精确计数
• 基于实际连接状态计数
• 避免Redis过期时间的估算误差 -
扩展功能容易
• 可轻松扩展实现弹幕、实时评论等功能
• 支持复杂的互动场景 -
减少无效请求
• 相比HTTP轮询减少90%以上的请求量
• 显著降低服务器压力
潜在挑战与解决方案
1. 连接保持问题
问题:移动网络不稳定导致频繁断开
解决方案:
• 实现自动重连机制
• 使用心跳包检测连接状态
• 设置合理的超时时间
2. 大规模并发问题
问题:单视频热点导致连接数激增
解决方案:
• 使用WebSocket集群
• 引入负载均衡(如Nginx)
• 实现连接分片策略
3. 状态同步问题
问题:集群环境下状态同步
解决方案:
• 使用Redis Pub/Sub同步各节点状态
• 采用一致性哈希分配连接
• 实现分布式会话管理
性能优化建议
-
协议优化
• 启用WebSocket压缩扩展
• 使用二进制协议替代文本协议 -
资源控制
• 实现连接数限制
• 设置单个IP连接限制 -
监控体系
• 建立连接数监控
• 实现异常连接报警 -
优雅降级
• WebSocket不可用时自动降级为长轮询
• 提供兼容性方案
与传统方案的对比
| 指标 | WebSocket方案 | Redis键过期方案 |
|---|---|---|
| 实时性 | 毫秒级 | 秒级(依赖过期时间) |
| 精确度 | 100%准确 | 有1-2秒延迟 |
| 实现复杂度 | 较高 | 较低 |
| 服务器负载 | 连接初期高,维持期低 | 持续中等负载 |
| 扩展性 | 容易扩展其他实时功能 | 仅限于计数 |
| 客户端兼容性 | 需现代浏览器支持 | 所有环境兼容 |
| 移动端表现 | 可能因网络切换断开 | 不受影响 |
适用场景建议
推荐使用WebSocket方案当:
• 需要实时显示精确在线人数
• 已经使用或计划使用WebSocket实现其他功能(如弹幕、聊天)
• 客户端环境可控(如自己的APP或现代浏览器)
• 有足够资源维护WebSocket基础设施
推荐保持Redis方案当:
• 实时性要求不是极高(秒级可接受)
• 需要支持老旧客户端
• 系统规模较小,希望简单维护
• 主要关注计数而非实时交互
混合方案设计
结合两种方案优势的折中实现:
// WebSocket连接时更新精确计数
@OnOpen
public void onOpen(Session session, @PathParam("videoId") String videoId) {// 更新内存中的精确计数incrementLocalCount(videoId);// 每10秒同步到Redis一次if(needSyncToRedis(videoId)) {redisTemplate.opsForValue().set("video:online:" + videoId, getLocalCount(videoId).toString());}
}// 对外提供查询接口
@GetMapping("/online/{videoId}")
public int getOnlineCount(@PathVariable String videoId) {// 优先返回本地精确计数Integer localCount = getLocalCount(videoId);if(localCount != null) {return localCount;}// 回退到Redis计数String count = redisTemplate.opsForValue().get("video:online:" + videoId);return count != null ? Integer.parseInt(count) : 0;
}
这种混合方案:
• 对WebSocket客户端提供精确计数
• 对非WebSocket客户端提供近似的Redis计数
• 平衡了精确性和兼容性
查看在线观看人数
通过轮询上报心跳,在服务端记录设备有没有不停地上报心跳,如果没有上报心跳,通过 Redis 的 key 的失效,会有一个通知没有再上报心跳,就会把在线人数 -1。
Redis在线人数统计实现详解
以下是带有详细注释的代码实现,解释了基于Redis的在线人数统计系统的工作原理:
/*** 客户端上报心跳接口* @param fileId 视频文件ID* @param deviceId 设备唯一标识* @return 当前在线人数*/
@RequestMapping("/reportVideoPlayOnline")
public ResponseVO reportVideoPlayOnline(@NotEmpty String fileId, @NotEmpty String deviceId){// 调用Redis组件处理心跳上报,并返回成功响应return getSuccessResponseVO(redisComponent.reportVideoPlayOnline(fileId, deviceId));
}/*** 处理视频在线人数统计的核心方法* @param fileId 视频文件ID* @param deviceId 设备唯一标识* @return 当前在线人数*/
public Integer reportVideoPlayOnline(String fileId, String deviceId){// 构建Redis键:用户级别的键,用于标记特定设备是否在线String userPlayOnlineKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER, fileId, deviceId);// 构建Redis键:视频级别的键,用于存储当前视频的总在线人数String playOnlineCountKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId);// 检查是否是新的观看用户(该设备首次上报或已过期)if (!redisUtils.keyExists(userPlayOnlineKey)) {// 设置用户键,8秒后过期(如果8秒内没有下次心跳,则认为离线)redisUtils.setex(userPlayOnlineKey, fileId, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 增加视频的总在线人数计数,并设置10秒过期时间return redisUtils.incrementex(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10).intValue();}// 以下是已有用户的处理逻辑:// 续期视频的总在线人数键(10秒)redisUtils.expire(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10);// 续期用户级别的键(8秒)redisUtils.expire(userPlayOnlineKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 获取当前在线人数(防止并发问题导致的计数不准确)Integer count = (Integer) redisUtils.get(playOnlineCountKey);// 如果获取不到计数(极端情况),默认返回1return count == null ? 1 : count;
}/*** 减少在线人数计数* @param key 需要减少计数的Redis键*/
public void decrementPlayOnlineCount(String key) {// 对指定键的值进行原子递减redisUtils.decrement(key);
}/*** Redis键过期监听器,用于处理用户离线情况*/
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {@Resourceprivate RedisComponent redisComponent;public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** 处理Redis键过期事件* @param message 过期消息* @param pattern 模式*/@Overridepublic void onMessage(Message message, byte[] pattern) {// 获取过期的键名String key = message.toString();// 只处理用户级别的在线状态键过期事件if (!key.startsWith(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE_PREIFX + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX)) {return;}// 从键名中提取视频ID// 计算用户键前缀的长度Integer userKeyIndex = key.indexOf(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX) + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX.length();// 截取视频ID(假设ID长度为20)String fileId = key.substring(userKeyIndex, userKeyIndex + Constants.LENGTH_20);// 减少对应视频的在线人数计数redisComponent.decrementPlayOnlineCount(String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId));}
}
系统工作流程详解
-
心跳上报机制:
• 客户端每隔5-7秒调用/reportVideoPlayOnline接口上报心跳
• 服务端通过Redis记录设备最后一次活跃时间 -
双键设计原理:
• 用户键(userPlayOnlineKey)
◦ 格式:video:play:user:{fileId}:{deviceId}
◦ 作用:标记特定设备是否在线
◦ 过期时间:8秒(如果8秒内没有心跳则认为离线)
• 计数键(playOnlineCountKey)
◦ 格式:video:play:online:{fileId}
◦ 作用:存储当前视频的总在线人数
◦ 过期时间:10秒(比用户键稍长,防止竞态条件) -
新用户上线处理:
if (!redisUtils.keyExists(userPlayOnlineKey)) {redisUtils.setex(userPlayOnlineKey, fileId, 8秒);return redisUtils.incrementex(playOnlineCountKey, 10秒); }• 当用户键不存在时,创建用户键并增加总计数
-
已有用户续期处理:
redisUtils.expire(playOnlineCountKey, 10秒); redisUtils.expire(userPlayOnlineKey, 8秒);• 续期两个键的过期时间,保持活跃状态
-
离线检测机制:
• 当用户键8秒过期时,触发RedisKeyExpirationListener
• 监听器从键名提取videoId,减少对应视频的在线计数 -
容错处理:
Integer count = (Integer) redisUtils.get(playOnlineCountKey); return count == null ? 1 : count;• 防止极端情况下计数键丢失,返回默认值1
设计优势分析
- 精确计数:基于实际心跳而非估算,结果准确
- 自动清理:通过Redis过期机制自动清理不活跃用户
- 低延迟:键过期通知机制实现秒级离线检测
- 高性能:完全基于内存操作,无数据库IO
- 可扩展:Redis集群支持横向扩展
关键参数说明
| 参数 | 值 | 说明 |
|---|---|---|
| 用户键过期时间 | 8秒 | 客户端应每5-7秒上报一次心跳 |
| 计数键过期时间 | 10秒 | 比用户键稍长,防止竞态条件 |
| 视频ID长度 | 20 | 需与业务系统保持一致 |
这个实现方案在保证准确性的同时,具有优秀的性能和可扩展性,非常适合中小规模的实时在线人数统计场景。
自看
通过Redis计数器来给视频的在线观看人数进行增加和减少,也就是通过心跳来不停上报当前用户是否正在观看,当浏览器关闭时,该用户就不会再持续上报心跳,此时该用户的Redis Key则会失效,Redis Key失效的时候会发送消息通知,根据这个消息通知得知失效,再去减少在线观看人数。
Netty与视频在线人数统计的结合
Netty基础介绍
Netty是一个异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络服务器和客户端程序。它基于Java NIO(Non-blocking I/O)构建,主要特点包括:
- 高性能:支持百万级并发连接
- 低延迟:非阻塞I/O模型减少等待时间
- 高扩展性:模块化设计,可灵活扩展
- 协议支持:内置HTTP、WebSocket、TCP/UDP等协议支持
为什么考虑用Netty实现在线人数统计?
当前基于HTTP轮询+Redis的实现存在以下可优化点:
• HTTP开销大:每次轮询都需要完整的HTTP请求/响应头
• 实时性有限:依赖轮询间隔(通常秒级)
• 服务器压力:高并发时大量无效轮询请求
Netty可以解决这些问题,提供真正的实时通信能力。
基于Netty的在线人数统计设计
系统架构
客户端App/Web ──▶ Netty服务器集群 ──▶ Redis集群││ (WebSocket/TCP长连接)▼
用户行为数据(心跳、上下线)
核心组件实现
1. Netty服务器初始化
public class VideoOnlineServer {public void start(int port) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 心跳检测(15秒无读写则关闭连接)pipeline.addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS));// 自定义协议解码/编码pipeline.addLast("decoder", new OnlineMessageDecoder());pipeline.addLast("encoder", new OnlineMessageEncoder());// 业务逻辑处理器pipeline.addLast("handler", new OnlineMessageHandler());}});ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}
2. 消息处理器实现
public class OnlineMessageHandler extends SimpleChannelInboundHandler<OnlineMessage> {// 视频ID到Channel组的映射private static Map<String, ChannelGroup> videoGroups = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, OnlineMessage msg) {switch (msg.getType()) {case CONNECT: // 连接初始化handleConnect(ctx, msg.getVideoId(), msg.getDeviceId());break;case HEARTBEAT: // 心跳handleHeartbeat(ctx, msg.getVideoId(), msg.getDeviceId());break;case DISCONNECT: // 主动断开handleDisconnect(ctx, msg.getVideoId(), msg.getDeviceId());break;}}private void handleConnect(ChannelHandlerContext ctx, String videoId, String deviceId) {// 加入视频频道组ChannelGroup group = videoGroups.computeIfAbsent(videoId, k -> new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));group.add(ctx.channel());// 更新Redis计数long count = RedisUtils.increment("video:online:" + videoId);// 广播新在线人数broadcastCount(videoId, count);}private void handleHeartbeat(ChannelHandlerContext ctx, String videoId, String deviceId) {// 更新设备最后活跃时间(Redis)RedisUtils.setex("device:active:" + videoId + ":" + deviceId, "1", 15); // 15秒过期// 可选择性返回当前人数ctx.writeAndFlush(new OnlineMessage(HEARTBEAT_ACK, getOnlineCount(videoId)));}
}
3. 客户端断连处理
@Override
public void channelInactive(ChannelHandlerContext ctx) {// 从所有视频组中移除该ChannelvideoGroups.values().forEach(group -> group.remove(ctx.channel()));// 更新Redis计数(需要维护设备到视频ID的映射)String deviceId = getDeviceId(ctx.channel());String videoId = getVideoId(ctx.channel());long count = RedisUtils.decrement("video:online:" + videoId);// 广播新人数broadcastCount(videoId, count);
}@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {// 处理空闲连接if (evt instanceof IdleStateEvent) {ctx.close(); // 关闭超时未心跳的连接}
}
与传统方案的对比
| 特性 | Netty实现方案 | HTTP轮询+Redis方案 |
|---|---|---|
| 实时性 | 毫秒级 | 依赖轮询间隔(通常秒级) |
| 协议开销 | 仅心跳数据(几十字节) | 完整HTTP头(通常几百字节) |
| 服务器压力 | 长连接维护,无重复握手 | 每次轮询都新建连接 |
| 并发能力 | 单机支持10万+连接 | 受限于HTTP服务器性能 |
| 实现复杂度 | 较高 | 简单 |
| 移动网络适应性 | 需处理频繁重连 | 天然适应 |
关键设计考虑
-
连接管理
• 使用ChannelGroup管理同视频的用户连接
•IdleStateHandler自动检测空闲连接 -
状态同步
• Redis存储全局计数,避免Netty单点问题
• 定期同步内存与Redis的数据 -
消息协议设计
message OnlineMessage {enum Type {CONNECT = 0;HEARTBEAT = 1;DISCONNECT = 2;}Type type = 1;string videoId = 2;string deviceId = 3;int64 count = 4; // 用于服务端返回当前人数 } -
弹性设计
• 客户端实现自动重连
• 服务端优雅降级机制
性能优化技巧
- 对象池化:重用消息对象减少GC
- 零拷贝:使用
CompositeByteBuf合并小数据包 - 事件循环:业务逻辑放入单独线程池
- 批量操作:合并Redis操作减少网络往返
适用场景建议
推荐使用Netty当:
• 需要真正的实时互动(如直播弹幕)
• 预期有超高并发(万级同时在线)
• 已经需要维护长连接(如游戏、IM)
保持当前方案当:
• 实时性要求不高
• 开发资源有限
• 客户端环境复杂(如需要支持老旧浏览器)
Netty方案虽然实现复杂度较高,但能为视频平台提供更实时、更高效的在线人数统计能力,并为未来扩展实时互动功能奠定基础。
Netty与WebSocket的关系及在实时统计中的应用
Netty和WebSocket是不同层次的技术,但它们可以紧密结合来构建高性能的实时通信系统。以下是它们的核心关系和在视频在线人数统计中的应用分析:
1. Netty与WebSocket的基础关系
| 维度 | Netty | WebSocket | 二者关系 |
|---|---|---|---|
| 定位 | 网络应用框架 | 通信协议 | Netty是实现WebSocket协议的底层框架之一 |
| 层级 | 传输层/应用层框架 | 应用层协议 | Netty提供了对WebSocket协议的支持 |
| 功能 | 处理TCP/UDP连接、编解码、并发等 | 提供全双工通信能力 | Netty帮助高效实现WebSocket的通信能力 |
| 典型使用 | 可作为WebSocket服务器的基础实现 | 运行在Netty等框架之上 | 开发者通过Netty API构建WebSocket服务 |
2. 技术栈组合原理
[WebSocket客户端] ←WebSocket协议→ [Netty WebSocket服务端] ←TCP→ [操作系统网络栈]
-
协议支持:
• Netty内置WebSocketServerProtocolHandler等组件
• 自动处理WebSocket握手、帧编解码等底层细节 -
性能优势:
• Netty的Reactor线程模型优化WebSocket连接管理
• 零拷贝技术提升WebSocket数据传输效率 -
扩展能力:
• 在WebSocket之上可添加自定义协议
• 方便集成SSL/TLS等安全层
3. 在视频在线统计中的联合实现
基于Netty的WebSocket服务端示例
public class VideoWebSocketServer {public void start(int port) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// HTTP编解码器(用于WebSocket握手)pipeline.addLast(new HttpServerCodec());// 聚合HTTP请求pipeline.addLast(new HttpObjectAggregator(65536));// WebSocket协议处理器pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));// 自定义业务处理器pipeline.addLast(new OnlineStatsHandler());}});ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}
在线统计业务处理器
public class OnlineStatsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {// 视频频道组映射private static Map<String, ChannelGroup> videoGroups = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {// 解析JSON消息:{"action":"heartbeat","videoId":"123"}JsonObject json = parseJson(msg.text());String videoId = json.getString("videoId");ChannelGroup group = videoGroups.computeIfAbsent(videoId, k -> new DefaultChannelGroup(ctx.executor()));switch (json.getString("action")) {case "join":group.add(ctx.channel());broadcastCount(videoId, group.size());break;case "heartbeat":// 更新Redis活跃记录redis.incr("active:" + videoId + ":" + ctx.channel().id());break;}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {// 从所有组中移除并更新计数videoGroups.values().forEach(group -> {if (group.remove(ctx.channel())) {broadcastCount(getVideoId(ctx), group.size());}});}
}
4. 与传统HTTP轮询方案的对比
| 特性 | Netty+WebSocket | HTTP轮询 |
|---|---|---|
| 连接方式 | 1个持久连接 | 频繁新建连接 |
| 头部开销 | 握手后无冗余头 | 每次请求都带完整HTTP头 |
| 实时性 | 毫秒级 | 依赖轮询间隔(通常秒级) |
| 服务器压力 | 连接数×心跳频率 | 请求数×轮询频率 |
| 移动网络适应 | 需处理网络切换 | 天然适应 |
| 实现复杂度 | 较高 | 简单 |
5. 典型消息流程
-
连接建立:
客户端 → HTTP Upgrade请求 → Netty(完成WebSocket握手) → 建立持久连接 -
心跳维持:
// 客户端每10秒发送 {"action":"heartbeat","videoId":"123","timestamp":1620000000}// 服务端响应 {"type":"ack","online":1524} -
人数推送:
// 服务端主动推送 {"type":"stats","videoId":"123","online":1525,"change":1}
6. 性能优化关键点
-
连接管理:
• 使用ChannelGroup管理视频房间的订阅者
• 配置合理的IdleStateHandler检测死连接 -
序列化优化:
// 使用二进制协议代替JSON pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); -
集群扩展:
// 使用Redis Pub/Sub同步各节点状态 redis.subscribe("video:123", (channel, message) -> {broadcastToLocalClients(message); }); -
监控指标:
• 跟踪每个视频频道的连接数
• 监控消息吞吐量和延迟
Netty与WebSocket的结合为实时统计提供了高并发、低延迟的解决方案,特别适合需要精确到毫秒级的在线人数统计场景,同时为未来扩展实时弹幕、即时消息等功能奠定了基础。
相关文章:
【Easylive】视频在线人数统计系统实现详解 WebSocket 及其在在线人数统计中的应用
【Easylive】项目常见问题解答(自用&持续更新中…) 汇总版 视频在线人数统计系统实现详解 1. 系统架构概述 您实现的是一个基于Redis的视频在线人数统计系统,主要包含以下组件: 心跳上报接口:客户端定期调用以…...
tomcat 目录结构组成
文章目录 背景文件结构层级一些常用的路径 背景 现在非常多的 java web 服务部署在 linux 服务器中,我们服务器中的 tomcat 会有各种文件路径,看下它有哪些文件 文件结构层级 ├── bin/ # 核心脚本和启动文件 ├── conf/ # …...
苍穹外卖day12
课程内容 工作台 Apache POI 导出运营数据Excel报表 功能实现:工作台、数据导出 工作台效果图: 数据导出效果图: 在数据统计页面点击数据导出:生成Excel报表 1. 工作台 1.1 需求分析和设计 1.1.1 产品原型 工作台是系统运…...
Unity Final IK:下一代角色动画与物理交互的技术解析
引言:角色动画的范式转移 在传统游戏开发中,角色动画主要依赖于 前向动力学(Forward Kinematics, FK) 和预烘焙动画。然而,这种方法的局限性在开放世界、物理交互和VR等场景中愈发明显: 环境适应性差&…...
前端开发时的内存泄漏问题
目录 🔍 什么是内存泄漏(Memory Leak)?🚨 常见的内存泄漏场景1️⃣ 未清除的定时器(setInterval / setTimeout)2️⃣ 全局变量(变量未正确释放)3️⃣ 事件监听未清除4️⃣…...
【Feign】⭐️使用 openFeign 时传递 MultipartFile 类型的参数参考
💥💥✈️✈️欢迎阅读本文章❤️❤️💥💥 🏆本篇文章阅读大约耗时三分钟。 ⛳️motto:不积跬步、无以千里 📋📋📋本文目录如下:🎁🎁&a…...
Linux中动静态库的制作
1.什么是库 库是写好的现有的,成熟的,可以复⽤的代码。现实中每个程序都要依赖很多基础的底层库,不可能每个⼈的代码都从零开始,因此库的存在意义非同寻常。 本质上来说库是⼀种可执⾏代码的⼆进制形式,可以被操作系统…...
Docker部署sprintboot后端项目
创建Docker网络 docker network create icjs 部署Redis docker run -d \--network icjs \--name redis \-p 6379:6379 \redis:latest数据持久化 docker run --restartalways --network icjs -p 6379:6379 --name redis -v /opt/docker/redis/redis.conf:/etc/redis/redis.c…...
forms实现连连看
说明: forms实现连连看 效果图: step1:C:\Users\wangrusheng\RiderProjects\WinFormsApp2\WinFormsApp2\Form1.cs using System; using System.Collections.Generic; using System.Drawing; using System.Linq; using System.Windows.Forms;namespace …...
多视图几何--立体校正--Fusiello方法
1. 坐标系对齐与正交基构造 目标:构建新坐标系基向量 { e 1 , e 2 , e 3 } \{ \mathbf{e}_1, \mathbf{e}_2, \mathbf{e}_3 \} {e1,e2,e3},使成像平面共面且极线水平对齐。 (1) 基线方向 e 1 \mathbf{e}_1 e1 基线向量由左右相机光心平移向量…...
鸿蒙开发踩坑记录 - 2024S2
wrapBuilder如果想View和ObservedV2做绑定 必须要用 ComponentV2 Param 和 区别 退出两层循环 Builder的传入的参数及时是Trace修饰的也无法刷新组件 折叠屏展开后键盘无法点击 vm是公用的,组件生命周期问题导致 监听键盘高度变化失效 原因:分享面…...
【学Rust写CAD】21 2D 点(point.rs)
源码 //matrix/point.rs use std::ops::Mul; use super::algebraic_units::{Zero, One}; use super::generic::Matrix;/// 点坐标结构体 #[derive(Debug, Clone, Copy, PartialEq)] pub struct Point<X, Y>(Matrix<X, Y, One, Zero, Zero, One>);impl<X, Y>…...
0基础入门scrapy 框架,获取豆瓣top250存入mysql
一、基础教程 创建项目命令 scrapy startproject mySpider --项目名称 创建爬虫文件 scrapy genspider itcast "itcast.cn" --自动生成 itcast.py 文件 爬虫名称 爬虫网址 运行爬虫 scrapy crawl baidu(爬虫名) 使用终端运行太麻烦了,而且…...
鸿蒙NEXT小游戏开发:井字棋
1. 引言 井字棋是一款经典的两人对战游戏,简单易懂,适合各个年龄段的玩家。本文将介绍如何使用鸿蒙NEXT框架开发一个井字棋游戏,涵盖游戏逻辑、界面设计及AI对战功能。 2. 开发环境准备 电脑系统:windows 10 开发工具:…...
deep-sync开源程序插件导出您的 DeepSeek 与 public 聊天
一、软件介绍 文末提供下载 deep-sync开源程序插件导出您的 DeepSeek 与 public 聊天,这是一个浏览器扩展,它允许用户公开、私下分享他们的聊天对话,并使用密码或过期链接来增强 Deepseek Web UI。该扩展程序在 Deepseek 界面中添加了一个 “…...
4. 理解Prompt Engineering:如何让模型听懂你的需求
引言:当模型变成“实习生” 想象一下,你新招的实习生总把“帮我写份报告”理解为“做PPT”或“整理数据表”——这正是开发者与大模型对话的日常困境。某金融公司优化提示词后,合同审查准确率从72%飙升至94%。本文将用3个核心法则+5个行业案例,教你用Prompt Engineering让…...
网络编程—网络概念
目录 1 网络分类 1.1 局域网 1.2 广域网 2 常见网络概念 2.1 交换机 2.2 路由器 2.3 集线器 2.4 IP地址 2.5 端口号 2.6 协议 3 网络协议模型 3.1 OSI七层模型 3.2 TCP/IP五层模型 3.3 每层中常见的协议和作用 3.3.1 应用层 3.3.2 传输层 3.3.3 网络层 3.3.4…...
基于Rust与WebAssembly实现高性能前端计算
引言 随着Web应用的复杂性增加,前端开发者经常面临性能瓶颈。传统JavaScript在处理密集型计算任务(如大数据处理或实时图像渲染)时,往往显得力不从心。而Rust语言凭借其高性能和内存安全特性,结合WebAssembly的接近原生…...
MATLAB 代码学习
1. Cell数组 Cell数组用于存储异构数据,每个元素(称为cell)可以包含不同类型的数据(如数值、字符串、矩阵等)。 1.1 创建Cell数组 直接赋值:使用花括号{}定义内容。 students {Alice, 20, [85, 90, 78…...
SELinux
一、selinux技术详解 SELinux 概述 SELinux,即 Security-Enhanced Linux,意为安全强化的 Linux,由美国国家安全局(NSA)主导开发。开发初衷是防止系统资源被误用。在 Linux 系统中,系统资源的访问均通过程…...
Axios 相关的面试题
在跟着视频教程学习项目的时候使用了axios发送请求,但是只是跟着把代码粘贴上去,一些语法规则根本不太清楚,但是根据之前的博客学习了fetch了之后,一看axios的介绍就明白了。所以就直接展示axios的面试题吧 本文主要内容ÿ…...
Spring Cloud 跨云灾备:如何实现5分钟级区域切换?
引言:云原生时代,区域级故障的致命性与应对 在混合云与多云架构中,单个区域的宕机可能导致全局服务瘫痪(如2023年AWS美东区域故障影响超200家金融系统)。传统灾备方案依赖手动切换DNS或冷备集群,恢复时间长…...
ES6对函数参数的新设计
ES6 对函数参数进行了新的设计,主要添加了默认参数、不定参数和扩展参数: 不定参数和扩展参数可以认为恰好是相反的两个模式,不定参数是使用数组来表示多个参数,扩展参数则是将多个参数映射到一个数组。 需要注意:不定…...
爬虫【feapder框架】
feapder框架 1、简单介绍 简介 feapder上手简单、功能强大的Python爬虫框架,内置AirSpider、Spider、Task、Spider、BatchSpider四种爬虫解决不同场景的需求支持断点续爬、监控报警、浏览器渲染、海量数据去重等功能更有功能强大的爬虫管理系统feaplat为其提供方…...
python如何提取html中所有的图片链接
在Python中,你可以使用BeautifulSoup库来解析HTML内容,并提取其中所有的图片链接(即<img>标签的src属性)。以下是一个示例代码,展示了如何做到这一点: 首先,确保你已经安装了BeautifulSo…...
网络协议之系列
网络协议之基础介绍 。 网络协议之清空购物车时都发生了啥? 。...
LLaMA Factory微调后的大模型在vLLM框架中对齐对话模版
LLaMA Factory微调后的大模型Chat对话效果,与该模型使用vLLM推理架构中的对话效果,可能会出现不一致的情况。 下图是LLaMA Factory中的Chat的对话 下图是vLLM中的对话效果。 模型回答不稳定:有一半是对的,有一半是无关的。 1、未…...
群体智能优化算法-鹈鹕优化算法(Pelican Optimization Algorithm, POA,含Matlab源代码)
摘要 鹈鹕优化算法(Pelican Optimization Algorithm, POA)是一种灵感来自自然界鹈鹕觅食行为的元启发式优化算法。POA 模拟鹈鹕捕食的两个主要阶段:探索阶段和开发阶段。通过模拟鹈鹕追捕猎物的动态行为,该算法在全局探索和局部开…...
代理模式-spring关键设计模式,bean的增强,AOP的实现
以下是一个结合代理模式解决实际问题的Java实现案例,涵盖远程调用、缓存优化、访问控制等场景,包含逐行中文注释: 场景描述 开发一个跨网络的文件查看器,需实现: 远程文件访问:通过代理访问网络文件 缓存…...
前端实现单点登录(SSO)的方案
概念:单点登录(Single Sign-On, SSO)主要是在多个系统、多个浏览器或多个标签页之间共享登录状态,保证用户只需登录一次,就能访问多个关联应用,而不需要重复登录。 💡 方案分类 1. 前端级别 SS…...
