00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 package com.scalagent.kjoram;
00025
00026 import com.scalagent.kjoram.jms.*;
00027 import com.scalagent.kjoram.excepts.*;
00028
00029 import java.util.Vector;
00030 import java.util.Enumeration;
00031
00032
00033 public class ConnectionConsumer
00034 {
00036 private Connection cnx;
00038 private boolean durable = false;
00040 private String selector;
00042 private ServerSessionPool sessionPool;
00044 private int maxMessages;
00049 private CCDaemon ccDaemon;
00051 private com.scalagent.kjoram.jms.AbstractJmsRequest currentReq = null;
00053 private boolean closed = false;
00054
00056 String targetName;
00058 boolean queueMode = true;
00063 com.scalagent.kjoram.util.Queue repliesIn;
00064
00065
00084 ConnectionConsumer(Connection cnx, Destination dest, String subName,
00085 String selector, ServerSessionPool sessionPool,
00086 int maxMessages) throws JMSException
00087 {
00088 if (sessionPool == null)
00089 throw new JMSException("Invalid ServerSessionPool parameter: "
00090 + sessionPool);
00091 if (maxMessages <= 0)
00092 throw new JMSException("Invalid maxMessages parameter: " + maxMessages);
00093
00094 this.cnx = cnx;
00095 this.selector = selector;
00096 this.sessionPool = sessionPool;
00097 this.maxMessages = maxMessages;
00098
00099 if (dest instanceof Queue)
00100 targetName = dest.getName();
00101 else if (subName == null) {
00102 queueMode = false;
00103 targetName = cnx.nextSubName();
00104 }
00105 else {
00106 queueMode = false;
00107 targetName = subName;
00108 durable = true;
00109 }
00110
00111 repliesIn = new com.scalagent.kjoram.util.Queue();
00112
00113 if (cnx.cconsumers == null)
00114 cnx.cconsumers = new Vector();
00115
00116 cnx.cconsumers.addElement(this);
00117
00118 ccDaemon = new CCDaemon(this);
00119 ccDaemon.setDaemon(true);
00120 ccDaemon.start();
00121
00122
00123 if (! queueMode)
00124 cnx.syncRequest(new ConsumerSubRequest(dest.getName(), targetName,
00125 selector, false, durable));
00126
00127
00128 currentReq = new ConsumerSetListRequest(targetName, selector, queueMode);
00129 currentReq.setRequestId(cnx.nextRequestId());
00130 cnx.requestsTable.put(currentReq.getKey(), this);
00131 cnx.asyncRequest(currentReq);
00132
00133 if (JoramTracing.dbgClient)
00134 JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
00135 }
00136
00154 ConnectionConsumer(Connection cnx, Destination dest, String selector,
00155 ServerSessionPool sessionPool,
00156 int maxMessages) throws JMSException
00157 {
00158 this(cnx, dest, null, selector, sessionPool, maxMessages);
00159 }
00160
00162 public String toString()
00163 {
00164 return "ConnCons:" + cnx.toString();
00165 }
00166
00167
00173 public ServerSessionPool getServerSessionPool() throws JMSException
00174 {
00175 if (closed)
00176 throw new com.scalagent.kjoram.excepts.
00177 IllegalStateException("Forbidden call on a closed"
00178 + " ConnectionConsumer.");
00179 return sessionPool;
00180 }
00181
00182
00188 public void close() throws JMSException
00189 {
00190 cnx.requestsTable.remove(currentReq.getKey());
00191 ccDaemon.stop();
00192
00193
00194 if (! queueMode) {
00195 try {
00196 if (durable)
00197 cnx.syncRequest(new ConsumerCloseSubRequest(targetName));
00198 else
00199 cnx.syncRequest(new ConsumerUnsubRequest(targetName));
00200 }
00201
00202 catch (JMSException jE) {}
00203 }
00204 cnx.cconsumers.removeElement(this);
00205 }
00206
00211 class CCDaemon extends com.scalagent.kjoram.util.Daemon
00212 {
00214 private ConnectionConsumer cc;
00215
00220 CCDaemon(ConnectionConsumer cc)
00221 {
00222 super(cc.toString());
00223 this.cc = cc;
00224 }
00225
00227 public void run()
00228 {
00229 ConsumerMessages reply;
00230 Vector deliveries = new Vector();
00231 ServerSession serverSess;
00232 Session sess;
00233 int counter;
00234
00235 try {
00236 while (running) {
00237 canStop = true;
00238
00239 try {
00240
00241 repliesIn.get();
00242 }
00243 catch (Exception iE) {
00244 continue;
00245 }
00246 canStop = false;
00247
00248
00249 try {
00250 if (JoramTracing.dbgClient)
00251 JoramTracing.log(JoramTracing.DEBUG, "--- " + cc
00252 + ": got a delivery.");
00253
00254
00255 serverSess = sessionPool.getServerSession();
00256 sess = (Session) serverSess.getSession();
00257 sess.connectionConsumer = cc;
00258 counter = 1;
00259
00260
00261
00262 while (counter <= maxMessages && repliesIn.size() > 0) {
00263
00264
00265 if (queueMode) {
00266 cnx.requestsTable.remove(currentReq.getKey());
00267 currentReq = new ConsumerSetListRequest(targetName, selector,
00268 queueMode);
00269 currentReq.setRequestId(cnx.nextRequestId());
00270 cnx.requestsTable.put(currentReq.getKey(), cc);
00271 cnx.asyncRequest(currentReq);
00272 }
00273
00274 reply = (ConsumerMessages) repliesIn.pop();
00275 for (Enumeration e = reply.getMessages().elements(); e.hasMoreElements(); ) {
00276 deliveries.addElement(e.nextElement());
00277 }
00278
00279 while (! deliveries.isEmpty()) {
00280 while (counter <= maxMessages && ! deliveries.isEmpty()) {
00281 if (JoramTracing.dbgClient)
00282 JoramTracing.log(JoramTracing.DEBUG, "Passes a"
00283 + " message to a session.");
00284 Object obj = deliveries.elementAt(0);
00285 deliveries.removeElementAt(0);
00286 sess.repliesIn.push(obj);
00287 counter++;
00288 }
00289 if (counter > maxMessages) {
00290 if (JoramTracing.dbgClient)
00291 JoramTracing.log(JoramTracing.DEBUG, "Starts the"
00292 + " session.");
00293 serverSess.start();
00294 counter = 1;
00295
00296 if (! deliveries.isEmpty() || repliesIn.size() > 0) {
00297 serverSess = sessionPool.getServerSession();
00298 sess =
00299 (Session) serverSess.getSession();
00300 sess.connectionConsumer = cc;
00301 }
00302 }
00303 }
00304 }
00305
00306
00307 if (JoramTracing.dbgClient)
00308 JoramTracing.log(JoramTracing.DEBUG, "No more delivery.");
00309 if (counter > 1) {
00310 if (JoramTracing.dbgClient)
00311 JoramTracing.log(JoramTracing.DEBUG, "Starts the"
00312 + " session.");
00313 counter = 1;
00314 serverSess.start();
00315 }
00316 }
00317
00318
00319 catch (JMSException jE) {
00320 canStop = true;
00321 try {
00322 cc.close();
00323 }
00324 catch (JMSException jE2) {}
00325 }
00326 }
00327 }
00328 finally {
00329 finish();
00330 }
00331 }
00332
00334 public void shutdown()
00335 {}
00336
00338 public void close()
00339 {
00340 if (JoramTracing.dbgClient)
00341 JoramTracing.log(JoramTracing.DEBUG, "CCDaemon finished.");
00342 }
00343 }
00344 }