Coverage Report - ca.uhn.hl7v2.app.Connection
 
Classes in this File Line Coverage Branch Coverage Complexity
Connection
77%
59/76
55%
11/20
1.65
 
 1  
 /**
 2  
 The contents of this file are subject to the Mozilla Public License Version 1.1 
 3  
 (the "License"); you may not use this file except in compliance with the License. 
 4  
 You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
 5  
 Software distributed under the License is distributed on an "AS IS" basis, 
 6  
 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
 7  
 specific language governing rights and limitations under the License. 
 8  
 
 9  
 The Original Code is "Connection.java".  Description: 
 10  
 "A TCP/IP connection to a remote HL7 server." 
 11  
 
 12  
 The Initial Developer of the Original Code is University Health Network. Copyright (C) 
 13  
 2002.  All Rights Reserved. 
 14  
 
 15  
 Contributor(s): ______________________________________. 
 16  
 
 17  
 Alternatively, the contents of this file may be used under the terms of the 
 18  
 GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
 19  
 applicable instead of those above.  If you wish to allow use of your version of this 
 20  
 file only under the terms of the GPL and not to allow others to use your version 
 21  
 of this file under the MPL, indicate your decision by deleting  the provisions above 
 22  
 and replace  them with the notice and other provisions required by the GPL License.  
 23  
 If you do not delete the provisions above, a recipient may use your version of 
 24  
 this file under either the MPL or the GPL. 
 25  
  */
 26  
 
 27  
 package ca.uhn.hl7v2.app;
 28  
 
 29  
 import java.io.IOException;
 30  
 import java.net.InetAddress;
 31  
 import java.net.Socket;
 32  
 import java.util.ArrayList;
 33  
 import java.util.Iterator;
 34  
 import java.util.List;
 35  
 import java.util.concurrent.ExecutorService;
 36  
 import java.util.concurrent.Future;
 37  
 import java.util.concurrent.TimeUnit;
 38  
 
 39  
 import javax.net.ssl.SSLSocket;
 40  
 
 41  
 import org.slf4j.Logger;
 42  
 import org.slf4j.LoggerFactory;
 43  
 
 44  
 import ca.uhn.hl7v2.concurrent.BlockingMap;
 45  
 import ca.uhn.hl7v2.concurrent.BlockingHashMap;
 46  
 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
 47  
 import ca.uhn.hl7v2.llp.HL7Writer;
 48  
 import ca.uhn.hl7v2.llp.LLPException;
 49  
 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
 50  
 import ca.uhn.hl7v2.parser.Parser;
 51  
 
 52  
 /**
 53  
  * A TCP/IP connection to a remote HL7 server.
 54  
  * 
 55  
  * @author Bryan Tripp
 56  
  */
 57  
 public class Connection {
 58  
 
 59  1
         private static final Logger log = LoggerFactory.getLogger(Connection.class);
 60  
 
 61  
         private Initiator initiator;
 62  
         private Responder responder;
 63  
         private List<Socket> sockets;
 64  
         private HL7Writer ackWriter;
 65  
         private HL7Writer sendWriter;
 66  
         private Parser parser;
 67  
         private BlockingMap<String, String> responses;
 68  
         private List<Receiver> receivers;
 69  70
         private boolean open = true;
 70  
         private ExecutorService executorService;
 71  
 
 72  
         /**
 73  
          * Creates a new instance of Connection, with inbound and outbound
 74  
          * communication on a single port.
 75  
          */
 76  
         public Connection(Parser parser, LowerLayerProtocol llp,
 77  
                         Socket bidirectional) throws LLPException, IOException {
 78  4
                 this(parser, llp, bidirectional, DefaultExecutorService
 79  
                                 .getDefaultService());
 80  4
         }
 81  
 
 82  
         public Connection(Parser parser, LowerLayerProtocol llp,
 83  
                         Socket bidirectional, ExecutorService executorService)
 84  48
                         throws LLPException, IOException {
 85  48
                 init(parser, executorService, bidirectional);
 86  48
                 ackWriter = llp.getWriter(bidirectional.getOutputStream());
 87  48
                 sendWriter = ackWriter;
 88  48
                 this.executorService = executorService;
 89  48
                 sockets.add(bidirectional);
 90  48
                 receivers.add(new Receiver(this, llp.getReader(bidirectional
 91  
                                 .getInputStream())));
 92  48
                 this.initiator = new Initiator(this);
 93  48
         }
 94  
 
 95  
         /**
 96  
          * Creates a new instance of Connection, with inbound communication on one
 97  
          * port and outbound on another.
 98  
          */
 99  
         public Connection(Parser parser, LowerLayerProtocol llp, Socket inbound,
 100  
                         Socket outbound) throws LLPException, IOException {
 101  2
                 this(parser, llp, inbound, outbound, DefaultExecutorService
 102  
                                 .getDefaultService());
 103  2
         }
 104  
 
 105  
         /**
 106  
          * Creates a new instance of Connection, with inbound communication on one
 107  
          * port and outbound on another.
 108  
          */
 109  
         public Connection(Parser parser, LowerLayerProtocol llp, Socket inbound,
 110  
                         Socket outbound, ExecutorService executorService)
 111  22
                         throws LLPException, IOException {
 112  22
                 init(parser, executorService, inbound);
 113  22
                 ackWriter = llp.getWriter(inbound.getOutputStream());
 114  22
                 sendWriter = llp.getWriter(outbound.getOutputStream());
 115  22
                 sockets.add(outbound); // always add outbound first ... see getRemoteAddress()
 116  22
                 sockets.add(inbound);
 117  
 
 118  22
                 receivers.add(new Receiver(this,
 119  
                                 llp.getReader(inbound.getInputStream())));
 120  22
                 receivers.add(new Receiver(this, llp.getReader(outbound
 121  
                                 .getInputStream())));
 122  22
                 this.initiator = new Initiator(this);
 123  22
         }
 124  
 
 125  
         /** Common initialization tasks */
 126  
         private void init(Parser parser, ExecutorService executorService, Socket inboundSocket)
 127  
                         throws LLPException {
 128  70
                 this.parser = parser;
 129  70
                 this.executorService = executorService;
 130  70
                 sockets = new ArrayList<Socket>();
 131  70
                 responses = new BlockingHashMap<String, String>(executorService);
 132  70
                 receivers = new ArrayList<Receiver>(2);
 133  70
                 responder = new Responder(inboundSocket);
 134  70
         }
 135  
 
 136  
         /**
 137  
          * Start the receiver thread(s)
 138  
          */
 139  
         public void activate() {
 140  70
                 if (receivers != null) {
 141  70
                         for (Receiver receiver : receivers) {
 142  92
                                 receiver.start();
 143  92
                         }
 144  
                 }
 145  70
         }
 146  
 
 147  
         public ExecutorService getExecutorService() {
 148  92
                 return executorService;
 149  
         }
 150  
 
 151  
         /**
 152  
          * Returns the address of the remote host to which this Connection is
 153  
          * connected. If separate inbound and outbound sockets are used, the address
 154  
          * of the outbound socket is returned (the addresses should normally be the
 155  
          * same, but this isn't checked).
 156  
          */
 157  
         public InetAddress getRemoteAddress() {
 158  43
                 Socket s = sockets.get(0);
 159  43
                 return s.getInetAddress();
 160  
         }
 161  
 
 162  
         /**
 163  
          * Returns the remote port on the remote host to which this Connection is
 164  
          * connected. If separate inbound and outbound sockets are used, the port of
 165  
          * the outbound socket is returned.
 166  
          */
 167  
         public int getRemotePort() {
 168  0
                 Socket s = sockets.get(0);
 169  0
                 return s.getPort();
 170  
         }
 171  
 
 172  
         /** Returns the Initiator associated with this connection */
 173  
         public Initiator getInitiator() {
 174  110
                 return this.initiator;
 175  
         }
 176  
 
 177  
         /** Returns the Responder associated with this connection */
 178  
         public Responder getResponder() {
 179  153
                 return this.responder;
 180  
         }
 181  
 
 182  
         public boolean isSecure() {
 183  0
                 if (isOpen() && sockets.size() > 0) {
 184  0
                         return (sockets.get(0) instanceof SSLSocket);
 185  
                 } else {
 186  0
                         throw new IllegalStateException(
 187  
                                         "Can't determine status on closed socket");
 188  
                 }
 189  
         }
 190  
 
 191  
         /**
 192  
          * Returns the HL7Writer through which unsolicited outbound messages should
 193  
          * be sent.
 194  
          */
 195  
         protected HL7Writer getSendWriter() {
 196  110
                 return this.sendWriter;
 197  
         }
 198  
 
 199  
         /**
 200  
          * Returns the HL7Writer through which responses to inbound messages should
 201  
          * be sent.
 202  
          */
 203  
         protected HL7Writer getAckWriter() {
 204  116
                 return this.ackWriter;
 205  
         }
 206  
 
 207  
         public Parser getParser() {
 208  447
                 return this.parser;
 209  
         }
 210  
 
 211  
         public String toString() {
 212  0
                 StringBuilder buf = new StringBuilder();
 213  0
                 buf.append(getRemoteAddress().getHostName());
 214  0
                 buf.append(":");
 215  0
                 for (Iterator<Socket> iter = sockets.iterator(); iter.hasNext();) {
 216  0
                         Socket socket = iter.next();
 217  0
                         buf.append(socket.getPort());
 218  0
                         if (iter.hasNext())
 219  0
                                 buf.append(",");
 220  0
                 }
 221  0
                 return buf.toString();
 222  
         }
 223  
 
 224  
         /**
 225  
          * Reserves a future incoming message by ack ID. When the incoming message
 226  
          * with the given ack ID arrives, the message will be returned.
 227  
          */
 228  
         protected Future<String> waitForResponse(final String messageID,
 229  
                         long timeout) throws InterruptedException {
 230  110
                 return responses.asyncPoll(messageID, timeout, TimeUnit.MILLISECONDS);
 231  
         }
 232  
 
 233  
         /**
 234  
          * Given the ack ID (MSA-2) of a message, notifies a waiting consumer thread
 235  
          * about a received response.
 236  
          */
 237  
         protected boolean isRecipientWaiting(String ackID, String message) {
 238  110
                 return responses.give(ackID, message);
 239  
         }
 240  
 
 241  
         /** Stops running Receiver threads and closes open sockets */
 242  
         public void close() {
 243  
                 // Mark all running receiver threads to be stopped
 244  123
                 for (Receiver receiver : receivers) {
 245  177
                         if (receiver.isRunning())
 246  92
                                 receiver.stop();
 247  177
                 }
 248  
                 // Forces open sockets to be closed. This causes the Receiver threads to
 249  
                 // eventually terminate
 250  123
                 for (Socket socket : sockets) {
 251  
                         try {
 252  177
                                 if (!socket.isClosed())
 253  93
                                         socket.close();
 254  0
                         } catch (Exception e) {
 255  0
                                 log.error("Error while stopping threads and closing sockets", e);
 256  177
                         }
 257  177
                 }
 258  
 
 259  123
                 open = false;
 260  123
         }
 261  
 
 262  
         public boolean isOpen() {
 263  278
                 return open;
 264  
         }
 265  
 
 266  
 }