View Javadoc

1   /**
2    * The contents of this file are subject to the Mozilla Public License Version 1.1
3    * (the "License"); you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at http://www.mozilla.org/MPL/
5    * Software distributed under the License is distributed on an "AS IS" basis,
6    * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
7    * specific language governing rights and limitations under the License.
8    *
9    * The Original Code is ""  Description:
10   * ""
11   *
12   * The Initial Developer of the Original Code is University Health Network. Copyright (C)
13   * 2001.  All Rights Reserved.
14   *
15   * Contributor(s): ______________________________________.
16   *
17   * Alternatively, the contents of this file may be used under the terms of the
18   * GNU General Public License (the  "GPL"), in which case the provisions of the GPL are
19   * applicable instead of those above.  If you wish to allow use of your version of this
20   * file only under the terms of the GPL and not to allow others to use your version
21   * of this file under the MPL, indicate your decision by deleting  the provisions above
22   * and replace  them with the notice and other provisions required by the GPL License.
23   * If you do not delete the provisions above, a recipient may use your version of
24   * this file under either the MPL or the GPL.
25   */
26  package ca.uhn.hl7v2.testpanel.model.conn;
27  
28  import java.awt.EventQueue;
29  import java.io.IOException;
30  import java.io.StringReader;
31  import java.io.StringWriter;
32  import java.util.Date;
33  import java.util.LinkedList;
34  import java.util.concurrent.CountDownLatch;
35  import java.util.concurrent.TimeUnit;
36  
37  import javax.xml.bind.JAXB;
38  import javax.xml.bind.annotation.XmlAccessType;
39  import javax.xml.bind.annotation.XmlAccessorType;
40  import javax.xml.bind.annotation.XmlType;
41  
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
45  import ca.uhn.hl7v2.HL7Exception;
46  import ca.uhn.hl7v2.app.Connection;
47  import ca.uhn.hl7v2.app.ConnectionHub;
48  import ca.uhn.hl7v2.llp.LLPException;
49  import ca.uhn.hl7v2.llp.LowerLayerProtocol;
50  import ca.uhn.hl7v2.model.Message;
51  import ca.uhn.hl7v2.parser.EncodingCharacters;
52  import ca.uhn.hl7v2.parser.Parser;
53  import ca.uhn.hl7v2.testpanel.model.ActivityIncomingMessage;
54  import ca.uhn.hl7v2.testpanel.model.ActivityInfo;
55  import ca.uhn.hl7v2.testpanel.model.ActivityInfoError;
56  import ca.uhn.hl7v2.testpanel.model.ActivityOutgoingMessage;
57  import ca.uhn.hl7v2.testpanel.model.msg.AbstractMessage;
58  import ca.uhn.hl7v2.testpanel.model.msg.Hl7V2MessageBase;
59  import ca.uhn.hl7v2.testpanel.model.msg.Hl7V2MessageCollection;
60  import ca.uhn.hl7v2.testpanel.ui.IDestroyable;
61  import ca.uhn.hl7v2.testpanel.util.IProgressCallback.OperationCancelRequestedException;
62  import ca.uhn.hl7v2.testpanel.util.ISendProgressCallback;
63  import ca.uhn.hl7v2.util.SocketFactory;
64  
65  @XmlAccessorType(XmlAccessType.FIELD)
66  @XmlType(name = "OutboundConnection")
67  public class OutboundConnection extends AbstractConnection implements IDestroyable {
68  
69  	private static final Logger ourLog = LoggerFactory.getLogger(OutboundConnection.class);
70  
71  	private transient Connection myConnection;
72  	private transient ConnectionMonitorThread myConnectionMonitorThread;
73  	private transient Parser myParser;
74  	private transient MessageSenderThread myMessageSenderThread;
75  
76  
77  	public OutboundConnection() {
78  	}
79  
80  
81  	@Override
82  	public String exportConfigToXml() {
83  		StringWriter writer = new StringWriter();
84  		JAXB.marshal(this, writer);
85  		return writer.toString();
86  	}
87  
88  
89  	public static OutboundConnection fromXml(String theXml) {
90  		return JAXB.unmarshal(new StringReader(theXml), OutboundConnection.class);
91  	}
92  
93  
94  	public synchronized void sendMessages(Hl7V2MessageCollection theMessages, ISendProgressCallback theTransmissionCallback) {
95  		if (myMessageSenderThread != null) {
96  			throw new IllegalStateException("Already sending messages");
97  		}
98  		myMessageSenderThread = new MessageSenderThread(theMessages, theTransmissionCallback);
99  		myMessageSenderThread.start();
100 	}
101 
102 
103 	public void start() {
104 		super.start();
105 
106 		if (myConnectionMonitorThread != null) {
107 			return;
108 		}
109 
110 		myParser = createParser();
111 
112 		ourLog.info("Starting up outgoing interface {}", getName());
113 
114 		myConnectionMonitorThread = new ConnectionMonitorThread();
115 		setStatus(StatusEnum.TRYING_TO_START);
116 		myConnectionMonitorThread.start();
117 	}
118 
119 
120 	public void stop() {
121 		super.stop();
122 
123 		ourLog.info("Shutting down outgoing interface {}", getName());
124 
125 		ConnectionMonitorThread thread = myConnectionMonitorThread;
126 		myConnectionMonitorThread = null;
127 
128 		if (thread != null) {
129 			thread.interrupt();
130 		}
131 
132 	}
133 
134 	private class ConnectionMonitorThread extends Thread {
135 
136 		private CountDownLatch myStartupLatch = new CountDownLatch(1);
137 
138 
139 		private void doRun() throws LLPException {
140 			LowerLayerProtocol llpClass = null;
141 			llpClass = createLlp();
142 
143 			boolean tls = isTls();
144 
145 			String desc = OutboundConnection.this.isNameIsExplicitlySet() ? OutboundConnection.this.getName() + " (" + OutboundConnection.this.createDescription() + ")" : OutboundConnection.this.createDescription();
146 			ourLog.info("Starting outbound interface " + desc);
147 
148 			Connection connection = null;
149 			while (myConnectionMonitorThread == this) {
150 
151 				if (getStatus() == StatusEnum.TRYING_TO_START) {
152 					String msg = "Attempting outbound connection to " + desc;
153 					ourLog.info(msg);
154 					addActivityInfoInSwingThread(msg);
155 
156 					try {
157 
158 						SocketFactory socketFactory = getSocketFactory();
159 
160 						if (isDualPort()) {
161 							connection = ConnectionHub.getInstance().attach(getHost(), getOutgoingPort(), getIncomingOrSinglePort(), myParser, llpClass, tls, socketFactory);
162 						} else {
163 							connection = ConnectionHub.getInstance().attach(getHost(), getIncomingOrSinglePort(), myParser, llpClass, tls, socketFactory);
164 						}
165 
166 						myConnection = connection;
167 
168 						msg = "Successfully connected to " + createDescription();
169 						ourLog.info(msg);
170 						addActivityInfoInSwingThread(msg);
171 
172 						myStartupLatch.countDown();
173 
174 					} catch (HL7Exception e) {
175 						Throwable ex = e;
176 						if (e.getCause() != null) {
177 							ex = e.getCause();
178 						}
179 
180 						ourLog.warn("Failed to connect to " + createDescription() + " - Message was " + ex.getMessage());
181 						addActivity(new ActivityInfoError(new Date(), "Failed to connect to " + createDescription() + " - " + ex.getMessage()));
182 					}
183 
184 					if (myConnection != null) {
185 						if (myConnection.isOpen()) {
186 							setStatus(StatusEnum.STARTED);
187 							setStatusLine("Connected");
188 						} else {
189 							setStatus(StatusEnum.TRYING_TO_START);
190 							setStatusLine("Lost connection, retrying...");
191 						}
192 					}
193 					if (myConnection == null) {
194 						setStatus(StatusEnum.TRYING_TO_START);
195 						setStatusLine("Trying to connect...");
196 					}
197 
198 					try {
199 						Thread.sleep(1000);
200 					} catch (InterruptedException e) {
201 						// ignore
202 					}
203 
204 				}
205 
206 				try {
207 					Thread.sleep(250);
208 				} catch (InterruptedException e) {
209 					// ignore
210 				}
211 
212 			}
213 
214 		}
215 
216 
217 		/**
218 		 * {@inheritDoc}
219 		 */
220 		@Override
221 		public void run() {
222 			setStatusLine("Trying to connect...");
223 
224 			try {
225 
226 				doRun();
227 				setStatusLine("Connection stopped");
228 
229 			} catch (Throwable e) {
230 				ourLog.error("Connection failed with an unexpected error!", e);
231 				setStatusLine("Failed with an error: " + e.getMessage());
232 			} finally {
233 				if (myConnection != null) {
234 					ConnectionHub.getInstance().discard(myConnection);
235 				}
236 				myConnectionMonitorThread = null;
237 				setStatus(StatusEnum.STOPPED);
238 			}
239 		}
240 
241 
242 		/**
243 		 * @return Returns true if we got a connection
244 		 */
245 		public boolean waitUntilWeHaveAConnection() {
246 			try {
247 				return myStartupLatch.await(5000, TimeUnit.MILLISECONDS);
248 			} catch (InterruptedException e) {
249 				return false;
250 			}
251 		}
252 
253 	}
254 
255 	public class MessageSenderThread extends Thread {
256 
257 		private Hl7V2MessageCollection myMessages;
258 		private ISendProgressCallback myTransmissionCallback;
259 		private boolean myCancelled;
260 		private long myStartTime;
261 		private StatusEnum myInitialStatus;
262 		private int myTotalMessages;
263 		private int mySentMessages;
264 		private LinkedList<Integer> myResponseTimes = new LinkedList<Integer>();
265 		private long myLastUpdate = 0;
266 
267 
268 		public MessageSenderThread(Hl7V2MessageCollection theMessages, ISendProgressCallback theTransmissionCallback) {
269 			myMessages = theMessages;
270 			myTransmissionCallback = theTransmissionCallback;
271 		}
272 
273 
274 		@Override
275 		public void run() {
276 
277 			EventQueue.invokeLater(new Runnable() {
278 				@Override
279 				public void run() {
280 					myTransmissionCallback.activityStarted();
281 				}
282 			});
283 
284 			try {
285 
286 				if (!doStart()) {
287 					return;
288 				}
289 
290 				int sendNumberOfTimes = myMessages.getSendNumberOfTimes();
291 
292 				myTotalMessages = myMessages.countMessagesOfType(Hl7V2MessageBase.class) * sendNumberOfTimes;
293 				mySentMessages = 0;
294 
295 				myStartTime = System.currentTimeMillis();
296 				for (int curRep = 1; curRep <= sendNumberOfTimes; curRep++) {
297 					doSend();
298 				}
299 
300 				sendUpdate(1.0);
301 				
302 				doStop();
303 
304 			} finally {
305 				EventQueue.invokeLater(new Runnable() {
306 					@Override
307 					public void run() {
308 						myTransmissionCallback.activityStopped();
309 					}
310 				});
311 				synchronized (OutboundConnection.this) {
312 					myMessageSenderThread = null;
313 				}
314 
315 				ourLog.info("Transmission thread shutting down");
316 			}
317 
318 		}
319 
320 
321 		private void doStop() {
322 			long delay = System.currentTimeMillis() - myStartTime;
323 			int i = myTotalMessages;
324 
325 			StringBuilder b = new StringBuilder();
326 			b.append("Sent ");
327 			b.append(i);
328 			b.append(" message");
329 			b.append((i != 1 ? "s" : ""));
330 			b.append(" in ");
331 			b.append(delay);
332 			b.append("ms");
333 			if (mySentMessages > 0) {
334 				b.append("<br/>");
335 				b.append("Average: ");
336 				b.append(delay / mySentMessages);
337 				b.append("ms / message");
338 			}
339 			
340 			addActivity(new ActivityInfo(new Date(), b.toString()));
341 
342 			if (myInitialStatus != StatusEnum.STARTED) {
343 				OutboundConnection.this.stop();
344 			}
345 		}
346 
347 
348 		private boolean doStart() {
349 			myInitialStatus = getStatus();
350 			if (myInitialStatus != StatusEnum.STARTED) {
351 				OutboundConnection.this.start();
352 			}
353 
354 			if (myConnectionMonitorThread == null) {
355 				throw new IllegalStateException("Interface not started");
356 			}
357 
358 			if (getStatus() != StatusEnum.STARTED) {
359 				ourLog.info("Waiting for interface {} to start...", OutboundConnection.this.getName());
360 				addActivity(new ActivityInfo(new Date(), "Starting interface \"" + OutboundConnection.this.getName() + "\"..."));
361 
362 				boolean gotConnection = myConnectionMonitorThread.waitUntilWeHaveAConnection();
363 				if (!gotConnection) {
364 					ourLog.info("Failed to connect to {}, shutting down interface and aborting send", createDescription());
365 					addActivity(new ActivityInfoError(new Date(), "Failed to connect to interface. Aborting send."));
366 					OutboundConnection.this.stop();
367 					return false;
368 				}
369 			}
370 
371 			return true;
372 		}
373 
374 
375 		private void doSend() {
376 			int i = 0;
377 			for (AbstractMessage<?> abstractMessage : myMessages.getMessages()) {
378 				if (myCancelled) {
379 					return;
380 				}
381 
382 				final double complete = (((double) mySentMessages) / myTotalMessages);
383 				long now = System.currentTimeMillis();
384 				long elapsedSinceLastUpdate = now - myLastUpdate;
385 				if (elapsedSinceLastUpdate > 1000) {
386 					sendUpdate(complete);
387 					myLastUpdate = now;
388 				}
389 
390 				i++;
391 
392 				if (abstractMessage instanceof Hl7V2MessageBase) {
393 
394 					Message msg = ((Hl7V2MessageBase) abstractMessage).getParsedMessage();
395 					ourLog.info("Sending message " + i + "/" + myTotalMessages + " of type " + msg.getClass());
396 					try {
397 
398 						beforeProcessingNewMessageOut();
399 						addActivity(new ActivityOutgoingMessage(new Date(), getEncoding(), myParser.encode(msg), EncodingCharacters.getInstance(msg)));
400 
401 						long beforeSend = now;
402 						Message response = myConnection.getInitiator().sendAndReceive(msg);
403 
404 						mySentMessages++;
405 
406 						long sendTime = now - beforeSend;
407 						myResponseTimes.add((int) sendTime);
408 						while (myResponseTimes.size() > 100) {
409 							myResponseTimes.pop();
410 						}
411 
412 						beforeProcessingNewMessageIn();
413 						addActivity(new ActivityIncomingMessage(new Date(), getEncoding(), myParser.encode(response), EncodingCharacters.getInstance(response)));
414 
415 					} catch (HL7Exception e) {
416 						ourLog.error("Failed to transmit message. ", e);
417 						addActivity(new ActivityInfoError(new Date(), "Failed to transmit message. " + e.getMessage()));
418 					} catch (LLPException e) {
419 						ourLog.error("Failed to transmit message. ", e);
420 						addActivity(new ActivityInfoError(new Date(), "Failed to transmit message. " + e.getMessage()));
421 					} catch (IOException e) {
422 						ourLog.error("Failed to transmit message. Shutting down interface. ", e);
423 						addActivity(new ActivityInfoError(new Date(), "Failed to transmit message. Shutting down interface. " + e.getMessage()));
424 						OutboundConnection.this.stop();
425 					}
426 
427 				} else {
428 
429 					ourLog.info("Skipping unknown message");
430 
431 				}
432 
433 			}
434 
435 		}
436 
437 
438 		private void sendUpdate(final double complete) {
439 			long now = System.currentTimeMillis();
440 	        long elapsed = now - myStartTime;
441 	        if (elapsed == 0) {
442 	        	elapsed = 1;
443 	        }
444 	        long throughputPerSecond = (mySentMessages * 1000) / elapsed;
445 	        long avgSendTime = 0;
446 	        if (myResponseTimes.size() > 0) {
447 	        	long total = 0;
448 	        	for (Integer next : myResponseTimes) {
449 	        		total += next;
450 	        	}
451 	        	avgSendTime = total / myResponseTimes.size();
452 	        }
453 	        final int avgSendTimeF = (int) avgSendTime;
454 	        final int throughputPerSecondF = (int) throughputPerSecond;
455 	        EventQueue.invokeLater(new Runnable() {
456 	        	@Override
457 	        	public void run() {
458 	        		myTransmissionCallback.updateAvgResponseTimeMillis(avgSendTimeF);
459 	        		myTransmissionCallback.updateAvgThroughputPerSecond(throughputPerSecondF);
460 	        	}
461 	        });
462 	        
463 	        EventQueue.invokeLater(new Runnable() {
464 	        	@Override
465 	        	public void run() {
466 	        		try {
467 	        			myTransmissionCallback.progressUpdate(complete);
468 	        		} catch (OperationCancelRequestedException e) {
469 	        			ourLog.info("Detected that transmission cancel was requested");
470 	        			myCancelled = true;
471 	        		}
472 	        	}
473 	        });
474         }
475 
476 	}
477 
478 }