123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- package com.hmsoft.common.socket;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Random;
- import java.util.Set;
- import org.springframework.retry.RecoveryCallback;
- import org.springframework.retry.RetryCallback;
- import org.springframework.retry.RetryContext;
- import org.springframework.retry.backoff.FixedBackOffPolicy;
- import org.springframework.retry.policy.SimpleRetryPolicy;
- import org.springframework.retry.support.RetryTemplate;
- import com.hmsoft.remote.rmi.common.utils.NetUtils;
- import com.hmsoft.remote.rmi.exception.RemotingException;
- import com.hmsoft.remote.rmi.future.IFuture;
- import com.hmsoft.remote.rmi.future.TransFuture;
- import com.hmsoft.remote.rmi.handler.ExchangeHandler;
- import com.hmsoft.remote.rmi.model.Message;
- import com.hmsoft.remote.rmi.model.Request;
- import com.hmsoft.remote.rmi.rpc.NettyServer;
- import com.hmsoft.remote.rmi.transport.NettyChannel;
- import com.hmsoft.remote.rmi.transport.NettyTransporter;
- /**
- *
- * ExchangeClient
- *
- */
- public class ExchangeSever {
- private NettyServer server;
- //TODO...
- private static final int TIME_OUT = 5000;
- public static String MGR_REMOTE_ADDR = null;
-
- public ExchangeSever(String url, ExchangeHandler handler) {
- try {
- this.server = NettyTransporter.getInstance().bind(url, handler);
- } catch (RemotingException e) {
- throw new IllegalArgumentException("handler == null");
- }
- }
-
- /**
- * 发送到指定客户端
- * @param message
- * @param sent
- * @throws RemotingException
- */
- /*public Object send(Message message) throws RemotingException {
- NettyChannel channel = getSpecifyChannel(message.getShoolRemoteAddr());
- final Request request = new Request();
- request.setVersion("1.0");
- request.setTwoWay(true);
- request.setData(message);
- //TransFuture future = new TransFuture(channel, request, TIME_OUT);
- Set<String> excuteSet = new HashSet<String>();
- final IExecuteLogic logic = new RequestLogic(request, channel);
- Object obj = ExecuteRetryUtil.execute(4, 5000, logic, excuteSet);
- return obj;
- }*/
-
- /**
- * 发送到指定客户端
- * @param message
- * @param sent
- * @throws RemotingException
- */
- public TransFuture send(Message message) throws RemotingException {
- NettyChannel channel = this.getSpecifyChannel(message.getShoolRemoteAddr());
- final Request request = new Request();
- request.setVersion("1.0");
- request.setTwoWay(true);
- request.setData(message);
- TransFuture future = new TransFuture(channel, request, TIME_OUT);
- try {
- channel.send(request, false);
- } catch (RemotingException e) {
- future.cancel();
- throw e;
- }
- return future;
- }
-
- /**
- * 发送到管理端
- * @param message
- * @param sent
- * @throws RemotingException
- */
- /*public Object sendToMgr(Message message) throws RemotingException {
- NettyChannel channel = getSpecifyChannel(MGR_REMOTE_ADDR);
- final Request request = new Request();
- request.setVersion("1.0");
- request.setTwoWay(true);
- request.setData(message);
- Set<String> excuteSet = new HashSet<String>();
- final IExecuteLogic logic = new RequestLogic(request, channel);
- Object obj = ExecuteRetryUtil.execute(4, 5000, logic, excuteSet);
- return obj;
- }*/
-
- public Object sendToMgr(Message message) throws RemotingException {
- NettyChannel channel = this.getSpecifyChannel(MGR_REMOTE_ADDR);
- final Request request = new Request();
- request.setVersion("1.0");
- request.setTwoWay(true);
- request.setData(message);
- TransFuture future = new TransFuture(channel, request, TIME_OUT);
- try {
- channel.send(request, false);
- } catch (RemotingException e) {
- future.cancel();
- throw e;
- }
- return future.get();
- }
-
- /**
- * 发送到所有学校
- * @param request
- * @param timeout
- * @return
- * @throws RemotingException
- */
- /*public int sendToAllSch(Message message) throws RemotingException {
- // create request.
- int errCode = 0;
- Set<String> set = this.server.getChannelsMap().keySet();
- Set<String> excuteSet = new HashSet<String>();
- Set<String> failSet = new HashSet<String>();
- for (String schAddr : set) {
- if (!schAddr.equals(MGR_REMOTE_ADDR)) {
- final Request req = new Request();
- req.setVersion("1.0");
- req.setTwoWay(true);
- req.setData(message);
- final NettyChannel chnnl = this.getSpecifyChannel(schAddr);
- excuteSet.add(schAddr);
- final IExecuteLogic logic = new RequestLogic(req, chnnl);
- excuteSet.add(schAddr);
- Object obj = ExecuteRetryUtil.execute(4, 5000, logic, excuteSet);
- failSet.add(excuteSet.iterator().next());
- }
- }
- if (failSet.size() > 0) {
- errCode = -1;
- }
- return errCode;
- }*/
-
- /**
- * 发送到所有学校
- * @param request
- * @param timeout
- * @return
- * @throws RemotingException
- */
- public int sendToAllSch(Message message) throws RemotingException {
- // create request.
- Map<Long, NettyChannel> failMap = new HashMap<Long, NettyChannel>();
- List<IFuture> futureList = new ArrayList<IFuture>();
- Set<String> set = this.server.getChannelsMap().keySet();
- for (String schAddr : set) {
- if (!schAddr.equals(MGR_REMOTE_ADDR)) {
- final Request req = new Request();
- req.setVersion("1.0");
- req.setTwoWay(true);
- req.setData(message);
- final NettyChannel chnnl = this.server.getChannelsMap().get(schAddr);
- IFuture future = new TransFuture(chnnl, req, TIME_OUT);
- if (chnnl.isConnected()) {
- chnnl.send(req, false);
- futureList.add(future);
- failMap.put(req.getId(), chnnl);
- } else {
- this.server.getChannelsMap().remove(NetUtils.toAddressString(chnnl.getRemoteAddress()));
- failMap.remove(NetUtils.toAddressString(chnnl.getRemoteAddress()));
- }
- }
- }
- int errCode = 0;
- for (IFuture fut : futureList) {
- try {
- fut.get();
- } catch(RemotingException e) {
- //失败重发
- errCode = -1;
- NettyChannel ch = failMap.get(fut.getId());
- if (ch.isConnected()) {
- ch.send(fut.getRequest(), false);
- }
- }
- }
- return errCode;
- }
-
- private NettyChannel getSpecifyChannel(String specifyAddr) {
- NettyChannel specifyChannel = this.server.getChannelsMap().get(specifyAddr);
- if (specifyChannel.isConnected()) {
- return specifyChannel;
- } else {
- this.server.getChannelsMap().remove(NetUtils.toAddressString(specifyChannel.getRemoteAddress()));
- }
- return null;
- }
-
- public static void main(String[] args) {
- final RetryTemplate retryTemplate = new RetryTemplate();
- SimpleRetryPolicy policy = new SimpleRetryPolicy(5, Collections.<Class<? extends Throwable>, Boolean> singletonMap(Exception.class, true));
- FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
- fixedBackOffPolicy.setBackOffPeriod(1000);
- retryTemplate.setRetryPolicy(policy);
- retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
- //重试业务操作
- final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
- @Override
- public Object doWithRetry(RetryContext context) throws Exception {
- //业务逻辑
- boolean result = pushCouponByVpmsaa();
- if (!result) {
- throw new RuntimeException();
- }
- System.err.println("重试后执行成功." + result);
- return true;
- }
-
- };
- //执行指定次数后任然失败的回调
- final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
- @Override
- public Object recover(RetryContext context) throws Exception {
- //System.err.println("执行3次重试后还是报错,回调.");
- return Integer.MAX_VALUE;
- }
-
- };
- try {
- retryTemplate.execute(retryCallback, recoveryCallback);
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
-
- public static Boolean pushCouponByVpmsaa() {
- Random random = new Random();
- int a = random.nextInt(11);
- System.out.println("a是--->" + a);
- if (a == 8) {
- return true;
- } else {
- return false;
- }
- }
- }
|