[Toc]

go环境变量设置 export GOPATH=/home/project

Zinx架构设计

客户端请求服务器响应的过程

  1. 要有一个客户端对服务器发起请求。
  2. 我们的服务器应该去启动对客户端的处理模块并打开工作池来提升并发量。
  3. 处理客户端的模块开启两个模块,一个负责读客户端请求,一个负责写客户端请求。
  4. 用于读的功能模块,去任务的消息队列里去请求读数据。用于写的功能模块,通过 API 接口,当然我们的 API 不可能只有一个,所以这里肯定是 APIS。

其过程如下所示:

img

Zinx功能模块

img

v0.1

Zinx目录结构如下所示:

img

iserver.go

1
2
3
4
5
6
7
8
9
10
11
package ziface

// 定义服务器接口
type IServer interface {
// 启动服务器
Start()
// 停止服务器
Stop()
// 开启业务服务方法
Server()
}

server.go *

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

package znet
import (
"fmt"
"net"
"zinx/ziface"
)

// IServer 接口实现,定义一个Server服务类
type Server struct {
//服务器的名称
Name string
//tcp4 or other
IPVersion string
//服务绑定的IP地址
IP string
//服务绑定的端口
Port int
}

/*
创建一个服务器句柄
*/

func NewServer (name string) ziface.IServer {
s := &Server {
Name: name,
IPVersion:"tcp4",
IP:"0.0.0.0",
Port:7777,
}
return s
}

// 开启网络服务
func (s *Server) Start() {
fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n", s.IP, s.Port)

// 开启一个go去做服务端Linster业务
go func() {
// 1. 获取一个tcp的Addr
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s,%d", s.IP, s. Port))
if err != nil {
fmt.Println("resolve tcp addr err: ", err)
}

// 2. 监听服务器地址
listenner, err := net.ListenTCP(s.IPVersion, addr)
if err != nil {
fmt.Println("listen", s.IPVersion, "err", err)
return
}

// 已经监听成功
fmt.Println("Start zinx Server ", s.Name, " succ, now listenning……")

// 3 启动server网络连接业务
for {
// 3.1 阻塞等待客户端建立连接请求
conn, err := listenner.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}

// 3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么关闭此新的连接
// 3.3 TODO Server.Start() 处理该新连接请求的 *业务* 方法,此时应该有handler和conn是绑定的
// 我们这里暂时做一个最大512字节的回显服务
go func() {
// 不断地循环从客户端获取数据
for {
buf := make([]byte, 512)
cnt, err := conn.Read(buf)
if err != nil {
fmt.Println("recv bug err ", err)
continue
}

// 回显
if _, err := conn.Write(buf[:cnt]); err != nil {
fmt.Println("write back buf err", err)
continue
}
}
}()
}
}()
}

// 停止
func (s *Server) Stop() {
fmt.Println("[STOP] Zinx server, name", s.Name)
}

// 开启业务
func (s *Server) Server() {

}

server_test.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package znet

import (
"fmt"
"net"
"testing"
"time"
)

/*
模拟客户端
*/
func ClientTest() {
fmt.Println("Client Test ... start")
// 3秒之后发起测试请求,给服务端开启服务的机会
time.Sleep(3 * time.Second)
conn,err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client start err, exit!")
return
}

for {
_, err = conn.Write([]byte("Hello ZINX"))
if err != nil {
fmt.Println("Write error err ", err)
return
}

buf := make([]byte, 512)
cnt, err := conn.Read(buf)
if err != nil {
fmt.Println("read buf err", err)
return
}
fmt.Printf(" server call back: %s, cnt = %d\n", buf, cnt)
time.Sleep(1 * time.Second)
}
}

// Server 模块的测试函数
func TestServer(t *testing.T) {
/*
服务端测试
*/

// 1 创建一个server句柄 s
s := NewServer("[zinx V0.1]")

/*
客户端测试
*/
go ClientTest()

// 2 开启服务
s.Server()

}

[v0.2] 实现链接封装业务与业务绑定

功能如思维导图所示:

image-20220201232354654

文件结构

image-20220202175949470

代码

iconnection.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package ziface
import "net"

// 定义连接接口
type IConnection interface {
// 启动连接,让电气概念连接开始工作
Start()
// 停止连接,结束当前链接状态
Stop()
// 从当前链接获取原始的socket TCPConn
GetTCPConnection() *net.TCPConn
// 获取当前链接ID
GetConnID() uint32
// 获取远程客户端地址信息
RemoteAddr() net.Addr
}

// 定义一个统一处理链接业务的接口
type HandFunc func(*net.TCPConn, []byte, int) error

connection.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package znet

import (
"fmt"
"net"
"zinx/ziface"
)

type Connection struct {
// 当前链接的socket TCP套接字
Conn *net.TCPConn
// 当前连接的ID 也可以称作为SessionID, ID全局唯一
ConnID uint32
// 当前连接的关闭状态
isClosed bool
// 该链接的处理方法api
handleAPI ziface.HandFunc
// 告知该链接已经退出/停止的channel
ExitBufferChan chan bool
}

// 创建连接的方法
func NewConntion(conn *net.TCPConn, connID uint32, callBack_api ziface.HandFunc) *Connection {
c := &Connection{
Conn: conn,
ConnID: connID,
isClosed: false,
handleAPI: callBack_api,
ExitBufferChan: make(chan bool, 1),
}
return c
}

/* 处理conn读数据的Goroutine */
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.Conn.RemoteAddr().String(), "conn reader exit")
defer c.Stop()

for {
// 读取我们最大的数据到buf中
buf := make([]byte,512)
cnt, err := c.Conn.Read(buf)
if err != nil {
fmt.Println("recv bug err ", err)
c.ExitBufferChan <- true
continue
}

// 调用当前链接业务(这里执行的是当前conn的绑定的handle方法)
if err := c.handleAPI(c.Conn, buf, cnt); err != nil {
fmt.Println("connID ", c.ConnID, " handle is error")
c.ExitBufferChan <- true
return
}
}
}

// 启动链接,让当前连接开始工作
func (c *Connection) Start() {
// 开启处理该链接读取客户端数据之后的请求业务
go c.StartReader()
for {
select {
case <- c.ExitBufferChan:
// 得到退出消息,不再阻塞
return
}
}
}

// 停止连接,结束当前连接状态M
func (c *Connection) Stop() {
// 1. 如果当前链接已经关闭
if c.isClosed == true {
return
}

c.isClosed = true

// TODO Connection Stop() 如果用户注册了该链接的关闭毁掉业务,那么在此刻应该显示调用
// 关闭socket链接
c.Conn.Close()
// 通知从缓冲队列读取数据的业务,该链接已经关闭
c.ExitBufferChan <- true
// 关闭该链接全部管道
close(c.ExitBufferChan)
}

// 从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
// 获取当前ID
func (c *Connection) GetConnID() uint32 {
return c.ConnID
}
// 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}

server.go 代码修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// TODO server.go 应该有一个自动生成ID的方法
+ var cid uint32 = 0
// 3 启动server网络连接业务
for {
// 3.1 阻塞等待客户端建立连接请求
conn, err := listenner.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}

// 3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么关闭此新的连接
// 3.3 TODO Server.Start() 处理该新连接请求的 *业务* 方法,此时应该有handler和conn是绑定的
+ dealConn := NewConntion(conn, cid, CallBackToClient)
+ cid ++

//3.4 启动当前链接的处理业务
go dealConn.Start()
}
}()

[v0.3] 实现基础路由模块

通俗讲就是实现一个类,系统地去回调一些用户的操作

功能

img

涉及知识点:

  1. 路由功能模块

目录结构

image-20220203160347674

IRequest消息请求抽象类

把客户端请求的连接信息和请求的数据,放在一个叫 Request 的请求类里,这样的好处是我们可以从 Request 里得到全部客户端的请求信息,也为我们之后拓展框架有一定的作用,一旦客户端有额外的含义的数据信息,都可以放在这个 Request 里。可以理解为每次客户端的全部请求数据,Zinx 都会把它们一起放到一个 Request 结构体里。

  1. 创建抽象IRequest
1
2
3
4
5
6
7
8
9
package ziface
/*
IRequest 接口:
实际上是把客户端请求的链接信息 和 请求的数据 包装到了 Request里
*/
type IRequest interface{
GetConnection() IConnection //获取请求连接信息
GetData() []byte //获取请求消息的数据
}

当前的抽象层只提供了两个 Getter 方法,所以有个成员应该是必须的,一个是客户端连接,一个是客户端传递进来的数据,当然随着 Zinx 框架的功能丰富,这里面还应该继续添加新的成员。

  1. 实现Requeset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package znet

import "zinx/ziface"

type Request struct {
conn ziface.IConnection //已经和客户端建立好的 链接
data []byte //客户端请求的数据
}
//获取请求连接信息
func(r *Request) GetConnection() ziface.IConnection {
return r.conn
}
//获取请求消息的数据
func(r *Request) GetData() []byte {
return r.data
}

IRouter路由配置抽象类*

实现一个非常简单基础的路由功能,目的当然就是为了快速的让 Zinx 步入到路由的阶段。

  1. 创建抽象的IRouter

我们知道 router 实际上的作用就是,服务端应用可以给 Zinx 框架配置当前链接的处理业务方法,之前的 Zinx-V0.2 我们的 Zinx 框架处理链接请求的方法是固定的,现在是可以自定义,并且有 3 种接口可以重写。

Handle:是处理当前链接的主业务函数

PreHandle:如果需要在主业务函数之前有前置业务,可以重写这个方法

PostHandle:如果需要在主业务函数之后又后置业务,可以重写这个方法

当然每个方法都有一个唯一的形参 IRequest 对象,也就是客户端请求过来的连接和请求数据,作为我们业务方法的输入数据。

1
2
3
4
5
6
7
8
9
10
package ziface
/*
路由接口, 这里面路由是 使用框架者给该链接自定的 处理业务方法
路由里的IRequest 则包含用该链接的链接信息和该链接的请求数据信息
*/
type IRouter interface{
PreHandle(request IRequest) //在处理conn业务之前的钩子方法
Handle(request IRequest) //处理conn业务的方法
PostHandle(request IRequest) //处理conn业务之后的钩子方法
}
  1. 实现Router
1
2
3
4
5
6
7
8
9
10
11
12
package znet

import "zinx/ziface"

//实现router时,先嵌入这个基类,然后根据需要对这个基类的方法进行重写
type BaseRouter struct {}
//这里之所以BaseRouter的方法都为空,
// 是因为有的Router不希望有PreHandle或PostHandle
// 所以Router全部继承BaseRouter的好处是,不需要实现PreHandle和PostHandle也可以实例化
func (br *BaseRouter)PreHandle(req ziface.IRequest){}
func (br *BaseRouter)Handle(req ziface.IRequest){}
func (br *BaseRouter)PostHandle(req ziface.IRequest){}

IServer增添路由添加功能

这一步需要修改原有的链接结构体,同时对服务中的方法进行修改

server类

  1. iserver.go
1
2
3
4
5
6
7
8
9
10
11
12
package ziface
//定义服务器接口
type IServer interface{
//启动服务器方法
Start()
//停止服务器方法
Stop()
//开启业务服务方法
Serve()
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
AddRouter(router IRouter)
}
  1. server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
//iServer 接口实现,定义一个Server服务类
type Server struct {
//服务器的名称
Name string
//tcp4 or other
IPVersion string
//服务绑定的IP地址
IP string
//服务绑定的端口
Port int
//当前Server由用户绑定的回调router,也就是Server注册的链接对应的处理业务
Router ziface.IRouter
}
  1. NewServer()方法中的成员初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
/*
创建一个服务器句柄
*/
func NewServer (name string) ziface.IServer {
s:= &Server {
Name :name,
IPVersion:"tcp4",
IP:"0.0.0.0",
Port:7777,
Router: nil,
}
return s
}

Connection类

  1. connection.go
1
2
3
4
5
6
7
8
9
10
11
12
type Connection struct {
//当前连接的socket TCP套接字
Conn *net.TCPConn
//当前连接的ID 也可以称作为SessionID,ID全局唯一
ConnID uint32
//当前连接的关闭状态
isClosed bool
//该连接的处理方法router
Router ziface.IRouter
//告知该链接已经退出/停止的channel
ExitBuffChan chan bool
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
//读取我们最大的数据到buf中
buf := make([]byte, 512)
_, err := c.Conn.Read(buf)
if err != nil {
fmt.Println("recv buf err ", err)
c.ExitBuffChan <- true
continue
}
//得到当前客户端请求的Request数据
req := Request{
conn:c,
data:buf,
}
//从路由Routers 中找到注册绑定Conn的对应Handle
go func (request ziface.IRequest) {
//执行注册的路由方法
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
}
}

AddRouter

server类要实现添加路由的方法AddRouter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package znet
import (
"fmt"
"net"
"time"
"zinx/ziface"
)
//iServer 接口实现,定义一个Server服务类
type Server struct {
//服务器的名称
Name string
//tcp4 or other
IPVersion string
//服务绑定的IP地址
IP string
//服务绑定的端口
Port int
//当前Server由用户绑定的回调router,也就是Server注册的链接对应的处理业务
Router ziface.IRouter
}
/*
创建一个服务器句柄
*/
func NewServer (name string) ziface.IServer {
s:= &Server {
Name :name,
IPVersion:"tcp4",
IP:"0.0.0.0",
Port:7777,
Router: nil,
}
return s
}
//============== 实现 ziface.IServer 里的全部接口方法 ========
//开启网络服务
func (s *Server) Start() {
fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n", s.IP, s.Port)
//开启一个go去做服务端Linster业务
go func() {
//1 获取一个TCP的Addr
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Println("resolve tcp addr err: ", err)
return
}
//2 监听服务器地址
listenner, err:= net.ListenTCP(s.IPVersion, addr)
if err != nil {
fmt.Println("listen", s.IPVersion, "err", err)
return
}
//已经监听成功
fmt.Println("start Zinx server ", s.Name, " succ, now listenning...")
//TODO server.go 应该有一个自动生成ID的方法
var cid uint32
cid = 0
//3 启动server网络连接业务
for {
//3.1 阻塞等待客户端建立连接请求
conn, err := listenner.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}
//3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewConntion(conn, cid, s.Router)
cid ++
//3.4 启动当前链接的处理业务
go dealConn.Start()
}
}()
}
func (s *Server) Stop() {
fmt.Println("[STOP] Zinx server , name " , s.Name)
//TODO Server.Stop() 将其他需要清理的连接信息或者其他信息 也要一并停止或者清理
}
func (s *Server) Serve() {
s.Start()
//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加
//阻塞,否则主Go退出, listenner的go将会退出
for {
time.Sleep(10*time.Second)
}
}
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
func (s *Server)AddRouter(router ziface.IRouter) {
s.Router = router
fmt.Println("Add Router succ! " )
}

connnection同样要加上相应的路由对应的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package znet
import (
"fmt"
"net"
"zinx/ziface"
)
type Connection struct {
//当前连接的socket TCP套接字
Conn *net.TCPConn
//当前连接的ID 也可以称作为SessionID,ID全局唯一
ConnID uint32
//当前连接的关闭状态
isClosed bool
//该连接的处理方法router
Router ziface.IRouter
//告知该链接已经退出/停止的channel
ExitBuffChan chan bool
}
//创建连接的方法
func NewConntion(conn *net.TCPConn, connID uint32, router ziface.IRouter) *Connection{
c := &Connection{
Conn: conn,
ConnID: connID,
isClosed: false,
Router: router,
ExitBuffChan: make(chan bool, 1),
}
return c
}
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
//读取我们最大的数据到buf中
buf := make([]byte, 512)
_, err := c.Conn.Read(buf)
if err != nil {
fmt.Println("recv buf err ", err)
c.ExitBuffChan <- true
continue
}
//得到当前客户端请求的Request数据
req := Request{
conn:c,
data:buf,
}
//从路由Routers 中找到注册绑定Conn的对应Handle
go func (request ziface.IRequest) {
//执行注册的路由方法
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
}
}
//启动连接,让当前连接开始工作
func (c *Connection) Start() {
//开启处理该链接读取到客户端数据之后的请求业务
go c.StartReader()
for {
select {
case <- c.ExitBuffChan:
//得到退出消息,不再阻塞
return
}
}
}
//停止连接,结束当前连接状态M
func (c *Connection) Stop() {
//1. 如果当前链接已经关闭
if c.isClosed == true {
return
}
c.isClosed = true
//TODO Connection Stop() 如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
// 关闭socket链接
c.Conn.Close()
//通知从缓冲队列读数据的业务,该链接已经关闭
c.ExitBuffChan <- true
//关闭该链接全部管道
close(c.ExitBuffChan)
}
//从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
//获取当前连接ID
func (c *Connection) GetConnID() uint32{
return c.ConnID
}
//获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}

测试

Server.go

我们这里自定义了一个类似 Ping 操作的路由,就是当客户端发送数据,我们的处理业务就是返回给客户端”ping..ping..ping..”, 为了测试,当前路由也同时实现了 PreHandle 和 PostHandle 两个方法。实际上 Zinx 会利用模板的设计模式,依次在框架中调用PreHandleHandlePostHandle三个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main
import (
"fmt"
"zinx/ziface"
"zinx/znet"
)
//ping test 自定义路由
type PingRouter struct {
znet.BaseRouter //一定要先基础BaseRouter
}
//Test PreHandle
func (this *PingRouter) PreHandle(request ziface.IRequest) {
fmt.Println("Call Router PreHandle")
_, err := request.GetConnection().GetTCPConnection().Write([]byte("before ping ....\n"))
if err !=nil {
fmt.Println("call back ping ping ping error")
}
}
//Test Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
_, err := request.GetConnection().GetTCPConnection().Write([]byte("ping...ping...ping\n"))
if err !=nil {
fmt.Println("call back ping ping ping error")
}
}
//Test PostHandle
func (this *PingRouter) PostHandle(request ziface.IRequest) {
fmt.Println("Call Router PostHandle")
_, err := request.GetConnection().GetTCPConnection().Write([]byte("After ping .....\n"))
if err !=nil {
fmt.Println("call back ping ping ping error")
}
}
func main(){
//创建一个server句柄
s := znet.NewServer("[zinx V0.3]")
s.AddRouter(&PingRouter{})
//2 开启服务
s.Serve()
}

Client.go

代码不变

[v0.4] 全局配置模块

增加一个配置文件zinx.json,保存服务器的各项属性,方便修改服务器的参数

功能

img

涉及知识点:

  1. json格式问题
  2. 全局配置文件的好处

zinx目录下新建utils文件夹,在utils文件夹下新建globalobj.go文件,这个go文件就是实现配置文件读取与输出的功能。

配置文件

zinx下新建conf文件下,在其下新建zinx.json配置文件,内容如下:

1
2
3
4
5
6
{
"Name": "demo server",
"Host": "127.0.0.1",
"TcpPort": 7777,
"MaxConn": 3
}

globalobj.go的功能来看,这里可以设置你所有的需要加入到服务器的配置,以GlobalObj结构体中的数据成员为主。

globalobj.go

对新手来讲,这个文件的代码中需要关注的两个函数:

  1. ioutil.ReadFile
  2. json.Unmarshal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package utils

import (
"encoding/json"
"io/ioutil"
"zinx/ziface"
"fmt"
)

/*
存储一些有关Zinx框架的全局参数,供其他模块使用
一些参数也可以通过用户根据zinx.json来配置
*/

type GlobalObj struct {
tcpServer ziface.IServer // 当前Zinx的全局Server对象
Host string // 当前服务器主机IP
TcpPort int // 当前服务器主机监听端口号
Name string // 当前服务器名称
Version string // 当前Zinx版本
MaxPacketSize uint32 // 都需数据包的最大值
MaxConn int // 当前服务器主机允许的最大链接个数
}

/*
定义一个全局的对象
*/
var GlobalObject *GlobalObj

// 读取用户的配置文件
func (g *GlobalObj) Reload() {
data, err := ioutil.ReadFile("D:/Program Files/Go/src/zinx/conf/zinx.json")
if err != nil {
panic(err)
}

// 将 json 数据解析到struct中
// fmt.Printf("json: %s\n", data)
err = json.Unmarshal(data, &GlobalObject)
if err != nil {
panic(err)
}

fmt.Println(fmt.Sprintf("%+v",*GlobalObject))
}

/*
提供init方法,默认加载
*/
func init() {
// 初始化GlobalObject变量,设置一些默认值
GlobalObject = & GlobalObj{
Name: "ZinxServerApp",
Version: "V0.4",
TcpPort: 7777,
Host: "0.0.0.0",
MaxConn: 12000,
MaxPacketSize: 4096,
}

// 从配置文件中加载一些用户配置的参数
GlobalObject.Reload()
}

[V0.5] 消息封装

功能

0.5版本要做的就是把服务器的全部数据都放在一个Request里:

img

涉及知识点:

  1. tcp封包拆包
  2. 消息封装

创建消息封装类型

当前的Request结构如下:

1
2
3
4
type Request struct {
conn ziface.IConnection //已经和客户端建立好的链接
data []byte //客户端请求的数据
}

创建消息封装的结构体以及相关方法:

imessage.go

zinx/ziface/下创建imessage.go文件: 将请求的一个消息封装到 message 中,定义抽象层接口,定义好 Getter 方法和 Setter 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package ziface

/*
将请求的一个消息封装到message中,定义抽象层接口
*/

type IMessage interface {
GetDataLen() uint32 // 获取消息数据段长度
GetMsgId() uint32 // 获取消息ID
GetData() []byte // 获取消息内容
SetMsgId(uint32) // 设置消息ID
SetData([]byte) // 设置消息内容
SetDataLen(uint32) // 设置消息数据段长度
}

message.go

同时创建实例 message 类,在zinx/znet/下,创建message.go文件。

整理一个基本的 message 包,会包含消息 ID数据数据长度三个成员,提供基本的 setter 和 getter 方法,目的是为了以后做封装优化的作用。同时也提供了一个创建一个 message 包的初始化方法NewMegPackage

这里我们只需要要实现 Message 类,写出构造函数,实现接口中对应的方法,比较的简单,大家可以试试先自己尝试实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package znet

type Message struct {
Id uint32 // 消息的ID
DataLen uint32 // 消息的长度
Data []byte // 消息的内容
}

// 创建一个Message的消息包
func NewMsgPackage(id uint32, data []byte) *Message {
return &Message{
Id: id,
DataLen: uint32(len(data)),
Data: data,
}
}

// 获取消息数据段长度
func (msg *Message) GetDataLen() uint32 {
return msg.DataLen;
}
//获取消息ID
func (msg *Message) GetMsgId() uint32 {
return msg.Id
}
//获取消息内容
func (msg *Message) GetData() []byte {
return msg.Data
}
//设置消息数据段长度
func (msg *Message) SetDataLen(len uint32) {
msg.DataLen = len
}
//设计消息ID
func (msg *Message) SetMsgId(msgId uint32) {
msg.Id = msgId
}
//设计消息内容
func (msg *Message) SetData(data []byte) {
msg.Data = data
}

拆包与封包*

采用TCL(Type-Len-Value)封包格式解决TCP粘包问题

5.2 消息的封包与拆包  - 图1

创建拆包封包抽象类

zinx/ziface下,创建idatapack.go文件

我们需要三个方法:

  • 封包数据。
  • 拆包数据。
  • 得到头部长度。
1
2
3
4
5
6
7
8
9
10
package ziface
/*
封包数据和拆包数据
直接面向TCP连接中的数据流,为传输数据添加头部信息,用于处理TCP粘包问题。
*/
type IDataPack interface{
GetHeadLen() uint32 //获取包头长度方法
Pack(msg IMessage)([]byte, error) //封包方法
Unpack([]byte)(IMessage, error) //拆包方法
}

实现拆包封包类

zinx/znet/下,创建datapack.go文件.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package znet
import (
"bytes"
"encoding/binary"
"errors"
"zinx/utils"
"zinx/ziface"
)
//封包拆包类实例,暂时不需要成员
type DataPack struct {}
//封包拆包实例初始化方法
func NewDataPack() *DataPack {
return &DataPack{}
}
//获取包头长度方法
func(dp *DataPack) GetHeadLen() uint32 {
//Id uint32(4字节) + DataLen uint32(4字节)
return 8
}
//封包方法(压缩数据)
func(dp *DataPack) Pack(msg ziface.IMessage)([]byte, error) {
//创建一个存放bytes字节的缓冲
dataBuff := bytes.NewBuffer([]byte{})
//写dataLen
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {
return nil, err
}
//写msgID
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
return nil, err
}
//写data数据
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
return nil ,err
}
return dataBuff.Bytes(), nil
}
//拆包方法(解压数据)
func(dp *DataPack) Unpack(binaryData []byte)(ziface.IMessage, error) {
//创建一个从输入二进制数据的ioReader
dataBuff := bytes.NewReader(binaryData)
//只解压head的信息,得到dataLen和msgID
msg := &Message{}
//读dataLen
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {
return nil, err
}
//读msgID
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
return nil, err
}
//判断dataLen的长度是否超出我们允许的最大包长度
if (utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize) {
return nil, errors.New("Too large msg data recieved")
}
//这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据
return msg, nil
}

需要注意的是整理的Unpack方法,因为我们从上图可以知道,我们进行拆包的时候是分两次过程的,第二次是依赖第一次的 dataLen 结果,所以Unpack只能解压出包头 head 的内容,得到 msgId 和 dataLen。之后调用者再根据 dataLen 继续从 io 流中读取 body 中的数据。

测试拆包与封包类型

客户端与服务端的代码如下所示:

  • Server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main
import (
"fmt"
"io"
"net"
"zinx/znet"
)
//只是负责测试datapack拆包,封包功能
func main() {
//创建socket TCP Server
listener, err := net.Listen("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("server listen err:", err)
return
}
//创建服务器gotoutine,负责从客户端goroutine读取粘包的数据,然后进行解析
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println("server accept err:", err)
}
//处理客户端请求
go func(conn net.Conn) {
//创建封包拆包对象dp
dp := znet.NewDataPack()
for {
//1 先读出流中的head部分
headData := make([]byte, dp.GetHeadLen())
_, err := io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
if err != nil {
fmt.Println("read head error")
break
}
//将headData字节流 拆包到msg中
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("server unpack err:", err)
return
}
if msgHead.GetDataLen() > 0 {
//msg 是有data数据的,需要再次读取data数据
//`*`是指针运算符 , 可以表示一个变量是**指针类型** , 也可以表示**一个指针变量所指向的存储单元** , 也就是这个地址所存储的值 .
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
//根据dataLen从io中读取字节流
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("server unpack data err:", err)
return
}
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
}
}
}(conn)
}
}
  • Client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main
import (
"fmt"
"net"
"zinx/znet"
)
func main() {
//客户端goroutine,负责模拟粘包的数据,然后进行发送
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client dial err:", err)
return
}
//创建一个封包对象 dp
dp := znet.NewDataPack()
//封装一个msg1包
msg1 := &znet.Message{
Id: 0,
DataLen: 5,
Data: []byte{'h', 'e', 'l', 'l', 'o'},
}
sendData1, err := dp.Pack(msg1)
if err != nil {
fmt.Println("client pack msg1 err:", err)
return
}
msg2 := &znet.Message{
Id: 1,
DataLen: 7,
Data: []byte{'w', 'o', 'r', 'l', 'd', '!', '!'},
}
sendData2, err := dp.Pack(msg2)
if err != nil {
fmt.Println("client temp msg2 err:", err)
return
}
//将sendData1,和 sendData2 拼接一起,组成粘包
sendData1 = append(sendData1, sendData2...)
//向服务器端写数据
conn.Write(sendData1)
//客户端阻塞
select {}
}

Request字段修改

首先我们要将我们之前的 Request 中的[]byte类型的 data 字段改成 Message 类型.。并且我们需要把 irequest 的方法新增一个 GetMsgID。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package znet
import "zinx/ziface"
type Request struct {
conn ziface.IConnection //已经和客户端建立好的 链接
msg ziface.IMessage //客户端请求的数据
}
//获取请求连接信息
func(r *Request) GetConnection() ziface.IConnection {
return r.conn
}
//获取请求消息的数据
func(r *Request) GetData() []byte {
return r.msg.GetData()
}
//获取请求的消息的ID
func (r *Request) GetMsgID() uint32 {
return r.msg.GetMsgId()
}
package ziface
/*
IRequest 接口:
实际上是把客户端请求的链接信息 和 请求的数据 包装到了 Request里
*/
type IRequest interface{
GetConnection() IConnection //获取请求连接信息
GetData() []byte //获取请求消息的数据
GetMsgID() uint32 //hu获取消息的id
}

集成拆包过程

接下来我们需要在 Connection 的StartReader()方法中,修改之前的读取客户端的这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *Connection) StartReader() {
//...
for {
//读取我们最大的数据到buf中
buf := make([]byte, utils.GlobalObject.MaxPacketSize)
_, err := c.Conn.Read(buf)
if err != nil {
fmt.Println("recv buf err ", err)
c.ExitBuffChan <- true
continue
}
//...
}
}

将这个函数做出如下改造:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
// 创建拆包解包的对象
dp := NewDataPack()
//读取客户端的Msg head
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head error ", err)
c.ExitBuffChan <- true
continue
}
//拆包,得到msgid 和 datalen 放在msg中
msg , err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error ", err)
c.ExitBuffChan <- true
continue
}
//根据 dataLen 读取 data,放在msg.Data中
var data []byte
if msg.GetDataLen() > 0 {
data = make([]byte, msg.GetDataLen())
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data error ", err)
c.ExitBuffChan <- true
continue
}
}
msg.SetData(data)
//得到当前客户端请求的Request数据
req := Request{
conn:c,
msg:msg, //将之前的buf 改成 msg
}
//从路由Routers 中找到注册绑定Conn的对应Handle
go func (request ziface.IRequest) {
//执行注册的路由方法
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
}
}

提供封包方法

现在我们已经将拆包的功能集成到 Zinx 中了,但是使用 Zinx 的时候,如果我们希望给用户返回一个 TLV 格式的数据,总不能每次都经过这么繁琐的过程,所以我们应该给 Zinx 提供一个封包的接口,供 Zinx 发包使用。 我们在 iconnection.go 中新增 SendMsg()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package ziface
import "net"
//定义连接接口
type IConnection interface {
//启动连接,让当前连接开始工作
Start()
//停止连接,结束当前连接状态M
Stop()
//从当前连接获取原始的socket TCPConn
GetTCPConnection() *net.TCPConn
//获取当前连接ID
GetConnID() uint32
//获取远程客户端地址信息
RemoteAddr() net.Addr
//直接将Message数据发送数据给远程的TCP客户端
SendMsg(msgId uint32, data []byte) error
}

然后,我们到 connection.go 中实现这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("Connection closed when send msg")
}
//将data封包,并且发送
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg ")
}
//写回客户端
if _, err := c.Conn.Write(msg); err != nil {
fmt.Println("Write msg id ", msgId, " error ")
c.ExitBuffChan <- true
return errors.New("conn Write error")
}
return nil
}

注意,做出修改后,我们需要在 connection.go 中将 io 和 errors 包引入进来。

zinx 0.5 测试

  • Server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main
import (
"fmt"
"zinx/ziface"
"zinx/znet"
)
//ping test 自定义路由
type PingRouter struct {
znet.BaseRouter
}
//Test Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
//先读取客户端的数据,再回写ping...ping...ping
fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
//回写数据
err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping"))
if err != nil {
fmt.Println(err)
}
}
func main() {
//创建一个server句柄
s := znet.NewServer()
//配置路由
s.AddRouter(&PingRouter{})
//开启服务
s.Serve()
}
  • Client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main
import (
"fmt"
"io"
"net"
"time"
"zinx/znet"
)
/*
模拟客户端
*/
func main() {
fmt.Println("Client Test ... start")
//3秒之后发起测试请求,给服务端开启服务的机会
time.Sleep(3 * time.Second)
conn,err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client start err, exit!")
return
}
for {
//发封包message消息
dp := znet.NewDataPack()
msg, _ := dp.Pack(znet.NewMsgPackage(0,[]byte("Zinx V0.5 Client Test Message")))
_, err := conn.Write(msg)
if err !=nil {
fmt.Println("write error err ", err)
return
}
//先读出流中的head部分
headData := make([]byte, dp.GetHeadLen())
_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
if err != nil {
fmt.Println("read head error")
break
}
//将headData字节流 拆包到msg中
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("server unpack err:", err)
return
}
if msgHead.GetDataLen() > 0 {
//msg 是有data数据的,需要再次读取data数据
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
//根据dataLen从io中读取字节流
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("server unpack data err:", err)
return
}
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
}
time.Sleep(1*time.Second)
}
}

[v0.6] 多路由模式

功能

img

涉及知识点

  1. 多路由模式
  2. 单元测试

创建消息管理模块

  1. 创建消息管理模块抽象类

zinx/ziface下创建imsghandler.go文件, 定义出我们之前图片中的方法。

1
2
3
4
5
6
7
8
package ziface
/*
消息管理抽象层
*/
type IMsgHandle interface{
DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息
AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
}

这里面有两个方法,AddRouter()就是添加一个 msgId 和一个路由关系到 Apis 中,那么DoMsgHandler()则是调用 Router 中具体Handle()等方法的接口。

  1. 实现消息管理模块

zinx/znet下创建msghandler.go文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package znet
import (
"fmt"
"strconv"
"zinx/ziface"
)
type MsgHandle struct{
Apis map[uint32] ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性
}
func NewMsgHandle() *MsgHandle {
return &MsgHandle {
Apis:make(map[uint32]ziface.IRouter),
}
}
//马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
handler, ok := mh.Apis[request.GetMsgID()]
if !ok {
fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")
return
}
//执行对应处理方法
handler.PreHandle(request)
handler.Handle(request)
handler.PostHandle(request)
}
//为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
//1 判断当前msg绑定的API处理方法是否已经存在
if _, ok := mh.Apis[msgId]; ok {
panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))
}
//2 添加msg与api的绑定关系
mh.Apis[msgId] = router
fmt.Println("Add api msgId = ", msgId)
}
  1. zinx其他模块的相应修改

首先iserverAddRouter()的接口要稍微改一下,增添 MsgId 参数.

iserver.go:

1
2
3
4
5
6
7
8
9
10
11
12
package ziface
//定义服务器接口
type IServer interface{
//启动服务器方法
Start()
//停止服务器方法
Stop()
//开启业务服务方法
Serve()
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
AddRouter(msgId uint32, router IRouter)
}

其次,Server类中 之前有一个Router成员 ,代表唯一的处理方法,现在应该替换成MsgHandler成员。

zinx/znet/server.go

1
2
3
4
5
6
7
8
9
10
11
12
type Server struct {
//服务器的名称
Name string
//tcp4 or other
IPVersion string
//服务绑定的IP地址
IP string
//服务绑定的端口
Port int
//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法
msgHandler ziface.IMsgHandle
}

初始化 Server 自然也要更正,增加 msgHandler 初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*
创建一个服务器句柄
*/
func NewServer () ziface.IServer {
utils.GlobalObject.Reload()
s:= &Server {
Name :utils.GlobalObject.Name,
IPVersion:"tcp4",
IP:utils.GlobalObject.Host,
Port:utils.GlobalObject.TcpPort,
msgHandler: NewMsgHandle(), //msgHandler 初始化
}
return s
}

然后当 Server 在处理 conn 请求业务的时候,创建 conn 的时候也需要把 msgHandler 作为参数传递给 Connection 对象。也就是在我们 server.go 的 Start() 方法中的 3.3 注释下进行如下修改:

1
2
3
//...
dealConn := NewConntion(conn, cid, s.msgHandler)
//...

最后,我们的 AddRouter 方法做了修改,所以要重新实现接口方法:

1
2
3
4
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
func (s *Server)AddRouter(msgId uint32, router ziface.IRouter) {
s.msgHandler.AddRouter(msgId,router)
}

那么接下来就是 Connection 对象了。固然在 Connection 对象中应该有 MsgHandler 的成员,来查找消息对应的回调路由方法。

zinx/znet/connection.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type Connection struct {
//当前连接的socket TCP套接字
Conn *net.TCPConn
//当前连接的ID 也可以称作为SessionID,ID全局唯一
ConnID uint32
//当前连接的关闭状态
isClosed bool
//消息管理MsgId和对应处理方法的消息管理模块
MsgHandler ziface.IMsgHandle
//告知该链接已经退出/停止的channel
ExitBuffChan chan bool
}
//创建连接的方法
func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{
c := &Connection{
Conn: conn,
ConnID: connID,
isClosed: false,
MsgHandler: msgHandler,
ExitBuffChan: make(chan bool, 1),
}
return c
}

最后,在 conn 已经拆包之后,需要调用路由业务的时候,我们只需要让 conn 调用 MsgHandler 中的DoMsgHander()方法就好了。

zinx/znet/connection.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (c *Connection) StartReader() {
fmt.Println("[Reader Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
defer c.Stop()
for {
// 创建拆包解包的对象
dp := NewDataPack()
//读取客户端的Msg head
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head error ", err)
break
}
//拆包,得到msgid 和 datalen 放在msg中
msg , err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error ", err)
break
}
//根据 dataLen 读取 data,放在msg.Data中
var data []byte
if msg.GetDataLen() > 0 {
data = make([]byte, msg.GetDataLen())
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data error ", err)
continue
}
}
msg.SetData(data)
//得到当前客户端请求的Request数据
req := Request{
conn:c,
msg:msg,
}
//从绑定好的消息和对应的处理方法中执行对应的Handle方法
go c.MsgHandler.DoMsgHandler(&req)
}
}

好了,大功告成,我们来测试一下 Zinx 的多路由设置功能吧。

zinx测试

这里我们既然完成了多路由模式,那么就可以进行一个服务端,多个客户端的方式进行测试我们的功能模块了。

我们这里在 zinx 文件夹下新建 Client01.go 文件。

我们在 Server 端设置 2 个路由,一个是 MsgId 为 0 的消息会执行 PingRouter{}重写的Handle()方法,一个是 MsgId 为 1 的消息会执行 HelloZinxRouter{}重写的Handle()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main
import (
"fmt"
"zinx/ziface"
"zinx/znet"
)
//ping test 自定义路由
type PingRouter struct {
znet.BaseRouter
}
//Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
//先读取客户端的数据,再回写ping...ping...ping
fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
err := request.GetConnection().SendMsg(0, []byte("ping...ping...ping"))
if err != nil {
fmt.Println(err)
}
}
//HelloZinxRouter Handle
type HelloZinxRouter struct {
znet.BaseRouter
}
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {
fmt.Println("Call HelloZinxRouter Handle")
//先读取客户端的数据,再回写ping...ping...ping
fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
err := request.GetConnection().SendMsg(1, []byte("Hello Zinx Router V0.6"))
if err != nil {
fmt.Println(err)
}
}
func main() {
//创建一个server句柄
s := znet.NewServer()
//配置路由
s.AddRouter(0, &PingRouter{})
s.AddRouter(1, &HelloZinxRouter{})
//开启服务
s.Serve()
}

我们现在写两个客户端,分别发送 0 消息和 1 消息来进行测试 Zinx 是否能够处理 2 个不同的消息业务。

Client.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main
import (
"fmt"
"io"
"net"
"time"
"zinx/znet"
)
/*
模拟客户端
*/
func main() {
fmt.Println("Client Test ... start")
//3秒之后发起测试请求,给服务端开启服务的机会
time.Sleep(3 * time.Second)
conn,err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client start err, exit!")
return
}
for {
//发封包message消息
dp := znet.NewDataPack()
msg, _ := dp.Pack(znet.NewMsgPackage(0,[]byte("Zinx V0.6 Client0 Test Message")))
_, err := conn.Write(msg)
if err !=nil {
fmt.Println("write error err ", err)
return
}
//先读出流中的head部分
headData := make([]byte, dp.GetHeadLen())
_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
if err != nil {
fmt.Println("read head error")
break
}
//将headData字节流 拆包到msg中
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("server unpack err:", err)
return
}
if msgHead.GetDataLen() > 0 {
//msg 是有data数据的,需要再次读取data数据
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
//根据dataLen从io中读取字节流
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("server unpack data err:", err)
return
}
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
}
time.Sleep(1*time.Second)
}
}

Client01.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main
import (
"fmt"
"io"
"net"
"time"
"zinx/znet"
)
/*
模拟客户端
*/
func main() {
fmt.Println("Client Test ... start")
//3秒之后发起测试请求,给服务端开启服务的机会
time.Sleep(3 * time.Second)
conn,err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client start err, exit!")
return
}
for {
//发封包message消息
dp := znet.NewDataPack()
msg, _ := dp.Pack(znet.NewMsgPackage(1,[]byte("Zinx V0.6 Client1 Test Message")))
_, err := conn.Write(msg)
if err !=nil {
fmt.Println("write error err ", err)
return
}
//先读出流中的head部分
headData := make([]byte, dp.GetHeadLen())
_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
if err != nil {
fmt.Println("read head error")
break
}
//将headData字节流 拆包到msg中
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("server unpack err:", err)
return
}
if msgHead.GetDataLen() > 0 {
//msg 是有data数据的,需要再次读取data数据
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
//根据dataLen从io中读取字节流
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("server unpack data err:", err)
return
}
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
}
time.Sleep(1*time.Second)
}
}

然后我们点击命令行右上角的分隔按钮,启动三个命令行窗口。值得注意的是,每启动一个窗口,都需要在里面先执行 export GOPATH=/home/project 这道命令。

测试结果:

img

[v0.7] 读写分离

功能

img

接下来我们就要对 Zinx 做一个小小的改变,就是与客户端进修数据交互的 Gouroutine 由一个变成两个,一个专门负责从客户端读取数据,一个专门负责向客户端写数据。这么设计有什么好处,当然是目的就是高内聚,模块的功能单一,对于我们今后扩展功能更加方便。

知识点

  1. Golang并发模型
  2. 读写分离

准备工作

七、Zinx的读写分离模型  - 图1

Server 依然是处理客户端的响应,主要关键的几个方法是 Listen、Accept 等。当建立与客户端的套接字后,那么就会开启两个 Goroutine 分别处理读数据业务和写数据业务,读写数据之间的消息通过一个 Channel 传递。下面我们就开始进行实际的实现。

1. 添加读写模块交互数据的管道

  1. connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type Connection struct {
//当前连接的socket TCP套接字
Conn *net.TCPConn
//当前连接的ID 也可以称作为SessionID,ID全局唯一
ConnID uint32
//当前连接的关闭状态
isClosed bool
//消息管理MsgId和对应处理方法的消息管理模块
MsgHandler ziface.IMsgHandle
//告知该链接已经退出/停止的channel
ExitBuffChan chan bool
//无缓冲管道,用于读、写两个goroutine之间的消息通信
msgChan chan []byte
}
//创建连接的方法
func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{
c := &Connection{
Conn: conn,
ConnID: connID,
isClosed: false,
MsgHandler: msgHandler,
ExitBuffChan: make(chan bool, 1),
msgChan:make(chan []byte), //msgChan初始化
}
return c
}

2. 创建 Writer Goroutine

zinx/znet/connection.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
写消息Goroutine, 用户将数据发送给客户端
*/
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
for {
select {
case data := <-c.msgChan:
//有数据要写给客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send Data error:, ", err, " Conn Writer exit")
return
}
case <- c.ExitBuffChan:
//conn已经关闭
return
}
}
}

关于 for select 和 channel 的用法:

select 语句只能与通道联用,它一般由若干个分支组成。每次执行这种语句的时候,一般只有一个分支中的代码会被运行。select 语句的分支分为两种,一种叫做候选分支,另一种叫做默认分支。候选分支总是以关键字 case 开头,后跟一个 case 表达式和一个冒号,然后我们可以从下一行开始写入当分支被选中时需要执行的语句。

由于 select 语句是专为通道而设计的,所以每个 case 表达式中都只能包含操作通道的表达式,比如接收表达式。使用一个接收值可以接收通道里的值,使用两个接收值可以判断通道是否已经关闭了。

对于 select 语句的执行规则如下:

  • 每个 case 都必须是一个通信。
  • 所有 Channel 表达式都会被求值。
  • 所有被发送的表达式都会被求值。
  • 如果任意某个通信可以进行,它就执行,其他被忽略。
  • 如果有多个 case 都可以运行,Select 会随机公平地选出一个执行。其他不会执行。 否则:
  • 如果有 default 子句,则执行该语句。
  • 如果没有 default 子句,select 将阻塞,直到某个通信可以运行;Go 不会重新对 Channel 或值进行求值。

注意这里是和 switch 的操作是不一样的,switch 操作中,只要从上到下有一个满足条件了,就会执行相应的那一个 case,select 中,我们是全部计算一遍,然后再从可满足条件的 case 中公平的执行其中一个。这是为了防止有些通道长期得不到执行。

3. Reader 将发送客户端的数据改为发送至Channel

修改 Reader 调用的SendMsg()方法

zinx/znet/connection.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("Connection closed when send msg")
}
//将data封包,并且发送
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg ")
}
//写回客户端之前是写到客户端,现在是写到channel就可以
c.msgChan <- msg //将之前直接回写给conn.Write的方法 改为 发送给Channel 供Writer读取
return nil
}

修改Start()方法

zinx/znet/connection.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//启动连接,让当前连接开始工作
func (c *Connection) Start() {
//1 开启用户从客户端读取数据流程的Goroutine
go c.StartReader()
//2 开启用于写回客户端数据流程的Goroutine
go c.StartWriter()
for {
select {
case <- c.ExitBuffChan:
//得到退出消息,不再阻塞
return
}
}
}

Zinx测试

0.7版本的测试与0.6一致,因为只修改了内部消息发送的机制,对外的消息接口并没有发生变化

[v0.8] 实现工作池

功能

给Zinx添加消息队列和多任务Worker机制。

img

知识点

  1. 消息队列
  2. 工作池

这一步我们要实现的是,可以通过 worker 的数量来限定处理业务的固定goroutine数量,而不是无限制的开辟Goroutine,随谈我们知道go的调度算法已经做的很极致了,但是大数量的Goroutine依然会带来一些不必要的环境切换成本,这些本应该是服务器应该节省掉的成本。我们可以用消息队列来缓冲worker工作的数据。

设计结构如下:

img

步骤

1. 创建消息队列

首先,处理消息队列的部分,我们应该集成到MsgHandler模块下,因为属于我们消息模块范畴内的。

zinx/znet/msghandler.go

1
2
3
4
5
6
7
8
9
10
11
12
13
type MsgHandle struct {
Apis map[uint32]ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性
WorkerPoolSize uint32 //业务工作Worker池的数量
TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列
}
func NewMsgHandle() *MsgHandle {
return &MsgHandle{
Apis: make(map[uint32]ziface.IRouter),
WorkerPoolSize:utils.GlobalObject.WorkerPoolSize,
//一个worker对应一个queue
TaskQueue:make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
}
}

这里添加两个成员:

  • WokerPoolSize:作为工作池的数量,因为 TaskQueue 中的每个队列应该是和一个 Worker 对应的,所以我们在创建 TaskQueue 中队列数量要和 Worker 的数量一致。
  • TaskQueue真是一个 Request 请求信息的 channel 集合。用来缓冲提供 worker 调用的 Request 请求信息,worker 会从对应的队列中获取客户端的请求数据并且处理掉。

当然WorkerPoolSize最好也可以从GlobalObject获取,并且zinx.json配置文件可以手动配置。

zinx/utils/globalobj.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/*
存储一切有关Zinx框架的全局参数,供其他模块使用
一些参数也可以通过 用户根据 zinx.json来配置
*/
type GlobalObj struct {
/*
Server
*/
TcpServer ziface.IServer //当前Zinx的全局Server对象
Host string //当前服务器主机IP
TcpPort int //当前服务器主机监听端口号
Name string //当前服务器名称
/*
Zinx
*/
Version string //当前Zinx版本号
MaxPacketSize uint32 //都需数据包的最大值
MaxConn int //当前服务器主机允许的最大链接个数
WorkerPoolSize uint32 //业务工作Worker池的数量
MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量
/*
config file path
*/
ConfFilePath string
}
//...
//...
/*
提供init方法,默认加载
*/
func init() {
//初始化GlobalObject变量,设置一些默认值
GlobalObject = &GlobalObj{
Name: "ZinxServerApp",
Version: "V0.4",
TcpPort: 7777,
Host: "0.0.0.0",
MaxConn: 12000,
MaxPacketSize: 4096,
ConfFilePath: "conf/zinx.json",
WorkerPoolSize: 10,
MaxWorkerTaskLen: 1024,
}
//从配置文件中加载一些用户配置的参数
GlobalObject.Reload()
}

2. 创建及启动Worker工作池

现在添加 Worker 工作池,先定义一些启动工作池的接口。

zinx/ziface/imsghandler.go

1
2
3
4
5
6
7
8
9
/*
消息管理抽象层
*/
type IMsgHandle interface{
DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息
AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
StartWorkerPool() //启动worker工作池
SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理
}

zinx/znet/msghandler.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 注意,头文件中要引入 zinx/utils
//启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
fmt.Println("Worker ID = ", workerID, " is started.")
//不断的等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case request := <-taskQueue:
mh.DoMsgHandler(request)
}
}
}
//启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {
//遍历需要启动worker的数量,依此启动
for i:= 0; i < int(mh.WorkerPoolSize); i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go mh.StartOneWorker(i, mh.TaskQueue[i])
}
}

StartWorkerPool()方法是启动 Worker 工作池,这里根据用户配置好的WorkerPoolSize的数量来启动,然后分别给每个 Worker 分配一个TaskQueue,然后用一个 goroutine 来承载一个 Worker 的工作业务。

StartOneWorker()方法就是一个 Worker 的工作业务,每个 worker 是不会退出的(目前没有设定 worker 的停止工作机制),会永久的从对应的 TaskQueue 中等待消息,并处理。

3. 发送消息给消息队列

现在,worker 工作池已经准备就绪了,那么就需要有一个给到 worker 工作池消息的入口,我们再定义一个方法

zinx/znet/msghandler.go

1
2
3
4
5
6
7
8
9
10
//将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle)SendMsgToTaskQueue(request ziface.IRequest) {
//根据ConnID来分配当前的连接应该由哪个worker负责处理
//轮询的平均分配法则
//得到需要处理此条连接的workerID
workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID)
//将请求消息发送给任务队列
mh.TaskQueue[workerID] <- request
}

SendMsgToTaskQueue()作为工作池的数据入口,这里面采用的是轮询的分配机制,因为不同链接信息都会调用这个入口,那么到底应该由哪个 worker 处理该链接的请求处理,整理用的是一个简单的求模运算。用余数和 workerID 的匹配来进行分配。

最终将 request 请求数据发送给对应 worker 的 TaskQueue,那么对应的 worker 的 Goroutine 就会处理该链接请求了。

4. 工作池代码调用

好了,现在需要将消息队列和多任务 worker 机制集成到我们 Zinx 的中了。我们在 Server 的Start()方法中,在服务端 Accept 之前,启动 Worker 工作池。

zinx/znet/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//开启网络服务,只需要修改这里所提到的部分,对于打了 //... 的部分的意思是原来的代码不需要做修改
func (s *Server) Start() {
//...
//开启一个go去做服务端Linster业务
go func() {
//0 启动worker工作池机制
s.msgHandler.StartWorkerPool()
//1 获取一个TCP的Addr
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Println("resolve tcp addr err: ", err)
return
}
//...
//...
}
}()
}

其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给 Worker 工作池进行处理。

所以应该在 Connection 的StartReader()方法中修改:

zinx/znet/connection.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 注意,头文件中要引入 zinx/utils
/*
读消息Goroutine,用于从客户端中读取数据
*/
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
//...
req := Request{
conn:c,
msg:msg,
}
if utils.GlobalObject.WorkerPoolSize > 0 {
//已经启动工作池机制,将消息交给Worker处理
c.MsgHandler.SendMsgToTaskQueue(&req)
} else {
//从绑定好的消息和对应的处理方法中执行对应的Handle方法
go c.MsgHandler.DoMsgHandler(&req)
}
}
}

这里并没有强制使用多任务 Worker 机制,而是判断用户配置WorkerPoolSize的个数,如果大于 0,那么我就启动多任务机制处理链接请求消息,如果=0 或者<0 那么,我们依然只是之前的开启一个临时的 Goroutine 处理客户端请求消息。

[v0.9] 实现链接控制

功能

img

知识点

  1. 链接管理
  2. 数量限制

步骤