并发服务器框架——zinx
zinx框架
Zinx 是一个用 Go 语言编写的高性能、轻量级的 TCP 服务器框架,它被设计为简单、快速且易于使用。Zinx 提供了一系列的功能,包括但不限于连接管理、数据编解码、业务处理、负载均衡等,适用于构建各种 TCP 网络服务,如游戏服务器、即时通讯服务器等。
下面实现zinx的多个功能包括:路由、全局配置、消息封装、读写分离、消息队列、链接管理等。
utils包下GlobalObj.go
package utilsimport ("datarace/zinx/ziface""encoding/json""io/ioutil"
)/*
存储一切有关Zinx框架的全局参数,供其他模块使用
一些参数也可以通过 用户根据 zinx.json来配置
*/
type GlobalObj struct {TcpServer ziface.IServer //当前Zinx的全局Server对象Host string //当前服务器主机IPTcpPort int //当前服务器主机监听端口号Name string //当前服务器名称Version string //当前Zinx版本号MaxPacketSize uint32 //都需数据包的最大值MaxConn int //当前服务器主机允许的最大链接个数WorkerPoolSize uint32 //业务工作Worker池的数量MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量ConfFilePath stringMaxMsgChanLen int
}/*
定义一个全局的对象
*/
var GlobalObject *GlobalObj// 读取用户的配置文件
func (g *GlobalObj) Reload() {data, err := ioutil.ReadFile("zinx.json")if err != nil {panic(err)}//将json数据解析到struct中//fmt.Printf("json :%s\n", data)err = json.Unmarshal(data, &GlobalObject)if err != nil {panic(err)}
}
func init() {//初始化GlobalObject变量,设置一些默认值GlobalObject = &GlobalObj{Name: "ZinxServerApp",Version: "V0.4",TcpPort: 7777,Host: "0.0.0.0",MaxConn: 12000,MaxPacketSize: 4096,ConfFilePath: "conf/zinx.json",WorkerPoolSize: 10,MaxWorkerTaskLen: 1024,}//从配置文件中加载一些用户配置的参数GlobalObject.Reload()
}
ziface包
package zifacetype IConnManager interface {Add(conn IConnection) //添加链接Remove(conn IConnection) //删除连接Get(connID uint32) (IConnection, error) //利用ConnID获取链接Len() int //获取当前连接ClearConn() //删除并停止所有链接
}
package zifaceimport "net"type IConnection interface {Start()Stop()GetConnID() uint32GetTCPConnection() *net.TCPConnRemoteAddr() net.AddrSendMsg(msgId uint32, data []byte) error//直接将Message数据发送给远程的TCP客户端(有缓冲)SendBuffMsg(msgId uint32, data []byte) error //添加带缓冲发送消息接口//设置链接属性SetProperty(key string, value interface{})//获取链接属性GetProperty(key string) (interface{}, error)//移除链接属性RemoveProperty(key string)
}
type HandFunc func(*net.TCPConn, []byte, int) error
package zifacetype IDataPack interface {GetHeadLen() int32Pack(msg IMessage) ([]byte, error)Unpack([]byte) (IMessage, error)
}
package zifacetype IMessage interface {GetDataLen() uint32GetMsgId() uint32GetData() []byteSetMsgId(uint32)SetData([]byte)SetDataLen(uint32)
}
package zifacetype IMsgHandle interface {DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑StartWorkerPool() //启动worker工作池SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理
}
package zifacetype IRequest interface {GetConnection() IConnectionGetData() []byteGetMsgID() uint32
}
package zifacetype IRouter interface {PreHandle(req IRequest)Handle(req IRequest)PostHandle(req IRequest)
}
package zifacetype IServer interface {Start()Stop()Serve()AddRouter(msgId uint32, router IRouter)//得到链接管理GetConnMgr() IConnManager//设置该Server的连接创建时Hook函数SetOnConnStart(func(IConnection))//设置该Server的连接断开时的Hook函数SetOnConnStop(func(IConnection))//调用连接OnConnStart Hook函数CallOnConnStart(conn IConnection)//调用连接OnConnStop Hook函数CallOnConnStop(conn IConnection)
}
znet包
connection
package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""errors""fmt""io""net""sync"
)type Connection struct {//当前Conn属于哪个ServerTcpServer ziface.IServer //当前conn属于哪个server,在conn初始化的时候添加即可//当前连接的socket TCP套接字Conn *net.TCPConn//当前连接的ID 也可以称作为SessionID,ID全局唯一ConnID uint32//当前连接的关闭状态isClosed bool//消息管理MsgId和对应处理方法的消息管理模块MsgHandler ziface.IMsgHandle//告知该链接已经退出/停止的channelExitBuffChan chan bool//无缓冲管道,用于读、写两个goroutine之间的消息通信msgChan chan []byte//有关冲管道,用于读、写两个goroutine之间的消息通信msgBuffChan chan []byte//链接属性property map[string]interface{}//保护链接属性修改的锁propertyLock sync.RWMutex
}// 创建连接的方法
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {//初始化Conn属性c := &Connection{TcpServer: server, //将隶属的server传递进来Conn: conn,ConnID: connID,isClosed: false,MsgHandler: msgHandler,ExitBuffChan: make(chan bool, 1),msgChan: make(chan []byte),msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen), //不要忘记初始化property: make(map[string]interface{}), //对链接属性map初始化}//将新创建的Conn添加到链接管理中c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中return c
}// 设置链接属性
func (c *Connection) SetProperty(key string, value interface{}) {c.propertyLock.Lock()defer c.propertyLock.Unlock()c.property[key] = value
}// 获取链接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {c.propertyLock.RLock()defer c.propertyLock.RUnlock()if value, ok := c.property[key]; ok {return value, nil} else {return nil, errors.New("no property found")}
}// 移除链接属性
func (c *Connection) RemoveProperty(key string) {c.propertyLock.Lock()defer c.propertyLock.Unlock()delete(c.property, key)
}
func (c *Connection) startReader() {fmt.Println("[Reader Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")defer c.Stop()for {// 创建拆包解包的对象dp := NewDataPack()//读取客户端的Msg headheadData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error ", err)break}//拆包,得到msgid 和 datalen 放在msg中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error ", err)break}//根据 dataLen 读取 data,放在msg.Data中var data []byteif msg.GetDataLen() > 0 {data = make([]byte, msg.GetDataLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error ", err)continue}}msg.SetData(data)//得到当前客户端请求的Request数据req := Request{conn: c,msg: msg,}//从绑定好的消息和对应的处理方法中执行对应的Handle方法if utils.GlobalObject.WorkerPoolSize > 0 {//已经启动工作池机制,将消息交给Worker处理c.MsgHandler.SendMsgToTaskQueue(&req)} else {//从绑定好的消息和对应的处理方法中执行对应的Handle方法go c.MsgHandler.DoMsgHandler(&req)}}
}// 启动连接,让当前连接开始工作
func (c *Connection) Start() {//1 开启用户从客户端读取数据流程的Goroutinego c.startReader()//2 开启用于写回客户端数据流程的Goroutinego c.StartWriter()//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法c.TcpServer.CallOnConnStart(c)
}//func (c *Connection) Start() {
// go c.startReader()
// for {
// select {
// case <-c.ExitBuffChan:
// return
// }
// }
//}// 停止连接,结束当前连接状态M
func (c *Connection) Stop() {fmt.Println("Conn Stop()...ConnID = ", c.ConnID)//如果当前链接已经关闭if c.isClosed == true {return}c.isClosed = true//==================//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用c.TcpServer.CallOnConnStop(c)//==================// 关闭socket链接c.Conn.Close()//关闭Writerc.ExitBuffChan <- true//将链接从连接管理器中删除c.TcpServer.GetConnMgr().Remove(c)//关闭该链接全部管道close(c.ExitBuffChan)close(c.msgBuffChan)
}/*
写消息Goroutine, 用户将数据发送给客户端
*/
func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")for {select {case data := <-c.msgChan://有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Data error:, ", err, " Conn Writer exit")return}//针对有缓冲channel需要些的数据处理case data, ok := <-c.msgBuffChan:if ok {//有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")return}} else {breakfmt.Println("msgBuffChan is Closed")}case <-c.ExitBuffChan:return}}
}// 直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send msg")}//将data封包,并且发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg ")}//写回客户端c.msgChan <- msg //将之前直接回写给conn.Write的方法 改为 发送给Channel 供Writer读取return nil
}// 从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}// 获取当前连接ID
func (c *Connection) GetConnID() uint32 {return c.ConnID
}// 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}// // 直接将Message数据发送数据给远程的TCP客户端
//
// func (c *Connection) SendMsg(msgId uint32, data []byte) error {
// if c.isClosed == true {
// return errors.New("Connection closed when send msg")
// }
// //将data封包,并且发送
// dp := NewDataPack()
// msg, err := dp.Pack(NewMsgPackage(msgId, data))
// if err != nil {
// fmt.Println("Pack error msg id = ", msgId)
// return errors.New("Pack error msg ")
// }
// //写回客户端
// if _, err := c.Conn.Write(msg); err != nil {
// fmt.Println("Write msg id ", msgId, " error ")
// c.ExitBuffChan <- true
// return errors.New("conn Write error")
// }
// return nil
// }
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send buff msg")}//将data封包,并且发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg ")}//写回客户端c.msgBuffChan <- msgreturn nil
}
ConnManager
package znetimport ("datarace/zinx/ziface""errors""fmt""sync"
)type ConnManager struct {connections map[uint32]ziface.IConnectionconnLock sync.RWMutex
}/*
创建一个链接管理
*/
func NewConnManager() *ConnManager {return &ConnManager{connections: make(map[uint32]ziface.IConnection),}
}// 添加链接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//将conn连接添加到ConnMananger中connMgr.connections[conn.GetConnID()] = connfmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}// 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//删除连接信息delete(connMgr.connections, conn.GetConnID())fmt.Println("connection Remove ConnID=", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}// 利用ConnID获取链接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {//保护共享资源Map 加读锁connMgr.connLock.RLock()defer connMgr.connLock.RUnlock()if conn, ok := connMgr.connections[connID]; ok {return conn, nil} else {return nil, errors.New("connection not found")}
}// 获取当前连接
func (connMgr *ConnManager) Len() int {return len(connMgr.connections)
}// 清除并停止所有连接
func (connMgr *ConnManager) ClearConn() {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//停止并删除全部的连接信息for connID, conn := range connMgr.connections {//停止conn.Stop()//删除delete(connMgr.connections, connID)}fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}
datapack
package znetimport ("bytes""datarace/zinx/utils""datarace/zinx/ziface""encoding/binary""errors"
)type DataPack struct{}func NewDataPack() *DataPack {return &DataPack{}
}
func (dp *DataPack) GetHeadLen() uint32 {//Id uint32(4字节) + DataLen uint32(4字节)return 8
}// 封包方法(压缩数据)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {//创建一个存放bytes字节的缓冲dataBuff := bytes.NewBuffer([]byte{})//写dataLenif err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {return nil, err}//写msgIDif err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {return nil, err}//写data数据if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {return nil, err}return dataBuff.Bytes(), nil
}
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {//创建一个从输入二进制数据的ioReaderdataBuff := bytes.NewReader(binaryData)//只解压head的信息,得到dataLen和msgIDmsg := &Message{}//读dataLenif err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {return nil, err}//读msgIDif err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {return nil, err}//判断dataLen的长度是否超出我们允许的最大包长度if utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize {return nil, errors.New("Too large msg data recieved")}//这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据return msg, nil
}
message
package znettype Message struct {Id uint32DataLen uint32Data []byte
}func NewMsgPackage(id uint32, data []byte) *Message {return &Message{Id: id,DataLen: uint32(len(data)),Data: data,}
}// 获取消息数据段长度
func (msg *Message) GetDataLen() uint32 {return msg.DataLen
}// 获取消息ID
func (msg *Message) GetMsgId() uint32 {return msg.Id
}// 获取消息内容
func (msg *Message) GetData() []byte {return msg.Data
}// 设置消息数据段长度
func (msg *Message) SetDataLen(len uint32) {msg.DataLen = len
}// 设计消息ID
func (msg *Message) SetMsgId(msgId uint32) {msg.Id = msgId
}// 设计消息内容
func (msg *Message) SetData(data []byte) {msg.Data = data
}
msgHandler
package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""fmt""strconv"
)type MsgHandle struct {Apis map[uint32]ziface.IRouterWorkerPoolSize uint32TaskQueue []chan ziface.IRequest
}func NewMsgHandle() *MsgHandle {return &MsgHandle{Apis: make(map[uint32]ziface.IRouter),WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),}
}// 马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {handler, ok := mh.Apis[request.GetMsgID()]if !ok {fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")return}//执行对应处理方法handler.PreHandle(request)handler.Handle(request)handler.PostHandle(request)
}// 为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {//1 判断当前msg绑定的API处理方法是否已经存在if _, ok := mh.Apis[msgId]; ok {panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))}//2 添加msg与api的绑定关系mh.Apis[msgId] = routerfmt.Println("Add api msgId = ", msgId)
}// 启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {fmt.Println("Worker ID = ", workerID, " is started.")//不断的等待队列中的消息for {select {//有消息则取出队列的Request,并执行绑定的业务方法case request := <-taskQueue:mh.DoMsgHandler(request)}}
}// 启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {//遍历需要启动worker的数量,依此启动for i := 0; i < int(mh.WorkerPoolSize); i++ {//一个worker被启动//给当前worker对应的任务队列开辟空间mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来go mh.StartOneWorker(i, mh.TaskQueue[i])}
}// 将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {//根据ConnID来分配当前的连接应该由哪个worker负责处理//轮询的平均分配法则//得到需要处理此条连接的workerIDworkerID := request.GetConnection().GetConnID() % mh.WorkerPoolSizefmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID)//将请求消息发送给任务队列mh.TaskQueue[workerID] <- request
}
request
package znetimport ("datarace/zinx/ziface"
)type Request struct {conn ziface.IConnectionmsg ziface.IMessage
}func (r *Request) GetConnection() ziface.IConnection {return r.conn
}// 获取请求消息的数据
func (r *Request) GetData() []byte {return r.msg.GetData()
}// 获取请求的消息的ID
func (r *Request) GetMsgID() uint32 {return r.msg.GetMsgId()
}
router
package znetimport "datarace/zinx/ziface"type BaseRouter struct{}func (br *BaseRouter) PreHandle(request ziface.IRequest) {}
func (br *BaseRouter) Handle(request ziface.IRequest) {}
func (br *BaseRouter) PostHandle(request ziface.IRequest) {}
server
package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""fmt""net""time"
)// iServer 接口实现,定义一个Server服务类
type Server struct {//服务器的名称Name string//tcp4 or otherIPVersion string//服务绑定的IP地址IP string//服务绑定的端口Port int//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法msgHandler ziface.IMsgHandle//当前Server的链接管理器ConnMgr ziface.IConnManager//新增两个hook函数原型//该Server的连接创建时Hook函数OnConnStart func(conn ziface.IConnection)//该Server的连接断开时的Hook函数OnConnStop func(conn ziface.IConnection)
}// 得到链接管理
func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr
}// ============== 实现 ziface.IServer 里的全部接口方法 ========
// 开启网络服务
func (s *Server) Start() {fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n", s.IP, s.Port)fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version: %s, MaxConn: %d, MaxPacketSize: %d\n",utils.GlobalObject.Version,utils.GlobalObject.MaxConn,utils.GlobalObject.MaxPacketSize)//开启一个go去做服务端Linster业务go func() {//1 获取一个TCP的Addrs.msgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Println("resolve tcp addr err: ", err)return}//2 监听服务器地址listenner, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen", s.IPVersion, "err", err)return}//已经监听成功fmt.Println("start Zinx server ", s.Name, " succ, now listenning...")var cid uint32cid = 0//3 启动server网络连接业务for {//3.1 阻塞等待客户端建立连接请求conn, err := listenner.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}//=============//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {conn.Close()continue}//=============//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的dealConn := NewConntion(s, conn, cid, s.msgHandler)cid++//3.4 启动当前链接的处理业务go dealConn.Start()//go func() {// //不断的循环从客户端获取数据// for {// buf := make([]byte, 512)// cnt, err := conn.Read(buf)// if err != nil {// fmt.Println("recv buf err ", err)// continue// }// //回显// if _, err := conn.Write(buf[:cnt]); err != nil {// fmt.Println("write back buf err ", err)// continue// }// }//}()}}()
}
func (s *Server) Stop() {fmt.Println("[STOP] Zinx server , name ", s.Name)//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理s.ConnMgr.ClearConn()
}
func (s *Server) Serve() {s.Start()//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加//阻塞,否则主Go退出, listenner的go将会退出for {time.Sleep(10 * time.Second)}
}
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.msgHandler.AddRouter(msgId, router)fmt.Println("Add Router SUCC!! msgID = ", msgId)
}/*
创建一个服务器句柄
*/
func NewServer() *Server {utils.GlobalObject.Reload()s := &Server{Name: utils.GlobalObject.Name,IPVersion: "tcp4",IP: utils.GlobalObject.Host,Port: utils.GlobalObject.TcpPort,msgHandler: NewMsgHandle(), //msgHandler 初始化ConnMgr: NewConnManager(), //创建ConnManage}return s
}// 设置该Server的连接创建时Hook函数
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {s.OnConnStart = hookFunc
}// 设置该Server的连接断开时的Hook函数
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {s.OnConnStop = hookFunc
}// 调用连接OnConnStart Hook函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.OnConnStart != nil {fmt.Println("---> CallOnConnStart....")s.OnConnStart(conn)}
}// 调用连接OnConnStop Hook函数
func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.OnConnStop != nil {fmt.Println("---> CallOnConnStop....")s.OnConnStop(conn)}
}
客户端
package mainimport ("datarace/zinx/znet""fmt""io""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("Client Test ... start")//3秒之后发起测试请求,给服务端开启服务的机会time.Sleep(3 * time.Second)conn, err := net.Dial("tcp", "127.0.0.1:7777")if err != nil {fmt.Println("client start err, exit!")return}for {//发封包message消息dp := znet.NewDataPack()msg, _ := dp.Pack(znet.NewMsgPackage(0, []byte("Zinx V0.8 Client0 Test Message")))_, err := conn.Write(msg)if err != nil {fmt.Println("write error err ", err)return}//先读出流中的head部分headData := make([]byte, dp.GetHeadLen())_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止if err != nil {fmt.Println("read head error")break}//将headData字节流 拆包到msg中msgHead, err := dp.Unpack(headData)if err != nil {fmt.Println("server unpack err:", err)return}if msgHead.GetDataLen() > 0 {//msg 是有data数据的,需要再次读取data数据msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetDataLen())//根据dataLen从io中读取字节流_, err := io.ReadFull(conn, msg.Data)if err != nil {fmt.Println("server unpack data err:", err)return}fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))}time.Sleep(1 * time.Second)}
}
服务端
package mainimport ("datarace/zinx/ziface""datarace/zinx/znet""fmt"
)// ping test 自定义路由
type PingRouter struct {znet.BaseRouter
}// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {fmt.Println("Call PingRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))if err != nil {fmt.Println(err)}
}type HelloZinxRouter struct {znet.BaseRouter
}// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {fmt.Println("Call HelloZinxRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.10"))if err != nil {fmt.Println(err)}
}// 创建连接的时候执行
func DoConnectionBegin(conn ziface.IConnection) {fmt.Println("DoConnecionBegin is Called ... ")//=============设置两个链接属性,在连接创建之后===========fmt.Println("Set conn Name, Home done!")conn.SetProperty("Name", "Aceld")conn.SetProperty("Home", "https://www.jianshu.com/u/35261429b7f1")//===================================================err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))if err != nil {fmt.Println(err)}
}// 连接断开的时候执行
func DoConnectionLost(conn ziface.IConnection) {//============在连接销毁之前,查询conn的Name,Home属性=====if name, err := conn.GetProperty("Name"); err == nil {fmt.Println("Conn Property Name = ", name)}if home, err := conn.GetProperty("Home"); err == nil {fmt.Println("Conn Property Home = ", home)}//===================================================fmt.Println("DoConneciotnLost is Called ... ")
}
func main() {//创建一个server句柄s := znet.NewServer()//注册链接hook回调函数s.SetOnConnStart(DoConnectionBegin)s.SetOnConnStop(DoConnectionLost)//配置路由s.AddRouter(0, &PingRouter{})s.AddRouter(1, &HelloZinxRouter{})//开启服务s.Serve()
}
相关文章:

并发服务器框架——zinx
zinx框架 Zinx 是一个用 Go 语言编写的高性能、轻量级的 TCP 服务器框架,它被设计为简单、快速且易于使用。Zinx 提供了一系列的功能,包括但不限于连接管理、数据编解码、业务处理、负载均衡等,适用于构建各种 TCP 网络服务,如游戏…...

Unity 中计算射线和平面相交距离的原理
有此方法 能够计算射线和平面是否相交以及射线起点到平面交点的距离 代码分析 var dot Vector3.Dot(ray.direction, plane.normal);计算射线和平面法线的点积,如果大于等于0,则说明射线和平面没有相交,否则,说明射线和平面相交…...
浅谈棋牌游戏开发流程七:反外挂与安全体系——守护游戏公平与玩家体验
一、前言:为什么反外挂与安全这么重要? 对于任何一款线上棋牌游戏而言,公平性和玩家安全都是最重要的核心要素之一。如果游戏环境充斥着各式各样的外挂、作弊方式,不仅会毁坏玩家体验,更会导致游戏生态崩塌、口碑下滑…...

《无力逃脱》V1.0.15.920(59069)官方中文版
艾丹是一名三臂赏金猎人,他必须追捕银河系中最危险、最难以捉摸的割喉者。 有些悬赏是金钱,有些则是有价值的信息。艾丹可以利用这些信息找到让他走上这条路的人,同时也会卷入一个全银河系的阴谋中。 拥有三条手臂可以让你同时对付更多的敌…...
六种主流服务器的选择与使用
网络的运行离不开各种服务器,它们各司其职,为我们提供稳定的网络服务。本文带大家了解6种常见服务器类型。 服务器的六大种类 第一种:Web服务器 Web服务器是互联网的核心。当你打开一个网站,比如百度或淘宝,浏览器会…...
TiDB 升级至高版本提示'mysql.tidb_runaway_watch' doesn't exist 问题处理
作者: asd80703406 原文来源: https://tidb.net/blog/90394c97 背景 近期发现很多人从低版本升级至TiDB v7 或者v8版本,均遇到了tidb-server启动失败,提示报错如下: ["get runaway watch record failed"…...

GRU-PFG:利用图神经网络从股票因子中提取股票间相关性
“GRU-PFG: Extract Inter-Stock Correlation from Stock Factors with Graph Neural Network” 论文地址:https://arxiv.org/pdf/2411.18997 摘要 股票预测模型可以分为两个主要类别:第一类,例如GRU和ALSTM,这些模型仅基于股票…...

数字化供应链创新解决方案在零售行业的应用研究——以开源AI智能名片S2B2C商城小程序为例
摘要: 在数字化转型的浪潮中,零售行业正经历着前所未有的变革。特别是在供应链管理方面,线上线下融合、数据孤岛、消费者需求多样化等问题日益凸显,对零售企业的运营效率与市场竞争力构成了严峻挑战。本文深入探讨了零售行业供应…...
安卓Activity执行finish后onNewIntent也执行了
测试反应投屏时下一集可能播放不成功。 首先看一下日志: onCompletion onCast handlerMessage: 2 finish: PlayerActivityabc7fdc onPause: PlayerActivityabc7fdc onNewIntent: PlayerActivityabc7fdc onResume: PlayerActivityabc7fdc onPause: PlayerActivityab…...

数据结构.期末复习.学习笔记(c语言)
《数据结构》复习概要 一、概论 二、基础1. 基本概念2. 四种逻辑结构及特点3. 算法的概念、特性4. 算法设计的4个要求 三、线性结构1.顺序表2.单链表3.循环链表双向链表4.栈(后进先出)5.队列(先进先出) 四、树和二叉树1.树2.二叉…...

Kafaka安装与启动教程
1.下载 先去官网Apache Kafka可以查看到每个版本的发布时间。选择你要安装的版本。 然后进入linux建立要存放的文件夹,用wget命令下载 2.安装 先解压缩: tar -xvzf kafka_2.12-3.5.1.tgz -C ../ 3.配置文件 修改server.properties: cd .…...
根据docker file 编译镜像
比如给到一个Dockerfile 第一步编译镜像 cd /path/to/Dockerfiledocker build -t <DOCKER_IMAGE_NAME> . build 命令编译镜像 -t 镜像名字 . 指dockerfile 所在目录 如果遇到报错 [] Building 0.3s (3/3) FINISHED …...
联邦学习的 AI 大模型微调中,加性、选择性、重参数化和混合微调
联邦学习的 AI 大模型微调中,加性、选择性、重参数化和混合微调 在联邦学习的 AI 大模型微调中,加性、选择性、重参数化和混合微调是不同的操作方式,具体如下: 加性微调 定义与原理:加性微调是在原始模型的基础上添加额外的可训练参数来进行模型调整。这种方式不会改变原…...

android 外挂modem模块实现Telephony相关功能(上网,发短信,打电话)
一.背景 当前模块不支持Telephony相关的功能,例如上网、发短信等功能,就需要外挂另一个模块实现此功能,这就是外挂modem模块实现Telephony功能,此篇主要就是说实现外挂modem模块功能中的Framework层实现逻辑,如下流程是在Android 13中实现的外挂pcie模块的流程 二.ril库相…...

【计算机视觉技术 - 人脸生成】2.GAN网络的构建和训练
GAN 是一种常用的优秀的图像生成模型。我们使用了支持条件生成的 cGAN。下面介绍简单 cGAN 模型的构建以及训练过程。 2.1 在 model 文件夹中新建 nets.py 文件 import torch import torch.nn as nn# 生成器类 class Generator(nn.Module):def __init__(self, nz100, nc3, n…...

数据中台与数据治理服务方案[50页PPT]
本文概述了数据中台与数据治理服务方案的核心要点。数据中台作为政务服务数据化的核心,通过整合各部门业务系统数据,进行建模与加工,以新数据驱动政府管理效率提升与政务服务能力增强。数据治理则聚焦于解决整体架构问题,确保数据…...

【Qt】将控件均匀分布到圆环上
1. 关键代码 for(int i0; i<10; i){/*m_panLabelIcon - 大圆环控件m_slotsIcon[i] - 小圆控件*/QString idxStr QString::number(i1);m_slotsIcon[i] new QLabel(m_panLabelIcon);m_slotsIcon[i]->setFont(ftSlot);m_slotsIcon[i]->setText(idxStr);m_slotsIcon[i]-…...
第四、五章补充:线代本质合集(B站:小崔说数)
视频1:线性空间 原视频:【线性代数的本质】向量空间、基向量的几何解释_哔哩哔哩_bilibili 很多同学在学习线性代数的时候,会遇到一个困扰,就是不知道什么是线性空间。...
2025年贵州省职业院校技能大赛信息安全管理与评估赛项规程
贵州省职业院校技能大赛赛项规程 赛项名称: 信息安全管理与评估 英文名称: Information Security Management and Evaluation 赛项组别: 高职组 赛项编号: GZ032 1 2 一、赛项信息 赛项类别 囚每年赛 □隔年赛(□单数年…...
松鼠状态机流转-@Transit
疑问 状态from to合法性校验,都是在代码中手动进行的吗,不是状态机自动进行的? 注解中from状态,代表当前状态 和谁校验:上下文中初始状态 怎么根据注解找到执行方法的 分析代码,创建运单,怎…...

网络六边形受到攻击
大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...

Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

Ascend NPU上适配Step-Audio模型
1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统,支持多语言对话(如 中文,英文,日语),语音情感(如 开心,悲伤)&#x…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”
2025年#高考 将在近日拉开帷幕,#AI 监考一度冲上热搜。当AI深度融入高考,#时间同步 不再是辅助功能,而是决定AI监考系统成败的“生命线”。 AI亮相2025高考,40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕,江西、…...

R 语言科研绘图第 55 期 --- 网络图-聚类
在发表科研论文的过程中,科研绘图是必不可少的,一张好看的图形会是文章很大的加分项。 为了便于使用,本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中,获取方式: R 语言科研绘图模板 --- sciRplothttps://mp.…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...

论文阅读:LLM4Drive: A Survey of Large Language Models for Autonomous Driving
地址:LLM4Drive: A Survey of Large Language Models for Autonomous Driving 摘要翻译 自动驾驶技术作为推动交通和城市出行变革的催化剂,正从基于规则的系统向数据驱动策略转变。传统的模块化系统受限于级联模块间的累积误差和缺乏灵活性的预设规则。…...

倒装芯片凸点成型工艺
UBM(Under Bump Metallization)与Bump(焊球)形成工艺流程。我们可以将整张流程图分为三大阶段来理解: 🔧 一、UBM(Under Bump Metallization)工艺流程(黄色区域ÿ…...