From 31ddd5c4fe4eba71aaa64ac7f1c0912e543baaeb Mon Sep 17 00:00:00 2001 From: merlin Date: Fri, 27 Feb 2026 17:55:40 +0800 Subject: [PATCH] feat: add group message dispatcher --- .../config/RabbitMQConfig.java | 7 +++ .../myplayerbackend/entity/Message.java | 4 +- .../websocket/WebSocketMessageConsumer.java | 12 +++++ .../websocket/command/CommandDispatcher.java | 2 + .../websocket/command/impl/GroupMessage.java | 47 +++++++++++++++++++ .../utils/websocket/command/impl/Message.java | 2 +- 6 files changed, 71 insertions(+), 3 deletions(-) create mode 100644 src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/impl/GroupMessage.java diff --git a/src/main/java/xin/merlin/myplayerbackend/config/RabbitMQConfig.java b/src/main/java/xin/merlin/myplayerbackend/config/RabbitMQConfig.java index 9df43ba..a7b2b72 100644 --- a/src/main/java/xin/merlin/myplayerbackend/config/RabbitMQConfig.java +++ b/src/main/java/xin/merlin/myplayerbackend/config/RabbitMQConfig.java @@ -9,8 +9,15 @@ public class RabbitMQConfig { public static final String WS_MESSAGE_QUEUE = "ws.message"; + public static final String WS_VIDEO_QUEUE = "ws.video"; + @Bean public Queue wsMessageQueue() { return new Queue(WS_MESSAGE_QUEUE, true); // 持久化队列 } + + @Bean + public Queue wsVideoQueue() { + return new Queue(WS_VIDEO_QUEUE, true); + } } diff --git a/src/main/java/xin/merlin/myplayerbackend/entity/Message.java b/src/main/java/xin/merlin/myplayerbackend/entity/Message.java index 1550049..49767c8 100644 --- a/src/main/java/xin/merlin/myplayerbackend/entity/Message.java +++ b/src/main/java/xin/merlin/myplayerbackend/entity/Message.java @@ -13,8 +13,8 @@ public class Message { @TableId("m_id") private Integer m_id; - private Integer sender; - private Integer receiver; + private Integer from; + private Integer to; private String content; private LocalDateTime time; } diff --git a/src/main/java/xin/merlin/myplayerbackend/utils/websocket/WebSocketMessageConsumer.java b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/WebSocketMessageConsumer.java index 2f1f4c7..8f49df3 100644 --- a/src/main/java/xin/merlin/myplayerbackend/utils/websocket/WebSocketMessageConsumer.java +++ b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/WebSocketMessageConsumer.java @@ -28,5 +28,17 @@ public class WebSocketMessageConsumer { log.info(e.getMessage()); } } + + @RabbitListener(queues = RabbitMQConfig.WS_VIDEO_QUEUE) + public void onVideoMessage(String json) { + try { + JSONObject msg = JSON.parseObject(json); + commandDispatcher.dispatch(msg); + + } catch (Exception e) { + log.info(e.getMessage()); + } + + } } diff --git a/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/CommandDispatcher.java b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/CommandDispatcher.java index 8292c0c..cf2a2a9 100644 --- a/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/CommandDispatcher.java +++ b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/CommandDispatcher.java @@ -16,6 +16,7 @@ public class CommandDispatcher { private final Heartbeat heartbeatCommand; private final SystemNotify systemNotifyCommand; private final PersonalNotify personalNotifyCommand; + private final GroupMessage groupMessageCommand; public void dispatch(JSONObject msg) throws IOException { String cmd = msg.getString("cmd"); @@ -27,6 +28,7 @@ public class CommandDispatcher { case "HEARTBEAT" -> heartbeatCommand.handle(msg); case "SYSTEM_NOTIFY" -> systemNotifyCommand.handle(msg); case "PERSONAL_NOTIFY" -> personalNotifyCommand.handle(msg); + case "GROUP_MESSAGE" -> groupMessageCommand.handle(msg); default -> { System.err.println("Unknown command: " + cmd); diff --git a/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/impl/GroupMessage.java b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/impl/GroupMessage.java new file mode 100644 index 0000000..9f1f0a1 --- /dev/null +++ b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/impl/GroupMessage.java @@ -0,0 +1,47 @@ +package xin.merlin.myplayerbackend.utils.websocket.command.impl; + +import com.alibaba.fastjson2.JSONObject; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import xin.merlin.myplayerbackend.entity.Groups; +import xin.merlin.myplayerbackend.mapper.GroupsMapper; +import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager; +import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler; + +import java.io.IOException; +import java.util.List; + +@Component +@RequiredArgsConstructor +public class GroupMessage implements BaseCommandHandler { + + private final GroupsMapper groupsMapper; + + private final WebSocketSessionManager sessionManager; + + @Override + public void handle(JSONObject msg) throws IOException { + + Integer userId = msg.getInteger("from"); + Integer groupId = msg.getInteger("group"); + + List objs = groupsMapper.selectObjs( + Wrappers.lambdaQuery() + .select(Groups::getId) + .eq(Groups::getG_id, groupId) + .ne(Groups::getId, userId) + ); + + List ids = objs.stream() + .map(o -> (Integer) o) + .toList(); + + for (Integer id : ids) { + JSONObject copy = new JSONObject(msg); + copy.put("to", id); + sessionManager.sendToUser(id, copy.toJSONString()); + } + } + +} diff --git a/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/impl/Message.java b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/impl/Message.java index 8787571..ddadaa8 100644 --- a/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/impl/Message.java +++ b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/command/impl/Message.java @@ -18,7 +18,7 @@ public class Message implements BaseCommandHandler { public void handle(JSONObject msg) throws IOException { /* TODO: 这里需要处理如果目标用户不在线,如何去完成消息的存储 - */ +*/ String to = msg.getString("to"); sessionManager.sendToUser(Integer.valueOf(to), msg.toJSONString()); }