00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 package com.scalagent.kjoram;
00025
00026 import com.scalagent.kjoram.jms.*;
00027 import com.scalagent.kjoram.util.TimerTask;
00028
00029 import java.util.*;
00030
00031 import com.scalagent.kjoram.excepts.IllegalStateException;
00032 import com.scalagent.kjoram.excepts.*;
00033
00034
00035 public class Session
00036 {
00037 public static final int SESSION_TRANSACTED = 0;
00038 public static final int AUTO_ACKNOWLEDGE = 1;
00039 public static final int CLIENT_ACKNOWLEDGE = 2;
00040 public static final int DUPS_OK_ACKNOWLEDGE = 3;
00041
00043 private TimerTask closingTask = null;
00045 private boolean scheduled = false;
00046
00048 private com.scalagent.kjoram.util.Timer consumersTimer = null;
00049
00051 protected MessageListener messageListener = null;
00052
00054 String ident;
00056 Connection cnx;
00058 boolean transacted;
00060 int acknowledgeMode;
00062 boolean closed = false;
00064 boolean started = false;
00065
00067 boolean autoAck;
00068
00070 Vector consumers;
00072 Vector producers;
00074 Vector browsers;
00076 com.scalagent.kjoram.util.Queue repliesIn;
00078 SessionDaemon daemon = null;
00080 int msgListeners = 0;
00088 Hashtable sendings;
00096 Hashtable deliveries;
00097
00099 ConnectionConsumer connectionConsumer = null;
00100
00101
/**
 * Builds a session attached to the given connection.
 * <p>
 * The session gets its identifier from <code>cnx.nextSessionId()</code>,
 * creates its empty internal tables, and finally registers itself in
 * <code>cnx.sessions</code>. A closing task is prepared only for a
 * transacted session whose connection has a non zero
 * <code>txPendingTimer</code> factory parameter.
 *
 * @param cnx  The connection the session belongs to.
 * @param transacted  <code>true</code> for a transacted session.
 * @param acknowledgeMode  The acknowledge mode; only validated when the
 *          session is not transacted.
 * @exception JMSException  If the session is not transacted and the mode is
 *              none of <code>AUTO_ACKNOWLEDGE</code>,
 *              <code>CLIENT_ACKNOWLEDGE</code>,
 *              <code>DUPS_OK_ACKNOWLEDGE</code>.
 */
Session(Connection cnx, boolean transacted,
        int acknowledgeMode) throws JMSException
{
  boolean knownMode = acknowledgeMode == Session.AUTO_ACKNOWLEDGE
                      || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE
                      || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE;
  if (! transacted && ! knownMode) {
    throw new JMSException("Can't create a non transacted session with an"
                           + " invalid acknowledge mode.");
  }

  this.ident = cnx.nextSessionId();
  this.cnx = cnx;
  this.transacted = transacted;
  this.acknowledgeMode = acknowledgeMode;

  // Automatic acknowledgement applies to non transacted sessions that are
  // not in client acknowledge mode.
  this.autoAck = ! (transacted
                    || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE);

  this.consumers = new Vector();
  this.producers = new Vector();
  this.browsers = new Vector();
  this.repliesIn = new com.scalagent.kjoram.util.Queue();
  this.sendings = new Hashtable();
  this.deliveries = new Hashtable();

  // A pending transaction may only be timed out if a timer value is set.
  if (transacted && cnx.factoryParameters.txPendingTimer != 0) {
    this.closingTask = new SessionCloseTask();
  }

  cnx.sessions.addElement(this);

  if (JoramTracing.dbgClient) {
    JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
  }
}
00146
00148 public String toString()
00149 {
00150 return "Sess:" + ident;
00151 }
00152
00153
00159 public int getAcknowledgeMode() throws JMSException
00160 {
00161 return acknowledgeMode;
00162 }
00163
00169 public boolean getTransacted() throws JMSException
00170 {
00171 return transacted;
00172 }
00173
00179 public void setMessageListener(MessageListener messageListener)
00180 throws JMSException
00181 {
00182 this.messageListener = messageListener;
00183 }
00184
00190 public MessageListener getMessageListener() throws JMSException
00191 {
00192 return messageListener;
00193 }
00194
00200 public Message createMessage() throws JMSException
00201 {
00202 if (closed)
00203 throw new IllegalStateException("Forbidden call on a closed session.");
00204
00205 return new Message();
00206 }
00207
00213 public TextMessage createTextMessage() throws JMSException
00214 {
00215 if (closed)
00216 throw new IllegalStateException("Forbidden call on a closed session.");
00217
00218 return new TextMessage();
00219 }
00220
00226 public TextMessage createTextMessage(String text)
00227 throws JMSException
00228 {
00229 if (closed)
00230 throw new IllegalStateException("Forbidden call on a closed session.");
00231
00232 TextMessage message = new TextMessage();
00233 message.setText(text);
00234 return message;
00235 }
00236
00242 public BytesMessage createBytesMessage()
00243 throws JMSException
00244 {
00245 if (closed)
00246 throw new IllegalStateException("Forbidden call on a closed session.");
00247
00248 return new BytesMessage();
00249 }
00250
00256 public MapMessage createMapMessage()
00257 throws JMSException
00258 {
00259 if (closed)
00260 throw new IllegalStateException("Forbidden call on a closed session.");
00261
00262 return new MapMessage();
00263 }
00264
00270 public QueueBrowser
00271 createBrowser(Queue queue, String selector)
00272 throws JMSException
00273 {
00274 if (closed)
00275 throw new IllegalStateException("Forbidden call on a closed session.");
00276
00277 return new QueueBrowser(this, (Queue) queue, selector);
00278 }
00279
00285 public QueueBrowser createBrowser(Queue queue)
00286 throws JMSException
00287 {
00288 if (closed)
00289 throw new IllegalStateException("Forbidden call on a closed session.");
00290
00291 return new QueueBrowser(this, (Queue) queue, null);
00292 }
00293
00301 public MessageProducer createProducer(Destination dest)
00302 throws JMSException
00303 {
00304 if (closed)
00305 throw new IllegalStateException("Forbidden call on a closed session.");
00306
00307 return new MessageProducer(this, (Destination) dest);
00308 }
00309
00317 public MessageConsumer
00318 createConsumer(Destination dest, String selector,
00319 boolean noLocal) throws JMSException
00320 {
00321 if (closed)
00322 throw new IllegalStateException("Forbidden call on a closed session.");
00323
00324 return new MessageConsumer(this, (Destination) dest, selector, null,
00325 noLocal);
00326 }
00327
00335 public MessageConsumer
00336 createConsumer(Destination dest, String selector)
00337 throws JMSException
00338 {
00339 if (closed)
00340 throw new IllegalStateException("Forbidden call on a closed session.");
00341
00342 return new MessageConsumer(this, (Destination) dest, selector);
00343 }
00344
00352 public MessageConsumer createConsumer(Destination dest)
00353 throws JMSException
00354 {
00355 if (closed)
00356 throw new IllegalStateException("Forbidden call on a closed session.");
00357
00358 return new MessageConsumer(this, (Destination) dest, null);
00359 }
00360
00368 public TopicSubscriber
00369 createDurableSubscriber(Topic topic, String name,
00370 String selector,
00371 boolean noLocal) throws JMSException
00372 {
00373 if (closed)
00374 throw new IllegalStateException("Forbidden call on a closed session.");
00375
00376 return new TopicSubscriber(this, (Topic) topic, name, selector, noLocal);
00377 }
00378
00386 public TopicSubscriber
00387 createDurableSubscriber(Topic topic, String name)
00388 throws JMSException
00389 {
00390 if (closed)
00391 throw new IllegalStateException("Forbidden call on a closed session.");
00392
00393 return new TopicSubscriber(this, (Topic) topic, name, null, false);
00394 }
00395
00401 public Queue createQueue(String queueName) throws JMSException
00402 {
00403 if (closed)
00404 throw new IllegalStateException("Forbidden call on a closed session.");
00405
00406 return new Queue(queueName);
00407 }
00408
00415 public Topic createTopic(String topicName) throws JMSException
00416 {
00417 if (closed)
00418 throw new IllegalStateException("Forbidden call on a closed session.");
00419
00420
00421 if (topicName.equals("#AdminTopic")) {
00422 try {
00423 GetAdminTopicReply reply =
00424 (GetAdminTopicReply) cnx.syncRequest(new GetAdminTopicRequest());
00425 if (reply.getId() != null)
00426 return new Topic(reply.getId());
00427 else
00428 throw new JMSException("AdminTopic could not be retrieved.");
00429 }
00430 catch (JMSException exc) {
00431 throw exc;
00432 }
00433 catch (Exception exc) {
00434 throw new JMSException("AdminTopic could not be retrieved: " + exc);
00435 }
00436 }
00437 return new Topic(topicName);
00438 }
00439
00447 public TemporaryQueue createTemporaryQueue() throws JMSException
00448 {
00449 if (closed)
00450 throw new IllegalStateException("Forbidden call on a closed session.");
00451
00452 SessCreateTDReply reply =
00453 (SessCreateTDReply) cnx.syncRequest(new SessCreateTQRequest());
00454 String tempDest = reply.getAgentId();
00455 return new TemporaryQueue(tempDest, cnx);
00456 }
00457
00465 public TemporaryTopic createTemporaryTopic() throws JMSException
00466 {
00467 if (closed)
00468 throw new IllegalStateException("Forbidden call on a closed session.");
00469
00470 SessCreateTDReply reply =
00471 (SessCreateTDReply) cnx.syncRequest(new SessCreateTTRequest());
00472 String tempDest = reply.getAgentId();
00473 return new TemporaryTopic(tempDest, cnx);
00474 }
00475
00477 public synchronized void run()
00478 {
00479 int load = repliesIn.size();
00480 com.scalagent.kjoram.messages.Message momMsg;
00481 String msgId;
00482 String targetName = connectionConsumer.targetName;
00483 boolean queueMode = connectionConsumer.queueMode;
00484
00485 if (JoramTracing.dbgClient)
00486 JoramTracing.log(JoramTracing.DEBUG, "-- " + this
00487 + ": loaded with " + load
00488 + " message(s) and started.");
00489 try {
00490
00491 for (int i = 0; i < load; i++) {
00492 momMsg = (com.scalagent.kjoram.messages.Message) repliesIn.pop();
00493 msgId = momMsg.getIdentifier();
00494
00495
00496
00497 if (messageListener == null) {
00498 JoramTracing.log(JoramTracing.ERROR,this + ": an"
00499 + " asynchronous delivery arrived for"
00500 + " a non existing session listener:"
00501 + " denying the message.");
00502
00503 if (queueMode)
00504 cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, true));
00505 else
00506 cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,false));
00507 }
00508
00509 else {
00510
00511 if (! autoAck)
00512 prepareAck(targetName, msgId, queueMode);
00513
00514
00515 try {
00516 messageListener.onMessage(Message.wrapMomMessage(this, momMsg));
00517
00518
00519 if (autoAck)
00520 cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
00521 queueMode));
00522 }
00523
00524
00525 catch (JMSException jE) {
00526 JoramTracing.log(JoramTracing.ERROR, this
00527 + ": error while processing the"
00528 + " received message: " + jE);
00529 if (queueMode)
00530 cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
00531 queueMode));
00532 else
00533 cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
00534 queueMode));
00535 }
00536
00537
00538 catch (RuntimeException rE) {
00539 JoramTracing.log(JoramTracing.ERROR,this
00540 + ": RuntimeException thrown"
00541 + " by the listener: " + rE);
00542 if (autoAck && queueMode)
00543 cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
00544 queueMode));
00545 else if (autoAck && ! queueMode)
00546 cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
00547 queueMode));
00548 }
00549 }
00550 }
00551 }
00552 catch (JMSException e) {}
00553 }
00554
00561 public void commit() throws JMSException
00562 {
00563 if (closed)
00564 throw new IllegalStateException("Forbidden call on a closed session.");
00565
00566 if (! transacted)
00567 throw new IllegalStateException("Can't commit a non transacted"
00568 + " session.");
00569
00570 if (JoramTracing.dbgClient)
00571 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00572 + ": committing...");
00573
00574
00575 if (scheduled) {
00576 closingTask.cancel();
00577 scheduled = false;
00578 }
00579
00580
00581 try {
00582 Enumeration dests = sendings.keys();
00583 String dest;
00584 ProducerMessages pM;
00585 while (dests.hasMoreElements()) {
00586 dest = (String) dests.nextElement();
00587 pM = (ProducerMessages) sendings.remove(dest);
00588 cnx.syncRequest(pM);
00589 }
00590
00591 acknowledge();
00592
00593 if (JoramTracing.dbgClient)
00594 JoramTracing.log(JoramTracing.DEBUG, this + ": committed.");
00595 }
00596
00597 catch (JMSException jE) {
00598 TransactionRolledBackException tE =
00599 new TransactionRolledBackException("A JMSException was thrown during"
00600 + " the commit.");
00601 tE.setLinkedException(jE);
00602
00603 JoramTracing.log(JoramTracing.ERROR, "Exception: " + tE);
00604
00605 rollback();
00606 throw tE;
00607 }
00608 }
00609
00616 public void rollback() throws JMSException
00617 {
00618 if (closed)
00619 throw new IllegalStateException("Forbidden call on a closed session.");
00620
00621 if (! transacted)
00622 throw new IllegalStateException("Can't rollback a non transacted"
00623 + " session.");
00624
00625 if (JoramTracing.dbgClient)
00626 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00627 + ": rolling back...");
00628
00629
00630 if (scheduled) {
00631 closingTask.cancel();
00632 scheduled = false;
00633 }
00634
00635
00636 deny();
00637
00638 sendings.clear();
00639
00640 if (JoramTracing.dbgClient)
00641 JoramTracing.log(JoramTracing.DEBUG, this + ": rolled back.");
00642 }
00643
00649 public void recover() throws JMSException
00650 {
00651 if (transacted)
00652 throw new IllegalStateException("Can't recover a transacted session.");
00653
00654 if (JoramTracing.dbgClient)
00655 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00656 + " recovering...");
00657
00658
00659 stop();
00660 deny();
00661
00662 start();
00663
00664 if (JoramTracing.dbgClient)
00665 JoramTracing.log(JoramTracing.DEBUG, this + ": recovered.");
00666 }
00667
00668
00676 public void unsubscribe(String name) throws JMSException
00677 {
00678 MessageConsumer cons;
00679 for (int i = 0; i < consumers.size(); i++) {
00680 cons = (MessageConsumer) consumers.elementAt(i);
00681 if (! cons.queueMode && cons.targetName.equals(name))
00682 throw new JMSException("Can't delete durable subscription " + name
00683 + " as long as an active subscriber exists.");
00684 }
00685 cnx.syncRequest(new ConsumerUnsubRequest(name));
00686 }
00687
00693 public synchronized void close() throws JMSException
00694 {
00695
00696 if (closed)
00697 return;
00698
00699 if (JoramTracing.dbgClient)
00700 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00701 + ": closing...");
00702
00703
00704 if (consumersTimer != null)
00705 consumersTimer.cancel();
00706
00707
00708 try {
00709 repliesIn.stop();
00710 }
00711 catch (InterruptedException iE) {}
00712
00713
00714 stop();
00715
00716
00717 if (transacted)
00718 rollback();
00719 else
00720 deny();
00721
00722
00723 while (! browsers.isEmpty())
00724 ((QueueBrowser) browsers.elementAt(0)).close();
00725 while (! consumers.isEmpty())
00726 ((MessageConsumer) consumers.elementAt(0)).close();
00727 while (! producers.isEmpty())
00728 ((MessageProducer) producers.elementAt(0)).close();
00729
00730 cnx.sessions.removeElement(this);
00731
00732 closed = true;
00733
00734 if (JoramTracing.dbgClient)
00735 JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
00736 }
00737
00739 synchronized void schedule(TimerTask task, long timer)
00740 {
00741 if (consumersTimer == null)
00742 consumersTimer = new com.scalagent.kjoram.util.Timer();
00743
00744 try {
00745 consumersTimer.schedule(task, timer);
00746 }
00747 catch (Exception exc) {}
00748 }
00749
00763 void start() throws IllegalStateException
00764 {
00765 if (closed)
00766 throw new IllegalStateException("Forbidden call on a closed session.");
00767
00768 if (JoramTracing.dbgClient)
00769 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00770 + ": starting...");
00771
00772 repliesIn.start();
00773
00774
00775 if (! started && msgListeners > 0) {
00776 daemon = new SessionDaemon(this);
00777 daemon.setDaemon(false);
00778 daemon.start();
00779 }
00780 started = true;
00781
00782 if (JoramTracing.dbgClient)
00783 JoramTracing.log(JoramTracing.DEBUG, this + ": started.");
00784 }
00785
00801 void stop()
00802 {
00803
00804 if (! started)
00805 return;
00806
00807 if (JoramTracing.dbgClient)
00808 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00809 + ": stopping...");
00810
00811
00812 if (daemon != null) {
00813 daemon.stop();
00814 daemon = null;
00815 }
00816
00817 if (consumers != null) {
00818 MessageConsumer consumer;
00819 for (int i = 0; i < consumers.size(); i++) {
00820 consumer = (MessageConsumer) consumers.elementAt(i);
00821 consumer.syncro();
00822 }
00823 }
00824
00825 started = false;
00826
00827 if (JoramTracing.dbgClient)
00828 JoramTracing.log(JoramTracing.DEBUG, this + ": stopped.");
00829 }
00830
00838 void prepareSend(Destination dest, com.scalagent.kjoram.messages.Message msg)
00839 {
00840
00841 if (scheduled)
00842 closingTask.cancel();
00843
00844 ProducerMessages pM = (ProducerMessages) sendings.get(dest.getName());
00845 if (pM == null) {
00846 pM = new ProducerMessages(dest.getName());
00847 sendings.put(dest.getName(), pM);
00848 }
00849 pM.addMessage(msg);
00850
00851
00852 if (scheduled)
00853 cnx.schedule(closingTask);
00854 }
00855
00866 void prepareAck(String name, String id, boolean queueMode)
00867 {
00868
00869 if (scheduled)
00870 closingTask.cancel();
00871
00872 MessageAcks acks = (MessageAcks) deliveries.get(name);
00873 if (acks == null) {
00874 acks = new MessageAcks(queueMode);
00875 deliveries.put(name, acks);
00876 }
00877 acks.addId(id);
00878
00879
00880 if (closingTask != null) {
00881 scheduled = true;
00882 cnx.schedule(closingTask);
00883 }
00884 }
00885
00891 void acknowledge() throws IllegalStateException
00892 {
00893 String target;
00894 MessageAcks acks;
00895
00896 Enumeration targets = deliveries.keys();
00897 while (targets.hasMoreElements()) {
00898 target = (String) targets.nextElement();
00899 acks = (MessageAcks) deliveries.remove(target);
00900 cnx.asyncRequest(new SessAckRequest(target, acks.getIds(),
00901 acks.getQueueMode()));
00902 }
00903 }
00904
00906 void deny()
00907 {
00908 try {
00909 String target;
00910 MessageAcks acks;
00911 SessDenyRequest deny;
00912
00913 Enumeration targets = deliveries.keys();
00914 while (targets.hasMoreElements()) {
00915 target = (String) targets.nextElement();
00916 acks = (MessageAcks) deliveries.remove(target);
00917 deny = new SessDenyRequest(target, acks.getIds(), acks.getQueueMode());
00918 if (acks.getQueueMode())
00919 cnx.syncRequest(deny);
00920 else
00921 cnx.asyncRequest(deny);
00922 }
00923 }
00924 catch (JMSException jE) {}
00925 }
00926
00931 void distribute(AbstractJmsReply asyncReply)
00932 {
00933
00934 ConsumerMessages reply = (ConsumerMessages) asyncReply;
00935
00936
00937 MessageConsumer cons = null;
00938 if (reply.getQueueMode()) {
00939 cons =
00940 (MessageConsumer) cnx.requestsTable.remove(reply.getKey());
00941 }
00942 else
00943 cons = (MessageConsumer) cnx.requestsTable.get(reply.getKey());
00944
00945
00946 if (cons != null) {
00947 Vector msgs = reply.getMessages();
00948 for (int i = 0; i < msgs.size(); i++)
00949 cons.onMessage((com.scalagent.kjoram.messages.Message) msgs.elementAt(i));
00950 }
00951
00952
00953
00954 else {
00955 if (JoramTracing.dbgClient)
00956 JoramTracing.log(JoramTracing.WARN, this + ": an asynchronous"
00957 + " delivery arrived for an improperly"
00958 + " closed consumer: denying the"
00959 + " messages.");
00960
00961 Vector msgs = reply.getMessages();
00962 com.scalagent.kjoram.messages.Message msg;
00963 Vector ids = new Vector();
00964 for (int i = 0; i < msgs.size(); i++) {
00965 msg = (com.scalagent.kjoram.messages.Message) msgs.elementAt(i);
00966 ids.addElement(msg.getIdentifier());
00967 }
00968
00969 if (ids.isEmpty())
00970 return;
00971
00972 try {
00973 cnx.asyncRequest(new SessDenyRequest(reply.comesFrom(), ids,
00974 reply.getQueueMode(), true));
00975 }
00976 catch (JMSException jE) {}
00977 }
00978 }
00979
00985 private class SessionCloseTask extends TimerTask
00986 {
00988 public void run()
00989 {
00990 try {
00991 if (JoramTracing.dbgClient)
00992 JoramTracing.log(JoramTracing.WARN, "Session closed "
00993 + "because of pending transaction");
00994 close();
00995 }
00996 catch (Exception e) {}
00997 }
00998 }
00999 }