00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 package com.scalagent.kjoram;
00025
00026 import com.scalagent.kjoram.jms.*;
00027 import com.scalagent.kjoram.util.TimerTask;
00028
00029 import java.util.Vector;
00030
00031 import com.scalagent.kjoram.excepts.IllegalStateException;
00032 import com.scalagent.kjoram.excepts.*;
00033
00034
00035 public class MessageConsumer
00036 {
00038 private String selector;
00040 private MessageListener messageListener = null;
00042 private boolean durableSubscriber;
00044 private AbstractJmsRequest pendingReq = null;
00049 private boolean receiving = false;
00051 private TimerTask replyingTask = null;
00052
00054 protected Destination dest;
00059 protected boolean noLocal;
00061 protected boolean closed = false;
00062
00064 Session sess;
00069 String targetName;
00071 boolean queueMode;
00072
00087 MessageConsumer(Session sess, Destination dest, String selector,
00088 String subName, boolean noLocal) throws JMSException
00089 {
00090 if (dest == null)
00091 throw new InvalidDestinationException("Invalid null destination.");
00092
00093 if (dest instanceof TemporaryQueue) {
00094 Connection tempQCnx = ((TemporaryQueue) dest).getCnx();
00095
00096 if (tempQCnx == null || ! tempQCnx.equals(sess.cnx))
00097 throw new JMSSecurityException("Forbidden consumer on this "
00098 + "temporary destination.");
00099 }
00100 else if (dest instanceof TemporaryTopic) {
00101 Connection tempTCnx = ((TemporaryTopic) dest).getCnx();
00102
00103 if (tempTCnx == null || ! tempTCnx.equals(sess.cnx))
00104 throw new JMSSecurityException("Forbidden consumer on this "
00105 + "temporary destination.");
00106 }
00107
00108
00109 if (dest instanceof Topic) {
00110 if (subName == null) {
00111 subName = sess.cnx.nextSubName();
00112 durableSubscriber = false;
00113 }
00114 else
00115 durableSubscriber = true;
00116
00117 sess.cnx.syncRequest(new ConsumerSubRequest(dest.getName(),
00118 subName,
00119 selector,
00120 noLocal,
00121 durableSubscriber));
00122 targetName = subName;
00123 this.noLocal = noLocal;
00124 queueMode = false;
00125 }
00126 else {
00127 targetName = dest.getName();
00128 queueMode = true;
00129 }
00130
00131 this.sess = sess;
00132 this.dest = dest;
00133 this.selector = selector;
00134
00135 sess.consumers.addElement(this);
00136
00137 if (JoramTracing.dbgClient)
00138 JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
00139 }
00140
00152 MessageConsumer(Session sess, Destination dest,
00153 String selector) throws JMSException
00154 {
00155 this(sess, dest, selector, null, false);
00156 }
00157
00159 public String toString()
00160 {
00161 return "Consumer:" + sess.ident;
00162 }
00163
00181 public void setMessageListener(MessageListener messageListener)
00182 throws JMSException
00183 {
00184 if (closed)
00185 throw new IllegalStateException("Forbidden call on a closed consumer.");
00186
00187 if (JoramTracing.dbgClient)
00188 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00189 + ": setting MessageListener to "
00190 + messageListener);
00191
00192 if (sess.cnx.started && JoramTracing.dbgClient)
00193 JoramTracing.log(JoramTracing.WARN, this + ": improper call"
00194 + " on a started connection.");
00195
00196
00197 if (this.messageListener != null && messageListener == null) {
00198 if (JoramTracing.dbgClient)
00199 JoramTracing.log(JoramTracing.DEBUG, this + ": unsets"
00200 + " listener request.");
00201
00202 sess.cnx.requestsTable.remove(pendingReq.getKey());
00203
00204 this.messageListener = messageListener;
00205 sess.msgListeners--;
00206
00207 ConsumerUnsetListRequest unsetLR = null;
00208 if (queueMode) {
00209 unsetLR = new ConsumerUnsetListRequest(true);
00210 unsetLR.setCancelledRequestId(pendingReq.getRequestId());
00211 }
00212 else {
00213 unsetLR = new ConsumerUnsetListRequest(false);
00214 unsetLR.setTarget(targetName);
00215 }
00216
00217 try {
00218 sess.cnx.syncRequest(unsetLR);
00219 }
00220
00221 catch (JMSException jE) {}
00222 pendingReq = null;
00223
00224
00225 if (sess.msgListeners == 0 && sess.started) {
00226 if (JoramTracing.dbgClient)
00227 JoramTracing.log(JoramTracing.DEBUG, this + ": stops the"
00228 + " session daemon.");
00229 sess.daemon.stop();
00230 sess.daemon = null;
00231 sess.started = false;
00232 }
00233 }
00234
00235 else if (this.messageListener == null && messageListener != null) {
00236 sess.msgListeners++;
00237
00238 if (sess.msgListeners == 1
00239 && (sess.started || sess.cnx.started)) {
00240 if (JoramTracing.dbgClient)
00241 JoramTracing.log(JoramTracing.DEBUG, this + ": starts the"
00242 + " session daemon.");
00243 sess.daemon = new SessionDaemon(sess);
00244 sess.daemon.setDaemon(false);
00245 sess.daemon.start();
00246 sess.started = true;
00247 }
00248
00249 this.messageListener = messageListener;
00250 pendingReq = new ConsumerSetListRequest(targetName, selector, queueMode);
00251 pendingReq.setRequestId(sess.cnx.nextRequestId());
00252 sess.cnx.requestsTable.put(pendingReq.getKey(), this);
00253 sess.cnx.asyncRequest(pendingReq);
00254 }
00255
00256 if (JoramTracing.dbgClient)
00257 JoramTracing.log(JoramTracing.DEBUG, this + ": MessageListener"
00258 + " set.");
00259 }
00260
00266 public MessageListener getMessageListener() throws JMSException
00267 {
00268 if (closed)
00269 throw new IllegalStateException("Forbidden call on a closed consumer.");
00270
00271 return messageListener;
00272 }
00273
00279 public String getMessageSelector() throws JMSException
00280 {
00281 if (closed)
00282 throw new IllegalStateException("Forbidden call on a closed consumer.");
00283
00284 return selector;
00285 }
00286
00296 public Message receive(long timeOut) throws JMSException
00297 {
00298
00299 synchronized(this) {
00300 if (JoramTracing.dbgClient)
00301 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00302 + ": requests to receive a message.");
00303
00304 if (closed)
00305 throw new IllegalStateException("Forbidden call on a closed consumer.");
00306
00307 if (messageListener != null) {
00308 if (JoramTracing.dbgClient)
00309 JoramTracing.log(JoramTracing.WARN, "Improper call as a"
00310 + " listener exists for this consumer.");
00311 }
00312 else if (sess.msgListeners > 0) {
00313 if (JoramTracing.dbgClient)
00314 JoramTracing.log(JoramTracing.WARN, "Improper call as"
00315 + " asynchronous consumers have already"
00316 + " been set on the session.");
00317 }
00318 pendingReq = new ConsumerReceiveRequest(targetName, selector, timeOut,
00319 queueMode);
00320 pendingReq.setRequestId(sess.cnx.nextRequestId());
00321 receiving = true;
00322
00323
00324 if (timeOut > 0) {
00325 replyingTask = new ConsumerReplyTask(pendingReq);
00326 sess.schedule(replyingTask, timeOut);
00327 }
00328 }
00329
00330
00331 ConsumerMessages reply =
00332 (ConsumerMessages) sess.cnx.syncRequest(pendingReq);
00333
00334
00335 synchronized(this) {
00336 receiving = false;
00337 pendingReq = null;
00338 if (replyingTask != null)
00339 replyingTask.cancel();
00340 if (JoramTracing.dbgClient)
00341 JoramTracing.log(JoramTracing.DEBUG, this + ": received a"
00342 + " reply.");
00343
00344 Vector msgs = reply.getMessages();
00345 if (msgs != null && ! msgs.isEmpty()) {
00346 com.scalagent.kjoram.messages.Message msg =
00347 (com.scalagent.kjoram.messages.Message) msgs.elementAt(0);
00348 String msgId = msg.getIdentifier();
00349
00350 if (sess.autoAck)
00351 sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
00352 queueMode));
00353
00354 else
00355 sess.prepareAck(targetName, msgId, queueMode);
00356
00357 return Message.wrapMomMessage(sess, msg);
00358 }
00359 else
00360 return null;
00361 }
00362 }
00363
00373 public Message receive() throws JMSException
00374 {
00375 return receive(0);
00376 }
00377
00387 public Message receiveNoWait() throws JMSException
00388 {
00389 return receive(-1);
00390 }
00391
00397 public void close() throws JMSException
00398 {
00399
00400 if (closed)
00401 return;
00402
00403 if (JoramTracing.dbgClient)
00404 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00405 + ": closing...");
00406
00407
00408 syncro();
00409
00410
00411 Object lock = null;
00412 if (pendingReq != null)
00413 lock = sess.cnx.requestsTable.remove(pendingReq.getKey());
00414 sess.consumers.removeElement(this);
00415
00416
00417 try {
00418 if (messageListener != null) {
00419 if (JoramTracing.dbgClient)
00420 JoramTracing.log(JoramTracing.DEBUG, "Unsetting listener.");
00421
00422 if (queueMode) {
00423 ConsumerUnsetListRequest unsetLR =
00424 new ConsumerUnsetListRequest(true);
00425 unsetLR.setCancelledRequestId(pendingReq.getRequestId());
00426 sess.cnx.syncRequest(unsetLR);
00427 }
00428 }
00429
00430 if (durableSubscriber)
00431 sess.cnx.syncRequest(new ConsumerCloseSubRequest(targetName));
00432 else if (! queueMode)
00433 sess.cnx.syncRequest(new ConsumerUnsubRequest(targetName));
00434 }
00435
00436 catch (JMSException jE) {}
00437
00438
00439 if (lock != null && receiving) {
00440 if (JoramTracing.dbgClient)
00441 JoramTracing.log(JoramTracing.DEBUG, "Replying to the"
00442 + " pending receive "
00443 + pendingReq.getRequestId()
00444 + " with a null message.");
00445
00446 sess.cnx.repliesTable.put(pendingReq.getKey(), new ConsumerMessages());
00447
00448 synchronized(lock) {
00449 lock.notify();
00450 }
00451 }
00452
00453
00454 syncro();
00455
00456 closed = true;
00457
00458 if (JoramTracing.dbgClient)
00459 JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
00460 }
00461
00466 synchronized void syncro() {}
00467
00472 synchronized void onMessage(com.scalagent.kjoram.messages.Message message)
00473 {
00474 String msgId = message.getIdentifier();
00475
00476 try {
00477
00478
00479 if (messageListener == null) {
00480 if (JoramTracing.dbgClient)
00481 JoramTracing.log(JoramTracing.WARN, this + ": an"
00482 + " asynchronous delivery arrived"
00483 + " for an improperly unset listener:"
00484 + " denying the message.");
00485 sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
00486 queueMode, true));
00487 }
00488 else {
00489
00490 if (! sess.autoAck)
00491 sess.prepareAck(targetName, msgId, queueMode);
00492
00493 try {
00494 messageListener.onMessage(Message.wrapMomMessage(sess, message));
00495
00496 if (sess.autoAck)
00497 sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
00498 queueMode));
00499 }
00500
00501
00502 catch (JMSException jE) {
00503 JoramTracing.log(JoramTracing.ERROR, this
00504 + ": error while processing the"
00505 + " received message: " + jE);
00506
00507 if (queueMode)
00508 sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
00509 queueMode));
00510 else
00511 sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
00512 queueMode));
00513 }
00514
00515
00516 catch (RuntimeException rE) {
00517 JoramTracing.log(JoramTracing.ERROR, this
00518 + ": RuntimeException thrown"
00519 + " by the listener: " + rE);
00520
00521 if (sess.autoAck && queueMode)
00522 sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
00523 queueMode));
00524 else if (sess.autoAck && ! queueMode)
00525 sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
00526 queueMode));
00527 }
00528
00529 if (queueMode) {
00530 pendingReq = new ConsumerSetListRequest(targetName, selector, true);
00531 pendingReq.setRequestId(sess.cnx.nextRequestId());
00532 sess.cnx.requestsTable.put(pendingReq.getKey(), this);
00533 sess.cnx.asyncRequest(pendingReq);
00534 }
00535 }
00536 }
00537
00538
00539
00540 catch (JMSException jE) {
00541 JoramTracing.log(JoramTracing.ERROR, this + ": " + jE);
00542 }
00543 }
00544
00549 private class ConsumerReplyTask extends TimerTask
00550 {
00552 private AbstractJmsRequest request;
00554 private ConsumerMessages nullReply;
00555
00561 ConsumerReplyTask(AbstractJmsRequest request)
00562 {
00563 this.request = request;
00564 this.nullReply = new ConsumerMessages(request.getRequestId(),
00565 targetName,
00566 queueMode);
00567 }
00568
00573 public void run()
00574 {
00575 try {
00576 if (JoramTracing.dbgClient)
00577 JoramTracing.log(JoramTracing.WARN, "Receive request" +
00578 " answered because timer expired");
00579
00580 Lock lock = (Lock) sess.cnx.requestsTable.remove(request.getKey());
00581
00582 if (lock == null)
00583 return;
00584
00585 synchronized (lock) {
00586 sess.cnx.repliesTable.put(request.getKey(), nullReply);
00587 lock.notify();
00588 }
00589 }
00590 catch (Exception e) {}
00591 }
00592 }
00593 }