diff --git a/pom.xml b/pom.xml index 4a05f8a..50166df 100644 --- a/pom.xml +++ b/pom.xml @@ -58,11 +58,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka-test - test - org.springframework.security spring-security-test @@ -107,6 +102,12 @@ springdoc-openapi-starter-webmvc-ui 2.8.14 + + com.alibaba + fastjson + 2.0.45 + + diff --git a/src/main/java/xin/merlin/myplayerbackend/config/WebsocketConfig.java b/src/main/java/xin/merlin/myplayerbackend/config/WebsocketConfig.java new file mode 100644 index 0000000..f376f90 --- /dev/null +++ b/src/main/java/xin/merlin/myplayerbackend/config/WebsocketConfig.java @@ -0,0 +1,26 @@ +package xin.merlin.myplayerbackend.config; + + +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; +import xin.merlin.myplayerbackend.config.security.WebsocketInterceptor; +import xin.merlin.myplayerbackend.utils.websocket.CustomWebSocketHandler; + +@Configuration +@EnableWebSocket +@RequiredArgsConstructor +public class WebsocketConfig implements WebSocketConfigurer { + + private final CustomWebSocketHandler handler; + + private final WebsocketInterceptor interceptor; + + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(handler, "/online").addInterceptors(interceptor); + } +} diff --git a/src/main/java/xin/merlin/myplayerbackend/config/security/WebsocketInterceptor.java b/src/main/java/xin/merlin/myplayerbackend/config/security/WebsocketInterceptor.java new file mode 100644 index 0000000..6bd9d5d --- /dev/null +++ b/src/main/java/xin/merlin/myplayerbackend/config/security/WebsocketInterceptor.java @@ -0,0 +1,47 @@ +package xin.merlin.myplayerbackend.config.security; + + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; +import xin.merlin.myplayerbackend.utils.JwtUtil; + +import java.util.Map; + +@Slf4j +@Component +@RequiredArgsConstructor +public class WebsocketInterceptor implements HandshakeInterceptor { + + private final JwtUtil jwtUtil; + + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { + try { + // 获取 token + String token = request.getHeaders().getFirst("Authorization"); + if (token != null && token.startsWith("Bearer ")) { + token = token.substring(7); + } else { + return false; + } + + Integer id = jwtUtil.getId(token); + attributes.put("userId", id); + return true; + + }catch (Exception e){ + log.error(e.getMessage()); + return false; + } + } + + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { + + } +} diff --git a/src/main/java/xin/merlin/myplayerbackend/utils/websocket/CustomWebSocketHandler.java b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/CustomWebSocketHandler.java new file mode 100644 index 0000000..4ece913 --- /dev/null +++ b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/CustomWebSocketHandler.java @@ -0,0 +1,56 @@ +package xin.merlin.myplayerbackend.utils.websocket; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + + +@Slf4j +@Component +@RequiredArgsConstructor +public class CustomWebSocketHandler extends TextWebSocketHandler { + + private final WebSocketSessionManager webSocketSessionManager; + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + Integer userId = (Integer) session.getAttributes().get("userId"); + webSocketSessionManager.addSession(userId, session); + log.info("用户 {} 已连接", userId); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + String payload = message.getPayload(); + JSONObject msg = JSON.parseObject(payload); + String type = msg.getString("type"); + switch (type) { + case "chat": + Integer toUser = msg.getInteger("toUser"); + webSocketSessionManager.sendToUser(toUser, payload); + break; + + case "broadcast": + webSocketSessionManager.broadcast(payload); + break; + + case "signal": + webSocketSessionManager.sendToUser(msg.getInteger("toUser"), payload); + break; + + } + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + Integer userId = (Integer) session.getAttributes().get("userId"); + webSocketSessionManager.removeSession(userId); + log.info("用户 {} 已断开", userId); + } +} diff --git a/src/main/java/xin/merlin/myplayerbackend/utils/websocket/WebSocketSessionManager.java b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/WebSocketSessionManager.java new file mode 100644 index 0000000..aca924b --- /dev/null +++ b/src/main/java/xin/merlin/myplayerbackend/utils/websocket/WebSocketSessionManager.java @@ -0,0 +1,41 @@ +package xin.merlin.myplayerbackend.utils.websocket; + + +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class WebSocketSessionManager { + + private static final Map websocketSessions = new ConcurrentHashMap<>(); + + public void addSession(Integer id, WebSocketSession session) {websocketSessions.put(id, session);} + + public WebSocketSession getSession(Integer id) {return websocketSessions.get(id);} + + public void removeSession(Integer id) {websocketSessions.remove(id);} + + public Map getSessions() {return websocketSessions;} + + public void sendToUser(Integer userId, String message) throws IOException { + WebSocketSession session = websocketSessions.get(userId); + if (session != null && session.isOpen()) { + session.sendMessage(new TextMessage(message)); + } + } + + public void broadcast(String message) throws IOException { + for (WebSocketSession session : websocketSessions.values()) { + if (session.isOpen()) { + session.sendMessage(new TextMessage(message)); + } + } + } + + +}