package com.hmsoft.common.socket;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.hmsoft.remote.rmi.common.utils.NetUtils;
import com.hmsoft.remote.rmi.exception.RemotingException;
import com.hmsoft.remote.rmi.future.IFuture;
import com.hmsoft.remote.rmi.future.TransFuture;
import com.hmsoft.remote.rmi.handler.ExchangeHandler;
import com.hmsoft.remote.rmi.model.Message;
import com.hmsoft.remote.rmi.model.Request;
import com.hmsoft.remote.rmi.rpc.NettyServer;
import com.hmsoft.remote.rmi.transport.NettyChannel;
import com.hmsoft.remote.rmi.transport.NettyTransporter;

/**
 * Server-side exchange endpoint.
 *
 * <p>Binds a {@link NettyServer} to a URL and offers helpers that wrap a
 * {@link Message} in a two-way {@link Request} and send it to one client, to
 * the management client ({@link #MGR_REMOTE_ADDR}), or to every other
 * registered client.
 *
 * <p>NOTE(review): thread-safety depends on the map returned by
 * {@code NettyServer.getChannelsMap()}, which is not visible here.
 */
public class ExchangeSever {

    /** Timeout handed to every {@link TransFuture}; presumably milliseconds - TODO confirm. */
    private static final int TIME_OUT = 5000;

    /**
     * Channel-map key of the management client. Must be assigned before
     * {@link #sendToMgr(Message)} is called; {@link #sendToAllSch(Message)}
     * skips this address.
     */
    public static String MGR_REMOTE_ADDR = null;

    private final NettyServer server;

    /**
     * Binds a server to {@code url}.
     *
     * @param url     address to bind, passed straight to {@code NettyTransporter.bind}
     * @param handler handler passed straight to {@code NettyTransporter.bind}
     * @throws IllegalArgumentException if binding fails; the underlying
     *         {@link RemotingException} is preserved as the cause
     */
    public ExchangeSever(String url, ExchangeHandler handler) {
        try {
            this.server = NettyTransporter.getInstance().bind(url, handler);
        } catch (RemotingException e) {
            // Keep the exception type callers already see, but report the real failure and its cause.
            throw new IllegalArgumentException("Failed to bind exchange server to " + url, e);
        }
    }

    /**
     * Sends {@code message} to the client whose address is
     * {@code message.getShoolRemoteAddr()}.
     *
     * @param message payload; its remote address selects the target channel
     * @return the future tracking the response
     * @throws RemotingException     if the send fails (the future is cancelled first)
     * @throws IllegalStateException if no connected channel exists for the address
     */
    public TransFuture send(Message message) throws RemotingException {
        final NettyChannel channel = this.getSpecifyChannel(message.getShoolRemoteAddr());
        return dispatch(channel, message);
    }

    /**
     * Sends {@code message} to the management client and waits for the reply.
     *
     * @param message payload to send
     * @return whatever the response future yields from {@code get()}
     * @throws RemotingException     if the send fails or {@code get()} fails
     * @throws IllegalStateException if {@link #MGR_REMOTE_ADDR} is unset or has no
     *         connected channel
     */
    public Object sendToMgr(Message message) throws RemotingException {
        final NettyChannel channel = this.getSpecifyChannel(MGR_REMOTE_ADDR);
        return dispatch(channel, message).get();
    }

    /**
     * Sends {@code message} to every registered client except the management
     * client, then waits for each response.
     *
     * <p>Disconnected channels are removed from the server's channel map and
     * skipped. When waiting on a response fails, the original request is re-sent
     * once (if the channel is still connected) without waiting for a second reply;
     * the result code stays {@code -1} in that case.
     *
     * @param message payload to broadcast
     * @return {@code 0} if every response arrived, {@code -1} if at least one
     *         wait failed
     * @throws RemotingException if an initial send or a re-send fails; remaining
     *         clients are not processed in that case
     */
    public int sendToAllSch(Message message) throws RemotingException {
        // Iterate over a snapshot: the loop removes dead channels from the live map,
        // which would otherwise risk a ConcurrentModificationException.
        final List<String> addresses = new ArrayList<String>(this.server.getChannelsMap().keySet());
        final List<PendingSend> pending = new ArrayList<PendingSend>();

        for (String schAddr : addresses) {
            if (schAddr.equals(MGR_REMOTE_ADDR)) {
                continue;
            }
            final NettyChannel chnnl = this.server.getChannelsMap().get(schAddr);
            if (chnnl == null) {
                // Entry disappeared after the snapshot was taken.
                continue;
            }
            if (!chnnl.isConnected()) {
                this.server.getChannelsMap().remove(NetUtils.toAddressString(chnnl.getRemoteAddress()));
                continue;
            }
            // The future is created only for channels we actually send on.
            pending.add(new PendingSend(dispatch(chnnl, message), chnnl));
        }

        int errCode = 0;
        for (PendingSend sent : pending) {
            try {
                sent.future.get();
            } catch (RemotingException e) {
                // Waiting failed: record the failure and re-send once.
                errCode = -1;
                if (sent.channel.isConnected()) {
                    sent.channel.send(sent.future.getRequest(), false);
                }
            }
        }
        return errCode;
    }

    /** Wraps {@code message} in a version "1.0" two-way request. */
    private static Request newTwoWayRequest(Message message) {
        final Request request = new Request();
        request.setVersion("1.0");
        request.setTwoWay(true);
        request.setData(message);
        return request;
    }

    /** Sends a fresh two-way request on {@code channel}; cancels the future if the send throws. */
    private static TransFuture dispatch(NettyChannel channel, Message message) throws RemotingException {
        final Request request = newTwoWayRequest(message);
        final TransFuture future = new TransFuture(channel, request, TIME_OUT);
        try {
            channel.send(request, false);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

    /**
     * Looks up the connected channel registered under {@code specifyAddr}.
     * A channel found disconnected is removed from the server's channel map.
     *
     * @throws IllegalStateException if the address is {@code null}, unknown, or
     *         its channel is no longer connected
     */
    private NettyChannel getSpecifyChannel(String specifyAddr) {
        if (specifyAddr == null) {
            throw new IllegalStateException("No remote address specified");
        }
        final NettyChannel specifyChannel = this.server.getChannelsMap().get(specifyAddr);
        if (specifyChannel == null) {
            throw new IllegalStateException("No channel registered for " + specifyAddr);
        }
        if (!specifyChannel.isConnected()) {
            this.server.getChannelsMap().remove(NetUtils.toAddressString(specifyChannel.getRemoteAddress()));
            throw new IllegalStateException("Channel to " + specifyAddr + " is not connected");
        }
        return specifyChannel;
    }

    /** Pairs a response future with the channel its request was sent on. */
    private static final class PendingSend {
        private final IFuture future;
        private final NettyChannel channel;

        PendingSend(IFuture future, NettyChannel channel) {
            this.future = future;
            this.channel = channel;
        }
    }

    /**
     * Stand-alone Spring Retry demo: retries {@link #pushCouponByVpmsaa()} up to
     * 5 attempts with a fixed 1000 ms back-off, then falls back to a recovery
     * callback. Unrelated to the server logic above.
     */
    public static void main(String[] args) {
        final RetryTemplate retryTemplate = new RetryTemplate();
        final SimpleRetryPolicy policy = new SimpleRetryPolicy(5,
                Collections.<Class<? extends Throwable>, Boolean> singletonMap(Exception.class, true));
        final FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setRetryPolicy(policy);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        // The business operation to retry.
        final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
            @Override
            public Object doWithRetry(RetryContext context) throws Exception {
                final boolean result = pushCouponByVpmsaa();
                if (!result) {
                    // Any exception triggers another attempt under the policy above.
                    throw new RuntimeException();
                }
                System.err.println("重试后执行成功." + result);
                return true;
            }
        };

        // Invoked when every attempt has failed.
        final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
            @Override
            public Object recover(RetryContext context) throws Exception {
                return Integer.MAX_VALUE;
            }
        };

        try {
            retryTemplate.execute(retryCallback, recoveryCallback);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Demo operation that succeeds at random: draws an integer in [0, 10],
     * prints it, and reports success only when it equals 8.
     *
     * @return {@code true} if the drawn value is 8, otherwise {@code false}
     */
    public static Boolean pushCouponByVpmsaa() {
        final Random random = new Random();
        final int a = random.nextInt(11);
        System.out.println("a是--->" + a);
        return a == 8;
    }
}