com/scalagent/kjoram/MessageProducer.java

00001 /*
00002  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
00003  * Copyright (C) 2001 - ScalAgent Distributed Technologies
00004  * Copyright (C) 1996 - Dyade
00005  *
00006  * This library is free software; you can redistribute it and/or
00007  * modify it under the terms of the GNU Lesser General Public
00008  * License as published by the Free Software Foundation; either
00009  * version 2.1 of the License, or any later version.
00010  * 
00011  * This library is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014  * Lesser General Public License for more details.
00015  * 
00016  * You should have received a copy of the GNU Lesser General Public
00017  * License along with this library; if not, write to the Free Software
00018  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00019  * USA.
00020  *
00021  * Initial developer(s): Frederic Maistre (INRIA)
00022  * Contributor(s): Nicolas Tachker (ScalAgent)
00023  */
00024 package com.scalagent.kjoram;
00025 
00026 import com.scalagent.kjoram.jms.*;
00027 import com.scalagent.kjoram.excepts.IllegalStateException;
00028 import com.scalagent.kjoram.excepts.*;
00029 
00030 public class MessageProducer
00031 {
00033   private int deliveryMode = DeliveryMode.PERSISTENT;
00035   private int priority = 4;
00037   private long timeToLive = 0;
00043   private boolean messageIDDisabled = false;
00045   private boolean timestampDisabled = false;
00047   private boolean identified = true;
00048 
00050   protected boolean closed = false;
00052   protected Session sess;
00054   protected Destination dest = null;
00055   
00056 
00066   MessageProducer(Session sess, Destination dest) throws JMSException
00067     {
00068       this.sess = sess;
00069       this.dest = dest;
00070 
00071       if (dest == null)
00072         identified = false;
00073 
00074       sess.producers.addElement(this);
00075 
00076       if (JoramTracing.dbgClient)
00077         JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
00078     }
00079 
00085   public void setDisableMessageID(boolean value) throws JMSException
00086     {
00087       if (closed)
00088         throw new IllegalStateException("Forbidden call on a closed producer.");
00089     }
00090 
00097   public void setDeliveryMode(int deliveryMode) throws JMSException
00098     {
00099       if (closed)
00100         throw new IllegalStateException("Forbidden call on a closed producer.");
00101 
00102       if (deliveryMode != DeliveryMode.PERSISTENT
00103           && deliveryMode != DeliveryMode.NON_PERSISTENT)
00104         throw new JMSException("Can't set invalid delivery mode.");
00105 
00106       this.deliveryMode = deliveryMode;
00107     }
00108 
00115   public void setPriority(int priority) throws JMSException
00116     {
00117       if (closed)
00118         throw new IllegalStateException("Forbidden call on a closed producer.");
00119 
00120       if (priority < 0 || priority > 9)
00121         throw new JMSException("Can't set invalid priority value.");
00122 
00123       this.priority = priority;
00124     }
00125 
00131   public void setTimeToLive(long timeToLive) throws JMSException
00132     {
00133       if (closed)
00134         throw new IllegalStateException("Forbidden call on a closed producer.");
00135 
00136       this.timeToLive = timeToLive;
00137     }
00138 
00144   public void setDisableMessageTimestamp(boolean value) throws JMSException
00145     {
00146       if (closed)
00147         throw new IllegalStateException("Forbidden call on a closed producer.");
00148 
00149       this.timestampDisabled = value;
00150     }
00151 
00157   public Destination getDestination() throws JMSException
00158     {
00159       if (closed)
00160         throw new IllegalStateException("Forbidden call on a closed producer.");
00161 
00162       return dest;
00163     }
00164 
00170   public boolean getDisableMessageID() throws JMSException
00171     {
00172       if (closed)
00173         throw new IllegalStateException("Forbidden call on a closed producer.");
00174 
00175       return messageIDDisabled;
00176     }
00177 
00183   public int getDeliveryMode() throws JMSException
00184     {
00185       if (closed)
00186         throw new IllegalStateException("Forbidden call on a closed producer.");
00187 
00188       return deliveryMode;
00189     }
00190 
00196   public int getPriority() throws JMSException
00197     {
00198       if (closed)
00199         throw new IllegalStateException("Forbidden call on a closed producer.");
00200 
00201       return priority;
00202     }
00203 
00209   public long getTimeToLive() throws JMSException
00210     {
00211       if (closed)
00212         throw new IllegalStateException("Forbidden call on a closed producer.");
00213 
00214       return timeToLive;
00215     }
00216 
00222   public boolean getDisableMessageTimestamp() throws JMSException
00223     {
00224       if (closed)
00225         throw new IllegalStateException("Forbidden call on a closed producer.");
00226 
00227       return timestampDisabled;
00228     }
00229 
00230 
00239   public void send(Message message) throws JMSException
00240     {
00241       if (! identified)
00242         throw new RuntimeException("Can't send message to"
00243                                    + " an unidentified"
00244                                    + " destination.");
00245       // Actually producing it:
00246       doSend(dest, message, deliveryMode, priority, timeToLive);
00247     }
00248 
00257   public void send(Message message, int deliveryMode,
00258                    int priority, long timeToLive) throws JMSException
00259     {
00260       if (! identified)
00261         throw new RuntimeException("Can't send message to"
00262                                    + " an unidentified"
00263                                    + " destination.");
00264       // Actually producing it:
00265       doSend(dest, message, deliveryMode, priority, timeToLive);
00266     }
00267 
00280   public void send(Destination dest,
00281                    Message message) throws JMSException
00282     {
00283       if (identified)
00284         throw new RuntimeException("An unidentified message"
00285                                    + " producer can't use this"
00286                                    + " identified message"
00287                                    + " producer.");
00288       if (dest == null)
00289         throw new RuntimeException("Can't send message to"
00290                                    + " an unidentified"
00291                                    + " destination.");
00292       
00293       doSend((Destination) dest, message, deliveryMode, priority, timeToLive);
00294     }
00295 
00308   public void send(Destination dest, Message message,
00309                    int deliveryMode, int priority,
00310                    long timeToLive) throws JMSException
00311     {
00312       if (identified)
00313         throw new RuntimeException("An unidentified message"
00314                                    + " producer can't use this"
00315                                    + " identified message"
00316                                    + " producer.");
00317       if (dest == null)
00318         throw new RuntimeException("Can't send message to"
00319                                    + " an unidentified"
00320                                    + " destination.");
00321 
00322       doSend((Destination) dest, message, deliveryMode, priority, timeToLive);
00323     }
00324 
00330   public void close() throws JMSException
00331     {
00332       // Ignoring call if producer is already closed:
00333       if (closed)
00334         return;
00335 
00336       if (JoramTracing.dbgClient)
00337         JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00338                          + ": closing...");
00339 
00340       sess.producers.removeElement(this);
00341       closed = true;
00342 
00343       if (JoramTracing.dbgClient)
00344         JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
00345 
00346     }
00347 
00357   private void doSend(Destination dest, Message message,
00358                       int deliveryMode, int priority,
00359                       long timeToLive) throws JMSException
00360     {
00361       if (closed)
00362         throw new IllegalStateException("Forbidden call on a closed producer.");
00363 
00364       if (JoramTracing.dbgClient)
00365         JoramTracing.log(JoramTracing.DEBUG, "--- " + this
00366                          + ": producing...");
00367 
00368       // Updating the message property fields:
00369       String msgID = sess.cnx.nextMessageId();
00370       message.setJMSMessageID(msgID);
00371       message.setJMSDeliveryMode(deliveryMode);
00372       message.setJMSDestination(dest);
00373       if (timeToLive == 0)
00374         message.setJMSExpiration(0);
00375       else
00376         message.setJMSExpiration(System.currentTimeMillis() + timeToLive);
00377       message.setJMSPriority(priority);
00378       if (! timestampDisabled)
00379         message.setJMSTimestamp(System.currentTimeMillis());
00380 
00381       com.scalagent.kjoram.messages.Message momMsg = null;
00382       // If the message to send is a proprietary one, getting the MOM message
00383       // it wraps:
00384       if (message instanceof Message)
00385         momMsg = ((Message) message).getMomMessage();
00386 
00387       // If the message to send is a non proprietary JMS message, building
00388       // a proprietary message and then getting the MOM message it wraps:
00389       else if (message instanceof Message) {
00390         try {
00391           Message joramMessage = Message.convertJMSMessage(message);
00392           momMsg = joramMessage.getMomMessage();
00393         }
00394         catch (JMSException jE) {
00395           MessageFormatException mE = new MessageFormatException("Message to"
00396                                                                  + " send is"
00397                                                                  + " invalid.");
00398           mE.setLinkedException(jE);
00399           throw mE;
00400         }
00401       }
00402       else {
00403         MessageFormatException mE = new MessageFormatException("Message to"
00404                                                                + " send is"
00405                                                                + " invalid.");
00406         throw mE;
00407       }
00408 
00409       // If the session is transacted, keeping the request for later delivery:
00410       if (sess.transacted) {
00411         if (JoramTracing.dbgClient)
00412           JoramTracing.log(JoramTracing.DEBUG, "Buffering the message.");
00413 
00414         sess.prepareSend(dest,
00415                          (com.scalagent.kjoram.messages.Message) momMsg.clone());
00416       }
00417       // If not, building a new request and sending it:
00418       else {
00419       ProducerMessages pM = new ProducerMessages(dest.getName(), momMsg);
00420 
00421       if (JoramTracing.dbgClient)
00422         JoramTracing.log(JoramTracing.DEBUG, "Sending " + momMsg);
00423       
00424         sess.cnx.syncRequest(pM);
00425       }
00426     }
00427 }

Generated on Tue Sep 16 16:14:24 2008 for joram by  doxygen 1.5.0