Sfoglia il codice sorgente

merge from ..components/examcloud-ws-starter

deason 4 anni fa
parent
commit
8fb63fe502

+ 5 - 1
pom.xml

@@ -15,9 +15,13 @@
     <dependencies>
         <dependency>
             <groupId>cn.com.qmth.examcloud</groupId>
-            <artifactId>examcloud-ws-starter</artifactId>
+            <artifactId>examcloud-web</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
         <dependency>
             <groupId>cn.com.qmth.examcloud.rpc</groupId>
             <artifactId>examcloud-ws-api</artifactId>

+ 73 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/api/controller/WebSocketController.java

@@ -0,0 +1,73 @@
+package cn.com.qmth.examcloud.ws.starter.api.controller;
+
+import javax.websocket.Session;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.ThreadContext;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
+import cn.com.qmth.examcloud.api.commons.security.bean.User;
+import cn.com.qmth.examcloud.commons.exception.StatusException;
+import cn.com.qmth.examcloud.commons.util.JsonUtil;
+import cn.com.qmth.examcloud.commons.util.ThreadLocalUtil;
+import cn.com.qmth.examcloud.web.support.ControllerSupport;
+import cn.com.qmth.examcloud.web.support.WithoutStackTrace;
+import cn.com.qmth.examcloud.ws.starter.core.MessageOut;
+import cn.com.qmth.examcloud.ws.starter.core.SessionInfo;
+import cn.com.qmth.examcloud.ws.starter.core.WebSocketHelper;
+
+@RestController
+@RequestMapping("api/ctr/ws")
+public class WebSocketController extends ControllerSupport {
+
+	@WithoutStackTrace
+	@PostMapping("{path}")
+	public String post(@PathVariable String path, @RequestBody String message) {
+
+		User accessUser = getAccessUser();
+
+		Session session = WebSocketHelper.getSession(path, accessUser.getKey());
+
+		if (null == session) {
+			throw new StatusException("100001", "no ws session about path [" + path + "]");
+		}
+
+		SessionInfo sessionInfo = WebSocketHelper.getSessionInfo(session);
+
+		if (null == sessionInfo) {
+			throw new StatusException("100002", "no ws session info");
+		}
+
+		MessageOut out = new MessageOut(path, sessionInfo.getSessionId());
+
+		JsonElement jsonEl = null;
+		if (StringUtils.isNotBlank(message)) {
+			try {
+				jsonEl = JsonParser.parseString(message);
+			} catch (JsonSyntaxException e) {
+				throw new StatusException("100003", "message is not a json string");
+			}
+		}
+
+		out.setContent(jsonEl);
+
+		try {
+			ThreadContext.put("TRACE_ID", sessionInfo.getSessionId());
+			WebSocketHelper.sendText(session, path, out);
+		} finally {
+			ThreadContext.put("TRACE_ID", ThreadLocalUtil.getTraceId());
+		}
+
+		String json = JsonUtil.toJson(out);
+		return json;
+	}
+
+}

+ 22 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/core/Message.java

@@ -0,0 +1,22 @@
+package cn.com.qmth.examcloud.ws.starter.core;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * 接口ID 注解
+ *
+ * @author WANGWEI
+ * @date 2019年3月15日
+ * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+ */
+@Target({ElementType.PARAMETER})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface Message {
+
+	String value() default "";
+}

+ 22 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/core/MessageHandler.java

@@ -0,0 +1,22 @@
+package cn.com.qmth.examcloud.ws.starter.core;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * 接口ID 注解
+ *
+ * @author WANGWEI
+ * @date 2019年3月15日
+ * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+ */
+@Target({ElementType.METHOD, ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface MessageHandler {
+
+	String value() default "";
+}

+ 97 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/core/MessageHandlerHolder.java

@@ -0,0 +1,97 @@
+package cn.com.qmth.examcloud.ws.starter.core;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import com.google.common.collect.Maps;
+
+import cn.com.qmth.examcloud.web.support.SpringContextHolder;
+
+/**
+ * 消息处理器 管理器
+ *
+ * @author WANGWEI
+ * @date 2019年11月22日
+ * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+ */
+@Component
+@Order(100)
+public class MessageHandlerHolder implements ApplicationRunner {
+
+	private static Map<String, Method> methodMap = Maps.newConcurrentMap();
+
+	private static Map<String, Object> beanMap = Maps.newConcurrentMap();
+
+	public static Method getMethod(String path) {
+		return methodMap.get(path);
+	}
+
+	public static Object getBean(String path) {
+		return beanMap.get(path);
+	}
+
+	@Override
+	public void run(ApplicationArguments args) throws Exception {
+
+		Map<String, Object> map = SpringContextHolder.getApplicationContext()
+				.getBeansWithAnnotation(MessageHandler.class);
+
+		for (Object bean : map.values()) {
+			MessageHandler messageHandlerOfClass = AnnotationUtils.findAnnotation(bean.getClass(),
+					MessageHandler.class);
+			String valueOfClass = messageHandlerOfClass.value();
+
+			Method[] methods = bean.getClass().getDeclaredMethods();
+
+			for (Method method : methods) {
+				MessageHandler messageHandlerOfMethod = method.getAnnotation(MessageHandler.class);
+				if (null == messageHandlerOfMethod) {
+					continue;
+				}
+				String valueOfMethod = messageHandlerOfMethod.value();
+
+				String path = join(valueOfClass, valueOfMethod);
+				methodMap.put(path, method);
+				beanMap.put(path, bean);
+			}
+
+		}
+
+	}
+
+	private String join(String valueOfClass, String valueOfMethod) {
+		valueOfClass = valueOfClass.trim();
+		valueOfMethod = valueOfMethod.trim();
+
+		if (valueOfClass.startsWith("/")) {
+			valueOfClass = valueOfClass.substring(1);
+		}
+		if (valueOfClass.endsWith("/")) {
+			valueOfClass = valueOfClass.substring(0, valueOfClass.length() - 1);
+		}
+		if (valueOfMethod.startsWith("/")) {
+			valueOfMethod = valueOfMethod.substring(1);
+		}
+		if (valueOfMethod.endsWith("/")) {
+			valueOfMethod = valueOfMethod.substring(0, valueOfMethod.length() - 1);
+		}
+
+		if (StringUtils.isNotEmpty(valueOfClass) && StringUtils.isNotEmpty(valueOfMethod)) {
+			return valueOfClass + "/" + valueOfMethod;
+		} else if (StringUtils.isNotEmpty(valueOfClass)) {
+			return valueOfClass;
+		} else if (StringUtils.isNotEmpty(valueOfMethod)) {
+			return valueOfMethod;
+		} else {
+			return "";
+		}
+	}
+
+}

+ 118 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/core/MessageOut.java

@@ -0,0 +1,118 @@
+package cn.com.qmth.examcloud.ws.starter.core;
+
+import java.util.Date;
+
+import cn.com.qmth.examcloud.api.commons.exchange.JsonSerializable;
+
+/**
+ * 服务端消息
+ *
+ * @author WANGWEI
+ * @date 2019年11月22日
+ * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+ */
+public class MessageOut implements JsonSerializable {
+
+	private static final long serialVersionUID = -2255327250550304236L;
+
+	private Date date;
+
+	private String path;
+
+	private String sessionId;
+
+	private Status status;
+
+	private String eventId;
+
+	private Object content;
+
+	/**
+	 * 构造函数
+	 *
+	 * @param path
+	 */
+	public MessageOut(String path) {
+		super();
+		this.path = path;
+		this.status = new Status();
+	}
+
+	/**
+	 * 构造函数
+	 *
+	 * @param path
+	 * @param sessionId
+	 */
+	public MessageOut(String path, String sessionId) {
+		super();
+		this.path = path;
+		this.sessionId = sessionId;
+		this.status = new Status();
+	}
+
+	/**
+	 * 设置状态
+	 *
+	 * @author WANGWEI
+	 * @param code
+	 * @param desc
+	 */
+	public void setStatus(String code, String desc) {
+		if (null == this.status) {
+			this.status = new Status();
+		}
+
+		this.status.setCode(code);
+		this.status.setDesc(desc);
+	}
+
+	public Date getDate() {
+		return date;
+	}
+
+	public void setDate(Date date) {
+		this.date = date;
+	}
+
+	public String getPath() {
+		return path;
+	}
+
+	public void setPath(String path) {
+		this.path = path;
+	}
+
+	public String getSessionId() {
+		return sessionId;
+	}
+
+	public void setSessionId(String sessionId) {
+		this.sessionId = sessionId;
+	}
+
+	public Status getStatus() {
+		return status;
+	}
+
+	public void setStatus(Status status) {
+		this.status = status;
+	}
+
+	public String getEventId() {
+		return eventId;
+	}
+
+	public void setEventId(String eventId) {
+		this.eventId = eventId;
+	}
+
+	public Object getContent() {
+		return content;
+	}
+
+	public void setContent(Object content) {
+		this.content = content;
+	}
+
+}

+ 44 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/core/SessionInfo.java

@@ -0,0 +1,44 @@
+package cn.com.qmth.examcloud.ws.starter.core;
+
+import cn.com.qmth.examcloud.api.commons.security.bean.User;
+
+/**
+ * WebSocket session
+ *
+ * @author WANGWEI
+ * @date 2019年11月28日
+ * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+ */
+public class SessionInfo {
+
+	private String index;
+
+	private String sessionId;
+
+	private User user;
+
+	public String getIndex() {
+		return index;
+	}
+
+	public void setIndex(String index) {
+		this.index = index;
+	}
+
+	public String getSessionId() {
+		return sessionId;
+	}
+
+	public void setSessionId(String sessionId) {
+		this.sessionId = sessionId;
+	}
+
+	public User getUser() {
+		return user;
+	}
+
+	public void setUser(User user) {
+		this.user = user;
+	}
+
+}

+ 56 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/core/Status.java

@@ -0,0 +1,56 @@
+package cn.com.qmth.examcloud.ws.starter.core;
+
+import cn.com.qmth.examcloud.api.commons.exchange.JsonSerializable;
+
+/**
+ * 消息状态
+ *
+ * @author WANGWEI
+ * @date 2019年11月28日
+ * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+ */
+public class Status implements JsonSerializable {
+
+	private static final long serialVersionUID = 6990729545055549495L;
+
+	private String code = "200";
+
+	private String desc = "OK";
+
+	/**
+	 * 构造函数
+	 *
+	 */
+	public Status() {
+		super();
+	}
+
+	/**
+	 * 构造函数
+	 *
+	 * @param code
+	 * @param desc
+	 */
+	public Status(String code, String desc) {
+		super();
+		this.code = code;
+		this.desc = desc;
+	}
+
+	public String getCode() {
+		return code;
+	}
+
+	public void setCode(String code) {
+		this.code = code;
+	}
+
+	public String getDesc() {
+		return desc;
+	}
+
+	public void setDesc(String desc) {
+		this.desc = desc;
+	}
+
+}

+ 17 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/core/WebSocketConfig.java

@@ -0,0 +1,17 @@
+package cn.com.qmth.examcloud.ws.starter.core;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig {
+
+	@Bean
+	public ServerEndpointExporter serverEndpointExporter() {
+		return new ServerEndpointExporter();
+	}
+
+}

+ 167 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/core/WebSocketHelper.java

@@ -0,0 +1,167 @@
+package cn.com.qmth.examcloud.ws.starter.core;
+
+import java.util.Date;
+import java.util.Map;
+
+import javax.websocket.Session;
+
+import org.apache.commons.io.IOUtils;
+
+import com.google.common.collect.Maps;
+
+import cn.com.qmth.examcloud.api.commons.security.bean.User;
+import cn.com.qmth.examcloud.api.commons.security.bean.UserType;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+import cn.com.qmth.examcloud.commons.util.JsonUtil;
+import cn.com.qmth.examcloud.commons.util.StringUtil;
+
+/**
+ * WebSocket helper
+ *
+ * @author WANGWEI
+ * @date 2019年11月22日
+ * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+ */
+public class WebSocketHelper {
+
+	private static final ExamCloudLog WS_LOG = ExamCloudLogFactory.getLog("WS_LOGGER");
+
+	private static final Map<String, Session> INDEX_MAP = Maps.newConcurrentMap();
+
+	private static final Map<Session, SessionInfo> SESESION_INFO_MAP = Maps.newConcurrentMap();
+
+	/**
+	 * 重新设置session
+	 *
+	 * @author WANGWEI
+	 * @param path
+	 * @param user
+	 * @param session
+	 * @param sessionId
+	 */
+	public static void setSession(String path, User user, Session session, String sessionId) {
+
+		String index = user.getKey() + ":" + path;
+		Session oldSession = INDEX_MAP.get(index);
+		if (null != oldSession && !oldSession.equals(session)) {
+			SESESION_INFO_MAP.remove(oldSession);
+			IOUtils.closeQuietly(oldSession);
+		}
+
+		INDEX_MAP.put(index, session);
+
+		SessionInfo si = new SessionInfo();
+		si.setIndex(index);
+		si.setUser(user);
+		si.setSessionId(sessionId);
+
+		SESESION_INFO_MAP.put(session, si);
+
+	}
+
+	/**
+	 * 关闭 session
+	 *
+	 * @author WANGWEI
+	 * @param session
+	 */
+	public static void closeSession(Session session) {
+		SessionInfo si = SESESION_INFO_MAP.get(session);
+
+		if (null != si) {
+			INDEX_MAP.remove(si.getIndex());
+		}
+
+		SESESION_INFO_MAP.remove(session);
+
+		IOUtils.closeQuietly(session);
+	}
+
+	/**
+	 * 获取 session
+	 *
+	 * @author WANGWEI
+	 * @param path
+	 * @param key
+	 * @return
+	 */
+	public static Session getSession(String path, String key) {
+		String index = key + ":" + path;
+		Session session = INDEX_MAP.get(index);
+		return session;
+	}
+
+	/**
+	 * 获取 session
+	 *
+	 * @author WANGWEI
+	 * @param path
+	 * @param rootOrgId
+	 * @param userType
+	 * @param userId
+	 * @return
+	 */
+	public static Session getSession(String path, Long rootOrgId, UserType userType, Long userId) {
+
+		String key = StringUtil.join("U_", userType.getCode(), "_", rootOrgId, "_", userId);
+
+		return getSession(path, key);
+	}
+
+	/**
+	 * 获取 SessionInfo
+	 *
+	 * @author WANGWEI
+	 * @param session
+	 * @return
+	 */
+	public static SessionInfo getSessionInfo(Session session) {
+		return SESESION_INFO_MAP.get(session);
+	}
+
+	/**
+	 * 发送消息
+	 *
+	 * @author WANGWEI
+	 * @param session
+	 * @param path
+	 * @param out
+	 */
+	public static void sendText(Session session, String path, MessageOut out) {
+		try {
+			out.setDate(new Date());
+			String message = JsonUtil.toJson(out);
+			if (WS_LOG.isDebugEnabled()) {
+				WS_LOG.debug("[sendText]. path=" + path + "; message=" + message);
+			}
+			session.getBasicRemote().sendText(message);
+		} catch (Exception e) {
+			WS_LOG.error("[sendText-FAIL]. path=" + path, e);
+			IOUtils.closeQuietly(session);
+			return;
+		}
+	}
+
+	/**
+	 * 发送消息
+	 *
+	 * @author WANGWEI
+	 * @param session
+	 * @param path
+	 * @param out
+	 */
+	public static void sendText(Session session, String path, String out) {
+		try {
+			if (WS_LOG.isDebugEnabled()) {
+				WS_LOG.error("[sendText]. path=" + path + "; message=" + out);
+			}
+			session.getBasicRemote().sendText(out);
+		} catch (Exception e) {
+			WS_LOG.error("[sendText-FAIL]. path=" + path, e);
+			IOUtils.closeQuietly(session);
+			return;
+		}
+	}
+
+}

+ 264 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/core/WebSocketServerEndpoint.java

@@ -0,0 +1,264 @@
+package cn.com.qmth.examcloud.ws.starter.core;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.ThreadContext;
+import org.springframework.stereotype.Component;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
+import cn.com.qmth.examcloud.api.commons.security.bean.User;
+import cn.com.qmth.examcloud.commons.exception.StatusException;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+import cn.com.qmth.examcloud.commons.util.JsonUtil;
+import cn.com.qmth.examcloud.web.redis.RedisClient;
+import cn.com.qmth.examcloud.web.support.SpringContextHolder;
+
+/**
+ * websocket ServerEndpoint
+ *
+ * @author WANGWEI
+ * @date 2019年11月27日
+ * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+ */
+@ServerEndpoint("/api/ws/{path}")
+@Component
+public class WebSocketServerEndpoint {
+
+	private static final ExamCloudLog WS_LOG = ExamCloudLogFactory.getLog("WS_LOGGER");
+
+	private static RedisClient redisClient;
+
+	private static AtomicLong counter = new AtomicLong(0L);
+
+	private String sessionId;
+
+	private static RedisClient getRedisClient() {
+		if (null != redisClient) {
+			return redisClient;
+		}
+		redisClient = SpringContextHolder.getBean(RedisClient.class);
+		return redisClient;
+	}
+
+	@OnOpen
+	public void onOpen(Session session, @PathParam("path") String path) {
+		long amount = counter.incrementAndGet();
+
+		this.sessionId = UUID.randomUUID().toString().replace("-", "");
+
+		ThreadContext.put("TRACE_ID", sessionId);
+
+		if (WS_LOG.isDebugEnabled()) {
+			WS_LOG.debug("[onOpen]. path=" + path + "; sessionsAmount=" + amount);
+		}
+
+		MessageOut out = new MessageOut(path, sessionId);
+		out.setEventId("open");
+
+		Method method = MessageHandlerHolder.getMethod(path);
+		if (null == method) {
+			out.setStatus("404", "NOT FOUND");
+			WebSocketHelper.sendText(session, path, out);
+			IOUtils.closeQuietly(session);
+			return;
+		}
+
+		String key = getRequestParameter(session, "key");
+		String token = getRequestParameter(session, "token");
+
+		if (StringUtils.isBlank(key)) {
+			WS_LOG.error("[onOpen-FAIL]. path=" + path + ". key is blank");
+			IOUtils.closeQuietly(session);
+			return;
+		}
+		if (StringUtils.isBlank(token)) {
+			WS_LOG.error("[onOpen-FAIL]. path=" + path + ". token is blank");
+			IOUtils.closeQuietly(session);
+			return;
+		}
+
+		User user = getRedisClient().get(key, User.class);
+
+		if (null == user) {
+			out.setStatus("403", "no login");
+			WebSocketHelper.sendText(session, path, out);
+			IOUtils.closeQuietly(session);
+			return;
+		} else if (!token.equals(user.getToken())) {
+			out.setStatus("403", "token is wrong");
+			WebSocketHelper.sendText(session, path, out);
+			IOUtils.closeQuietly(session);
+			return;
+		}
+
+		ThreadContext.put("CALLER", user.getKey());
+
+		WebSocketHelper.sendText(session, path, out);
+
+		WebSocketHelper.setSession(path, user, session, sessionId);
+	}
+
+	/**
+	 * 获取请求参数
+	 *
+	 * @author WANGWEI
+	 * @param session
+	 * @param key
+	 * @return
+	 */
+	private String getRequestParameter(Session session, String key) {
+		Map<String, List<String>> params = session.getRequestParameterMap();
+		List<String> list = params.get(key);
+
+		if (CollectionUtils.isEmpty(list)) {
+			return null;
+		}
+		String value = StringUtils.join(list, ",");
+		return value;
+	}
+
+	@OnClose
+	public void onClose(Session session, @PathParam("path") String path) {
+
+		long amount = counter.decrementAndGet();
+
+		SessionInfo sessionInfo = WebSocketHelper.getSessionInfo(session);
+		if (null != sessionInfo) {
+			ThreadContext.put("TRACE_ID", sessionInfo.getSessionId());
+			ThreadContext.put("CALLER", sessionInfo.getUser().getKey());
+		} else {
+			ThreadContext.put("TRACE_ID", this.sessionId);
+		}
+
+		WS_LOG.debug("[onClose]. path=" + path + "; sessionsAmount=" + amount);
+		WebSocketHelper.closeSession(session);
+	}
+
+	@OnMessage
+	public void onMessage(Session session, @PathParam("path") String path, String message) {
+
+		SessionInfo sessionInfo = WebSocketHelper.getSessionInfo(session);
+		ThreadContext.put("TRACE_ID", sessionInfo.getSessionId());
+		ThreadContext.put("CALLER", sessionInfo.getUser().getKey());
+
+		MessageOut out = new MessageOut(path, sessionInfo.getSessionId());
+		try {
+			JsonElement jsonEl = JsonParser.parseString(message);
+			message = JsonUtil.toJson(jsonEl);
+		} catch (JsonSyntaxException e) {
+			out.setStatus("500", "message is not a json string");
+			WebSocketHelper.sendText(session, path, out);
+			IOUtils.closeQuietly(session);
+			return;
+		}
+
+		if (WS_LOG.isDebugEnabled()) {
+			WS_LOG.debug("[onMessage]. path=" + path + ". message=" + message);
+		}
+
+		SessionInfo si = WebSocketHelper.getSessionInfo(session);
+
+		User user = getRedisClient().get(si.getUser().getKey(), User.class);
+
+		if (null == user) {
+			out.setStatus("403", "no login");
+			WebSocketHelper.sendText(session, path, out);
+			IOUtils.closeQuietly(session);
+			return;
+		} else if (!si.getUser().getToken().equals(user.getToken())) {
+			out.setStatus("403", "token is wrong");
+			WebSocketHelper.sendText(session, path, out);
+			IOUtils.closeQuietly(session);
+			return;
+		}
+
+		Object result = null;
+		try {
+
+			Method method = MessageHandlerHolder.getMethod(path);
+			Object bean = MessageHandlerHolder.getBean(path);
+
+			Annotation[][] an2 = method.getParameterAnnotations();
+
+			boolean hasMessageParam = false;
+			int messageParamIndex = 0;
+			OUTER : for (int i = 0; i < an2.length; i++) {
+				Annotation[] an1 = an2[i];
+				for (Annotation an : an1) {
+					if (an.annotationType().equals(Message.class)) {
+						hasMessageParam = true;
+						messageParamIndex = i;
+						break OUTER;
+					}
+				}
+			}
+
+			Class<?>[] parameterTypes = method.getParameterTypes();
+			Object[] args = new Object[parameterTypes.length];
+			for (int i = 0; i < parameterTypes.length; i++) {
+				Class<?> curType = parameterTypes[i];
+				if (hasMessageParam && i == messageParamIndex) {
+					try {
+						args[i] = JsonUtil.fromJson(message, curType);
+					} catch (Exception e) {
+						throw new StatusException("500", "fail to parse message to object");
+					}
+					continue;
+				}
+				if (curType.equals(User.class)) {
+					args[i] = user;
+					continue;
+				}
+			}
+
+			result = method.invoke(bean, args);
+
+			out.setContent(result);
+
+		} catch (StatusException e) {
+			WS_LOG.error("[onMessage-FAIL]. path=" + path + "", e);
+			out.setStatus(e.getCode(), e.getDesc());
+		} catch (Exception e) {
+			WS_LOG.error("[onMessage-FAIL]. path=" + path + "", e);
+			out.setStatus("500", "系统异常");
+		}
+
+		WebSocketHelper.sendText(session, path, out);
+	}
+
+	@OnError
+	public void onError(Session session, @PathParam("path") String path, Throwable t) {
+
+		SessionInfo sessionInfo = WebSocketHelper.getSessionInfo(session);
+		if (null != sessionInfo) {
+			ThreadContext.put("TRACE_ID", sessionInfo.getSessionId());
+			ThreadContext.put("CALLER", sessionInfo.getUser().getKey());
+		} else {
+			ThreadContext.put("TRACE_ID", this.sessionId);
+		}
+
+		WS_LOG.error("[onError]. path=" + path, t);
+		WebSocketHelper.closeSession(session);
+	}
+
+}

+ 18 - 0
src/main/java/cn/com/qmth/examcloud/ws/starter/handler/TestHandler.java

@@ -0,0 +1,18 @@
+package cn.com.qmth.examcloud.ws.starter.handler;
+
+import org.springframework.stereotype.Component;
+
+import cn.com.qmth.examcloud.api.commons.security.bean.User;
+import cn.com.qmth.examcloud.ws.starter.core.Message;
+import cn.com.qmth.examcloud.ws.starter.core.MessageHandler;
+
+@Component
+@MessageHandler("test")
+public class TestHandler {
+
+	@MessageHandler
+	public String test(User user, @Message String message) {
+		return "Hello";
+	}
+
+}