当前位置: 首页 > article >正文

基于Go语言实现一个网络聊天室(连接Redis版)

基于Go语言和Redis的实时聊天室项目详解

项目概述

在这个项目中,我们实现了一个基于Go语言和Redis的实时聊天室系统。该系统允许用户通过客户端连接到服务器,进行实时聊天,并且支持以下功能:

  • 用户网名注册和验证
  • 消息广播和接收
  • 心跳检测和自动重连
  • 用户活跃度统计和排名
  • 消息存储到Redis

技术栈

  • Go语言:用于实现客户端和服务器逻辑
  • Redis:用于存储用户活跃度和聊天记录
  • TCP协议:用于客户端和服务器之间的通信

项目结构

项目由三个主要文件组成:

  1. client.go:客户端逻辑
  2. server.go:服务器逻辑
  3. utils.go:工具函数(消息发送和读取)

详细实现

客户端实现(client.go)

1. 连接到服务器

客户端通过TCP协议连接到服务器:

conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {log.Fatal("连接服务器时出错:", err)
}
defer conn.Close()
2. 输入网名

用户输入网名并发送到服务器:

fmt.Print("请输入你的网名: ")
nameInput, err := reader.ReadString('\n')
name = strings.TrimSpace(nameInput)
err = utils.SendMessage(conn, []byte(name))
3. 消息发送和接收

用户输入消息后,通过utils.SendMessage发送到服务器:

message, err := reader.ReadString('\n')
err = utils.SendMessage(conn, []byte(message))

服务器发送的消息通过handleServerMessages函数接收并打印:

func handleServerMessages(conn *net.Conn) {reader := bufio.NewReader(*conn)for {message, err := utils.ReadMessage(reader)if err != nil {// 处理错误和重连逻辑}fmt.Println(string(message))}
}
4. 心跳检测和自动重连

客户端会接收服务器的心跳检测消息(PING),并发送PONG响应:

if string(message) == "PING" {log.Printf("【心跳检测】接收到服务端 PING,发送 PONG 响应")utils.SendMessage(*conn, []byte("PONG"))
}

如果连接断开,客户端会尝试重新连接:

func reconnect(oldConn net.Conn) (net.Conn, bool) {for i := 0; i < 3; i++ {newConn, err := net.Dial("tcp", "localhost:8080")if err == nil {oldConn.Close()return newConn, true}time.Sleep(time.Duration(2<<uint(i)) * time.Second)}return nil, false
}

服务器实现(server.go)

1. 初始化Redis

服务器使用Redis存储用户活跃度和聊天记录:

redisClient := redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "",DB:       0,
})
2. 客户端管理

服务器维护一个ChatRoom结构体,管理所有在线客户端:

type ChatRoom struct {Clients map[*Client]boolJoin    chan *ClientLeave   chan *ClientMessage chan []byteRedis   *redis.Clientmu      sync.RWMutex
}
3. 消息广播

服务器接收客户端的消息,并广播到所有在线客户端:

func (cr *ChatRoom) Run() {for {select {case message := <-cr.Message:cr.mu.RLock()for client := range cr.Clients {client.Send <- message}cr.mu.RUnlock()batchStoreToRedis(ctx, cr.Redis, message)}}
}
4. 心跳检测

服务器定期向客户端发送PING消息,并等待PONG响应:

ticker := time.NewTicker(10 * time.Second)
heartbeatTimeout := time.NewTimer(15 * time.Second)
for {select {case <-ticker.C:utils.SendMessage(client.Conn, []byte("PING"))heartbeatTimeout.Reset(15 * time.Second)case <-heartbeatTimeout.C:cr.Leave <- client}
}
5. Redis存储

服务器将消息批量存储到Redis,并更新用户活跃度:

func batchStoreToRedis(ctx context.Context, redisClient *redis.Client, message []byte) {queue := redisClient.LRange(ctx, "chat_messages_queue", 0, -1).Val()queue = append(queue, string(message))if len(queue) >= 10 {pipe := redisClient.Pipeline()for _, msg := range queue {pipe.RPush(ctx, "chat_messages", msg)pipe.ZIncrBy(ctx, "user_activity", 1, extractUsername(msg))}pipe.Exec(ctx)redisClient.Del(ctx, "chat_messages_queue")} else {redisClient.RPush(ctx, "chat_messages_queue", string(message))}
}

工具函数(utils.go)

工具函数提供了消息的发送和读取功能:

func SendMessage(conn net.Conn, message []byte) error {length := uint32(len(message))err := binary.Write(conn, binary.BigEndian, length)if err != nil {return err}_, err = conn.Write(message)return err
}func ReadMessage(reader *bufio.Reader) ([]byte, error) {var length uint32err := binary.Read(reader, binary.BigEndian, &length)if err != nil {return nil, err}message := make([]byte, length)_, err = io.ReadFull(reader, message)return message, err
}

重点问题分析

问题 1:心跳检测机制与消息处理分支的逻辑冲突导致服务端无法接收客户端消息

1.1 原始代码结构的问题

在未修复的代码中,服务端的 HandleClient 函数使用了如下逻辑:

for {select {case <-ticker.C:  // 心跳检测分支(每10秒触发一次)sendPing()default:          // 默认分支(非阻塞)message := readMessage()processMessage(message)}
}
1.2 关键问题分析
  • default 分支的局限性
    default 分支仅在 所有其他 case 未就绪时触发
    ticker.C 每隔 10 秒触发一次心跳检测时,select 会优先执行 case <-ticker.C,而 default 分支仅在心跳未触发时才会执行。
    这会导致:
    1. 消息读取延迟:客户端发送的消息可能堆积在 bufio.Reader 的缓冲区中,但服务端因 default 分支未及时执行而无法读取。
    2. 竞争条件:如果客户端在心跳触发时发送消息,服务端会优先处理心跳,消息可能被跳过。
1.3 具体场景模拟

假设客户端连续发送两条消息 消息A消息B,时间线如下:

时间点 0ms: 服务端开始循环
时间点 5ms: 客户端发送消息A
时间点 10ms: ticker.C 触发心跳检测(发送PING)
时间点 15ms: 客户端发送消息B
  • 服务端行为
    1. 10ms 时,case <-ticker.C 触发,发送 PING。
    2. default 分支在 10ms 后才有机会执行,但此时 bufio.Reader 中可能已有 消息A消息B
    3. 由于 default 分支是非阻塞的,服务端可能只读取到部分消息,甚至因心跳频繁触发而完全跳过消息处理。
1.4 Go 的 select 调度机制
  • 随机选择原则
    当多个 case 同时就绪时,select 会随机选择一个执行。
    但若某个 case(如 ticker.C)周期性触发,它会频繁占用执行机会,导致其他分支(如消息读取)被“饿死”。

  • default 分支的陷阱
    default 分支的设计初衷是避免阻塞,但它不适合需要持续监听的操作。
    在您的场景中,消息读取需要主动检查缓冲区,而 default 分支无法保证这一点。

1.5 修复方案的核心逻辑

修改后的代码通过以下方式解决问题:

for {select {case <-ticker.C:           // 心跳检测sendPing()case <-msgTicker.C:        // 专用消息读取分支(每50ms触发一次)message := readMessage()processMessage(message)}
}
1.6 为何有效?
  1. 独立的定时器 (msgTicker)
    添加了一个专用的 msgTicker,每隔 50ms 触发一次消息读取。

    • 即使心跳检测占用 select 的执行机会,消息读取仍有独立的触发窗口。
    • 避免了心跳和消息处理的竞争。
  2. 消除 default 分支的不可靠性
    用显式的 case <-msgTicker.C 替代 default,确保消息读取按固定频率执行。

1.7 总结:冲突的本质
  • 心跳机制干扰:周期性的心跳检测(ticker.C)占用了 select 的执行机会,导致 default 分支无法及时处理消息。
  • 修复思路:为消息读取分配独立的触发通道(msgTicker),与心跳检测解耦,确保两者互不阻塞。
1.8 类比解释

想象一个餐厅服务员(服务端)需要同时做两件事:

  1. 定时检查厨房温度(心跳检测):每10分钟一次。
  2. 接待顾客点餐(消息处理):需要随时响应。
  • 原始方案:服务员大部分时间站在厨房门口检查温度,只有偶尔看一眼大堂(default 分支),导致顾客长时间无人接待。
  • 修复方案:服务员每隔5分钟主动巡视大堂一次(msgTicker),同时定期检查厨房,两者互不干扰。

问题 2:网名重复导致身份冲突

2.1 前因后果
  • 问题本质:多个用户使用相同网名加入聊天室,导致消息归属混乱、活跃度统计错误。
  • 根本原因:服务端未验证网名唯一性,直接接受客户端提交的名称,未检查是否已被占用。
  • 具体表现
    • 用户A和用户B使用相同网名“小明”加入后,服务端无法区分两者消息。
    • Redis中user_activity的活跃度分数会被错误累加到同一用户名下。
2.2 具体场景模拟

假设用户A和用户B同时尝试使用网名“小明”连接:

时间点 0ms: 用户A发送网名“小明” → 服务端接受并加入。
时间点 50ms: 用户B发送网名“小明” → 服务端未检查唯一性,直接加入。
时间点 100ms: 用户A发送消息“你好”,用户B发送消息“大家好”。
  • 服务端行为
    1. 用户A和用户B的消息均被标记为“小明: 消息内容”。
    2. Redis中用户“小明”的活跃度分数错误累加为2(实际应为两个独立用户)。
2.3 Go 语言机制分析
  • 并发写入冲突:多个协程同时操作Redis集合时,若未加锁可能导致数据竞争。
  • 集合操作的原子性:Redis的SADDSISMEMBER命令是原子操作,但Go代码中需确保逻辑顺序正确。
2.4 解决方案
  1. Redis 集合管理在线用户
    • 用户加入时,检查网名是否存在于集合online_users
      exists, err := cr.Redis.SIsMember(ctx, "online_users", clientName).Result()
      if exists {utils.SendMessage(conn, []byte("ERROR: 网名已被占用"))return
      }
      
    • 网名合法后添加到集合:
      cr.Redis.SAdd(ctx, "online_users", clientName)
      
  2. 退出时清理网名
    • 使用defer确保客户端断开时从集合中移除:
      defer cr.Redis.SRem(ctx, "online_users", clientName)
      
2.5 总结类比
  • 问题类比:多人共用同一身份证号登记入住酒店,前台无法区分客人。
  • 修复思路:前台要求每位客人提供唯一身份证号,登记前查重,退房时注销。

问题 3:空消息和空网名导致无效数据

3.1 前因后果
  • 问题本质:用户输入空白内容作为网名或消息,导致系统处理无效数据。
  • 根本原因:客户端和服务端未对输入内容做非空验证。
  • 具体表现
    • 空网名:用户直接回车加入,服务端记录空名称,广播消息时无法标识来源。
    • 空消息:用户发送空格或回车,占用网络带宽和存储资源。
3.2 具体场景模拟

假设用户执行以下操作:

时间点 0ms: 用户输入空网名(直接回车) → 服务端允许加入。
时间点 100ms: 用户发送空消息(空格 + 回车) → 服务端广播“:  ”。
  • 服务端行为
    1. 网名为空的客户端加入,广播“系统广播: 加入了聊天室”。
    2. 空消息被广播到所有客户端,消息内容无意义。
3.3 Go 语言机制分析
  • 字符串处理strings.TrimSpace可过滤首尾空白字符,但需主动调用。
  • 输入流阻塞:客户端的ReadString可能读取到换行符,需显式检查内容是否为空。
3.4 解决方案
  1. 客户端验证
    • 循环读取网名直到非空:
      for {fmt.Print("请输入网名: ")name = strings.TrimSpace(reader.ReadString())if name != "" {break}fmt.Println("网名不能为空!")
      }
      
    • 发送消息前检查内容:
      message = strings.TrimSpace(input)
      if message == "" {fmt.Println("消息不能为空!")continue
      }
      
  2. 服务端二次过滤
    • 网名非空检查:
      if clientName == "" {utils.SendMessage(conn, []byte("ERROR: 网名不能为空"))return
      }
      
    • 消息内容过滤:
      msgContent := strings.TrimSpace(string(message))
      if msgContent == "" {log.Printf("客户端 %s 发送了空消息,已忽略", client.Name)continue
      }
      
3.5 总结类比
  • 问题类比:用户向邮箱发送空白信件,邮局仍派送,浪费资源。
  • 修复思路:邮局拒绝投递无内容信件,并要求寄件人填写有效地址。

问题 4:心跳超时导致僵尸连接

4.1 前因后果
  • 问题本质:客户端异常退出后,服务端未检测到离线状态,维持无效连接。
  • 根本原因:服务端仅依赖客户端显式退出信号,缺乏被动检测机制。
  • 具体表现
    • 客户端断网后,服务端持续等待消息,占用内存和连接资源。
    • Redis中online_users集合保留无效网名,影响新用户注册。
4.2 具体场景模拟

假设客户端因网络故障断开:

时间点 0ms: 服务端发送PING。
时间点 10ms: 客户端未响应(已断开)。
时间点 25ms: 服务端仍认为客户端在线,未清理资源。
  • 服务端行为
    1. 客户端连接残留,占用Clients集合和TCP端口。
    2. 新用户无法使用相同网名注册。
4.3 Go 语言机制分析
  • 计时器管理time.Timer需手动重置,避免误触发。
  • 协程泄漏风险:未关闭的协程可能持续占用内存。
4.4 解决方案
  1. 心跳检测与超时机制
    • 服务端每10秒发送PING:
      ticker := time.NewTicker(10 * time.Second)
      defer ticker.Stop()
      
    • 客户端响应PONG后重置超时计时器:
      case <-msgTicker.C:if message == "PONG" {heartbeatTimeout.Reset(15 * time.Second)}
      
  2. 超时强制断开
    • 若15秒未收到PONG,判定客户端离线:
      heartbeatTimeout := time.NewTimer(15 * time.Second)
      defer heartbeatTimeout.Stop()select {
      case <-heartbeatTimeout.C:cr.Leave <- clientreturn
      }
      
4.5 总结类比
  • 问题类比:电话通话中对方突然静默,但未挂断,导致线路一直被占用。
  • 修复思路:运营商设定“无响应超时”,若一段时间无声音,自动挂断释放线路。

总结

通过上述措施,项目实现了:

  1. 身份唯一性:Redis集合保障网名全局唯一。
  2. 数据有效性:双重验证过滤空输入。
  3. 连接健康性:心跳超时机制自动清理僵尸连接。

完整代码

client.go(客户端)

package mainimport ("bufio""fmt""log""net""os""strings""time""Learn/kaohe/redis2/utils"
)// handleServerMessages 负责处理从服务器接收到的消息
func handleServerMessages(conn *net.Conn) {reader := bufio.NewReader(*conn) // 创建一个读取器,用于从连接中读取消息for {message, err := utils.ReadMessage(reader) // 从服务器读取消息if err != nil {log.Println("接收服务器消息时出错:", err)// 如果连接断开,尝试重新连接newConn, ok := reconnect(*conn)if !ok {return // 如果重新连接失败,退出处理}*conn = newConn                 // 更新连接reader = bufio.NewReader(*conn) // 重新创建读取器continue}log.Printf("接收到消息: %s", string(message))fmt.Println(string(message)) // 打印消息到控制台// 如果收到的是 PING 消息,发送 PONG 响应if string(message) == "PING" {log.Printf("【心跳检测】接收到服务端 PING,发送 PONG 响应")err := utils.SendMessage(*conn, []byte("PONG"))if err != nil {log.Println("发送心跳响应时出错:", err)}}}
}// reconnect 尝试重新连接到服务器
func reconnect(oldConn net.Conn) (net.Conn, bool) {for i := 0; i < 3; i++ { // 最多尝试 3 次log.Printf("尝试第 %d 次重新连接服务器...", i+1)newConn, err := net.Dial("tcp", "localhost:8080") // 尝试连接服务器if err == nil {log.Println("重新连接服务器成功")oldConn.Close()      // 关闭旧连接return newConn, true // 返回新连接}time.Sleep(time.Duration(2<<uint(i)) * time.Second) // 指数退避等待}log.Println("多次尝试重新连接服务器失败,退出程序")return nil, false // 重新连接失败
}func main() {conn, err := net.Dial("tcp", "localhost:8080") // 连接到服务器if err != nil {log.Fatal("连接服务器时出错:", err)}defer conn.Close() // 程序结束时关闭连接go handleServerMessages(&conn) // 启动一个 goroutine 处理服务器消息reader := bufio.NewReader(os.Stdin) // 创建一个读取器,用于读取用户输入// 循环读取网名直到输入有效var name stringfor {fmt.Print("请输入你的网名: ")nameInput, err := reader.ReadString('\n') // 读取用户输入的网名if err != nil {log.Fatal("读取网名时出错:", err)}name = strings.TrimSpace(nameInput) // 去除多余空格if name == "" {fmt.Println("网名不能为空,请重新输入!")continue}break}err = utils.SendMessage(conn, []byte(name)) // 向服务器发送网名if err != nil {log.Fatal("发送网名时出错:", err)}fmt.Println("你已成功加入聊天室,可以开始聊天了!")for {message, err := reader.ReadString('\n') // 读取用户输入的消息if err != nil {log.Println("读取用户输入时出错:", err)break}message = strings.TrimSpace(message) // 去除多余空格if message == "" {fmt.Println("消息不能为空,请重新输入!")continue}if message == "/quit" { // 如果输入是 "/quit",退出聊天室fmt.Println("你已退出聊天室")utils.SendMessage(conn, []byte("/quit"))break}err = utils.SendMessage(conn, []byte(message)) // 向服务器发送消息log.Printf("DEBUG: 客户端尝试发送消息: %s, 错误: %v", message, err)if err != nil {log.Println("发送消息到服务器时出错:", err)// 如果发送失败,尝试重新连接newConn, ok := reconnect(conn)if !ok {return}conn = newConnerr = utils.SendMessage(conn, []byte(name)) // 重新发送网名if err != nil {log.Fatal("重新连接后发送网名时出错:", err)}}}
}

server.go(服务端)

package mainimport ("bufio""context""fmt""io""log""net""strings""sync""time""Learn/kaohe/redis2/utils""github.com/redis/go-redis/v9"
)// Client 表示一个客户端连接
type Client struct {Name string      // 客户端的网名Conn net.Conn    // 客户端的连接Send chan []byte // 用于向客户端发送消息的通道
}// ChatRoom 表示聊天室,管理所有客户端连接
type ChatRoom struct {Clients map[*Client]bool // 当前在线的客户端Join    chan *Client     // 新客户端加入的通道Leave   chan *Client     // 客户端离开的通道Message chan []byte      // 广播消息的通道Redis   *redis.Client    // Redis 客户端mu      sync.RWMutex     // 用于并发控制的互斥锁
}// NewChatRoom 创建一个新的聊天室
func NewChatRoom(redisClient *redis.Client) *ChatRoom {return &ChatRoom{Clients: make(map[*Client]bool), // 初始化客户端列表Join:    make(chan *Client),     // 初始化加入通道Leave:   make(chan *Client),     // 初始化离开通道Message: make(chan []byte, 100), // 初始化消息通道,缓冲大小为 100Redis:   redisClient,            // 初始化 Redis 客户端}
}// Run 聊天室的主循环,处理客户端的加入、离开和消息广播
func (cr *ChatRoom) Run() {for {select {case client := <-cr.Join: // 处理新客户端加入cr.mu.Lock()cr.Clients[client] = true // 将客户端添加到列表cr.mu.Unlock()message := fmt.Sprintf("系统广播: %s 加入了聊天室", client.Name)cr.Message <- []byte(message) // 广播加入消息case client := <-cr.Leave: // 处理客户端离开cr.mu.Lock()if _, ok := cr.Clients[client]; ok {delete(cr.Clients, client) // 从列表中删除客户端close(client.Send)         // 关闭客户端的发送通道}cr.mu.Unlock()if client.Name != "" {message := fmt.Sprintf("系统广播: %s 离开了聊天室", client.Name)cr.Message <- []byte(message) // 广播离开消息}case message := <-cr.Message: // 处理广播消息cr.mu.RLock()log.Printf("准备广播消息: %s", string(message))for client := range cr.Clients { // 向所有客户端发送消息select {case client.Send <- message: // 发送消息log.Printf("已发送消息到客户端 %s", client.Name)default:log.Printf("客户端 %s 的发送通道已满", client.Name)}}cr.mu.RUnlock()// 将消息存储到 Redisctx := context.Background()batchStoreToRedis(ctx, cr.Redis, message)}}
}// batchStoreToRedis 将消息批量存储到 Redis
func batchStoreToRedis(ctx context.Context, redisClient *redis.Client, message []byte) {const batchSize = 10 // 批量存储的大小// 从 Redis 获取当前队列中的消息queue := redisClient.LRange(ctx, "chat_messages_queue", 0, -1).Val()queue = append(queue, string(message)) // 将新消息添加到队列if len(queue) >= batchSize { // 如果队列达到批量大小pipe := redisClient.Pipeline() // 创建 Redis 管道for _, msg := range queue {// 将消息存储到 chat_messages 列表pipe.RPush(ctx, "chat_messages", msg)// 更新用户活跃度pipe.ZIncrBy(ctx, "user_activity", 1, extractUsername(msg))}_, _ = pipe.Exec(ctx)                       // 执行管道redisClient.Del(ctx, "chat_messages_queue") // 删除队列} else {// 如果队列未达到批量大小,将消息存储到队列redisClient.RPush(ctx, "chat_messages_queue", string(message))}
}// extractUsername 从消息中提取用户名
func extractUsername(message string) string {parts := strings.SplitN(message, ": ", 2)if len(parts) > 0 {return parts[0]}return ""
}// HandleClient 处理单个客户端的连接
func (cr *ChatRoom) HandleClient(conn net.Conn) {var clientName stringdefer func() {conn.Close()                                                 // 关闭连接cr.Leave <- &Client{Name: clientName, Conn: conn, Send: nil} // 通知聊天室客户端离开log.Printf("客户端连接已关闭: %s", conn.RemoteAddr())}()reader := bufio.NewReader(conn)             // 创建读取器nameBytes, err := utils.ReadMessage(reader) // 读取客户端的网名if err != nil {log.Println("读取网名时出错:", err)return}clientName = strings.TrimSpace(string(nameBytes))// 检查网名是否为空if clientName == "" {log.Println("客户端尝试使用空网名连接")utils.SendMessage(conn, []byte("ERROR: 网名不能为空"))return}// 检查网名是否已存在ctx := context.Background()exists, err := cr.Redis.SIsMember(ctx, "online_users", clientName).Result()if err != nil {log.Printf("检查网名 %s 时 Redis 出错: %v", clientName, err)utils.SendMessage(conn, []byte("ERROR: 服务器内部错误"))return}if exists {log.Printf("网名 %s 已存在,拒绝连接", clientName)utils.SendMessage(conn, []byte("ERROR: 网名已被占用,请更换其他名称"))return}// 添加网名到在线用户集合if _, err := cr.Redis.SAdd(ctx, "online_users", clientName).Result(); err != nil {log.Printf("存储网名 %s 到 Redis 失败: %v", clientName, err)utils.SendMessage(conn, []byte("ERROR: 服务器内部错误"))return}defer cr.Redis.SRem(ctx, "online_users", clientName)client := &Client{Name: clientName,Conn: conn,Send: make(chan []byte, 10), // 初始化发送通道}cr.Join <- client // 通知聊天室客户端加入// 启动一个 goroutine 处理发送消息go func() {for message := range client.Send {err := utils.SendMessage(client.Conn, message)if err != nil {log.Println("发送消息给客户端时出错:", err)cr.Leave <- client // 通知聊天室客户端离开break}}}()// 定时发送心跳检测ticker := time.NewTicker(10 * time.Second)// 定时读取消息msgTicker := time.NewTicker(50 * time.Millisecond)// 心跳超时检测heartbeatTimeout := time.NewTimer(15 * time.Second)defer func() {ticker.Stop()msgTicker.Stop()heartbeatTimeout.Stop()}()for {select {case <-ticker.C: // 心跳检测log.Printf("【心跳检测】向客户端 %s 发送 PING", client.Name)if err := utils.SendMessage(client.Conn, []byte("PING")); err != nil {log.Printf("【心跳异常】客户端 %s 心跳发送失败: %v", client.Name, err)cr.Leave <- clientreturn}heartbeatTimeout.Reset(15 * time.Second)case <-heartbeatTimeout.C:log.Printf("【心跳超时】客户端 %s 未响应 PONG,强制断开", client.Name)cr.Leave <- clientreturncase <-msgTicker.C: // 读取消息message, err := utils.ReadMessage(reader)if err != nil {if err == io.EOF {log.Printf("客户端 %s 主动断开连接", client.Name)} else {log.Printf("读取消息时出错: %v", err)}cr.Leave <- clientreturn}if string(message) == "PONG" { // 处理心跳响应log.Printf("【心跳响应】客户端 %s 返回 PONG", client.Name)heartbeatTimeout.Reset(15 * time.Second)continue}msgContent := strings.TrimSpace(string(message))if msgContent == "" {log.Printf("客户端 %s 发送了空消息,已忽略", client.Name)continue}log.Printf("接收到来自 %s 的消息: %s", client.Name, msgContent)// 构造完整消息并广播fullMessage := fmt.Sprintf("%s: %s", client.Name, msgContent)cr.Message <- []byte(fullMessage)}}
}// printActivityRankings 打印用户活跃度排名
func printActivityRankings(redisClient *redis.Client) {ctx := context.Background()topUsers, err := redisClient.ZRevRangeWithScores(ctx, "user_activity", 0, 9).Result()if err != nil {log.Println("获取活跃度排名时出错:", err)return}fmt.Println("活跃度排名:")for i, user := range topUsers {fmt.Printf("%d. %s: %.0f\n", i+1, user.Member.(string), user.Score)}
}// PrintConnectedClients 打印当前在线客户端
func (cr *ChatRoom) PrintConnectedClients() {cr.mu.RLock()defer cr.mu.RUnlock()log.Println("【活跃客户端】当前在线客户端列表:")for client := range cr.Clients {log.Printf(" - %s (IP: %s)", client.Name, client.Conn.RemoteAddr())}
}func main() {// 初始化 Redis 客户端redisClient := redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "",DB:       0,})ctx := context.Background()_, err := redisClient.Ping(ctx).Result()if err != nil {log.Fatal("连接 Redis 时出错:", err)}// 监听端口listener, err := net.Listen("tcp", ":8080")if err != nil {log.Fatal("监听端口时出错:", err)}defer listener.Close()fmt.Println("聊天室服务器已启动,等待客户端连接...")// 创建聊天室chatRoom := NewChatRoom(redisClient)go chatRoom.Run() // 启动聊天室// 定期打印活跃度排名和在线客户端go func() {ticker := time.NewTicker(30 * time.Second)defer ticker.Stop()for range ticker.C {printActivityRankings(redisClient)chatRoom.PrintConnectedClients()}}()// 接受客户端连接for {conn, err := listener.Accept()if err != nil {log.Println("接受客户端连接时出错:", err)continue}go chatRoom.HandleClient(conn) // 处理客户端连接}
}

utils.go(工具函数,发送和接收消息)

package utilsimport ("bufio""encoding/binary""fmt""io""log""net"
)const maxMessageLength = 1 << 20 // 1MB,最大消息长度限制// SendMessage 向连接发送消息
func SendMessage(conn net.Conn, message []byte) error {length := uint32(len(message)) // 消息长度if length > maxMessageLength {log.Println("消息长度超出限制:", length)return fmt.Errorf("message too long")}// 写入消息长度err := binary.Write(conn, binary.BigEndian, length)if err != nil {log.Printf("写入消息长度时出错: %v", err)return err}// 写入消息内容_, err = conn.Write(message)if err != nil {log.Printf("写入消息内容时出错: %v", err)}return err
}// ReadMessage 从连接读取消息
func ReadMessage(reader *bufio.Reader) ([]byte, error) {var length uint32 // 消息长度err := binary.Read(reader, binary.BigEndian, &length)if err != nil {log.Printf("读取消息长度时出错: %v", err)return nil, err}if length > maxMessageLength {log.Println("消息长度超出限制:", length)return nil, fmt.Errorf("message too long")}// 读取消息内容message := make([]byte, length)_, err = io.ReadFull(reader, message)if err != nil {log.Printf("读取消息内容时出错: %v", err)return nil, err}return message, nil
}

如果对项目有任何问题或建议,欢迎在评论区留言!

相关文章:

基于Go语言实现一个网络聊天室(连接Redis版)

基于Go语言和Redis的实时聊天室项目详解 项目概述 在这个项目中&#xff0c;我们实现了一个基于Go语言和Redis的实时聊天室系统。该系统允许用户通过客户端连接到服务器&#xff0c;进行实时聊天&#xff0c;并且支持以下功能&#xff1a; 用户网名注册和验证消息广播和接收…...

深入解析 RocketMQ 中的 BrokerOuterAPI 组件​

在 RocketMQ 这一高性能分布式消息队列系统中&#xff0c;BrokerOuterAPI 组件犹如一座桥梁&#xff0c;连接着 Broker 与外部世界&#xff0c;在系统的运行、管理以及与其他组件交互过程中发挥着极为关键的作用。本文将深入探讨 BrokerOuterAPI 组件的内部机制、核心功能以及其…...

make_01_Program_06_:: 是什么功能

在 Makefile 中&#xff0c;:: 是一种特殊的分隔符&#xff0c;用于创建多重规则&#xff08;multiple rules&#xff09;。当您使用 :: 定义目标时&#xff0c;您可以为同一个目标指定多个命令或多个依赖关系。每个命令将在构建目标时按顺序执行&#xff0c;不会影响其他命令的…...

springboot2.7.x整合nacos+seata

1、nacos及seata下载地址 Nacos Server 下载 | Nacos 官网 Seata Java Download | Apache Seata 这里的seata版本用2.1.0版本。 2、启动nacos D:\本地-seata-nacos\nacos-server\bin>startup.cmd -m standalone 3、修改seata的conf下的application的内容 这里的数据库…...

为 IDEA 设置管理员权限

IDEA 安装目录 兼容性选择管理员身份运行程序 之后 IDEA 中的操作&#xff08;包括终端中的操作&#xff09;都是管理员权限的了...

Apache Doris 2.1.9 版本正式发布

亲爱的社区小伙伴们&#xff0c;Apache Doris 2.1.9 版本已正式发布。2.1.9 版本对湖仓一体、倒排索引、半结构化数据类型、查询优化器、执行引擎、存储管理进行了若干改进优化。欢迎大家下载使用。 官网下载&#xff1a;https://doris.apache.org/download GitHub 下载&…...

单片机学习笔记8.定时器

IAP15W4K58S4定时/计数器结构工作原理 定时器工作方式控制寄存器TMOD 不能进行位寻址&#xff0c;只能对整个寄存器进行赋值 寄存器地址D7D6D5D4D3D2D1D0复位值TMOD89HGATEC/(T低电平有效)M1M0GATEC/(T低电平有效)M1M000000000B D0-D3为T0控制&#xff0c;D4-D7为T1控制 GAT…...

vue3实现markdown预览和编辑

Markdown作为一种轻量级标记语言&#xff0c;已经成为开发者编写文档的首选工具之一。在Vue3项目中集成Markdown编辑和预览功能可以极大地提升内容管理体验。本文将介绍如何使用Vditor这一强大的开源Markdown编辑器在Vue3项目中实现这一功能。 一、Vditor简介 Vditor是一款浏…...

高并发秒杀系统接入层如何设计

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…...

C++异常处理 throw try catch

C 异常处理概述 C 异常处理机制提供了一种在程序运行时捕获错误或异常情况的方式。异常处理的目的是使得程序在遇到错误时能够优雅地终止或恢复&#xff0c;并防止程序出现崩溃。C 使用 try, throw, 和 catch 关键字来实现异常处理。 异常处理的基本结构&#xff1a; throw: …...

纯css实现环形进度条

需要在中实现一个定制化的环形进度条&#xff0c;最终效果如图&#xff1a; 使用代码 <divclass"circular-progress":style"{--progress: nextProgress,--color: endSliderColor,--size: isFull ? 60rpx : 90rpx,}"><div class"inner-conte…...

0基础 | 硬件 | 电源系统 一

降压电路LDO 几乎所有LDO都是基于此拓扑结构 图 拓扑结构 LDO属于线性电源&#xff0c;通过控制开关管的导通程度实现稳压&#xff0c;输出纹波小&#xff0c;无开关噪声 线性电源&#xff0c;IoutIin&#xff0c;发热功率P电压差△U*电流I&#xff0c;转换效率Vo/Vi LDO不适…...

详解 MySQL 索引的最左前缀匹配原则

MySQL 的最左前缀匹配原则主要是针对复合索引&#xff08;也称为联合索引&#xff09;而言的。其核心思想是&#xff1a;只有查询条件中包含索引最左侧&#xff08;第一列&#xff09;开始的连续一段列&#xff0c;才能让 MySQL 有效地利用该索引。 一、 复合索引的结构 复合…...

蓝桥杯算法题1

前言 双指针 唯一的雪花 Unique Snowflakes #include<iostream> #include<unordered_map> using namespace std; //这道题的意思就是在一个数组找一个最大的1区间的长度&#xff0c;这个区间里面没有重复数据 //如果暴力解法&#xff0c;每次求出以a[i]开头的满…...

【愚公系列】《高效使用DeepSeek》053-工艺参数调优

🌟【技术大咖愚公搬代码:全栈专家的成长之路,你关注的宝藏博主在这里!】🌟 📣开发者圈持续输出高质量干货的"愚公精神"践行者——全网百万开发者都在追更的顶级技术博主! 👉 江湖人称"愚公搬代码",用七年如一日的精神深耕技术领域,以"…...

Cortex-M系列MCU的位带操作

Cortex-M系列位带操作详解 位带&#xff08;Bit-Banding&#xff09;是Cortex-M3/M4等处理器提供的一种硬件特性&#xff0c;允许通过别名地址对内存或外设寄存器中的单个位进行原子读-改-写操作&#xff0c;无需禁用中断或使用互斥锁。以下是位带操作的完整指南&#xff1a; …...

大坑!GaussDB数据库批量插入数据变只读

大坑!GaussDB数据库批量插入数据变只读 GaussDB插入数据时变只读df和du为什么不一致GaussDB磁盘空间使用阈值GaussDB变只读怎么办正确删除表的姿势GaussDB插入数据时变只读 涉及的数据库版本为:GaussDB Kernel 505.1.0 build da28c417。 GuassDB TPCC灌数报错DML失败,数据…...

35信号和槽_信号槽小结

Qt 信号槽 1.信号槽是啥~~ 尤其是和 Linux 中的信号进行了对比&#xff08;三要素&#xff09; 1) 信号源 2) 信号的类型 3)信号的处理方式 2.信号槽 使用 connect 3.如何查阅文档. 一个控件&#xff0c;内置了哪些信号&#xff0c;信号都是何时触发 一…...

获取KUKA机器人诊断文件KRCdiag的方法

有时候在进行售后问题时需要获取KUKA机器人的诊断文件KRCdiag&#xff0c;通过以下方法可以获取KUKA机器人的诊断文件KRCdiag&#xff1a; 1、将U盘插到控制柜内的任意一个USB接口&#xff1b; 2、依次点【主菜单】—【文件】—【存档】—【USB&#xff08;控制柜&#xff09…...

一周学会Pandas2 Python数据处理与分析-NumPy数据类型

锋哥原创的Pandas2 Python数据处理与分析 视频教程&#xff1a; 2025版 Pandas2 Python数据处理与分析 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili NumPy 提供了丰富的数据类型&#xff08;dtypes&#xff09;&#xff0c;主要用于高效数值计算。以下是 NumPy 的主要…...

Redis核心机制-缓存、分布式锁

目录 缓存 缓存更新策略 定期生成 实时生成 缓存问题 缓存预热&#xff08;Cache preheating&#xff09; 缓存穿透&#xff08;Cache penetration&#xff09; 缓存雪崩&#xff08;Cache avalanche&#xff09; 缓存击穿&#xff08;Cache breakdown&#xff09; 分…...

Three.js 系列专题 1:入门与基础

什么是 Three.js? Three.js 是一个基于 WebGL 的 JavaScript 库,它简化了 3D 图形编程,让开发者无需深入了解底层 WebGL API 就能创建复杂的 3D 场景。它广泛应用于网页游戏、可视化、虚拟现实等领域。 学习目标 理解 Three.js 的核心组件:场景(Scene)、相机(Camera)…...

[C++面试] 如何在特定内存位置上分配内存、构造对象

new面试-高阶题&#xff08;可以主动讲给面试官&#xff09;&#xff0c;适用于内存池、高性能场景或需要精确控制内存布局的编程需求。 一、核心方法&#xff1a;placement new placement new 是C中一种特殊形式的new运算符&#xff0c;允许在预先分配好的内存地址上构造对象…...

针对Ansible执行脚本时报错“可执行文件格式错误”,以下是详细的解决步骤和示例

针对Ansible执行脚本时报错“可执行文件格式错误”&#xff0c;以下是详细的解决步骤和示例&#xff1a; 目录 一、错误原因分析二、解决方案1. 检查并添加可执行权限2. 修复Shebang行3. 转换文件格式&#xff08;Windows → Unix&#xff09;4. 检查脚本内容兼容性5. 显式指定…...

如何在Ubuntu上安装Dify

如何在Ubuntu上安装Dify 如何在Ubuntu上安装docker 使用apt安装 # Add Dockers official GPG key: sudo apt-get update sudo apt-get install ca-certificates curl sudo install -m 0755 -d /etc/apt/keyrings sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg…...

Python FastApi(13):APIRouter

如果你正在开发一个应用程序或 Web API&#xff0c;很少会将所有的内容都放在一个文件中。FastAPI 提供了一个方便的工具&#xff0c;可以在保持所有灵活性的同时构建你的应用程序。假设你的文件结构如下&#xff1a; . ├── app # 「app」是一个 Python 包…...

Harmony OS“一多” 详解:基于窗口变化的断点自适应实现

一、一多开发核心概念&#xff08;18N模式&#xff09; 目标&#xff1a;一次开发多端部署 解决的问题&#xff1a; 1、界面级一多&#xff1a;适配不同屏幕尺寸 2、功能级一多&#xff1a;设备功能兼容性处理(CanIUser) 3、工…...

【算法竞赛】状态压缩型背包问题经典应用(蓝桥杯2019A4分糖果)

在蓝桥杯中遇到的这道题&#xff0c;看上去比较普通&#xff0c;但其实蕴含了很巧妙的“状态压缩 背包”的思想&#xff0c;本文将从零到一&#xff0c;详细解析这个问题。 目录 一、题目 二、思路分析&#xff1a;状态压缩 最小覆盖 1. 本质&#xff1a;最小集合覆盖问题…...

kali——masscan

目录 前言 使用方法 前言 Masscan 是一款快速的端口扫描工具&#xff0c;在 Kali Linux 系统中常被用于网络安全评估和渗透测试。 使用方法 对单个IP进行端口扫描&#xff1a; masscan -p11-65535 192.168.238.131 扫描指定端口&#xff1a; masscan -p80,22 192.168.238.131…...

常微分方程 1

slow down and take your time 定积分应用回顾常微分方程的概述一阶微分方程可分离变量齐次方程三阶线性微分方程 一阶线性微分方程不定积分的被积分函数出现了绝对值梳理微分方程的基本概念题型 1 分离变量题型 2 齐次方程5.4 题型 3 一阶线性微分方程知识点5.55.6 尾声 定积分…...