com/scalagent/joram/mom/dest/ftp/FtpQueueImpl.java

00001 /*
00002  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
00003  * Copyright (C) 2001 - 2008 ScalAgent Distributed Technologies
00004  *
00005  * This library is free software; you can redistribute it and/or
00006  * modify it under the terms of the GNU Lesser General Public
00007  * License as published by the Free Software Foundation; either
00008  * version 2.1 of the License, or any later version.
00009  * 
00010  * This library is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013  * Lesser General Public License for more details.
00014  * 
00015  * You should have received a copy of the GNU Lesser General Public
00016  * License along with this library; if not, write to the Free Software
00017  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00018  * USA.
00019  *
00020  * Initial developer(s): ScalAgent Distributed Technologies
00021  * Contributor(s): 
00022  */
00023 package com.scalagent.joram.mom.dest.ftp;
00024 
00025 import java.util.Enumeration;
00026 import java.util.Hashtable;
00027 import java.util.Properties;
00028 
00029 import org.objectweb.joram.mom.dest.QueueImpl;
00030 import org.objectweb.joram.mom.notifications.ClientMessages;
00031 import org.objectweb.joram.mom.util.DMQManager;
00032 import org.objectweb.joram.shared.MessageErrorConstants;
00033 import org.objectweb.joram.shared.messages.Message;
00034 import org.objectweb.util.monolog.api.BasicLevel;
00035 import org.objectweb.util.monolog.api.Logger;
00036 
00037 import fr.dyade.aaa.agent.AgentId;
00038 import fr.dyade.aaa.agent.Debug;
00039 
00044 public class FtpQueueImpl extends QueueImpl {
00046   private static final long serialVersionUID = 1L;
00047 
00048   public static Logger logger = Debug.getLogger(FtpQueueImpl.class.getName());
00049   
00050   private String user = "anonymous";
00051   private String pass = "no@no.no";
00052   private String path = null;
00053   private transient TransferItf transfer = null;
00054   private AgentId dmq = null;
00055   private int clientContext;
00056   private int requestId;
00057 
00058   public String ftpImplName = "com.scalagent.joram.mom.dest.ftp.TransferImplRef";
00059 
00060   private Hashtable transferTable;
00067   public FtpQueueImpl(AgentId adminId,
00068                       Properties prop) {
00069     super(adminId, prop);
00070     setProperties(prop);
00071 
00072     transferTable = new Hashtable();
00073     try {
00074       if ((ftpImplName != null) && (ftpImplName.length() > 0))
00075         transfer = (TransferItf) Class.forName(ftpImplName).newInstance();
00076     } catch (Exception exc) {
00077       transfer = null;
00078       logger.log(BasicLevel.ERROR, 
00079                  "FtpQueueImpl : transfer = null" ,exc);
00080     }
00081     if (logger.isLoggable(BasicLevel.DEBUG))
00082       logger.log(BasicLevel.DEBUG, "--- " + this +
00083                  " transfer = "+ transfer);
00084   }
00085 
00086   protected void setProperties(Properties prop) {
00087     user = prop.getProperty("user", user);
00088     pass = prop.getProperty("pass", pass);
00089     path = prop.getProperty("path", path);
00090     ftpImplName = prop.getProperty("ftpImpl", ftpImplName);
00091   }
00092   
00098   public void initialize(boolean firstTime) {
00099     if (logger.isLoggable(BasicLevel.DEBUG))
00100       logger.log(BasicLevel.DEBUG, "initialize(" + firstTime + ')');
00101     
00102     super.initialize(firstTime);
00103 
00104     try {
00105       if ((ftpImplName != null) && (ftpImplName.length() > 0))
00106         transfer = (TransferItf) Class.forName(ftpImplName).newInstance();
00107     } catch (Exception exc) {
00108       transfer = null;
00109       logger.log(BasicLevel.ERROR, 
00110                   "--- " + this + " initialize : transfer = null" ,exc);
00111     }
00112 
00113     if (logger.isLoggable(BasicLevel.DEBUG))
00114       logger.log(BasicLevel.DEBUG,
00115                   "--- " + this + " initialize transfer = "+ transfer);
00116 
00117     if (transfer != null) {
00118       if (logger.isLoggable(BasicLevel.DEBUG))
00119         logger.log(BasicLevel.DEBUG,
00120                    "--- " + this + " initialize : transferTable = " + transferTable);
00121 
00122       for (Enumeration e = transferTable.elements(); e.hasMoreElements(); ) {
00123         Message msg = (Message) e.nextElement();
00124         FtpMessage ftpMsg = new FtpMessage(msg);
00125         FtpThread t = new FtpThread(transfer,
00126                                     (FtpMessage) ftpMsg.clone(),
00127                                     getId(),
00128                                     dmq,
00129                                     clientContext,
00130                                     requestId,
00131                                     user,
00132                                     pass,
00133                                     path);
00134         t.start();
00135       }
00136     }
00137   }
00138 
00139   public String toString() {
00140     return "FtpQueueImpl:" + getId().toString();
00141   }
00142 
00143   public void ftpNot(FtpNot not) {
00144     if (logger.isLoggable(BasicLevel.DEBUG))
00145       logger.log(BasicLevel.DEBUG, "--- " + this +
00146                  " ftpNot(" + not + ")\n" +
00147                  "transferTable = " + transferTable);
00148     Message msg = (Message) not.getMessages().get(0);
00149     storeMessage(new org.objectweb.joram.mom.messages.Message(msg));
00150     deliverMessages(0);
00151     transferTable.remove(new FtpMessage(msg).getIdentifier());
00152 
00153     if (logger.isLoggable(BasicLevel.DEBUG))
00154       logger.log(BasicLevel.DEBUG,
00155                  "--- " + this + " doProcess : transferTable = " + transferTable);
00156   }
00157 
00158   public ClientMessages preProcess(AgentId from, ClientMessages not) {
00159     for (Enumeration msgs = not.getMessages().elements();
00160          msgs.hasMoreElements();) {
00161       Message msg = (Message) msgs.nextElement();
00162       if (isFtpMsg(msg)) {
00163         doProcessFtp(not,msg);
00164         not.getMessages().remove(msg);
00165       }
00166     }
00167     if (not.getMessages().size() > 0) {
00168       return not;
00169     }
00170     return null;
00171   }
00172 
00173   protected boolean isFtpMsg(Message message) {
00174     FtpMessage msg = new FtpMessage(message);
00175     return (msg.propertyExists(SharedObj.url) &&
00176             msg.propertyExists(SharedObj.crc) &&
00177             msg.propertyExists(SharedObj.ack));
00178   }
00179 
00180   protected void doProcessFtp(ClientMessages not,
00181                               Message msg) {
00182 
00183     if (logger.isLoggable(BasicLevel.DEBUG))
00184       logger.log(BasicLevel.DEBUG,
00185                  "--- " + this + " doProcessFtp(" + not + "," + msg + ")");
00186 
00187     if (transfer != null) {
00188       dmq = not.getDMQId();
00189       if (dmq == null && super.dmqId != null)
00190         dmq = super.dmqId;
00191       else if ( dmq == null)
00192         dmq = QueueImpl.getDefaultDMQId();
00193 
00194       clientContext = not.getClientContext();
00195       requestId = not.getRequestId();
00196 
00197       FtpMessage ftpMsg = new FtpMessage(msg);
00198       transferTable.put(ftpMsg.getIdentifier(),ftpMsg);
00199       FtpThread t = new FtpThread(transfer,
00200                                   (FtpMessage) ftpMsg.clone(),
00201                                   getId(),
00202                                   dmq,
00203                                   clientContext,
00204                                   requestId,
00205                                   user,
00206                                   pass,
00207                                   path);
00208       t.start();
00209     } else {
00210       DMQManager dmqManager = new DMQManager(not.getDMQId(), dmqId, getId());
00211       nbMsgsSentToDMQSinceCreation++;
00212       dmqManager.addDeadMessage(msg, MessageErrorConstants.NOT_WRITEABLE);
00213       dmqManager.sendToDMQ();
00214     }
00215   }
00216 }

Generated on Tue Sep 16 16:14:22 2008 for joram by  doxygen 1.5.0