前言
提到websocket
相信大家都听说过,它的初衷是为了解决客户端浏览器与服务端进行双向通信,是在单个tcp
连接上进行全双工通讯的协议。在没有websocket之前只能通过浏览器到服务端的请求应答模式比如轮询,来实现服务端的变更响应到客户端,现在服务端也可以主动发送数据到客户端浏览器。websocket
协议和http
协议平行,都属于tcp/ip四层模型
中的第四层应用层。由于websocket
握手阶段采用http
协议,所以也需要进行跨域处理。它的协议标识是ws
或wss
对应了常规标识和安全通信协议标识。本文重点并不是介绍websocket
协议相关,而是提供一种基于asp.net core原生websocket的方式实现集群的实现思路。关于这套思路其实很早之前我就构思过了,只是之前一直没有系统的整理出来,本篇文章就来和大家分享一下,由于主要是提供一种思路,所以涉及到具体细节或者业务相关的可能没有体现出来,还望大家理解。
实现
咱们的重点关键字就是两个websocket
和集群
,实现的框架便是基于asp.net core
,我也基于golang
实现了一套,本文涉及到的相关源码和golang版本的实现都已上传至,具体仓库地址可以转到文末自行跳转到#示例源码中查看。既然涉及到集群,这里咱们就用nginx
作为反向代理,来搭建一个集群实例。大致的示例结构如下图所示
redis
在这里扮演的角色呢,是用来处理server
端的消息相互传递用的,主要是使用的redis的pub/sub
功能来实现的,这里便涉及到几个核心问题
- 首先,集群状态每个用户被分发到具体的哪台服务器上是不得而知的
- 其次,处在不同
server
端的不同用户间的相互通信是需要一个传递媒介 - 最后,针对不同的场景比如单发消息、分组消息、全部通知等要有不同的处理策略
这里需要考虑的是,如果需要搭建实时通信服务器的话,需要注意集群的隔离性,主要是和核心业务进行隔离,毕竟websocket
需要保持长链接、且消息的大小需要评估。
上面提到了redis
的主要功能就是用来传递消息用的,毕竟每个server服务器是无状态的。这当然不是必须的,任何可以进行消息分发的中间件都可以,比如消息队列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要处理的消息存储起来都可以比如缓存甚至是关系型数据库等等。这压力使用redis主要是因为操作起来简单、轻量级、灵活,让大家关注点在思路上,而不是使用中案件的代码上。
nginx配置
通过上面的图我们可以看到,我们这里构建集群示例使用的nginx,如果让nginx支持websocket的话,需要额外的配置,这个在网上有很多相关的文章介绍,这里就来列一下咱们示例的nginx配置,在配置文件nginx.conf
里
//上游服务器地址也就是websocket服务的真实地址 upstream wsbackend { server 127.0.0.1:5001; server 127.0.0.1:5678; } server { listen 5000; server_name localhost; location ~/chat/{ //upstream地址 proxy_pass http://wsbackend; proxy_connect_timeout 60s; proxy_read_timeout 3600s; proxy_send_timeout 3600s; //记得转发避免踩坑 proxy_set_header host $host; proxy_http_version 1.1; //http升级成websocket协议的头标识 proxy_set_header upgrade $http_upgrade; proxy_set_header connection "upgrade"; } }
这套配置呢,在搜索引擎上能收到很多,不过不妨碍我把使用的粘贴出来。这一套亲测有效,也是我使用的配置,请放心使用。个人认为如果是线上环境采用的负载均衡策略可以选择ip_hash
的方式,保证同一个ip的客户端用户可以分发到一台websocket实例中去,这样的话能尽量避免使用redis的用户频道做消息传递。好了,接下来准备开始展示具体实现的代码了。
一对一发送
首先介绍的就是一对一发送的情况,也就是我把消息发给你,聊天的时候私聊的情况。这里呢涉及到两种情况
- 如果你需要通信的客户端和你连接在一个server端里,这样的话可以直接在链接里找到这个端的通信实例直接发送。
- 如果你需要通信的客户端和你不在一个server端里,这个时候咱们就需要借助redis的
pub/sub
的功能,把消息传递给另一个server端。
咱们通过一张图大致的展示一下它的工作方式
解释一下,每个客户端注册到websocket
服务里的时候会在redis里订阅一个user:用户唯一标识
的频道,这个频道用于接收和当前websocket连接不在一个服务端的其他websocket发送过来的消息。
每次发送消息的时候你会知道你要发送给谁,不在当前服务器的话则发送到redis的user:用户唯一标识
频道,这样的话目标websocket就能收到消息了。
首先是注入相关的依赖项,这里我使用的redis客户端是freeredis
,主要是因为操作起来简单,具体实现代码如下
var builder = webapplication.createbuilder(args); //注册freeredis builder.services.addsingleton(provider => { var logger = provider.getservice>(); redisclient cli = new redisclient("127.0.0.1:6379"); cli.notice = (s, e) => logger?.loginformation(e.log); return cli; }); //注册websocket具体操作的类 builder.services.addsingleton (); builder.services.addcontrollers(); var app = builder.build(); var websocketoptions = new websocketoptions { keepaliveinterval = timespan.fromminutes(2) }; //注册websocket中间件 app.usewebsockets(websocketoptions); app.mapget("/", () => "hello world!"); app.mapcontrollers(); app.run();
接下来我们定义一个controller用来处理websocket请求
public class websocketcontroller : controllerbase { private readonly ilogger_logger; private readonly websockethandler _sockethandler; public websocketcontroller(ilogger logger, websockethandler sockethandler, websocketchannelhandler websocketchannelhandler) { _logger = logger; _sockethandler = sockethandler; } //这里的id代表当前连接的客户端唯一标识比如用户唯一标识 [httpget("/chat/user/{id}")] public async task chatuser(string id) { //判断是否是websocket请求 if (httpcontext.websockets.iswebsocketrequest) { _logger.loginformation($"user:{id}-{request.httpcontext.connection.remoteipaddress}:{request.httpcontext.connection.remoteport} join"); var websocket = await httpcontext.websockets.acceptwebsocketasync(); //处理请求相关 await _sockethandler.handle(id, websocket); } else { httpcontext.response.statuscode = statuscodes.status400badrequest; } } }
这里的websockethandler是用来处理具体逻辑用的,咱们看一下相关代码
public class websockethandler:idisposable { //存储当前服务用户的集合 private readonly userconnection userconnection = new(); //redis频道前缀 private readonly string userprefix = "user:"; //用户对应的redis频道 private readonly concurrentdictionary_disposables = new(); private readonly ilogger _logger; //redis客户端 private readonly redisclient _redisclient; public websockethandler(ilogger logger, redisclient redisclient) { _logger = logger; _redisclient = redisclient; } public async task handle(string id, websocket websocket) { //把当前用户连接存储起来 _ = userconnection.getoradd(id, websocket); //订阅一个当前用户的频道 await submsg($"{userprefix}{id}"); var buffer = new byte[1024 * 4]; //接收发送过来的消息,这个方法是阻塞的,如果没收到消息则一直阻塞 var receiveresult = await websocket.receiveasync(new arraysegment (buffer), cancellationtoken.none); //循环接收消息 while (websocket.state == websocketstate.open) { try { //因为缓冲区长度是固定的所以要获取实际长度 string msg = encoding.utf8.getstring(buffer[..receiveresult.count]).trimend('\0'); //接收的到消息转换成实体 msgbody msgbody = jsonconvert.deserializeobject (msg); //发送到其他客户端的数据 byte[] sendbyte = encoding.utf8.getbytes($"user {id} send:{msgbody.msg}"); _logger.loginformation($"user {id} send:{msgbody.msg}"); //判断目标客户端是否在当前当前服务,如果在当前服务直接扎到目标连接直接发送 if (userconnection.trygetvalue(msgbody.id, out var targetsocket)) { if (targetsocket.state == websocketstate.open) { await targetsocket.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), receiveresult.messagetype, true, cancellationtoken.none); } } else { //如果要发送的目标端不在当前服务,则发送给目标redis端的频道 channelmsgbody channelmsgbody = new channelmsgbody { fromid = id, toid = msgbody.id, msg = msgbody.msg }; //目标的redis频道 _redisclient.publish($"{userprefix}{msgbody.id}", jsonconvert.serializeobject(channelmsgbody)); } //继续阻塞循环接收消息 receiveresult = await websocket.receiveasync(new arraysegment (buffer), cancellationtoken.none); } catch (exception ex) { _logger.logerror(ex, ex.message); break; } } //循环结束意味着当前端已经退出 //从当前用户的集合移除当前用户 _ = userconnection.tryremove(id, out _); //关闭当前websocket连接 await websocket.closeasync(receiveresult.closestatus.value, receiveresult.closestatusdescription, cancellationtoken.none); //在当前订阅集合移除当前用户 _disposables.tryremove($"{userprefix}{id}", out var disposable); //关闭当前用户的通道 disposable.dispose(); } private async task submsg(string channel) { //订阅当前用户频道 var sub = _redisclient.subscribe(channel, async (channel, data) => { //接收过来当前频道数据,说明发送端不在当前服务 channelmsgbody msgbody = jsonconvert.deserializeobject (data.tostring()); byte[] sendbyte = encoding.utf8.getbytes($"user {msgbody.fromid} send:{msgbody.msg}"); //在当前服务找到目标的websocket连接并发送消息 if (userconnection.trygetvalue(msgbody.toid, out var targetsocket)) { if (targetsocket.state == websocketstate.open) { await targetsocket.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), websocketmessagetype.text, true, cancellationtoken.none); } } }); //把redis订阅频道添加到集合中 _disposables.tryadd(channel, sub); } //程序退出的时候取消当前服务订阅的redis频道 public void dispose() { foreach (var disposable in _disposables) { disposable.value.dispose(); } _disposables.clear(); } }
这里涉及到几个辅助相关的类,其中userconnection
类是存储注册到当前服务的连接,msgbody
类用来接受客户端发送过来的消息,channelmsgbody
是用来发送redis频道的相关消息,因为要把相关消息通过redis发布出去,咱们列一下这几个类的相关代码
//注册到当前服务的连接 public class userconnection : ienumerable> { //存储用户唯一标识和websocket的对应关系 private concurrentdictionary _users = new concurrentdictionary (); //当前服务的用户数量 public int count => _users.count; public websocket getoradd(string userid, websocket websocket) { return _users.getoradd(userid, websocket); } public bool trygetvalue(string userid, out websocket websocket) { return _users.trygetvalue(userid, out websocket); } public bool tryremove(string userid, out websocket websocket) { return _users.tryremove(userid, out websocket); } public void clear() { _users.clear(); } public ienumerator > getenumerator() { return _users.getenumerator(); } ienumerator ienumerable.getenumerator() { return this.getenumerator(); } } //客户端消息 public class msgbody { //目标用户标识 public string id { get; set; } //要发送的消息 public string msg { get; set; } } //频道订阅消息 public class channelmsgbody { //用户标识 public string fromid { get; set; } //目标用户标识,也就是要发送给谁 public string toid { get; set; } //要发送的消息 public string msg { get; set; } }
这样的话关于一对一发送消息的相关逻辑就实现完成了,启动两个server端,由于nginx默认的负载均衡策略是轮询,所以注册两个用户的话会被分发到不同的服务里去
用postman
连接三个连接唯一标识分别是1、2、3
,模拟一下消息发送,效果如下,发送效果
接收效果
群组发送
上面我们展示了一对一发送的情况,接下来我们来看一下,群组发送的情况。群组发送的话就是只要大家都加入一个群组,只要客户端在群组里发送一条消息,则注册到当前群组内的所有客户端都可以收到消息。相对于一对一的情况就是如果当前websocket服务端如果存在用户加入某个群组,则当前当前websocket服务端则可以订阅一个group:群组唯一标识
的redis频道,集群中的其他websocket服务器通过这个redis频道接收群组消息,通过一张图描述一下
群组的实现方式相对于一对一要简单一点
- 发送端可以不用考虑当前服务中的客户端连接,一股脑的交给redis把消息发布出去
- 如果有websocket服务中的用户订阅了当前分组则可以接受消息,获取组内的用户循环发送消息
展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景
//包含两个标识一个是组别标识一个是注册到组别的用户 [httpget("/chat/group/{groupid}/{userid}")] public async task chatgroup(string groupid, string userid) { if (httpcontext.websockets.iswebsocketrequest) { _logger.loginformation($"group:{groupid} user:{userid}-{request.httpcontext.connection.remoteipaddress}:{request.httpcontext.connection.remoteport} join"); var websocket = await httpcontext.websockets.acceptwebsocketasync(); //调用handlegroup处理群组相关的消息 await _sockethandler.handlegroup(groupid, userid, websocket); } else { httpcontext.response.statuscode = statuscodes.status400badrequest; } }
接下来看一下handlegroup的相关逻辑,还是在websockethandler类中,看一下代码实现
public class websockethandler:idisposable { private readonly userconnection userconnection = new(); private readonly groupuser groupuser = new(); private readonly semaphoreslim _lock = new(1, 1); private readonly concurrentdictionary_disposables = new(); private readonly string groupprefix = "group:"; private readonly ilogger _logger; private readonly redisclient _redisclient; public websockethandler(ilogger logger, redisclient redisclient) { _logger = logger; _redisclient = redisclient; } public async task handlegroup(string groupid, string userid, websocket websocket) { //因为群组的集合可能会存在很多用户一起访问所以限制访问数量 await _lock.waitasync(); //初始化群组容器 群唯一标识为key 群员容器为value var currentgroup = groupuser.groups.getoradd(groupid, new userconnection { }); //当前用户加入当前群组 _ = currentgroup.getoradd(userid, websocket); //只有有当前websocket服务的第一个加入当前组的时候才去订阅群组频道 //如果不限制的话则会出现如果当前websocket服务有多个用户在一个组内则会重复收到redis消息 if (currentgroup.count == 1) { //订阅redis频道 await subgroupmsg($"{groupprefix}{groupid}"); } _lock.release(); var buffer = new byte[1024 * 4]; //阻塞接收websocket消息 var receiveresult = await websocket.receiveasync(new arraysegment (buffer), cancellationtoken.none); //服务不退出的话则一直等待接收 while (websocket.state == websocketstate.open) { try { string msg = encoding.utf8.getstring(buffer[..receiveresult.count]).trimend('\0'); _logger.loginformation($"group 【{groupid}】 user 【{userid}】 send:{msg}"); //组装redis频道发布的消息,目标为群组标识 channelmsgbody channelmsgbody = new channelmsgbody { fromid = userid, toid = groupid, msg = msg }; //通过redis发布消息 _redisclient.publish($"{groupprefix}{groupid}", jsonconvert.serializeobject(channelmsgbody)); receiveresult = await websocket.receiveasync(new arraysegment (buffer), cancellationtoken.none); } catch (exception ex) { _logger.logerror(ex, ex.message); break; } } //如果客户端退出则在当前群组集合删除当前用户 _ = currentgroup.tryremove(userid, out _); await websocket.closeasync(receiveresult.closestatus.value, receiveresult.closestatusdescription, cancellationtoken.none); } private async task subgroupmsg(string channel) { var sub = _redisclient.subscribe(channel, async (channel, data) => { channelmsgbody msgbody = jsonconvert.deserializeobject (data.tostring()); byte[] sendbyte = encoding.utf8.getbytes($"group 【{msgbody.toid}】 user 【{msgbody.fromid}】 send:{msgbody.msg}"); //在当前websocket服务器找到当前群组里的用户 groupuser.groups.trygetvalue(msgbody.toid, out var currentgroup); //循环当前websocket服务器里的用户发送消息 foreach (var user in currentgroup) { //不用给自己发送了 if (user.key == msgbody.fromid) { continue; } if (user.value.state == websocketstate.open) { await user.value.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), websocketmessagetype.text, true, cancellationtoken.none); } } }); //把当前频道加入订阅集合 _disposables.tryadd(channel, sub); } }
这里涉及到了groupuser
类,是来存储群组和群组用户的对应关系的,定义如下
public class groupuser { //key为群组的唯一标识 public concurrentdictionarygroups = new concurrentdictionary (); }
演示一下把两个用户添加到一个群组内,然后发送接收消息的场景,用户u1发送
用户u2接收
发送所有人
发送给所有用户的逻辑比较简单,不用考虑到用户限制,只要用户连接到了websocket集群则都可以接收到这个消息,大致工作方式如下图所示
这个比较简单,咱们直接看实现代码,首先是定义一个地址,用于发布消息
//把用户注册进去 [httpget("/chat/all/{id}")] public async task chatall(string id) { if (httpcontext.websockets.iswebsocketrequest) { _logger.loginformation($"all user:{id}-{request.httpcontext.connection.remoteipaddress}:{request.httpcontext.connection.remoteport} join"); var websocket = await httpcontext.websockets.acceptwebsocketasync(); await _sockethandler.handleall(id, websocket); } else { httpcontext.response.statuscode = statuscodes.status400badrequest; } }
具体的实现逻辑还是在handlegroup类里,是handleall方法,看一下具体实现
public class websockethandler:idisposable { private readonly userconnection allconnection = new(); private readonly concurrentdictionary_disposables = new(); private readonly string all = "all"; private readonly ilogger _logger; private readonly redisclient _redisclient; public websockethandler(ilogger logger, redisclient redisclient) { _logger = logger; _redisclient = redisclient; } public async task handleall(string id, websocket websocket) { await _lock.waitasync(); //把用户加入用户集合 _ = allconnection.getoradd(id, websocket); //websocket集群中的每个服务只定义一次 if (allconnection.count == 1) { await suballmsg(all); } _lock.release(); var buffer = new byte[1024 * 4]; //阻塞接收信息 var receiveresult = await websocket.receiveasync(new arraysegment (buffer), cancellationtoken.none); while (websocket.state == websocketstate.open) { try { string msg = encoding.utf8.getstring(buffer[..receiveresult.count]).trimend('\0'); _logger.loginformation($"user {id} send:{msg}"); //获取接收信息 channelmsgbody channelmsgbody = new channelmsgbody { fromid = id, msg = msg }; //把消息通过redis发布到集群中的其他服务 _redisclient.publish(all, jsonconvert.serializeobject(channelmsgbody)); receiveresult = await websocket.receiveasync(new arraysegment (buffer), cancellationtoken.none); } catch (exception ex) { _logger.logerror(ex, ex.message); break; } } //用户退出则删除集合中的当前用户信息 _ = allconnection.tryremove(id, out _); await websocket.closeasync(receiveresult.closestatus.value, receiveresult.closestatusdescription, cancellationtoken.none); } private async task suballmsg(string channel) { var sub = _redisclient.subscribe(channel, async (channel, data) => { channelmsgbody msgbody = jsonconvert.deserializeobject (data.tostring()); byte[] sendbyte = encoding.utf8.getbytes($"user 【{msgbody.fromid}】 send all:{msgbody.msg}"); //接收到消息后遍历用户集合把消息发送给所有用户 foreach (var user in allconnection) { //如果包含当前用户跳过 if (user.key == msgbody.fromid) { continue; } if (user.value.state == websocketstate.open) { await user.value.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), websocketmessagetype.text, true, cancellationtoken.none); } } }); _disposables.tryadd(channel, sub); } }
效果在这里就不展示了,和群组的效果是类似的,只是一个是部分用户,一个是全部的用户。
整合到一起
上面我们分别展示了一对一、群组、所有人的场景,但是实际使用的时候,每个用户只需要注册到websocket集群一次也就是保持一个连接即可,而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接。所以我们需要把上面的演示整合一下,一个用户只需要连接到websocket集群一次即可,至于发送给谁,加入什么群组,接收全部消息等都是连接后通过一些标识区分的,而不必每个类型的操作都注册一次,就和微信和qq一样我只要登录了即可,至于其他操作都是靠数据标识区分的。接下来咱们就整合一下代码达到这个效果,大致的思路是
- 用户连接到websocket集群,把用户和连接保存到当前websocket服务器的用户集合中去。
- 一对一发送的时候,只需要在具体的服务器中找到具体的客户端发送消息
- 群组的时候,先把当前用户标识加入群组集合即可,接收消息的时候根据群组集合里的用户标识去用户集合里去拿具体的websocket连接发送消息
- 全员消息的时候,直接遍历集群中的每个websocket服务里的用户集合里的websocket连接训话发送消息
这样的话就保证了每个客户端用户在集群中只会绑定一个连接,首先还是单独定义一个action,用于让客户端用户连接上来,具体实现代码如下所示
public class websocketchannelcontroller : controllerbase { private readonly ilogger_logger; private readonly websocketchannelhandler _websocketchannelhandler; public websocketchannelcontroller(ilogger logger, websocketchannelhandler websocketchannelhandler) { _logger = logger; _websocketchannelhandler = websocketchannelhandler; } //只需要把当前用户连接到服务即可 [httpget("/chat/channel/{id}")] public async task channel(string id) { if (httpcontext.websockets.iswebsocketrequest) { _logger.loginformation($"user:{id}-{request.httpcontext.connection.remoteipaddress}:{request.httpcontext.connection.remoteport} join"); var websocket = await httpcontext.websockets.acceptwebsocketasync(); await _websocketchannelhandler.handlechannel(id, websocket); } else { httpcontext.response.statuscode = statuscodes.status400badrequest; } } }
接下来看一下websocketchannelhandler类的handlechannel方法实现,用于处理不同的消息,比如一对一、群组、全员消息等不同类型的消息
public class websocketchannelhandler : idisposable { //用于存储当前websocket服务器链接上来的所有用户对应关系 private readonly userconnection userconnection = new(); //用于存储群组和用户关系,用户集合采用hashset保证每个用户只加入一个群组一次 private readonly concurrentdictionary> groupuser = new concurrentdictionary >(); private readonly semaphoreslim _lock = new(1, 1); //存放redis订阅实例 private readonly concurrentdictionary _disposables = new(); //一对一redis频道前缀 private readonly string userprefix = "user:"; //群组redis频道前缀 private readonly string groupprefix = "group:"; //全员redis频道 private readonly string all = "all"; private readonly ilogger _logger; private readonly redisclient _redisclient; public websocketchannelhandler(ilogger logger, redisclient redisclient) { _logger = logger; _redisclient = redisclient; } public async task handlechannel(string id, websocket websocket) { await _lock.waitasync(); //每次连接进来就添加到用户集合 _ = userconnection.getoradd(id, websocket); //每个websocket服务实例只需要订阅一次全员消息频道 await submsg($"{userprefix}{id}"); if (userconnection.count == 1) { await suballmsg(all); } _lock.release(); var buffer = new byte[1024 * 4]; //接收客户端消息 var receiveresult = await websocket.receiveasync(new arraysegment (buffer), cancellationtoken.none); while (websocket.state == websocketstate.open) { try { string msg = encoding.utf8.getstring(buffer[..receiveresult.count]).trimend('\0'); //读取客户端消息 channeldata channeldata = jsonconvert.deserializeobject (msg); //判断消息类型 switch (channeldata.method) { //一对一 case "one": await handleone(id, channeldata.msgbody, receiveresult); break; //把用户加入群组 case "usergroup": await addusergroup(id, channeldata.group, websocket); break; //处理群组消息 case "group": await handlegroup(channeldata.group, id, websocket, channeldata.msgbody); break; //处理全员消息 default: await handleall(id, channeldata.msgbody); break; } receiveresult = await websocket.receiveasync(new arraysegment (buffer), cancellationtoken.none); } catch (exception ex) { _logger.logerror(ex, ex.message); break; } } await websocket.closeasync(receiveresult.closestatus.value, receiveresult.closestatusdescription, cancellationtoken.none); //在群组中移除当前用户 foreach (var users in groupuser.values) { lock (users) { users.remove(id); } } //当前客户端用户退出则移除连接 _ = userconnection.tryremove(id, out _); //取消用户频道订阅 _disposables.remove($"{userprefix}{id}", out var sub); sub?.dispose(); } public void dispose() { foreach (var disposable in _disposables) { disposable.value.dispose(); } _disposables.clear(); } }
这里涉及到了channeldata
类是用于接收客户端消息的类模板,具体定义如下
public class channeldata { //消息类型 比如一对一 群组 全员 public string method { get; set; } //群组标识 public string group { get; set; } //消息体 public object msgbody { get; set; } }
类中并不会包含当前用户信息,因为连接到当前服务的时候已经提供了客户端唯一标识。结合上面的处理代码我们可以看出,客户端用户连接到websocket实例之后,先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道,用于处理非当前实例注册客户端的一对一消息处理和全员消息处理,然后等待接收客户端消息,根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理,它的工作流程入下图所示
由代码和上面的流程图可知,它根据不同的标识去处理不同类型的消息,接下来我们可以看下每种消息类型的处理方式。
一对一处理
首先是一对一的消息处理情况,看一下具体的处理逻辑,首先是一对一发布消息
private async task handleone(string id, object msg, websocketreceiveresult receiveresult) { msgbody msgbody = jsonconvert.deserializeobject(jsonconvert.serializeobject(msg)); byte[] sendbyte = encoding.utf8.getbytes($"user {id} send:{msgbody.msg}"); _logger.loginformation($"user {id} send:{msgbody.msg}"); //判断目标用户是否在当前websocket服务器 if (userconnection.trygetvalue(msgbody.id, out var targetsocket)) { if (targetsocket.state == websocketstate.open) { await targetsocket.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), receiveresult.messagetype, true, cancellationtoken.none); } } else { //如果不在当前服务器,则直接把消息发布到具体的用户频道去,由具体用户去订阅 channelmsgbody channelmsgbody = new channelmsgbody { fromid = id, toid = msgbody.id, msg = msgbody.msg }; _redisclient.publish($"{userprefix}{msgbody.id}", jsonconvert.serializeobject(channelmsgbody)); } }
接下来是用于处理订阅其他用户发送过来消息的逻辑,这个和整合之前的逻辑是一致的,在当前服务器中找到用户对应的连接,发送消息
private async task submsg(string channel) { var sub = _redisclient.subscribe(channel, async (channel, data) => { channelmsgbody msgbody = jsonconvert.deserializeobject(data.tostring()); byte[] sendbyte = encoding.utf8.getbytes($"user {msgbody.fromid} send:{msgbody.msg}"); if (userconnection.trygetvalue(msgbody.toid, out var targetsocket)) { if (targetsocket.state == websocketstate.open) { await targetsocket.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), websocketmessagetype.text, true, cancellationtoken.none); } else { _ = userconnection.tryremove(msgbody.fromid, out _); } } }); //把订阅实例加入集合 _disposables.tryadd(channel, sub); }
如果给某个用户发送消息则可以使用如下的消息格式
{"method":"one", "msgbody":{"id":"2","msg":"hello"}}
method为one代表着是私聊一对一的情况,消息体内id为要发送给的具体用户标识和消息体。
群组处理
接下来看群组处理方式,这个和之前的逻辑是有出入的,首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息,之前是一个用户对应多个连接,整合了之后集群中每个用户只关联唯一的一个websocket连接,首先看用户加入群组的逻辑
private async task addusergroup(string user, string group, websocket websocket) { //获取群组信息 var currentgroup = groupuser.getoradd(group, new hashset()); lock (currentgroup) { //把用户标识加入当前组 _ = currentgroup.add(user); } //每个组的redis频道,在每台websocket服务器实例只注册一次订阅 if (currentgroup.count == 1) { //订阅当前组消息 await subgroupmsg($"{groupprefix}{group}"); } string addmsg = $"user 【{user}】 add to group 【{group}】"; byte[] sendbyte = encoding.utf8.getbytes(addmsg); await websocket.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), websocketmessagetype.text, true, cancellationtoken.none); //如果有用户加入群组,则通知其他群成员 channelmsgbody channelmsgbody = new channelmsgbody { fromid = user, toid = group, msg = addmsg }; _redisclient.publish($"{groupprefix}{group}", jsonconvert.serializeobject(channelmsgbody)); }
用户想要在群组内发消息,则必须先加入到一个具体的群组内,具体的加入群组的格式如下
{"method":"usergroup", "group":"g1"}
method为usergroup代表着用户加入群组的业务类型,group代表着你要加入的群组唯一标识。接下来就看下,用户发送群组消息的逻辑了
private async task handlegroup(string groupid, string userid, websocket websocket, object msgbody) { //判断群组是否存在 var hasvalue = groupuser.trygetvalue(groupid, out var users); if (!hasvalue) { byte[] sendbyte = encoding.utf8.getbytes($"group【{groupid}】 not exists"); await websocket.sendasync(new arraysegment(sendbyte, 0, sendbyte.length), websocketmessagetype.text, true, cancellationtoken.none); return; } //只有加入到当前群组,才能在群组内发送消息 if (!users.contains(userid)) { byte[] sendbyte = encoding.utf8.getbytes($"user 【{userid}】 not in 【{groupid}】"); await websocket.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), websocketmessagetype.text, true, cancellationtoken.none); return; } _logger.loginformation($"group 【{groupid}】 user 【{userid}】 send:{msgbody}"); //发送群组消息 channelmsgbody channelmsgbody = new channelmsgbody { fromid = userid, toid = groupid, msg = msgbody.tostring() }; _redisclient.publish($"{groupprefix}{groupid}", jsonconvert.serializeobject(channelmsgbody)); }
加入群组之后则可以发送和接收群组内的消息了,给群组发送消息的格式如下
{"method":"group", "group":"g1", "msgbody":"hi all"}
method为group代表着用户加入群组的业务类型,group则代表你要发送到具体的群组的唯一标识,msgbody则是发送到群组内的消息。最后再来看下订阅群组内消息的情况,也就是处理群组消息的逻辑
private async task subgroupmsg(string channel) { var sub = _redisclient.subscribe(channel, async (channel, data) => { //接收群组订阅消息 channelmsgbody msgbody = jsonconvert.deserializeobject(data.tostring()); byte[] sendbyte = encoding.utf8.getbytes($"group 【{msgbody.toid}】 user 【{msgbody.fromid}】 send:{msgbody.msg}"); //获取当前服务器实例中当前群组的所有用户连接 groupuser.trygetvalue(msgbody.toid, out var currentgroup); foreach (var user in currentgroup) { if (user == msgbody.fromid) { continue; } //通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息 if (userconnection.trygetvalue(user, out var targetsocket) && targetsocket.state == websocketstate.open) { await targetsocket.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), websocketmessagetype.text, true, cancellationtoken.none); } else { currentgroup.remove(user); } } }); _disposables.tryadd(channel, sub); }
全员消息处理
全员消息处理相对来说思路比较简单,因为当服务启动的时候就会监听redis的全员消息频道,这样的话具体的实现也就只包含发送和接收全员消息了,首先看一下全员消息发送的逻辑
private async task handleall(string id, object msgbody) { _logger.loginformation($"user {id} send:{msgbody}"); //直接给redis的全员频道发送消息 channelmsgbody channelmsgbody = new channelmsgbody { fromid = id, msg = msgbody.tostring() }; _redisclient.publish(all, jsonconvert.serializeobject(channelmsgbody)); }
全员消息的发送数据格式如下所示
{"method":"all", "msgbody":"hello all"}
method为all代表着全员消息类型,msgbody则代表着具体消息。接收消息出里同样很简单,订阅redis全员消息频道,然后遍历当前websocket服务器实例内的所有用户获取连接发送消息,具体逻辑如下
private async task suballmsg(string channel) { var sub = _redisclient.subscribe(channel, async (channel, data) => { channelmsgbody msgbody = jsonconvert.deserializeobject(data.tostring()); byte[] sendbyte = encoding.utf8.getbytes($"user 【{msgbody.fromid}】 send all:{msgbody.msg}"); //获取当前服务器实例内所有用户的连接 foreach (var user in userconnection) { //不给自己发送消息,因为发送的时候可以通过具体的业务代码处理 if (user.key == msgbody.fromid) { continue; } //给每个用户发送消息 if (user.value.state == websocketstate.open) { await user.value.sendasync(new arraysegment (sendbyte, 0, sendbyte.length), websocketmessagetype.text, true, cancellationtoken.none); } else { _ = userconnection.tryremove(user.key, out _); } } }); _disposables.tryadd(channel, sub); }
示例源码
由于篇幅有限,没办法设计到全部的相关源码,因此在这里贴出来github
相关的地址,方便大家查看和运行源码。相关的源码我这里实现了两个版本,一个是基于asp.net core的版本,一个是基于golang的版本。两份源码的实现思路是一致的,所以这两份代码可以运行在一套集群示例里,配置在一套nginx里,并且连接到同一个redis实例里即可
asp.net core
源码示例golang
源码示例
仓库里还涉及到本人闲暇之余开源的其他仓库,由于本人能力有限难登大雅之堂,就不做广告了,有兴趣的同学可以自行浏览一下。
总结
本文基于asp.net core
框架提供了一个基于websocket
做集群的示例,由于思想是通用的,所以基于这个思路楼主也实现了golang
版本。其实在之前就想自己动手搞一搞关于websocket集群方面的设计,本篇文章算是对之前想法的一个落地操作。其核心思路文章已经做了相关介绍,由于这些只是博主关于构思的实现,可能有很多细节尚未体现到,还希望大家多多理解。其核心思路总结一下
- 首先是,利用可以构建websocket服务的框架,在当前服务实例中保存当前客户端用户和websocket的连接关系
- 如果消息的目标客户端不在当前服务器,可以利用redis频道、消息队列相关、甚至是数据库类的共享回话发送的消息,由目标服务器获取目标是否属于自己的ws会话
- 本文设计的思路使用的是无状态的方式,即websocket服务实例之间不存在直接的消息通信和相互的服务地址存储,当然也可以利用redis等存储在线用户信息等,这个可以参考具体业务自行设计
读万卷书,行万里路。在这个时刻都在变化点的环境里,唯有不断的进化自己,多接触多尝试不用的事物,多扩展自己的认知思维,方能构建自己的底层逻辑。毕竟越底层越抽象,越通用越抽象。面对未知的挑战,自身作为自己坚强的后盾,可能才会让自己更踏实。
以上就是asp.net core websocket集群实现思路详解的详细内容,更多关于asp.net core websocket的资料请关注其它相关文章!