com/scalagent/kjoram/MessageConsumer.java

00001 /*
00002  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
00003  * Copyright (C) 2001 - ScalAgent Distributed Technologies
00004  * Copyright (C) 1996 - Dyade
00005  *
00006  * This library is free software; you can redistribute it and/or
00007  * modify it under the terms of the GNU Lesser General Public
00008  * License as published by the Free Software Foundation; either
00009  * version 2.1 of the License, or any later version.
00010  * 
00011  * This library is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014  * Lesser General Public License for more details.
00015  * 
00016  * You should have received a copy of the GNU Lesser General Public
00017  * License along with this library; if not, write to the Free Software
00018  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00019  * USA.
00020  *
00021  * Initial developer(s): Frederic Maistre (INRIA)
00022  * Contributor(s): Nicolas Tachker (ScalAgent)
00023  */
00024 package com.scalagent.kjoram;
00025 
00026 import com.scalagent.kjoram.jms.*;
00027 import com.scalagent.kjoram.util.TimerTask;
00028 
00029 import java.util.Vector;
00030 
00031 import com.scalagent.kjoram.excepts.IllegalStateException;
00032 import com.scalagent.kjoram.excepts.*;
00033 
00034 
00035 public class MessageConsumer
00036 {
00038   private String selector;
00040   private MessageListener messageListener = null;
00042   private boolean durableSubscriber;
00044   private AbstractJmsRequest pendingReq = null;
00049   private boolean receiving = false;
00051   private TimerTask replyingTask = null;
00052 
00054   protected Destination dest;
00059   protected boolean noLocal;
00061   protected boolean closed = false;
00062 
00064   Session sess;
00069   String targetName;
00071   boolean queueMode;
00072 
00087   MessageConsumer(Session sess, Destination dest, String selector,
00088                   String subName, boolean noLocal) throws JMSException
00089   {
00090     if (dest == null)
00091       throw new InvalidDestinationException("Invalid null destination.");
00092 
00093     if (dest instanceof TemporaryQueue) {
00094       Connection tempQCnx = ((TemporaryQueue) dest).getCnx();
00095 
00096       if (tempQCnx == null || ! tempQCnx.equals(sess.cnx))
00097         throw new JMSSecurityException("Forbidden consumer on this "
00098                                        + "temporary destination.");
00099     }
00100     else if (dest instanceof TemporaryTopic) {
00101       Connection tempTCnx = ((TemporaryTopic) dest).getCnx();
00102     
00103       if (tempTCnx == null || ! tempTCnx.equals(sess.cnx))
00104         throw new JMSSecurityException("Forbidden consumer on this "
00105                                        + "temporary destination.");
00106     }
00107 
00108     // If the destination is a topic, the consumer is a subscriber:
00109     if (dest instanceof Topic) {
00110       if (subName == null) {
00111         subName = sess.cnx.nextSubName();
00112         durableSubscriber = false;
00113       }
00114       else
00115         durableSubscriber = true;
00116 
00117       sess.cnx.syncRequest(new ConsumerSubRequest(dest.getName(),
00118                                                   subName,
00119                                                   selector,
00120                                                   noLocal,
00121                                                   durableSubscriber));
00122       targetName = subName;
00123       this.noLocal = noLocal;
00124       queueMode = false;
00125     }
00126     else {
00127       targetName = dest.getName();
00128       queueMode = true;
00129     }
00130 
00131     this.sess = sess;
00132     this.dest = dest;
00133     this.selector = selector;
00134 
00135     sess.consumers.addElement(this);
00136 
00137     if (JoramTracing.dbgClient)
00138       JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
00139   }
00140 
/**
 * Builds a consumer without subscription name and with the noLocal flag
 * set to false, by delegating to the complete constructor.
 *
 * @param sess  The session the consumer belongs to.
 * @param dest  The destination to consume from.
 * @param selector  The selector sent with the consume requests.
 *
 * @exception JMSException  If the complete constructor fails.
 */
MessageConsumer(Session sess, Destination dest, String selector)
  throws JMSException
{
  this(sess, dest, selector, null, false);
}
00157 
00159   public String toString()
00160   {
00161     return "Consumer:" + sess.ident;
00162   }
00163 
00181   public void setMessageListener(MessageListener messageListener)
00182               throws JMSException
00183   {
00184     if (closed)
00185       throw new IllegalStateException("Forbidden call on a closed consumer.");
00186 
00187     if (JoramTracing.dbgClient)
00188       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00189                        + ": setting MessageListener to "
00190                        + messageListener);
00191 
00192     if (sess.cnx.started && JoramTracing.dbgClient)
00193       JoramTracing.log(JoramTracing.WARN, this + ": improper call"
00194                        + " on a started connection.");
00195     
00196     // If unsetting the listener:
00197     if (this.messageListener != null && messageListener == null) {
00198       if (JoramTracing.dbgClient)
00199         JoramTracing.log(JoramTracing.DEBUG, this + ": unsets"
00200                          + " listener request.");
00201 
00202       sess.cnx.requestsTable.remove(pendingReq.getKey());
00203 
00204       this.messageListener = messageListener;
00205       sess.msgListeners--;
00206 
00207       ConsumerUnsetListRequest unsetLR = null;
00208       if (queueMode) {
00209         unsetLR = new ConsumerUnsetListRequest(true);
00210         unsetLR.setCancelledRequestId(pendingReq.getRequestId());
00211       }
00212       else {
00213         unsetLR = new ConsumerUnsetListRequest(false);
00214         unsetLR.setTarget(targetName);
00215       }
00216 
00217       try {
00218         sess.cnx.syncRequest(unsetLR);
00219       } 
00220       // A JMSException might be caught if the connection is broken.
00221       catch (JMSException jE) {}
00222       pendingReq = null;
00223 
00224       // Stopping the daemon if not needed anymore:
00225       if (sess.msgListeners == 0 && sess.started) {
00226         if (JoramTracing.dbgClient)
00227           JoramTracing.log(JoramTracing.DEBUG, this + ": stops the"
00228                            + " session daemon.");
00229         sess.daemon.stop();
00230         sess.daemon = null;
00231         sess.started = false;
00232       }
00233     }
00234     // Else, if setting a new listener:
00235     else if (this.messageListener == null && messageListener != null) {
00236       sess.msgListeners++;
00237 
00238       if (sess.msgListeners == 1
00239           && (sess.started || sess.cnx.started)) {
00240         if (JoramTracing.dbgClient)
00241           JoramTracing.log(JoramTracing.DEBUG, this + ": starts the"
00242                            + " session daemon.");
00243         sess.daemon = new SessionDaemon(sess);
00244         sess.daemon.setDaemon(false);
00245         sess.daemon.start();
00246         sess.started = true;
00247       }
00248 
00249       this.messageListener = messageListener;
00250       pendingReq = new ConsumerSetListRequest(targetName, selector, queueMode);
00251       pendingReq.setRequestId(sess.cnx.nextRequestId());
00252       sess.cnx.requestsTable.put(pendingReq.getKey(), this);
00253       sess.cnx.asyncRequest(pendingReq);
00254     }
00255 
00256     if (JoramTracing.dbgClient)
00257       JoramTracing.log(JoramTracing.DEBUG, this + ": MessageListener"
00258                        + " set.");
00259   }
00260 
00266   public MessageListener getMessageListener() throws JMSException
00267   {
00268     if (closed)
00269       throw new IllegalStateException("Forbidden call on a closed consumer.");
00270 
00271     return messageListener;
00272   }
00273 
00279   public String getMessageSelector() throws JMSException
00280   {
00281     if (closed)
00282       throw new IllegalStateException("Forbidden call on a closed consumer.");
00283 
00284     return selector;
00285   }
00286 
00296   public Message receive(long timeOut) throws JMSException
00297   {
00298     // Synchronizing with a possible "close".
00299     synchronized(this) {
00300       if (JoramTracing.dbgClient)
00301         JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00302                          + ": requests to receive a message.");
00303 
00304       if (closed)
00305         throw new IllegalStateException("Forbidden call on a closed consumer.");
00306 
00307       if (messageListener != null) {
00308         if (JoramTracing.dbgClient)
00309           JoramTracing.log(JoramTracing.WARN, "Improper call as a"
00310                            + " listener exists for this consumer.");
00311       }
00312       else if (sess.msgListeners > 0) {
00313         if (JoramTracing.dbgClient)
00314           JoramTracing.log(JoramTracing.WARN, "Improper call as"
00315                            + " asynchronous consumers have already"
00316                            + " been set on the session.");
00317       }
00318       pendingReq = new ConsumerReceiveRequest(targetName, selector, timeOut,
00319                                               queueMode);
00320       pendingReq.setRequestId(sess.cnx.nextRequestId());
00321       receiving = true;
00322 
00323       // In case of a timer, scheduling the receive:
00324       if (timeOut > 0) {
00325         replyingTask = new ConsumerReplyTask(pendingReq);
00326         sess.schedule(replyingTask, timeOut);
00327       }
00328     }
00329 
00330     // Expecting an answer:
00331     ConsumerMessages reply =
00332      (ConsumerMessages) sess.cnx.syncRequest(pendingReq);
00333 
00334     // Synchronizing again with a possible "close":
00335     synchronized(this) {
00336       receiving = false;
00337       pendingReq = null;
00338       if (replyingTask != null)
00339         replyingTask.cancel();
00340       if (JoramTracing.dbgClient)
00341         JoramTracing.log(JoramTracing.DEBUG, this + ": received a"
00342                          + " reply.");
00343 
00344       Vector msgs = reply.getMessages();
00345       if (msgs != null && ! msgs.isEmpty()) {
00346         com.scalagent.kjoram.messages.Message msg =
00347           (com.scalagent.kjoram.messages.Message) msgs.elementAt(0);
00348         String msgId = msg.getIdentifier();
00349         // Auto ack: acknowledging the message:
00350         if (sess.autoAck)
00351           sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
00352                                                        queueMode));
00353         // Session ack: passing the id for later ack or deny:
00354         else
00355           sess.prepareAck(targetName, msgId, queueMode);
00356 
00357         return Message.wrapMomMessage(sess, msg);
00358       }
00359       else
00360         return null;
00361       }
00362     }
00363 
00373   public Message receive() throws JMSException
00374   {
00375     return receive(0);
00376   }
00377 
00387   public Message receiveNoWait() throws JMSException
00388   {
00389     return receive(-1);
00390   }
00391 
00397   public void close() throws JMSException
00398   {
00399     // Ignoring the call if consumer is already closed:
00400     if (closed)
00401       return;
00402 
00403     if (JoramTracing.dbgClient)
00404       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00405                        + ": closing...");
00406 
00407     // Synchronizig with a possible receive() or onMessage() ongoing process.
00408     syncro();
00409 
00410     // Removing this resource's reference from everywhere:
00411     Object lock = null;
00412     if (pendingReq != null)
00413       lock = sess.cnx.requestsTable.remove(pendingReq.getKey());
00414     sess.consumers.removeElement(this);
00415 
00416     // Unsetting the listener, if any:
00417     try {
00418       if (messageListener != null) {
00419         if (JoramTracing.dbgClient)
00420           JoramTracing.log(JoramTracing.DEBUG, "Unsetting listener.");
00421 
00422         if (queueMode) {
00423           ConsumerUnsetListRequest unsetLR =
00424             new ConsumerUnsetListRequest(true);
00425           unsetLR.setCancelledRequestId(pendingReq.getRequestId());
00426           sess.cnx.syncRequest(unsetLR);
00427         }
00428       }
00429 
00430       if (durableSubscriber)
00431         sess.cnx.syncRequest(new ConsumerCloseSubRequest(targetName));
00432       else if (! queueMode)
00433         sess.cnx.syncRequest(new ConsumerUnsubRequest(targetName));
00434     }
00435     // A JMSException might be caught if the connection is broken.
00436     catch (JMSException jE) {}
00437 
00438     // In the case of a pending "receive" request, replying by a null to it:
00439     if (lock != null && receiving) {
00440       if (JoramTracing.dbgClient)
00441         JoramTracing.log(JoramTracing.DEBUG, "Replying to the"
00442                          + " pending receive "
00443                          + pendingReq.getRequestId()
00444                          + " with a null message.");
00445 
00446       sess.cnx.repliesTable.put(pendingReq.getKey(), new ConsumerMessages());
00447 
00448       synchronized(lock) {
00449         lock.notify();
00450       }
00451     }
00452 
00453     // Synchronizing again:
00454     syncro();
00455 
00456     closed = true;
00457     
00458     if (JoramTracing.dbgClient)
00459       JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
00460   }
00461 
00466   synchronized void syncro() {}
00467   
00472   synchronized void onMessage(com.scalagent.kjoram.messages.Message message)
00473   {
00474     String msgId = message.getIdentifier();
00475 
00476     try {
00477       // If the listener has been unset without having stopped the
00478       // connection, this case might happen:
00479       if (messageListener == null) {
00480         if (JoramTracing.dbgClient)
00481           JoramTracing.log(JoramTracing.WARN, this + ": an"
00482                            + " asynchronous delivery arrived"
00483                            + " for an improperly unset listener:"
00484                            + " denying the message.");
00485         sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
00486                                                      queueMode, true));
00487       }
00488       else {
00489         // In session ack mode, preparing later ack or deny:
00490         if (! sess.autoAck)
00491           sess.prepareAck(targetName, msgId, queueMode);
00492 
00493         try {
00494           messageListener.onMessage(Message.wrapMomMessage(sess, message));
00495           // Auto ack: acknowledging the message:
00496           if (sess.autoAck)
00497             sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
00498                                                          queueMode));
00499         }
00500         // Catching a JMSException means that the building of the Joram
00501         // message went wrong: denying as expected by the spec:
00502         catch (JMSException jE) {
00503           JoramTracing.log(JoramTracing.ERROR, this
00504                            + ": error while processing the"
00505                            + " received message: " + jE);
00506           
00507           if (queueMode)
00508             sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
00509                                                          queueMode));
00510           else
00511             sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
00512                                                           queueMode));
00513         }
00514         // Catching a RuntimeException means that the client onMessage() code
00515         // is incorrect; denying as expected by the JMS spec:
00516         catch (RuntimeException rE) {
00517           JoramTracing.log(JoramTracing.ERROR, this
00518                            + ": RuntimeException thrown"
00519                            + " by the listener: " + rE);
00520 
00521           if (sess.autoAck && queueMode)
00522             sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
00523                                                          queueMode));
00524           else if (sess.autoAck && ! queueMode)
00525             sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
00526                                                           queueMode));
00527         }
00528         // Sending a new request if queue mode:
00529         if (queueMode) {
00530           pendingReq = new ConsumerSetListRequest(targetName, selector, true);
00531           pendingReq.setRequestId(sess.cnx.nextRequestId());
00532           sess.cnx.requestsTable.put(pendingReq.getKey(), this);
00533           sess.cnx.asyncRequest(pendingReq);
00534         }
00535       }
00536     }
00537     // Catching an IllegalStateException means that the acknowledgement or
00538     // denying went wrong because the connection has been lost. Nothing more
00539     // can be done here.
00540     catch (JMSException jE) {
00541       JoramTracing.log(JoramTracing.ERROR, this + ": " + jE);
00542     }
00543   }
00544 
00549   private class ConsumerReplyTask extends TimerTask
00550   {
00552     private AbstractJmsRequest request;
00554     private ConsumerMessages nullReply;
00555 
00561     ConsumerReplyTask(AbstractJmsRequest request)
00562     {
00563       this.request = request;
00564       this.nullReply = new ConsumerMessages(request.getRequestId(),
00565                                             targetName,
00566                                             queueMode);
00567     }
00568 
00573     public void run()
00574     {
00575       try {
00576         if (JoramTracing.dbgClient)
00577           JoramTracing.log(JoramTracing.WARN, "Receive request" +
00578                            " answered because timer expired");
00579 
00580         Lock lock = (Lock) sess.cnx.requestsTable.remove(request.getKey());
00581 
00582         if (lock == null)
00583           return;
00584 
00585         synchronized (lock) {
00586           sess.cnx.repliesTable.put(request.getKey(), nullReply);
00587           lock.notify();
00588         }
00589       }
00590       catch (Exception e) {}
00591     }
00592   }
00593 }

Generated on Tue Sep 16 16:14:24 2008 for joram by  doxygen 1.5.0