feat: add group message dispatcher

This commit is contained in:
merlin
2026-02-27 17:55:40 +08:00
parent d64de39541
commit 31ddd5c4fe
6 changed files with 71 additions and 3 deletions

View File

@@ -9,8 +9,15 @@ public class RabbitMQConfig {
public static final String WS_MESSAGE_QUEUE = "ws.message";
public static final String WS_VIDEO_QUEUE = "ws.video";
@Bean
public Queue wsMessageQueue() {
return new Queue(WS_MESSAGE_QUEUE, true); // 持久化队列
}
@Bean
public Queue wsVideoQueue() {
return new Queue(WS_VIDEO_QUEUE, true);
}
}

View File

@@ -13,8 +13,8 @@ public class Message {
@TableId("m_id")
private Integer m_id;
private Integer sender;
private Integer receiver;
private Integer from;
private Integer to;
private String content;
private LocalDateTime time;
}

View File

@@ -28,5 +28,17 @@ public class WebSocketMessageConsumer {
log.info(e.getMessage());
}
}
@RabbitListener(queues = RabbitMQConfig.WS_VIDEO_QUEUE)
public void onVideoMessage(String json) {
try {
JSONObject msg = JSON.parseObject(json);
commandDispatcher.dispatch(msg);
} catch (Exception e) {
log.info(e.getMessage());
}
}
}

View File

@@ -16,6 +16,7 @@ public class CommandDispatcher {
private final Heartbeat heartbeatCommand;
private final SystemNotify systemNotifyCommand;
private final PersonalNotify personalNotifyCommand;
private final GroupMessage groupMessageCommand;
public void dispatch(JSONObject msg) throws IOException {
String cmd = msg.getString("cmd");
@@ -27,6 +28,7 @@ public class CommandDispatcher {
case "HEARTBEAT" -> heartbeatCommand.handle(msg);
case "SYSTEM_NOTIFY" -> systemNotifyCommand.handle(msg);
case "PERSONAL_NOTIFY" -> personalNotifyCommand.handle(msg);
case "GROUP_MESSAGE" -> groupMessageCommand.handle(msg);
default -> {
System.err.println("Unknown command: " + cmd);

View File

@@ -0,0 +1,47 @@
package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.entity.Groups;
import xin.merlin.myplayerbackend.mapper.GroupsMapper;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
import java.util.List;
@Component
@RequiredArgsConstructor
public class GroupMessage implements BaseCommandHandler {
private final GroupsMapper groupsMapper;
private final WebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
Integer userId = msg.getInteger("from");
Integer groupId = msg.getInteger("group");
List<Object> objs = groupsMapper.selectObjs(
Wrappers.<Groups>lambdaQuery()
.select(Groups::getId)
.eq(Groups::getG_id, groupId)
.ne(Groups::getId, userId)
);
List<Integer> ids = objs.stream()
.map(o -> (Integer) o)
.toList();
for (Integer id : ids) {
JSONObject copy = new JSONObject(msg);
copy.put("to", id);
sessionManager.sendToUser(id, copy.toJSONString());
}
}
}

View File

@@ -18,7 +18,7 @@ public class Message implements BaseCommandHandler {
public void handle(JSONObject msg) throws IOException {
/*
TODO: 这里需要处理如果目标用户不在线,如何去完成消息的存储
*/
*/
String to = msg.getString("to");
sessionManager.sendToUser(Integer.valueOf(to), msg.toJSONString());
}