com/scalagent/kjoram/ConnectionConsumer.java

00001 /*
00002  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
00003  * Copyright (C) 2001 - ScalAgent Distributed Technologies
00004  * Copyright (C) 1996 - Dyade
00005  *
00006  * This library is free software; you can redistribute it and/or
00007  * modify it under the terms of the GNU Lesser General Public
00008  * License as published by the Free Software Foundation; either
00009  * version 2.1 of the License, or any later version.
00010  * 
00011  * This library is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014  * Lesser General Public License for more details.
00015  * 
00016  * You should have received a copy of the GNU Lesser General Public
00017  * License along with this library; if not, write to the Free Software
00018  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00019  * USA.
00020  *
00021  * Initial developer(s): Frederic Maistre (INRIA)
00022  * Contributor(s): Nicolas Tachker (ScalAgent)
00023  */
00024 package com.scalagent.kjoram;
00025 
00026 import com.scalagent.kjoram.jms.*;
00027 import com.scalagent.kjoram.excepts.*;
00028 
00029 import java.util.Vector;
00030 import java.util.Enumeration;
00031 
00032 
00033 public class ConnectionConsumer
00034 {
00036   private Connection cnx;
00038   private boolean durable = false;
00040   private String selector;
00042   private ServerSessionPool sessionPool;
00044   private int maxMessages;
00049   private CCDaemon ccDaemon;
00051   private com.scalagent.kjoram.jms.AbstractJmsRequest currentReq = null;
00053   private boolean closed = false;
00054 
00056   String targetName;
00058   boolean queueMode = true;
00063   com.scalagent.kjoram.util.Queue repliesIn;
00064 
00065 
00084   ConnectionConsumer(Connection cnx, Destination dest, String subName,
00085                      String selector, ServerSessionPool sessionPool,
00086                      int maxMessages) throws JMSException
00087   {
00088     if (sessionPool == null)
00089       throw new JMSException("Invalid ServerSessionPool parameter: "
00090                              + sessionPool);
00091     if (maxMessages <= 0)
00092       throw new JMSException("Invalid maxMessages parameter: " + maxMessages);
00093     
00094     this.cnx = cnx;
00095     this.selector = selector;
00096     this.sessionPool = sessionPool;
00097     this.maxMessages = maxMessages;
00098 
00099     if (dest instanceof Queue)
00100       targetName = dest.getName();
00101     else if (subName == null) {
00102       queueMode = false;
00103       targetName = cnx.nextSubName();
00104     }
00105     else {
00106       queueMode = false;
00107       targetName = subName;
00108       durable = true;
00109     }
00110 
00111     repliesIn = new com.scalagent.kjoram.util.Queue();
00112 
00113     if (cnx.cconsumers == null)
00114       cnx.cconsumers = new Vector();
00115  
00116     cnx.cconsumers.addElement(this);
00117 
00118     ccDaemon = new CCDaemon(this);
00119     ccDaemon.setDaemon(true);
00120     ccDaemon.start();
00121 
00122     // If the consumer is a subscriber, subscribing to the target topic:
00123     if (! queueMode) 
00124       cnx.syncRequest(new ConsumerSubRequest(dest.getName(), targetName,
00125                                              selector, false, durable));
00126 
00127     // Sending a listener request:
00128     currentReq = new ConsumerSetListRequest(targetName, selector, queueMode);
00129     currentReq.setRequestId(cnx.nextRequestId());
00130     cnx.requestsTable.put(currentReq.getKey(), this);
00131     cnx.asyncRequest(currentReq);
00132 
00133     if (JoramTracing.dbgClient)
00134       JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
00135   }
00136 
00154   ConnectionConsumer(Connection cnx, Destination dest, String selector,
00155                      ServerSessionPool sessionPool,
00156                      int maxMessages) throws JMSException
00157   {
00158     this(cnx, dest, null, selector, sessionPool, maxMessages);
00159   }
00160 
00162   public String toString()
00163   {
00164     return "ConnCons:" + cnx.toString();
00165   }
00166 
00167 
00173   public ServerSessionPool getServerSessionPool() throws JMSException
00174   {
00175     if (closed)
00176       throw new com.scalagent.kjoram.excepts.
00177         IllegalStateException("Forbidden call on a closed"
00178                               + " ConnectionConsumer.");
00179     return sessionPool;
00180   }
00181 
00182 
00188   public void close() throws JMSException
00189   {
00190     cnx.requestsTable.remove(currentReq.getKey());
00191     ccDaemon.stop();
00192 
00193     // If the consumer is a subscriber, managing the subscription closing: 
00194     if (! queueMode) {
00195       try {
00196         if (durable) 
00197           cnx.syncRequest(new ConsumerCloseSubRequest(targetName));
00198         else
00199           cnx.syncRequest(new ConsumerUnsubRequest(targetName));
00200       }
00201       // A JMSException might be caught if the connection is broken.
00202       catch (JMSException jE) {}
00203     }
00204     cnx.cconsumers.removeElement(this);
00205   }
00206 
00211 class CCDaemon extends com.scalagent.kjoram.util.Daemon
00212 {
00214   private ConnectionConsumer cc;
00215 
00220   CCDaemon(ConnectionConsumer cc)
00221   {
00222     super(cc.toString());
00223     this.cc = cc;
00224   }
00225 
00227   public void run()
00228   {
00229     ConsumerMessages reply;
00230     Vector deliveries = new Vector();
00231     ServerSession serverSess;
00232     Session sess;
00233     int counter;
00234 
00235     try {
00236       while (running) {
00237         canStop = true; 
00238 
00239         try {
00240           // Expecting a reply:
00241           repliesIn.get();
00242         }
00243         catch (Exception iE) {
00244           continue;
00245         }
00246         canStop = false;
00247 
00248         // Processing the delivery:
00249         try {
00250           if (JoramTracing.dbgClient)
00251             JoramTracing.log(JoramTracing.DEBUG, "--- " + cc
00252                              + ": got a delivery.");
00253 
00254           // Getting a server's session:
00255           serverSess = sessionPool.getServerSession();
00256           sess = (Session) serverSess.getSession();
00257           sess.connectionConsumer = cc;
00258           counter = 1;
00259 
00260           // As long as there are messages to deliver, passing to session(s)
00261           // as many messages as possible:
00262           while (counter <= maxMessages && repliesIn.size() > 0) {
00263             
00264             // If the consumer is a queue consumer, sending a new request:
00265             if (queueMode) {
00266               cnx.requestsTable.remove(currentReq.getKey());
00267               currentReq = new ConsumerSetListRequest(targetName, selector,
00268                                                       queueMode);
00269               currentReq.setRequestId(cnx.nextRequestId());
00270               cnx.requestsTable.put(currentReq.getKey(), cc);
00271               cnx.asyncRequest(currentReq);
00272             }
00273 
00274             reply = (ConsumerMessages) repliesIn.pop();
00275             for (Enumeration e = reply.getMessages().elements(); e.hasMoreElements(); ) {
00276               deliveries.addElement(e.nextElement());
00277             }
00278 
00279             while (! deliveries.isEmpty()) {
00280               while (counter <= maxMessages && ! deliveries.isEmpty()) {
00281                 if (JoramTracing.dbgClient)
00282                   JoramTracing.log(JoramTracing.DEBUG, "Passes a"
00283                                    + " message to a session.");
00284                 Object obj = deliveries.elementAt(0);
00285                 deliveries.removeElementAt(0);
00286                 sess.repliesIn.push(obj);
00287                 counter++;
00288               }
00289               if (counter > maxMessages) {
00290                 if (JoramTracing.dbgClient)
00291                   JoramTracing.log(JoramTracing.DEBUG, "Starts the"
00292                                    + " session.");
00293                 serverSess.start(); 
00294                 counter = 1;
00295 
00296                 if (! deliveries.isEmpty() || repliesIn.size() > 0) {
00297                   serverSess = sessionPool.getServerSession();
00298                   sess =
00299                     (Session) serverSess.getSession();
00300                   sess.connectionConsumer = cc;
00301                 }
00302               }
00303             }
00304           }
00305           // There is no more message to deliver and no more delivery, 
00306           // starting the last session to which messages have been passed:
00307           if (JoramTracing.dbgClient)
00308             JoramTracing.log(JoramTracing.DEBUG, "No more delivery.");
00309           if (counter > 1) {
00310             if (JoramTracing.dbgClient)
00311               JoramTracing.log(JoramTracing.DEBUG, "Starts the"
00312                                + " session.");
00313             counter = 1;
00314             serverSess.start();
00315           }
00316         }
00317         // A JMSException will be caught if the application server failed
00318         // to provide a session: closing the consumer.
00319         catch (JMSException jE) {
00320           canStop = true;
00321           try {
00322             cc.close();
00323           }
00324           catch (JMSException jE2) {}
00325         }
00326       }
00327     }
00328     finally {
00329       finish();
00330     }
00331   }
00332 
00334   public void shutdown()
00335   {}
00336 
00338   public void close()
00339   {
00340     if (JoramTracing.dbgClient)
00341       JoramTracing.log(JoramTracing.DEBUG, "CCDaemon finished.");
00342   }
00343 }
00344 }

Generated on Tue Sep 16 16:14:23 2008 for joram by  doxygen 1.5.0