Commit 2736df929d5e1434687e9043194150b560e0180a

Authored by Gonzalo Aguilar Delgado
1 parent 99c193ad

Fix function resolution over implementation.

Version Bump
Showing 18 changed files with 101 additions and 72 deletions   Show diff stats
level2-mq-api/pom.xml
... ... @@ -4,7 +4,7 @@
4 4 <parent>
5 5 <groupId>com.level2.server.mq</groupId>
6 6 <artifactId>level2-mq</artifactId>
7   - <version>1.4.2-SNAPSHOT</version>
  7 + <version>1.4.3-SNAPSHOT</version>
8 8 </parent>
9 9 <name>Level2 :: MQ :: API</name>
10 10 <url>http://www.level2crm.com</url>
... ...
level2-mq-examples/pom.xml
... ... @@ -3,7 +3,7 @@
3 3 <parent>
4 4 <groupId>com.level2.server.mq</groupId>
5 5 <artifactId>level2-mq</artifactId>
6   - <version>1.4.2-SNAPSHOT</version>
  6 + <version>1.4.3-SNAPSHOT</version>
7 7 </parent>
8 8 <artifactId>level2-mq-examples</artifactId>
9 9 <name>Level2 :: MQ :: Examples</name>
... ... @@ -37,7 +37,7 @@
37 37 <dependency>
38 38 <groupId>com.level2.server.mq</groupId>
39 39 <artifactId>level2-rabbit-mq-impl</artifactId>
40   - <version>1.4.2-SNAPSHOT</version>
  40 + <version>1.4.3-SNAPSHOT</version>
41 41 </dependency>
42 42 <dependency>
43 43 <groupId>org.slf4j</groupId>
... ...
level2-rabbit-mq-impl/pom.xml
... ... @@ -4,7 +4,7 @@
4 4 <parent>
5 5 <groupId>com.level2.server.mq</groupId>
6 6 <artifactId>level2-mq</artifactId>
7   - <version>1.4.2-SNAPSHOT</version>
  7 + <version>1.4.3-SNAPSHOT</version>
8 8 </parent>
9 9 <name>Level2 :: MQ :: RABBIT</name>
10 10 <url>http://www.level2crm.com</url>
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/mapping/ProcessorClass.java
... ... @@ -9,6 +9,7 @@ public class ProcessorClass {
9 9 protected String name;
10 10 protected String className;
11 11 protected String targetClassName;
  12 + protected String queueName;
12 13  
13 14 public ProcessorClass() {
14 15 listMethods = new ArrayList<MethodDescription>();
... ... @@ -46,4 +47,12 @@ public class ProcessorClass {
46 47 this.targetClassName = targetClassName;
47 48 }
48 49  
  50 + public String getQueueName() {
  51 + return queueName;
  52 + }
  53 +
  54 + public void setQueueName(String queueName) {
  55 + this.queueName = queueName;
  56 + }
  57 +
49 58 }
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/QueuedProcessingNode.java
1 1 package com.level2.enterprise.nebula.mq.processor;
2 2  
  3 +import java.util.List;
  4 +
  5 +import com.level2.enterprise.nebula.mq.api.QueueDescriptor;
3 6 import com.level2.enterprise.nebula.mq.processor.api.ProcessingNode;
  7 +import com.level2.enterprise.nebula.mq.processor.impl.QueuedProcessor;
  8 +import com.level2.enterprise.nebula.mq.processor.target.MessageTarget;
4 9  
5 10 public interface QueuedProcessingNode extends ProcessingNode {
  11 + boolean registerQueues(QueueDescriptor[] queueHandlers);
  12 + boolean bindQueue(QueueDescriptor queue, String routingKey);
  13 + void start(String queueHandle);
  14 + QueuedProcessor getProcessor();
6 15 }
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/QueuedRabbitProcessingNodeImpl.java
... ... @@ -59,6 +59,7 @@ public class QueuedRabbitProcessingNodeImpl extends DefaultConsumer implements Q
59 59  
60 60 }
61 61  
  62 + @Override
62 63 public boolean bindQueue(QueueDescriptor queue, String routingKey) {
63 64 boolean retval = false;
64 65 try {
... ... @@ -71,6 +72,7 @@ public class QueuedRabbitProcessingNodeImpl extends DefaultConsumer implements Q
71 72 return retval;
72 73 }
73 74  
  75 + @Override
74 76 public void start(String queueHandle)
75 77 {
76 78 try {
... ... @@ -87,6 +89,7 @@ public class QueuedRabbitProcessingNodeImpl extends DefaultConsumer implements Q
87 89 }
88 90 }
89 91  
  92 + @Override
90 93 public boolean registerQueues(QueueDescriptor[] queueHandlers) {
91 94 boolean retval = true;
92 95  
... ... @@ -167,6 +170,7 @@ public class QueuedRabbitProcessingNodeImpl extends DefaultConsumer implements Q
167 170 super.handleShutdownSignal(consumerTag, sig);
168 171 }
169 172  
  173 + @Override
170 174 public QueuedProcessor getProcessor() {
171 175 return processor;
172 176 }
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/annotated/AnnotatedRabbitMQProcessor.java
... ... @@ -3,5 +3,5 @@ package com.level2.enterprise.nebula.mq.processor.annotated;
3 3 import com.level2.enterprise.nebula.mq.processor.api.ProcessingNode;
4 4  
5 5 public interface AnnotatedRabbitMQProcessor extends ProcessingNode {
6   - void registerTarget(Object object);
  6 + void registerTarget(Class<?> target);
7 7 }
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/annotated/AnnotatedRabbitMQProcessorImpl.java
... ... @@ -3,7 +3,6 @@ package com.level2.enterprise.nebula.mq.processor.annotated;
3 3 import java.util.HashMap;
4 4 import java.util.List;
5 5  
6   -import org.apache.commons.lang.StringUtils;
7 6 import org.slf4j.Logger;
8 7 import org.slf4j.LoggerFactory;
9 8  
... ... @@ -12,98 +11,82 @@ import com.level2.enterprise.nebula.mq.api.QueueDescriptor;
12 11 import com.level2.enterprise.nebula.mq.mapping.MethodDescription;
13 12 import com.level2.enterprise.nebula.mq.mapping.ProcessorClass;
14 13 import com.level2.enterprise.nebula.mq.processor.QueueDescriptorImpl;
  14 +import com.level2.enterprise.nebula.mq.processor.QueuedProcessingNode;
15 15 import com.level2.enterprise.nebula.mq.processor.QueuedRabbitProcessingNodeImpl;
16   -import com.level2.enterprise.nebula.mq.processor.api.Queued;
17 16 import com.level2.enterprise.nebula.mq.processor.target.AnnotatedMessageTargetImpl;
18 17 import com.level2.enterprise.nebula.mq.processor.target.MessageTarget;
19 18 import com.level2.enterprise.nebula.mq.reflection.AnnotationClassMapper;
20 19 import com.level2.enterprise.nebula.mq.reflection.ClassMapper;
21 20 import com.rabbitmq.client.Channel;
22 21  
23   -public class AnnotatedRabbitMQProcessorImpl extends QueuedRabbitProcessingNodeImpl implements AnnotatedRabbitMQProcessor {
  22 +public class AnnotatedRabbitMQProcessorImpl implements AnnotatedRabbitMQProcessor {
24 23 private Logger log = LoggerFactory.getLogger(AnnotatedRabbitMQProcessorImpl.class);
25 24  
26 25 ExchangeDescriptor exchange;
27   - HashMap<String,Object> listTargets;
  26 + HashMap<String,ProcessorClass> listTargets;
28 27 ClassMapper mapper;
  28 + QueuedProcessingNode processorNode;
29 29  
30 30 public AnnotatedRabbitMQProcessorImpl(Channel channel, ExchangeDescriptor exchange) {
31   - super(channel);
  31 + processorNode = new QueuedRabbitProcessingNodeImpl(channel);
32 32 this.exchange = exchange;
33 33 mapper = new AnnotationClassMapper();
34   - listTargets = new HashMap<String,Object>();
  34 + listTargets = new HashMap<String,ProcessorClass>();
35 35 }
36 36  
37 37 @Override
38   - public void registerTarget(Object object)
  38 + public void registerTarget(Class<?> target)
39 39 {
40   - String queueName = getQueueName(object);
41   - if(listTargets.get(queueName)==null){
42   - QueueDescriptor queueHandle = new QueueDescriptorImpl(queueName, this.exchange);
43   - if(!declareQueues(object, queueHandle)){
  40 + ProcessorClass processorClass = mapper.mapObject(target);
  41 + if(listTargets.get(processorClass.getQueueName())==null){
  42 + QueueDescriptor queueHandle = new QueueDescriptorImpl(processorClass.getQueueName(), this.exchange);
  43 + if(!declareQueues(processorClass, queueHandle)){
44 44 log.warn("There was an error declaring targets");
45 45 }
46 46 }
47 47 }
48 48  
49   - /**
50   - * It will return the class name or the name indicated by queueName annotation
51   - * @param target class request will be directed to
52   - * @return a String with the queue name of the target
53   - */
54   - public String getQueueName(Object target)
55   - {
56   - String queueName = target.getClass().getName();
57   - if(target.getClass().isAnnotationPresent(Queued.class)){
58   - Queued annotation = target.getClass().getAnnotation(Queued.class);
59   - if(!StringUtils.isBlank(annotation.queueName())){
60   - queueName = annotation.queueName();
61   - }
62   - }
63   - return queueName;
64   - }
65 49  
66   - protected boolean declareQueues(Object target, QueueDescriptor queueHandle)
  50 +
  51 + protected boolean declareQueues(ProcessorClass processorClass, QueueDescriptor queueHandle)
67 52 {
68 53 boolean retval = true;
69   -
70   - ProcessorClass processorClass = mapper.mapObject(target.getClass());
71   -
72 54 // We will create a thread with one channel that will handle all methods in the object
73 55 // The queue name will be the class name.
74 56 // The method will be the one determined by the class name + method if it has annotation
75 57 if(processorClass!=null){
76 58  
77   - if(registerQueues(new QueueDescriptor[]{queueHandle})){
  59 + if(processorNode.registerQueues(new QueueDescriptor[]{queueHandle})){
78 60 log.debug("Queue {} registered", queueHandle.getQueueHandle());
79   - listTargets.put(target.getClass().getName(), target);
80   - start(queueHandle.getQueueHandle());
  61 + listTargets.put(processorClass.getQueueName(), processorClass);
81 62 }
82 63  
83 64 for(MethodDescription method : processorClass.getListMethods()){
84 65 log.debug("Creating processor MQ for method {}", method.getRoutingKey());
85 66 log.debug("Binding the queue {} to exchange {} with routing key {}",queueHandle.getQueueHandle(), queueHandle.getExchange().toString(), method.getRoutingKey());
86   - if(!bindQueue(queueHandle, method.getRoutingKey())){
  67 + if(!processorNode.bindQueue(queueHandle, method.getRoutingKey())){
87 68 log.warn("Unable to bind to queue {}", queueHandle.getQueueHandle());
88 69 }
89 70 // Assign a worker to this method
90   - if(!createTarget(target, queueHandle, method.getRoutingKey())){
  71 + if(!createTarget(processorClass, queueHandle, method.getRoutingKey())){
91 72 log.warn("There was no possible to assign a worker to the method");
92 73 }
93 74 retval = true;
94 75 }
95 76  
  77 + if(retval) processorNode.start(queueHandle.getQueueHandle());
  78 +
96 79 }
97 80 return retval;
98 81 }
99 82  
100   - protected boolean createTarget(Object target, QueueDescriptor queueHandle, String routingKey)
  83 + protected boolean createTarget(ProcessorClass processorClass, QueueDescriptor queueHandle, String routingKey)
101 84 {
102 85 boolean retval = true;
103   - List<MessageTarget> listTargets = getProcessor().getTargets(routingKey);
  86 + List<MessageTarget> listTargets = processorNode.getProcessor().getTargets(routingKey);
104 87 if(listTargets.size()==0){
105 88 AnnotatedMessageTargetImpl messageTarget = new AnnotatedMessageTargetImpl();
106   - messageTarget.setTarget(target);
  89 + messageTarget.setTarget(processorClass);
107 90 listTargets.add(messageTarget);
108 91 }else{
109 92 log.debug("There's one worker for the queue {} and target {}", queueHandle.getQueueHandle(), routingKey);
... ... @@ -111,9 +94,13 @@ public class AnnotatedRabbitMQProcessorImpl extends QueuedRabbitProcessingNodeIm
111 94 return retval;
112 95 }
113 96  
  97 + public QueuedProcessingNode getProcessorNode() {
  98 + return processorNode;
  99 + }
114 100  
115   -
116   -
117   -
  101 + @Override
  102 + public void shutdown() {
  103 + if(processorNode!=null) processorNode.shutdown();
  104 + }
118 105  
119 106 }
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/api/ProcessingNode.java
1 1 package com.level2.enterprise.nebula.mq.processor.api;
2 2  
  3 +import com.level2.enterprise.nebula.mq.api.QueueDescriptor;
  4 +
3 5 public interface ProcessingNode {
4 6  
5 7 void shutdown();
6 8  
  9 +
7 10 }
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/handler/MessageWorkerImpl.java
... ... @@ -27,8 +27,8 @@ public abstract class MessageWorkerImpl implements MessageWorker {
27 27 public AsyncMessageJob call() {
28 28 log.trace("We started to process a job on thread {}", Thread.currentThread().getName());
29 29 AsyncMessageJob retval = null;
30   - synchronized(messageJob){
31   - if(messageJob!=null){
  30 + if(messageJob!=null){
  31 + synchronized(messageJob){
32 32 try {
33 33 Serializable request = deserializeRequest(messageJob.getRequest(), messageJob.getRequestContentType());
34 34 MessageResult result = handle(request);
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/handler/annotated/AnnotationBasedWorkerImpl.java
... ... @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
9 9  
10 10 import com.level2.enterprise.nebula.mq.api.MediaType;
11 11 import com.level2.enterprise.nebula.mq.api.MessageResult;
  12 +import com.level2.enterprise.nebula.mq.mapping.ProcessorClass;
12 13 import com.level2.enterprise.nebula.mq.processor.handler.AsyncMessageJob;
13 14 import com.level2.enterprise.nebula.mq.processor.handler.CannotHandleMessage;
14 15 import com.level2.enterprise.nebula.mq.processor.handler.MessageWorkerImpl;
... ... @@ -19,7 +20,7 @@ public class AnnotationBasedWorkerImpl extends MessageWorkerImpl implements Anno
19 20 protected Class<? extends Serializable> requestClass;
20 21 protected Class<? extends Serializable> responseClass;
21 22 protected Method handlerMethod;
22   - protected Object target;
  23 + protected ProcessorClass target;
23 24  
24 25 public AnnotationBasedWorkerImpl() {
25 26 super();
... ... @@ -98,11 +99,11 @@ public class AnnotationBasedWorkerImpl extends MessageWorkerImpl implements Anno
98 99 public void setRequestClass(Class<? extends Serializable> requestClass) {
99 100 }
100 101  
101   - public Object getTarget() {
  102 + public ProcessorClass getTarget() {
102 103 return target;
103 104 }
104 105  
105   - public void setTarget(Object target) {
  106 + public void setTarget(ProcessorClass target) {
106 107 this.target = target;
107 108 }
108 109  
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/impl/QueuedProcessorImpl.java
... ... @@ -51,7 +51,7 @@ public class QueuedProcessorImpl implements QueuedProcessor {
51 51 processing=true;
52 52 synchronized(requestQueue){
53 53 AsyncMessageJob messageJob=null;
54   - while((messageJob = requestQueue.poll())!=null){
  54 + while((messageJob = requestQueue.poll())!=null && registeredTargets.size()>0){
55 55 log.trace("Placing job {} - {}", messageJob.hashCode(), messageJob.getDeliveryTag());
56 56 // Try to put in a worker
57 57 if(!placeJob(messageJob)){
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/processor/target/AnnotatedMessageTargetImpl.java
1 1 package com.level2.enterprise.nebula.mq.processor.target;
2 2  
  3 +import com.level2.enterprise.nebula.mq.mapping.ProcessorClass;
3 4 import com.level2.enterprise.nebula.mq.processor.handler.MessageWorker;
4 5 import com.level2.enterprise.nebula.mq.processor.handler.annotated.AnnotationBasedWorkerImpl;
5 6  
6 7 public class AnnotatedMessageTargetImpl extends MessageTargetImpl {
7   - protected Object target;
  8 + protected ProcessorClass target;
8 9  
9 10 public AnnotatedMessageTargetImpl() {
10 11 super(AnnotationBasedWorkerImpl.class);
... ... @@ -17,11 +18,11 @@ public class AnnotatedMessageTargetImpl extends MessageTargetImpl {
17 18 return worker;
18 19 }
19 20  
20   - public Object getTarget() {
  21 + public ProcessorClass getTarget() {
21 22 return target;
22 23 }
23 24  
24   - public void setTarget(Object target) {
  25 + public void setTarget(ProcessorClass target) {
25 26 this.target = target;
26 27 }
27 28  
... ...
level2-rabbit-mq-impl/src/main/java/com/level2/enterprise/nebula/mq/reflection/AnnotationClassMapper.java
... ... @@ -14,11 +14,13 @@ import java.util.List;
14 14 import java.util.zip.ZipEntry;
15 15 import java.util.zip.ZipInputStream;
16 16  
  17 +import org.apache.commons.lang.StringUtils;
17 18 import org.slf4j.Logger;
18 19 import org.slf4j.LoggerFactory;
19 20  
20 21 import com.level2.enterprise.nebula.mq.mapping.MethodDescription;
21 22 import com.level2.enterprise.nebula.mq.mapping.ProcessorClass;
  23 +import com.level2.enterprise.nebula.mq.processor.api.Queued;
22 24 import com.level2.enterprise.nebula.mq.processor.api.QueuedMethod;
23 25  
24 26 public class AnnotationClassMapper implements ClassMapper {
... ... @@ -76,6 +78,7 @@ public class AnnotationClassMapper implements ClassMapper {
76 78 processorClass = new ProcessorClass();
77 79 processorClass.setName(target.getSimpleName());
78 80 processorClass.setClassName(target.getName());
  81 + processorClass.setQueueName(getQueueName(target));
79 82 List<MethodDescription> listMethodDescription= new ArrayList<MethodDescription>();
80 83 for(Method method: listMethods){
81 84 MethodDescription descriptor = this.mapMethod(method);
... ... @@ -97,6 +100,23 @@ public class AnnotationClassMapper implements ClassMapper {
97 100 }
98 101 return methodDescription;
99 102 }
  103 +
  104 + /**
  105 + * It will return the class name or the name indicated by queueName annotation
  106 + * @param target class request will be directed to
  107 + * @return a String with the queue name of the target
  108 + */
  109 + protected String getQueueName(Class<?> target)
  110 + {
  111 + String queueName = target.getName();
  112 + if(target.getClass().isAnnotationPresent(Queued.class)){
  113 + Queued annotation = target.getAnnotation(Queued.class);
  114 + if(!StringUtils.isBlank(annotation.queueName())){
  115 + queueName = annotation.queueName();
  116 + }
  117 + }
  118 + return queueName;
  119 + }
100 120  
101 121 protected boolean isMQValid(Method method, MethodDescription descriptor)
102 122 {
... ...
level2-rabbit-mq-impl/src/test/java/com/level2/enterprise/nebula/mq/AnnotatedRabbitMQRPCTests.java
... ... @@ -49,8 +49,6 @@ public class AnnotatedRabbitMQRPCTests {
49 49 {
50 50 log.info("************* test_SendAndReceive_UsingClass started *************");
51 51  
52   - AnnotatedTestClass targetClass = new AnnotatedTestClassImpl();
53   -
54 52 RabbitConnectionManagerImpl connectionManager = new RabbitConnectionManagerImpl();
55 53 // connectionManager.setAddress("127.0.0.100", 5567, false, 10000);
56 54 assertTrue(connectionManager.connect());
... ... @@ -59,7 +57,7 @@ public class AnnotatedRabbitMQRPCTests {
59 57 AnnotatedRabbitRPCImpl annotatedRPC = new AnnotatedRabbitRPCImpl(connectionManager.getChannel(), RabbitExchangeDescriptor.RABBIT_DEFAULT_EXCHANGE, AnnotatedTestClassImpl.class);
60 58  
61 59 AnnotatedRabbitMQProcessorImpl processor = new AnnotatedRabbitMQProcessorImpl(connectionManager.getChannel(),RabbitExchangeDescriptor.RABBIT_DEFAULT_EXCHANGE);
62   - processor.registerTarget(targetClass);
  60 + processor.registerTarget(AnnotatedTestClassImpl.class);
63 61 String message = "Message 1";
64 62 do{
65 63 String result = (String) annotatedRPC.call("getMessage", message);
... ... @@ -78,7 +76,7 @@ public class AnnotatedRabbitMQRPCTests {
78 76 Assert.assertEquals(String.format("Response to message [%s] on 2",message), result);
79 77 log.info("Finished, received message [{}]", result);
80 78 break;
81   - }while((System.nanoTime()-startTime) / 1000000 < 1000000); //
  79 + }while((System.nanoTime()-startTime) / 1000000 < 100000000); //
82 80  
83 81 processor.shutdown();
84 82 annotatedRPC.shutdown();
... ...
level2-rabbit-mq-impl/src/test/java/com/level2/enterprise/nebula/mq/processor/annotated/AnnotatedRabbitMQProcessorTests.java
... ... @@ -50,13 +50,12 @@ public class AnnotatedRabbitMQProcessorTests {
50 50 @Test
51 51 public void test_QueueManagement() throws Exception
52 52 {
53   - AnnotatedTestClassImpl targetClass = new AnnotatedTestClassImpl();
54 53 RabbitConnectionManagerImpl connectionManager = new RabbitConnectionManagerImpl();
55 54 // connectionManager.setAddress("127.0.0.100", 5567, false, 10000);
56 55 assertTrue(connectionManager.connect());
57 56 log.info("Running...");
58 57 AnnotatedRabbitMQProcessorImpl processor = new AnnotatedRabbitMQProcessorImpl(connectionManager.getChannel(),RabbitMQExchangeTest.RABBIT_DEFAULT_TEST_EXCHANGE);
59   - processor.registerTarget(targetClass);
  58 + processor.registerTarget(RabbitConnectionManagerImpl.class);
60 59  
61 60 // AnnotatedRabbitRPC client=new AnnotatedRabbitRPC(RabbitMQExchangeTest.RABBIT_DEFAULT_TEST_EXCHANGE, AnnotatedTestClassImpl.class){
62 61  
... ... @@ -84,9 +83,8 @@ public class AnnotatedRabbitMQProcessorTests {
84 83 assertTrue(connectionManager.connect());
85 84 log.info("Running...");
86 85  
87   - AnnotatedTestClassImpl targetClass = new AnnotatedTestClassImpl();
88   - AnnotatedRabbitMQProcessorImpl processingNode = new AnnotatedRabbitMQProcessorImpl(connectionManager.getChannel(),RabbitMQExchangeTest.RABBIT_DEFAULT_TEST_EXCHANGE);
89   - processingNode.registerTarget(targetClass);
  86 + AnnotatedRabbitMQProcessorImpl mqProcessor = new AnnotatedRabbitMQProcessorImpl(connectionManager.getChannel(),RabbitMQExchangeTest.RABBIT_DEFAULT_TEST_EXCHANGE);
  87 + mqProcessor.registerTarget(AnnotatedTestClassImpl.class);
90 88  
91 89  
92 90 Channel channel = connectionManager.getChannel();
... ... @@ -113,7 +111,7 @@ public class AnnotatedRabbitMQProcessorTests {
113 111 log.info("TIMING: Publishing {} messages took " + (System.nanoTime()-startTime) / 1000000 + " ms", maxMessages);
114 112  
115 113 for(int i=0; i<5; i++){
116   - while(processingNode.getProcessor().hasPendingResponses() || processingNode.getProcessor().hasPendingRequests()){
  114 + while(mqProcessor.getProcessorNode().getProcessor().hasPendingResponses() || mqProcessor.getProcessorNode().getProcessor().hasPendingRequests()){
117 115 Thread.sleep(500L);
118 116 i=0;
119 117 }
... ... @@ -132,7 +130,7 @@ public class AnnotatedRabbitMQProcessorTests {
132 130 log.warn("Cannot close channel because timeout error: {}", e.getMessage());
133 131 }
134 132  
135   - processingNode.shutdown();
  133 + mqProcessor.shutdown();
136 134 connectionManager.shutdown();
137 135 log.info("************* test_UseBothMethods finished *************");
138 136  
... ... @@ -150,9 +148,8 @@ public class AnnotatedRabbitMQProcessorTests {
150 148 assertTrue(connectionManager.connect());
151 149 log.info("Running...");
152 150  
153   - AnnotatedTestClassImpl targetClass = new AnnotatedTestClassImpl();
154 151 AnnotatedRabbitMQProcessorImpl processingNode = new AnnotatedRabbitMQProcessorImpl(connectionManager.getChannel(),RabbitMQExchangeTest.RABBIT_DEFAULT_TEST_EXCHANGE);
155   - processingNode.registerTarget(targetClass);
  152 + processingNode.registerTarget(AnnotatedTestClassImpl.class);
156 153  
157 154  
158 155 Channel channel = connectionManager.getChannel();
... ... @@ -178,7 +175,7 @@ public class AnnotatedRabbitMQProcessorTests {
178 175 log.info("TIMING: Publishing {} messages took " + (System.nanoTime()-startTime) / 1000000 + " ms", maxMessages);
179 176  
180 177 for(int i=0; i<5; i++){
181   - while(processingNode.getProcessor().hasPendingResponses() || processingNode.getProcessor().hasPendingRequests()){
  178 + while(processingNode.getProcessorNode().getProcessor().hasPendingResponses() || processingNode.getProcessorNode().getProcessor().hasPendingRequests()){
182 179 Thread.sleep(500L);
183 180 i=0;
184 181 }
... ...
level2-rabbit-mq-impl/src/test/resources/log4j.properties
... ... @@ -13,7 +13,7 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c - %m%n
13 13  
14 14 ### set log levels - for more verbose logging change 'info' to 'debug' ###
15 15  
16   -log4j.rootLogger=debug, stdout
  16 +log4j.rootLogger=trace, stdout
17 17  
18 18 log4j.logger.org.hibernate=info
19 19 log4j.logger.org.springframework=info
... ...
pom.xml
... ... @@ -5,7 +5,7 @@
5 5 <groupId>com.level2.server.mq</groupId>
6 6 <artifactId>level2-mq</artifactId>
7 7 <packaging>pom</packaging>
8   - <version>1.4.2-SNAPSHOT</version>
  8 + <version>1.4.3-SNAPSHOT</version>
9 9 <name>Level2 :: MQ</name>
10 10 <prerequisites>
11 11 <maven>3.0.0</maven>
... ...