Coverage Report - ca.uhn.hl7v2.protocol.impl.ProcessorImpl
 
Classes in this File                | Line Coverage  | Branch Coverage | Complexity
ProcessorImpl                       | 89% (179/201)  | 71% (89/124)    | 3.808
ProcessorImpl$1                     | 71% (5/7)      | N/A             | 3.808
ProcessorImpl$Cycler                | 100% (13/13)   | 100% (2/2)      | 3.808
ProcessorImpl$ExpiringTransportable | 100% (4/4)     | N/A             | 3.808
 
 1  
 /*
 2  
 The contents of this file are subject to the Mozilla Public License Version 1.1 
 3  
 (the "License"); you may not use this file except in compliance with the License. 
 4  
 You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
 5  
 Software distributed under the License is distributed on an "AS IS" basis, 
 6  
 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
 7  
 specific language governing rights and limitations under the License. 
 8  
 
 9  
 The Original Code is "ProcessorImpl.java".  Description: 
 10  
 "A default implementation of Processor." 
 11  
 
 12  
 The Initial Developer of the Original Code is University Health Network. Copyright (C) 
 13  
 2004.  All Rights Reserved. 
 14  
 
 15  
 Contributor(s): ______________________________________. 
 16  
 
 17  
 Alternatively, the contents of this file may be used under the terms of the 
 18  
 GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
 19  
 applicable instead of those above.  If you wish to allow use of your version of this 
 20  
 file only under the terms of the GPL and not to allow others to use your version 
 21  
 of this file under the MPL, indicate your decision by deleting  the provisions above 
 22  
 and replace  them with the notice and other provisions required by the GPL License.  
 23  
 If you do not delete the provisions above, a recipient may use your version of 
 24  
 this file under either the MPL or the GPL. 
 25  
 */
 26  
 
 27  
 package ca.uhn.hl7v2.protocol.impl;
 28  
 
 29  
 import java.util.HashMap;
 30  
 import java.util.Iterator;
 31  
 import java.util.Map;
 32  
 import java.util.concurrent.ExecutorService;
 33  
 import java.util.concurrent.Executors;
 34  
 
 35  
 import org.slf4j.Logger;
 36  
 import org.slf4j.LoggerFactory;
 37  
 
 38  
 import ca.uhn.hl7v2.HL7Exception;
 39  
 import ca.uhn.hl7v2.preparser.PreParser;
 40  
 import ca.uhn.hl7v2.protocol.Processor;
 41  
 import ca.uhn.hl7v2.protocol.ProcessorContext;
 42  
 import ca.uhn.hl7v2.protocol.TransportException;
 43  
 import ca.uhn.hl7v2.protocol.TransportLayer;
 44  
 import ca.uhn.hl7v2.protocol.Transportable;
 45  
 
 46  
 /**
 47  
  * A default implementation of <code>Processor</code>.  
 48  
  *  
 49  
  * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
 50  
  * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
 51  
  */
 52  5
 public class ProcessorImpl implements Processor {
 53  
 
 54  1
     private static final Logger log = LoggerFactory.getLogger(ProcessorImpl.class);
 55  
 
 56  
     private ProcessorContext myContext;
 57  
     private final Map<String, ExpiringTransportable> myAcceptAcks;
 58  
     private final Map<String, Long> myReservations;
 59  
     private final Map<String, ExpiringTransportable> myAvailableMessages;
 60  
     private boolean myThreaded; //true if separate threads are calling cycle()  
 61  
     private Cycler ackCycler;
 62  
     private Cycler nonAckCycler;
 63  
     private ExecutorService myResponseExecutorService;
 64  
     
 65  
     /**
 66  
      * @param theContext source of supporting services 
 67  
      * @param isThreaded true if this class should create threads in which to call cycle(), and 
 68  
      *  in which to send responses from Applications.  This is the preferred mode.  Use false 
 69  
      *  if threading is not allowed, eg you are running the code in an EJB container.  In this case, 
 70  
      *  the send() and receive() methods will call cycle() themselves as needed.  However, cycle() 
 71  
      *  makes potentially blocking calls, so these methods may not return until the next message 
 72  
      *  is received from the remote server, regardless of timeout.  Probably the worst example of this
 73  
      *  would be if receive() was called to wait for an application ACK that was specified as "RE" (ie
 74  
      *  required on error).  No response will be returned if the message is processed without error, 
 75  
      *  and in a non-threaded environment, receive() will block forever.  Use true if you can, otherwise
 76  
      *  study this class carefully.
 77  
      *   
 78  
      * TODO: write a MLLPTransport with non-blocking IO  
 79  
      * TODO: reconnect transport layers on error and retry 
 80  
      */
 81  14
     public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) {
 82  14
         myContext = theContext;
 83  14
         myThreaded = isThreaded;
 84  14
         myAcceptAcks = new HashMap<String, ExpiringTransportable>();
 85  14
         myReservations = new HashMap<String, Long>();
 86  14
         myAvailableMessages = new HashMap<String, ExpiringTransportable>();
 87  
         
 88  14
         if (isThreaded) {
 89  14
             myResponseExecutorService = Executors.newSingleThreadExecutor(); 
 90  
 
 91  14
                 ackCycler = new Cycler(this, true);
 92  14
             Thread ackThd = new Thread(ackCycler);
 93  14
             ackThd.start();
 94  14
             nonAckCycler = new Cycler(this, false);
 95  14
             Thread nonAckThd = new Thread(nonAckCycler);
 96  14
             nonAckThd.start();            
 97  
         }
 98  14
     }
 99  
     
 100  
     /**
 101  
      * If self-threaded, stops threads that have been created.  
 102  
      */
 103  
     public void stop() {
 104  13
         if (myThreaded) {
 105  13
             ackCycler.stop();
 106  13
             nonAckCycler.stop();
 107  
 
 108  13
             myResponseExecutorService.shutdownNow();
 109  
         }
 110  13
     }
 111  
 
 112  
     /**
 113  
      * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long)
 114  
      */
 115  
     public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception {
 116  12
         String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"};
 117  12
         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
 118  12
         String controlId = fields[0];
 119  12
         String needAcceptAck = fields[1];
 120  12
         String needAppAck = fields[2];
 121  
         
 122  12
         checkValidAckNeededCode(needAcceptAck);
 123  
         
 124  12
         trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
 125  
         
 126  12
         boolean originalMode = (needAcceptAck == null && needAppAck == null); 
 127  12
         if (originalMode || !NE.equals(needAcceptAck)) {
 128  
         
 129  10
             Transportable response = null;
 130  10
             int retries = 0;
 131  
             do {
 132  24
                 long until = System.currentTimeMillis() + retryIntervalMillis;
 133  39015
                 while (response == null && System.currentTimeMillis() < until) {
 134  38991
                     synchronized (this) {
 135  38991
                         ExpiringTransportable et = myAcceptAcks.remove(controlId);
 136  38991
                         if (et == null) {
 137  38985
                             cycleIfNeeded(true);
 138  
                         } else {
 139  6
                             response = et.transportable;
 140  
                         }
 141  38991
                     }
 142  38991
                     sleepIfNeeded();
 143  
                 }
 144  
                 
 145  24
                 if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL))
 146  
                         || (response != null && isReject(response))) {
 147  3
                     log.info("Resending message {}", controlId);
 148  3
                     trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
 149  3
                     response = null;                    
 150  
                 }
 151  
                 
 152  24
                 if (response != null && isError(response)) {
 153  2
                     String[] errMsgPath = {"MSA-3"};
 154  2
                     String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath);                    
 155  2
                     throw new HL7Exception("Error message received: " + errMsg[0]);
 156  
                 }
 157  
                 
 158  22
             } while (response == null && ++retries <= maxRetries);
 159  
         }
 160  10
     }
 161  
     
 162  
     private void checkValidAckNeededCode(String theCode) throws HL7Exception {
 163  
         //must be one of the below ... 
 164  12
         if ( !(theCode == null || theCode.equals("") 
 165  
                 ||theCode.equals(AL) || theCode.equals(ER) 
 166  
                 || theCode.equals(NE) || theCode.equals(SU)) ) {
 167  0
             throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message");
 168  
         }            
 169  12
     }
 170  
     
 171  
     /**
 172  
      * Calls cycle() if we do not expect another thread to be doing so
 173  
      * @param expectingAck as in cycle
 174  
      */
 175  
     private void cycleIfNeeded(boolean expectingAck) throws HL7Exception {
 176  44649
         if (!myThreaded) {
 177  0
             cycle(expectingAck);
 178  
         }        
 179  44649
     }
 180  
     
 181  
     /**
 182  
     * Sleeps for 1 ms if this processor is self-threaded, i.e. myThreaded is true (this is to let the CPU idle).   
 183  
      */
 184  
     private void sleepIfNeeded() {
 185  44955
         if (myThreaded) {
 186  
             try {
 187  44955
                 Thread.sleep(1);
 188  44953
             } catch (InterruptedException e) { /* no problem */ }
 189  
         }                
 190  44955
     }
 191  
     
 192  
     /** Returns true if a CR or AR ACK */ 
 193  
     private static boolean isReject(Transportable theMessage) throws HL7Exception {
 194  6
         boolean reject = false;
 195  6
         String[] fieldPaths = {"MSA-1"};
 196  6
         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
 197  6
         if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) {
 198  1
             reject = true;
 199  
         }        
 200  6
         return reject;
 201  
     }
 202  
 
 203  
     /** Returns true if a CE or AE ACK */ 
 204  
     private static boolean isError(Transportable theMessage) throws HL7Exception {
 205  5
         boolean error = false;
 206  5
         String[] fieldPaths = {"MSA-1"};
 207  5
         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
 208  5
         if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) {
 209  2
             error = true;
 210  
         }
 211  5
         return error;
 212  
     }
 213  
 
 214  
     /**
 215  
      * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long)
 216  
      */
 217  
     public synchronized void reserve(String theAckId, long thePeriodMillis) {
 218  4
         Long expiry = new Long(System.currentTimeMillis() + thePeriodMillis);
 219  4
         myReservations.put(theAckId, expiry);
 220  4
     }
 221  
     
 222  
     /**
 223  
      * Tries to send the message, and if there is an error reconnects and tries again. 
 224  
      */
 225  
     private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException {
 226  
         try {
 227  15
             theTransport.send(theTransportable);
 228  0
         } catch (TransportException e) {
 229  0
             theTransport.disconnect();
 230  0
             theTransport.connect();
 231  0
             theTransport.send(theTransportable);
 232  15
         }
 233  15
     }
 234  
     
 235  
     
 236  
     /**
 237  
      * Tries to receive a message, and if there is an error reconnects and tries again. 
 238  
      */
 239  
     private Transportable tryReceive(TransportLayer theTransport) throws TransportException {
 240  304
         Transportable message = null;
 241  
         try {
 242  304
             message = theTransport.receive();            
 243  15
         } catch (TransportException e) {
 244  15
             theTransport.disconnect();
 245  15
             theTransport.connect();
 246  11
             message = theTransport.receive();
 247  289
         }
 248  300
         return message;
 249  
     }
 250  
 
 251  
     /** 
 252  
      * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean)
 253  
      */
 254  
     public void cycle(boolean expectingAck) throws HL7Exception {
 255  304
         log.debug("In cycle()");
 256  
             
 257  304
             cleanReservations();
 258  304
         cleanAcceptAcks();
 259  304
         cleanReservedMessages();
 260  
 
 261  304
         Transportable in = null;
 262  
         try {
 263  304
             if (expectingAck) {
 264  155
                 in = tryReceive(myContext.getLocallyDrivenTransportLayer());
 265  
             } else {
 266  149
                 in = tryReceive(myContext.getRemotelyDrivenTransportLayer());
 267  
             }
 268  0
         } catch (TransportException e) {
 269  
             try {
 270  0
                 Thread.sleep(1000);
 271  0
             } catch (InterruptedException e1) {}
 272  0
             throw e;
 273  300
         }
 274  
         
 275  
         // log
 276  300
         if (in != null) {
 277  12
                log.debug("Received message: {}", in.getMessage());
 278  
         } else {
 279  288
                 log.debug("Received no message");
 280  
         }
 281  
         
 282  
         // If we have a message, handle it
 283  300
         if (in != null) { 
 284  12
             String acceptAckNeeded = null;
 285  
 //            String appAckNeeded = null;
 286  12
             String ackCode = null;
 287  12
             String ackId = null;
 288  
             
 289  
             try {
 290  12
                     String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"};
 291  12
                     String[] fields = PreParser.getFields(in.getMessage(), fieldPaths);         
 292  11
                                 acceptAckNeeded = fields[0];
 293  
 //                                appAckNeeded = fields[1];
 294  11
                                 ackCode = fields[2];
 295  11
                                 ackId = fields[3];
 296  1
             } catch (HL7Exception e) {
 297  1
                     log.warn("Failed to parse accept ack fields in incoming message", e);
 298  11
             }
 299  
             
 300  12
             if (ackId != null && ackCode != null && ackCode.startsWith("C")) {
 301  6
                 long expiryTime = System.currentTimeMillis() + 1000 * 60;
 302  6
                 myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime));
 303  6
             } else {
 304  6
                 AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in);
 305  
             
 306  5
                 if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL)) 
 307  
                     || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable()) 
 308  
                     || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) {
 309  0
                     trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage());    
 310  
                 }
 311  
   
 312  5
                 if (ack.isAcceptable()) {
 313  5
                     if (isReserved(ackId)) {
 314  
                             
 315  1
                             log.debug("Received expected ACK message with ACK ID: {}", ackId);
 316  
                             
 317  1
                         removeReservation(ackId);
 318  1
                         long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5;                
 319  1
                         myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime));
 320  
                         
 321  1
                     } else {
 322  
 
 323  4
                             log.debug("Sending message to router");
 324  4
                         Transportable out = myContext.getRouter().processMessage(in);
 325  4
                         sendAppResponse(out);
 326  
                         
 327  4
                     }
 328  
                 } else {
 329  
                         // TODO: should we do something more here? Might be nice to 
 330  
                         // allow a configurable handler for this situation
 331  0
                         log.warn("Incoming message was not acceptable");
 332  
                 }
 333  
                 
 334  
             }
 335  11
         } else {
 336  288
             String transport = expectingAck ? " Locally driven " : "Remotely driven";
 337  288
             log.debug("{} TransportLayer.receive() returned null.", transport);
 338  
         }
 339  
         
 340  299
         sleepIfNeeded();
 341  
 
 342  299
         log.debug("Exiting cycle()");
 343  299
     }
 344  
     
 345  
     /** Sends via the response executor service if myThreaded is true, otherwise in the current thread */
 346  
     private void sendAppResponse(final Transportable theResponse) {
 347  4
         final ProcessorImpl processor = this;
 348  4
         Runnable sender = new Runnable() {
 349  
             public void run() {
 350  
                 try {
 351  4
                         log.debug("Sending response: {}", theResponse);
 352  
                         
 353  
                     //TODO: make configurable 
 354  4
                         processor.send(theResponse, 2, 3000);
 355  
                         
 356  0
                 } catch (HL7Exception e) {
 357  0
                     log.error("Error trying to send response from Application", e);
 358  4
                 }
 359  4
             }
 360  
         };
 361  
         
 362  4
         if (myThreaded) {
 363  4
             myResponseExecutorService.execute(sender);
 364  
         } else {
 365  0
             sender.run();
 366  
         }
 367  4
     }
 368  
     
 369  
     /**
 370  
      * Removes expired message reservations from the reservation list.  
 371  
      */
 372  
     private synchronized void cleanReservations() {
 373  304
         Iterator<String> it = myReservations.keySet().iterator();
 374  428
         while (it.hasNext()) {
 375  124
             String ackId = it.next();
 376  124
             Long expiry = myReservations.get(ackId);
 377  124
             if (System.currentTimeMillis() > expiry.longValue()) {
 378  0
                 it.remove();
 379  
             }
 380  124
         }
 381  304
     }
 382  
     
 383  
     /**
 384  
      * Discards expired accept acknowledgements (these are used in retry protocol; see send()).   
 385  
      */
 386  
     private synchronized void cleanAcceptAcks() {
 387  304
         Iterator<String> it = myAcceptAcks.keySet().iterator();
 388  308
         while (it.hasNext()) {
 389  4
             String ackId = it.next();
 390  4
             ExpiringTransportable et = myAcceptAcks.get(ackId);
 391  4
             if (System.currentTimeMillis() > et.expiryTime) {
 392  0
                 it.remove();
 393  
             }
 394  4
         }        
 395  304
     }
 396  
     
 397  
     private synchronized void cleanReservedMessages() throws HL7Exception {
 398  304
         Iterator<String> it = myAvailableMessages.keySet().iterator();
 399  305
         while (it.hasNext()) {
 400  1
             String ackId = it.next();            
 401  1
             ExpiringTransportable et = myAvailableMessages.get(ackId);
 402  1
             if (System.currentTimeMillis() > et.expiryTime) {
 403  0
                 it.remove();
 404  
                 
 405  
                 //send to an Application 
 406  0
                 Transportable out = myContext.getRouter().processMessage(et.transportable);
 407  0
                 sendAppResponse(out);                
 408  
             }
 409  1
         }  
 410  304
     }
 411  
     
 412  
     private synchronized boolean isReserved(String ackId) {
 413  9
         boolean reserved = false;
 414  9
         if (myReservations.containsKey(ackId)) {
 415  1
             reserved = true;
 416  
         }
 417  9
         return reserved;
 418  
     }
 419  
     
 420  
     private synchronized void removeReservation(String ackId) {
 421  1
         myReservations.remove(ackId);
 422  1
     }
 423  
     
 424  
 
 425  
     /**
 426  
      * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String)
 427  
      */
 428  
     public boolean isAvailable(String theAckId) {
 429  0
         boolean available = false;
 430  0
         if (myAvailableMessages.containsKey(theAckId)) {
 431  0
             available = true;
 432  
         }
 433  0
         return available;
 434  
     }
 435  
 
 436  
     /** 
 437  
      * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long)
 438  
      */
 439  
     public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception {
 440  4
         if (!isReserved(theAckId)) {
 441  4
             reserve(theAckId, theTimeoutMillis);
 442  
         }
 443  
         
 444  4
         Transportable in = null;
 445  4
         long until = System.currentTimeMillis() + theTimeoutMillis;
 446  
         do {
 447  5665
             synchronized (this) {
 448  5665
                 ExpiringTransportable et = myAvailableMessages.get(theAckId);                
 449  5665
                 if (et == null) {
 450  5664
                     cycleIfNeeded(false);
 451  
                 } else {
 452  1
                     in = et.transportable;
 453  
                 }
 454  5665
             }
 455  5665
             sleepIfNeeded();
 456  5665
         } while (in == null && System.currentTimeMillis() < until);
 457  4
         return in;
 458  
     }
 459  
 
 460  
     /** 
 461  
      * @see ca.uhn.hl7v2.protocol.Processor#getContext()
 462  
      */
 463  
     public ProcessorContext getContext() {
 464  9
         return myContext;
 465  
     }
 466  
     
 467  
     /**
 468  
      * A struct for Transportable collection entries that time out.  
 469  
      *  
 470  
      * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
 471  
      * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
 472  
      */
 473  
     class ExpiringTransportable {
 474  
         public Transportable transportable;
 475  
         public long expiryTime;
 476  
         
 477  7
         public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) {
 478  7
             transportable = theTransportable;
 479  7
             expiryTime = theExpiryTime;
 480  7
         }
 481  
     }
 482  
     
 483  
     /**
 484  
      * A Runnable that repeatedly calls the cycle() method of this class.  
 485  
      * 
 486  
      * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
 487  
      * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
 488  
      */
 489  
     private static class Cycler implements Runnable {
 490  
 
 491  
         private Processor myProcessor;
 492  
         private boolean myExpectingAck;
 493  
         private boolean isRunning;
 494  
         
 495  
         /**
 496  
          * @param theProcessor the processor on which to call cycle()
 497  
          * @param isExpectingAck passed to cycle()
 498  
          */
 499  28
         public Cycler(Processor theProcessor, boolean isExpectingAck) {
 500  28
             myProcessor = theProcessor;
 501  28
             myExpectingAck = isExpectingAck;
 502  28
             isRunning = true;
 503  28
         }
 504  
         
 505  
         /**
 506  
          * Execution will stop at the end of the next cycle.  
 507  
          */
 508  
         public void stop() {
 509  26
             isRunning = false;
 510  26
         }
 511  
         
 512  
         /** 
 513  
          * Calls cycle() repeatedly on the Processor given in the 
 514  
          * constructor, until stop() is called.  
 515  
          * 
 516  
          * @see java.lang.Runnable#run()
 517  
          */
 518  
         public void run() {
 519  328
             while (isRunning) {
 520  
                 try {
 521  304
                     myProcessor.cycle(myExpectingAck);
 522  1
                 } catch (HL7Exception e) {
 523  1
                     log.error("Error processing message", e);
 524  300
                 }
 525  
             }
 526  24
         }        
 527  
     }
 528  
 
 529  
 }