Home

Awesome

基于websocket单台机器支持百万连接分布式聊天(IM)系统

PkgGoDev Release Go Report Card OpenIssue ClosedIssue Stars Forks Stargazers over time

本文将介绍如何实现一个基于websocket分布式聊天(IM)系统。

使用golang实现websocket通讯,单机可以支持百万连接,使用gin框架、nginx负载、可以水平部署、程序内部相互通讯、使用grpc通讯协议。

本文内容比较长,如果直接想clone项目体验直接进入项目体验 goWebSocket项目下载 ,文本从介绍webSocket是什么开始,然后开始介绍这个项目,以及在Nginx中配置域名做webSocket的转发,然后介绍如何搭建一个分布式系统。

目录

1、项目说明

1.1 goWebSocket

本文将介绍如何实现一个基于websocket聊天(IM)分布式系统。

使用golang实现websocket通讯,单机支持百万连接,使用gin框架、nginx负载、可以水平部署、程序内部相互通讯、使用grpc通讯协议。

1.2 项目体验

2、介绍webSocket

2.1 webSocket 是什么

WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。

它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。

2.2 webSocket的兼容性

浏览器开始支持webSocket的版本

golang、java、php、node.js、python、nginx 都有不错的支持

Android可以使用java-webSocket对webSocket支持

iOS 4.2及更高版本具有WebSockets支持

2.3 为什么要用webSocket

目前大多数的请求都是使用HTTP,都是由客户端发起一个请求,有服务端处理,然后返回结果,不可以服务端主动向某一个客户端主动发送数据

服务端处理一个请求

2.4 webSocket建立过程

客户端发起升级协议的请求,采用标准的HTTP报文格式,在报文中添加头部信息

Connection: Upgrade表明连接需要升级

Upgrade: websocket需要升级到 websocket协议

Sec-WebSocket-Version: 13 协议的版本为13

Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA== 这个是base64 encode 的值,是浏览器随机生成的,与服务器响应的 Sec-WebSocket-Accept对应

# Request Headers
Connection: Upgrade
Host: im.91vh.com
Origin: http://im.91vh.com
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
Sec-WebSocket-Version: 13
Upgrade: websocket

浏览器 Network

服务端接收到升级协议的请求,如果服务端支持升级协议会做如下响应

返回:

Status Code: 101 Switching Protocols 表示支持切换协议

# Response Headers
Connection: upgrade
Date: Fri, 09 Aug 2019 07:36:59 GMT
Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E=
Server: nginx/1.12.1
Upgrade: websocket

websocket接收和发送数据

3、如何实现基于webSocket的长连接系统

3.1 使用go实现webSocket服务端

3.1.1 启动端口监听

go websocket.StartWebSocket()
// 启动程序
func StartWebSocket() {
	http.HandleFunc("/acc", wsPage)
	http.ListenAndServe(":8089", nil)
}

3.1.2 升级协议

func wsPage(w http.ResponseWriter, req *http.Request) {

	// 升级协议
	conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
		fmt.Println("升级协议", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"])

		return true
	}}).Upgrade(w, req, nil)
	if err != nil {
		http.NotFound(w, req)

		return
	}

	fmt.Println("webSocket 建立连接:", conn.RemoteAddr().String())

	currentTime := uint64(time.Now().Unix())
	client := NewClient(conn.RemoteAddr().String(), conn, currentTime)

	go client.read()
	go client.write()

	// 用户连接事件
	clientManager.Register <- client
}

3.1.3 客户端连接的管理

// 连接管理
type ClientManager struct {
	Clients     map[*Client]bool   // 全部的连接
	ClientsLock sync.RWMutex       // 读写锁
	Users       map[string]*Client // 登录的用户 // appID+uuid
	UserLock    sync.RWMutex       // 读写锁
	Register    chan *Client       // 连接连接处理
	Login       chan *login        // 用户登录处理
	Unregister  chan *Client       // 断开连接处理程序
	Broadcast   chan []byte        // 广播 向全部成员发送数据
}

// 初始化
func NewClientManager() (clientManager *ClientManager) {
	clientManager = &ClientManager{
		Clients:    make(map[*Client]bool),
		Users:      make(map[string]*Client),
		Register:   make(chan *Client, 1000),
		Login:      make(chan *login, 1000),
		Unregister: make(chan *Client, 1000),
		Broadcast:  make(chan []byte, 1000),
	}

	return
}

3.1.4 注册客户端的socket的写的异步处理程序

// 向客户端写数据
func (c *Client) write() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("write stop", string(debug.Stack()), r)

		}
	}()

	defer func() {
		clientManager.Unregister <- c
		c.Socket.Close()
		fmt.Println("Client发送数据 defer", c)
	}()

	for {
		select {
		case message, ok := <-c.Send:
			if !ok {
				// 发送数据错误 关闭连接
				fmt.Println("Client发送数据 关闭连接", c.Addr, "ok", ok)

				return
			}

			c.Socket.WriteMessage(websocket.TextMessage, message)
		}
	}
}

3.1.5 注册客户端的socket的读的异步处理程序

// 读取客户端数据
func (c *Client) read() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("write stop", string(debug.Stack()), r)
		}
	}()

	defer func() {
		fmt.Println("读取客户端数据 关闭send", c)
		close(c.Send)
	}()

	for {
		_, message, err := c.Socket.ReadMessage()
		if err != nil {
			fmt.Println("读取客户端数据 错误", c.Addr, err)

			return
		}

		// 处理程序
		fmt.Println("读取客户端数据 处理:", string(message))
		ProcessData(c, message)
	}
}

3.1.6 接收客户端数据并处理

{"seq":"1565336219141-266129","cmd":"login","data":{"userID":"马远","appID":101}}
{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
/************************  请求数据  **************************/
// 通用请求数据格式
type Request struct {
	Seq  string      `json:"seq"`            // 消息的唯一ID
	Cmd  string      `json:"cmd"`            // 请求命令字
	Data interface{} `json:"data,omitempty"` // 数据 json
}

// 登录请求数据
type Login struct {
	ServiceToken string `json:"serviceToken"` // 验证用户是否登录
	AppID        uint32 `json:"appID,omitempty"`
	UserID       string `json:"userID,omitempty"`
}

// 心跳请求数据
type HeartBeat struct {
	UserID string `json:"userID,omitempty"`
}
/************************  响应数据  **************************/
type Head struct {
	Seq      string    `json:"seq"`      // 消息的ID
	Cmd      string    `json:"cmd"`      // 消息的cmd 动作
	Response *Response `json:"response"` // 消息体
}

type Response struct {
	Code    uint32      `json:"code"`
	CodeMsg string      `json:"codeMsg"`
	Data    interface{} `json:"data"` // 数据 json
}

3.1.7 使用路由的方式处理客户端的请求数据

// Websocket 路由
func WebsocketInit() {
	websocket.Register("login", websocket.LoginController)
	websocket.Register("heartbeat", websocket.HeartbeatController)
}

3.1.8 防止内存溢出和Goroutine不回收

client_manager.go

// 定时清理超时连接
func ClearTimeoutConnections() {
    currentTime := uint64(time.Now().Unix())

    for client := range clientManager.Clients {
        if client.IsHeartbeatTimeout(currentTime) {
            fmt.Println("心跳时间超时 关闭连接", client.Addr, client.UserID, client.LoginTime, client.HeartbeatTime)

            client.Socket.Close()
        }
    }
}

3.2 使用javaScript实现webSocket客户端

3.2.1 启动并注册监听程序

ws = new WebSocket("ws://127.0.0.1:8089/acc");

 
ws.onopen = function(evt) {
  console.log("Connection open ...");
};
 
ws.onmessage = function(evt) {
  console.log( "Received Message: " + evt.data);
  data_array = JSON.parse(evt.data);
  console.log( data_array);
};
 
ws.onclose = function(evt) {
  console.log("Connection closed.");
};

3.2.2 发送数据

登录:
ws.send('{"seq":"2323","cmd":"login","data":{"userID":"11","appID":101}}');

心跳:
ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}');

ping 查看服务是否正常:
ws.send('{"seq":"2325","cmd":"ping","data":{}}');

关闭连接:
ws.close();

3.3 发送消息

3.3.1 文本消息

客户端只要知道发送用户是谁,还有内容就可以显示文本消息,这里我们重点关注一下数据部分

target:定义接收的目标,目前未设置

type:消息的类型,text 文本消息 img 图片消息

msg:文本消息内容

from:消息的发送者

文本消息的结构:

{
  "seq": "1569080188418-747717",
  "cmd": "msg",
  "response": {
    "code": 200,
    "codeMsg": "Ok",
    "data": {
      "target": "",
      "type": "text",
      "msg": "hello",
      "from": "马超"
    }
  }
}

这样一个文本消息的结构就设计完成了,客户端在接收到消息内容就可以展现到 IM 界面上

3.3.2 图片和语言消息

发送图片消息,发送消息者的客户端需要先把图片上传到文件服务器,上传成功以后获得图片访问的 URL,然后由发送消息者的客户端需要将图片 URL 发送到 gowebsocket,gowebsocket 图片的消息格式发送给目标客户端,消息接收者客户端接收到图片的 URL 就可以显示图片消息。

图片消息的结构:

{
  "type": "img",
  "from": "马超",
  "url": "http://91vh.com/images/home_logo.png",
  "secret": "消息鉴权 secret",
  "size": {
    "width": 480,
    "height": 720
  }
}

语言消息、和视频消息和图片消息类似,都是先把文件上传服务器,然后通过 gowebsocket 传递文件的 URL,需要注意的是部分消息涉及到隐私的文件,文件访问的时候需要做好鉴权信息,不能让非接收用户也能查看到别人的消息内容。

4、goWebSocket 项目

4.1 项目说明

4.2 项目依赖

# 主要使用到的包
github.com/gin-gonic/gin@v1.4.0
github.com/redis/go-redis/v9
github.com/gorilla/websocket
github.com/spf13/viper
google.golang.org/grpc
github.com/golang/protobuf

4.3 项目启动

git clone git@github.com:link1st/gowebsocket.git
# 或
git clone https://github.com/link1st/gowebsocket.git
cd gowebsocket
cd config
mv app.yaml.example app.yaml
# 修改项目监听端口,redis连接等(默认127.0.0.1:3306)
vim app.yaml
# 返回项目目录,为以后启动做准备
cd ..
app:
  logFile: log/gin.log # 日志文件位置
  httpPort: 8080 # http端口
  webSocketPort: 8089 # webSocket端口
  rpcPort: 9001 # 分布式部署程序内部通讯端口
  httpUrl: 127.0.0.1:8080
  webSocketUrl:  127.0.0.1:8089


redis:
  addr: "localhost:6379"
  password: ""
  DB: 0
  poolSize: 30
  minIDleConns: 30
go run main.go

4.4 接口文档

4.4.1.1 接口说明
4.4.1 HTTP接口文档

线上:http://im.91vh.com

测试:http://im.91vh.com

4.4.1.2 聊天页面
参数必填类型说明示例
appIDuint32appID/房间ID101
4.4.1.3 获取房间用户列表
参数必填类型说明示例
appIDuint32appID/房间ID101
参数必填类型说明示例
codeint错误码200
msgstring错误信息Success
dataarray返回数据
userCountint房间内用户总数1
userListlist用户列表
{
    "code": 200,
    "msg": "Success",
    "data": {
        "userCount": 1,
        "userList": [
            "黄帝"
        ]
    }
}
4.4.1.4 查询用户是否在线
参数必填类型说明示例
appIDuint32appID/房间ID101
userIDstring用户ID黄帝
参数必填类型说明示例
codeint错误码200
msgstring错误信息Success
dataarray返回数据
onlinebool发送结果 true:在线 false:不在线true
userIDstring用户ID黄帝
{
    "code": 200,
    "msg": "Success",
    "data": {
        "online": true,
        "userID": "黄帝"
    }
}
4.4.1.5 给用户发送消息
参数必填类型说明示例
appIDuint32appID/房间ID101
userIDstring用户id黄帝
msgIDstring消息ID避免重复发送
messagestring消息内容hello
参数必填类型说明示例
codeint错误码200
msgstring错误信息Success
dataarray返回数据
sendResultsbool发送结果 true:成功 false:失败true
{
    "code": 200,
    "msg": "Success",
    "data": {
        "sendResults": true
    }
}
4.4.1.6 给全员用户发送消息
参数必填类型说明示例
appIDuint32appID/房间ID101
userIDstring用户id黄帝
msgIDstring消息ID避免重复发送
messagestring消息内容hello
参数必填类型说明示例
codeint错误码200
msgstring错误信息Success
dataarray返回数据
sendResultsbool发送结果 true:成功 false:失败true
{
    "code": 200,
    "msg": "Success",
    "data": {
        "sendResults": true
    }
}
4.4.2 RPC接口文档
4.4.2.1 接口说明
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.protobuf";
option java_outer_classname = "ProtobufProto";


package protobuf;

// The AccServer service definition.
service AccServer {
    // 查询用户是否在线
    rpc QueryUsersOnline (QueryUsersOnlineReq) returns (QueryUsersOnlineRsp) {
    }
    // 发送消息
    rpc SendMsg (SendMsgReq) returns (SendMsgRsp) {
    }
    // 给这台机器的房间内所有用户发送消息
    rpc SendMsgAll (SendMsgAllReq) returns (SendMsgAllRsp) {
    }
    // 获取用户列表
    rpc GetUserList (GetUserListReq) returns (GetUserListRsp) {
    }
}

// 查询用户是否在线
message QueryUsersOnlineReq {
    uint32 appID = 1; // AppID
    string userID = 2; // 用户ID
}

message QueryUsersOnlineRsp {
    uint32 retCode = 1;
    string errMsg = 2;
    bool online = 3;
}

// 发送消息
message SendMsgReq {
    string seq = 1; // 序列号
    uint32 appID = 2; // appID/房间ID
    string userID = 3; // 用户ID
    string cms = 4; // cms 动作: msg/enter/exit
    string type = 5; // type 消息类型,默认是 text
    string msg = 6; // msg
    bool isLocal = 7; // 是否查询本机 acc内部调用为:true(本机查询不到即结束)
}

message SendMsgRsp {
    uint32 retCode = 1;
    string errMsg = 2;
    string sendMsgID = 3;
}

// 给这台机器的房间内所有用户发送消息
message SendMsgAllReq {
    string seq = 1; // 序列号
    uint32 appID = 2; // appID/房间ID
    string userID = 3; // 不发送的用户ID
    string cms = 4; // cms 动作: msg/enter/exit
    string type = 5; // type 消息类型,默认是 text
    string msg = 6; // msg
}

message SendMsgAllRsp {
    uint32 retCode = 1;
    string errMsg = 2;
    string sendMsgID = 3;
}

// 获取用户列表
message GetUserListReq {
    uint32 appID = 1;
}

message GetUserListRsp {
    uint32 retCode = 1;
    string errMsg = 2;
    repeated string userID = 3;
}
4.4.2.2 查询用户是否在线
4.4.2.3 发送消息
4.4.2.4 给指定房间所有用户发送消息
4.4.2.5 获取房间内全部用户

5、webSocket项目Nginx配置

5.1 为什么要配置Nginx

5.2 nginx配置

upstream  go-im
{
    server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

upstream  go-acc
{
    server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}


server {
    listen       80 ;
    server_name  im.91vh.com;
    index index.html index.htm ;


    location /acc {
        proxy_set_header Host $host;
        proxy_pass http://go-acc;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Connection "";
        proxy_redirect off;
        proxy_intercept_errors on;
        client_max_body_size 10m;
    }

    location /
    {
        proxy_set_header Host $host;
        proxy_pass http://go-im;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_redirect off;
        proxy_intercept_errors on;
        client_max_body_size 30m;
    }

    access_log  /link/log/nginx/access/im.log;
    error_log   /link/log/nginx/access/im.error.log;
}

5.3 问题处理

/link/server/tengine/sbin/nginx -t

nginx: [emerg] unknown "connection_upgrade" variable
configuration file /link/server/tengine/conf/nginx.conf test failed
http{
	fastcgi_temp_file_write_size 128k;
..... # 需要添加的内容

    #support websocket
    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      close;
    }

.....
    gzip on;
    
}

6、压测

6.1 Linux内核优化

被压测服务器需要保持100W长连接,客户和服务器端是通过socket通讯的,每个连接需要建立一个socket,程序需要保持100W长连接就需要单个程序能打开100W个文件句柄

# 查看系统默认的值
ulimit -n
# 设置最大打开文件数
ulimit -n 1000000

通过修改配置文件的方式修改程序最大打开句柄数

root soft nofile 1040000
root hard nofile 1040000

root soft nofile 1040000
root hard nproc 1040000

root soft core unlimited
root hard core unlimited

* soft nofile 1040000
* hard nofile 1040000

* soft nofile 1040000
* hard nproc 1040000

* soft core unlimited
* hard core unlimited

修改完成以后需要重启机器配置才能生效

file-max的值需要大于limits设置的值

# file-max 设置的值参考
cat /proc/sys/fs/file-max
12553500

vim /etc/sysctl.conf

# 配置参考
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 0
net.ipv4.ip_local_port_range = 1024 65000
net.ipv4.tcp_mem = 786432 2097152 3145728
net.ipv4.tcp_rmem = 4096 4096 16777216
net.ipv4.tcp_wmem = 4096 4096 16777216

sysctl -p 修改配置以后使得配置生效命令

6.2 压测准备

6.3 压测数据

连接数内存
10000281M
1000002.7g
2000005.4g
50000013.1g
100000025.8g

7、如何基于webSocket实现一个分布式Im

7.1 说明

7.2 架构

用户连接时序图

分布是系统随机给用户发送消息

7.3 分布式系统部署

# app.yaml 配置文件信息
app:
  logFile: log/gin.log
  httpPort: 8080
  webSocketPort: 8089
  rpcPort: 9001
  httpUrl: im.91vh.com
  webSocketUrl:  im.91vh.com

# 在启动项目
go run main.go 

# 将第一个项目拷贝一份
cp -rf gowebsocket gowebsocket1
# app.yaml 修改配置文件
app:
  logFile: log/gin.log
  httpPort: 8081
  webSocketPort: 8090
  rpcPort: 9002
  httpUrl: im.91vh.com
  webSocketUrl:  im.91vh.com

# 在启动第二个项目
go run main.go 

在之前Nginx配置项中添加第二台机器的Ip和端口

upstream  go-im
{
    server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
    server 127.0.0.1:8081 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

upstream  go-acc
{
    server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
    server 127.0.0.1:8090 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

查看请求是否落在两个项目上 实验两个用户分别连接不同的项目(gowebsocket和gowebsocket1)是否也可以相互发送消息

本项目只是演示了这个项目如何分布式部署,以及分布式部署以后模块如何进行相互通讯 完全解决系统没有单点的故障,还需 Nginx集群、redis cluster等

8、回顾和反思

8.1 在其它系统应用

8.2 已经实现的功能

IM实现细节:

8.2 需要完善、优化

8.3 总结

9、参考文献

维基百科 WebSocket

阮一峰 WebSocket教程

WebSocket协议:5分钟从入门到精通

go-stress-testing 单台机器100w连接压测实战

github 搜:link1st 查看项目 gowebsocket

https://github.com/link1st/gowebsocket

意见反馈

<p align="center"> <img border="0" src="img/微信二维码.jpeg" alt="添加link1st的微信" width="200"/> </p>

赞助商

<p align="center"> <a href="https://www.jetbrains.com/?from=gowebsocket"> <img border="0" src="img/jetbrains_logo.png" width="200"/> </a> </p>