00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 package com.scalagent.kjoram;
00025
00026 import com.scalagent.kjoram.excepts.IllegalStateException;
00027 import com.scalagent.kjoram.excepts.*;
00028 import com.scalagent.kjoram.jms.*;
00029 import com.scalagent.kjoram.*;
00030 import com.scalagent.kjoram.util.StoppedQueueException;
00031
00032 import java.util.*;
00033
00034 public class Connection
00035 {
/** Underlying connection object: used to send requests, create the driver, read the user name and close the link. */
00037 private ConnectionItf connectionImpl;
00038
/** Proxy identifier taken from the connect reply; also returned by getClientID(). */
00040 private String proxyId;
/** Connection key taken from the connect reply; embedded in the identifiers generated below. */
00042 private int key;
00043
/** Connection meta data, created on the first getMetaData() call. */
00045 private ConnectionMetaData metaData = null;
/** Listener notified by onException(), if any. */
00047 private ExceptionListener excListener = null;
00048
/** Counter backing nextRequestId(). */
00050 private int requestsC = 0;
/** Counter backing nextSessionId(). */
00052 private int sessionsC = 0;
/** Counter backing nextMessageId(). */
00054 private int messagesC = 0;
/** Counter backing nextSubName(). */
00056 private int subsC = 0;
00057
/** Timer used by schedule(); only created when factoryParameters.txPendingTimer is not 0. */
00059 private com.scalagent.kjoram.util.Timer sessionsTimer = null;
00060
/** Parameters handed to the constructor by the creating factory. */
00062 FactoryParameters factoryParameters;
00063
/** Driver obtained from connectionImpl.createDriver(this) and started in the constructor. */
00065 Driver driver;
00066
/** true once start() has completed and until stop() completes. */
00068 boolean started = false;
/** Set to true at the beginning of close(). */
00070 boolean closing = false;
/** Set to true at the very end of close(). */
00072 boolean closed = false;
/** Vector of Session objects attached to this connection. */
00074 Vector sessions;
/**
 * Vector of ConnectionConsumer objects; never assigned in this class, so it
 * stays null unless other code in the package sets it (close() checks for null).
 */
00076 Vector cconsumers;
/**
 * Pending requests, indexed by request key. distribute() expects the value to
 * be a Lock (synchronous requester), a ConnectionConsumer or a MessageConsumer.
 */
00081 Hashtable requestsTable;
/** Replies to synchronous requests, indexed by request key, until syncRequest() picks them up. */
00085 Hashtable repliesTable;
00086
/** User name reported by connectionImpl.getUserName(). */
00087 String name = null;
00088
00098 public Connection(FactoryParameters factoryParameters,
00099 ConnectionItf connectionImpl) throws JMSException
00100 {
00101 try {
00102 this.factoryParameters = factoryParameters;
00103
00104 sessions = new Vector();
00105 requestsTable = new Hashtable();
00106 repliesTable = new Hashtable();
00107
00108 this.connectionImpl = connectionImpl;
00109 name = connectionImpl.getUserName();
00110
00111
00112 driver = connectionImpl.createDriver(this);
00113 driver.start();
00114
00115
00116 CnxConnectRequest req = new CnxConnectRequest();
00117 CnxConnectReply rep = (CnxConnectReply) syncRequest(req);
00118 proxyId = rep.getProxyId();
00119 key = rep.getCnxKey();
00120
00121
00122 if (factoryParameters.txPendingTimer != 0)
00123 sessionsTimer = new com.scalagent.kjoram.util.Timer();
00124
00125 if (JoramTracing.dbgClient)
00126 JoramTracing.log(JoramTracing.DEBUG, this + ": opened.");
00127 }
00128
00129 catch (JMSException jE) {
00130 JoramTracing.log(JoramTracing.ERROR, jE);
00131 throw jE;
00132 }
00133 }
00134
00135 public String getUserName() {
00136 return name;
00137 }
00138
00140 public String toString()
00141 {
00142 return "Cnx:" + proxyId + "-" + key;
00143 }
00144
00150 public boolean equals(Object obj)
00151 {
00152 return (obj instanceof Connection)
00153 && toString().equals(obj.toString());
00154 }
00155
00156
00166 public ConnectionConsumer
00167 createConnectionConsumer(Destination dest, String selector,
00168 ServerSessionPool sessionPool,
00169 int maxMessages) throws JMSException
00170 {
00171 if (closed)
00172 throw new IllegalStateException("Forbidden call on a closed"
00173 + " connection.");
00174
00175 return new ConnectionConsumer(this, (Destination) dest, selector,
00176 sessionPool, maxMessages);
00177 }
00178
00188 public ConnectionConsumer
00189 createDurableConnectionConsumer(Topic topic, String subName,
00190 String selector,
00191 ServerSessionPool sessPool,
00192 int maxMessages) throws JMSException
00193 {
00194 if (closed)
00195 throw new IllegalStateException("Forbidden call on a closed"
00196 + " connection.");
00197
00198 return new ConnectionConsumer(this, (Topic) topic, subName, selector,
00199 sessPool, maxMessages);
00200 }
00201
00208 public Session
00209 createSession(boolean transacted, int acknowledgeMode)
00210 throws JMSException
00211 {
00212 if (closed)
00213 throw new IllegalStateException("Forbidden call on a closed"
00214 + " connection.");
00215
00216 return new Session(this, transacted, acknowledgeMode);
00217 }
00218
00224 public void setExceptionListener(ExceptionListener listener)
00225 throws JMSException
00226 {
00227 if (closed)
00228 throw new IllegalStateException("Forbidden call on a closed"
00229 + " connection.");
00230 this.excListener = listener;
00231 }
00232
00238 public ExceptionListener getExceptionListener() throws JMSException
00239 {
00240 if (closed)
00241 throw new IllegalStateException("Forbidden call on a closed"
00242 + " connection.");
00243 return excListener;
00244 }
00245
00251 synchronized void onException(JMSException jE)
00252 {
00253 if (JoramTracing.dbgClient)
00254 JoramTracing.log(JoramTracing.WARN, this + ": " + jE);
00255
00256 if (excListener != null)
00257 excListener.onException(jE);
00258 }
00259
00265 public void setClientID(String clientID) throws JMSException
00266 {
00267 throw new IllegalStateException("ClientID is already set by the"
00268 + " provider.");
00269 }
00270
00276 public String getClientID() throws JMSException
00277 {
00278 if (closed)
00279 throw new IllegalStateException("Forbidden call on a closed"
00280 + " connection.");
00281 return proxyId;
00282 }
00283
00289 public ConnectionMetaData getMetaData() throws JMSException
00290 {
00291 if (closed)
00292 throw new IllegalStateException("Forbidden call on a closed"
00293 + " connection.");
00294 if (metaData == null)
00295 metaData = new ConnectionMetaData();
00296 return metaData;
00297 }
00298
00304 public void start() throws JMSException
00305 {
00306
00307 if (closed)
00308 throw new IllegalStateException("Forbidden call on a closed"
00309 + " connection.");
00310
00311
00312 if (started)
00313 return;
00314
00315 if (JoramTracing.dbgClient)
00316 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00317 + ": starting...");
00318
00319
00320 Session session;
00321 for (int i = 0; i < sessions.size(); i++) {
00322 session = (Session) sessions.elementAt(i);
00323 session.repliesIn.start();
00324 session.start();
00325 }
00326
00327 asyncRequest(new CnxStartRequest());
00328
00329 started = true;
00330
00331 if (JoramTracing.dbgClient)
00332 JoramTracing.log(JoramTracing.DEBUG, this + ": started.");
00333 }
00334
00341 public void stop() throws JMSException
00342 {
00343 IllegalStateException isE = null;
00344
00345
00346 if (closed)
00347 throw new IllegalStateException("Forbidden call on a closed"
00348 + " connection.");
00349
00350
00351 if (! started)
00352 return;
00353
00354 if (JoramTracing.dbgClient)
00355 JoramTracing.log(JoramTracing.DEBUG, this + ": stopping...");
00356
00357
00358 try {
00359 syncRequest(new CnxStopRequest());
00360 }
00361
00362 catch (IllegalStateException caughtISE) {
00363 isE = caughtISE;
00364 }
00365
00366
00367
00368
00369 Session session;
00370 for (int i = 0; i < sessions.size(); i++) {
00371 session = (Session) sessions.elementAt(i);
00372 try {
00373 session.repliesIn.stop();
00374 }
00375 catch (InterruptedException iE) {}
00376 session.stop();
00377 }
00378
00379 started = false;
00380
00381 if (isE != null) {
00382 JoramTracing.log(JoramTracing.ERROR, isE);
00383 throw isE;
00384 }
00385
00386 if (JoramTracing.dbgClient)
00387 JoramTracing.log(JoramTracing.DEBUG, this + ": is stopped.");
00388 }
00389
00390
00397 public void close() throws JMSException
00398 {
00399
00400 if (closed)
00401 return;
00402
00403 closing = true;
00404
00405 if (JoramTracing.dbgClient)
00406 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00407 + ": closing...");
00408
00409
00410 if (sessionsTimer != null)
00411 sessionsTimer.cancel();
00412
00413
00414 try {
00415 stop();
00416 }
00417
00418 catch (JMSException jE) {}
00419
00420
00421 Session session;
00422 while (! sessions.isEmpty()) {
00423 session = (Session) sessions.elementAt(0);
00424 try {
00425 session.close();
00426 }
00427
00428 catch (JMSException jE) {}
00429 }
00430
00431
00432 if (cconsumers != null) {
00433 ConnectionConsumer cc;
00434 while (! cconsumers.isEmpty()) {
00435 cc = (ConnectionConsumer) cconsumers.elementAt(0);
00436 cc.close();
00437 }
00438 }
00439
00440
00441 connectionImpl.close();
00442
00443
00444 if (! driver.stopping)
00445 driver.stop();
00446
00447 requestsTable.clear();
00448 requestsTable = null;
00449 repliesTable.clear();
00450 repliesTable = null;
00451
00452 closed = true;
00453
00454 if (JoramTracing.dbgClient)
00455 JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
00456 }
00457
00459 synchronized int nextRequestId()
00460 {
00461 if (requestsC == Integer.MAX_VALUE)
00462 requestsC = 0;
00463 return requestsC++;
00464 }
00465
00467 synchronized String nextSessionId()
00468 {
00469 if (sessionsC == Integer.MAX_VALUE)
00470 sessionsC = 0;
00471 sessionsC++;
00472 return "c" + key + "s" + sessionsC;
00473 }
00474
00476 synchronized String nextMessageId()
00477 {
00478 if (messagesC == Integer.MAX_VALUE)
00479 messagesC = 0;
00480 messagesC++;
00481 return "ID:" + proxyId + "c" + key + "m" + messagesC;
00482 }
00483
00485 synchronized String nextSubName()
00486 {
00487 if (subsC == Integer.MAX_VALUE)
00488 subsC = 0;
00489 subsC++;
00490 return "c" + key + "sub" + subsC;
00491 }
00492
00494 synchronized void schedule(com.scalagent.kjoram.util.TimerTask task)
00495 {
00496 if (sessionsTimer == null)
00497 return;
00498
00499 try {
00500 sessionsTimer.schedule(task, factoryParameters.txPendingTimer * 1000);
00501 }
00502 catch (Exception exc) {}
00503 }
00504
00516 AbstractJmsReply syncRequest(AbstractJmsRequest request) throws JMSException
00517 {
00518 if (closed)
00519 throw new IllegalStateException("Forbidden call on a closed"
00520 + " connection.");
00521
00522 if (request.getRequestId() == -1)
00523 request.setRequestId(nextRequestId());
00524
00525 int requestId = request.getRequestId();
00526
00527 try {
00528 if (JoramTracing.dbgClient)
00529 JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: "
00530 + request.getClass().getName()
00531 + " with id: " + requestId);
00532
00533 Lock lock = new Lock();
00534 requestsTable.put(request.getKey(), lock);
00535 synchronized(lock) {
00536 connectionImpl.send(request);
00537 while (true) {
00538 try {
00539 lock.wait();
00540 break;
00541 }
00542 catch (InterruptedException iE) {
00543 if (JoramTracing.dbgClient)
00544 JoramTracing.log(JoramTracing.WARN,this
00545 + ": caught InterruptedException");
00546 continue;
00547 }
00548 }
00549 requestsTable.remove(request.getKey());
00550 }
00551 }
00552
00553 catch (Exception e) {
00554 JMSException jE = null;
00555 if (e instanceof JMSException)
00556 throw (JMSException) e;
00557 else
00558 jE = new JMSException("Exception while getting a reply.");
00559
00560 jE.setLinkedException(e);
00561
00562
00563 if (requestsTable != null)
00564 requestsTable.remove(request.getKey());
00565
00566 JoramTracing.log(JoramTracing.ERROR, jE);
00567 throw jE;
00568 }
00569
00570 AbstractJmsReply reply =
00571 (AbstractJmsReply) repliesTable.remove(request.getKey());
00572
00573 if (JoramTracing.dbgClient)
00574 JoramTracing.log(JoramTracing.DEBUG, this + ": got reply.");
00575
00576
00577
00578 if (reply == null)
00579 throw new IllegalStateException("Connection is broken.");
00580
00581 else if (reply instanceof MomExceptionReply) {
00582 MomException mE = ((MomExceptionReply) reply).getException();
00583
00584 if (mE instanceof AccessException)
00585 throw new JMSSecurityException(mE.getMessage());
00586 else if (mE instanceof DestinationException)
00587 throw new InvalidDestinationException(mE.getMessage());
00588 else
00589 throw new JMSException(mE.getMessage());
00590 }
00591
00592 else
00593 return reply;
00594 }
00595
00601 void asyncRequest(AbstractJmsRequest request) throws IllegalStateException
00602 {
00603 if (closed)
00604 throw new IllegalStateException("Forbidden call on a closed"
00605 + " connection.");
00606
00607 if (request.getRequestId() == -1)
00608 request.setRequestId(nextRequestId());
00609
00610 try {
00611 if (JoramTracing.dbgClient)
00612 JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: "
00613 + request.getClass().getName()
00614 + " with id: " + request.getRequestId());
00615 connectionImpl.send(request);
00616 }
00617
00618 catch (IllegalStateException exc) {
00619
00620 requestsTable.remove(request.getKey());
00621
00622 JoramTracing.log(JoramTracing.ERROR, exc);
00623 throw exc;
00624 }
00625 }
00626
00635 void distribute(AbstractJmsReply reply)
00636 {
00637
00638 int correlationId = reply.getCorrelationId();
00639
00640 if (JoramTracing.dbgClient)
00641 JoramTracing.log(JoramTracing.DEBUG, this + ": got reply: "
00642 + correlationId);
00643
00644 Object obj = null;
00645 if (correlationId != -1)
00646 obj = requestsTable.get(reply.getKey());
00647
00648
00649
00650 if (obj instanceof Lock) {
00651 repliesTable.put(reply.getKey(), reply);
00652
00653 synchronized(obj) {
00654 obj.notify();
00655 }
00656 }
00657
00658 else if (reply instanceof MomExceptionReply) {
00659
00660 requestsTable.remove(reply.getKey());
00661
00662 MomException mE = ((MomExceptionReply) reply).getException();
00663 JMSException jE = null;
00664
00665 if (mE instanceof AccessException)
00666 jE = new JMSSecurityException(mE.getMessage());
00667 else if (mE instanceof DestinationException)
00668 jE = new InvalidDestinationException(mE.getMessage());
00669 else
00670 jE = new JMSException(mE.getMessage());
00671
00672 onException(jE);
00673 }
00674
00675 else if (obj != null) {
00676 try {
00677
00678 if (obj instanceof ConnectionConsumer)
00679 ((ConnectionConsumer) obj).repliesIn.push(reply);
00680 else if (obj instanceof MessageConsumer)
00681 ((MessageConsumer) obj).sess.repliesIn.push(reply);
00682 }
00683 catch (StoppedQueueException sqE) {
00684 denyDelivery((ConsumerMessages) reply);
00685 }
00686 }
00687
00688 else if (reply instanceof ConsumerMessages)
00689 denyDelivery((ConsumerMessages) reply);
00690 }
00691
00693 private void denyDelivery(ConsumerMessages delivery)
00694 {
00695 Vector msgs = delivery.getMessages();
00696 com.scalagent.kjoram.messages.Message msg;
00697 Vector ids = new Vector();
00698
00699 for (int i = 0; i < msgs.size(); i++) {
00700 msg = (com.scalagent.kjoram.messages.Message) msgs.elementAt(i);
00701 ids.addElement(msg.getIdentifier());
00702 }
00703
00704 if (ids.isEmpty())
00705 return;
00706
00707 try {
00708
00709
00710 asyncRequest(new SessDenyRequest(delivery.comesFrom(), ids,
00711 delivery.getQueueMode(), true));
00712 }
00713
00714 catch (JMSException jE) {}
00715 }
00716 }