当前位置: 首页 > news >正文

TeamTalk梳理概括

文章目录

  • 即时通讯重点概括
    • 展开聊聊单聊消息流转流程
    • 展开聊聊群聊消息流转流程
    • 群成员管理
    • 数据库
    • MySQL连接池设计
    • redis连接池设计
    • 文件传输原理
    • 实时性
    • 并发能力
  • db_proxy_server reactor响应处理流程
  • 单聊消息
    • 消息如何封装?如何保证对端完整解析一帧消息?协议格式?
    • 消息序号(msg_id )为什么使用redis生成?
    • 展开聊聊单聊消息流转流程
  • 展开聊聊群聊消息流转流程
    • 群聊消息流转
    • 怎么保证数据的不丢失以及重复包?
    • 消息未读计数是怎么实现的?
  • 群成员管理

即时通讯重点概括

展开聊聊单聊消息流转流程

  • 消息如何封装
  1. 怎么解决半包、粘包问题?
  2. 消息流转流程介绍下?
  • 消息序号(msg_id )在哪里生成以及生成方式
  • 怎么保证数据的不丢失以及重复包?
  1. 接收端收到数据后(收到消息区别于阅读消息)如何应答?
  2. 消息发送后服务器怎么应答?
  3. 消息发送时的seq有什么作用(业务层的ack机制)

展开聊聊群聊消息流转流程

  • 如何推送群聊
  • 群消息计数器(msg_id )
  • 群会话如何更新(每有一个人发送消息,则其他人都需要更新会话消息)

群成员管理

  • 如何创建群
  • 如何删除群
  • 怎么使用redis管理群成员
    消息未读计数是怎么实现的?
  • 服务器怎么保留消息未读计数(redis 单聊和群聊机制不同)
  • 客户端的未读消息计数从何而来
  • 客户端未读消息计数清0时向服务器发送了什么?服务器又是怎么清除未读消息计数(单聊和群聊机制不同)

数据库

  • 数据库表设计(表达笼统)
  • 密码存储方式
  • 未读消息如何体现
  • 聊天消息分表问题(单聊和群聊消息表)
  • 最近会话表

MySQL连接池设计

  • 为什么使用连接池
  • 连接池设置多大合适?

redis连接池设计

  • 为什么使用连接池
  • 连接池设置多大合适?

文件传输原理

  • 在线传输和离线传输有什么区别

实时性

  • Http(登录、图片服务)
  • Socket
  • websocket
    展开聊聊登录流程

并发能力

  • 如何做到百万并发
  • 如何做到千万并发

db_proxy_server reactor响应处理流程

  1. 数据入口 reactor CProxyConn:: HandlePduBuf
  2. 怎么初始化epoll+线程池
  3. 任务封装
  4. 把任务放入线程池
  5. 执行任务
  6. 把要回应的数据放入回复列表CProxyConn::SendResponsePdulist
  7. epoll所在线程读取回复列表的数据发给请求端

单聊消息

消息如何封装?如何保证对端完整解析一帧消息?协议格式?

  1. 答:消息封装采用包头(Header)+包体(Body)的格式。包头自定义格式如下代码所示,包体采用protobuf序列化。
typedef struct {uint32_t    length;        // the whole pdu lengthuint16_t    version;       // pdu version numberuint16_t    flag;          // not useduint16_t    service_id;    //uint16_t    command_id;    //uint16_t    seq_num;       // 包序号uint16_t    reversed;      // 保留
} PduHeader_t;
  1. 答:
  • 采用tcp保证数据传输可靠性
  • 通过包头的 length 字段标记一帧消息的长度
  • 通过service id 和 command id区分不同的命令(比如登录、退出等)
  • 解决数据TCP粘包(包头长度字段)、半包(放入网络库的缓冲区)问题
void CImConn::OnRead()
{for (;;){uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();if (free_buf_len < READ_BUF_SIZE)m_in_buf.Extend(READ_BUF_SIZE);int ret = netlib_recv(m_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE);if (ret <= 0)break;m_recv_bytes += ret;m_in_buf.IncWriteOffset(ret);m_last_recv_tick = get_tick_count();}CImPdu* pPdu = NULL;try{while ( ( pPdu = CImPdu::ReadPdu(m_in_buf.GetBuffer(), m_in_buf.GetWriteOffset()) ) ){uint32_t pdu_len = pPdu->GetLength();HandlePdu(pPdu);m_in_buf.Read(NULL, pdu_len);delete pPdu;pPdu = NULL;
//                        ++g_recv_pkt_cnt;}} catch (CPduException& ex) {log("!!!catch exception, sid=%u, cid=%u, err_code=%u, err_msg=%s, close the connection ",ex.GetServiceId(), ex.GetCommandId(), ex.GetErrorCode(), ex.GetErrorMsg());if (pPdu) {delete pPdu;pPdu = NULL;}OnClose();}
}
message IMMsgData{//cmd id:                0x0301required uint32 from_user_id = 1;    //消息发送方required uint32 to_session_id = 2;    //消息接受方required uint32 msg_id = 3;required uint32 create_time = 4; required IM.BaseDefine.MsgType msg_type = 5;required bytes msg_data = 6;optional bytes attach_data = 20;
}message IMMsgDataAck{//cmd id:                0x0302required uint32 user_id = 1;    //发送此信令的用户idrequired uint32 session_id = 2;                                required uint32 msg_id = 3;required IM.BaseDefine.SessionType session_type = 4;
}

消息序号(msg_id )为什么使用redis生成?

  1. 消息ID(msg_id )的作用是防止消息乱序。
  2. 消息ID(msg_id )为什么这么设计?
    答:msg_id 存储在 unread 连接池所在的redis数据库。单聊 msg_id 的 key涉及到 nRelateId。nRelateId 从关系表(IMRelationShip :两个用户id的映射关系)中获取。
/***  获取会话关系ID*  对于群组,必须把nUserBId设置为群ID**  @param nUserAId  <#nUserAId description#>*  @param nUserBId  <#nUserBId description#>*  @param bAdd      <#bAdd description#>*  @param nStatus 0 获取未被删除会话,1获取所有。*/
uint32_t CRelationModel::getRelationId(uint32_t nUserAId, uint32_t nUserBId, bool bAdd)
{uint32_t nRelationId = INVALID_VALUE;if (nUserAId == 0 || nUserBId == 0) {log("invalied user id:%u->%u", nUserAId, nUserBId);return nRelationId;}CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){uint32_t nBigId = nUserAId > nUserBId ? nUserAId : nUserBId;uint32_t nSmallId = nUserAId > nUserBId ? nUserBId : nUserAId;string strSql = "select id from IMRelationShip where smallId=" + int2string(nSmallId) + " and bigId="+ int2string(nBigId) + " and status = 0";CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());if (pResultSet){while (pResultSet->Next()){nRelationId = pResultSet->GetInt("id");}delete pResultSet;}else{log("there is no result for sql:%s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);if (nRelationId == INVALID_VALUE && bAdd){nRelationId = addRelation(nSmallId, nBigId);}}else{log("no db connection for teamtalk_slave");}return nRelationId;
}
  1. 群聊和单聊msg_id 的区别:key设置不同。
uint32_t CMessageModel::getMsgId(uint32_t nRelateId)
{uint32_t nMsgId = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if(pCacheConn){string strKey = "msg_id_" + int2string(nRelateId);nMsgId = pCacheConn->incrBy(strKey, 1);pCacheManager->RelCacheConn(pCacheConn);}return nMsgId;
}/***  获取一个群组的msgId,自增,通过redis控制*  @param nGroupId 群Id*  @return 返回msgId*/
uint32_t CGroupMessageModel::getMsgId(uint32_t nGroupId)
{uint32_t nMsgId = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if(pCacheConn){// TODOstring strKey = "group_msg_id_" + int2string(nGroupId);nMsgId = pCacheConn->incrBy(strKey, 1);pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}return nMsgId;
}

展开聊聊单聊消息流转流程

答:两个用户A给B发消息,用户A把聊天消息封装好以后发送给MsgServer;同时把消息进行持久化,将聊天消息发给这个 DBProxy(数据库代理服务),存储消息成功后,DBProxyServer组包应答MsgServer,MsgServer收到回复后组包应答Client A。如果 A 和 B 两个用户不在同一个 MsgServer 上,那么会通过这个 RouteServer 去中转Pdu包数据(广播给所有的MsgServer,MsgServer再广播给Client B),B收到消息后应答MsgServer,至此,流程结束。然后如果是一些热点数据,我们同时也会写Redis。
群聊消息

展开聊聊群聊消息流转流程

群聊消息流转

void CGroupChat::HandleGroupMessage(CImPdu* pPdu)
{IM::Message::IMMsgData msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t from_user_id = msg.from_user_id();uint32_t to_group_id = msg.to_session_id();string msg_data = msg.msg_data();uint32_t msg_id = msg.msg_id();if (msg_id == 0) {log("HandleGroupMsg, write db failed, %u->%u. ", from_user_id, to_group_id);return;}uint8_t msg_type = msg.msg_type();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());log("HandleGroupMsg, %u->%u, msg id=%u. ", from_user_id, to_group_id, msg_id);CMsgConn* pFromConn = CImUserManager::GetInstance()->GetMsgConnByHandle(from_user_id,attach_data.GetHandle());if (pFromConn){//接收反馈IM::Message::IMMsgDataAck msg2;msg2.set_user_id(from_user_id);msg2.set_session_id(to_group_id);msg2.set_msg_id(msg_id);msg2.set_session_type(::IM::BaseDefine::SESSION_TYPE_GROUP);CImPdu pdu;pdu.SetPBMsg(&msg2);pdu.SetServiceId(SID_MSG);pdu.SetCommandId(CID_MSG_DATA_ACK);pdu.SetSeqNum(pPdu->GetSeqNum());pFromConn->SendPdu(&pdu);}CRouteServConn* pRouteConn = get_route_serv_conn();if (pRouteConn){pRouteConn->SendPdu(pPdu);}// 服务器没有群的信息,向DB服务器请求群信息,并带上消息作为附件,返回时在发送该消息给其他群成员//IM::BaseDefine::GroupVersionInfo group_version_info;CPduAttachData pduAttachData(ATTACH_TYPE_HANDLE_AND_PDU, attach_data.GetHandle(), pPdu->GetBodyLength(), pPdu->GetBodyData());IM::Group::IMGroupInfoListReq msg3;msg3.set_user_id(from_user_id);IM::BaseDefine::GroupVersionInfo* group_version_info = msg3.add_group_version_list();group_version_info->set_group_id(to_group_id);group_version_info->set_version(0);msg3.set_attach_data(pduAttachData.GetBuffer(), pduAttachData.GetLength());CImPdu pdu;pdu.SetPBMsg(&msg3);pdu.SetServiceId(SID_GROUP);pdu.SetCommandId(CID_GROUP_INFO_REQUEST);CDBServConn* pDbConn = get_db_serv_conn();if(pDbConn){pDbConn->SendPdu(&pdu);}
}
void CGroupChat::HandleGroupInfoResponse(CImPdu* pPdu)
{IM::Group::IMGroupInfoListRsp msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t user_id = msg.user_id();uint32_t group_cnt = msg.group_info_list_size();CPduAttachData pduAttachData((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());log("HandleGroupInfoResponse, user_id=%u, group_cnt=%u. ", user_id, group_cnt);//此处是查询成员时使用,主要用于群消息从数据库获得msg_id后进行发送,一般此时group_cnt = 1if (pduAttachData.GetPduLength() > 0 && group_cnt > 0){IM::BaseDefine::GroupInfo group_info = msg.group_info_list(0);uint32_t group_id = group_info.group_id();log("GroupInfoRequest is send by server, group_id=%u ", group_id);std::set<uint32_t> group_member_set;for (uint32_t i = 0; i < group_info.group_member_list_size(); i++){uint32_t member_user_id = group_info.group_member_list(i);group_member_set.insert(member_user_id);}if (group_member_set.find(user_id) == group_member_set.end()){log("user_id=%u is not in group, group_id=%u. ", user_id, group_id);return;}IM::Message::IMMsgData msg2;CHECK_PB_PARSE_MSG(msg2.ParseFromArray(pduAttachData.GetPdu(), pduAttachData.GetPduLength()));CImPdu pdu;pdu.SetPBMsg(&msg2);pdu.SetServiceId(SID_MSG);pdu.SetCommandId(CID_MSG_DATA);//Push相关IM::Server::IMGroupGetShieldReq msg3;msg3.set_group_id(group_id);msg3.set_attach_data(pdu.GetBodyData(), pdu.GetBodyLength());for (uint32_t i = 0; i < group_info.group_member_list_size(); i++){uint32_t member_user_id = group_info.group_member_list(i);msg3.add_user_id(member_user_id);CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(member_user_id);if (pToImUser){CMsgConn* pFromConn = NULL;if( member_user_id == user_id ){uint32_t reqHandle = pduAttachData.GetHandle();if(reqHandle != 0)pFromConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, reqHandle);}pToImUser->BroadcastData(pdu.GetBuffer(), pdu.GetLength(), pFromConn);}}CImPdu pdu2;pdu2.SetPBMsg(&msg3);pdu2.SetServiceId(SID_OTHER);pdu2.SetCommandId(CID_OTHER_GET_SHIELD_REQ);CDBServConn* pDbConn = get_db_serv_conn();if (pDbConn){pDbConn->SendPdu(&pdu2);}}else if (pduAttachData.GetPduLength() == 0){//正常获取群信息的返回CMsgConn* pConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, pduAttachData.GetHandle());if (pConn){msg.clear_attach_data();pPdu->SetPBMsg(&msg);pConn->SendPdu(pPdu);}}
}

同步群组聊天信息:群会话如何更新(每有一个人发送消息,则其他人都需要更新会话消息)
分析:
如果和单聊类似,实时更新会有大量操作数据库的成本。
某个群成员发消息时,存储消息成功后,会更新群的最新发言时间。
后续优化,将session放在redis中查询不方便???所以要分库分表???

void CGroupModel::updateGroupChat(uint32_t nGroupId)
{CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");if(pDBConn){uint32_t nNow = (uint32_t)time(NULL);string strSql = "update IMGroup set lastChated=" + int2string(nNow) + " where id=" + int2string(nGroupId);pDBConn->ExecuteUpdate(strSql.c_str());pDBManager->RelDBConn(pDBConn);}else{log("no db connection for teamtalk_master");}
}

CSyncCenter类,群会话的更新-独立的线程

CSyncCenter :: doSyncGroupChat

  1. 根据时间节点将需要更新会话的群id和最近的聊天时间读取出来放到map
  2. 保存当前时间到CSyncCenter
  3. 根据群id从redis读取群成员,然后遍历群成员更新会话信息。
/***  开启内网数据同步以及群组聊天记录同步*/
void CSyncCenter::startSync()
{
#ifdef _WIN32(void)CreateThread(NULL, 0, doSyncGroupChat, NULL, 0, &m_nGroupChatThreadId);
#else(void)pthread_create(&m_nGroupChatThreadId, NULL, doSyncGroupChat, NULL);
#endif
}
//谈取更新的时间大于之前更新的时间点,把对应的群id-nLastChat读取出来放到mapChangedGroup存储
/***  同步群组聊天信息**  @param arg NULL**  @return NULL*/
void* CSyncCenter::doSyncGroupChat(void* arg)
{m_bSyncGroupChatRuning = true;CDBManager* pDBManager = CDBManager::getInstance();map<uint32_t, uint32_t> mapChangedGroup;do{mapChangedGroup.clear();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if(pDBConn){string strSql = "select id, lastChated from IMGroup where status=0 and lastChated >=" + int2string(m_pInstance->getLastUpdateGroup());CResultSet* pResult = pDBConn->ExecuteQuery(strSql.c_str());if(pResult){while (pResult->Next()) {uint32_t nGroupId = pResult->GetInt("id");uint32_t nLastChat = pResult->GetInt("lastChated");if(nLastChat != 0){mapChangedGroup[nGroupId] = nLastChat;}}delete pResult;}pDBManager->RelDBConn(pDBConn);}else{log("no db connection for teamtalk_slave");}m_pInstance->updateLastUpdateGroup(time(NULL));for (auto it=mapChangedGroup.begin(); it!=mapChangedGroup.end(); ++it){uint32_t nGroupId =it->first;list<uint32_t> lsUsers;uint32_t nUpdate = it->second;// 读取该群的群成员CGroupModel::getInstance()->getGroupUser(nGroupId, lsUsers);//遍历群成员,更新Sessionfor (auto it1=lsUsers.begin(); it1!=lsUsers.end(); ++it1){uint32_t nUserId = *it1;uint32_t nSessionId = INVALID_VALUE;nSessionId = CSessionModel::getInstance()->getSessionId(nUserId, nGroupId, IM::BaseDefine::SESSION_TYPE_GROUP, true);if(nSessionId != INVALID_VALUE){CSessionModel::getInstance()->updateSession(nSessionId, nUpdate);}else{CSessionModel::getInstance()->addSession(nUserId, nGroupId, IM::BaseDefine::SESSION_TYPE_GROUP);}}}
//    } while (!m_pInstance->m_pCondSync->waitTime(5*1000));} while (m_pInstance->m_bSyncGroupChatWaitting && !(m_pInstance->m_pCondGroupChat->waitTime(5*1000)));
//    } while(m_pInstance->m_bSyncGroupChatWaitting);m_bSyncGroupChatRuning = false;return NULL;
}

怎么保证数据的不丢失以及重复包?

  1. 包头的seq_num字段(包序号),未回复消息列表
  2. 业务层的ack机制,收到数据会回复ack

消息未读计数是怎么实现的?

服务器怎么保留消息未读计数(单聊和群聊)
7. 单聊和群聊消息未读计数机制为什么不同?
答:加入该群很大,有1000人,999个人的未读消息计数都+1,效率低下。
8. 单聊消息未读计数机制

  • key设计:“unread_” + int2string(nToId);field:int2string(nFromId),value:自增1
  • 发送消息时,将消息写入 mysql 消息表成功后,更新redis。
void CMessageModel::incMsgCount(uint32_t nFromId, uint32_t nToId)
{CacheManager* pCacheManager = CacheManager::getInstance();// increase message countCacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn) {pCacheConn->hincrBy("unread_" + int2string(nToId), int2string(nFromId), 1);pCacheManager->RelCacheConn(pCacheConn);} else {log("no cache connection to increase unread count: %d->%d", nFromId, nToId);}
}
9. 群聊消息未读计数机制
- 群总的消息计数key设计:int2string(nGroupId) + _im_group_msg;
- field:count
- 群内某个成员已经读取的消息计数key设计:
int2string(nUserId) + "_" + int2string(nGroupId) + _im_user_group
- field:count
所以:群内某个成员未读消息计数 = 群总消息数量 - 该成员已经读取的消息数量#define     GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX    "_im_group_msg"
#define     GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX     "_im_user_group"
#define     GROUP_COUNTER_SUBKEY_COUNTER_FIELD          "count"
增加群消息计数
/***  增加群消息计数*  @param nUserId  用户Id*  @param nGroupId 群组Id*  @return 成功返回true,失败返回false*/
bool CGroupMessageModel::incMessageCount(uint32_t nUserId, uint32_t nGroupId)
{bool bRet = false;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;pCacheConn->hincrBy(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD, 1);map<string, string> mapGroupCount;bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);if(bRet){string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);if(!strReply.empty()){bRet = true;}else{log("hmset %s failed !", strUserKey.c_str());}}else{log("hgetAll %s failed!", strGroupKey.c_str());}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}return bRet;
}
获取用户群未读消息计数
/***  获取用户群未读消息计数*  @param nUserId       用户Id*  @param nTotalCnt     总条数*  @param lsUnreadCount 每个会话的未读信息包含了条数,最后一个消息的Id,最后一个消息的类型,最后一个消息的类容*/
void CGroupMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{list<uint32_t> lsGroupId;CGroupModel::getInstance()->getUserGroupIds(nUserId, lsGroupId, 0);uint32_t nCount = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){for(auto it=lsGroupId.begin(); it!=lsGroupId.end(); ++it){uint32_t nGroupId = *it;string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;string strGroupCnt = pCacheConn->hget(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);if(strGroupCnt.empty()){
//                log("hget %s : count failed !", strGroupKey.c_str());continue;}uint32_t nGroupCnt = (uint32_t)(atoi(strGroupCnt.c_str()));string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strUserCnt = pCacheConn->hget(strUserKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);uint32_t nUserCnt = ( strUserCnt.empty() ? 0 : ((uint32_t)atoi(strUserCnt.c_str())) );if(nGroupCnt >= nUserCnt) {nCount = nGroupCnt - nUserCnt;}if(nCount > 0){IM::BaseDefine::UnreadInfo cUnreadInfo;cUnreadInfo.set_session_id(nGroupId);cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_GROUP);cUnreadInfo.set_unread_cnt(nCount);nTotalCnt += nCount;string strMsgData;uint32_t nMsgId;IM::BaseDefine::MsgType nType;uint32_t nFromId;getLastMsg(nGroupId, nMsgId, strMsgData, nType, nFromId);if(IM::BaseDefine::MsgType_IsValid(nType)){cUnreadInfo.set_latest_msg_id(nMsgId);cUnreadInfo.set_latest_msg_data(strMsgData);cUnreadInfo.set_latest_msg_type(nType);cUnreadInfo.set_latest_msg_from_user_id(nFromId);lsUnreadCount.push_back(cUnreadInfo);}else{log("invalid msgType. userId=%u, groupId=%u, msgType=%u, msgId=%u", nUserId, nGroupId, nType, nMsgId);}}}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}
}
清除未读消息
单聊和群聊清除未读消息都是调用如下函数
1. 单聊直接删掉key
2. 群聊将该成员的已读消息数量设置成群总消息数量。
m_handler_map.insert(make_pair(uint32_t(CID_MSG_READ_ACK), DB_PROXY::clearUnreadMsgCounter));
void CUserModel::clearUserCounter(uint32_t nUserId, uint32_t nPeerId, IM::BaseDefine::SessionType nSessionType)
{if(IM::BaseDefine::SessionType_IsValid(nSessionType)){CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){// Clear P2P msg Counterif(nSessionType == IM::BaseDefine::SESSION_TYPE_SINGLE){int nRet = pCacheConn->hdel("unread_" + int2string(nUserId), int2string(nPeerId));if(!nRet){log("hdel failed %d->%d", nPeerId, nUserId);}}// Clear Group msg Counterelse if(nSessionType == IM::BaseDefine::SESSION_TYPE_GROUP){string strGroupKey = int2string(nPeerId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;map<string, string> mapGroupCount;bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);if(bRet){string strUserKey = int2string(nUserId) + "_" + int2string(nPeerId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);if(strReply.empty()) {log("hmset %s failed !", strUserKey.c_str());}}else{log("hgetall %s failed!", strGroupKey.c_str());}}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}}else{log("invalid sessionType. userId=%u, fromId=%u, sessionType=%u", nUserId, nPeerId, nSessionType);}
}

群成员管理

  1. 为什么使用 redis 管理群成员?
    答:发群消息需要通知群成员,群成员很多从redis获取,提高效率。
  2. 群成员管理使用redis 缓存设计,以hash为存储结构,
  • key 为 “group_member_”+int2string(nGroupId);
  • field 为 userId(用户id)
  • Value 为 创建时间:int2string(nCreated)
  1. 加入成员:insertNewMember,插入mysql数据库的同时也插入redis缓存
    /***  修改群成员,增加或删除**  @param pPdu      收到的packet包指针*  @param conn_uuid 该包过来的socket 描述符*/void modifyMember(CImPdu* pPdu, uint32_t conn_uuid){IM::Group::IMGroupChangeMemberReq msg;IM::Group::IMGroupChangeMemberRsp msgResp;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){uint32_t nUserId = msg.user_id();uint32_t nGroupId = msg.group_id();IM::BaseDefine::GroupModifyType nType = msg.change_type();if (IM::BaseDefine::GroupModifyType_IsValid(nType) &&CGroupModel::getInstance()->isValidateGroupId(nGroupId)) {CImPdu* pPduRes = new CImPdu;uint32_t nCnt = msg.member_id_list_size();set<uint32_t> setUserId;for(uint32_t i=0; i<nCnt;++i){setUserId.insert(msg.member_id_list(i));}list<uint32_t> lsCurUserId;bool bRet = CGroupModel::getInstance()->modifyGroupMember(nUserId, nGroupId, nType, setUserId, lsCurUserId);msgResp.set_user_id(nUserId);msgResp.set_group_id(nGroupId);msgResp.set_change_type(nType);msgResp.set_result_code(bRet?0:1);if(bRet){for(auto it=setUserId.begin(); it!=setUserId.end(); ++it){msgResp.add_chg_user_id_list(*it);}for(auto it=lsCurUserId.begin(); it!=lsCurUserId.end(); ++it){msgResp.add_cur_user_id_list(*it);}}log("userId=%u, groupId=%u, result=%u, changeCount:%u, currentCount=%u",nUserId, nGroupId,  bRet?0:1, msgResp.chg_user_id_list_size(), msgResp.cur_user_id_list_size());msgResp.set_attach_data(msg.attach_data());pPduRes->SetPBMsg(&msgResp);pPduRes->SetSeqNum(pPdu->GetSeqNum());pPduRes->SetServiceId(IM::BaseDefine::SID_GROUP);pPduRes->SetCommandId(IM::BaseDefine::CID_GROUP_CHANGE_MEMBER_RESPONSE);CProxyConn::AddResponsePdu(conn_uuid, pPduRes);}else{log("invalid groupModifyType or groupId. userId=%u, groupId=%u, groupModifyType=%u", nUserId, nGroupId, nType);}}else{log("parse pb failed");}}
bool CGroupModel::insertNewMember(uint32_t nGroupId, set<uint32_t>& setUsers)
{bool bRet = false;uint32_t nUserCnt = (uint32_t)setUsers.size();if(nGroupId != INVALID_VALUE &&  nUserCnt > 0){CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){uint32_t nCreated = (uint32_t)time(NULL);// 获取 已经存在群里的用户string strClause;bool bFirst = true;for (auto it=setUsers.begin(); it!=setUsers.end(); ++it){if(bFirst){bFirst = false;strClause = int2string(*it);}else{strClause += ("," + int2string(*it));}}string strSql = "select userId from IMGroupMember where groupId=" + int2string(nGroupId) + " and userId in (" + strClause + ")";CResultSet* pResult = pDBConn->ExecuteQuery(strSql.c_str());set<uint32_t> setHasUser;if(pResult){while (pResult->Next()) {setHasUser.insert(pResult->GetInt("userId"));}delete pResult;}else{log("no result for sql:%s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);pDBConn = pDBManager->GetDBConn("teamtalk_master");if (pDBConn){CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("group_member");if (pCacheConn){// 设置已经存在群中人的状态if (!setHasUser.empty()){strClause.clear();bFirst = true;for (auto it=setHasUser.begin(); it!=setHasUser.end(); ++it) {if(bFirst){bFirst = false;strClause = int2string(*it);}else{strClause += ("," + int2string(*it));}}strSql = "update IMGroupMember set status=0, updated="+int2string(nCreated)+" where groupId=" + int2string(nGroupId) + " and userId in (" + strClause + ")";pDBConn->ExecuteUpdate(strSql.c_str());}strSql = "insert into IMGroupMember(`groupId`, `userId`, `status`, `created`, `updated`) values\(?,?,?,?,?)";//插入新成员auto it = setUsers.begin();uint32_t nStatus = 0;uint32_t nIncMemberCnt = 0;for (;it != setUsers.end();){uint32_t nUserId = *it;if(setHasUser.find(nUserId) == setHasUser.end()){CPrepareStatement* pStmt = new CPrepareStatement();if (pStmt->Init(pDBConn->GetMysql(), strSql)){uint32_t index = 0;pStmt->SetParam(index++, nGroupId);pStmt->SetParam(index++, nUserId);pStmt->SetParam(index++, nStatus);pStmt->SetParam(index++, nCreated);pStmt->SetParam(index++, nCreated);pStmt->ExecuteUpdate();++nIncMemberCnt;delete pStmt;}else{setUsers.erase(it++);delete pStmt;continue;}}++it;}if(nIncMemberCnt != 0){strSql = "update IMGroup set userCnt=userCnt+" + int2string(nIncMemberCnt) + " where id="+int2string(nGroupId);pDBConn->ExecuteUpdate(strSql.c_str());}//更新一份到redis中string strKey = "group_member_"+int2string(nGroupId);for(auto it = setUsers.begin(); it!=setUsers.end(); ++it){pCacheConn->hset(strKey, int2string(*it), int2string(nCreated));}pCacheManager->RelCacheConn(pCacheConn);bRet = true;}else{log("no cache connection");}pDBManager->RelDBConn(pDBConn);}else{log("no db connection for teamtalk_master");}}else{log("no db connection for teamtalk_slave");}}return bRet;
}
  1. 删除成员:removeMember;从mysql数据库删除的同时,也从redis缓存删除
bool CGroupModel::removeMember(uint32_t nGroupId, set<uint32_t> &setUser, list<uint32_t>& lsCurUserId)
{if(setUser.size() <= 0){return true;}bool bRet = false;CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");if(pDBConn){CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("group_member");if (pCacheConn){string strClause ;bool bFirst = true;for(auto it= setUser.begin(); it!=setUser.end();++it){if (bFirst) {bFirst = false;strClause = int2string(*it);}else{strClause += ("," + int2string(*it));}}string strSql = "update IMGroupMember set status=1 where  groupId =" + int2string(nGroupId) + " and userId in(" + strClause + ")";pDBConn->ExecuteUpdate(strSql.c_str());//从redis中删除成员string strKey = "group_member_"+ int2string(nGroupId);for (auto it=setUser.begin(); it!=setUser.end(); ++it) {string strField = int2string(*it);pCacheConn->hdel(strKey, strField);}pCacheManager->RelCacheConn(pCacheConn);bRet = true;}else{log("no cache connection");}pDBManager->RelDBConn(pDBConn);if (bRet){getGroupUser(nGroupId,lsCurUserId);}}else{log("no db connection for teamtalk_master");}return bRet;
}

redis连接池设计
6. 为什么使用连接池?
答:对象复用,减小频繁创建链接释放链接的开销时间。
7. CacheInstances=unread,group_set,token,sync,group_member 5 个连接池
8. 为什么分开不同的db redis?
答:方便扩展。
9. pool_name的意义?
答:抽象,不必关注redis是否分布式。

class CacheManager {
public:virtual ~CacheManager();static CacheManager* getInstance();int Init();CacheConn* GetCacheConn(const char* pool_name);void RelCacheConn(CacheConn* pCacheConn);
private:CacheManager();private:static CacheManager*         s_cache_manager;map<string, CachePool*>        m_cache_pool_map;
};
int CacheManager::Init()
{CConfigFileReader config_file("dbproxyserver.conf");//CacheInstances=unread,group_set,token,sync,group_memberchar* cache_instances = config_file.GetConfigName("CacheInstances");if (!cache_instances) {log("not configure CacheIntance");return 1;}char host[64];char port[64];char db[64];char maxconncnt[64];CStrExplode instances_name(cache_instances, ',');for (uint32_t i = 0; i < instances_name.GetItemCnt(); i++) {char* pool_name = instances_name.GetItem(i);//printf("%s", pool_name);snprintf(host, 64, "%s_host", pool_name);snprintf(port, 64, "%s_port", pool_name);snprintf(db, 64, "%s_db", pool_name);snprintf(maxconncnt, 64, "%s_maxconncnt", pool_name);char* cache_host = config_file.GetConfigName(host);char* str_cache_port = config_file.GetConfigName(port);char* str_cache_db = config_file.GetConfigName(db);char* str_max_conn_cnt = config_file.GetConfigName(maxconncnt);if (!cache_host || !str_cache_port || !str_cache_db || !str_max_conn_cnt) {log("not configure cache instance: %s", pool_name);return 2;}CachePool* pCachePool = new CachePool(pool_name, cache_host, atoi(str_cache_port),atoi(str_cache_db), atoi(str_max_conn_cnt));if (pCachePool->Init()) {log("Init cache pool failed");return 3;}m_cache_pool_map.insert(make_pair(pool_name, pCachePool));}return 0;
}

相关文章:

TeamTalk梳理概括

文章目录 即时通讯重点概括展开聊聊单聊消息流转流程展开聊聊群聊消息流转流程群成员管理数据库MySQL连接池设计redis连接池设计文件传输原理实时性并发能力 db_proxy_server reactor响应处理流程单聊消息消息如何封装&#xff1f;如何保证对端完整解析一帧消息&#xff1f;协议…...

构建“零工市场小程序”,服务灵活就业“大民生”

如今&#xff0c;灵活就业已成为现代劳动力市场的重要组成部分。然而&#xff0c;这一就业形态也面临着信息不对称、匹配效率低下等一系列挑战。为有效解决这些问题&#xff0c;构建一个高效、便捷的“零工市场小程序”显得尤为重要。 二、零工市场现状与挑战 市场规模与增长趋…...

【组件】前端js HEIC/HEIF 转换为JPEG、PNG或GIF格式 苹果格式

【组件】前端js HEIC/HEIF 转换为JPEG、PNG或GIF格式 Heic2any: Client-side conversion of HEIC/HEIF image files to JPEG,PNG, or GIF in the browser.https://alexcorvi.github.io/heic2any/#demo GitHub - alexcorvi/heic2any: Converting HEIF/HEIF image formats to PN…...

Vue3中slot插槽的几种使用实践

【1】默认插槽 父组件 <Category title"今日美食城市"><img :src"imgUrl" alt""> </Category>子组件 <div class"category"><h2>{{title}}</h2><slot>默认内容</slot> </div&g…...

SSH工具 MobaXterm的使用与快捷配置

软件下载/安装与链接服务器/本地虚拟机 文章目录 软件下载/安装与链接服务器/本地虚拟机软件下载软件安装使用软件链接非本地机器并设置用户密码我不想有确定密码的弹窗 其余便捷配置配置右键粘贴SSH链接设置 软件下载 如果你访问不了这个网址&#xff0c;可以评论区找博主或者…...

git 远程分支同步本地落后的有冲突的分支

如果你的本地分支已经修改了很多代码&#xff0c;但同时也已经落后于远程分支。这个时候你需要在主分支上拉最新的代码&#xff0c;然后切换到你的分支。 如主分支是 main &#xff0c;从分支是xing。 首先切换到子分支 $ git checkout xing 然后请求merge主分支main的代码 …...

如何基于Java解析国密数字证书

一、说明 随着信息安全的重要性日益凸显&#xff0c;数字证书在各种安全通信场景中扮演着至关重要的角色。国密算法&#xff0c;作为我国自主研发的加密算法标准&#xff0c;其应用也愈发广泛。然而&#xff0c;在Java环境中解析使用国密算法的数字证书时&#xff0c;我们可能…...

java实现系统文件管理

java实现系统文件管理 环境&#xff1a;jdk17springbootVueElementUI 背景&#xff1a;公司所做的项目需要别的系统向我们服务器上传文件&#xff0c;当我们需要查看这些文件什么时候上传的、文件数据是怎样的&#xff0c;只能去机房&#xff0c;排查问题效率较低&#xff0c;…...

pytorch快速入门(一)—— 基本工具及平台介绍

前言 该pytorch学习笔记应该配合b站小土堆的《pytorch深度学习快速入门教程》使用 环境配置&#xff1a;Anaconda Python编译器&#xff1a;pycharm、jupyter 两大法宝函数 dir&#xff08;&#xff09;&#xff1a;知道包中有什么东西&#xff08;函数 / 属性..…...

『功能项目』怪物的有限状态机【42】

本章项目成果展示 我们打开上一篇41项目优化 - 框架加载资源的项目&#xff0c; 本章要做的事情是按照框架的思想构建项目并完成怪物的自动巡逻状态&#xff0c;当主角靠近怪物时&#xff0c;怪物会朝向主角释放技能 首先新建脚本&#xff1a;BossCtrl.cs (通常把xxxCtrl.cs脚…...

【C++】模板进阶:深入解析模板特化

C语法相关知识点可以通过点击以下链接进行学习一起加油&#xff01;命名空间缺省参数与函数重载C相关特性类和对象-上篇类和对象-中篇类和对象-下篇日期类C/C内存管理模板初阶String使用String模拟实现Vector使用及其模拟实现List使用及其模拟实现容器适配器Stack与Queue 本章将…...

Python数据分析-世界上最富有的1000人

一、研究背景 随着全球化的加速发展和技术的进步&#xff0c;财富分配问题日益成为全球关注的焦点。财富的不平等现象日益明显&#xff0c;少数极富有的个人掌握了全球大部分的财富资源。了解全球最富有个人的财富分布及其背后的行业和国家因素&#xff0c;对于分析全球经济趋…...

CSS中隐藏滚动条的同时保留滚动功能

在CSS中&#xff0c;我们可以通过一些技巧来隐藏滚动条&#xff0c;同时保留滚动功能。以下是几种常用的方法和具体的实现步骤。 1. 使用 overflow 和 ::-webkit-scrollbar 这种方法适用于大多数现代浏览器。通过设置 overflow 属性启用滚动&#xff0c;同时利用 ::-webkit-s…...

我的标志:奇特的头像

<!DOCTYPE html> <html lang="zh-CN"> <head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>与妖为邻</title><style>figu…...

中国空间计算产业链发展分析

2024中国空间计算产业链拆解 空间计算设备主要包括AR、VR、MR等终端设备。VR设备通常包括头戴式显示器&#xff08;VR头盔&#xff09;、手柄或追踪器等组件&#xff0c;用以完全封闭用户视野&#xff0c;营造虚拟环境体验。这些设备配备高分辨率显示屏、内置传感器和跟踪器。 …...

DAY14信息打点-JS 架构框架识别泄漏提取API 接口枚举FUZZ 爬虫插件项目

本课意义&#xff1a; 1.如何从表现中的JS提取价值信息 2.如何从地址中FUZZ提取未知的JS文件 3.如何从JS开放框架WebPack进行测试 一、JS 前端架构-识别&分析 在JS中寻找更多的URL地址&#xff0c;在JS代码逻辑&#xff08;加密算法、APIKey配置、验证逻辑&#xff09;中进…...

TS - tsconfig.json 和 tsconfig.node.json 的关系,如何在TS 中使用 JS 不报错

目录 1&#xff0c;前言2&#xff0c;二者关系2.1&#xff0c;使用 3&#xff0c;遇到的问题3.1&#xff0c;TS 中使用 JS 1&#xff0c;前言 通过 Vite 创建的 Vue3 TS 项目&#xff0c;根目录下会有 tsconfig.json 和 tsconfig.node.json 文件&#xff0c;并且存在引用关系…...

revisiting拉普拉斯模板

二维向量的二阶微分是Hessian矩阵&#xff0c;拉普拉斯算子是将两个独立的二阶微分求和&#xff0c;对二阶微分的近似。 我不认同冈萨雷斯的8邻域拉普拉斯模板。 MATLAB图像处理工具箱中fspecial函数’laplacian’参数给的拉普拉斯模板&#xff1a; 对于数字滤波器&#xff…...

深入分析计算机网络性能指标

速率带宽吞吐量时延时延带宽积往返时间RTT利用率丢包率图书推荐内容简介作者简介 速率 连接在计算机网络上的主机在数字信道上传送比特的速率&#xff0c;也称为比特率或数据率。 基本单位&#xff1a;bit/s&#xff08;b/s、bps&#xff09; 常用单位&#xff1a;kb/s&#x…...

pyflink 安装和测试

FPY Warning! 安装 apache-Flink # pip install apache-Flink -i https://pypi.tuna.tsinghua.edu.cn/simple/ Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple/ Collecting apache-FlinkDownloading https://pypi.tuna.tsinghua.edu.cn/packages/7f/a3/ad502…...

微信小程序返回上一页监听

本文实现的是微信小程序在返回上一页时获取通知并自定义业务。 最简单的实现&#xff1a; 使用 wx.enableAlertBeforeUnload() 优点&#xff1a;快速接入 缺点&#xff1a;手势不能识别、无法自定义弹窗内容&#xff08;仅询问&#xff09; 方法二&#xff1a; page-conta…...

论爱情《态度》

我犹记得&#xff0c;当吴军的《态度》到手之后&#xff0c;从中间翻开的第一页&#xff0c;便是此。 “合适的人&#xff0c;会让你看到&#xff0c;和得到全世界” -- 第22封 其实在我初中、高中的时候&#xff0c;我便产生一个问题&#xff0c;为什么学校要禁止谈恋爱。 …...

【Redis】大key问题详解

目录 1、什么是大key2、大key的危害【1】阻塞风险【2】网络阻塞【3】内存不均【4】持久化问题 3、如何发现大key【1】使用内置命令【2】使用memory命令&#xff08;Redis 4.0&#xff09;【3】使用scan命令【4】监控工具 4、解决方案【1】拆分大key【2】使用合适的数据结构【3】…...

Photoshop2025(PS2025)软件及安装教程

在数字图像编辑领域&#xff0c;Adobe Photoshop 一直是无可争议的王者。如今&#xff0c;Photoshop 2025 重磅登场&#xff0c;再次为我们带来了惊喜与变革&#xff0c;进一步巩固了它在行业中的领先地位。 Photoshop 2025 在人工智能方面的升级令人瞩目。其全新的 “Magic Se…...

kafka 常用知识点

文章目录 前言kafka 常用知识点1. kafka 概念2. 消息共享和广播3. 分区和副本数量奇偶数 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差&#xff0…...

前端生成UUID

UUID(Universally Unique Identifier)是一种在分布式系统中广泛使用的标识符,具有全球唯一性。在前端开发中,生成可靠的UUID对于数据追踪、会话管理、缓存键生成等场景至关重要。接下来将深入探讨UUID的实现原理、前端生成方案及最佳实践。 一、UUID标准与版本 1. UUID结构…...

CRISPR-Cas系统的小型化研究进展-文献精读137

Progress in the miniaturization of CRISPR-Cas systems CRISPR-Cas系统的小型化研究进展 摘要 CRISPR-Cas基因编辑技术由于其简便性和高效性&#xff0c;已被广泛应用于生物学、医学、农学等领域的基础与应用研究。目前广泛使用的Cas核酸酶均具有较大的分子量&#xff08;通…...

Real SQL Programming

目录 SQL in Real Programs Options Stored Procedures Advantages of Stored Procedures Parameters in PSM SQL in Real Programs We have seen only how SQL is used at the generic query interface --- an environment where we sit at a terminal and ask queries …...

实验设计与分析(第6版,Montgomery)第5章析因设计引导5.7节思考题5.14 R语言解题

本文是实验设计与分析&#xff08;第6版&#xff0c;Montgomery著&#xff0c;傅珏生译) 第5章析因设计引导5.7节思考题5.14 R语言解题。主要涉及方差分析&#xff0c;正态假设检验&#xff0c;残差分析&#xff0c;交互作用图。 dataframe<-data.frame( strengthc(9.60,9.…...

告别重复 - Ansible 配置管理入门与核心价值

告别重复 - Ansible 配置管理入门与核心价值 还记得我们在 SRE 基础系列中反复强调的“减少琐事 (Toil)”和“拥抱自动化”吗?想象一下这些场景: 你需要部署一个新的 Web 服务集群,每台服务器都需要安装 Nginx、配置防火墙规则、同步 Web 内容、启动服务……手动操作不仅耗时…...