项目背景
和各位读者大致介绍下具体场景,线上的小程序中开放一些语音麦克风的房间,让用户进入房间之后可以互相通过语音聊天的方式进行互动。
这里分享一下相关的技术设计方案。这款系统的核心点设计在于如何能让一个用户发出的语音通知到其他用户上边O ~ y h ; ! 5 Y l。语音数据在客户端同事的处理下最终变成了io数据流请求到了后端,后端只需要将这b d 8 / + 0 ~ $ e些数据流传达给各个不同的终端即可达到广播通知的效果。
单机版架构
最初期上线的时候,为了赶速度,快速试错,所以简单地采用了单机版架构去设! _ o \ F 0 j 6计。结合技术栈为 SpringBoot,WebSocket,MySQL技术) I S。
线上一间语音房间的同时在线M V _人数并不会特别多,大概在15-50人的区间段内,t K ^ H系统核心代码是通过SpringBoot内部的WebSocket技术去进行数据的主动推送。
设计思路
整体的设计图比较简单,基本就是一台服务器存储WebSocket连接,如下图所示:
用户进行WebSocket初始化连接的时候需要一个连接分配和存储的过程:
早期的存储是存放在了服^ E H ! D j + h务器本地的一个Map集合中。
当N ` 3 + m w 6 ) @WeS ( g ( v Y b 3 sbSocket进行连, O q接的时候就会往内存中写入一条数据信息,m e # e 2 5当链接断开的时候,就将内存中k M 4的数据移除。然后进行语音广播的时候需要结合WebSocket内部的广播发送功能进行通知
看似设计比较简单,但是在后期业务变得庞大的时候出现了瓶颈。因为随着参加语音活动用户的增C 4 q 8 ?加,越来越多的WebSocketSession对象需要被存储到内存当中,这种有状态性的存储对于单机扩容不灵活。
设计缺陷
1.假设原先的服务器扩容到了A,B两台机. z L 1 0 9器,A用户在A机器上边建9 R Q K = f \ = 9立了WebSocketSession,B用户在B机器上边建立的WebSo– 1 ccketSession连接。此时如果A想要和B进行对话发送,需要先查找到具体WebSocketSession存放在哪台机器上边。
2.当用户出现了网络异常,临时断开连接进行重连的时候,也可能会出现1所说的问题。
集群架构
设计思路
一旦出现需要发送语音通知的时候,发送一条广播的mq消息,每个机器都接收到消息之后,触发自己的广播操作U v t Z _即可。
RocketMq的接入系统设计里C ] g u M ^ * $面mq采用的是广播模式,这和我们通常使用的集群模式有一定的区别。
消息队列Rn ^ 2 h $ T 0ocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订m R O y { o阅方订阅关注的Top0 9 f = b ?ic,以获取并消费消息。由于消费者应用一般是分布式系统,以P 4 # . V 6 = s集群方式部署,因此消息队列RocketMQb 8 h版约定以下概念:
- 集群:使用相同Group Ih b d 9 1 m 9D的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。
- 集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
- 广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。
集群消费模式适用场景 适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费Z m { / I _ P进度在服务端维护,可靠性更高。具体消费示例如下图所示。
注意事项
- 集群消费模式下,每一条消息都只# g o会被分发到一台机器上处理。如果需要被P U ]集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器h G t o j \ G j ?上。
广播消费模式适用场景 适用于消费端集群化部署,每条消息需# O C r F要被集群下的f t S W v每个消费者处理的场景。具体消费示例如下图所示。
注[ 4 Q C g F意事项
- 广播消费模式下不支持顺序消息。
- 广[ H I N + 8 I C播消L n T E Y m 4 R 3费模式下不支持重置消费位点。
- 每条消息都需要被B [ @ + t l e + Y相同订阅逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复消费l g 4 : @的概率稍大于集群模式。
- 广播模式下,消息队列RocketMQ版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 广播模式下服务端不维护消费进度,t u X所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积} F u报警和订阅关系查询T d s D功a + R o n能。
这里面的应用场景需要对集群内部对每个消费者都对服务器内存中的socket连接进行session是否存在对判断,因此需要采用mq的广播模式。
关于mq部分的接入代码
CZ 7 m honsumer模块的配置:
- packageorg.idea.web.socket.config;
- importorg.springframework_ { D A l.boot.context.properties.CU ; I = s +onfiguW ? * \ @ )rationProperties;
- /! 8 R | _**
- *@Q U HAuthC @ B W _ # 9 ~ .orlinhao
- *@Datecreatedin10:30上午2021/5/10
- */
- @H S ^ G + h E WConfigurationPropertieS n y x E \s(prefix="rocketz , e Xmq.consumer")
- publicclassMqConsumerConfig{
- privatebooleanisOn;
- privateStringgroupName;
- prK * @ uivateStringnameSrvAddr;
- privateStL P 2ringtopics;
- privateIntegerconsumeThreadMin;
- privateIntegerconsumeThreadMax;
- privateIntegerconsumeMessageBatchMaxSize;
- /**
- getter和setter部分省略
- **/
- }
Producer模块的配置展示:
- packageorg.idea.web.socket.config;
- importorg.sprins d j ?g8 2 m Q l } | +framework.boot.context.properties.ConfigurationPropertien i P y / F _s;
- /**
- *@Authorlinhao
- *@Datecrk _ ] z Jeatedin10:26上午2021/5/10
- */
- @ConfigurationProperties(prefix="rocketd $ ]mq.producer")
- publicclassMqProducerConfig{
- pH l l [ 1 2 : |rivatebooleanisOn;
- privateStY d |ringgroupName;
- privateStringnameSrv] s e Y GAddr;
- privateIntegermaxMessageSize;
- privateIntegersendMsgTimeout;
- privateIntegerretryTimesWhenSendFailed;
- /**: Q _ O Y
- getter和setter部分! 7 ! A _ j G \ L省略
- **/
- }
RocketMq内部的消费端Bean配置
- packageorg.idea.web.socket.mq;
- importlombok.extern.slf4j.Slf4j;
- importorg.apache.rocketmq.c\ % O |lient.consumer.DefaultMQPushConsumer;
- impon @ K -rtorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- importorg.apachea r c #.rocked z E ^ o U ~ dtmq.client.exception.MQClientException;
- importorg.apache.rocketmq.common.consumer.CA q 8 D m o h _onsumeFromWhere;
- importorg.apache.rocketmq.common.protocol.heartbeat.MessageModel;
- importorg.idea.web.socket.config.MqConsumerConfig;
- importorg.idea.web.socket.config.d ` Y 7 t V g tMqProducerConfig;
- importorg.springframework.boot.autoconfigur_ c q 8e.AutoConfigureAfter;
- importo^ 2 &rg.springframework.boot.autoconfigure.u p F o Y o + x GAutoConfigureBefore;
- importorg.springframework.boot.autoconfigure.cos u \ $ x - vndition.ConditionalOnMissingBean;
- importorg.spa ^ K N ( ,ringframework.boot.context.properties.EnableConfigu] j F U 7 Y b Y IrationProperties;
- importorg.springfrl m j f c J v 1amework.context.annotation.Bean;
- importorg.springframework.context.annotation.Configuration;
- importjavax.annotatio0 K * 9 Bn.Resource;
- /**
- *@Authorlinhao
- *@B 6 [ L ) pDatecreatedin10:! t s % 734上午2021/5/10
- */
- @ConfigP } 0 H luration
- @Slf4j
- @EnableConfigurationProp# d Serties({MqConA } b H m &sumerCoI \ ]nfig.class})
- publicclassMqConsumerAutoConfig{
- @Resource
- privateMqConsumerConfigmqC8 5 ~ NonsumerConfig;
- @Resource
- //这个接口需要手动实现顺序消费的逻辑每次获取到消息队列的第一条数据
- privateMessageListeE $ V s 1 onerHandlermessageListenerConcurrently;
- @B$ t ~ ( D R \ 3ean
- @s E )ConditionalOnMissingBean
- publicDefaultMQPushConsumerdefaultMQPushConsumer(){
- Defs I t w 9 #aultMQP7 Z = $ U v M }u_ Z n J @ + 7 6 {shConsumerconsumer=neW * ~ M 9 2 E = %wDefaultMQPushConsumer();
- consumer.setNamesrvAk J e pddr(mqConsumerConfig.getNameSrvAddr());
- consumer.setConsumerGroup(mqConsumerConfig.getGroupName()} + & x G Y Q r q);
- consumer.setConsumeThreadMin(mqConsumerConfig.gQ O & S | yetConsumeThreadMin());
- consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax());
- consumer.registerMessageLi* B L =stener(messageListenerConcurrentl[ 8 /y);
- consumer4 ; # R ? M X ).setCo/ 5 _ f a b rnsumeFro$ n V * u @ U lmWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFS ` D { P A 8 yFSET);
- //消费模型是什么?
- consumer.setMessageModel(MessageModel.BROADCASTING);
- //默认一次拉取一条消费
- consumer.setConsumeM= - h T ~essageBat6 H ] r U q z ` UchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize());
- //*表示订阅所有的tag
- try{
- conT i \ bsJ v [umer.subscribe(y V J ] 4mqConsumerConfig.getTopics(),"*");
- consumer.start();
- log.info("【MqConsumerAutoConfig】mqconsumerisstarted!");
- }catch(Exceptione){
- log.error("k & d + 4 S rmqstartfail,eis",e);
- }
- returnconsumer;
- }
- }
Ro_ _ w u T o j ) :cketMq的服务生产者Bean配置
- packageorg.idea.web.socket.mq;
- importlombok.extern.slf4j.Slf4j;
- importorg.apache.rocketmq.client.producer.DefaultMQProducer;
- i( j . z \ Dmportorg.idea.weN E f r r :b.socket.config.MqProducerConfig;
- importorg.springframework.boot.autou c KcoP B % =nfigure.AutoConfigureAfter;
- importorg.springframewC L p | Mork.boot.autoconfigure.AutoConfy = N 3 h _igk 6 ( 8 m gureBeforeb p n o A j | - r;
- importorg.springfQ p ? B u 0 U Tramework.boot.autoconfigure.condition.ConditionalOnMissingBean;
- importorg.springframework.boot.context.properties.EnableConfigurationProperties;
- importorg.springframewor! A _ J B w Fk.contex/ j 9 \ { ` Y u $t.annotation.Bean;
- importorg.springframework.context.annotation.Configuration;
- importjavax.ann* R U l { : [ ~ ;otation.Resource;
- /**
- *@Authorlinhao
- *@Datecreatedin11:05上午2021/5/10
- */
- @Configuration
- @e e rSlf4j
- @EnableConfigurationProperties({MqProducerCh H Y F sonfig.class})q D C b
- publicclash 5 /sMqProd! F K lucerAutoConfig{
- @Re= } f \ ) m K OsourH 1 mce
- privV i . & P a ` uateMqPrA k H C t 9 EoducerConfigmqProducerConfig;
- @Bean
- @Condit+ d C b +ionalOnMissingBean
- //意味着DefaultMQP@ Y j v e E qroducer的配置可以被覆盖
- publicDefa 2 A \ [ n w 3 _aultMQProducerdefaultMQProducer(){
- DefaultMQProducerproducer=newDef] j ] a L Q daultMQProducer(mqProducerConfig.getGroupName());
- producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());
- //没有则自动创建topic的key
- //producer.setCreateTopicKey("T : G l Q 7 oAUTO_CREATE_TOPIC_KEY");
- producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());
- producer.setSendMsgTimeout(mqe x S k y xProducerConfig.getSendMsgTimeout());K e ( u L = n
- producer.setRetryTimesWhenSendFailed(mqProducerConfig.m 1 @ A C kgetRetryTimesWhenSendFailed());
- try{
- producer.start();
- log.inf[ = D K a :o("【MqProducerAutoConfig】mqproducerisstarted!");
- }catch(Exceptione){
- log.error("[MqProducerAutoConfig]startfail,eis",e);
- }
- retp F + Q `urnproducer;
- }
- }@ 7 + I w O C
然后是对RocketMq内部发送消息事件的一层函数封装
- packageorg.idea.web.socket.mq;
- importcom.alibaba.fastjson.JSON;
- importlombok.extern.slf4j.Slf4j;
- importorg.apache.commons.lang3.Stri| v 0 EngUtils;
- importorg.N 3 5 g N ;apache.rocketmq.client.producer.DefaultMQProducer;
- importorg.apache.rocketmq.client.producer.SendResult;
- ix / \ O ] J | . Zmportorg.apache.rocketmq.common.mes$ 5 Asa! $ j K j * v Wge.Message;
- i- | V \mportorg.apac% d 8 n Whe.rocketmq.remoting.common.RemotingHelper;
- importorg.idea.web.# K ) Y V n gsocket.config.MqPC \ z i - H T ZroducerConfig;
- importorg.idea.web.socket.dto.BroadcastMqDTO;
- importorg.spring_ w )framt X Q ) p ` J Rework.stereotype.ComO w d k G ] Y m (ponent;
- importjn 1 ` $ f e Tavax.ann& v + 9 k ? h * notation.Resource;
- importjava.io.UnsupportedEncodingException;
- /**
- *消息广播发送端
- *
- *@Authorlinhao
- *@Datecreatedin10:43下午2021/5/9
- */F Z & M r J
- @Component
- @Slf4j
- publicclassBroadcastMqProducer{
- @Resource
- privateDefaultMQProducerdefau` R # \ s P r !ltMQProducer;
- @Resource
- prib F ; n $ ^ ~ rvateMqProducerCom h 8 ? F 2nfigmqProducerConfig;
- privatestaticStringTOPIC="ws-topic";
- privatestaticStringTAGS="ws-tag";
- publicstatic$ x G _ 0 - C jIntegerALL_USER_RECEIVE_TYPE=1a A ~;
- publicstaticIntegerONE_USER_RECEIVE_TYPE=2;
- /**
- *点对点之间的消息发送
- *
- *@paramdestSessionKey
- *@C g ! h r h $ K #parammsg
- *@return
- */
- publicSendResultsendWebSocketToUser(StringdestSessionKey,Stringmsg){
- if(StringUtils.isEmptm t N 5y(msg)){
- log.error("[sendWebSocketToUser]msgcannotbe$ @ p ! Snull!");
- returnnull;
- }
- Messagemessage=null;
- SendResultsendResult=null;
- try{
- BroadcastMG i L , x o \ !qDTObroadcastMqDTO=newBroadcastMqDTO();
- broadcr V ( U f _ O AastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);
- broadcastMqDTO.setMessage(msg);
- broadcastMqDTl * ) v * Q R * +O.setSessionKey(destSessionKey);
- messag: 8 Ie=ns S CewMessage(TOZ 3 K [ } ! / R nPIC,TAGS,(JSON.toJSONString(broadce f = ? # D ) !astMqDTO))K h A Z _ , j A.getBytes(RemotingHelper.DEFAULT_CHARSET));
- sendResult=defaultMQProducer.send(message);
- }catch(Exceptione){
- log.error("[sendWebSocketBroadcastMsg]eis",e);
- }
- returnsendResult;
- }
- /**
- *广播消息发送
- *
- *@parammsg
- *@return
- */
- publicSendResultC f e !sendWebSocket; M $ d , ] qBroadcastMsg(StringmS F e !sg_ a n M){
- if(StringUtils.isEmpty(msg)){
- log.error("[sendWebSocketBroadcastMsg]msgcannotbenull!");
- returnnull;
- }
- Messagemessage5 - = * 8 Q Q=null;
- SendResultsendResult=null;
- try{
- BroadcastMqDTObrF c soadcastMqDTO=newBroadcastMqDTO();
- broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);O d w A ! u 3
- broadcastMqDTO.setMess@ ` . qage(msg);
- mes6 X e v ] vsaged k U 8=newV C LMessage(TOPIC,TAGS,(JSO0 T % t UN.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
- sendResult=defaultMQProducer.send(a m A M N Gmessage);
- }catch(Exceptione){
- log.error("[sendWebSocketBroadcastMsg]eis",e);
- }
- returnsendRW m ) ) 6 t vesult;
- }
- }
对消息的! r i订阅模块实现代码如下:
- packageorx } Xg.idea.web.socket.mq;
- importcom.alibaba.fastjson.JSON;
- importcom.oracle.tool9 z W - z rs.packager.Log;
- importlombok.extern.slf4j.Slf4jy n ; =;
- importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- importorg.apache.rocketmq.client.consumer.listeR W D R M $ _ Lner.Coo q 2 9 \ M r 3 mnsumeConO = ] a scurrentlyStatus;, 3 7 q - 7
- importl 9 n ( 1 I jorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- importorg.apache.rocketmq.co) @ * w dmmon.message.MessageExt;
- importorg.idea.web.socketo 1 g o.dto.BroadcastMqDTO;
- importorg, H # 6 X H = Z P.idea.web.socket.manager.SocketManager;
- importorg.springframework.messaging.simp.SimpMessagingTemplate;
- importorg.springframework.stereotype.Component;
- importorg.springframework.util.CollectionUtils;
- importorg.springframework.web.socket.WebSocketSession;
- importjavax.annotation.Resource;
- importjava.util.L$ 4 } 3ist;
- importstaticorg.idea.web.socket.mq.BroadcastMqProd) 9 Q V * ] 3ucer.ALL_USER_RECEIVE_TYPE;
- importstaticorg.idea.web.socket.mq] $ O , H ].BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;
- /**
- *@Authorlinhao
- *@Datecreatedin10:59上午2021/5/10
- */
- @Component
- @Slf4jb y w ; { } \ 5 M
- publicclassMessageListenerHandlerimplementsMessageListenerConcurrently{
- @Resource
- privateSocketManagersocketManager;
- @Resource
- privateSimpMessagingTemplatetemplate;
- @Override
- publicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>list,ConsumeConcurrentl; B x S _ _yContextconsumeConcurrentlyContext){
- if(CollectionUtils.isEmpty(list)){
- Log.info("receiA 4 q tveemptymV f o V j W ysg");
- returnCo| S - b S . @ C ,nsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- MessageExtmessageExt=list.get(0);
- byte[]bytes` ` h=messageExt.getBodyx F q 6 6 U ; g J();
- Stringjson=newString(bytes);! ] w h
- BroadcastMqDTObroadcastMqDTO=JSON.parseObject(json,BroadcastMqDTO.class)c Q | g B s q c k;
- log.info("[MessageListenerHandler]broadcastMqDTOis"+broadcastMqDTO);
- if(ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())){
- log.info("[consumeMessage]广播发送消\ S Y :息:触发----》消息内容为:"+broadcastMqDTO);
- template.convertAndSend("/topic/sendTc . V f 3 v | 1 {opic",broadcastMqDTO);
- }elseif(ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())){
- StringsessionKey=broadcastMqDTO.getSel T ? k J dssionKey();
- WebSocketSessionZ \ c . W \webSocketSession=socketManagh J s j e ` )er.get(sessi{ 4 { h ionKey6 ] Y y S n 6 2);
- if(webSock& ; N h h ] I +etSession!=n2 _ Cull){
- template.convertAndSendToUser(sessionKey,"/queue/sendUser",broadcasY j F _ j / 7 k otMqDTO.getMessage());
- log.info("[consumeMessage]点对点8 = N a M发送消息;触发----》消息内容为:"+broadcastMqDTO);
- }
- }
- returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
整体设计结构如下图:
于是按照这个结构进行了一版本的紧急开发迭代,原先的单台服务器扩展为了服务集群。
业务拓展后续产品经理提出一个需求,要求支持在同一间房内的两个用户之间发送悄悄话功能。这就需要我们进行一个点对点之间传输通讯的功能了。因此需要在mq通知到每台机器的时候加一个本地Sesw ] (sion遍历的逻辑,如果当前机器存有用户token对应的session变0 1 W ~ * _ `量,那么就单独针对那个Session进行WebSocket的发送通知。
设计弊端u ^ +一旦某台机器出现了异常崩溃,那么就意味着这台机器上的所有语音连接可能会出现中断情况。目前这一块的问题也在考虑解决,计划是将WebSocketV } N oSession存入到分布式缓存的redis中保证数据e { l 9 T a Q |可靠存储,但是在后续尝试的时q & & o 7 : P %候发现WebSocketSession对象没有实现序列化= p & 0 q n接口,在存储到Redis的时] A 3 # ) O r d %候会出现异常。目前这个问题还在寻找解决思路中F k b . y t –,不知道各位读者6 J ^朋j C 5 N ^友们有什么好的思路。
遇到的问题点用户请求直接访问到了我们的内部服务器,如果在请求的中间加入一台nginx做负载均衡则需要在nginx中配置一些额外信息。
项目的源代码比较多,这里我把核心部分的代码整理了一份,感兴趣的朋友可以到我的gitee上边去下载:
https://gitee.co. J m J om/IdeaHome_admin/socket-framework
点赞 0