ExchangeSever.java 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package com.hmsoft.common.socket;
  2. import java.util.ArrayList;
  3. import java.util.Collections;
  4. import java.util.HashMap;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.Random;
  8. import java.util.Set;
  9. import org.springframework.retry.RecoveryCallback;
  10. import org.springframework.retry.RetryCallback;
  11. import org.springframework.retry.RetryContext;
  12. import org.springframework.retry.backoff.FixedBackOffPolicy;
  13. import org.springframework.retry.policy.SimpleRetryPolicy;
  14. import org.springframework.retry.support.RetryTemplate;
  15. import com.hmsoft.remote.rmi.common.utils.NetUtils;
  16. import com.hmsoft.remote.rmi.exception.RemotingException;
  17. import com.hmsoft.remote.rmi.future.IFuture;
  18. import com.hmsoft.remote.rmi.future.TransFuture;
  19. import com.hmsoft.remote.rmi.handler.ExchangeHandler;
  20. import com.hmsoft.remote.rmi.model.Message;
  21. import com.hmsoft.remote.rmi.model.Request;
  22. import com.hmsoft.remote.rmi.rpc.NettyServer;
  23. import com.hmsoft.remote.rmi.transport.NettyChannel;
  24. import com.hmsoft.remote.rmi.transport.NettyTransporter;
  25. /**
  26. *
  27. * ExchangeClient
  28. *
  29. */
  30. public class ExchangeSever {
  31. private NettyServer server;
  32. //TODO...
  33. private static final int TIME_OUT = 5000;
  34. public static String MGR_REMOTE_ADDR = null;
  35. public ExchangeSever(String url, ExchangeHandler handler) {
  36. try {
  37. this.server = NettyTransporter.getInstance().bind(url, handler);
  38. } catch (RemotingException e) {
  39. throw new IllegalArgumentException("handler == null");
  40. }
  41. }
  42. /**
  43. * 发送到指定客户端
  44. * @param message
  45. * @param sent
  46. * @throws RemotingException
  47. */
  48. /*public Object send(Message message) throws RemotingException {
  49. NettyChannel channel = getSpecifyChannel(message.getShoolRemoteAddr());
  50. final Request request = new Request();
  51. request.setVersion("1.0");
  52. request.setTwoWay(true);
  53. request.setData(message);
  54. //TransFuture future = new TransFuture(channel, request, TIME_OUT);
  55. Set<String> excuteSet = new HashSet<String>();
  56. final IExecuteLogic logic = new RequestLogic(request, channel);
  57. Object obj = ExecuteRetryUtil.execute(4, 5000, logic, excuteSet);
  58. return obj;
  59. }*/
  60. /**
  61. * 发送到指定客户端
  62. * @param message
  63. * @param sent
  64. * @throws RemotingException
  65. */
  66. public TransFuture send(Message message) throws RemotingException {
  67. NettyChannel channel = this.getSpecifyChannel(message.getShoolRemoteAddr());
  68. final Request request = new Request();
  69. request.setVersion("1.0");
  70. request.setTwoWay(true);
  71. request.setData(message);
  72. TransFuture future = new TransFuture(channel, request, TIME_OUT);
  73. try {
  74. channel.send(request, false);
  75. } catch (RemotingException e) {
  76. future.cancel();
  77. throw e;
  78. }
  79. return future;
  80. }
  81. /**
  82. * 发送到管理端
  83. * @param message
  84. * @param sent
  85. * @throws RemotingException
  86. */
  87. /*public Object sendToMgr(Message message) throws RemotingException {
  88. NettyChannel channel = getSpecifyChannel(MGR_REMOTE_ADDR);
  89. final Request request = new Request();
  90. request.setVersion("1.0");
  91. request.setTwoWay(true);
  92. request.setData(message);
  93. Set<String> excuteSet = new HashSet<String>();
  94. final IExecuteLogic logic = new RequestLogic(request, channel);
  95. Object obj = ExecuteRetryUtil.execute(4, 5000, logic, excuteSet);
  96. return obj;
  97. }*/
  98. public Object sendToMgr(Message message) throws RemotingException {
  99. NettyChannel channel = this.getSpecifyChannel(MGR_REMOTE_ADDR);
  100. final Request request = new Request();
  101. request.setVersion("1.0");
  102. request.setTwoWay(true);
  103. request.setData(message);
  104. TransFuture future = new TransFuture(channel, request, TIME_OUT);
  105. try {
  106. channel.send(request, false);
  107. } catch (RemotingException e) {
  108. future.cancel();
  109. throw e;
  110. }
  111. return future.get();
  112. }
  113. /**
  114. * 发送到所有学校
  115. * @param request
  116. * @param timeout
  117. * @return
  118. * @throws RemotingException
  119. */
  120. /*public int sendToAllSch(Message message) throws RemotingException {
  121. // create request.
  122. int errCode = 0;
  123. Set<String> set = this.server.getChannelsMap().keySet();
  124. Set<String> excuteSet = new HashSet<String>();
  125. Set<String> failSet = new HashSet<String>();
  126. for (String schAddr : set) {
  127. if (!schAddr.equals(MGR_REMOTE_ADDR)) {
  128. final Request req = new Request();
  129. req.setVersion("1.0");
  130. req.setTwoWay(true);
  131. req.setData(message);
  132. final NettyChannel chnnl = this.getSpecifyChannel(schAddr);
  133. excuteSet.add(schAddr);
  134. final IExecuteLogic logic = new RequestLogic(req, chnnl);
  135. excuteSet.add(schAddr);
  136. Object obj = ExecuteRetryUtil.execute(4, 5000, logic, excuteSet);
  137. failSet.add(excuteSet.iterator().next());
  138. }
  139. }
  140. if (failSet.size() > 0) {
  141. errCode = -1;
  142. }
  143. return errCode;
  144. }*/
  145. /**
  146. * 发送到所有学校
  147. * @param request
  148. * @param timeout
  149. * @return
  150. * @throws RemotingException
  151. */
  152. public int sendToAllSch(Message message) throws RemotingException {
  153. // create request.
  154. Map<Long, NettyChannel> failMap = new HashMap<Long, NettyChannel>();
  155. List<IFuture> futureList = new ArrayList<IFuture>();
  156. Set<String> set = this.server.getChannelsMap().keySet();
  157. for (String schAddr : set) {
  158. if (!schAddr.equals(MGR_REMOTE_ADDR)) {
  159. final Request req = new Request();
  160. req.setVersion("1.0");
  161. req.setTwoWay(true);
  162. req.setData(message);
  163. final NettyChannel chnnl = this.server.getChannelsMap().get(schAddr);
  164. IFuture future = new TransFuture(chnnl, req, TIME_OUT);
  165. if (chnnl.isConnected()) {
  166. chnnl.send(req, false);
  167. futureList.add(future);
  168. failMap.put(req.getId(), chnnl);
  169. } else {
  170. this.server.getChannelsMap().remove(NetUtils.toAddressString(chnnl.getRemoteAddress()));
  171. failMap.remove(NetUtils.toAddressString(chnnl.getRemoteAddress()));
  172. }
  173. }
  174. }
  175. int errCode = 0;
  176. for (IFuture fut : futureList) {
  177. try {
  178. fut.get();
  179. } catch(RemotingException e) {
  180. //失败重发
  181. errCode = -1;
  182. NettyChannel ch = failMap.get(fut.getId());
  183. if (ch.isConnected()) {
  184. ch.send(fut.getRequest(), false);
  185. }
  186. }
  187. }
  188. return errCode;
  189. }
  190. private NettyChannel getSpecifyChannel(String specifyAddr) {
  191. NettyChannel specifyChannel = this.server.getChannelsMap().get(specifyAddr);
  192. if (specifyChannel.isConnected()) {
  193. return specifyChannel;
  194. } else {
  195. this.server.getChannelsMap().remove(NetUtils.toAddressString(specifyChannel.getRemoteAddress()));
  196. }
  197. return null;
  198. }
  199. public static void main(String[] args) {
  200. final RetryTemplate retryTemplate = new RetryTemplate();
  201. SimpleRetryPolicy policy = new SimpleRetryPolicy(5, Collections.<Class<? extends Throwable>, Boolean> singletonMap(Exception.class, true));
  202. FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
  203. fixedBackOffPolicy.setBackOffPeriod(1000);
  204. retryTemplate.setRetryPolicy(policy);
  205. retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
  206. //重试业务操作
  207. final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
  208. @Override
  209. public Object doWithRetry(RetryContext context) throws Exception {
  210. //业务逻辑
  211. boolean result = pushCouponByVpmsaa();
  212. if (!result) {
  213. throw new RuntimeException();
  214. }
  215. System.err.println("重试后执行成功." + result);
  216. return true;
  217. }
  218. };
  219. //执行指定次数后任然失败的回调
  220. final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
  221. @Override
  222. public Object recover(RetryContext context) throws Exception {
  223. //System.err.println("执行3次重试后还是报错,回调.");
  224. return Integer.MAX_VALUE;
  225. }
  226. };
  227. try {
  228. retryTemplate.execute(retryCallback, recoveryCallback);
  229. } catch(Exception e) {
  230. e.printStackTrace();
  231. }
  232. }
  233. public static Boolean pushCouponByVpmsaa() {
  234. Random random = new Random();
  235. int a = random.nextInt(11);
  236. System.out.println("a是--->" + a);
  237. if (a == 8) {
  238. return true;
  239. } else {
  240. return false;
  241. }
  242. }
  243. }