JavaScript系列(62)--实时通信系统实现详解
JavaScript实时通信系统实现详解 🔄
今天,让我们深入探讨JavaScript的实时通信系统实现。实时通信是现代Web应用中不可或缺的一部分,它能够提供即时的数据交互和更好的用户体验。
WebSocket通信基础 🌟
💡 小知识:WebSocket是HTML5开始提供的一种在单个TCP连接上进行全双工通讯的协议。相比HTTP,它能够提供持久连接和双向通信能力。
// 1. WebSocket连接管理器
class WebSocketManager {constructor(url, options = {}) {this.url = url;this.options = {reconnectAttempts: 5,reconnectDelay: 1000,heartbeatInterval: 30000,...options};this.connection = null;this.reconnectCount = 0;this.listeners = new Map();this.heartbeatTimer = null;}// 建立连接connect() {try {this.connection = new WebSocket(this.url);this.setupEventListeners();this.startHeartbeat();} catch (error) {this.handleConnectionError(error);}}// 设置事件监听器setupEventListeners() {this.connection.onopen = () => {this.reconnectCount = 0;this.emit('connected');};this.connection.onclose = () => {this.handleDisconnect();};this.connection.onerror = (error) => {this.handleConnectionError(error);};this.connection.onmessage = (event) => {this.handleMessage(event.data);};}// 启动心跳检测startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.connection.readyState === WebSocket.OPEN) {this.send('heartbeat', { timestamp: Date.now() });}}, this.options.heartbeatInterval);}// 处理断开连接handleDisconnect() {this.stopHeartbeat();this.emit('disconnected');if (this.reconnectCount < this.options.reconnectAttempts) {setTimeout(() => {this.reconnectCount++;this.connect();}, this.options.reconnectDelay * this.reconnectCount);} else {this.emit('maxReconnectAttemptsReached');}}// 发送消息send(type, data) {if (this.connection.readyState !== WebSocket.OPEN) {throw new Error('Connection is not open');}const message = JSON.stringify({ type, data });this.connection.send(message);}// 停止心跳检测stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);this.heartbeatTimer = null;}}
}// 2. 消息处理器
class MessageHandler {constructor() {this.handlers = new Map();}// 注册消息处理器register(type, handler) {if (!this.handlers.has(type)) {this.handlers.set(type, new Set());}this.handlers.get(type).add(handler);}// 处理消息async handle(message) {const { type, data } = JSON.parse(message);const handlers = this.handlers.get(type);if (handlers) {const promises = Array.from(handlers).map(handler => handler(data));await Promise.all(promises);}}
}// 3. 重连管理器
class ReconnectionManager {constructor(options = {}) {this.options = {maxAttempts: 5,baseDelay: 1000,maxDelay: 30000,...options};this.attempts = 0;this.currentDelay = this.options.baseDelay;}// 计算下一次重连延迟getNextDelay() {const delay = Math.min(this.currentDelay * Math.pow(2, this.attempts),this.options.maxDelay);this.attempts++;return delay;}// 重置重连状态reset() {this.attempts = 0;this.currentDelay = this.options.baseDelay;}// 检查是否可以继续重连canReconnect() {return this.attempts < this.options.maxAttempts;}
}
消息队列系统 📨
// 1. 消息队列
class MessageQueue {constructor() {this.queue = [];this.processing = false;this.maxRetries = 3;}// 添加消息enqueue(message) {this.queue.push({message,retries: 0,timestamp: Date.now()});this.processQueue();}// 处理队列async processQueue() {if (this.processing || this.queue.length === 0) return;this.processing = true;while (this.queue.length > 0) {const item = this.queue[0];try {await this.processMessage(item.message);this.queue.shift();} catch (error) {if (item.retries < this.maxRetries) {item.retries++;// 移到队列末尾this.queue.push(this.queue.shift());} else {// 放入死信队列this.moveToDeadLetter(item);this.queue.shift();}}}this.processing = false;}// 移动到死信队列moveToDeadLetter(item) {// 实现死信队列逻辑}
}// 2. 优先级队列
class PriorityMessageQueue {constructor() {this.queues = new Map();this.priorities = ['high', 'medium', 'low'];}// 添加消息enqueue(message, priority = 'medium') {if (!this.queues.has(priority)) {this.queues.set(priority, []);}this.queues.get(priority).push({message,timestamp: Date.now()});}// 获取下一个消息dequeue() {for (const priority of this.priorities) {const queue = this.queues.get(priority);if (queue && queue.length > 0) {return queue.shift();}}return null;}
}// 3. 消息持久化管理器
class MessagePersistenceManager {constructor() {this.storage = new Map();this.initStorage();}// 初始化存储async initStorage() {try {const stored = localStorage.getItem('message_queue');if (stored) {const data = JSON.parse(stored);this.storage = new Map(Object.entries(data));}} catch (error) {console.error('Failed to initialize storage:', error);}}// 保存消息async persistMessage(id, message) {this.storage.set(id, {message,timestamp: Date.now()});await this.saveToStorage();}// 保存到存储async saveToStorage() {try {const data = Object.fromEntries(this.storage);localStorage.setItem('message_queue', JSON.stringify(data));} catch (error) {console.error('Failed to save to storage:', error);}}
}
实时数据同步 🔄
// 1. 实时数据同步器
class RealtimeDataSync {constructor(options = {}) {this.options = {syncInterval: 1000,batchSize: 100,...options};this.changes = new Map();this.syncTimer = null;}// 记录变更recordChange(key, value) {this.changes.set(key, {value,timestamp: Date.now()});this.scheduleSyncIfNeeded();}// 调度同步scheduleSyncIfNeeded() {if (!this.syncTimer && this.changes.size > 0) {this.syncTimer = setTimeout(() => {this.performSync();}, this.options.syncInterval);}}// 执行同步async performSync() {const batch = this.prepareSyncBatch();if (batch.size > 0) {try {await this.sendChanges(batch);this.clearSyncedChanges(batch);} catch (error) {this.handleSyncError(error);}}this.syncTimer = null;this.scheduleSyncIfNeeded();}// 准备同步批次prepareSyncBatch() {const batch = new Map();let count = 0;for (const [key, value] of this.changes) {if (count >= this.options.batchSize) break;batch.set(key, value);count++;}return batch;}
}// 2. 冲突解决器
class ConflictResolver {constructor() {this.strategies = new Map();this.setupDefaultStrategies();}// 设置默认策略setupDefaultStrategies() {this.strategies.set('lastWriteWins', (local, remote) => {return local.timestamp > remote.timestamp ? local : remote;});this.strategies.set('merge', (local, remote) => {return {...local,...remote,timestamp: Math.max(local.timestamp, remote.timestamp)};});}// 解决冲突resolve(local, remote, strategy = 'lastWriteWins') {const resolver = this.strategies.get(strategy);if (!resolver) {throw new Error(`Unknown strategy: ${strategy}`);}return resolver(local, remote);}
}// 3. 版本控制管理器
class VersionManager {constructor() {this.versions = new Map();this.history = new Map();}// 更新版本updateVersion(key, value) {const currentVersion = this.versions.get(key) || 0;const newVersion = currentVersion + 1;this.versions.set(key, newVersion);this.recordHistory(key, value, newVersion);return newVersion;}// 记录历史recordHistory(key, value, version) {if (!this.history.has(key)) {this.history.set(key, new Map());}const keyHistory = this.history.get(key);keyHistory.set(version, {value,timestamp: Date.now()});}// 获取特定版本getVersion(key, version) {const keyHistory = this.history.get(key);if (!keyHistory) return null;return keyHistory.get(version);}
}
性能优化策略 ⚡
// 1. 消息压缩器
class MessageCompressor {constructor() {this.compressionThreshold = 1024; // 1KB}// 压缩消息async compress(message) {if (typeof message !== 'string') {message = JSON.stringify(message);}if (message.length < this.compressionThreshold) {return message;}const msgBuffer = new TextEncoder().encode(message);const compressed = await gzip(msgBuffer);return compressed;}// 解压消息async decompress(data) {if (!(data instanceof Uint8Array)) {return data;}const decompressed = await ungzip(data);return new TextDecoder().decode(decompressed);}
}// 2. 批处理优化器
class BatchProcessor {constructor(options = {}) {this.options = {maxBatchSize: 100,maxWaitTime: 1000,...options};this.batch = [];this.timer = null;}// 添加项目到批处理add(item) {this.batch.push(item);if (this.batch.length >= this.options.maxBatchSize) {this.flush();} else if (!this.timer) {this.timer = setTimeout(() => this.flush(), this.options.maxWaitTime);}}// 刷新批处理async flush() {if (this.timer) {clearTimeout(this.timer);this.timer = null;}if (this.batch.length === 0) return;const items = [...this.batch];this.batch = [];await this.processBatch(items);}
}// 3. 连接池管理器
class ConnectionPool {constructor(options = {}) {this.options = {maxConnections: 5,idleTimeout: 30000,...options};this.connections = new Set();this.idle = new Set();}// 获取连接async getConnection() {let connection;if (this.idle.size > 0) {connection = this.idle.values().next().value;this.idle.delete(connection);} else if (this.connections.size < this.options.maxConnections) {connection = await this.createConnection();this.connections.add(connection);} else {throw new Error('Connection pool exhausted');}return connection;}// 释放连接releaseConnection(connection) {if (this.connections.has(connection)) {this.idle.add(connection);setTimeout(() => {if (this.idle.has(connection)) {this.closeConnection(connection);}}, this.options.idleTimeout);}}
}
安全性考虑 🔒
// 1. 消息加密器
class MessageEncryptor {constructor() {this.keyPair = null;this.initializeKeyPair();}// 初始化密钥对async initializeKeyPair() {this.keyPair = await window.crypto.subtle.generateKey({name: 'RSA-OAEP',modulusLength: 2048,publicExponent: new Uint8Array([1, 0, 1]),hash: 'SHA-256'},true,['encrypt', 'decrypt']);}// 加密消息async encrypt(message) {const encoded = new TextEncoder().encode(typeof message === 'string' ? message : JSON.stringify(message));return window.crypto.subtle.encrypt({name: 'RSA-OAEP'},this.keyPair.publicKey,encoded);}// 解密消息async decrypt(encrypted) {const decrypted = await window.crypto.subtle.decrypt({name: 'RSA-OAEP'},this.keyPair.privateKey,encrypted);return new TextDecoder().decode(decrypted);}
}// 2. 认证管理器
class AuthenticationManager {constructor() {this.tokens = new Map();}// 验证令牌async validateToken(token) {if (!token) return false;const tokenInfo = this.tokens.get(token);if (!tokenInfo) return false;if (tokenInfo.expiresAt < Date.now()) {this.tokens.delete(token);return false;}return true;}// 生成新令牌async generateToken(userId) {const token = await this.createSecureToken();this.tokens.set(token, {userId,expiresAt: Date.now() + 24 * 60 * 60 * 1000 // 24小时});return token;}
}// 3. 速率限制器
class RateLimiter {constructor(options = {}) {this.options = {windowMs: 60000, // 1分钟maxRequests: 100,...options};this.requests = new Map();}// 检查请求是否允许async checkLimit(clientId) {this.removeOldRequests(clientId);const requests = this.requests.get(clientId) || [];if (requests.length >= this.options.maxRequests) {return false;}requests.push(Date.now());this.requests.set(clientId, requests);return true;}// 移除过期请求记录removeOldRequests(clientId) {const now = Date.now();const windowStart = now - this.options.windowMs;const requests = this.requests.get(clientId) || [];const validRequests = requests.filter(time => time > windowStart);if (validRequests.length < requests.length) {this.requests.set(clientId, validRequests);}}
}
最佳实践建议 💡
- 连接管理模式
// 1. 连接状态管理器
class ConnectionStateManager {constructor() {this.state = 'disconnected';this.listeners = new Set();}// 更新状态setState(newState) {const oldState = this.state;this.state = newState;this.notifyListeners(oldState, newState);}// 添加状态监听器addListener(listener) {this.listeners.add(listener);}// 通知监听器notifyListeners(oldState, newState) {for (const listener of this.listeners) {listener(oldState, newState);}}
}// 2. 重试策略
class RetryStrategy {constructor(options = {}) {this.options = {initialDelay: 1000,maxDelay: 30000,factor: 2,maxAttempts: 5,...options};}// 计算延迟时间getDelay(attempt) {const delay = this.options.initialDelay * Math.pow(this.options.factor, attempt);return Math.min(delay, this.options.maxDelay);}// 检查是否应该重试shouldRetry(attempt, error) {if (attempt >= this.options.maxAttempts) {return false;}// 根据错误类型决定是否重试return this.isRetryableError(error);}
}// 3. 日志记录器
class CommunicationLogger {constructor() {this.logs = [];this.maxLogs = 1000;}// 记录日志log(type, data) {const logEntry = {type,data,timestamp: Date.now()};this.logs.push(logEntry);if (this.logs.length > this.maxLogs) {this.logs.shift();}this.persistLogs();}// 持久化日志persistLogs() {try {localStorage.setItem('communication_logs', JSON.stringify(this.logs));} catch (error) {console.error('Failed to persist logs:', error);}}
}
结语 📝
实时通信系统是现代Web应用中的重要组成部分。通过本文,我们学习了:
- WebSocket通信的基础实现
- 消息队列系统的设计
- 实时数据同步机制
- 性能优化策略
- 安全性考虑和最佳实践
💡 学习建议:在实现实时通信系统时,要特别注意连接的可靠性和消息的可靠传递。同时,要根据实际需求选择合适的同步策略,平衡实时性和系统负载。
如果你觉得这篇文章有帮助,欢迎点赞收藏,也期待在评论区看到你的想法和建议!👇
终身学习,共同成长。
咱们下一期见
💻
相关文章:
JavaScript系列(62)--实时通信系统实现详解
JavaScript实时通信系统实现详解 🔄 今天,让我们深入探讨JavaScript的实时通信系统实现。实时通信是现代Web应用中不可或缺的一部分,它能够提供即时的数据交互和更好的用户体验。 WebSocket通信基础 🌟 💡 小知识&am…...

【蓝桥杯嵌入式】2_LED
1、电路图 74HC573是八位锁存器,当控制端LE脚为高电平时,芯片“导通”,LE为低电平时芯片“截止”即将输出状态“锁存”,led此时不会改变状态,所以可通过led对应的八个引脚的电平来控制led的状态,原理图分析…...
代码随想录day06
242.有效的字母异位词 刚学哈希表想着使用unordered_set来实现,结果无法通过,原因是对字母异位词理解有问题,字母异位词是通过重新排列不同单词或短语的字母而形成的单词或短语,并使用所有原字母一次。对字母出现的次数有要求&am…...

Spring @PropertySource:让你的应用配置更加模块化和可维护
PropertySource注解在Spring中的作用,就像是给Spring应用配了一个“外部配置箱”。 想象一下,你在开发一个Spring应用时,有很多配置信息需要设置,比如数据库的连接信息、应用的某些功能开关等。如果这些信息都硬编码在代码中&…...
【Java】MyBatis动态SQL
在MyBatis中使用动态SQL语句。 动态SQL是指根据参数数据动态组织SQL的技术。 生活中的案例: 在京东上买东西时,用户搜索商品,可以选择筛选条件,比如品牌,价格,材质等,也可以不使用筛选条件。这时…...
旅行社项目展示微信小程序功能模块和开发流程
旅行社当前旅游线路的程序(微信小程序),旨在帮助旅行社更高效地管理线下活动预订,同时为客户提供便捷的报名和查看功能。适用于短途游、团队建设等活动,支持在线预订、缴费及订单管理,可根据用户需求定制更多个性化服务,为公司提升品牌知名度与客户体验。通过简洁明了的…...
litemall,又一个小商场系统
litemall Spring Boot后端 Vue管理员前端 微信小程序用户前端 Vue用户移动端 代码地址:litemall: 又一个小商城。 litemall Spring Boot后端 Vue管理员前端 微信小程序用户前端 Vue用户移动端...

WGCLOUD监控系统部署教程
官网地址:下载WGCLOUD安装包 - WGCLOUD官网 第一步、环境配置 #安装jdk 1、安装 EPEL 仓库: sudo yum install -y epel-release 2、安装 OpenJDK 11: sudo yum install java-11-openjdk-devel 3、如果成功,你可以通过运行 java …...

Python大数据可视化:基于Python的王者荣耀战队的数据分析系统设计与实现_flask+hadoop+spider
开发语言:Python框架:flaskPython版本:python3.7.7数据库:mysql 5.7数据库工具:Navicat11开发软件:PyCharm 系统展示 管理员登录 管理员功能界面 比赛信息管理 看板展示 系统管理 摘要 本文使用Python与…...

(苍穹外卖)项目结构
苍穹外卖项目结构 后端工程基于 maven 进行项目构建,并且进行分模块开发。 1). 用 IDEA 打开初始工程,了解项目的整体结构: 对工程的每个模块作用说明: 序号名称说明1sky-take-outmaven父工程,统一管理依赖版本&…...
ASP.NET Core筛选器Filter
目录 什么是Filter? Exception Filter 实现 注意 ActionFilter 注意 案例:自动启用事务的筛选器 事务的使用 TransactionScopeFilter的使用 什么是Filter? 切面编程机制,在ASP.NET Core特定的位置执行我们自定义的代码。…...

ChatGPT怎么回事?
纯属发现,调侃一下~ 这段时间deepseek不是特别火吗,尤其是它的推理功能,突发奇想,想用deepseek回答一些问题,回答一个问题之后就回复服务器繁忙(估计还在被攻击吧~_~) 然后就转向了GPT…...
软件工程-可行性研究
包含 技术可行性 使用现有技术能实现这个系统吗? 经济可行性 这个系统的经济效益能超过它的开发成本吗? 操作可行性 系统的操作方式在这个用户组织内行得通吗? 必要时还应该从法律、社会效益等更广泛的方面研究每种解法的可行性 可行性…...

园区网设计与实战
想做一个自己学习的有关的csdn账号,努力奋斗......会更新我计算机网络实验课程的所有内容,还有其他的学习知识^_^,为自己巩固一下所学知识。 我是一个萌新小白,有误地方请大家指正,谢谢^_^ 文章目录 前言 这个实验主…...

spy-debugger + Charles 调试移动端/内嵌小程序H5
简介说明: PC端可以用F12进行console等进行调试,但移动端App中使用webview就无法进行实时调试,针对这种情况 1. 安装 全局安装 spy-debugger sudo npm install spy-debugger -g // window不用加sudo2. spy-debugger 证书 其实spy-debugg…...

4.攻防世界 unseping
进入题目页面如下 直接给出源码,开始代码审计 <?php // 高亮显示当前 PHP 文件的源代码,方便调试和查看代码结构 highlight_file(__FILE__);// 定义一个名为 ease 的类 class ease {// 定义私有属性 $method,用于存储要调用的方法名priv…...

安装和使用 Ollama(实验环境windows)
下载安装 下载 https://ollama.com/download/windows 安装 Windows 安装 如果直接双击 OllamaSetup.exe 安装,默认会安装到 C 盘,如果需要指定安装目录,需要通过命令行指定安装地址,如下: # 切换到安装目录 C:\Use…...

(一)DeepSeek大模型安装部署-Ollama安装
大模型deepseek安装部署 (一)、安装ollama curl -fsSL https://ollama.com/install.sh | sh sudo systemctl start ollama sudo systemctl enable ollama sudo systemctl status ollama(二)、安装ollama遇到网络问题,请手动下载 ollama-linux-amd64.tgz curl -L …...
【大数据技术】搭建完全分布式高可用大数据集群(ZooKeeper)
搭建完全分布式高可用大数据集群(ZooKeeper) apache-zookeeper-3.8.4-bin.tar.gz注:请在阅读本篇文章前,将以上资源下载下来。 写在前面 本文主要介绍搭建完全分布式高可用集群 ZooKeeper 的详细步骤。 注意: 统一约定将软件安装包存放于虚拟机的/software目录下,软件…...
前端学习-tab栏切换改造项目(三十一)
目录 前言 监听代码 思路 代码 事件委托代码 思路 代码 总结 前言 星垂平野阔,月涌大江流 监听代码 思路 等待DOM加载完成 获取所有标签 为每个标签添加鼠标悬停事件监听器 定义showTab函数: 接收一个索引参数index,用于标识当前悬停…...

YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
LeetCode - 199. 二叉树的右视图
题目 199. 二叉树的右视图 - 力扣(LeetCode) 思路 右视图是指从树的右侧看,对于每一层,只能看到该层最右边的节点。实现思路是: 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

C++:多态机制详解
目录 一. 多态的概念 1.静态多态(编译时多态) 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1).协变 2).析构函数的重写 5.override 和 final关键字 1&#…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)
题目 做法 启动靶机,点进去 点进去 查看URL,有 ?fileflag.php说明存在文件包含,原理是php://filter 协议 当它与包含函数结合时,php://filter流会被当作php文件执行。 用php://filter加编码,能让PHP把文件内容…...
PostgreSQL——环境搭建
一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在࿰…...

Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案
在大数据时代,海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构,在处理大规模数据抓取任务时展现出强大的能力。然而,随着业务规模的不断扩大和数据抓取需求的日益复杂,传统…...