View Javadoc

1   /*
2   The contents of this file are subject to the Mozilla Public License Version 1.1 
3   (the "License"); you may not use this file except in compliance with the License. 
4   You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
5   Software distributed under the License is distributed on an "AS IS" basis, 
6   WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
7   specific language governing rights and limitations under the License. 
8   
9   The Original Code is "ProcessorImpl.java".  Description: 
10  "A default implementation of Processor." 
11  
12  The Initial Developer of the Original Code is University Health Network. Copyright (C) 
13  2004.  All Rights Reserved. 
14  
15  Contributor(s): ______________________________________. 
16  
17  Alternatively, the contents of this file may be used under the terms of the 
18  GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
19  applicable instead of those above.  If you wish to allow use of your version of this 
20  file only under the terms of the GPL and not to allow others to use your version 
21  of this file under the MPL, indicate your decision by deleting  the provisions above 
22  and replace  them with the notice and other provisions required by the GPL License.  
23  If you do not delete the provisions above, a recipient may use your version of 
24  this file under either the MPL or the GPL. 
25  */
26  
27  package ca.uhn.hl7v2.protocol.impl;
28  
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.Map;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  import ca.uhn.hl7v2.HL7Exception;
39  import ca.uhn.hl7v2.preparser.PreParser;
40  import ca.uhn.hl7v2.protocol.Processor;
41  import ca.uhn.hl7v2.protocol.ProcessorContext;
42  import ca.uhn.hl7v2.protocol.TransportException;
43  import ca.uhn.hl7v2.protocol.TransportLayer;
44  import ca.uhn.hl7v2.protocol.Transportable;
45  
46  /**
47   * A default implementation of <code>Processor</code>.  
48   *  
49   * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
50   * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
51   */
52  public class ProcessorImpl implements Processor {
53  
54      private static final Logger log = LoggerFactory.getLogger(ProcessorImpl.class);
55  
56      private ProcessorContext myContext;
57      private final Map<String, ExpiringTransportable> myAcceptAcks;
58      private final Map<String, Long> myReservations;
59      private final Map<String, ExpiringTransportable> myAvailableMessages;
60      private boolean myThreaded; //true if separate threads are calling cycle()  
61      private Cycler ackCycler;
62      private Cycler nonAckCycler;
63      private ExecutorService myResponseExecutorService;
64      
65      /**
66       * @param theContext source of supporting services 
67       * @param isThreaded true if this class should create threads in which to call cycle(), and 
68       *  in which to send responses from Applications.  This is the preferred mode.  Use false 
69       *  if threading is not allowed, eg you are running the code in an EJB container.  In this case, 
70       *  the send() and receive() methods will call cycle() themselves as needed.  However, cycle() 
71       *  makes potentially blocking calls, so these methods may not return until the next message 
72       *  is received from the remote server, regardless of timeout.  Probably the worst example of this
73       *  would be if receive() was called to wait for an application ACK that was specified as "RE" (ie
74       *  required on error).  No response will be returned if the message is processed without error, 
75       *  and in a non-threaded environment, receive() will block forever.  Use true if you can, otherwise
76       *  study this class carefully.
77       *   
78       * TODO: write a MLLPTransport with non-blocking IO  
79       * TODO: reconnect transport layers on error and retry 
80       */
81      public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) {
82          myContext = theContext;
83          myThreaded = isThreaded;
84          myAcceptAcks = new HashMap<String, ExpiringTransportable>();
85          myReservations = new HashMap<String, Long>();
86          myAvailableMessages = new HashMap<String, ExpiringTransportable>();
87          
88          if (isThreaded) {
89              myResponseExecutorService = Executors.newSingleThreadExecutor(); 
90  
91          	ackCycler = new Cycler(this, true);
92              Thread ackThd = new Thread(ackCycler);
93              ackThd.start();
94              nonAckCycler = new Cycler(this, false);
95              Thread nonAckThd = new Thread(nonAckCycler);
96              nonAckThd.start();            
97          }
98      }
99      
100     /**
101      * If self-threaded, stops threads that have been created.  
102      */
103     public void stop() {
104         if (myThreaded) {
105             ackCycler.stop();
106             nonAckCycler.stop();
107 
108             myResponseExecutorService.shutdownNow();
109         }
110     }
111 
112     /**
113      * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long)
114      */
115     public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception {
116         String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"};
117         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
118         String controlId = fields[0];
119         String needAcceptAck = fields[1];
120         String needAppAck = fields[2];
121         
122         checkValidAckNeededCode(needAcceptAck);
123         
124         trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
125         
126         boolean originalMode = (needAcceptAck == null && needAppAck == null); 
127         if (originalMode || !NE.equals(needAcceptAck)) {
128         
129             Transportable response = null;
130             int retries = 0;
131             do {
132                 long until = System.currentTimeMillis() + retryIntervalMillis;
133                 while (response == null && System.currentTimeMillis() < until) {
134                     synchronized (this) {
135                         ExpiringTransportable et = myAcceptAcks.remove(controlId);
136                         if (et == null) {
137                             cycleIfNeeded(true);
138                         } else {
139                             response = et.transportable;
140                         }
141                     }
142                     sleepIfNeeded();
143                 }
144                 
145                 if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL))
146                         || (response != null && isReject(response))) {
147                     log.info("Resending message {}", controlId);
148                     trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
149                     response = null;                    
150                 }
151                 
152                 if (response != null && isError(response)) {
153                     String[] errMsgPath = {"MSA-3"};
154                     String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath);                    
155                     throw new HL7Exception("Error message received: " + errMsg[0]);
156                 }
157                 
158             } while (response == null && ++retries <= maxRetries);
159         }
160     }
161     
162     private void checkValidAckNeededCode(String theCode) throws HL7Exception {
163         //must be one of the below ... 
164         if ( !(theCode == null || theCode.equals("") 
165                 ||theCode.equals(AL) || theCode.equals(ER) 
166                 || theCode.equals(NE) || theCode.equals(SU)) ) {
167             throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message");
168         }            
169     }
170     
171     /**
172      * Calls cycle() if we do not expect another thread to be doing so
173      * @param expectingAck as in cycle
174      */
175     private void cycleIfNeeded(boolean expectingAck) throws HL7Exception {
176         if (!myThreaded) {
177             cycle(expectingAck);
178         }        
179     }
180     
181     /**
182      * Sleeps for 1 ms if externally threaded (this is to let the CPU idle).   
183      */
184     private void sleepIfNeeded() {
185         if (myThreaded) {
186             try {
187                 Thread.sleep(1);
188             } catch (InterruptedException e) { /* no problem */ }
189         }                
190     }
191     
192     /** Returns true if a CR or AR ACK */ 
193     private static boolean isReject(Transportable theMessage) throws HL7Exception {
194         boolean reject = false;
195         String[] fieldPaths = {"MSA-1"};
196         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
197         if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) {
198             reject = true;
199         }        
200         return reject;
201     }
202 
203     /** Returns true if a CE or AE ACK */ 
204     private static boolean isError(Transportable theMessage) throws HL7Exception {
205         boolean error = false;
206         String[] fieldPaths = {"MSA-1"};
207         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
208         if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) {
209             error = true;
210         }
211         return error;
212     }
213 
214     /**
215      * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long)
216      */
217     public synchronized void reserve(String theAckId, long thePeriodMillis) {
218         Long expiry = new Long(System.currentTimeMillis() + thePeriodMillis);
219         myReservations.put(theAckId, expiry);
220     }
221     
222     /**
223      * Tries to send the message, and if there is an error reconnects and tries again. 
224      */
225     private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException {
226         try {
227             theTransport.send(theTransportable);
228         } catch (TransportException e) {
229             theTransport.disconnect();
230             theTransport.connect();
231             theTransport.send(theTransportable);
232         }
233     }
234     
235     
236     /**
237      * Tries to receive a message, and if there is an error reconnects and tries again. 
238      */
239     private Transportable tryReceive(TransportLayer theTransport) throws TransportException {
240         Transportable message = null;
241         try {
242             message = theTransport.receive();            
243         } catch (TransportException e) {
244             theTransport.disconnect();
245             theTransport.connect();
246             message = theTransport.receive();
247         }
248         return message;
249     }
250 
251     /** 
252      * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean)
253      */
254     public void cycle(boolean expectingAck) throws HL7Exception {
255         log.debug("In cycle()");
256     	
257     	cleanReservations();
258         cleanAcceptAcks();
259         cleanReservedMessages();
260 
261         Transportable in = null;
262         try {
263             if (expectingAck) {
264                 in = tryReceive(myContext.getLocallyDrivenTransportLayer());
265             } else {
266                 in = tryReceive(myContext.getRemotelyDrivenTransportLayer());
267             }
268         } catch (TransportException e) {
269             try {
270                 Thread.sleep(1000);
271             } catch (InterruptedException e1) {}
272             throw e;
273         }
274         
275         // log
276         if (in != null) {
277                log.debug("Received message: {}", in.getMessage());
278         } else {
279         	log.debug("Received no message");
280         }
281         
282         // If we have a message, handle it
283         if (in != null) { 
284             String acceptAckNeeded = null;
285 //            String appAckNeeded = null;
286             String ackCode = null;
287             String ackId = null;
288             
289             try {
290 	            String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"};
291 	            String[] fields = PreParser.getFields(in.getMessage(), fieldPaths);         
292 				acceptAckNeeded = fields[0];
293 //				appAckNeeded = fields[1];
294 				ackCode = fields[2];
295 				ackId = fields[3];
296             } catch (HL7Exception e) {
297             	log.warn("Failed to parse accept ack fields in incoming message", e);
298             }
299             
300             if (ackId != null && ackCode != null && ackCode.startsWith("C")) {
301                 long expiryTime = System.currentTimeMillis() + 1000 * 60;
302                 myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime));
303             } else {
304                 AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in);
305             
306                 if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL)) 
307                     || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable()) 
308                     || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) {
309                     trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage());    
310                 }
311   
312                 if (ack.isAcceptable()) {
313                     if (isReserved(ackId)) {
314                     	
315                     	log.debug("Received expected ACK message with ACK ID: {}", ackId);
316                     	
317                         removeReservation(ackId);
318                         long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5;                
319                         myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime));
320                         
321                     } else {
322 
323                     	log.debug("Sending message to router");
324                         Transportable out = myContext.getRouter().processMessage(in);
325                         sendAppResponse(out);
326                         
327                     }
328                 } else {
329                 	// TODO: should we do something more here? Might be nice to 
330                 	// allow a configurable handler for this situation
331                 	log.warn("Incoming message was not acceptable");
332                 }
333                 
334             }
335         } else {
336             String transport = expectingAck ? " Locally driven " : "Remotely driven";
337             log.debug("{} TransportLayer.receive() returned null.", transport);
338         }
339         
340         sleepIfNeeded();
341 
342         log.debug("Exiting cycle()");
343     }
344     
345     /** Sends in a new thread if isThreaded, otherwise in current thread */
346     private void sendAppResponse(final Transportable theResponse) {
347         final ProcessorImpl processor = this;
348         Runnable sender = new Runnable() {
349             public void run() {
350                 try {
351                 	log.debug("Sending response: {}", theResponse);
352                 	
353                     //TODO: make configurable 
354                 	processor.send(theResponse, 2, 3000);
355                 	
356                 } catch (HL7Exception e) {
357                     log.error("Error trying to send response from Application", e);
358                 }
359             }
360         };
361         
362         if (myThreaded) {
363             myResponseExecutorService.execute(sender);
364         } else {
365             sender.run();
366         }
367     }
368     
369     /**
370      * Removes expired message reservations from the reservation list.  
371      */
372     private synchronized void cleanReservations() {
373         Iterator<String> it = myReservations.keySet().iterator();
374         while (it.hasNext()) {
375             String ackId = it.next();
376             Long expiry = myReservations.get(ackId);
377             if (System.currentTimeMillis() > expiry.longValue()) {
378                 it.remove();
379             }
380         }
381     }
382     
383     /**
384      * Discards expired accept acknowledgements (these are used in retry protocol; see send()).   
385      */
386     private synchronized void cleanAcceptAcks() {
387         Iterator<String> it = myAcceptAcks.keySet().iterator();
388         while (it.hasNext()) {
389             String ackId = it.next();
390             ExpiringTransportable et = myAcceptAcks.get(ackId);
391             if (System.currentTimeMillis() > et.expiryTime) {
392                 it.remove();
393             }
394         }        
395     }
396     
397     private synchronized void cleanReservedMessages() throws HL7Exception {
398         Iterator<String> it = myAvailableMessages.keySet().iterator();
399         while (it.hasNext()) {
400             String ackId = it.next();            
401             ExpiringTransportable et = myAvailableMessages.get(ackId);
402             if (System.currentTimeMillis() > et.expiryTime) {
403                 it.remove();
404                 
405                 //send to an Application 
406                 Transportable out = myContext.getRouter().processMessage(et.transportable);
407                 sendAppResponse(out);                
408             }
409         }  
410     }
411     
412     private synchronized boolean isReserved(String ackId) {
413         boolean reserved = false;
414         if (myReservations.containsKey(ackId)) {
415             reserved = true;
416         }
417         return reserved;
418     }
419     
420     private synchronized void removeReservation(String ackId) {
421         myReservations.remove(ackId);
422     }
423     
424 
425     /**
426      * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String)
427      */
428     public boolean isAvailable(String theAckId) {
429         boolean available = false;
430         if (myAvailableMessages.containsKey(theAckId)) {
431             available = true;
432         }
433         return available;
434     }
435 
436     /** 
437      * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long)
438      */
439     public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception {
440         if (!isReserved(theAckId)) {
441             reserve(theAckId, theTimeoutMillis);
442         }
443         
444         Transportable in = null;
445         long until = System.currentTimeMillis() + theTimeoutMillis;
446         do {
447             synchronized (this) {
448                 ExpiringTransportable et = myAvailableMessages.get(theAckId);                
449                 if (et == null) {
450                     cycleIfNeeded(false);
451                 } else {
452                     in = et.transportable;
453                 }
454             }
455             sleepIfNeeded();
456         } while (in == null && System.currentTimeMillis() < until);
457         return in;
458     }
459 
460     /** 
461      * @see ca.uhn.hl7v2.protocol.Processor#getContext()
462      */
463     public ProcessorContext getContext() {
464         return myContext;
465     }
466     
467     /**
468      * A struct for Transportable collection entries that time out.  
469      *  
470      * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
471      * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
472      */
473     class ExpiringTransportable {
474         public Transportable transportable;
475         public long expiryTime;
476         
477         public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) {
478             transportable = theTransportable;
479             expiryTime = theExpiryTime;
480         }
481     }
482     
483     /**
484      * A Runnable that repeatedly calls the cycle() method of this class.  
485      * 
486      * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
487      * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
488      */
489     private static class Cycler implements Runnable {
490 
491         private Processor myProcessor;
492         private boolean myExpectingAck;
493         private boolean isRunning;
494         
495         /**
496          * @param theProcessor the processor on which to call cycle()
497          * @param isExpectingAck passed to cycle()
498          */
499         public Cycler(Processor theProcessor, boolean isExpectingAck) {
500             myProcessor = theProcessor;
501             myExpectingAck = isExpectingAck;
502             isRunning = true;
503         }
504         
505         /**
506          * Execution will stop at the end of the next cycle.  
507          */
508         public void stop() {
509             isRunning = false;
510         }
511         
512         /** 
513          * Calls cycle() repeatedly on the Processor given in the 
514          * constructor, until stop() is called.  
515          * 
516          * @see java.lang.Runnable#run()
517          */
518         public void run() {
519             while (isRunning) {
520                 try {
521                     myProcessor.cycle(myExpectingAck);
522                 } catch (HL7Exception e) {
523                     log.error("Error processing message", e);
524                 }
525             }
526         }        
527     }
528 
529 }