00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 package com.scalagent.joram.mom.dest.ftp;
00024
00025 import java.util.Enumeration;
00026 import java.util.Hashtable;
00027 import java.util.Properties;
00028
00029 import org.objectweb.joram.mom.dest.QueueImpl;
00030 import org.objectweb.joram.mom.notifications.ClientMessages;
00031 import org.objectweb.joram.mom.util.DMQManager;
00032 import org.objectweb.joram.shared.MessageErrorConstants;
00033 import org.objectweb.joram.shared.messages.Message;
00034 import org.objectweb.util.monolog.api.BasicLevel;
00035 import org.objectweb.util.monolog.api.Logger;
00036
00037 import fr.dyade.aaa.agent.AgentId;
00038 import fr.dyade.aaa.agent.Debug;
00039
00044 public class FtpQueueImpl extends QueueImpl {
00046 private static final long serialVersionUID = 1L;
00047
00048 public static Logger logger = Debug.getLogger(FtpQueueImpl.class.getName());
00049
00050 private String user = "anonymous";
00051 private String pass = "no@no.no";
00052 private String path = null;
00053 private transient TransferItf transfer = null;
00054 private AgentId dmq = null;
00055 private int clientContext;
00056 private int requestId;
00057
00058 public String ftpImplName = "com.scalagent.joram.mom.dest.ftp.TransferImplRef";
00059
00060 private Hashtable transferTable;
00067 public FtpQueueImpl(AgentId adminId,
00068 Properties prop) {
00069 super(adminId, prop);
00070 setProperties(prop);
00071
00072 transferTable = new Hashtable();
00073 try {
00074 if ((ftpImplName != null) && (ftpImplName.length() > 0))
00075 transfer = (TransferItf) Class.forName(ftpImplName).newInstance();
00076 } catch (Exception exc) {
00077 transfer = null;
00078 logger.log(BasicLevel.ERROR,
00079 "FtpQueueImpl : transfer = null" ,exc);
00080 }
00081 if (logger.isLoggable(BasicLevel.DEBUG))
00082 logger.log(BasicLevel.DEBUG, "--- " + this +
00083 " transfer = "+ transfer);
00084 }
00085
00086 protected void setProperties(Properties prop) {
00087 user = prop.getProperty("user", user);
00088 pass = prop.getProperty("pass", pass);
00089 path = prop.getProperty("path", path);
00090 ftpImplName = prop.getProperty("ftpImpl", ftpImplName);
00091 }
00092
00098 public void initialize(boolean firstTime) {
00099 if (logger.isLoggable(BasicLevel.DEBUG))
00100 logger.log(BasicLevel.DEBUG, "initialize(" + firstTime + ')');
00101
00102 super.initialize(firstTime);
00103
00104 try {
00105 if ((ftpImplName != null) && (ftpImplName.length() > 0))
00106 transfer = (TransferItf) Class.forName(ftpImplName).newInstance();
00107 } catch (Exception exc) {
00108 transfer = null;
00109 logger.log(BasicLevel.ERROR,
00110 "--- " + this + " initialize : transfer = null" ,exc);
00111 }
00112
00113 if (logger.isLoggable(BasicLevel.DEBUG))
00114 logger.log(BasicLevel.DEBUG,
00115 "--- " + this + " initialize transfer = "+ transfer);
00116
00117 if (transfer != null) {
00118 if (logger.isLoggable(BasicLevel.DEBUG))
00119 logger.log(BasicLevel.DEBUG,
00120 "--- " + this + " initialize : transferTable = " + transferTable);
00121
00122 for (Enumeration e = transferTable.elements(); e.hasMoreElements(); ) {
00123 Message msg = (Message) e.nextElement();
00124 FtpMessage ftpMsg = new FtpMessage(msg);
00125 FtpThread t = new FtpThread(transfer,
00126 (FtpMessage) ftpMsg.clone(),
00127 getId(),
00128 dmq,
00129 clientContext,
00130 requestId,
00131 user,
00132 pass,
00133 path);
00134 t.start();
00135 }
00136 }
00137 }
00138
00139 public String toString() {
00140 return "FtpQueueImpl:" + getId().toString();
00141 }
00142
00143 public void ftpNot(FtpNot not) {
00144 if (logger.isLoggable(BasicLevel.DEBUG))
00145 logger.log(BasicLevel.DEBUG, "--- " + this +
00146 " ftpNot(" + not + ")\n" +
00147 "transferTable = " + transferTable);
00148 Message msg = (Message) not.getMessages().get(0);
00149 storeMessage(new org.objectweb.joram.mom.messages.Message(msg));
00150 deliverMessages(0);
00151 transferTable.remove(new FtpMessage(msg).getIdentifier());
00152
00153 if (logger.isLoggable(BasicLevel.DEBUG))
00154 logger.log(BasicLevel.DEBUG,
00155 "--- " + this + " doProcess : transferTable = " + transferTable);
00156 }
00157
00158 public ClientMessages preProcess(AgentId from, ClientMessages not) {
00159 for (Enumeration msgs = not.getMessages().elements();
00160 msgs.hasMoreElements();) {
00161 Message msg = (Message) msgs.nextElement();
00162 if (isFtpMsg(msg)) {
00163 doProcessFtp(not,msg);
00164 not.getMessages().remove(msg);
00165 }
00166 }
00167 if (not.getMessages().size() > 0) {
00168 return not;
00169 }
00170 return null;
00171 }
00172
00173 protected boolean isFtpMsg(Message message) {
00174 FtpMessage msg = new FtpMessage(message);
00175 return (msg.propertyExists(SharedObj.url) &&
00176 msg.propertyExists(SharedObj.crc) &&
00177 msg.propertyExists(SharedObj.ack));
00178 }
00179
00180 protected void doProcessFtp(ClientMessages not,
00181 Message msg) {
00182
00183 if (logger.isLoggable(BasicLevel.DEBUG))
00184 logger.log(BasicLevel.DEBUG,
00185 "--- " + this + " doProcessFtp(" + not + "," + msg + ")");
00186
00187 if (transfer != null) {
00188 dmq = not.getDMQId();
00189 if (dmq == null && super.dmqId != null)
00190 dmq = super.dmqId;
00191 else if ( dmq == null)
00192 dmq = QueueImpl.getDefaultDMQId();
00193
00194 clientContext = not.getClientContext();
00195 requestId = not.getRequestId();
00196
00197 FtpMessage ftpMsg = new FtpMessage(msg);
00198 transferTable.put(ftpMsg.getIdentifier(),ftpMsg);
00199 FtpThread t = new FtpThread(transfer,
00200 (FtpMessage) ftpMsg.clone(),
00201 getId(),
00202 dmq,
00203 clientContext,
00204 requestId,
00205 user,
00206 pass,
00207 path);
00208 t.start();
00209 } else {
00210 DMQManager dmqManager = new DMQManager(not.getDMQId(), dmqId, getId());
00211 nbMsgsSentToDMQSinceCreation++;
00212 dmqManager.addDeadMessage(msg, MessageErrorConstants.NOT_WRITEABLE);
00213 dmqManager.sendToDMQ();
00214 }
00215 }
00216 }