Commit 77c66d1e89dca741ee2c67b13dbf033a2470dc26

Authored by Gonzalo Aguilar Delgado
1 parent 42488076

Fix AnnotatedRPC and add performance test

level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/DirectResponseProcessorImpl.java 0 → 100644
... ... @@ -0,0 +1,109 @@
  1 +package com.level2.enterprise.nebula.mq.processor;
  2 +
  3 +import java.util.concurrent.CompletionService;
  4 +import java.util.concurrent.ExecutionException;
  5 +import java.util.concurrent.Future;
  6 +
  7 +import org.slf4j.Logger;
  8 +import org.slf4j.LoggerFactory;
  9 +
  10 +import com.level2.enterprise.nebula.mq.api.ProcessingNode;
  11 +import com.level2.enterprise.nebula.mq.message.MQMessage;
  12 +import com.level2.enterprise.nebula.mq.message.MQMessageImpl;
  13 +import com.level2.enterprise.nebula.mq.processor.ResponseProcessor;
  14 +
  15 +public class DirectResponseProcessorImpl implements ResponseProcessor {
  16 + private Logger log = LoggerFactory.getLogger(DirectResponseProcessorImpl.class);
  17 +
  18 + protected boolean exit;
  19 + protected boolean processing;
  20 + protected CompletionService<MQMessage> completionService;
  21 + protected ProcessingNode processingNode;
  22 +
  23 + public DirectResponseProcessorImpl(CompletionService<MQMessage> completionService, ProcessingNode processingNode) {
  24 + this.completionService = completionService;
  25 + exit = false;
  26 + processing=false;
  27 + this.processingNode=processingNode;
  28 + }
  29 +
  30 + public void handleResponse(MQMessage message)
  31 + {
  32 + try {
  33 + if(message.isHandled()){
  34 + processingNode.ack(message.getDeliveryTag());
  35 + }
  36 + if(message.getResponse()!=null && message.getResponse().getPayload()!=null){
  37 + if(message.getReplyTo()!=null){
  38 + if(!processingNode.respond(message.getReplyTo(),
  39 + message.getCorrelationId(), message.getResponse().getContentType(),
  40 + message.getResponse().getPayload())){
  41 + log.warn("The response was not sent!");
  42 + }
  43 +
  44 + }else{
  45 + log.debug("There's nobody to respond. ReplyTo is empty");
  46 + }
  47 + }else{
  48 + log.debug("There's no response to be returned to caller");
  49 + }
  50 +
  51 + }catch(Exception ex){
  52 + log.warn("Message result processing gave an error: {}", ex.getMessage(), ex);
  53 + }
  54 + message.stopChrono();
  55 + }
  56 +
  57 + @Override
  58 + public void run() {
  59 + processing=true;
  60 + Future<MQMessage> future=null;
  61 + MQMessage result=null;
  62 + while(!exit){
  63 + try {
  64 + future = completionService.take(); // This should be locked until something is completed
  65 + } catch (InterruptedException e1) {
  66 + log.warn("The service has been interrupted while processing responses");
  67 + }
  68 + if(future!=null){
  69 + try {
  70 + result = future.get();
  71 + future = null;
  72 + } catch (InterruptedException e) {
  73 + log.warn("The future was interruped while getting result: {}", e.getMessage());
  74 + } catch (ExecutionException e) {
  75 + log.warn("The process was broken while getting result: {}", e.getMessage());
  76 + }
  77 + if(result!=null){
  78 + log.trace("Got result handled {}", result.isHandled());
  79 + handleResponse(result);
  80 + result = null;
  81 + }else{
  82 + log.trace("Work completed without result");
  83 + }
  84 + }else{
  85 + log.warn("Finishing processing since we were interrupted!");
  86 + exit=true;
  87 + }
  88 + }
  89 + log.debug("Response processor finished");
  90 + processing=false;
  91 + }
  92 +
  93 + @Override
  94 + public void shutdown() {
  95 + exit=true;
  96 +// log.info("Shutting down {}...", this.getClass().getSimpleName());
  97 + log.info("{} down", this.getClass().getSimpleName());
  98 + }
  99 +
  100 + public boolean isProcessing() {
  101 + return processing;
  102 + }
  103 +
  104 + public void setProcessing(boolean processing) {
  105 + this.processing = processing;
  106 + }
  107 +
  108 +
  109 +}
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/QueuedProcessorNodeImpl.java
... ... @@ -43,7 +43,7 @@ public class QueuedProcessorNodeImpl extends DefaultConsumer implements QueuedPr
43 43 completionService = new ExecutorCompletionService<MQMessage>(executor);
44 44 // this.consumer=consumer;
45 45 requestProcessor = new QueuedRequestProcessorImpl(completionService, target);
46   - responseProcessor = new QueuedResponseProcessorImpl(completionService, this);
  46 + responseProcessor = new DirectResponseProcessorImpl(completionService, this);
47 47  
48 48 // Start two threads 1 process each to handle request and responses
49 49 // FIXME This can be also a pool of threads
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/QueuedResponseProcessorImpl.java
... ... @@ -1,107 +0,0 @@
1   -package com.level2.enterprise.nebula.mq.processor;
2   -
3   -import java.util.concurrent.CompletionService;
4   -import java.util.concurrent.ExecutionException;
5   -import java.util.concurrent.Future;
6   -
7   -import org.slf4j.Logger;
8   -import org.slf4j.LoggerFactory;
9   -
10   -import com.level2.enterprise.nebula.mq.api.ProcessingNode;
11   -import com.level2.enterprise.nebula.mq.message.MQMessage;
12   -import com.level2.enterprise.nebula.mq.message.MQMessageImpl;
13   -import com.level2.enterprise.nebula.mq.processor.ResponseProcessor;
14   -
15   -public class QueuedResponseProcessorImpl implements ResponseProcessor {
16   - private Logger log = LoggerFactory.getLogger(QueuedResponseProcessorImpl.class);
17   -
18   - protected boolean exit;
19   - protected boolean processing;
20   - protected CompletionService<MQMessage> completionService;
21   - protected ProcessingNode processingNode;
22   -
23   - public QueuedResponseProcessorImpl(CompletionService<MQMessage> completionService, ProcessingNode processingNode) {
24   - this.completionService = completionService;
25   - exit = false;
26   - processing=false;
27   - this.processingNode=processingNode;
28   - }
29   -
30   - public void handleResponse(MQMessage message)
31   - {
32   - try {
33   - if(message.isHandled()){
34   - processingNode.ack(message.getDeliveryTag());
35   - }
36   - if(message.getResponse()!=null && message.getResponse().getPayload()!=null){
37   - if(message.getReplyTo()!=null){
38   - if(!processingNode.respond(message.getReplyTo(),
39   - message.getCorrelationId(), message.getResponse().getContentType(),
40   - message.getResponse().getPayload())){
41   - log.warn("The response was not sent!");
42   - }
43   -
44   - }else{
45   - log.debug("There's nobody to respond. ReplyTo is empty");
46   - }
47   - }else{
48   - log.debug("There's no response to be returned to caller");
49   - }
50   -
51   - }catch(Exception ex){
52   - log.warn("Message result processing gave an error: {}", ex.getMessage(), ex);
53   - }
54   - message.stopChrono();
55   - }
56   -
57   - @Override
58   - public void run() {
59   - processing=true;
60   - Future<MQMessage> future=null;
61   - MQMessage result=null;
62   - while(!exit){
63   - try {
64   - future = completionService.take(); // This should be locked until something is completed
65   - } catch (InterruptedException e1) {
66   - log.warn("The service has been interrupted while processing responses");
67   - }
68   - if(future!=null){
69   - try {
70   - result = future.get();
71   - future = null;
72   - } catch (InterruptedException e) {
73   - log.warn("The future was interruped while getting result: {}", e.getMessage());
74   - } catch (ExecutionException e) {
75   - log.warn("The process was broken while getting result: {}", e.getMessage());
76   - }
77   - if(result!=null){
78   - log.trace("Got result handled {}", result.isHandled());
79   - handleResponse(result);
80   - result = null;
81   - }
82   - }else{
83   - log.warn("Finishing processing since we were interrupted!");
84   - exit=true;
85   - }
86   - }
87   - log.debug("Response processor finished");
88   - processing=false;
89   - }
90   -
91   - @Override
92   - public void shutdown() {
93   - exit=true;
94   -// log.info("Shutting down {}...", this.getClass().getSimpleName());
95   - log.info("{} down", this.getClass().getSimpleName());
96   - }
97   -
98   - public boolean isProcessing() {
99   - return processing;
100   - }
101   -
102   - public void setProcessing(boolean processing) {
103   - this.processing = processing;
104   - }
105   -
106   -
107   -}
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/rpc/AnnotatedMQTargetRPCImpl.java 0 → 100644
... ... @@ -0,0 +1,278 @@
  1 +package com.level2.enterprise.nebula.mq.rpc;
  2 +
  3 +import java.io.Serializable;
  4 +import java.util.ArrayList;
  5 +import java.util.List;
  6 +
  7 +import org.apache.commons.lang.StringUtils;
  8 +import org.slf4j.Logger;
  9 +import org.slf4j.LoggerFactory;
  10 +
  11 +import com.level2.enterprise.nebula.mq.api.ExchangeDescriptor;
  12 +import com.level2.enterprise.nebula.mq.api.InvalidMQMessageException;
  13 +import com.level2.enterprise.nebula.mq.api.MQConnectionManager;
  14 +import com.level2.enterprise.nebula.mq.api.QueueDescriptor;
  15 +import com.level2.enterprise.nebula.mq.connection.RabbitMQConsumer;
  16 +import com.level2.enterprise.nebula.mq.mapping.MethodDescription;
  17 +import com.level2.enterprise.nebula.mq.mapping.ProcessorClass;
  18 +import com.level2.enterprise.nebula.mq.processor.RabbitMQQueueDescriptorImpl;
  19 +import com.level2.enterprise.nebula.mq.reflection.AnnotationClassMapper;
  20 +import com.level2.enterprise.nebula.mq.reflection.ClassMapper;
  21 +
  22 +public class AnnotatedMQTargetRPCImpl extends RabbitRPCImpl implements AnnotatedRPC {
  23 + private Logger log = LoggerFactory.getLogger(AnnotatedMQTargetRPCImpl.class);
  24 +
  25 + ExchangeDescriptor exchange;
  26 + List<RPCMessageType> methodsAvailable;
  27 + ClassMapper mapper;
  28 + String replyQueueName;
  29 +
  30 +// public AnnotatedMQTargetRPCImpl(Channel channel, ExchangeDescriptor exchange, Class<?> target) {
  31 +// super(channel);
  32 +// this.exchange = exchange;
  33 +// if(this.connect()){
  34 +// log.info("We are connected to MQ");
  35 +// addTarget(target);
  36 +// }
  37 +// }
  38 +
  39 + public AnnotatedMQTargetRPCImpl(MQConnectionManager connectionManager, ExchangeDescriptor exchange) {
  40 + super(connectionManager);
  41 + this.exchange = exchange;
  42 + mapper = new AnnotationClassMapper();
  43 + // We are the consumer
  44 + consumer = (RabbitMQConsumer) connectionManager.createConsumer();
  45 + consumer.setProcessingNode(this);
  46 +
  47 + methodsAvailable = new ArrayList<RPCMessageType>();
  48 + }
  49 +
  50 + @Override
  51 + public void registerTarget(Class<?> target)
  52 + {
  53 +// log.debug("Finding methods available");
  54 +// methodsAvailable=findMethods(target);
  55 + ProcessorClass processorClass = mapper.mapObject(target);
  56 + if(processorClass!=null){
  57 + QueueDescriptor queueHandle = new RabbitMQQueueDescriptorImpl(processorClass.getQueueName(), this.exchange);
  58 + if(declareQueues(processorClass, queueHandle)){
  59 + // Assign a worker to this method
  60 +// if(!createTarget(processorClass, queueHandle.getQueueHandle(), target)){
  61 +// log.warn("There was no possible to assign a worker to the method");
  62 +// }
  63 + // Bind ourselves
  64 + if(StringUtils.isBlank(replyQueueName))
  65 + replyQueueName = bindResponseQueue(exchange);
  66 + }
  67 + }
  68 + }
  69 +
  70 +
  71 + protected boolean declareQueues(ProcessorClass processorClass, QueueDescriptor queueHandle)
  72 + {
  73 + boolean retval = true;
  74 + // We will create a thread with one channel that will handle all methods in the object
  75 + // The queue name will be the class name.
  76 + // The method will be the one determined by the class name + method if it has annotation
  77 + if(connectionManager.registerQueue(queueHandle, consumer)){
  78 + log.debug("Queue {} registered", queueHandle.getQueueHandle());
  79 +// if(!createTarget(processorClass, processorClass.getQueueName())){
  80 +// log.warn("Cannot create a new target");
  81 +// }
  82 + }
  83 +
  84 + for(MethodDescription method : processorClass.getListMethods()){
  85 + log.debug("Creating processor MQ for method {}", method.getRoutingKey());
  86 + log.debug("Binding the queue {} to exchange {} with routing key {}",queueHandle.getQueueHandle(), queueHandle.getExchange().toString(), method.getRoutingKey());
  87 +
  88 + if(!connectionManager.bindQueue(queueHandle, consumer, method.getRoutingKey())){
  89 + log.warn("Cannot bind the queue {} to exchange {} with routing key {}",queueHandle.getQueueHandle(), queueHandle.getExchange().toString(), method.getRoutingKey());
  90 + }
  91 +
  92 + RPCMessageType rpcMessage = new RPCMessageType(queueHandle.getQueueHandle(), exchange);
  93 + rpcMessage.setMethodName(method.getRoutingKey());
  94 + try {
  95 + rpcMessage.setRequestClass((Class<? extends Serializable>) Class.forName(method.getRequestClassName()));
  96 + } catch (ClassNotFoundException e) {
  97 + log.warn("Cannot find the class for the method request it can go unespecified: {}", e.getMessage());
  98 + }
  99 +
  100 + try {
  101 + rpcMessage.setResponseClass((Class<? extends Serializable>) Class.forName(method.getResponeClassName()));
  102 + } catch (ClassNotFoundException e) {
  103 + log.warn("Cannot find the class for the method response it can go unespecified: {}", e.getMessage());
  104 + }
  105 +
  106 +
  107 + methodsAvailable.add(rpcMessage);
  108 + retval = true;
  109 + }
  110 +// if(retval){
  111 +// QueueDescriptor[] queueList = {queueHandle};
  112 +// if(!connectionManager.subscribe(queueList, consumer)){
  113 +// log.warn("We were unable to subscribe to the queues");
  114 +// }
  115 +// }
  116 +
  117 +// if(retval) processorNode.start(queueHandle.getQueueHandle());
  118 +
  119 + return retval;
  120 + }
  121 +
  122 + @Override
  123 + public boolean bindMethod(RPCMessageType messageType)
  124 + {
  125 + boolean matched = false;
  126 + for(RPCMessageType rpcMessage: methodsAvailable){
  127 + if(messageType.equals(rpcMessage.getMethodName())){
  128 + matched=true;
  129 + break;
  130 + }
  131 + }
  132 + return matched;
  133 + }
  134 +// protected List<RPCMessageType> findMethods(Class<?> target)
  135 +// {
  136 +// List<RPCMessageType> result = new ArrayList<RPCMessageType>();
  137 +// List<Method> listMethods = getMethodsOfInterfacesAnnotatedWith(target, QueuedMethod.class);
  138 +// if(listMethods==null || listMethods.size()==0){
  139 +// log.debug("Fallback to method annotated in class");
  140 +// listMethods = getMethodsAnnotatedWith(target, QueuedMethod.class);
  141 +// }
  142 +//
  143 +// String queueName = getQueueName(target);
  144 +// RPCMessageType queueHolder = new RPCMessageType(queueName, exchange);
  145 +// if(registerQueues(new QueueDescriptor[]{queueHolder})){
  146 +// log.debug("Queue {} registered", queueHolder.getQueueHandle());
  147 +// }
  148 +//
  149 +// for(Method method : listMethods){
  150 +// queueHolder = new RPCMessageType(queueName, exchange); // Create new instance
  151 +// String methodName = String.format("%s.%s", method.getDeclaringClass().getName(), method.getName());
  152 +// log.info("Creating target MQ method for {}", methodName);
  153 +// if(isMQValid(method)){
  154 +// QueuedMethod annotation = method.getAnnotation(QueuedMethod.class);
  155 +// log.info("We are connecting method {} to queue {}", methodName, queueHolder.getQueueHandle());
  156 +// queueHolder.setMethodName(methodName);
  157 +// queueHolder.setRequestClass(getFirstParameter(method));
  158 +// queueHolder.setResponseClass((Class<? extends Serializable>) method.getReturnType());
  159 +// if(!bindMethod(queueHolder)){
  160 +// log.warn("We were unable to bind to {} so it surely will fail to get response", queueHolder.getMethodName());
  161 +// }else{
  162 +// result.add(queueHolder);
  163 +//
  164 +// }
  165 +// }
  166 +// }
  167 +// return result;
  168 +// }
  169 +//
  170 +// /**
  171 +// * It will return the class name or the name indicated by queueName annotation
  172 +// * @param target class request will be directed to
  173 +// * @return a String with the queue name of the target
  174 +// */
  175 +// public String getQueueName(Class<?> target)
  176 +// {
  177 +// String queueName = target.getName();
  178 +// if(target.isAnnotationPresent(Queued.class)){
  179 +// Queued annotation = target.getAnnotation(Queued.class);
  180 +// if(!StringUtils.isBlank(annotation.queueName())){
  181 +// queueName = annotation.queueName();
  182 +// }
  183 +// }
  184 +// return queueName;
  185 +// }
  186 +//
  187 +// @SuppressWarnings("unchecked")
  188 +// protected Class<? extends Serializable> getFirstParameter(Method method)
  189 +// {
  190 +// Class<? extends Serializable> retval=null;
  191 +// Class<?>parameters[] = method.getParameterTypes();
  192 +// if(Serializable.class.isAssignableFrom(parameters[0])){
  193 +// retval = (Class<? extends Serializable>) parameters[0];
  194 +// }
  195 +// return retval;
  196 +// }
  197 +//
  198 +// protected boolean isMQValid(Method method)
  199 +// {
  200 +// boolean isValid = true;
  201 +// Class<?>parameters[] = method.getParameterTypes();
  202 +// if(parameters.length>1){
  203 +// log.warn("We only support methods of 1 parameter");
  204 +// isValid = false;
  205 +// }else{
  206 +// for(Class<?>param : parameters){
  207 +// if(!Serializable.class.isAssignableFrom(param)){
  208 +// log.warn("All parameters should be Serializable. Parameter {} is not", param.getName());
  209 +// isValid=false;
  210 +// break;
  211 +// }
  212 +// }
  213 +// }
  214 +// return isValid;
  215 +// }
  216 +
  217 +//
  218 +// public static List<Method> getMethodsOfInterfacesAnnotatedWith(final Class<?> type, final Class<? extends Annotation> annotation) {
  219 +// final List<Method> methods = new ArrayList<Method>();
  220 +// Class<?> klass = type;
  221 +// while (klass != null && klass != Object.class) {
  222 +// for(Class<?>iklass: klass.getInterfaces()){
  223 +// // need to iterated thought hierarchy in order to retrieve methods from above the current instance
  224 +// // iterate though the list of methods declared in the class represented by klass variable, and add those annotated with the specified annotation
  225 +// final List<Method> allMethods = new ArrayList<Method>(Arrays.asList(iklass.getDeclaredMethods()));
  226 +// for (final Method method : allMethods) {
  227 +// if (method.isAnnotationPresent(annotation)) {
  228 +//// Annotation annotInstance = method.getAnnotation(annotation);
  229 +// // TODO process annotInstance
  230 +// methods.add(method);
  231 +// }
  232 +// }
  233 +// }
  234 +// // move to the upper class in the hierarchy in search for more methods
  235 +// klass = klass.getSuperclass();
  236 +// }
  237 +// return methods;
  238 +// }
  239 +//
  240 +// public static List<Method> getMethodsAnnotatedWith(final Class<?> type, final Class<? extends Annotation> annotation) {
  241 +// final List<Method> methods = new ArrayList<Method>();
  242 +// Class<?> klass = type;
  243 +// while (klass != Object.class) {
  244 +// // need to iterated thought hierarchy in order to retrieve methods from above the current instance
  245 +// // iterate though the list of methods declared in the class represented by klass variable, and add those annotated with the specified annotation
  246 +// final List<Method> allMethods = new ArrayList<Method>(Arrays.asList(klass.getDeclaredMethods()));
  247 +// for (final Method method : allMethods) {
  248 +// if (method.isAnnotationPresent(annotation)) {
  249 +//// Annotation annotInstance = method.getAnnotation(annotation);
  250 +// // TODO process annotInstance
  251 +// methods.add(method);
  252 +// }
  253 +// }
  254 +// // move to the upper class in the hierarchy in search for more methods
  255 +// klass = klass.getSuperclass();
  256 +// }
  257 +// return methods;
  258 +// }
  259 +//
  260 + @Override
  261 + public Serializable call(String name, Serializable object, long timeout)
  262 + {
  263 + Serializable retval = null;
  264 + for(RPCMessageType holder : methodsAvailable){
  265 + if(holder.getMethodName().endsWith(name)){
  266 + holder.setReplyQueue(this.replyQueueName);
  267 +// holder.setRequestClass(object.getClass());
  268 + try {
  269 + retval = (Serializable) this.call(holder, object, timeout);
  270 + } catch (InvalidMQMessageException e) {
  271 + log.info("Invalid message: {}", e.getMessage(), e);
  272 + }
  273 + }
  274 + }
  275 + return retval;
  276 + }
  277 +
  278 +}
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/rpc/AnnotatedRPC.java
... ... @@ -6,6 +6,6 @@ import com.level2.enterprise.nebula.mq.api.RabbitClientNode;
6 6  
7 7 public interface AnnotatedRPC extends RabbitClientNode {
8 8  
9   - void addTarget(Class<?> target);
10   - Serializable call(String name, Serializable object);
  9 + void registerTarget(Class<?> target);
  10 + Serializable call(String name, Serializable object, long timeout);
11 11 }
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/rpc/AnnotatedRabbitRPCImpl.java
... ... @@ -1,193 +0,0 @@
1   -package com.level2.enterprise.nebula.mq.rpc;
2   -
3   -import java.io.Serializable;
4   -import java.lang.annotation.Annotation;
5   -import java.lang.reflect.Method;
6   -import java.util.ArrayList;
7   -import java.util.Arrays;
8   -import java.util.List;
9   -
10   -import org.apache.commons.lang.StringUtils;
11   -import org.slf4j.Logger;
12   -import org.slf4j.LoggerFactory;
13   -
14   -import com.level2.enterprise.nebula.mq.api.ExchangeDescriptor;
15   -import com.level2.enterprise.nebula.mq.api.InvalidMQMessageException;
16   -import com.level2.enterprise.nebula.mq.api.QueueDescriptor;
17   -import com.level2.enterprise.nebula.mq.processor.api.Queued;
18   -import com.level2.enterprise.nebula.mq.processor.api.QueuedMethod;
19   -import com.rabbitmq.client.Channel;
20   -
21   -public class AnnotatedRabbitRPCImpl extends RabbitRPCImpl implements AnnotatedRPC {
22   - private Logger log = LoggerFactory.getLogger(AnnotatedRabbitRPCImpl.class);
23   -
24   - ExchangeDescriptor exchange;
25   - List<RPCMessageType> methodsAvailable;
26   -
27   - public AnnotatedRabbitRPCImpl(Channel channel, ExchangeDescriptor exchange, Class<?> target) {
28   - super(channel);
29   - this.exchange = exchange;
30   - if(this.connect()){
31   - log.info("We are connected to MQ");
32   - addTarget(target);
33   - }
34   - }
35   -
36   - public AnnotatedRabbitRPCImpl(Channel channel, ExchangeDescriptor exchange) {
37   - super(channel);
38   - this.exchange = exchange;
39   - if(this.connect()){
40   - log.info("We are connected to MQ");
41   - }
42   - }
43   -
44   - @Override
45   - public void addTarget(Class<?> target)
46   - {
47   - log.debug("Finding methods available");
48   - methodsAvailable=findMethods(target);
49   - }
50   -
51   - protected List<RPCMessageType> findMethods(Class<?> target)
52   - {
53   - List<RPCMessageType> result = new ArrayList<RPCMessageType>();
54   - List<Method> listMethods = getMethodsOfInterfacesAnnotatedWith(target, QueuedMethod.class);
55   - if(listMethods==null || listMethods.size()==0){
56   - log.debug("Fallback to method annotated in class");
57   - listMethods = getMethodsAnnotatedWith(target, QueuedMethod.class);
58   - }
59   -
60   - String queueName = getQueueName(target);
61   - RPCMessageType queueHolder = new RPCMessageType(queueName, exchange);
62   - if(registerQueues(new QueueDescriptor[]{queueHolder})){
63   - log.debug("Queue {} registered", queueHolder.getQueueHandle());
64   - }
65   -
66   - for(Method method : listMethods){
67   - queueHolder = new RPCMessageType(queueName, exchange); // Create new instance
68   - String methodName = String.format("%s.%s", method.getDeclaringClass().getName(), method.getName());
69   - log.info("Creating target MQ method for {}", methodName);
70   - if(isMQValid(method)){
71   - QueuedMethod annotation = method.getAnnotation(QueuedMethod.class);
72   - log.info("We are connecting method {} to queue {}", methodName, queueHolder.getQueueHandle());
73   - queueHolder.setMethodName(methodName);
74   - queueHolder.setRequestClass(getFirstParameter(method));
75   - queueHolder.setResponseClass((Class<? extends Serializable>) method.getReturnType());
76   - if(!bindMethod(queueHolder)){
77   - log.warn("We were unable to bind to {} so it surely will fail to get response", queueHolder.getMethodName());
78   - }else{
79   - result.add(queueHolder);
80   -
81   - }
82   - }
83   - }
84   - return result;
85   - }
86   -
87   - /**
88   - * It will return the class name or the name indicated by queueName annotation
89   - * @param target class request will be directed to
90   - * @return a String with the queue name of the target
91   - */
92   - public String getQueueName(Class<?> target)
93   - {
94   - String queueName = target.getName();
95   - if(target.isAnnotationPresent(Queued.class)){
96   - Queued annotation = target.getAnnotation(Queued.class);
97   - if(!StringUtils.isBlank(annotation.queueName())){
98   - queueName = annotation.queueName();
99   - }
100   - }
101   - return queueName;
102   - }
103   -
104   - @SuppressWarnings("unchecked")
105   - protected Class<? extends Serializable> getFirstParameter(Method method)
106   - {
107   - Class<? extends Serializable> retval=null;
108   - Class<?>parameters[] = method.getParameterTypes();
109   - if(Serializable.class.isAssignableFrom(parameters[0])){
110   - retval = (Class<? extends Serializable>) parameters[0];
111   - }
112   - return retval;
113   - }
114   -
115   - protected boolean isMQValid(Method method)
116   - {
117   - boolean isValid = true;
118   - Class<?>parameters[] = method.getParameterTypes();
119   - if(parameters.length>1){
120   - log.warn("We only support methods of 1 parameter");
121   - isValid = false;
122   - }else{
123   - for(Class<?>param : parameters){
124   - if(!Serializable.class.isAssignableFrom(param)){
125   - log.warn("All parameters should be Serializable. Parameter {} is not", param.getName());
126   - isValid=false;
127   - break;
128   - }
129   - }
130   - }
131   - return isValid;
132   - }
133   -
134   -
135   - public static List<Method> getMethodsOfInterfacesAnnotatedWith(final Class<?> type, final Class<? extends Annotation> annotation) {
136   - final List<Method> methods = new ArrayList<Method>();
137   - Class<?> klass = type;
138   - while (klass != null && klass != Object.class) {
139   - for(Class<?>iklass: klass.getInterfaces()){
140   - // need to iterated thought hierarchy in order to retrieve methods from above the current instance
141   - // iterate though the list of methods declared in the class represented by klass variable, and add those annotated with the specified annotation
142   - final List<Method> allMethods = new ArrayList<Method>(Arrays.asList(iklass.getDeclaredMethods()));
143   - for (final Method method : allMethods) {
144   - if (method.isAnnotationPresent(annotation)) {
145   -// Annotation annotInstance = method.getAnnotation(annotation);
146   - // TODO process annotInstance
147   - methods.add(method);
148   - }
149   - }
150   - }
151   - // move to the upper class in the hierarchy in search for more methods
152   - klass = klass.getSuperclass();
153   - }
154   - return methods;
155   - }
156   -
157   - public static List<Method> getMethodsAnnotatedWith(final Class<?> type, final Class<? extends Annotation> annotation) {
158   - final List<Method> methods = new ArrayList<Method>();
159   - Class<?> klass = type;
160   - while (klass != Object.class) {
161   - // need to iterated thought hierarchy in order to retrieve methods from above the current instance
162   - // iterate though the list of methods declared in the class represented by klass variable, and add those annotated with the specified annotation
163   - final List<Method> allMethods = new ArrayList<Method>(Arrays.asList(klass.getDeclaredMethods()));
164   - for (final Method method : allMethods) {
165   - if (method.isAnnotationPresent(annotation)) {
166   -// Annotation annotInstance = method.getAnnotation(annotation);
167   - // TODO process annotInstance
168   - methods.add(method);
169   - }
170   - }
171   - // move to the upper class in the hierarchy in search for more methods
172   - klass = klass.getSuperclass();
173   - }
174   - return methods;
175   - }
176   -
177   - @Override
178   - public Serializable call(String name, Serializable object)
179   - {
180   - Serializable retval = null;
181   - for(RPCMessageType holder : methodsAvailable){
182   - if(holder.getMethodName().endsWith(name)){
183   - try {
184   - retval = (Serializable) this.call(holder, object, 10000L);
185   - } catch (InvalidMQMessageException e) {
186   - log.info("Invalid message: {}", e.getMessage(), e);
187   - }
188   - }
189   - }
190   - return retval;
191   - }
192   -
193   -}
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/rpc/RabbitRPCImpl.java
... ... @@ -13,6 +13,7 @@ import org.apache.commons.lang.StringUtils;
13 13 import org.slf4j.Logger;
14 14 import org.slf4j.LoggerFactory;
15 15  
  16 +import com.level2.enterprise.nebula.mq.api.ExchangeDescriptor;
16 17 import com.level2.enterprise.nebula.mq.api.InvalidMQMessageException;
17 18 import com.level2.enterprise.nebula.mq.api.MQConnectionManager;
18 19 import com.level2.enterprise.nebula.mq.api.MediaType;
... ... @@ -49,7 +50,7 @@ public class RabbitRPCImpl extends AbstractMQTargetManagerImpl implements RPC {
49 50 @Override
50 51 public boolean bindMethod(RPCMessageType messageType)
51 52 {
52   -
  53 + boolean retval=false;
53 54 if(consumer!=null){
54 55 consumer.shutdown();
55 56 consumer = null;
... ... @@ -59,23 +60,35 @@ public class RabbitRPCImpl extends AbstractMQTargetManagerImpl implements RPC {
59 60 consumer.setProcessingNode(this);
60 61  
61 62 if(StringUtils.isBlank(messageType.getReplyQueue())){
62   - messageType.setReplyQueue(connectionManager.declareQueue(consumer));
  63 + messageType.setReplyQueue(bindResponseQueue(messageType.getExchange()));
63 64 log.debug("Created reply queue {}",messageType.getReplyQueue());
64   -
65 65 }
66 66 QueueDescriptor replyqueue = new RabbitMQQueueDescriptorImpl(messageType.getReplyQueue(), messageType.getExchange());
67 67 QueueDescriptor[] queueList = {replyqueue};
68 68 QueueDescriptor queue = new RabbitMQQueueDescriptorImpl(messageType.getQueueHandle(), messageType.getExchange());
69 69  
70 70 if(connectionManager.bindQueue(queue, consumer, messageType.getMethodName())){
71   -
  71 + retval=true;
72 72 }else{
73 73 log.warn("Some of the registration failed we are not binded to the queue {}", queue.getQueueHandle());
  74 +
74 75 }
75 76  
76   - return connectionManager.subscribe(queueList, consumer);
  77 + return retval;
77 78 // return bindQueue(queueHandle, messageType.getMethodName());
78 79 }
  80 +
  81 + protected String bindResponseQueue(ExchangeDescriptor exchange)
  82 + {
  83 + String replyQueue = connectionManager.declareQueue(consumer);
  84 + QueueDescriptor replyqueue = new RabbitMQQueueDescriptorImpl(replyQueue, exchange);
  85 + QueueDescriptor[] queueList = {replyqueue};
  86 + if(!connectionManager.subscribe(queueList, consumer)){
  87 + log.warn("Cannot connect to reply queue {}", replyQueue);
  88 + replyQueue=null;
  89 + }
  90 + return replyQueue;
  91 + }
79 92 //
80 93 // public boolean bindQueue(QueueDescriptor queue, String routingKey) {
81 94 // boolean retval = false;
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/target/annotated/AnnotatedMQTargetManagerImpl.java
... ... @@ -80,6 +80,7 @@ public class AnnotatedMQTargetManagerImpl extends AbstractMQTargetManagerImpl im
80 80 boolean retval = true;
81 81 List<MessageTarget> listTargets = registeredTargets.get(routingKey);
82 82 if(listTargets == null){
  83 + // We register the target as it doesn't exists
83 84 listTargets = new ArrayList<MessageTarget>();
84 85 registeredTargets.put(routingKey, listTargets);
85 86 }
... ... @@ -90,7 +91,7 @@ public class AnnotatedMQTargetManagerImpl extends AbstractMQTargetManagerImpl im
90 91 messageTarget.setInstance(instance);
91 92 listTargets.add(messageTarget);
92 93 }else{
93   - log.debug("There's one worker for the target {}", routingKey);
  94 + log.debug("There's already one worker for the target {}", routingKey);
94 95 }
95 96 return retval;
96 97 }
... ...
level2-rabbit-mq-impl/src/test/java/com/level2/enterprise/nebula/mq/AnnotatedRabbitMQRPCTests.java
... ... @@ -17,7 +17,7 @@ import com.level2.enterprise.nebula.mq.processor.RabbitMQExchangeTest;
17 17 import com.level2.enterprise.nebula.mq.processor.annotated.AnnotatedTestClass;
18 18 import com.level2.enterprise.nebula.mq.processor.annotated.AnnotatedTestClassImpl;
19 19 import com.level2.enterprise.nebula.mq.processor.target.AnnotatedTargetManagerImpl;
20   -import com.level2.enterprise.nebula.mq.rpc.AnnotatedRabbitRPCImpl;
  20 +import com.level2.enterprise.nebula.mq.rpc.AnnotatedMQTargetRPCImpl;
21 21 import com.level2.enterprise.nebula.mq.rpc.RPCMessageType;
22 22 import com.level2.enterprise.nebula.mq.rpc.RabbitRPCImpl;
23 23  
... ... @@ -54,7 +54,7 @@ public class AnnotatedRabbitMQRPCTests {
54 54 assertTrue(connectionManager.connect());
55 55 log.info("Connected...");
56 56  
57   - AnnotatedRabbitRPCImpl annotatedRPC = new AnnotatedRabbitRPCImpl(connectionManager.getChannel(), RabbitExchangeDescriptor.RABBIT_DEFAULT_EXCHANGE, AnnotatedTestClassImpl.class);
  57 + AnnotatedMQTargetRPCImpl annotatedRPC = new AnnotatedMQTargetRPCImpl(connectionManager.getChannel(), RabbitExchangeDescriptor.RABBIT_DEFAULT_EXCHANGE, AnnotatedTestClassImpl.class);
58 58  
59 59 AnnotatedMQTargetManagerImpl processor = new AnnotatedMQTargetManagerImpl(connectionManager.getChannel(),RabbitExchangeDescriptor.RABBIT_DEFAULT_EXCHANGE);
60 60 processor.registerTarget(AnnotatedTestClassImpl.class);
... ...
level2-rabbit-mq-impl/src/test/java/com/level2/enterprise/nebula/mq/processor/annotated/AnnotatedRabbitMQProcessorTests.java
... ... @@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory;
13 13  
14 14 import com.level2.enterprise.nebula.mq.connection.RabbitConnectionManagerImpl;
15 15 import com.level2.enterprise.nebula.mq.processor.exchange.TestRabbitMQExchange;
  16 +import com.level2.enterprise.nebula.mq.rpc.AnnotatedMQTargetRPCImpl;
16 17 import com.level2.enterprise.nebula.mq.rpc.RPC;
17 18 import com.level2.enterprise.nebula.mq.rpc.RPCMessageType;
18 19 import com.level2.enterprise.nebula.mq.rpc.RabbitRPCImpl;
... ... @@ -153,4 +154,66 @@ public class AnnotatedRabbitMQProcessorTests {
153 154 log.info("Finished");
154 155 }
155 156  
  157 + @Test
  158 + public void test_TestMessage_Performance_Annotated() throws Exception
  159 + {
  160 + RabbitConnectionManagerImpl connectionManager = new RabbitConnectionManagerImpl();
  161 + assertTrue(connectionManager.connect());
  162 + log.info("Connected");
  163 + AnnotatedTestClass targetClass = new AnnotatedTestClassImpl();
  164 +
  165 + AnnotatedMQTargetManagerImpl processor = new AnnotatedMQTargetManagerImpl(connectionManager,TestRabbitMQExchange.RABBIT_DEFAULT_TEST_EXCHANGE);
  166 + processor.registerTarget(targetClass);
  167 + processor.start();
  168 +
  169 +// RPC rpc = new RabbitRPCImpl(connectionManager);
  170 +
  171 +// RPCMessageType messageType = new RPCMessageType("com.level2.enterprise.nebula.mq.processor.annotated.AnnotatedTestClass", TestRabbitMQExchange.RABBIT_DEFAULT_TEST_EXCHANGE);
  172 +// messageType.setRequestClass(String.class);
  173 +// messageType.setResponseClass(String.class);
  174 +// messageType.setMethodName("com.level2.enterprise.nebula.mq.processor.annotated.AnnotatedTestClass.getMessage");
  175 +// assertTrue(rpc.bindMethod(messageType));
  176 +
  177 + AnnotatedMQTargetRPCImpl rpc = new AnnotatedMQTargetRPCImpl(connectionManager,TestRabbitMQExchange.RABBIT_DEFAULT_TEST_EXCHANGE);
  178 + rpc.registerTarget(AnnotatedTestClass.class);
  179 +
  180 +
  181 +
  182 + log.info("Starting publishing on thread " + Thread.currentThread().getId());
  183 + long count = 0;
  184 + long messageCount = 5000L;
  185 + long timeCount = 0;
  186 + long testStartTime =System.nanoTime();
  187 + do{
  188 + long startMSG = System.nanoTime();
  189 + Object response = rpc.call("getMessage", "Hello RPC", 1000L);
  190 + long stopMSG = (System.nanoTime()-startMSG);
  191 + timeCount += stopMSG;
  192 + if(response!=null){
  193 + log.debug("The response message was [{}]", response);
  194 + Assert.assertEquals("Response to message [Hello RPC]", response);
  195 + }
  196 + count++;
  197 +
  198 + startMSG = System.nanoTime();
  199 + response = rpc.call("getMessage2", "Hello RPC", 1000L);
  200 + stopMSG = (System.nanoTime()-startMSG);
  201 + timeCount += stopMSG;
  202 + if(response!=null){
  203 + log.debug("The response message was [{}]", response);
  204 + Assert.assertEquals("Response to message [Hello RPC] on 2", response);
  205 + }
  206 + count++;
  207 +
  208 + }while(count<messageCount);
  209 +
  210 + long timming = (System.nanoTime()-testStartTime) /1000000;
  211 + log.info("The overall time took {} ms seconds, average time {} ms", timming, timeCount /1000000.0/messageCount);
  212 + Assert.assertTrue("Performance is not really ok", timming<messageCount*1.5);
  213 +
  214 + processor.shutdown();
  215 + connectionManager.shutdown();
  216 + log.info("Finished");
  217 + }
  218 +
156 219 }
... ...
level2-rabbit-mq-impl/src/test/resources/log4j.properties
... ... @@ -13,7 +13,7 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c - %m%n
13 13  
14 14 ### set log levels - for more verbose logging change 'info' to 'debug' ###
15 15  
16   -log4j.rootLogger=INFO, stdout
  16 +log4j.rootLogger=TRACE, stdout
17 17  
18 18 log4j.logger.org.hibernate=info
19 19 log4j.logger.org.springframework=info
... ...