JavaScript系列(62)--实时通信系统实现详解
JavaScript实时通信系统实现详解 🔄
今天,让我们深入探讨JavaScript的实时通信系统实现。实时通信是现代Web应用中不可或缺的一部分,它能够提供即时的数据交互和更好的用户体验。
WebSocket通信基础 🌟
💡 小知识:WebSocket是HTML5开始提供的一种在单个TCP连接上进行全双工通讯的协议。相比HTTP,它能够提供持久连接和双向通信能力。
// 1. WebSocket连接管理器
class WebSocketManager {constructor(url, options = {}) {this.url = url;this.options = {reconnectAttempts: 5,reconnectDelay: 1000,heartbeatInterval: 30000,...options};this.connection = null;this.reconnectCount = 0;this.listeners = new Map();this.heartbeatTimer = null;}// 建立连接connect() {try {this.connection = new WebSocket(this.url);this.setupEventListeners();this.startHeartbeat();} catch (error) {this.handleConnectionError(error);}}// 设置事件监听器setupEventListeners() {this.connection.onopen = () => {this.reconnectCount = 0;this.emit('connected');};this.connection.onclose = () => {this.handleDisconnect();};this.connection.onerror = (error) => {this.handleConnectionError(error);};this.connection.onmessage = (event) => {this.handleMessage(event.data);};}// 启动心跳检测startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.connection.readyState === WebSocket.OPEN) {this.send('heartbeat', { timestamp: Date.now() });}}, this.options.heartbeatInterval);}// 处理断开连接handleDisconnect() {this.stopHeartbeat();this.emit('disconnected');if (this.reconnectCount < this.options.reconnectAttempts) {setTimeout(() => {this.reconnectCount++;this.connect();}, this.options.reconnectDelay * this.reconnectCount);} else {this.emit('maxReconnectAttemptsReached');}}// 发送消息send(type, data) {if (this.connection.readyState !== WebSocket.OPEN) {throw new Error('Connection is not open');}const message = JSON.stringify({ type, data });this.connection.send(message);}// 停止心跳检测stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);this.heartbeatTimer = null;}}
}// 2. 消息处理器
class MessageHandler {constructor() {this.handlers = new Map();}// 注册消息处理器register(type, handler) {if (!this.handlers.has(type)) {this.handlers.set(type, new Set());}this.handlers.get(type).add(handler);}// 处理消息async handle(message) {const { type, data } = JSON.parse(message);const handlers = this.handlers.get(type);if (handlers) {const promises = Array.from(handlers).map(handler => handler(data));await Promise.all(promises);}}
}// 3. 重连管理器
class ReconnectionManager {constructor(options = {}) {this.options = {maxAttempts: 5,baseDelay: 1000,maxDelay: 30000,...options};this.attempts = 0;this.currentDelay = this.options.baseDelay;}// 计算下一次重连延迟getNextDelay() {const delay = Math.min(this.currentDelay * Math.pow(2, this.attempts),this.options.maxDelay);this.attempts++;return delay;}// 重置重连状态reset() {this.attempts = 0;this.currentDelay = this.options.baseDelay;}// 检查是否可以继续重连canReconnect() {return this.attempts < this.options.maxAttempts;}
}
消息队列系统 📨
// 1. 消息队列
class MessageQueue {constructor() {this.queue = [];this.processing = false;this.maxRetries = 3;}// 添加消息enqueue(message) {this.queue.push({message,retries: 0,timestamp: Date.now()});this.processQueue();}// 处理队列async processQueue() {if (this.processing || this.queue.length === 0) return;this.processing = true;while (this.queue.length > 0) {const item = this.queue[0];try {await this.processMessage(item.message);this.queue.shift();} catch (error) {if (item.retries < this.maxRetries) {item.retries++;// 移到队列末尾this.queue.push(this.queue.shift());} else {// 放入死信队列this.moveToDeadLetter(item);this.queue.shift();}}}this.processing = false;}// 移动到死信队列moveToDeadLetter(item) {// 实现死信队列逻辑}
}// 2. 优先级队列
class PriorityMessageQueue {constructor() {this.queues = new Map();this.priorities = ['high', 'medium', 'low'];}// 添加消息enqueue(message, priority = 'medium') {if (!this.queues.has(priority)) {this.queues.set(priority, []);}this.queues.get(priority).push({message,timestamp: Date.now()});}// 获取下一个消息dequeue() {for (const priority of this.priorities) {const queue = this.queues.get(priority);if (queue && queue.length > 0) {return queue.shift();}}return null;}
}// 3. 消息持久化管理器
class MessagePersistenceManager {constructor() {this.storage = new Map();this.initStorage();}// 初始化存储async initStorage() {try {const stored = localStorage.getItem('message_queue');if (stored) {const data = JSON.parse(stored);this.storage = new Map(Object.entries(data));}} catch (error) {console.error('Failed to initialize storage:', error);}}// 保存消息async persistMessage(id, message) {this.storage.set(id, {message,timestamp: Date.now()});await this.saveToStorage();}// 保存到存储async saveToStorage() {try {const data = Object.fromEntries(this.storage);localStorage.setItem('message_queue', JSON.stringify(data));} catch (error) {console.error('Failed to save to storage:', error);}}
}
实时数据同步 🔄
// 1. 实时数据同步器
class RealtimeDataSync {constructor(options = {}) {this.options = {syncInterval: 1000,batchSize: 100,...options};this.changes = new Map();this.syncTimer = null;}// 记录变更recordChange(key, value) {this.changes.set(key, {value,timestamp: Date.now()});this.scheduleSyncIfNeeded();}// 调度同步scheduleSyncIfNeeded() {if (!this.syncTimer && this.changes.size > 0) {this.syncTimer = setTimeout(() => {this.performSync();}, this.options.syncInterval);}}// 执行同步async performSync() {const batch = this.prepareSyncBatch();if (batch.size > 0) {try {await this.sendChanges(batch);this.clearSyncedChanges(batch);} catch (error) {this.handleSyncError(error);}}this.syncTimer = null;this.scheduleSyncIfNeeded();}// 准备同步批次prepareSyncBatch() {const batch = new Map();let count = 0;for (const [key, value] of this.changes) {if (count >= this.options.batchSize) break;batch.set(key, value);count++;}return batch;}
}// 2. 冲突解决器
class ConflictResolver {constructor() {this.strategies = new Map();this.setupDefaultStrategies();}// 设置默认策略setupDefaultStrategies() {this.strategies.set('lastWriteWins', (local, remote) => {return local.timestamp > remote.timestamp ? local : remote;});this.strategies.set('merge', (local, remote) => {return {...local,...remote,timestamp: Math.max(local.timestamp, remote.timestamp)};});}// 解决冲突resolve(local, remote, strategy = 'lastWriteWins') {const resolver = this.strategies.get(strategy);if (!resolver) {throw new Error(`Unknown strategy: ${strategy}`);}return resolver(local, remote);}
}// 3. 版本控制管理器
class VersionManager {constructor() {this.versions = new Map();this.history = new Map();}// 更新版本updateVersion(key, value) {const currentVersion = this.versions.get(key) || 0;const newVersion = currentVersion + 1;this.versions.set(key, newVersion);this.recordHistory(key, value, newVersion);return newVersion;}// 记录历史recordHistory(key, value, version) {if (!this.history.has(key)) {this.history.set(key, new Map());}const keyHistory = this.history.get(key);keyHistory.set(version, {value,timestamp: Date.now()});}// 获取特定版本getVersion(key, version) {const keyHistory = this.history.get(key);if (!keyHistory) return null;return keyHistory.get(version);}
}
性能优化策略 ⚡
// 1. 消息压缩器
class MessageCompressor {constructor() {this.compressionThreshold = 1024; // 1KB}// 压缩消息async compress(message) {if (typeof message !== 'string') {message = JSON.stringify(message);}if (message.length < this.compressionThreshold) {return message;}const msgBuffer = new TextEncoder().encode(message);const compressed = await gzip(msgBuffer);return compressed;}// 解压消息async decompress(data) {if (!(data instanceof Uint8Array)) {return data;}const decompressed = await ungzip(data);return new TextDecoder().decode(decompressed);}
}// 2. 批处理优化器
class BatchProcessor {constructor(options = {}) {this.options = {maxBatchSize: 100,maxWaitTime: 1000,...options};this.batch = [];this.timer = null;}// 添加项目到批处理add(item) {this.batch.push(item);if (this.batch.length >= this.options.maxBatchSize) {this.flush();} else if (!this.timer) {this.timer = setTimeout(() => this.flush(), this.options.maxWaitTime);}}// 刷新批处理async flush() {if (this.timer) {clearTimeout(this.timer);this.timer = null;}if (this.batch.length === 0) return;const items = [...this.batch];this.batch = [];await this.processBatch(items);}
}// 3. 连接池管理器
class ConnectionPool {constructor(options = {}) {this.options = {maxConnections: 5,idleTimeout: 30000,...options};this.connections = new Set();this.idle = new Set();}// 获取连接async getConnection() {let connection;if (this.idle.size > 0) {connection = this.idle.values().next().value;this.idle.delete(connection);} else if (this.connections.size < this.options.maxConnections) {connection = await this.createConnection();this.connections.add(connection);} else {throw new Error('Connection pool exhausted');}return connection;}// 释放连接releaseConnection(connection) {if (this.connections.has(connection)) {this.idle.add(connection);setTimeout(() => {if (this.idle.has(connection)) {this.closeConnection(connection);}}, this.options.idleTimeout);}}
}
安全性考虑 🔒
// 1. 消息加密器
class MessageEncryptor {constructor() {this.keyPair = null;this.initializeKeyPair();}// 初始化密钥对async initializeKeyPair() {this.keyPair = await window.crypto.subtle.generateKey({name: 'RSA-OAEP',modulusLength: 2048,publicExponent: new Uint8Array([1, 0, 1]),hash: 'SHA-256'},true,['encrypt', 'decrypt']);}// 加密消息async encrypt(message) {const encoded = new TextEncoder().encode(typeof message === 'string' ? message : JSON.stringify(message));return window.crypto.subtle.encrypt({name: 'RSA-OAEP'},this.keyPair.publicKey,encoded);}// 解密消息async decrypt(encrypted) {const decrypted = await window.crypto.subtle.decrypt({name: 'RSA-OAEP'},this.keyPair.privateKey,encrypted);return new TextDecoder().decode(decrypted);}
}// 2. 认证管理器
class AuthenticationManager {constructor() {this.tokens = new Map();}// 验证令牌async validateToken(token) {if (!token) return false;const tokenInfo = this.tokens.get(token);if (!tokenInfo) return false;if (tokenInfo.expiresAt < Date.now()) {this.tokens.delete(token);return false;}return true;}// 生成新令牌async generateToken(userId) {const token = await this.createSecureToken();this.tokens.set(token, {userId,expiresAt: Date.now() + 24 * 60 * 60 * 1000 // 24小时});return token;}
}// 3. 速率限制器
class RateLimiter {constructor(options = {}) {this.options = {windowMs: 60000, // 1分钟maxRequests: 100,...options};this.requests = new Map();}// 检查请求是否允许async checkLimit(clientId) {this.removeOldRequests(clientId);const requests = this.requests.get(clientId) || [];if (requests.length >= this.options.maxRequests) {return false;}requests.push(Date.now());this.requests.set(clientId, requests);return true;}// 移除过期请求记录removeOldRequests(clientId) {const now = Date.now();const windowStart = now - this.options.windowMs;const requests = this.requests.get(clientId) || [];const validRequests = requests.filter(time => time > windowStart);if (validRequests.length < requests.length) {this.requests.set(clientId, validRequests);}}
}
最佳实践建议 💡
- 连接管理模式
// 1. 连接状态管理器
class ConnectionStateManager {constructor() {this.state = 'disconnected';this.listeners = new Set();}// 更新状态setState(newState) {const oldState = this.state;this.state = newState;this.notifyListeners(oldState, newState);}// 添加状态监听器addListener(listener) {this.listeners.add(listener);}// 通知监听器notifyListeners(oldState, newState) {for (const listener of this.listeners) {listener(oldState, newState);}}
}// 2. 重试策略
class RetryStrategy {constructor(options = {}) {this.options = {initialDelay: 1000,maxDelay: 30000,factor: 2,maxAttempts: 5,...options};}// 计算延迟时间getDelay(attempt) {const delay = this.options.initialDelay * Math.pow(this.options.factor, attempt);return Math.min(delay, this.options.maxDelay);}// 检查是否应该重试shouldRetry(attempt, error) {if (attempt >= this.options.maxAttempts) {return false;}// 根据错误类型决定是否重试return this.isRetryableError(error);}
}// 3. 日志记录器
class CommunicationLogger {constructor() {this.logs = [];this.maxLogs = 1000;}// 记录日志log(type, data) {const logEntry = {type,data,timestamp: Date.now()};this.logs.push(logEntry);if (this.logs.length > this.maxLogs) {this.logs.shift();}this.persistLogs();}// 持久化日志persistLogs() {try {localStorage.setItem('communication_logs', JSON.stringify(this.logs));} catch (error) {console.error('Failed to persist logs:', error);}}
}
结语 📝
实时通信系统是现代Web应用中的重要组成部分。通过本文,我们学习了:
- WebSocket通信的基础实现
- 消息队列系统的设计
- 实时数据同步机制
- 性能优化策略
- 安全性考虑和最佳实践
💡 学习建议:在实现实时通信系统时,要特别注意连接的可靠性和消息的可靠传递。同时,要根据实际需求选择合适的同步策略,平衡实时性和系统负载。
如果你觉得这篇文章有帮助,欢迎点赞收藏,也期待在评论区看到你的想法和建议!👇
终身学习,共同成长。
咱们下一期见
💻
相关文章:
JavaScript系列(62)--实时通信系统实现详解
JavaScript实时通信系统实现详解 🔄 今天,让我们深入探讨JavaScript的实时通信系统实现。实时通信是现代Web应用中不可或缺的一部分,它能够提供即时的数据交互和更好的用户体验。 WebSocket通信基础 🌟 💡 小知识&am…...

【蓝桥杯嵌入式】2_LED
1、电路图 74HC573是八位锁存器,当控制端LE脚为高电平时,芯片“导通”,LE为低电平时芯片“截止”即将输出状态“锁存”,led此时不会改变状态,所以可通过led对应的八个引脚的电平来控制led的状态,原理图分析…...
代码随想录day06
242.有效的字母异位词 刚学哈希表想着使用unordered_set来实现,结果无法通过,原因是对字母异位词理解有问题,字母异位词是通过重新排列不同单词或短语的字母而形成的单词或短语,并使用所有原字母一次。对字母出现的次数有要求&am…...

Spring @PropertySource:让你的应用配置更加模块化和可维护
PropertySource注解在Spring中的作用,就像是给Spring应用配了一个“外部配置箱”。 想象一下,你在开发一个Spring应用时,有很多配置信息需要设置,比如数据库的连接信息、应用的某些功能开关等。如果这些信息都硬编码在代码中&…...
【Java】MyBatis动态SQL
在MyBatis中使用动态SQL语句。 动态SQL是指根据参数数据动态组织SQL的技术。 生活中的案例: 在京东上买东西时,用户搜索商品,可以选择筛选条件,比如品牌,价格,材质等,也可以不使用筛选条件。这时…...
旅行社项目展示微信小程序功能模块和开发流程
旅行社当前旅游线路的程序(微信小程序),旨在帮助旅行社更高效地管理线下活动预订,同时为客户提供便捷的报名和查看功能。适用于短途游、团队建设等活动,支持在线预订、缴费及订单管理,可根据用户需求定制更多个性化服务,为公司提升品牌知名度与客户体验。通过简洁明了的…...
litemall,又一个小商场系统
litemall Spring Boot后端 Vue管理员前端 微信小程序用户前端 Vue用户移动端 代码地址:litemall: 又一个小商城。 litemall Spring Boot后端 Vue管理员前端 微信小程序用户前端 Vue用户移动端...

WGCLOUD监控系统部署教程
官网地址:下载WGCLOUD安装包 - WGCLOUD官网 第一步、环境配置 #安装jdk 1、安装 EPEL 仓库: sudo yum install -y epel-release 2、安装 OpenJDK 11: sudo yum install java-11-openjdk-devel 3、如果成功,你可以通过运行 java …...

Python大数据可视化:基于Python的王者荣耀战队的数据分析系统设计与实现_flask+hadoop+spider
开发语言:Python框架:flaskPython版本:python3.7.7数据库:mysql 5.7数据库工具:Navicat11开发软件:PyCharm 系统展示 管理员登录 管理员功能界面 比赛信息管理 看板展示 系统管理 摘要 本文使用Python与…...

(苍穹外卖)项目结构
苍穹外卖项目结构 后端工程基于 maven 进行项目构建,并且进行分模块开发。 1). 用 IDEA 打开初始工程,了解项目的整体结构: 对工程的每个模块作用说明: 序号名称说明1sky-take-outmaven父工程,统一管理依赖版本&…...
ASP.NET Core筛选器Filter
目录 什么是Filter? Exception Filter 实现 注意 ActionFilter 注意 案例:自动启用事务的筛选器 事务的使用 TransactionScopeFilter的使用 什么是Filter? 切面编程机制,在ASP.NET Core特定的位置执行我们自定义的代码。…...

ChatGPT怎么回事?
纯属发现,调侃一下~ 这段时间deepseek不是特别火吗,尤其是它的推理功能,突发奇想,想用deepseek回答一些问题,回答一个问题之后就回复服务器繁忙(估计还在被攻击吧~_~) 然后就转向了GPT…...
软件工程-可行性研究
包含 技术可行性 使用现有技术能实现这个系统吗? 经济可行性 这个系统的经济效益能超过它的开发成本吗? 操作可行性 系统的操作方式在这个用户组织内行得通吗? 必要时还应该从法律、社会效益等更广泛的方面研究每种解法的可行性 可行性…...

园区网设计与实战
想做一个自己学习的有关的csdn账号,努力奋斗......会更新我计算机网络实验课程的所有内容,还有其他的学习知识^_^,为自己巩固一下所学知识。 我是一个萌新小白,有误地方请大家指正,谢谢^_^ 文章目录 前言 这个实验主…...

spy-debugger + Charles 调试移动端/内嵌小程序H5
简介说明: PC端可以用F12进行console等进行调试,但移动端App中使用webview就无法进行实时调试,针对这种情况 1. 安装 全局安装 spy-debugger sudo npm install spy-debugger -g // window不用加sudo2. spy-debugger 证书 其实spy-debugg…...

4.攻防世界 unseping
进入题目页面如下 直接给出源码,开始代码审计 <?php // 高亮显示当前 PHP 文件的源代码,方便调试和查看代码结构 highlight_file(__FILE__);// 定义一个名为 ease 的类 class ease {// 定义私有属性 $method,用于存储要调用的方法名priv…...

安装和使用 Ollama(实验环境windows)
下载安装 下载 https://ollama.com/download/windows 安装 Windows 安装 如果直接双击 OllamaSetup.exe 安装,默认会安装到 C 盘,如果需要指定安装目录,需要通过命令行指定安装地址,如下: # 切换到安装目录 C:\Use…...

(一)DeepSeek大模型安装部署-Ollama安装
大模型deepseek安装部署 (一)、安装ollama curl -fsSL https://ollama.com/install.sh | sh sudo systemctl start ollama sudo systemctl enable ollama sudo systemctl status ollama(二)、安装ollama遇到网络问题,请手动下载 ollama-linux-amd64.tgz curl -L …...
【大数据技术】搭建完全分布式高可用大数据集群(ZooKeeper)
搭建完全分布式高可用大数据集群(ZooKeeper) apache-zookeeper-3.8.4-bin.tar.gz注:请在阅读本篇文章前,将以上资源下载下来。 写在前面 本文主要介绍搭建完全分布式高可用集群 ZooKeeper 的详细步骤。 注意: 统一约定将软件安装包存放于虚拟机的/software目录下,软件…...
前端学习-tab栏切换改造项目(三十一)
目录 前言 监听代码 思路 代码 事件委托代码 思路 代码 总结 前言 星垂平野阔,月涌大江流 监听代码 思路 等待DOM加载完成 获取所有标签 为每个标签添加鼠标悬停事件监听器 定义showTab函数: 接收一个索引参数index,用于标识当前悬停…...

免费批量去水印工具 - 针对文心一言生成图片
免费批量去水印工具 - 针对文心一言生成图片 工具介绍 这是一款免费的批量去水印工具,专门针对文心一言生成的图片进行处理。通过简单的操作,您可以快速去除图片中的水印。 下载链接 您可以通过以下网盘链接下载工具: 链接: https://pa…...

Python训练打卡Day43
复习日 1.卷积神经网络的基本概念 2.kaggle找到一个图像数据集,用cnn网络进行训练并且用grad-cam做可视化 进阶:并拆分成多个文件 tips:注册kaggle的注意事项 安装插件:Header Editor 然后打开扩展选项: 输入网址:ht…...

LabVIEW音频测试分析
LabVIEW通过读取指定WAV 文件,实现对音频信号的播放、多维度测量分析功能,为音频设备研发、声学研究及质量检测提供专业工具支持。 主要功能 文件读取与播放:支持持续读取示例数据文件夹内的 WAV 文件,可实时播放音频以监听被测信…...

Python Day46
Task: 1.不同CNN层的特征图:不同通道的特征图 2.什么是注意力:注意力家族,类似于动物园,都是不同的模块,好不好试了才知道。 3.通道注意力:模型的定义和插入的位置 4.通道注意力后的特征图和热力…...
Wireshark使用教程(含安装包和安装教程)
Wireshark使用入门教程 0.资源下载以及软件安装1.Wireshark中无法显示网卡列表2.Wireshark抓取H264过程 0.资源下载以及软件安装 参考blog: 抓包神器wireshark安装保姆级教程 压缩包下载:Wireshark安装包 1.Wireshark中无法显示网卡列表 Wireshark中无法显示网…...

前端文件下载常用方式详解
在前端开发中,实现文件下载是常见的需求。根据不同的场景,我们可以选择不同的方法来实现文件流的下载。本文介绍三种常用的文件下载方式: 使用 axios 发送 JSON 请求下载文件流使用 axios 发送 FormData 请求下载文件流使用原生 form 表单提…...

oracle数据恢复—oracle数据库执行truncate命令后的怎么恢复数据?
oracle数据库误执行truncate命令导致数据丢失是一种常见情况。通常情况下,oracle数据库误操作删除数据只需要通过备份恢复数据即可。也会碰到一些特殊情况,例如数据库备份无法使用或者还原报错等。下面和大家分享一例oracle数据库误执行truncate命令导致…...
java面试场景题:QPS 短链系统怎么设计
以下是对文章的润色版本: 这道场景设计题,初看似乎业务简单,实则覆盖的知识点极为丰富: 高并发与高性能分布式 ID 生成机制;Redis Bloom Filter——高并发、低内存损耗的过滤组件知识;分库、分表海量数据存…...
Hadolint:Dockerfile 语法检查与最佳实践验证的终极工具
在容器化应用开发的浪潮中,Dockerfile 作为构建 Docker 镜像的核心配置文件,其质量直接影响着应用的安全性、稳定性和可维护性。然而,随着项目复杂度的增加,手动检查 Dockerfile 不仅耗时,还容易遗漏潜在问题。今天,我要向大家介绍一款强大的工具——Hadolint,它将彻底改…...

ES 学习总结一 基础内容
ElasticSearch学习 一、 初识ES1、 认识与安装2、 倒排索引2.1 正向索引2.2 倒排索引 3、 基本概念3.1 文档和字段3.2 索引和倒排 4 、 IK分词器 二、 操作1、 mapping 映射属性2、 索引库增删改查3、 文档的增删改查3.1 新增文档3.2 查询文档3.3 删除文档3.4 修改文档3.5 批处…...