[Toc]
go环境变量设置 export GOPATH=/home/project
Zinx架构设计 客户端请求服务器响应的过程
要有一个客户端对服务器发起请求。
我们的服务器应该去启动对客户端的处理模块并打开工作池来提升并发量。
处理客户端的模块开启两个模块,一个负责读客户端请求,一个负责写客户端请求。
用于读的功能模块,去任务的消息队列里去请求读数据。用于写的功能模块,通过 API 接口,当然我们的 API 不可能只有一个,所以这里肯定是 APIS。
其过程如下所示:
Zinx功能模块
v0.1 Zinx目录结构如下所示:
iserver.go 1 2 3 4 5 6 7 8 9 10 11 package zifacetype IServer interface { Start() Stop() Server() }
server.go *
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 package znetimport ( "fmt" "net" "zinx/ziface" )type Server struct { Name string IPVersion string IP string Port int }func NewServer (name string ) ziface.IServer { s := &Server { Name: name, IPVersion:"tcp4" , IP:"0.0.0.0" , Port:7777 , } return s }func (s *Server) Start() { fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n" , s.IP, s.Port) go func () { addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s,%d" , s.IP, s. Port)) if err != nil { fmt.Println("resolve tcp addr err: " , err) } listenner, err := net.ListenTCP(s.IPVersion, addr) if err != nil { fmt.Println("listen" , s.IPVersion, "err" , err) return } fmt.Println("Start zinx Server " , s.Name, " succ, now listenning……" ) for { conn, err := listenner.AcceptTCP() if err != nil { fmt.Println("Accept err " , err) continue } go func () { for { buf := make ([]byte , 512 ) cnt, err := conn.Read(buf) if err != nil { fmt.Println("recv bug err " , err) continue } if _, err := conn.Write(buf[:cnt]); err != nil { fmt.Println("write back buf err" , err) continue } } }() } }() }func (s *Server) Stop() { fmt.Println("[STOP] Zinx server, name" , s.Name) }func (s *Server) Server() { }
server_test.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package znetimport ( "fmt" "net" "testing" "time" )func ClientTest () { fmt.Println("Client Test ... start" ) time.Sleep(3 * time.Second) conn,err := net.Dial("tcp" , "127.0.0.1:7777" ) if err != nil { fmt.Println("client start err, exit!" ) return } for { _, err = conn.Write([]byte ("Hello ZINX" )) if err != nil { fmt.Println("Write error err " , err) return } buf := make ([]byte , 512 ) cnt, err := conn.Read(buf) if err != nil { fmt.Println("read buf err" , err) return } fmt.Printf(" server call back: %s, cnt = %d\n" , buf, cnt) time.Sleep(1 * time.Second) } }func TestServer (t *testing.T) { s := NewServer("[zinx V0.1]" ) go ClientTest() s.Server() }
[v0.2] 实现链接封装业务与业务绑定 功能如思维导图所示:
文件结构
代码 iconnection.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package zifaceimport "net" type IConnection interface { Start() Stop() GetTCPConnection() *net.TCPConn GetConnID() uint32 RemoteAddr() net.Addr }type HandFunc func (*net.TCPConn, []byte , int ) error
connection.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 package znetimport ( "fmt" "net" "zinx/ziface" )type Connection struct { Conn *net.TCPConn ConnID uint32 isClosed bool handleAPI ziface.HandFunc ExitBufferChan chan bool }func NewConntion (conn *net.TCPConn, connID uint32 , callBack_api ziface.HandFunc) *Connection { c := &Connection{ Conn: conn, ConnID: connID, isClosed: false , handleAPI: callBack_api, ExitBufferChan: make (chan bool , 1 ), } return c }func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running" ) defer fmt.Println(c.Conn.RemoteAddr().String(), "conn reader exit" ) defer c.Stop() for { buf := make ([]byte ,512 ) cnt, err := c.Conn.Read(buf) if err != nil { fmt.Println("recv bug err " , err) c.ExitBufferChan <- true continue } if err := c.handleAPI(c.Conn, buf, cnt); err != nil { fmt.Println("connID " , c.ConnID, " handle is error" ) c.ExitBufferChan <- true return } } }func (c *Connection) Start() { go c.StartReader() for { select { case <- c.ExitBufferChan: return } } }func (c *Connection) Stop() { if c.isClosed == true { return } c.isClosed = true c.Conn.Close() c.ExitBufferChan <- true close (c.ExitBufferChan) }func (c *Connection) GetTCPConnection() *net.TCPConn { return c.Conn }func (c *Connection) GetConnID() uint32 { return c.ConnID }func (c *Connection) RemoteAddr() net.Addr { return c.Conn.RemoteAddr() }
server.go 代码修改 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 + var cid uint32 = 0 for { conn, err := listenner.AcceptTCP() if err != nil { fmt.Println("Accept err " , err) continue } + dealConn := NewConntion(conn, cid, CallBackToClient) + cid ++ go dealConn.Start() } }()
[v0.3] 实现基础路由模块
通俗讲就是实现一个类,系统地去回调一些用户的操作
功能
涉及知识点:
路由功能模块
目录结构
IRequest消息请求抽象类
把客户端请求的连接信息和请求的数据,放在一个叫 Request 的请求类里,这样的好处是我们可以从 Request 里得到全部客户端的请求信息,也为我们之后拓展框架有一定的作用,一旦客户端有额外的含义的数据信息,都可以放在这个 Request 里。可以理解为每次客户端的全部请求数据,Zinx 都会把它们一起放到一个 Request 结构体里。
创建抽象IRequest
层
1 2 3 4 5 6 7 8 9 package zifacetype IRequest interface { GetConnection() IConnection GetData() []byte }
当前的抽象层只提供了两个 Getter 方法,所以有个成员应该是必须的,一个是客户端连接,一个是客户端传递进来的数据,当然随着 Zinx 框架的功能丰富,这里面还应该继续添加新的成员。
实现Requeset
类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package znetimport "zinx/ziface" type Request struct { conn ziface.IConnection data []byte }func (r *Request) GetConnection() ziface.IConnection { return r.conn }func (r *Request) GetData() []byte { return r.data }
IRouter路由配置抽象类*
实现一个非常简单基础的路由功能,目的当然就是为了快速的让 Zinx 步入到路由的阶段。
创建抽象的IRouter
层
我们知道 router 实际上的作用就是,服务端应用可以给 Zinx 框架配置当前链接的处理业务方法,之前的 Zinx-V0.2 我们的 Zinx 框架处理链接请求的方法是固定的,现在是可以自定义,并且有 3 种接口可以重写。
Handle:是处理当前链接的主业务函数
PreHandle:如果需要在主业务函数之前有前置业务,可以重写这个方法
PostHandle:如果需要在主业务函数之后又后置业务,可以重写这个方法
当然每个方法都有一个唯一的形参 IRequest 对象,也就是客户端请求过来的连接和请求数据,作为我们业务方法的输入数据。
1 2 3 4 5 6 7 8 9 10 package zifacetype IRouter interface { PreHandle(request IRequest) Handle(request IRequest) PostHandle(request IRequest) }
实现Router
类
1 2 3 4 5 6 7 8 9 10 11 12 package znetimport "zinx/ziface" type BaseRouter struct {}func (br *BaseRouter) PreHandle(req ziface.IRequest){}func (br *BaseRouter) Handle(req ziface.IRequest){}func (br *BaseRouter) PostHandle(req ziface.IRequest){}
IServer增添路由添加功能
这一步需要修改原有的链接结构体,同时对服务中的方法进行修改
server类
iserver.go
1 2 3 4 5 6 7 8 9 10 11 12 package zifacetype IServer interface { Start() Stop() Serve() AddRouter(router IRouter) }
server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 type Server struct { Name string IPVersion string IP string Port int Router ziface.IRouter }
NewServer()
方法中的成员初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 func NewServer (name string ) ziface.IServer { s:= &Server { Name :name, IPVersion:"tcp4" , IP:"0.0.0.0" , Port:7777 , Router: nil , } return s }
Connection类
connection.go
1 2 3 4 5 6 7 8 9 10 11 12 type Connection struct { Conn *net.TCPConn ConnID uint32 isClosed bool Router ziface.IRouter ExitBuffChan chan bool }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running" ) defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!" ) defer c.Stop() for { buf := make ([]byte , 512 ) _, err := c.Conn.Read(buf) if err != nil { fmt.Println("recv buf err " , err) c.ExitBuffChan <- true continue } req := Request{ conn:c, data:buf, } go func (request ziface.IRequest) { c.Router.PreHandle(request) c.Router.Handle(request) c.Router.PostHandle(request) }(&req) } }
AddRouter 在server
类要实现添加路由的方法AddRouter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 package znetimport ( "fmt" "net" "time" "zinx/ziface" )type Server struct { Name string IPVersion string IP string Port int Router ziface.IRouter }func NewServer (name string ) ziface.IServer { s:= &Server { Name :name, IPVersion:"tcp4" , IP:"0.0.0.0" , Port:7777 , Router: nil , } return s }func (s *Server) Start() { fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n" , s.IP, s.Port) go func () { addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d" , s.IP, s.Port)) if err != nil { fmt.Println("resolve tcp addr err: " , err) return } listenner, err:= net.ListenTCP(s.IPVersion, addr) if err != nil { fmt.Println("listen" , s.IPVersion, "err" , err) return } fmt.Println("start Zinx server " , s.Name, " succ, now listenning..." ) var cid uint32 cid = 0 for { conn, err := listenner.AcceptTCP() if err != nil { fmt.Println("Accept err " , err) continue } dealConn := NewConntion(conn, cid, s.Router) cid ++ go dealConn.Start() } }() }func (s *Server) Stop() { fmt.Println("[STOP] Zinx server , name " , s.Name) }func (s *Server) Serve() { s.Start() for { time.Sleep(10 *time.Second) } }func (s *Server) AddRouter(router ziface.IRouter) { s.Router = router fmt.Println("Add Router succ! " ) }
在connnection
同样要加上相应的路由对应的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 package znetimport ( "fmt" "net" "zinx/ziface" )type Connection struct { Conn *net.TCPConn ConnID uint32 isClosed bool Router ziface.IRouter ExitBuffChan chan bool }func NewConntion (conn *net.TCPConn, connID uint32 , router ziface.IRouter) *Connection{ c := &Connection{ Conn: conn, ConnID: connID, isClosed: false , Router: router, ExitBuffChan: make (chan bool , 1 ), } return c }func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running" ) defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!" ) defer c.Stop() for { buf := make ([]byte , 512 ) _, err := c.Conn.Read(buf) if err != nil { fmt.Println("recv buf err " , err) c.ExitBuffChan <- true continue } req := Request{ conn:c, data:buf, } go func (request ziface.IRequest) { c.Router.PreHandle(request) c.Router.Handle(request) c.Router.PostHandle(request) }(&req) } }func (c *Connection) Start() { go c.StartReader() for { select { case <- c.ExitBuffChan: return } } }func (c *Connection) Stop() { if c.isClosed == true { return } c.isClosed = true c.Conn.Close() c.ExitBuffChan <- true close (c.ExitBuffChan) }func (c *Connection) GetTCPConnection() *net.TCPConn { return c.Conn }func (c *Connection) GetConnID() uint32 { return c.ConnID }func (c *Connection) RemoteAddr() net.Addr { return c.Conn.RemoteAddr() }
测试 Server.go
我们这里自定义了一个类似 Ping 操作的路由,就是当客户端发送数据,我们的处理业务就是返回给客户端”ping..ping..ping..”, 为了测试,当前路由也同时实现了 PreHandle 和 PostHandle 两个方法。实际上 Zinx 会利用模板的设计模式,依次在框架中调用PreHandle
、Handle
、PostHandle
三个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package mainimport ( "fmt" "zinx/ziface" "zinx/znet" )type PingRouter struct { znet.BaseRouter }func (this *PingRouter) PreHandle(request ziface.IRequest) { fmt.Println("Call Router PreHandle" ) _, err := request.GetConnection().GetTCPConnection().Write([]byte ("before ping ....\n" )) if err !=nil { fmt.Println("call back ping ping ping error" ) } }func (this *PingRouter) Handle(request ziface.IRequest) { fmt.Println("Call PingRouter Handle" ) _, err := request.GetConnection().GetTCPConnection().Write([]byte ("ping...ping...ping\n" )) if err !=nil { fmt.Println("call back ping ping ping error" ) } }func (this *PingRouter) PostHandle(request ziface.IRequest) { fmt.Println("Call Router PostHandle" ) _, err := request.GetConnection().GetTCPConnection().Write([]byte ("After ping .....\n" )) if err !=nil { fmt.Println("call back ping ping ping error" ) } }func main () { s := znet.NewServer("[zinx V0.3]" ) s.AddRouter(&PingRouter{}) s.Serve() }
Client.go 代码不变
[v0.4] 全局配置模块
增加一个配置文件zinx.json
,保存服务器的各项属性,方便修改服务器的参数
功能
涉及知识点:
json格式问题
全局配置文件的好处
在zinx
目录下新建utils
文件夹,在utils
文件夹下新建globalobj.go
文件,这个go文件就是实现配置文件读取与输出的功能。
配置文件 在zinx
下新建conf
文件下,在其下新建zinx.json
配置文件,内容如下:
1 2 3 4 5 6 { "Name" : "demo server" , "Host" : "127.0.0.1" , "TcpPort" : 7777 , "MaxConn" : 3 }
从globalobj.go
的功能来看,这里可以设置你所有的需要加入到服务器的配置,以GlobalObj
结构体中的数据成员为主。
globalobj.go 对新手来讲,这个文件的代码中需要关注的两个函数:
ioutil.ReadFile
json.Unmarshal
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package utilsimport ( "encoding/json" "io/ioutil" "zinx/ziface" "fmt" )type GlobalObj struct { tcpServer ziface.IServer Host string TcpPort int Name string Version string MaxPacketSize uint32 MaxConn int }var GlobalObject *GlobalObjfunc (g *GlobalObj) Reload() { data, err := ioutil.ReadFile("D:/Program Files/Go/src/zinx/conf/zinx.json" ) if err != nil { panic (err) } err = json.Unmarshal(data, &GlobalObject) if err != nil { panic (err) } fmt.Println(fmt.Sprintf("%+v" ,*GlobalObject)) }func init () { GlobalObject = & GlobalObj{ Name: "ZinxServerApp" , Version: "V0.4" , TcpPort: 7777 , Host: "0.0.0.0" , MaxConn: 12000 , MaxPacketSize: 4096 , } GlobalObject.Reload() }
[V0.5] 消息封装 功能 0.5版本要做的就是把服务器的全部数据都放在一个Request
里:
涉及知识点:
tcp封包拆包
消息封装
创建消息封装类型 当前的Request
结构如下:
1 2 3 4 type Request struct { conn ziface.IConnection data []byte }
创建消息封装的结构体以及相关方法:
imessage.go 在zinx/ziface/
下创建imessage.go
文件: 将请求的一个消息封装到 message 中,定义抽象层接口,定义好 Getter 方法和 Setter 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package zifacetype IMessage interface { GetDataLen() uint32 GetMsgId() uint32 GetData() []byte SetMsgId(uint32 ) SetData([]byte ) SetDataLen(uint32 ) }
message.go 同时创建实例 message 类,在zinx/znet/
下,创建message.go
文件。
整理一个基本的 message 包,会包含消息 ID ,数据 ,数据长度 三个成员,提供基本的 setter 和 getter 方法,目的是为了以后做封装优化的作用。同时也提供了一个创建一个 message 包的初始化方法NewMegPackage
。
这里我们只需要要实现 Message 类,写出构造函数,实现接口中对应的方法,比较的简单,大家可以试试先自己尝试实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package znettype Message struct { Id uint32 DataLen uint32 Data []byte }func NewMsgPackage (id uint32 , data []byte ) *Message { return &Message{ Id: id, DataLen: uint32 (len (data)), Data: data, } }func (msg *Message) GetDataLen() uint32 { return msg.DataLen; }func (msg *Message) GetMsgId() uint32 { return msg.Id }func (msg *Message) GetData() []byte { return msg.Data }func (msg *Message) SetDataLen(len uint32 ) { msg.DataLen = len }func (msg *Message) SetMsgId(msgId uint32 ) { msg.Id = msgId }func (msg *Message) SetData(data []byte ) { msg.Data = data }
拆包与封包*
采用TCL(Type-Len-Value)封包格式解决TCP粘包问题
创建拆包封包抽象类 在zinx/ziface
下,创建idatapack.go
文件
我们需要三个方法:
1 2 3 4 5 6 7 8 9 10 package zifacetype IDataPack interface { GetHeadLen() uint32 Pack(msg IMessage)([]byte , error ) Unpack([]byte )(IMessage, error ) }
实现拆包封包类 在zinx/znet/
下,创建datapack.go
文件.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package znetimport ( "bytes" "encoding/binary" "errors" "zinx/utils" "zinx/ziface" )type DataPack struct {}func NewDataPack () *DataPack { return &DataPack{} }func (dp *DataPack) GetHeadLen() uint32 { return 8 }func (dp *DataPack) Pack(msg ziface.IMessage)([]byte , error ) { dataBuff := bytes.NewBuffer([]byte {}) if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil { return nil , err } if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil { return nil , err } if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil { return nil ,err } return dataBuff.Bytes(), nil }func (dp *DataPack) Unpack(binaryData []byte )(ziface.IMessage, error ) { dataBuff := bytes.NewReader(binaryData) msg := &Message{} if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil { return nil , err } if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil { return nil , err } if (utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize) { return nil , errors.New("Too large msg data recieved" ) } return msg, nil }
需要注意的是整理的Unpack
方法,因为我们从上图可以知道,我们进行拆包的时候是分两次过程的,第二次是依赖第一次的 dataLen 结果,所以Unpack
只能解压出包头 head 的内容,得到 msgId 和 dataLen。之后调用者再根据 dataLen 继续从 io 流中读取 body 中的数据。
测试拆包与封包类型 客户端与服务端的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 package mainimport ( "fmt" "io" "net" "zinx/znet" )func main () { listener, err := net.Listen("tcp" , "127.0.0.1:7777" ) if err != nil { fmt.Println("server listen err:" , err) return } for { conn, err := listener.Accept() if err != nil { fmt.Println("server accept err:" , err) } go func (conn net.Conn) { dp := znet.NewDataPack() for { headData := make ([]byte , dp.GetHeadLen()) _, err := io.ReadFull(conn, headData) if err != nil { fmt.Println("read head error" ) break } msgHead, err := dp.Unpack(headData) if err != nil { fmt.Println("server unpack err:" , err) return } if msgHead.GetDataLen() > 0 { msg := msgHead.(*znet.Message) msg.Data = make ([]byte , msg.GetDataLen()) _, err := io.ReadFull(conn, msg.Data) if err != nil { fmt.Println("server unpack data err:" , err) return } fmt.Println("==> Recv Msg: ID=" , msg.Id, ", len=" , msg.DataLen, ", data=" , string (msg.Data)) } } }(conn) } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package mainimport ( "fmt" "net" "zinx/znet" )func main () { conn, err := net.Dial("tcp" , "127.0.0.1:7777" ) if err != nil { fmt.Println("client dial err:" , err) return } dp := znet.NewDataPack() msg1 := &znet.Message{ Id: 0 , DataLen: 5 , Data: []byte {'h' , 'e' , 'l' , 'l' , 'o' }, } sendData1, err := dp.Pack(msg1) if err != nil { fmt.Println("client pack msg1 err:" , err) return } msg2 := &znet.Message{ Id: 1 , DataLen: 7 , Data: []byte {'w' , 'o' , 'r' , 'l' , 'd' , '!' , '!' }, } sendData2, err := dp.Pack(msg2) if err != nil { fmt.Println("client temp msg2 err:" , err) return } sendData1 = append (sendData1, sendData2...) conn.Write(sendData1) select {} }
Request字段修改 首先我们要将我们之前的 Request 中的[]byte
类型的 data 字段改成 Message 类型.。并且我们需要把 irequest 的方法新增一个 GetMsgID。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package znetimport "zinx/ziface" type Request struct { conn ziface.IConnection msg ziface.IMessage }func (r *Request) GetConnection() ziface.IConnection { return r.conn }func (r *Request) GetData() []byte { return r.msg.GetData() }func (r *Request) GetMsgID() uint32 { return r.msg.GetMsgId() }package zifacetype IRequest interface { GetConnection() IConnection GetData() []byte GetMsgID() uint32 }
集成拆包过程 接下来我们需要在 Connection 的StartReader()
方法中,修改之前的读取客户端的这段代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (c *Connection) StartReader() { for { buf := make ([]byte , utils.GlobalObject.MaxPacketSize) _, err := c.Conn.Read(buf) if err != nil { fmt.Println("recv buf err " , err) c.ExitBuffChan <- true continue } } }
将这个函数做出如下改造:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running" ) defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!" ) defer c.Stop() for { dp := NewDataPack() headData := make ([]byte , dp.GetHeadLen()) if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil { fmt.Println("read msg head error " , err) c.ExitBuffChan <- true continue } msg , err := dp.Unpack(headData) if err != nil { fmt.Println("unpack error " , err) c.ExitBuffChan <- true continue } var data []byte if msg.GetDataLen() > 0 { data = make ([]byte , msg.GetDataLen()) if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil { fmt.Println("read msg data error " , err) c.ExitBuffChan <- true continue } } msg.SetData(data) req := Request{ conn:c, msg:msg, } go func (request ziface.IRequest) { c.Router.PreHandle(request) c.Router.Handle(request) c.Router.PostHandle(request) }(&req) } }
提供封包方法 现在我们已经将拆包的功能集成到 Zinx 中了,但是使用 Zinx 的时候,如果我们希望给用户返回一个 TLV 格式的数据,总不能每次都经过这么繁琐的过程,所以我们应该给 Zinx 提供一个封包的接口,供 Zinx 发包使用。 我们在 iconnection.go 中新增 SendMsg()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package zifaceimport "net" type IConnection interface { Start() Stop() GetTCPConnection() *net.TCPConn GetConnID() uint32 RemoteAddr() net.Addr SendMsg(msgId uint32 , data []byte ) error }
然后,我们到 connection.go 中实现这个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (c *Connection) SendMsg(msgId uint32 , data []byte ) error { if c.isClosed == true { return errors.New("Connection closed when send msg" ) } dp := NewDataPack() msg, err := dp.Pack(NewMsgPackage(msgId, data)) if err != nil { fmt.Println("Pack error msg id = " , msgId) return errors.New("Pack error msg " ) } if _, err := c.Conn.Write(msg); err != nil { fmt.Println("Write msg id " , msgId, " error " ) c.ExitBuffChan <- true return errors.New("conn Write error" ) } return nil }
注意,做出修改后,我们需要在 connection.go 中将 io 和 errors 包引入进来。
zinx 0.5 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package mainimport ( "fmt" "zinx/ziface" "zinx/znet" )type PingRouter struct { znet.BaseRouter }func (this *PingRouter) Handle(request ziface.IRequest) { fmt.Println("Call PingRouter Handle" ) fmt.Println("recv from client : msgId=" , request.GetMsgID(), ", data=" , string (request.GetData())) err := request.GetConnection().SendMsg(1 , []byte ("ping...ping...ping" )) if err != nil { fmt.Println(err) } }func main () { s := znet.NewServer() s.AddRouter(&PingRouter{}) s.Serve() }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package mainimport ( "fmt" "io" "net" "time" "zinx/znet" )func main () { fmt.Println("Client Test ... start" ) time.Sleep(3 * time.Second) conn,err := net.Dial("tcp" , "127.0.0.1:7777" ) if err != nil { fmt.Println("client start err, exit!" ) return } for { dp := znet.NewDataPack() msg, _ := dp.Pack(znet.NewMsgPackage(0 ,[]byte ("Zinx V0.5 Client Test Message" ))) _, err := conn.Write(msg) if err !=nil { fmt.Println("write error err " , err) return } headData := make ([]byte , dp.GetHeadLen()) _, err = io.ReadFull(conn, headData) if err != nil { fmt.Println("read head error" ) break } msgHead, err := dp.Unpack(headData) if err != nil { fmt.Println("server unpack err:" , err) return } if msgHead.GetDataLen() > 0 { msg := msgHead.(*znet.Message) msg.Data = make ([]byte , msg.GetDataLen()) _, err := io.ReadFull(conn, msg.Data) if err != nil { fmt.Println("server unpack data err:" , err) return } fmt.Println("==> Recv Msg: ID=" , msg.Id, ", len=" , msg.DataLen, ", data=" , string (msg.Data)) } time.Sleep(1 *time.Second) } }
[v0.6] 多路由模式 功能
涉及知识点
多路由模式
单元测试
创建消息管理模块
创建消息管理模块抽象类
在zinx/ziface
下创建imsghandler.go
文件, 定义出我们之前图片中的方法。
1 2 3 4 5 6 7 8 package zifacetype IMsgHandle interface { DoMsgHandler(request IRequest) AddRouter(msgId uint32 , router IRouter) }
这里面有两个方法,AddRouter()
就是添加一个 msgId 和一个路由关系到 Apis 中,那么DoMsgHandler()
则是调用 Router 中具体Handle()
等方法的接口。
实现消息管理模块
在zinx/znet
下创建msghandler.go
文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package znetimport ( "fmt" "strconv" "zinx/ziface" )type MsgHandle struct { Apis map [uint32 ] ziface.IRouter }func NewMsgHandle () *MsgHandle { return &MsgHandle { Apis:make (map [uint32 ]ziface.IRouter), } }func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) { handler, ok := mh.Apis[request.GetMsgID()] if !ok { fmt.Println("api msgId = " , request.GetMsgID(), " is not FOUND!" ) return } handler.PreHandle(request) handler.Handle(request) handler.PostHandle(request) }func (mh *MsgHandle) AddRouter(msgId uint32 , router ziface.IRouter) { if _, ok := mh.Apis[msgId]; ok { panic ("repeated api , msgId = " + strconv.Itoa(int (msgId))) } mh.Apis[msgId] = router fmt.Println("Add api msgId = " , msgId) }
zinx其他模块的相应修改
首先iserver
的AddRouter()
的接口要稍微改一下,增添 MsgId 参数.
iserver.go:
1 2 3 4 5 6 7 8 9 10 11 12 package zifacetype IServer interface { Start() Stop() Serve() AddRouter(msgId uint32 , router IRouter) }
其次,Server
类中 之前有一个Router
成员 ,代表唯一的处理方法,现在应该替换成MsgHandler
成员。
zinx/znet/server.go
1 2 3 4 5 6 7 8 9 10 11 12 type Server struct { Name string IPVersion string IP string Port int msgHandler ziface.IMsgHandle }
初始化 Server 自然也要更正,增加 msgHandler 初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func NewServer () ziface.IServer { utils.GlobalObject.Reload() s:= &Server { Name :utils.GlobalObject.Name, IPVersion:"tcp4" , IP:utils.GlobalObject.Host, Port:utils.GlobalObject.TcpPort, msgHandler: NewMsgHandle(), } return s }
然后当 Server 在处理 conn 请求业务的时候,创建 conn 的时候也需要把 msgHandler 作为参数传递给 Connection 对象。也就是在我们 server.go 的 Start() 方法中的 3.3 注释下进行如下修改:
1 2 3 dealConn := NewConntion(conn, cid, s.msgHandler)
最后,我们的 AddRouter 方法做了修改,所以要重新实现接口方法:
1 2 3 4 func (s *Server) AddRouter(msgId uint32 , router ziface.IRouter) { s.msgHandler.AddRouter(msgId,router) }
那么接下来就是 Connection 对象了。固然在 Connection 对象中应该有 MsgHandler 的成员,来查找消息对应的回调路由方法。
zinx/znet/connection.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 type Connection struct { Conn *net.TCPConn ConnID uint32 isClosed bool MsgHandler ziface.IMsgHandle ExitBuffChan chan bool }func NewConntion (conn *net.TCPConn, connID uint32 , msgHandler ziface.IMsgHandle) *Connection{ c := &Connection{ Conn: conn, ConnID: connID, isClosed: false , MsgHandler: msgHandler, ExitBuffChan: make (chan bool , 1 ), } return c }
最后,在 conn 已经拆包之后,需要调用路由业务的时候,我们只需要让 conn 调用 MsgHandler 中的DoMsgHander()
方法就好了。
zinx/znet/connection.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func (c *Connection) StartReader() { fmt.Println("[Reader Goroutine is running]" ) defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]" ) defer c.Stop() for { dp := NewDataPack() headData := make ([]byte , dp.GetHeadLen()) if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil { fmt.Println("read msg head error " , err) break } msg , err := dp.Unpack(headData) if err != nil { fmt.Println("unpack error " , err) break } var data []byte if msg.GetDataLen() > 0 { data = make ([]byte , msg.GetDataLen()) if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil { fmt.Println("read msg data error " , err) continue } } msg.SetData(data) req := Request{ conn:c, msg:msg, } go c.MsgHandler.DoMsgHandler(&req) } }
好了,大功告成,我们来测试一下 Zinx 的多路由设置功能吧。
zinx测试 这里我们既然完成了多路由模式,那么就可以进行一个服务端,多个客户端的方式进行测试我们的功能模块了。
我们这里在 zinx 文件夹下新建 Client01.go 文件。
我们在 Server 端设置 2 个路由,一个是 MsgId 为 0 的消息会执行 PingRouter{}重写的Handle()
方法,一个是 MsgId 为 1 的消息会执行 HelloZinxRouter{}重写的Handle()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package mainimport ( "fmt" "zinx/ziface" "zinx/znet" )type PingRouter struct { znet.BaseRouter }func (this *PingRouter) Handle(request ziface.IRequest) { fmt.Println("Call PingRouter Handle" ) fmt.Println("recv from client : msgId=" , request.GetMsgID(), ", data=" , string (request.GetData())) err := request.GetConnection().SendMsg(0 , []byte ("ping...ping...ping" )) if err != nil { fmt.Println(err) } }type HelloZinxRouter struct { znet.BaseRouter }func (this *HelloZinxRouter) Handle(request ziface.IRequest) { fmt.Println("Call HelloZinxRouter Handle" ) fmt.Println("recv from client : msgId=" , request.GetMsgID(), ", data=" , string (request.GetData())) err := request.GetConnection().SendMsg(1 , []byte ("Hello Zinx Router V0.6" )) if err != nil { fmt.Println(err) } }func main () { s := znet.NewServer() s.AddRouter(0 , &PingRouter{}) s.AddRouter(1 , &HelloZinxRouter{}) s.Serve() }
我们现在写两个客户端,分别发送 0 消息和 1 消息来进行测试 Zinx 是否能够处理 2 个不同的消息业务。
Client.go:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package mainimport ( "fmt" "io" "net" "time" "zinx/znet" )func main () { fmt.Println("Client Test ... start" ) time.Sleep(3 * time.Second) conn,err := net.Dial("tcp" , "127.0.0.1:7777" ) if err != nil { fmt.Println("client start err, exit!" ) return } for { dp := znet.NewDataPack() msg, _ := dp.Pack(znet.NewMsgPackage(0 ,[]byte ("Zinx V0.6 Client0 Test Message" ))) _, err := conn.Write(msg) if err !=nil { fmt.Println("write error err " , err) return } headData := make ([]byte , dp.GetHeadLen()) _, err = io.ReadFull(conn, headData) if err != nil { fmt.Println("read head error" ) break } msgHead, err := dp.Unpack(headData) if err != nil { fmt.Println("server unpack err:" , err) return } if msgHead.GetDataLen() > 0 { msg := msgHead.(*znet.Message) msg.Data = make ([]byte , msg.GetDataLen()) _, err := io.ReadFull(conn, msg.Data) if err != nil { fmt.Println("server unpack data err:" , err) return } fmt.Println("==> Recv Msg: ID=" , msg.Id, ", len=" , msg.DataLen, ", data=" , string (msg.Data)) } time.Sleep(1 *time.Second) } }
Client01.go:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package mainimport ( "fmt" "io" "net" "time" "zinx/znet" )func main () { fmt.Println("Client Test ... start" ) time.Sleep(3 * time.Second) conn,err := net.Dial("tcp" , "127.0.0.1:7777" ) if err != nil { fmt.Println("client start err, exit!" ) return } for { dp := znet.NewDataPack() msg, _ := dp.Pack(znet.NewMsgPackage(1 ,[]byte ("Zinx V0.6 Client1 Test Message" ))) _, err := conn.Write(msg) if err !=nil { fmt.Println("write error err " , err) return } headData := make ([]byte , dp.GetHeadLen()) _, err = io.ReadFull(conn, headData) if err != nil { fmt.Println("read head error" ) break } msgHead, err := dp.Unpack(headData) if err != nil { fmt.Println("server unpack err:" , err) return } if msgHead.GetDataLen() > 0 { msg := msgHead.(*znet.Message) msg.Data = make ([]byte , msg.GetDataLen()) _, err := io.ReadFull(conn, msg.Data) if err != nil { fmt.Println("server unpack data err:" , err) return } fmt.Println("==> Recv Msg: ID=" , msg.Id, ", len=" , msg.DataLen, ", data=" , string (msg.Data)) } time.Sleep(1 *time.Second) } }
然后我们点击命令行右上角的分隔按钮,启动三个命令行窗口。值得注意的是,每启动一个窗口,都需要在里面先执行 export GOPATH=/home/project
这道命令。
测试结果:
[v0.7] 读写分离 功能
接下来我们就要对 Zinx 做一个小小的改变,就是与客户端进修数据交互的 Gouroutine 由一个变成两个,一个专门负责从客户端读取数据,一个专门负责向客户端写数据。这么设计有什么好处,当然是目的就是高内聚,模块的功能单一,对于我们今后扩展功能更加方便。
知识点
Golang并发模型
读写分离
准备工作
Server 依然是处理客户端的响应,主要关键的几个方法是 Listen、Accept 等。当建立与客户端的套接字后,那么就会开启两个 Goroutine 分别处理读数据业务和写数据业务,读写数据之间的消息通过一个 Channel 传递。下面我们就开始进行实际的实现。
1. 添加读写模块交互数据的管道
connection.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 type Connection struct { Conn *net.TCPConn ConnID uint32 isClosed bool MsgHandler ziface.IMsgHandle ExitBuffChan chan bool msgChan chan []byte }func NewConntion (conn *net.TCPConn, connID uint32 , msgHandler ziface.IMsgHandle) *Connection{ c := &Connection{ Conn: conn, ConnID: connID, isClosed: false , MsgHandler: msgHandler, ExitBuffChan: make (chan bool , 1 ), msgChan:make (chan []byte ), } return c }
2. 创建 Writer Goroutine
zinx/znet/connection.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (c *Connection) StartWriter() { fmt.Println("[Writer Goroutine is running]" ) defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]" ) for { select { case data := <-c.msgChan: if _, err := c.Conn.Write(data); err != nil { fmt.Println("Send Data error:, " , err, " Conn Writer exit" ) return } case <- c.ExitBuffChan: return } } }
关于 for select 和 channel 的用法:
select 语句只能与通道联用,它一般由若干个分支组成。每次执行这种语句的时候,一般只有一个分支中的代码会被运行。select 语句的分支分为两种,一种叫做候选分支,另一种叫做默认分支。候选分支总是以关键字 case 开头,后跟一个 case 表达式和一个冒号,然后我们可以从下一行开始写入当分支被选中时需要执行的语句。
由于 select 语句是专为通道而设计的,所以每个 case 表达式中都只能包含操作通道的表达式,比如接收表达式。使用一个接收值可以接收通道里的值,使用两个接收值可以判断通道是否已经关闭了。
对于 select 语句的执行规则如下:
每个 case 都必须是一个通信。
所有 Channel 表达式都会被求值。
所有被发送的表达式都会被求值。
如果任意某个通信可以进行,它就执行,其他被忽略。
如果有多个 case 都可以运行,Select 会随机公平地选出一个执行。其他不会执行。 否则:
如果有 default 子句,则执行该语句。
如果没有 default 子句,select 将阻塞,直到某个通信可以运行;Go 不会重新对 Channel 或值进行求值。
注意这里是和 switch 的操作是不一样的,switch 操作中,只要从上到下有一个满足条件了,就会执行相应的那一个 case,select 中,我们是全部计算一遍,然后再从可满足条件的 case 中公平的执行其中一个。这是为了防止有些通道长期得不到执行。
3. Reader 将发送客户端的数据改为发送至Channel 修改 Reader 调用的SendMsg()
方法
zinx/znet/connection.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (c *Connection) SendMsg(msgId uint32 , data []byte ) error { if c.isClosed == true { return errors.New("Connection closed when send msg" ) } dp := NewDataPack() msg, err := dp.Pack(NewMsgPackage(msgId, data)) if err != nil { fmt.Println("Pack error msg id = " , msgId) return errors.New("Pack error msg " ) } c.msgChan <- msg return nil }
修改Start()
方法
zinx/znet/connection.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (c *Connection) Start() { go c.StartReader() go c.StartWriter() for { select { case <- c.ExitBuffChan: return } } }
Zinx测试 0.7版本的测试与0.6一致,因为只修改了内部消息发送的机制,对外的消息接口并没有发生变化
[v0.8] 实现工作池 功能 给Zinx添加消息队列和多任务Worker机制。
知识点
消息队列
工作池
这一步我们要实现的是,可以通过 worker 的数量来限定处理业务的固定goroutine数量,而不是无限制的开辟Goroutine,随谈我们知道go的调度算法已经做的很极致了,但是大数量的Goroutine依然会带来一些不必要的环境切换成本,这些本应该是服务器应该节省掉的成本。我们可以用消息队列来缓冲worker工作的数据。
设计结构如下:
步骤 1. 创建消息队列 首先,处理消息队列的部分,我们应该集成到MsgHandler
模块下,因为属于我们消息模块范畴内的。
zinx/znet/msghandler.go
1 2 3 4 5 6 7 8 9 10 11 12 13 type MsgHandle struct { Apis map [uint32 ]ziface.IRouter WorkerPoolSize uint32 TaskQueue []chan ziface.IRequest }func NewMsgHandle () *MsgHandle { return &MsgHandle{ Apis: make (map [uint32 ]ziface.IRouter), WorkerPoolSize:utils.GlobalObject.WorkerPoolSize, TaskQueue:make ([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize), } }
这里添加两个成员:
WokerPoolSize
:作为工作池的数量,因为 TaskQueue 中的每个队列应该是和一个 Worker 对应的,所以我们在创建 TaskQueue 中队列数量要和 Worker 的数量一致。
TaskQueue
真是一个 Request 请求信息的 channel 集合。用来缓冲提供 worker 调用的 Request 请求信息,worker 会从对应的队列中获取客户端的请求数据并且处理掉。
当然WorkerPoolSize
最好也可以从GlobalObject
获取,并且zinx.json
配置文件可以手动配置。
zinx/utils/globalobj.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 type GlobalObj struct { TcpServer ziface.IServer Host string TcpPort int Name string Version string MaxPacketSize uint32 MaxConn int WorkerPoolSize uint32 MaxWorkerTaskLen uint32 ConfFilePath string }func init () { GlobalObject = &GlobalObj{ Name: "ZinxServerApp" , Version: "V0.4" , TcpPort: 7777 , Host: "0.0.0.0" , MaxConn: 12000 , MaxPacketSize: 4096 , ConfFilePath: "conf/zinx.json" , WorkerPoolSize: 10 , MaxWorkerTaskLen: 1024 , } GlobalObject.Reload() }
2. 创建及启动Worker工作池 现在添加 Worker 工作池,先定义一些启动工作池的接口。
zinx/ziface/imsghandler.go
1 2 3 4 5 6 7 8 9 type IMsgHandle interface { DoMsgHandler(request IRequest) AddRouter(msgId uint32 , router IRouter) StartWorkerPool() SendMsgToTaskQueue(request IRequest) }
zinx/znet/msghandler.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (mh *MsgHandle) StartOneWorker(workerID int , taskQueue chan ziface.IRequest) { fmt.Println("Worker ID = " , workerID, " is started." ) for { select { case request := <-taskQueue: mh.DoMsgHandler(request) } } }func (mh *MsgHandle) StartWorkerPool() { for i:= 0 ; i < int (mh.WorkerPoolSize); i++ { mh.TaskQueue[i] = make (chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen) go mh.StartOneWorker(i, mh.TaskQueue[i]) } }
StartWorkerPool()
方法是启动 Worker 工作池,这里根据用户配置好的WorkerPoolSize
的数量来启动,然后分别给每个 Worker 分配一个TaskQueue
,然后用一个 goroutine 来承载一个 Worker 的工作业务。
StartOneWorker()
方法就是一个 Worker 的工作业务,每个 worker 是不会退出的(目前没有设定 worker 的停止工作机制),会永久的从对应的 TaskQueue 中等待消息,并处理。
3. 发送消息给消息队列 现在,worker 工作池已经准备就绪了,那么就需要有一个给到 worker 工作池消息的入口,我们再定义一个方法
zinx/znet/msghandler.go
1 2 3 4 5 6 7 8 9 10 func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) { workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize fmt.Println("Add ConnID=" , request.GetConnection().GetConnID()," request msgID=" , request.GetMsgID(), "to workerID=" , workerID) mh.TaskQueue[workerID] <- request }
SendMsgToTaskQueue()
作为工作池的数据入口,这里面采用的是轮询的分配机制,因为不同链接信息都会调用这个入口,那么到底应该由哪个 worker 处理该链接的请求处理,整理用的是一个简单的求模运算。用余数和 workerID 的匹配来进行分配。
最终将 request 请求数据发送给对应 worker 的 TaskQueue,那么对应的 worker 的 Goroutine 就会处理该链接请求了。
4. 工作池代码调用 好了,现在需要将消息队列和多任务 worker 机制集成到我们 Zinx 的中了。我们在 Server 的Start()
方法中,在服务端 Accept 之前,启动 Worker 工作池。
zinx/znet/server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (s *Server) Start() { go func () { s.msgHandler.StartWorkerPool() addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d" , s.IP, s.Port)) if err != nil { fmt.Println("resolve tcp addr err: " , err) return } } }() }
其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给 Worker 工作池进行处理。
所以应该在 Connection 的StartReader()
方法中修改:
zinx/znet/connection.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running" ) defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!" ) defer c.Stop() for { req := Request{ conn:c, msg:msg, } if utils.GlobalObject.WorkerPoolSize > 0 { c.MsgHandler.SendMsgToTaskQueue(&req) } else { go c.MsgHandler.DoMsgHandler(&req) } } }
这里并没有强制使用多任务 Worker 机制,而是判断用户配置WorkerPoolSize
的个数,如果大于 0,那么我就启动多任务机制处理链接请求消息,如果=0 或者<0 那么,我们依然只是之前的开启一个临时的 Goroutine 处理客户端请求消息。
[v0.9] 实现链接控制 功能
知识点
链接管理
数量限制
步骤