TeamTalk梳理概括
文章目录
- 即时通讯重点概括
- 展开聊聊单聊消息流转流程
- 展开聊聊群聊消息流转流程
- 群成员管理
- 数据库
- MySQL连接池设计
- redis连接池设计
- 文件传输原理
- 实时性
- 并发能力
- db_proxy_server reactor响应处理流程
- 单聊消息
- 消息如何封装?如何保证对端完整解析一帧消息?协议格式?
- 消息序号(msg_id )为什么使用redis生成?
- 展开聊聊单聊消息流转流程
- 展开聊聊群聊消息流转流程
- 群聊消息流转
- 怎么保证数据的不丢失以及重复包?
- 消息未读计数是怎么实现的?
- 群成员管理
即时通讯重点概括
展开聊聊单聊消息流转流程
- 消息如何封装
- 怎么解决半包、粘包问题?
- 消息流转流程介绍下?
- 消息序号(msg_id )在哪里生成以及生成方式
- 怎么保证数据的不丢失以及重复包?
- 接收端收到数据后(收到消息区别于阅读消息)如何应答?
- 消息发送后服务器怎么应答?
- 消息发送时的seq有什么作用(业务层的ack机制)
展开聊聊群聊消息流转流程
- 如何推送群聊
- 群消息计数器(msg_id )
- 群会话如何更新(每有一个人发送消息,则其他人都需要更新会话消息)
群成员管理
- 如何创建群
- 如何删除群
- 怎么使用redis管理群成员
消息未读计数是怎么实现的? - 服务器怎么保留消息未读计数(redis 单聊和群聊机制不同)
- 客户端的未读消息计数从何而来
- 客户端未读消息计数清0时向服务器发送了什么?服务器又是怎么清除未读消息计数(单聊和群聊机制不同)
数据库
- 数据库表设计(表达笼统)
- 密码存储方式
- 未读消息如何体现
- 聊天消息分表问题(单聊和群聊消息表)
- 最近会话表
MySQL连接池设计
- 为什么使用连接池
- 连接池设置多大合适?
redis连接池设计
- 为什么使用连接池
- 连接池设置多大合适?
文件传输原理
- 在线传输和离线传输有什么区别
实时性
- Http(登录、图片服务)
- Socket
- websocket
展开聊聊登录流程
并发能力
- 如何做到百万并发
- 如何做到千万并发
db_proxy_server reactor响应处理流程
- 数据入口 reactor CProxyConn:: HandlePduBuf
- 怎么初始化epoll+线程池
- 任务封装
- 把任务放入线程池
- 执行任务
- 把要回应的数据放入回复列表CProxyConn::SendResponsePdulist
- epoll所在线程读取回复列表的数据发给请求端
单聊消息
消息如何封装?如何保证对端完整解析一帧消息?协议格式?
- 答:消息封装采用包头(Header)+包体(Body)的格式。包头自定义格式如下代码所示,包体采用protobuf序列化。
typedef struct {uint32_t length; // the whole pdu lengthuint16_t version; // pdu version numberuint16_t flag; // not useduint16_t service_id; //uint16_t command_id; //uint16_t seq_num; // 包序号uint16_t reversed; // 保留
} PduHeader_t;
- 答:
- 采用tcp保证数据传输可靠性
- 通过包头的 length 字段标记一帧消息的长度
- 通过service id 和 command id区分不同的命令(比如登录、退出等)
- 解决数据TCP粘包(包头长度字段)、半包(放入网络库的缓冲区)问题
void CImConn::OnRead()
{for (;;){uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();if (free_buf_len < READ_BUF_SIZE)m_in_buf.Extend(READ_BUF_SIZE);int ret = netlib_recv(m_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE);if (ret <= 0)break;m_recv_bytes += ret;m_in_buf.IncWriteOffset(ret);m_last_recv_tick = get_tick_count();}CImPdu* pPdu = NULL;try{while ( ( pPdu = CImPdu::ReadPdu(m_in_buf.GetBuffer(), m_in_buf.GetWriteOffset()) ) ){uint32_t pdu_len = pPdu->GetLength();HandlePdu(pPdu);m_in_buf.Read(NULL, pdu_len);delete pPdu;pPdu = NULL;
// ++g_recv_pkt_cnt;}} catch (CPduException& ex) {log("!!!catch exception, sid=%u, cid=%u, err_code=%u, err_msg=%s, close the connection ",ex.GetServiceId(), ex.GetCommandId(), ex.GetErrorCode(), ex.GetErrorMsg());if (pPdu) {delete pPdu;pPdu = NULL;}OnClose();}
}
message IMMsgData{//cmd id: 0x0301required uint32 from_user_id = 1; //消息发送方required uint32 to_session_id = 2; //消息接受方required uint32 msg_id = 3;required uint32 create_time = 4; required IM.BaseDefine.MsgType msg_type = 5;required bytes msg_data = 6;optional bytes attach_data = 20;
}message IMMsgDataAck{//cmd id: 0x0302required uint32 user_id = 1; //发送此信令的用户idrequired uint32 session_id = 2; required uint32 msg_id = 3;required IM.BaseDefine.SessionType session_type = 4;
}
消息序号(msg_id )为什么使用redis生成?
- 消息ID(msg_id )的作用是防止消息乱序。
- 消息ID(msg_id )为什么这么设计?
答:msg_id 存储在 unread 连接池所在的redis数据库。单聊 msg_id 的 key涉及到 nRelateId。nRelateId 从关系表(IMRelationShip :两个用户id的映射关系)中获取。
/*** 获取会话关系ID* 对于群组,必须把nUserBId设置为群ID** @param nUserAId <#nUserAId description#>* @param nUserBId <#nUserBId description#>* @param bAdd <#bAdd description#>* @param nStatus 0 获取未被删除会话,1获取所有。*/
uint32_t CRelationModel::getRelationId(uint32_t nUserAId, uint32_t nUserBId, bool bAdd)
{uint32_t nRelationId = INVALID_VALUE;if (nUserAId == 0 || nUserBId == 0) {log("invalied user id:%u->%u", nUserAId, nUserBId);return nRelationId;}CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){uint32_t nBigId = nUserAId > nUserBId ? nUserAId : nUserBId;uint32_t nSmallId = nUserAId > nUserBId ? nUserBId : nUserAId;string strSql = "select id from IMRelationShip where smallId=" + int2string(nSmallId) + " and bigId="+ int2string(nBigId) + " and status = 0";CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());if (pResultSet){while (pResultSet->Next()){nRelationId = pResultSet->GetInt("id");}delete pResultSet;}else{log("there is no result for sql:%s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);if (nRelationId == INVALID_VALUE && bAdd){nRelationId = addRelation(nSmallId, nBigId);}}else{log("no db connection for teamtalk_slave");}return nRelationId;
}
- 群聊和单聊msg_id 的区别:key设置不同。
uint32_t CMessageModel::getMsgId(uint32_t nRelateId)
{uint32_t nMsgId = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if(pCacheConn){string strKey = "msg_id_" + int2string(nRelateId);nMsgId = pCacheConn->incrBy(strKey, 1);pCacheManager->RelCacheConn(pCacheConn);}return nMsgId;
}/*** 获取一个群组的msgId,自增,通过redis控制* @param nGroupId 群Id* @return 返回msgId*/
uint32_t CGroupMessageModel::getMsgId(uint32_t nGroupId)
{uint32_t nMsgId = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if(pCacheConn){// TODOstring strKey = "group_msg_id_" + int2string(nGroupId);nMsgId = pCacheConn->incrBy(strKey, 1);pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}return nMsgId;
}
展开聊聊单聊消息流转流程
答:两个用户A给B发消息,用户A把聊天消息封装好以后发送给MsgServer;同时把消息进行持久化,将聊天消息发给这个 DBProxy(数据库代理服务),存储消息成功后,DBProxyServer组包应答MsgServer,MsgServer收到回复后组包应答Client A。如果 A 和 B 两个用户不在同一个 MsgServer 上,那么会通过这个 RouteServer 去中转Pdu包数据(广播给所有的MsgServer,MsgServer再广播给Client B),B收到消息后应答MsgServer,至此,流程结束。然后如果是一些热点数据,我们同时也会写Redis。
群聊消息
展开聊聊群聊消息流转流程
群聊消息流转
void CGroupChat::HandleGroupMessage(CImPdu* pPdu)
{IM::Message::IMMsgData msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t from_user_id = msg.from_user_id();uint32_t to_group_id = msg.to_session_id();string msg_data = msg.msg_data();uint32_t msg_id = msg.msg_id();if (msg_id == 0) {log("HandleGroupMsg, write db failed, %u->%u. ", from_user_id, to_group_id);return;}uint8_t msg_type = msg.msg_type();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());log("HandleGroupMsg, %u->%u, msg id=%u. ", from_user_id, to_group_id, msg_id);CMsgConn* pFromConn = CImUserManager::GetInstance()->GetMsgConnByHandle(from_user_id,attach_data.GetHandle());if (pFromConn){//接收反馈IM::Message::IMMsgDataAck msg2;msg2.set_user_id(from_user_id);msg2.set_session_id(to_group_id);msg2.set_msg_id(msg_id);msg2.set_session_type(::IM::BaseDefine::SESSION_TYPE_GROUP);CImPdu pdu;pdu.SetPBMsg(&msg2);pdu.SetServiceId(SID_MSG);pdu.SetCommandId(CID_MSG_DATA_ACK);pdu.SetSeqNum(pPdu->GetSeqNum());pFromConn->SendPdu(&pdu);}CRouteServConn* pRouteConn = get_route_serv_conn();if (pRouteConn){pRouteConn->SendPdu(pPdu);}// 服务器没有群的信息,向DB服务器请求群信息,并带上消息作为附件,返回时在发送该消息给其他群成员//IM::BaseDefine::GroupVersionInfo group_version_info;CPduAttachData pduAttachData(ATTACH_TYPE_HANDLE_AND_PDU, attach_data.GetHandle(), pPdu->GetBodyLength(), pPdu->GetBodyData());IM::Group::IMGroupInfoListReq msg3;msg3.set_user_id(from_user_id);IM::BaseDefine::GroupVersionInfo* group_version_info = msg3.add_group_version_list();group_version_info->set_group_id(to_group_id);group_version_info->set_version(0);msg3.set_attach_data(pduAttachData.GetBuffer(), pduAttachData.GetLength());CImPdu pdu;pdu.SetPBMsg(&msg3);pdu.SetServiceId(SID_GROUP);pdu.SetCommandId(CID_GROUP_INFO_REQUEST);CDBServConn* pDbConn = get_db_serv_conn();if(pDbConn){pDbConn->SendPdu(&pdu);}
}
void CGroupChat::HandleGroupInfoResponse(CImPdu* pPdu)
{IM::Group::IMGroupInfoListRsp msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t user_id = msg.user_id();uint32_t group_cnt = msg.group_info_list_size();CPduAttachData pduAttachData((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());log("HandleGroupInfoResponse, user_id=%u, group_cnt=%u. ", user_id, group_cnt);//此处是查询成员时使用,主要用于群消息从数据库获得msg_id后进行发送,一般此时group_cnt = 1if (pduAttachData.GetPduLength() > 0 && group_cnt > 0){IM::BaseDefine::GroupInfo group_info = msg.group_info_list(0);uint32_t group_id = group_info.group_id();log("GroupInfoRequest is send by server, group_id=%u ", group_id);std::set<uint32_t> group_member_set;for (uint32_t i = 0; i < group_info.group_member_list_size(); i++){uint32_t member_user_id = group_info.group_member_list(i);group_member_set.insert(member_user_id);}if (group_member_set.find(user_id) == group_member_set.end()){log("user_id=%u is not in group, group_id=%u. ", user_id, group_id);return;}IM::Message::IMMsgData msg2;CHECK_PB_PARSE_MSG(msg2.ParseFromArray(pduAttachData.GetPdu(), pduAttachData.GetPduLength()));CImPdu pdu;pdu.SetPBMsg(&msg2);pdu.SetServiceId(SID_MSG);pdu.SetCommandId(CID_MSG_DATA);//Push相关IM::Server::IMGroupGetShieldReq msg3;msg3.set_group_id(group_id);msg3.set_attach_data(pdu.GetBodyData(), pdu.GetBodyLength());for (uint32_t i = 0; i < group_info.group_member_list_size(); i++){uint32_t member_user_id = group_info.group_member_list(i);msg3.add_user_id(member_user_id);CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(member_user_id);if (pToImUser){CMsgConn* pFromConn = NULL;if( member_user_id == user_id ){uint32_t reqHandle = pduAttachData.GetHandle();if(reqHandle != 0)pFromConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, reqHandle);}pToImUser->BroadcastData(pdu.GetBuffer(), pdu.GetLength(), pFromConn);}}CImPdu pdu2;pdu2.SetPBMsg(&msg3);pdu2.SetServiceId(SID_OTHER);pdu2.SetCommandId(CID_OTHER_GET_SHIELD_REQ);CDBServConn* pDbConn = get_db_serv_conn();if (pDbConn){pDbConn->SendPdu(&pdu2);}}else if (pduAttachData.GetPduLength() == 0){//正常获取群信息的返回CMsgConn* pConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, pduAttachData.GetHandle());if (pConn){msg.clear_attach_data();pPdu->SetPBMsg(&msg);pConn->SendPdu(pPdu);}}
}
同步群组聊天信息:群会话如何更新(每有一个人发送消息,则其他人都需要更新会话消息)
分析:
如果和单聊类似,实时更新会有大量操作数据库的成本。
某个群成员发消息时,存储消息成功后,会更新群的最新发言时间。
后续优化,将session放在redis中查询不方便???所以要分库分表???
void CGroupModel::updateGroupChat(uint32_t nGroupId)
{CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");if(pDBConn){uint32_t nNow = (uint32_t)time(NULL);string strSql = "update IMGroup set lastChated=" + int2string(nNow) + " where id=" + int2string(nGroupId);pDBConn->ExecuteUpdate(strSql.c_str());pDBManager->RelDBConn(pDBConn);}else{log("no db connection for teamtalk_master");}
}
CSyncCenter类,群会话的更新-独立的线程
CSyncCenter :: doSyncGroupChat
- 根据时间节点将需要更新会话的群id和最近的聊天时间读取出来放到map
- 保存当前时间到CSyncCenter
- 根据群id从redis读取群成员,然后遍历群成员更新会话信息。
/*** 开启内网数据同步以及群组聊天记录同步*/
void CSyncCenter::startSync()
{
#ifdef _WIN32(void)CreateThread(NULL, 0, doSyncGroupChat, NULL, 0, &m_nGroupChatThreadId);
#else(void)pthread_create(&m_nGroupChatThreadId, NULL, doSyncGroupChat, NULL);
#endif
}
//谈取更新的时间大于之前更新的时间点,把对应的群id-nLastChat读取出来放到mapChangedGroup存储
/*** 同步群组聊天信息** @param arg NULL** @return NULL*/
void* CSyncCenter::doSyncGroupChat(void* arg)
{m_bSyncGroupChatRuning = true;CDBManager* pDBManager = CDBManager::getInstance();map<uint32_t, uint32_t> mapChangedGroup;do{mapChangedGroup.clear();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if(pDBConn){string strSql = "select id, lastChated from IMGroup where status=0 and lastChated >=" + int2string(m_pInstance->getLastUpdateGroup());CResultSet* pResult = pDBConn->ExecuteQuery(strSql.c_str());if(pResult){while (pResult->Next()) {uint32_t nGroupId = pResult->GetInt("id");uint32_t nLastChat = pResult->GetInt("lastChated");if(nLastChat != 0){mapChangedGroup[nGroupId] = nLastChat;}}delete pResult;}pDBManager->RelDBConn(pDBConn);}else{log("no db connection for teamtalk_slave");}m_pInstance->updateLastUpdateGroup(time(NULL));for (auto it=mapChangedGroup.begin(); it!=mapChangedGroup.end(); ++it){uint32_t nGroupId =it->first;list<uint32_t> lsUsers;uint32_t nUpdate = it->second;// 读取该群的群成员CGroupModel::getInstance()->getGroupUser(nGroupId, lsUsers);//遍历群成员,更新Sessionfor (auto it1=lsUsers.begin(); it1!=lsUsers.end(); ++it1){uint32_t nUserId = *it1;uint32_t nSessionId = INVALID_VALUE;nSessionId = CSessionModel::getInstance()->getSessionId(nUserId, nGroupId, IM::BaseDefine::SESSION_TYPE_GROUP, true);if(nSessionId != INVALID_VALUE){CSessionModel::getInstance()->updateSession(nSessionId, nUpdate);}else{CSessionModel::getInstance()->addSession(nUserId, nGroupId, IM::BaseDefine::SESSION_TYPE_GROUP);}}}
// } while (!m_pInstance->m_pCondSync->waitTime(5*1000));} while (m_pInstance->m_bSyncGroupChatWaitting && !(m_pInstance->m_pCondGroupChat->waitTime(5*1000)));
// } while(m_pInstance->m_bSyncGroupChatWaitting);m_bSyncGroupChatRuning = false;return NULL;
}
怎么保证数据的不丢失以及重复包?
- 包头的seq_num字段(包序号),未回复消息列表
- 业务层的ack机制,收到数据会回复ack
消息未读计数是怎么实现的?
服务器怎么保留消息未读计数(单聊和群聊)
7. 单聊和群聊消息未读计数机制为什么不同?
答:加入该群很大,有1000人,999个人的未读消息计数都+1,效率低下。
8. 单聊消息未读计数机制
- key设计:“unread_” + int2string(nToId);field:int2string(nFromId),value:自增1
- 发送消息时,将消息写入 mysql 消息表成功后,更新redis。
void CMessageModel::incMsgCount(uint32_t nFromId, uint32_t nToId)
{CacheManager* pCacheManager = CacheManager::getInstance();// increase message countCacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn) {pCacheConn->hincrBy("unread_" + int2string(nToId), int2string(nFromId), 1);pCacheManager->RelCacheConn(pCacheConn);} else {log("no cache connection to increase unread count: %d->%d", nFromId, nToId);}
}
9. 群聊消息未读计数机制
- 群总的消息计数key设计:int2string(nGroupId) + _im_group_msg;
- field:count
- 群内某个成员已经读取的消息计数key设计:
int2string(nUserId) + "_" + int2string(nGroupId) + _im_user_group
- field:count
所以:群内某个成员未读消息计数 = 群总消息数量 - 该成员已经读取的消息数量#define GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX "_im_group_msg"
#define GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX "_im_user_group"
#define GROUP_COUNTER_SUBKEY_COUNTER_FIELD "count"
增加群消息计数
/*** 增加群消息计数* @param nUserId 用户Id* @param nGroupId 群组Id* @return 成功返回true,失败返回false*/
bool CGroupMessageModel::incMessageCount(uint32_t nUserId, uint32_t nGroupId)
{bool bRet = false;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;pCacheConn->hincrBy(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD, 1);map<string, string> mapGroupCount;bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);if(bRet){string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);if(!strReply.empty()){bRet = true;}else{log("hmset %s failed !", strUserKey.c_str());}}else{log("hgetAll %s failed!", strGroupKey.c_str());}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}return bRet;
}
获取用户群未读消息计数
/*** 获取用户群未读消息计数* @param nUserId 用户Id* @param nTotalCnt 总条数* @param lsUnreadCount 每个会话的未读信息包含了条数,最后一个消息的Id,最后一个消息的类型,最后一个消息的类容*/
void CGroupMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{list<uint32_t> lsGroupId;CGroupModel::getInstance()->getUserGroupIds(nUserId, lsGroupId, 0);uint32_t nCount = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){for(auto it=lsGroupId.begin(); it!=lsGroupId.end(); ++it){uint32_t nGroupId = *it;string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;string strGroupCnt = pCacheConn->hget(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);if(strGroupCnt.empty()){
// log("hget %s : count failed !", strGroupKey.c_str());continue;}uint32_t nGroupCnt = (uint32_t)(atoi(strGroupCnt.c_str()));string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strUserCnt = pCacheConn->hget(strUserKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);uint32_t nUserCnt = ( strUserCnt.empty() ? 0 : ((uint32_t)atoi(strUserCnt.c_str())) );if(nGroupCnt >= nUserCnt) {nCount = nGroupCnt - nUserCnt;}if(nCount > 0){IM::BaseDefine::UnreadInfo cUnreadInfo;cUnreadInfo.set_session_id(nGroupId);cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_GROUP);cUnreadInfo.set_unread_cnt(nCount);nTotalCnt += nCount;string strMsgData;uint32_t nMsgId;IM::BaseDefine::MsgType nType;uint32_t nFromId;getLastMsg(nGroupId, nMsgId, strMsgData, nType, nFromId);if(IM::BaseDefine::MsgType_IsValid(nType)){cUnreadInfo.set_latest_msg_id(nMsgId);cUnreadInfo.set_latest_msg_data(strMsgData);cUnreadInfo.set_latest_msg_type(nType);cUnreadInfo.set_latest_msg_from_user_id(nFromId);lsUnreadCount.push_back(cUnreadInfo);}else{log("invalid msgType. userId=%u, groupId=%u, msgType=%u, msgId=%u", nUserId, nGroupId, nType, nMsgId);}}}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}
}
清除未读消息
单聊和群聊清除未读消息都是调用如下函数
1. 单聊直接删掉key
2. 群聊将该成员的已读消息数量设置成群总消息数量。
m_handler_map.insert(make_pair(uint32_t(CID_MSG_READ_ACK), DB_PROXY::clearUnreadMsgCounter));
void CUserModel::clearUserCounter(uint32_t nUserId, uint32_t nPeerId, IM::BaseDefine::SessionType nSessionType)
{if(IM::BaseDefine::SessionType_IsValid(nSessionType)){CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){// Clear P2P msg Counterif(nSessionType == IM::BaseDefine::SESSION_TYPE_SINGLE){int nRet = pCacheConn->hdel("unread_" + int2string(nUserId), int2string(nPeerId));if(!nRet){log("hdel failed %d->%d", nPeerId, nUserId);}}// Clear Group msg Counterelse if(nSessionType == IM::BaseDefine::SESSION_TYPE_GROUP){string strGroupKey = int2string(nPeerId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;map<string, string> mapGroupCount;bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);if(bRet){string strUserKey = int2string(nUserId) + "_" + int2string(nPeerId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);if(strReply.empty()) {log("hmset %s failed !", strUserKey.c_str());}}else{log("hgetall %s failed!", strGroupKey.c_str());}}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}}else{log("invalid sessionType. userId=%u, fromId=%u, sessionType=%u", nUserId, nPeerId, nSessionType);}
}
群成员管理
- 为什么使用 redis 管理群成员?
答:发群消息需要通知群成员,群成员很多从redis获取,提高效率。 - 群成员管理使用redis 缓存设计,以hash为存储结构,
- key 为 “group_member_”+int2string(nGroupId);
- field 为 userId(用户id)
- Value 为 创建时间:int2string(nCreated)
- 加入成员:insertNewMember,插入mysql数据库的同时也插入redis缓存
/*** 修改群成员,增加或删除** @param pPdu 收到的packet包指针* @param conn_uuid 该包过来的socket 描述符*/void modifyMember(CImPdu* pPdu, uint32_t conn_uuid){IM::Group::IMGroupChangeMemberReq msg;IM::Group::IMGroupChangeMemberRsp msgResp;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){uint32_t nUserId = msg.user_id();uint32_t nGroupId = msg.group_id();IM::BaseDefine::GroupModifyType nType = msg.change_type();if (IM::BaseDefine::GroupModifyType_IsValid(nType) &&CGroupModel::getInstance()->isValidateGroupId(nGroupId)) {CImPdu* pPduRes = new CImPdu;uint32_t nCnt = msg.member_id_list_size();set<uint32_t> setUserId;for(uint32_t i=0; i<nCnt;++i){setUserId.insert(msg.member_id_list(i));}list<uint32_t> lsCurUserId;bool bRet = CGroupModel::getInstance()->modifyGroupMember(nUserId, nGroupId, nType, setUserId, lsCurUserId);msgResp.set_user_id(nUserId);msgResp.set_group_id(nGroupId);msgResp.set_change_type(nType);msgResp.set_result_code(bRet?0:1);if(bRet){for(auto it=setUserId.begin(); it!=setUserId.end(); ++it){msgResp.add_chg_user_id_list(*it);}for(auto it=lsCurUserId.begin(); it!=lsCurUserId.end(); ++it){msgResp.add_cur_user_id_list(*it);}}log("userId=%u, groupId=%u, result=%u, changeCount:%u, currentCount=%u",nUserId, nGroupId, bRet?0:1, msgResp.chg_user_id_list_size(), msgResp.cur_user_id_list_size());msgResp.set_attach_data(msg.attach_data());pPduRes->SetPBMsg(&msgResp);pPduRes->SetSeqNum(pPdu->GetSeqNum());pPduRes->SetServiceId(IM::BaseDefine::SID_GROUP);pPduRes->SetCommandId(IM::BaseDefine::CID_GROUP_CHANGE_MEMBER_RESPONSE);CProxyConn::AddResponsePdu(conn_uuid, pPduRes);}else{log("invalid groupModifyType or groupId. userId=%u, groupId=%u, groupModifyType=%u", nUserId, nGroupId, nType);}}else{log("parse pb failed");}}
bool CGroupModel::insertNewMember(uint32_t nGroupId, set<uint32_t>& setUsers)
{bool bRet = false;uint32_t nUserCnt = (uint32_t)setUsers.size();if(nGroupId != INVALID_VALUE && nUserCnt > 0){CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){uint32_t nCreated = (uint32_t)time(NULL);// 获取 已经存在群里的用户string strClause;bool bFirst = true;for (auto it=setUsers.begin(); it!=setUsers.end(); ++it){if(bFirst){bFirst = false;strClause = int2string(*it);}else{strClause += ("," + int2string(*it));}}string strSql = "select userId from IMGroupMember where groupId=" + int2string(nGroupId) + " and userId in (" + strClause + ")";CResultSet* pResult = pDBConn->ExecuteQuery(strSql.c_str());set<uint32_t> setHasUser;if(pResult){while (pResult->Next()) {setHasUser.insert(pResult->GetInt("userId"));}delete pResult;}else{log("no result for sql:%s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);pDBConn = pDBManager->GetDBConn("teamtalk_master");if (pDBConn){CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("group_member");if (pCacheConn){// 设置已经存在群中人的状态if (!setHasUser.empty()){strClause.clear();bFirst = true;for (auto it=setHasUser.begin(); it!=setHasUser.end(); ++it) {if(bFirst){bFirst = false;strClause = int2string(*it);}else{strClause += ("," + int2string(*it));}}strSql = "update IMGroupMember set status=0, updated="+int2string(nCreated)+" where groupId=" + int2string(nGroupId) + " and userId in (" + strClause + ")";pDBConn->ExecuteUpdate(strSql.c_str());}strSql = "insert into IMGroupMember(`groupId`, `userId`, `status`, `created`, `updated`) values\(?,?,?,?,?)";//插入新成员auto it = setUsers.begin();uint32_t nStatus = 0;uint32_t nIncMemberCnt = 0;for (;it != setUsers.end();){uint32_t nUserId = *it;if(setHasUser.find(nUserId) == setHasUser.end()){CPrepareStatement* pStmt = new CPrepareStatement();if (pStmt->Init(pDBConn->GetMysql(), strSql)){uint32_t index = 0;pStmt->SetParam(index++, nGroupId);pStmt->SetParam(index++, nUserId);pStmt->SetParam(index++, nStatus);pStmt->SetParam(index++, nCreated);pStmt->SetParam(index++, nCreated);pStmt->ExecuteUpdate();++nIncMemberCnt;delete pStmt;}else{setUsers.erase(it++);delete pStmt;continue;}}++it;}if(nIncMemberCnt != 0){strSql = "update IMGroup set userCnt=userCnt+" + int2string(nIncMemberCnt) + " where id="+int2string(nGroupId);pDBConn->ExecuteUpdate(strSql.c_str());}//更新一份到redis中string strKey = "group_member_"+int2string(nGroupId);for(auto it = setUsers.begin(); it!=setUsers.end(); ++it){pCacheConn->hset(strKey, int2string(*it), int2string(nCreated));}pCacheManager->RelCacheConn(pCacheConn);bRet = true;}else{log("no cache connection");}pDBManager->RelDBConn(pDBConn);}else{log("no db connection for teamtalk_master");}}else{log("no db connection for teamtalk_slave");}}return bRet;
}
- 删除成员:removeMember;从mysql数据库删除的同时,也从redis缓存删除
bool CGroupModel::removeMember(uint32_t nGroupId, set<uint32_t> &setUser, list<uint32_t>& lsCurUserId)
{if(setUser.size() <= 0){return true;}bool bRet = false;CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");if(pDBConn){CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("group_member");if (pCacheConn){string strClause ;bool bFirst = true;for(auto it= setUser.begin(); it!=setUser.end();++it){if (bFirst) {bFirst = false;strClause = int2string(*it);}else{strClause += ("," + int2string(*it));}}string strSql = "update IMGroupMember set status=1 where groupId =" + int2string(nGroupId) + " and userId in(" + strClause + ")";pDBConn->ExecuteUpdate(strSql.c_str());//从redis中删除成员string strKey = "group_member_"+ int2string(nGroupId);for (auto it=setUser.begin(); it!=setUser.end(); ++it) {string strField = int2string(*it);pCacheConn->hdel(strKey, strField);}pCacheManager->RelCacheConn(pCacheConn);bRet = true;}else{log("no cache connection");}pDBManager->RelDBConn(pDBConn);if (bRet){getGroupUser(nGroupId,lsCurUserId);}}else{log("no db connection for teamtalk_master");}return bRet;
}
redis连接池设计
6. 为什么使用连接池?
答:对象复用,减小频繁创建链接释放链接的开销时间。
7. CacheInstances=unread,group_set,token,sync,group_member 5 个连接池
8. 为什么分开不同的db redis?
答:方便扩展。
9. pool_name的意义?
答:抽象,不必关注redis是否分布式。
class CacheManager {
public:virtual ~CacheManager();static CacheManager* getInstance();int Init();CacheConn* GetCacheConn(const char* pool_name);void RelCacheConn(CacheConn* pCacheConn);
private:CacheManager();private:static CacheManager* s_cache_manager;map<string, CachePool*> m_cache_pool_map;
};
int CacheManager::Init()
{CConfigFileReader config_file("dbproxyserver.conf");//CacheInstances=unread,group_set,token,sync,group_memberchar* cache_instances = config_file.GetConfigName("CacheInstances");if (!cache_instances) {log("not configure CacheIntance");return 1;}char host[64];char port[64];char db[64];char maxconncnt[64];CStrExplode instances_name(cache_instances, ',');for (uint32_t i = 0; i < instances_name.GetItemCnt(); i++) {char* pool_name = instances_name.GetItem(i);//printf("%s", pool_name);snprintf(host, 64, "%s_host", pool_name);snprintf(port, 64, "%s_port", pool_name);snprintf(db, 64, "%s_db", pool_name);snprintf(maxconncnt, 64, "%s_maxconncnt", pool_name);char* cache_host = config_file.GetConfigName(host);char* str_cache_port = config_file.GetConfigName(port);char* str_cache_db = config_file.GetConfigName(db);char* str_max_conn_cnt = config_file.GetConfigName(maxconncnt);if (!cache_host || !str_cache_port || !str_cache_db || !str_max_conn_cnt) {log("not configure cache instance: %s", pool_name);return 2;}CachePool* pCachePool = new CachePool(pool_name, cache_host, atoi(str_cache_port),atoi(str_cache_db), atoi(str_max_conn_cnt));if (pCachePool->Init()) {log("Init cache pool failed");return 3;}m_cache_pool_map.insert(make_pair(pool_name, pCachePool));}return 0;
}
相关文章:
TeamTalk梳理概括
文章目录 即时通讯重点概括展开聊聊单聊消息流转流程展开聊聊群聊消息流转流程群成员管理数据库MySQL连接池设计redis连接池设计文件传输原理实时性并发能力 db_proxy_server reactor响应处理流程单聊消息消息如何封装?如何保证对端完整解析一帧消息?协议…...

构建“零工市场小程序”,服务灵活就业“大民生”
如今,灵活就业已成为现代劳动力市场的重要组成部分。然而,这一就业形态也面临着信息不对称、匹配效率低下等一系列挑战。为有效解决这些问题,构建一个高效、便捷的“零工市场小程序”显得尤为重要。 二、零工市场现状与挑战 市场规模与增长趋…...

【组件】前端js HEIC/HEIF 转换为JPEG、PNG或GIF格式 苹果格式
【组件】前端js HEIC/HEIF 转换为JPEG、PNG或GIF格式 Heic2any: Client-side conversion of HEIC/HEIF image files to JPEG,PNG, or GIF in the browser.https://alexcorvi.github.io/heic2any/#demo GitHub - alexcorvi/heic2any: Converting HEIF/HEIF image formats to PN…...
Vue3中slot插槽的几种使用实践
【1】默认插槽 父组件 <Category title"今日美食城市"><img :src"imgUrl" alt""> </Category>子组件 <div class"category"><h2>{{title}}</h2><slot>默认内容</slot> </div&g…...

SSH工具 MobaXterm的使用与快捷配置
软件下载/安装与链接服务器/本地虚拟机 文章目录 软件下载/安装与链接服务器/本地虚拟机软件下载软件安装使用软件链接非本地机器并设置用户密码我不想有确定密码的弹窗 其余便捷配置配置右键粘贴SSH链接设置 软件下载 如果你访问不了这个网址,可以评论区找博主或者…...
git 远程分支同步本地落后的有冲突的分支
如果你的本地分支已经修改了很多代码,但同时也已经落后于远程分支。这个时候你需要在主分支上拉最新的代码,然后切换到你的分支。 如主分支是 main ,从分支是xing。 首先切换到子分支 $ git checkout xing 然后请求merge主分支main的代码 …...

如何基于Java解析国密数字证书
一、说明 随着信息安全的重要性日益凸显,数字证书在各种安全通信场景中扮演着至关重要的角色。国密算法,作为我国自主研发的加密算法标准,其应用也愈发广泛。然而,在Java环境中解析使用国密算法的数字证书时,我们可能…...

java实现系统文件管理
java实现系统文件管理 环境:jdk17springbootVueElementUI 背景:公司所做的项目需要别的系统向我们服务器上传文件,当我们需要查看这些文件什么时候上传的、文件数据是怎样的,只能去机房,排查问题效率较低,…...
pytorch快速入门(一)—— 基本工具及平台介绍
前言 该pytorch学习笔记应该配合b站小土堆的《pytorch深度学习快速入门教程》使用 环境配置:Anaconda Python编译器:pycharm、jupyter 两大法宝函数 dir():知道包中有什么东西(函数 / 属性..…...

『功能项目』怪物的有限状态机【42】
本章项目成果展示 我们打开上一篇41项目优化 - 框架加载资源的项目, 本章要做的事情是按照框架的思想构建项目并完成怪物的自动巡逻状态,当主角靠近怪物时,怪物会朝向主角释放技能 首先新建脚本:BossCtrl.cs (通常把xxxCtrl.cs脚…...

【C++】模板进阶:深入解析模板特化
C语法相关知识点可以通过点击以下链接进行学习一起加油!命名空间缺省参数与函数重载C相关特性类和对象-上篇类和对象-中篇类和对象-下篇日期类C/C内存管理模板初阶String使用String模拟实现Vector使用及其模拟实现List使用及其模拟实现容器适配器Stack与Queue 本章将…...

Python数据分析-世界上最富有的1000人
一、研究背景 随着全球化的加速发展和技术的进步,财富分配问题日益成为全球关注的焦点。财富的不平等现象日益明显,少数极富有的个人掌握了全球大部分的财富资源。了解全球最富有个人的财富分布及其背后的行业和国家因素,对于分析全球经济趋…...
CSS中隐藏滚动条的同时保留滚动功能
在CSS中,我们可以通过一些技巧来隐藏滚动条,同时保留滚动功能。以下是几种常用的方法和具体的实现步骤。 1. 使用 overflow 和 ::-webkit-scrollbar 这种方法适用于大多数现代浏览器。通过设置 overflow 属性启用滚动,同时利用 ::-webkit-s…...

我的标志:奇特的头像
<!DOCTYPE html> <html lang="zh-CN"> <head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>与妖为邻</title><style>figu…...

中国空间计算产业链发展分析
2024中国空间计算产业链拆解 空间计算设备主要包括AR、VR、MR等终端设备。VR设备通常包括头戴式显示器(VR头盔)、手柄或追踪器等组件,用以完全封闭用户视野,营造虚拟环境体验。这些设备配备高分辨率显示屏、内置传感器和跟踪器。 …...

DAY14信息打点-JS 架构框架识别泄漏提取API 接口枚举FUZZ 爬虫插件项目
本课意义: 1.如何从表现中的JS提取价值信息 2.如何从地址中FUZZ提取未知的JS文件 3.如何从JS开放框架WebPack进行测试 一、JS 前端架构-识别&分析 在JS中寻找更多的URL地址,在JS代码逻辑(加密算法、APIKey配置、验证逻辑)中进…...

TS - tsconfig.json 和 tsconfig.node.json 的关系,如何在TS 中使用 JS 不报错
目录 1,前言2,二者关系2.1,使用 3,遇到的问题3.1,TS 中使用 JS 1,前言 通过 Vite 创建的 Vue3 TS 项目,根目录下会有 tsconfig.json 和 tsconfig.node.json 文件,并且存在引用关系…...

revisiting拉普拉斯模板
二维向量的二阶微分是Hessian矩阵,拉普拉斯算子是将两个独立的二阶微分求和,对二阶微分的近似。 我不认同冈萨雷斯的8邻域拉普拉斯模板。 MATLAB图像处理工具箱中fspecial函数’laplacian’参数给的拉普拉斯模板: 对于数字滤波器ÿ…...

深入分析计算机网络性能指标
速率带宽吞吐量时延时延带宽积往返时间RTT利用率丢包率图书推荐内容简介作者简介 速率 连接在计算机网络上的主机在数字信道上传送比特的速率,也称为比特率或数据率。 基本单位:bit/s(b/s、bps) 常用单位:kb/s&#x…...

pyflink 安装和测试
FPY Warning! 安装 apache-Flink # pip install apache-Flink -i https://pypi.tuna.tsinghua.edu.cn/simple/ Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple/ Collecting apache-FlinkDownloading https://pypi.tuna.tsinghua.edu.cn/packages/7f/a3/ad502…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...

【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

【Linux】自动化构建-Make/Makefile
前言 上文我们讲到了Linux中的编译器gcc/g 【Linux】编译器gcc/g及其库的详细介绍-CSDN博客 本来我们将一个对于编译来说很重要的工具:make/makfile 1.背景 在一个工程中源文件不计其数,其按类型、功能、模块分别放在若干个目录中,mak…...

云安全与网络安全:核心区别与协同作用解析
在数字化转型的浪潮中,云安全与网络安全作为信息安全的两大支柱,常被混淆但本质不同。本文将从概念、责任分工、技术手段、威胁类型等维度深入解析两者的差异,并探讨它们的协同作用。 一、核心区别 定义与范围 网络安全:聚焦于保…...
命令行关闭Windows防火墙
命令行关闭Windows防火墙 引言一、防火墙:被低估的"智能安检员"二、优先尝试!90%问题无需关闭防火墙方案1:程序白名单(解决软件误拦截)方案2:开放特定端口(解决网游/开发端口不通)三、命令行极速关闭方案方法一:PowerShell(推荐Win10/11)方法二:CMD命令…...

ArcGIS Pro+ArcGIS给你的地图加上北回归线!
今天来看ArcGIS Pro和ArcGIS中如何给制作的中国地图或者其他大范围地图加上北回归线。 我们将在ArcGIS Pro和ArcGIS中一同介绍。 1 ArcGIS Pro中设置北回归线 1、在ArcGIS Pro中初步设置好经纬格网等,设置经线、纬线都以10间隔显示。 2、需要插入背会归线…...
Go 并发编程基础:select 多路复用
select 是 Go 并发编程中非常强大的语法结构,它允许程序同时等待多个通道操作的完成,从而实现多路复用机制,是协程调度、超时控制、通道竞争等场景的核心工具。 一、什么是 select select 类似于 switch 语句,但它用于监听多个通…...

Java在word中指定位置插入图片。
Java使用(Poi-tl) 在word(docx)中指定位置插入图片 Poi-tl 简介Maven 依赖配置Poi-tl 实现原理与步骤1. 模板标签规范2.完整实现代码3.效果展示 Poi-tl 简介 Poi-tl 是基于 Apache POI 的 Java 开源文档处理库,专注于…...
RMQ 算法详解(区间最值问题)
RMQ 算法详解(区间最值问题) 问题介绍解决方法暴力法ST表法基本思想算法步骤C实现 问题介绍 RMQ问题是OI中经常遇到的问题,主要是一下形式: 给你一堆数,不断的对里面的数进行操作,例如:让某个…...