View Javadoc

1   /**
2   The contents of this file are subject to the Mozilla Public License Version 1.1 
3   (the "License"); you may not use this file except in compliance with the License. 
4   You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
5   Software distributed under the License is distributed on an "AS IS" basis, 
6   WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
7   specific language governing rights and limitations under the License. 
8   
9   The Original Code is "Connection.java".  Description: 
10  "A TCP/IP connection to a remote HL7 server." 
11  
12  The Initial Developer of the Original Code is University Health Network. Copyright (C) 
13  2002.  All Rights Reserved. 
14  
15  Contributor(s): ______________________________________. 
16  
17  Alternatively, the contents of this file may be used under the terms of the 
18  GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
19  applicable instead of those above.  If you wish to allow use of your version of this 
20  file only under the terms of the GPL and not to allow others to use your version 
21  of this file under the MPL, indicate your decision by deleting  the provisions above 
22  and replace  them with the notice and other provisions required by the GPL License.  
23  If you do not delete the provisions above, a recipient may use your version of 
24  this file under either the MPL or the GPL. 
25   */
26  
27  package ca.uhn.hl7v2.app;
28  
29  import java.io.IOException;
30  import java.net.InetAddress;
31  import java.net.Socket;
32  import java.util.ArrayList;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.TimeUnit;
38  
39  import javax.net.ssl.SSLSocket;
40  
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import ca.uhn.hl7v2.concurrent.BlockingMap;
45  import ca.uhn.hl7v2.concurrent.BlockingHashMap;
46  import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
47  import ca.uhn.hl7v2.llp.HL7Writer;
48  import ca.uhn.hl7v2.llp.LLPException;
49  import ca.uhn.hl7v2.llp.LowerLayerProtocol;
50  import ca.uhn.hl7v2.parser.Parser;
51  
52  /**
53   * A TCP/IP connection to a remote HL7 server.
54   * 
55   * @author Bryan Tripp
56   */
57  public class Connection {
58  
59  	private static final Logger log = LoggerFactory.getLogger(Connection.class);
60  
61  	private Initiator initiator;
62  	private Responder responder;
63  	private List<Socket> sockets;
64  	private HL7Writer ackWriter;
65  	private HL7Writer sendWriter;
66  	private Parser parser;
67  	private BlockingMap<String, String> responses;
68  	private List<Receiver> receivers;
69  	private boolean open = true;
70  	private ExecutorService executorService;
71  
72  	/**
73  	 * Creates a new instance of Connection, with inbound and outbound
74  	 * communication on a single port.
75  	 */
76  	public Connection(Parser parser, LowerLayerProtocol llp,
77  			Socket bidirectional) throws LLPException, IOException {
78  		this(parser, llp, bidirectional, DefaultExecutorService
79  				.getDefaultService());
80  	}
81  
82  	public Connection(Parser parser, LowerLayerProtocol llp,
83  			Socket bidirectional, ExecutorService executorService)
84  			throws LLPException, IOException {
85  		init(parser, executorService, bidirectional);
86  		ackWriter = llp.getWriter(bidirectional.getOutputStream());
87  		sendWriter = ackWriter;
88  		this.executorService = executorService;
89  		sockets.add(bidirectional);
90  		receivers.add(new Receiver(this, llp.getReader(bidirectional
91  				.getInputStream())));
92  		this.initiator = new Initiator(this);
93  	}
94  
95  	/**
96  	 * Creates a new instance of Connection, with inbound communication on one
97  	 * port and outbound on another.
98  	 */
99  	public Connection(Parser parser, LowerLayerProtocol llp, Socket inbound,
100 			Socket outbound) throws LLPException, IOException {
101 		this(parser, llp, inbound, outbound, DefaultExecutorService
102 				.getDefaultService());
103 	}
104 
105 	/**
106 	 * Creates a new instance of Connection, with inbound communication on one
107 	 * port and outbound on another.
108 	 */
109 	public Connection(Parser parser, LowerLayerProtocol llp, Socket inbound,
110 			Socket outbound, ExecutorService executorService)
111 			throws LLPException, IOException {
112 		init(parser, executorService, inbound);
113 		ackWriter = llp.getWriter(inbound.getOutputStream());
114 		sendWriter = llp.getWriter(outbound.getOutputStream());
115 		sockets.add(outbound); // always add outbound first ... see getRemoteAddress()
116 		sockets.add(inbound);
117 
118 		receivers.add(new Receiver(this,
119 				llp.getReader(inbound.getInputStream())));
120 		receivers.add(new Receiver(this, llp.getReader(outbound
121 				.getInputStream())));
122 		this.initiator = new Initiator(this);
123 	}
124 
125 	/** Common initialization tasks */
126 	private void init(Parser parser, ExecutorService executorService, Socket inboundSocket)
127 			throws LLPException {
128 		this.parser = parser;
129 		this.executorService = executorService;
130 		sockets = new ArrayList<Socket>();
131 		responses = new BlockingHashMap<String, String>(executorService);
132 		receivers = new ArrayList<Receiver>(2);
133 		responder = new Responder(inboundSocket);
134 	}
135 
136 	/**
137 	 * Start the receiver thread(s)
138 	 */
139 	public void activate() {
140 		if (receivers != null) {
141 			for (Receiver receiver : receivers) {
142 				receiver.start();
143 			}
144 		}
145 	}
146 
147 	public ExecutorService getExecutorService() {
148 		return executorService;
149 	}
150 
151 	/**
152 	 * Returns the address of the remote host to which this Connection is
153 	 * connected. If separate inbound and outbound sockets are used, the address
154 	 * of the outbound socket is returned (the addresses should normally be the
155 	 * same, but this isn't checked).
156 	 */
157 	public InetAddress getRemoteAddress() {
158 		Socket s = sockets.get(0);
159 		return s.getInetAddress();
160 	}
161 
162 	/**
163 	 * Returns the remote port on the remote host to which this Connection is
164 	 * connected. If separate inbound and outbound sockets are used, the port of
165 	 * the outbound socket is returned.
166 	 */
167 	public int getRemotePort() {
168 		Socket s = sockets.get(0);
169 		return s.getPort();
170 	}
171 
172 	/** Returns the Initiator associated with this connection */
173 	public Initiator getInitiator() {
174 		return this.initiator;
175 	}
176 
177 	/** Returns the Responder associated with this connection */
178 	public Responder getResponder() {
179 		return this.responder;
180 	}
181 
182 	public boolean isSecure() {
183 		if (isOpen() && sockets.size() > 0) {
184 			return (sockets.get(0) instanceof SSLSocket);
185 		} else {
186 			throw new IllegalStateException(
187 					"Can't determine status on closed socket");
188 		}
189 	}
190 
191 	/**
192 	 * Returns the HL7Writer through which unsolicited outbound messages should
193 	 * be sent.
194 	 */
195 	protected HL7Writer getSendWriter() {
196 		return this.sendWriter;
197 	}
198 
199 	/**
200 	 * Returns the HL7Writer through which responses to inbound messages should
201 	 * be sent.
202 	 */
203 	protected HL7Writer getAckWriter() {
204 		return this.ackWriter;
205 	}
206 
207 	public Parser getParser() {
208 		return this.parser;
209 	}
210 
211 	public String toString() {
212 		StringBuilder buf = new StringBuilder();
213 		buf.append(getRemoteAddress().getHostName());
214 		buf.append(":");
215 		for (Iterator<Socket> iter = sockets.iterator(); iter.hasNext();) {
216 			Socket socket = iter.next();
217 			buf.append(socket.getPort());
218 			if (iter.hasNext())
219 				buf.append(",");
220 		}
221 		return buf.toString();
222 	}
223 
224 	/**
225 	 * Reserves a future incoming message by ack ID. When the incoming message
226 	 * with the given ack ID arrives, the message will be returned.
227 	 */
228 	protected Future<String> waitForResponse(final String messageID,
229 			long timeout) throws InterruptedException {
230 		return responses.asyncPoll(messageID, timeout, TimeUnit.MILLISECONDS);
231 	}
232 
233 	/**
234 	 * Given the ack ID (MSA-2) of a message, notifies a waiting consumer thread
235 	 * about a received response.
236 	 */
237 	protected boolean isRecipientWaiting(String ackID, String message) {
238 		return responses.give(ackID, message);
239 	}
240 
241 	/** Stops running Receiver threads and closes open sockets */
242 	public void close() {
243 		// Mark all running receiver threads to be stopped
244 		for (Receiver receiver : receivers) {
245 			if (receiver.isRunning())
246 				receiver.stop();
247 		}
248 		// Forces open sockets to be closed. This causes the Receiver threads to
249 		// eventually terminate
250 		for (Socket socket : sockets) {
251 			try {
252 				if (!socket.isClosed())
253 					socket.close();
254 			} catch (Exception e) {
255 				log.error("Error while stopping threads and closing sockets", e);
256 			}
257 		}
258 
259 		open = false;
260 	}
261 
262 	public boolean isOpen() {
263 		return open;
264 	}
265 
266 }