Coverage Report - ca.uhn.hl7v2.app.HL7Service
 
Classes in this File Line Coverage Branch Coverage Complexity
HL7Service
48%
45/93
32%
9/28
2
HL7Service$ConnectionCleaner
100%
21/21
100%
4/4
2
 
 1  
 /**
 2  
 The contents of this file are subject to the Mozilla Public License Version 1.1 
 3  
 (the "License"); you may not use this file except in compliance with the License. 
 4  
 You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
 5  
 Software distributed under the License is distributed on an "AS IS" basis, 
 6  
 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
 7  
 specific language governing rights and limitations under the License. 
 8  
 
 9  
 The Original Code is "HL7Service.java".  Description: 
 10  
 "Accepts incoming TCP/IP connections and creates Connection objects" 
 11  
 
 12  
 The Initial Developer of the Original Code is University Health Network. Copyright (C) 
 13  
 2001.  All Rights Reserved. 
 14  
 
 15  
 Contributor(s): Kyle Buza 
 16  
 
 17  
 Alternatively, the contents of this file may be used under the terms of the 
 18  
 GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
 19  
 applicable instead of those above.  If you wish to allow use of your version of this 
 20  
 file only under the terms of the GPL and not to allow others to use your version 
 21  
 of this file under the MPL, indicate your decision by deleting  the provisions above 
 22  
 and replace  them with the notice and other provisions required by the GPL License.  
 23  
 If you do not delete the provisions above, a recipient may use your version of 
 24  
 this file under either the MPL or the GPL. 
 25  
 
 26  
  */
 27  
 
 28  
 package ca.uhn.hl7v2.app;
 29  
 
 30  
 import java.io.BufferedReader;
 31  
 import java.io.File;
 32  
 import java.io.FileReader;
 33  
 import java.io.IOException;
 34  
 import java.net.ServerSocket;
 35  
 import java.util.ArrayList;
 36  
 import java.util.Iterator;
 37  
 import java.util.List;
 38  
 import java.util.NoSuchElementException;
 39  
 import java.util.StringTokenizer;
 40  
 import java.util.concurrent.ExecutorService;
 41  
 
 42  
 import org.slf4j.Logger;
 43  
 import org.slf4j.LoggerFactory;
 44  
 
 45  
 import ca.uhn.hl7v2.HL7Exception;
 46  
 import ca.uhn.hl7v2.HapiContext;
 47  
 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
 48  
 import ca.uhn.hl7v2.concurrent.Service;
 49  
 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
 50  
 import ca.uhn.hl7v2.parser.Parser;
 51  
 import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData;
 52  
 import ca.uhn.hl7v2.protocol.ReceivingApplication;
 53  
 import ca.uhn.hl7v2.protocol.ReceivingApplicationExceptionHandler;
 54  
 import ca.uhn.hl7v2.protocol.impl.AppRoutingDataImpl;
 55  
 import ca.uhn.hl7v2.protocol.impl.AppWrapper;
 56  
 import ca.uhn.hl7v2.protocol.impl.ApplicationRouterImpl;
 57  
 
 58  
 /**
 59  
  * <p>
 60  
  * An HL7 service. Accepts incoming TCP/IP connections and creates Connection
 61  
  * objects. Uses a single MessageTypeRouter object (for all Connections) to
 62  
  * define the Applications to which message are sent. To configure, use
 63  
  * registerApplication() or loadApplicationsFromFile().
 64  
  * </p>
 65  
  * </p>A separate thread looks for Connections that have been closed (locally or
 66  
  * remotely) and discards them. </p>
 67  
  * 
 68  
  * @author Bryan Tripp
 69  
  * @author Christian Ohr
 70  
  */
 71  75
 public abstract class HL7Service extends Service {
 72  
 
 73  1
         private static final Logger log = LoggerFactory.getLogger(HL7Service.class);
 74  
 
 75  
         private final List<Connection> connections;
 76  
         private final Parser parser;
 77  
         private final LowerLayerProtocol llp;
 78  
         private final List<ConnectionListener> listeners;
 79  
         private final ConnectionCleaner cleaner;
 80  
         private final ApplicationRouterImpl applicationRouter;
 81  
 
 82  
         public HL7Service(HapiContext theHapiContext) {
 83  11
                 this(theHapiContext.getGenericParser(), theHapiContext.getLowerLayerProtocol(), theHapiContext.getExecutorService());
 84  11
         }
 85  
 
 86  
         /** Creates a new instance of Server using a default thread pool */
 87  
         public HL7Service(Parser parser, LowerLayerProtocol llp) {
 88  0
                 this(parser, llp, DefaultExecutorService.getDefaultService());
 89  0
         }
 90  
 
 91  
         /** Creates a new instance of Server */
 92  
         public HL7Service(Parser parser, LowerLayerProtocol llp,
 93  
                         ExecutorService executorService) {
 94  18
                 super("HL7 Server", executorService);
 95  18
                 this.connections = new ArrayList<Connection>();
 96  18
                 this.listeners = new ArrayList<ConnectionListener>();
 97  18
                 this.parser = parser;
 98  18
                 this.llp = llp;
 99  18
                 this.applicationRouter = new ApplicationRouterImpl(parser);
 100  18
                 this.cleaner = new ConnectionCleaner(this);
 101  
                 
 102  
                 // 960101
 103  18
                 assert !this.cleaner.isRunning();
 104  18
         }
 105  
 
 106  
         /**
 107  
          * Called after startup before the thread enters its main loop. This
 108  
          * implementation launches a cleaner thread that removes stale connections
 109  
          * from the connection list. Override to initialize resources for the
 110  
          * running thread, e.g. opening {@link ServerSocket}s etc.
 111  
          */
 112  
         @Override
 113  
         protected void afterStartup() {
 114  
                 // Fix for bug 960101: Don't start the cleaner thread until the
 115  
                 // server is started.
 116  16
                 cleaner.start();
 117  16
         }
 118  
 
 119  
         /**
 120  
          * Called after the thread has left its main loop. This implementation stops
 121  
          * the connection cleaner thread and closes any open connections. Override
 122  
          * to clean up additional resources from the running thread, e.g. closing
 123  
          * {@link ServerSocket}s.
 124  
          */
 125  
         @Override
 126  
         protected void afterTermination() {
 127  12
                 super.afterTermination();
 128  12
                 cleaner.stopAndWait();
 129  12
                 for (Connection c : connections) {
 130  7
                         c.close();
 131  7
                 }
 132  12
         }
 133  
 
 134  
         /**
 135  
          * Returns true if the thread should continue to run, false otherwise (ie if
 136  
          * stop() has been called).
 137  
          * 
 138  
          * @deprecated Use {@link #isRunning()}. Deprecated as of version 0.6.
 139  
          */
 140  
         protected boolean keepRunning() {
 141  0
                 return isRunning();
 142  
         }
 143  
 
 144  
         LowerLayerProtocol getLlp() {
 145  36
                 return llp;
 146  
         }
 147  
         
 148  
         Parser getParser() {
 149  36
                 return parser;
 150  
         }
 151  
         
 152  
         /**
 153  
          * Called by subclasses when a new Connection is made. Registers the
 154  
          * MessageTypeRouter with the given Connection and stores it.
 155  
          */
 156  
         public synchronized void newConnection(Connection c) {
 157  36
                 c.getResponder().setApplicationRouter(applicationRouter);
 158  36
                 c.activate();
 159  36
                 connections.add(c); // keep track of connections
 160  36
                 notifyListeners(c);
 161  36
         }
 162  
 
 163  
         /**
 164  
          * Returns a connection to a remote host that was initiated by the given
 165  
          * remote host. If the connection has not been made, this method blocks
 166  
          * until the remote host connects. TODO currently nobody calls this...
 167  
          */
 168  
         public Connection getRemoteConnection(String IP) {
 169  0
                 Connection conn = null;
 170  0
                 while (conn == null) {
 171  
                         // check all connections ...
 172  0
                         int c = 0;
 173  0
                         synchronized (this) {
 174  0
                                 while (conn == null && c < connections.size()) {
 175  0
                                         Connection nextConn = connections.get(c);
 176  0
                                         if (nextConn.getRemoteAddress().getHostAddress().equals(IP))
 177  0
                                                 conn = nextConn;
 178  0
                                         c++;
 179  0
                                 }
 180  0
                         }
 181  
 
 182  0
                         if (conn == null) {
 183  
                                 try {
 184  0
                                         Thread.sleep(100);
 185  0
                                 } catch (InterruptedException e) {
 186  
                     // don't care
 187  0
                                 }
 188  
                         }
 189  0
                 }
 190  0
                 return conn;
 191  
         }
 192  
 
 193  
         /** Returns all currently active connections. */
 194  
         public synchronized List<Connection> getRemoteConnections() {
 195  888
                 return connections;
 196  
         }
 197  
 
 198  
         /**
 199  
          * Registers the given ConnectionListener with the HL7Service - when a
 200  
          * remote host makes a new Connection, all registered listeners will be
 201  
          * notified.
 202  
          */
 203  
         public synchronized void registerConnectionListener(
 204  
                         ConnectionListener listener) {
 205  1
                 listeners.add(listener);
 206  1
         }
 207  
 
 208  
         /** Notifies all listeners that a Connection is new or discarded. */
 209  
         private void notifyListeners(Connection c) {
 210  65
                 for (ConnectionListener cl : listeners) {
 211  2
                         if (c.isOpen()) {
 212  1
                                 cl.connectionReceived(c);
 213  
                         } else {
 214  1
                                 cl.connectionDiscarded(c);
 215  
                         }
 216  2
                 }
 217  65
         }
 218  
 
 219  
         /**
 220  
          * Registers the given application to handle messages corresponding to the
 221  
          * given type and trigger event. Only one application can be registered for
 222  
          * a given message type and trigger event combination. A repeated
 223  
          * registration for a particular combination of type and trigger event
 224  
          * over-writes the previous one. Note that the wildcard "*" for messageType
 225  
          * or triggerEvent means any type or event, respectively.
 226  
          */
 227  
         public synchronized void registerApplication(String messageType,
 228  
                         String triggerEvent, Application handler) {
 229  9
                 ReceivingApplication handlerWrapper = new AppWrapper(handler);
 230  9
                 applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handlerWrapper);
 231  9
         }
 232  
 
 233  
         /**
 234  
          * Registers the given application to handle messages corresponding to the
 235  
          * given type and trigger event. Only one application can be registered for
 236  
          * a given message type and trigger event combination. A repeated
 237  
          * registration for a particular combination of type and trigger event
 238  
          * over-writes the previous one. Note that the wildcard "*" for messageType
 239  
          * or triggerEvent means any type or event, respectively.
 240  
          */
 241  
         public void registerApplication(String messageType, String triggerEvent, ReceivingApplication handler) {
 242  0
                 applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handler);
 243  0
         }
 244  
 
 245  
         /**
 246  
          * Registers the given application to handle messages corresponding to ALL
 247  
          * message types and trigger events.
 248  
          */
 249  
         public synchronized void registerApplication(AppRoutingData appRouting, ReceivingApplication application) {
 250  3
                 if (appRouting == null) {
 251  0
                         throw new NullPointerException("appRouting can not be null");
 252  
                 }
 253  3
                 applicationRouter.bindApplication(appRouting, application);
 254  3
         }
 255  
 
 256  
         /**
 257  
          * Registers the given application to handle messages corresponding to ALL
 258  
          * message types and trigger events.
 259  
          */
 260  
         public synchronized void registerApplication(ReceivingApplication application) {
 261  3
                 registerApplication(new AppRoutingDataImpl("*", "*", "*", "*"), application);
 262  3
         }
 263  
         
 264  
     /**
 265  
      * Sets an exception handler which will be invoked in the event of a
 266  
      * failure during parsing, processing, or encoding of an
 267  
      * incoming message or its response.
 268  
      */
 269  
         public synchronized void setExceptionHandler(ReceivingApplicationExceptionHandler exHandler) {
 270  0
                 applicationRouter.setExceptionHandler(exHandler);
 271  0
         }
 272  
         
 273  
         
 274  
         /**
 275  
          * <p>
 276  
          * A convenience method for registering applications (using
 277  
          * <code>registerApplication()
 278  
          * </code>) with this service. Information about which Applications should
 279  
          * handle which messages is read from the given text file. Each line in the
 280  
          * file should have the following format (entries tab delimited):
 281  
          * </p>
 282  
          * <p>
 283  
          * message_type &#009; trigger_event &#009; application_class
 284  
          * </p>
 285  
          * <p>
 286  
          * message_type &#009; trigger_event &#009; application_class
 287  
          * </p>
 288  
          * <p>
 289  
          * Note that message type and event can be the wildcard "*", which means
 290  
          * any.
 291  
          * </p>
 292  
          * <p>
 293  
          * For example, if you write an Application called
 294  
          * org.yourorganiztion.ADTProcessor that processes several types of ADT
 295  
          * messages, and another called org.yourorganization.ResultProcessor that
 296  
          * processes result messages, you might have a file that looks like this:
 297  
          * </p>
 298  
          * <p>
 299  
          * ADT &#009; * &#009; org.yourorganization.ADTProcessor<br>
 300  
          * ORU &#009; R01 &#009; org.yourorganization.ResultProcessor
 301  
          * </p>
 302  
          * <p>
 303  
          * Each class listed in this file must implement Application and must have a
 304  
          * zero-argument constructor.
 305  
          * </p>
 306  
          */
 307  
         public void loadApplicationsFromFile(File f) throws IOException,
 308  
                         HL7Exception, ClassNotFoundException, InstantiationException,
 309  
                         IllegalAccessException {
 310  0
                 BufferedReader in = null;
 311  
                 try {
 312  0
                         in = new BufferedReader(new FileReader(f));
 313  
                         String line;
 314  0
                         while ((line = in.readLine()) != null) {
 315  
                                 // parse application registration information
 316  0
                                 StringTokenizer tok = new StringTokenizer(line, "\t", false);
 317  
                                 String type, event, className;
 318  
         
 319  0
                                 if (tok.hasMoreTokens()) { // skip blank lines
 320  
                                         try {
 321  0
                                                 type = tok.nextToken();
 322  0
                                                 event = tok.nextToken();
 323  0
                                                 className = tok.nextToken();
 324  0
                                         } catch (NoSuchElementException ne) {
 325  0
                                                 throw new HL7Exception(
 326  
                                                                 "Can't register applications from file "
 327  
                                                                                 + f.getName()
 328  
                                                                                 + ". The line '"
 329  
                                                                                 + line
 330  
                                                                                 + "' is not of the form: message_type [tab] trigger_event [tab] application_class.");
 331  0
                                         }
 332  
         
 333  
                                         try {
 334  
                                                 @SuppressWarnings("unchecked")
 335  0
                                                 Class<? extends Application> appClass = (Class<? extends Application>) Class
 336  
                                                                 .forName(className); // may throw
 337  
                                                                                                                 // ClassNotFoundException
 338  0
                                                 Application app = appClass.newInstance();
 339  0
                                                 registerApplication(type, event, app);
 340  0
                                         } catch (ClassCastException cce) {
 341  0
                                                 throw new HL7Exception("The specified class, " + className
 342  
                                                                 + ", doesn't implement Application.");
 343  0
                                         }
 344  
         
 345  
                                 }
 346  0
                         }
 347  
                 } finally {
 348  0
                         if (in != null) {
 349  
                                 try {
 350  0
                                         in.close();
 351  0
                                 } catch (IOException e) {
 352  
                     // don't care
 353  0
                                 }
 354  
                         }
 355  
                 }
 356  0
         }
 357  
 
 358  
         /**
 359  
          * Runnable that looks for closed Connections and discards them. It would be
 360  
          * nice to find a way to externalize this safely so that it could be re-used
 361  
          * by (for example) TestPanel. It could take a Vector of Connections as an
 362  
          * argument, instead of an HL7Service, but some problems might arise if
 363  
          * other threads were iterating through the Vector while this one was
 364  
          * removing elements from it.
 365  
          * 
 366  
          * Note: this could be started as daemon, so we don't need to care about
 367  
          * termination.
 368  
          */
 369  
         private class ConnectionCleaner extends Service {
 370  
 
 371  
                 private final HL7Service service;
 372  
 
 373  18
                 public ConnectionCleaner(HL7Service service) {
 374  18
                         super("ConnectionCleaner", service.getExecutorService());
 375  18
                         this.service = service;
 376  18
                 }
 377  
 
 378  
                 @Override
 379  
                 public void start() {
 380  16
                         log.info("Starting ConnectionCleaner service");
 381  16
                         super.start();
 382  16
                 }
 383  
 
 384  
                 public void handle() {
 385  
                         try {
 386  889
                                 Thread.sleep(500);
 387  884
                                 synchronized (service) {
 388  884
                                         Iterator<Connection> it = service.getRemoteConnections()
 389  
                                                         .iterator();
 390  946
                                         while (it.hasNext()) {
 391  62
                                                 Connection conn = it.next();
 392  62
                                                 if (!conn.isOpen()) {
 393  29
                                                         log.debug(
 394  
                                                                         "Removing connection from {} from connection list",
 395  
                                                                         conn.getRemoteAddress().getHostAddress());
 396  29
                                                         it.remove();
 397  29
                                                         service.notifyListeners(conn);
 398  
                                                 }
 399  62
                                         }
 400  884
                                 }
 401  1
                         } catch (InterruptedException e) {
 402  
                 // don't care
 403  884
                         }
 404  885
                 }
 405  
 
 406  
         }
 407  
 
 408  
 }