com/scalagent/kjoram/Connection.java

00001 /*
00002  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
00003  * Copyright (C) 2001 - ScalAgent Distributed Technologies
00004  * Copyright (C) 1996 - Dyade
00005  *
00006  * This library is free software; you can redistribute it and/or
00007  * modify it under the terms of the GNU Lesser General Public
00008  * License as published by the Free Software Foundation; either
00009  * version 2.1 of the License, or any later version.
00010  * 
00011  * This library is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014  * Lesser General Public License for more details.
00015  * 
00016  * You should have received a copy of the GNU Lesser General Public
00017  * License along with this library; if not, write to the Free Software
00018  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00019  * USA.
00020  *
00021  * Initial developer(s): Frederic Maistre (INRIA)
00022  * Contributor(s): Nicolas Tachker (ScalAgent)
00023  */
00024 package com.scalagent.kjoram;
00025 
00026 import com.scalagent.kjoram.excepts.IllegalStateException;
00027 import com.scalagent.kjoram.excepts.*;
00028 import com.scalagent.kjoram.jms.*;
00029 import com.scalagent.kjoram.*;
00030 import com.scalagent.kjoram.util.StoppedQueueException;
00031 
00032 import java.util.*;
00033 
00034 public class Connection
00035 { 
00037   private ConnectionItf connectionImpl;
00038   
00040   private String proxyId;
00042   private int key;
00043 
00045   private ConnectionMetaData metaData = null;
00047   private ExceptionListener excListener = null;
00048 
00050   private int requestsC = 0;
00052   private int sessionsC = 0;
00054   private int messagesC = 0;
00056   private int subsC = 0;
00057 
00059   private com.scalagent.kjoram.util.Timer sessionsTimer = null;
00060 
00062   FactoryParameters factoryParameters;
00063 
00065   Driver driver;
00066 
00068   boolean started = false;
00070   boolean closing = false;
00072   boolean closed = false;
00074   Vector sessions;
00076   Vector cconsumers;
00081   Hashtable requestsTable;
00085   Hashtable repliesTable;
00086 
00087   String name = null;
00088 
00098   public Connection(FactoryParameters factoryParameters,
00099                     ConnectionItf connectionImpl) throws JMSException
00100   {
00101     try {
00102       this.factoryParameters = factoryParameters;
00103 
00104       sessions = new Vector();
00105       requestsTable = new Hashtable();
00106       repliesTable = new Hashtable();
00107     
00108       this.connectionImpl = connectionImpl;
00109       name = connectionImpl.getUserName();
00110 
00111       // Creating and starting the connection's driver:
00112       driver = connectionImpl.createDriver(this);
00113       driver.start();
00114   
00115       // Requesting the connection key and proxy identifier:
00116       CnxConnectRequest req = new CnxConnectRequest();
00117       CnxConnectReply rep = (CnxConnectReply) syncRequest(req);
00118       proxyId = rep.getProxyId();
00119       key = rep.getCnxKey();
00120 
00121       // Transactions will be scheduled; creating a timer.
00122       if (factoryParameters.txPendingTimer != 0)
00123         sessionsTimer = new com.scalagent.kjoram.util.Timer();
00124 
00125       if (JoramTracing.dbgClient)
00126         JoramTracing.log(JoramTracing.DEBUG, this + ": opened."); 
00127     }
00128     // Connection could not be established:
00129     catch (JMSException jE) {
00130       JoramTracing.log(JoramTracing.ERROR, jE);
00131       throw jE;
00132     }
00133   }
00134 
00135   public String getUserName() {
00136     return name;
00137   }
00138 
00140   public String toString()
00141   {
00142     return "Cnx:" + proxyId + "-" + key;
00143   }
00144 
00150   public boolean equals(Object obj)
00151   {
00152     return (obj instanceof Connection)
00153            && toString().equals(obj.toString());
00154   }
00155 
00156 
00166   public ConnectionConsumer
00167       createConnectionConsumer(Destination dest, String selector,
00168                                ServerSessionPool sessionPool,
00169                                int maxMessages) throws JMSException
00170   {
00171     if (closed)
00172       throw new IllegalStateException("Forbidden call on a closed"
00173                                       + " connection.");
00174 
00175     return new ConnectionConsumer(this, (Destination) dest, selector,
00176                                   sessionPool, maxMessages);
00177   }
00178 
00188   public ConnectionConsumer
00189       createDurableConnectionConsumer(Topic topic, String subName,
00190                                       String selector,
00191                                       ServerSessionPool sessPool,
00192                                       int maxMessages) throws JMSException
00193   {
00194     if (closed)
00195       throw new IllegalStateException("Forbidden call on a closed"
00196                                       + " connection.");
00197 
00198     return new ConnectionConsumer(this, (Topic) topic, subName, selector,
00199                                   sessPool, maxMessages);
00200   }
00201 
00208   public Session
00209       createSession(boolean transacted, int acknowledgeMode)
00210     throws JMSException
00211   {
00212     if (closed)
00213       throw new IllegalStateException("Forbidden call on a closed"
00214                                       + " connection.");
00215 
00216     return new Session(this, transacted, acknowledgeMode);
00217   }
00218 
00224   public void setExceptionListener(ExceptionListener listener)
00225               throws JMSException
00226   {
00227     if (closed)
00228       throw new IllegalStateException("Forbidden call on a closed"
00229                                       + " connection.");
00230     this.excListener = listener;
00231   }
00232 
00238   public ExceptionListener getExceptionListener() throws JMSException
00239   {
00240     if (closed)
00241       throw new IllegalStateException("Forbidden call on a closed"
00242                                       + " connection.");
00243     return excListener;
00244   }
00245 
00251   synchronized void onException(JMSException jE)
00252   {
00253     if (JoramTracing.dbgClient)
00254       JoramTracing.log(JoramTracing.WARN, this + ": " + jE);
00255 
00256     if (excListener != null)
00257       excListener.onException(jE);
00258   }
00259 
00265   public void setClientID(String clientID) throws JMSException
00266   {
00267     throw new IllegalStateException("ClientID is already set by the"
00268                                     + " provider.");
00269   }
00270 
00276   public String getClientID() throws JMSException
00277   {
00278     if (closed)
00279       throw new IllegalStateException("Forbidden call on a closed"
00280                                       + " connection.");
00281     return proxyId;
00282   }
00283 
00289   public ConnectionMetaData getMetaData() throws JMSException
00290   {
00291     if (closed)
00292       throw new IllegalStateException("Forbidden call on a closed"
00293                                       + " connection.");
00294     if (metaData == null)
00295       metaData = new ConnectionMetaData();
00296     return metaData;
00297   }
00298 
00304   public void start() throws JMSException
00305   {
00306     // If closed, throwing an exception:
00307     if (closed)
00308       throw new IllegalStateException("Forbidden call on a closed"
00309                                       + " connection.");
00310 
00311     // Ignoring the call if the connection is started:
00312     if (started)
00313       return;
00314 
00315     if (JoramTracing.dbgClient)
00316       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00317                        + ": starting..."); 
00318 
00319     // Starting the sessions:
00320     Session session;
00321     for (int i = 0; i < sessions.size(); i++) {
00322       session = (Session) sessions.elementAt(i);
00323       session.repliesIn.start();
00324       session.start();
00325     }
00326     // Sending a start request to the server:
00327     asyncRequest(new CnxStartRequest());
00328 
00329     started = true;
00330 
00331     if (JoramTracing.dbgClient)
00332       JoramTracing.log(JoramTracing.DEBUG, this + ": started."); 
00333   }
00334 
00341   public void stop() throws JMSException
00342   {
00343     IllegalStateException isE = null;
00344 
00345     // If closed, throwing an exception:
00346     if (closed)
00347       throw new IllegalStateException("Forbidden call on a closed"
00348                                       + " connection.");
00349 
00350     // Ignoring the call if the connection is already stopped:
00351     if (! started)
00352       return;
00353 
00354     if (JoramTracing.dbgClient)
00355       JoramTracing.log(JoramTracing.DEBUG, this + ": stopping..."); 
00356 
00357     // Sending a synchronous "stop" request to the server:
00358     try {
00359       syncRequest(new CnxStopRequest());
00360     }
00361     // Catching an IllegalStateException if the connection is broken:
00362     catch (IllegalStateException caughtISE) {
00363       isE = caughtISE;
00364     }
00365 
00366     // At this point, the server won't deliver messages anymore,
00367     // the connection just waits for the sessions to have finished their
00368     // processings.
00369     Session session;
00370     for (int i = 0; i < sessions.size(); i++) {
00371       session = (Session) sessions.elementAt(i);
00372       try {
00373         session.repliesIn.stop();
00374       }
00375       catch (InterruptedException iE) {}
00376       session.stop();
00377     }
00378 
00379     started = false;
00380 
00381     if (isE != null) {
00382       JoramTracing.log(JoramTracing.ERROR, isE);
00383       throw isE;
00384     }
00385 
00386     if (JoramTracing.dbgClient)
00387       JoramTracing.log(JoramTracing.DEBUG, this + ": is stopped."); 
00388   }
00389 
00390 
00397   public void close() throws JMSException
00398   {
00399     // Ignoring the call if the connection is closed:
00400     if (closed)
00401       return;
00402 
00403     closing = true;
00404 
00405     if (JoramTracing.dbgClient)
00406       JoramTracing.log(JoramTracing.DEBUG, "--- " + this 
00407                        + ": closing...");
00408 
00409     // Finishing the timer, if any:
00410     if (sessionsTimer != null)
00411       sessionsTimer.cancel();
00412 
00413     // Stopping the connection:
00414     try {
00415       stop();
00416     }
00417     // Catching a JMSException if the connection is broken:
00418     catch (JMSException jE) {}
00419 
00420     // Closing the sessions:
00421     Session session;
00422     while (! sessions.isEmpty()) {
00423       session = (Session) sessions.elementAt(0);
00424       try {
00425         session.close();
00426       }
00427       // Catching a JMSException if the connection is broken:
00428       catch (JMSException jE) {}
00429     }
00430 
00431     // Closing the connection consumers:
00432     if (cconsumers != null) {
00433       ConnectionConsumer cc;
00434       while (! cconsumers.isEmpty()) {
00435         cc = (ConnectionConsumer) cconsumers.elementAt(0);
00436         cc.close();
00437       }
00438     }
00439     
00440     // Closing the connection:
00441     connectionImpl.close();
00442 
00443     // Shutting down the driver, if needed:
00444     if (! driver.stopping)
00445       driver.stop();
00446 
00447     requestsTable.clear();
00448     requestsTable = null;
00449     repliesTable.clear();
00450     repliesTable = null;
00451 
00452     closed = true;
00453 
00454     if (JoramTracing.dbgClient)
00455       JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
00456   }
00457 
00459   synchronized int nextRequestId()
00460   {
00461     if (requestsC == Integer.MAX_VALUE)
00462       requestsC = 0;
00463     return requestsC++;
00464   }
00465 
00467   synchronized String nextSessionId()
00468   {
00469     if (sessionsC == Integer.MAX_VALUE)
00470       sessionsC = 0;
00471     sessionsC++;
00472     return "c" + key + "s" + sessionsC;
00473   }
00474  
00476   synchronized String nextMessageId()
00477   {
00478     if (messagesC == Integer.MAX_VALUE)
00479       messagesC = 0;
00480     messagesC++;
00481     return "ID:" + proxyId + "c" + key + "m" + messagesC;
00482   }
00483 
00485   synchronized String nextSubName()
00486   {
00487     if (subsC == Integer.MAX_VALUE)
00488       subsC = 0;
00489     subsC++;
00490     return "c"  + key + "sub" + subsC;
00491   }
00492 
00494   synchronized void schedule(com.scalagent.kjoram.util.TimerTask task)
00495   {
00496     if (sessionsTimer == null)
00497       return;
00498 
00499     try {
00500       sessionsTimer.schedule(task, factoryParameters.txPendingTimer * 1000);
00501     }
00502     catch (Exception exc) {}
00503   }
00504   
00516   AbstractJmsReply syncRequest(AbstractJmsRequest request) throws JMSException
00517   {
00518     if (closed)
00519       throw new IllegalStateException("Forbidden call on a closed"
00520                                       + " connection.");
00521 
00522     if (request.getRequestId() == -1)
00523       request.setRequestId(nextRequestId());
00524 
00525     int requestId = request.getRequestId();
00526 
00527     try {
00528       if (JoramTracing.dbgClient)
00529         JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: "
00530                          + request.getClass().getName()
00531                          + " with id: " + requestId);
00532 
00533       Lock lock = new Lock();
00534       requestsTable.put(request.getKey(), lock);
00535       synchronized(lock) {
00536         connectionImpl.send(request);
00537         while (true) {
00538           try {
00539             lock.wait();
00540             break;
00541           }
00542           catch (InterruptedException iE) {
00543             if (JoramTracing.dbgClient)
00544               JoramTracing.log(JoramTracing.WARN,this
00545                                + ": caught InterruptedException");
00546             continue;
00547           }
00548         }
00549         requestsTable.remove(request.getKey());
00550       }
00551     }
00552     // Catching an exception because of...
00553     catch (Exception e) {
00554       JMSException jE = null;
00555       if (e instanceof JMSException)
00556         throw (JMSException) e;
00557       else
00558         jE = new JMSException("Exception while getting a reply.");
00559 
00560       jE.setLinkedException(e);
00561 
00562       // Unregistering the request:
00563       if (requestsTable != null)
00564         requestsTable.remove(request.getKey());
00565 
00566       JoramTracing.log(JoramTracing.ERROR, jE);
00567       throw jE;
00568     }
00569     // Finally, returning the reply:
00570     AbstractJmsReply reply =
00571       (AbstractJmsReply) repliesTable.remove(request.getKey());
00572 
00573     if (JoramTracing.dbgClient)
00574       JoramTracing.log(JoramTracing.DEBUG, this + ": got reply.");
00575 
00576     // If the reply is null, it means that the requester has been unlocked
00577     // by the driver because it detected a connection failure:
00578     if (reply == null)
00579       throw new IllegalStateException("Connection is broken.");
00580     // Else, if the reply notifies of an error: throwing the appropriate exc: 
00581     else if (reply instanceof MomExceptionReply) {
00582       MomException mE = ((MomExceptionReply) reply).getException();
00583 
00584       if (mE instanceof AccessException)
00585         throw new JMSSecurityException(mE.getMessage());
00586       else if (mE instanceof DestinationException)
00587         throw new InvalidDestinationException(mE.getMessage());
00588       else
00589         throw new JMSException(mE.getMessage());
00590     }
00591     // Else: returning the reply:
00592     else
00593       return reply;
00594   }
00595 
00601   void asyncRequest(AbstractJmsRequest request) throws IllegalStateException
00602   {
00603     if (closed)
00604       throw new IllegalStateException("Forbidden call on a closed"
00605                                       + " connection.");
00606 
00607     if (request.getRequestId() == -1)
00608       request.setRequestId(nextRequestId());
00609 
00610     try {
00611       if (JoramTracing.dbgClient)
00612         JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: "
00613                          + request.getClass().getName()
00614                          + " with id: " + request.getRequestId());
00615       connectionImpl.send(request);
00616     }
00617     // In the case of a broken connection:
00618     catch (IllegalStateException exc) {
00619       // Removes the potentially stored requester:
00620       requestsTable.remove(request.getKey());
00621 
00622       JoramTracing.log(JoramTracing.ERROR, exc);
00623       throw exc;
00624     }
00625   }
00626 
00635   void distribute(AbstractJmsReply reply)
00636   {
00637     // Getting the correlation identifier:
00638     int correlationId = reply.getCorrelationId();
00639 
00640     if (JoramTracing.dbgClient)
00641       JoramTracing.log(JoramTracing.DEBUG, this + ": got reply: "
00642                        + correlationId);
00643 
00644     Object obj = null;
00645     if (correlationId != -1)
00646       obj = requestsTable.get(reply.getKey());
00647 
00648     // If the request is a synchronous request, putting the reply in the
00649     // replies table and unlocking the requester:
00650     if (obj instanceof Lock) {
00651       repliesTable.put(reply.getKey(), reply);
00652 
00653       synchronized(obj) {
00654         obj.notify();
00655       }
00656     }
00657     // If the reply is an asynchronous exception, passing it:
00658     else if (reply instanceof MomExceptionReply) {
00659       // Removing the potential consumer object from the table:
00660       requestsTable.remove(reply.getKey());
00661 
00662       MomException mE = ((MomExceptionReply) reply).getException();
00663       JMSException jE = null;
00664 
00665       if (mE instanceof AccessException)
00666         jE = new JMSSecurityException(mE.getMessage());
00667       else if (mE instanceof DestinationException)
00668         jE = new InvalidDestinationException(mE.getMessage());
00669       else
00670         jE = new JMSException(mE.getMessage());
00671 
00672       onException(jE);
00673     }
00674     // Else, if the reply is an asynchronous delivery:
00675     else if (obj != null) {
00676       try {
00677         // Passing the reply to its consumer:
00678         if (obj instanceof ConnectionConsumer)
00679           ((ConnectionConsumer) obj).repliesIn.push(reply);
00680         else if (obj instanceof MessageConsumer)
00681           ((MessageConsumer) obj).sess.repliesIn.push(reply);
00682       }
00683       catch (StoppedQueueException sqE) {
00684         denyDelivery((ConsumerMessages) reply);
00685       }
00686     }
00687     // Finally, if the requester disappeared, denying the delivery:
00688     else if (reply instanceof ConsumerMessages)
00689       denyDelivery((ConsumerMessages) reply);
00690   }
00691 
00693   private void denyDelivery(ConsumerMessages delivery)
00694   {
00695     Vector msgs = delivery.getMessages();
00696     com.scalagent.kjoram.messages.Message msg;
00697     Vector ids = new Vector();
00698 
00699     for (int i = 0; i < msgs.size(); i++) {
00700       msg = (com.scalagent.kjoram.messages.Message) msgs.elementAt(i);
00701       ids.addElement(msg.getIdentifier());
00702     }
00703 
00704     if (ids.isEmpty())
00705       return;
00706 
00707     try {
00708       // Sending the denying as an asynchronous request, as no synchronous
00709       // behaviour is expected here:
00710       asyncRequest(new SessDenyRequest(delivery.comesFrom(), ids,
00711                                        delivery.getQueueMode(), true));
00712     }
00713     // If sthg goes wrong while denying, nothing more can be done!
00714     catch (JMSException jE) {}
00715   }
00716 }

Generated on Tue Sep 16 16:14:23 2008 for joram by  doxygen 1.5.0