00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 package com.scalagent.kjoram;
00025
00026 import com.scalagent.kjoram.jms.*;
00027 import com.scalagent.kjoram.excepts.IllegalStateException;
00028 import com.scalagent.kjoram.excepts.*;
00029
00030 public class MessageProducer
00031 {
00033 private int deliveryMode = DeliveryMode.PERSISTENT;
00035 private int priority = 4;
00037 private long timeToLive = 0;
00043 private boolean messageIDDisabled = false;
00045 private boolean timestampDisabled = false;
00047 private boolean identified = true;
00048
00050 protected boolean closed = false;
00052 protected Session sess;
00054 protected Destination dest = null;
00055
00056
00066 MessageProducer(Session sess, Destination dest) throws JMSException
00067 {
00068 this.sess = sess;
00069 this.dest = dest;
00070
00071 if (dest == null)
00072 identified = false;
00073
00074 sess.producers.addElement(this);
00075
00076 if (JoramTracing.dbgClient)
00077 JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
00078 }
00079
00085 public void setDisableMessageID(boolean value) throws JMSException
00086 {
00087 if (closed)
00088 throw new IllegalStateException("Forbidden call on a closed producer.");
00089 }
00090
00097 public void setDeliveryMode(int deliveryMode) throws JMSException
00098 {
00099 if (closed)
00100 throw new IllegalStateException("Forbidden call on a closed producer.");
00101
00102 if (deliveryMode != DeliveryMode.PERSISTENT
00103 && deliveryMode != DeliveryMode.NON_PERSISTENT)
00104 throw new JMSException("Can't set invalid delivery mode.");
00105
00106 this.deliveryMode = deliveryMode;
00107 }
00108
00115 public void setPriority(int priority) throws JMSException
00116 {
00117 if (closed)
00118 throw new IllegalStateException("Forbidden call on a closed producer.");
00119
00120 if (priority < 0 || priority > 9)
00121 throw new JMSException("Can't set invalid priority value.");
00122
00123 this.priority = priority;
00124 }
00125
00131 public void setTimeToLive(long timeToLive) throws JMSException
00132 {
00133 if (closed)
00134 throw new IllegalStateException("Forbidden call on a closed producer.");
00135
00136 this.timeToLive = timeToLive;
00137 }
00138
00144 public void setDisableMessageTimestamp(boolean value) throws JMSException
00145 {
00146 if (closed)
00147 throw new IllegalStateException("Forbidden call on a closed producer.");
00148
00149 this.timestampDisabled = value;
00150 }
00151
00157 public Destination getDestination() throws JMSException
00158 {
00159 if (closed)
00160 throw new IllegalStateException("Forbidden call on a closed producer.");
00161
00162 return dest;
00163 }
00164
00170 public boolean getDisableMessageID() throws JMSException
00171 {
00172 if (closed)
00173 throw new IllegalStateException("Forbidden call on a closed producer.");
00174
00175 return messageIDDisabled;
00176 }
00177
00183 public int getDeliveryMode() throws JMSException
00184 {
00185 if (closed)
00186 throw new IllegalStateException("Forbidden call on a closed producer.");
00187
00188 return deliveryMode;
00189 }
00190
00196 public int getPriority() throws JMSException
00197 {
00198 if (closed)
00199 throw new IllegalStateException("Forbidden call on a closed producer.");
00200
00201 return priority;
00202 }
00203
00209 public long getTimeToLive() throws JMSException
00210 {
00211 if (closed)
00212 throw new IllegalStateException("Forbidden call on a closed producer.");
00213
00214 return timeToLive;
00215 }
00216
00222 public boolean getDisableMessageTimestamp() throws JMSException
00223 {
00224 if (closed)
00225 throw new IllegalStateException("Forbidden call on a closed producer.");
00226
00227 return timestampDisabled;
00228 }
00229
00230
00239 public void send(Message message) throws JMSException
00240 {
00241 if (! identified)
00242 throw new RuntimeException("Can't send message to"
00243 + " an unidentified"
00244 + " destination.");
00245
00246 doSend(dest, message, deliveryMode, priority, timeToLive);
00247 }
00248
00257 public void send(Message message, int deliveryMode,
00258 int priority, long timeToLive) throws JMSException
00259 {
00260 if (! identified)
00261 throw new RuntimeException("Can't send message to"
00262 + " an unidentified"
00263 + " destination.");
00264
00265 doSend(dest, message, deliveryMode, priority, timeToLive);
00266 }
00267
00280 public void send(Destination dest,
00281 Message message) throws JMSException
00282 {
00283 if (identified)
00284 throw new RuntimeException("An unidentified message"
00285 + " producer can't use this"
00286 + " identified message"
00287 + " producer.");
00288 if (dest == null)
00289 throw new RuntimeException("Can't send message to"
00290 + " an unidentified"
00291 + " destination.");
00292
00293 doSend((Destination) dest, message, deliveryMode, priority, timeToLive);
00294 }
00295
00308 public void send(Destination dest, Message message,
00309 int deliveryMode, int priority,
00310 long timeToLive) throws JMSException
00311 {
00312 if (identified)
00313 throw new RuntimeException("An unidentified message"
00314 + " producer can't use this"
00315 + " identified message"
00316 + " producer.");
00317 if (dest == null)
00318 throw new RuntimeException("Can't send message to"
00319 + " an unidentified"
00320 + " destination.");
00321
00322 doSend((Destination) dest, message, deliveryMode, priority, timeToLive);
00323 }
00324
00330 public void close() throws JMSException
00331 {
00332
00333 if (closed)
00334 return;
00335
00336 if (JoramTracing.dbgClient)
00337 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00338 + ": closing...");
00339
00340 sess.producers.removeElement(this);
00341 closed = true;
00342
00343 if (JoramTracing.dbgClient)
00344 JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
00345
00346 }
00347
00357 private void doSend(Destination dest, Message message,
00358 int deliveryMode, int priority,
00359 long timeToLive) throws JMSException
00360 {
00361 if (closed)
00362 throw new IllegalStateException("Forbidden call on a closed producer.");
00363
00364 if (JoramTracing.dbgClient)
00365 JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00366 + ": producing...");
00367
00368
00369 String msgID = sess.cnx.nextMessageId();
00370 message.setJMSMessageID(msgID);
00371 message.setJMSDeliveryMode(deliveryMode);
00372 message.setJMSDestination(dest);
00373 if (timeToLive == 0)
00374 message.setJMSExpiration(0);
00375 else
00376 message.setJMSExpiration(System.currentTimeMillis() + timeToLive);
00377 message.setJMSPriority(priority);
00378 if (! timestampDisabled)
00379 message.setJMSTimestamp(System.currentTimeMillis());
00380
00381 com.scalagent.kjoram.messages.Message momMsg = null;
00382
00383
00384 if (message instanceof Message)
00385 momMsg = ((Message) message).getMomMessage();
00386
00387
00388
00389 else if (message instanceof Message) {
00390 try {
00391 Message joramMessage = Message.convertJMSMessage(message);
00392 momMsg = joramMessage.getMomMessage();
00393 }
00394 catch (JMSException jE) {
00395 MessageFormatException mE = new MessageFormatException("Message to"
00396 + " send is"
00397 + " invalid.");
00398 mE.setLinkedException(jE);
00399 throw mE;
00400 }
00401 }
00402 else {
00403 MessageFormatException mE = new MessageFormatException("Message to"
00404 + " send is"
00405 + " invalid.");
00406 throw mE;
00407 }
00408
00409
00410 if (sess.transacted) {
00411 if (JoramTracing.dbgClient)
00412 JoramTracing.log(JoramTracing.DEBUG, "Buffering the message.");
00413
00414 sess.prepareSend(dest,
00415 (com.scalagent.kjoram.messages.Message) momMsg.clone());
00416 }
00417
00418 else {
00419 ProducerMessages pM = new ProducerMessages(dest.getName(), momMsg);
00420
00421 if (JoramTracing.dbgClient)
00422 JoramTracing.log(JoramTracing.DEBUG, "Sending " + momMsg);
00423
00424 sess.cnx.syncRequest(pM);
00425 }
00426 }
00427 }