View Javadoc

1   /**
2   The contents of this file are subject to the Mozilla Public License Version 1.1 
3   (the "License"); you may not use this file except in compliance with the License. 
4   You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
5   Software distributed under the License is distributed on an "AS IS" basis, 
6   WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
7   specific language governing rights and limitations under the License. 
8   
9   The Original Code is "ConnectionHub.java".  Description: 
10  "Provides access to shared HL7 Connections" 
11  
12  The Initial Developer of the Original Code is University Health Network. Copyright (C) 
13  2001.  All Rights Reserved. 
14  
15  Contributor(s): ______________________________________. 
16  
17  Alternatively, the contents of this file may be used under the terms of the 
18  GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
19  applicable instead of those above.  If you wish to allow use of your version of this 
20  file only under the terms of the GPL and not to allow others to use your version 
21  of this file under the MPL, indicate your decision by deleting  the provisions above 
22  and replace  them with the notice and other provisions required by the GPL License.  
23  If you do not delete the provisions above, a recipient may use your version of 
24  this file under either the MPL or the GPL. 
25   */
26  
27  package ca.uhn.hl7v2.app;
28  
29  import java.util.Collections;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentMap;
34  import java.util.concurrent.ExecutorService;
35  
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import ca.uhn.hl7v2.DefaultHapiContext;
40  import ca.uhn.hl7v2.HL7Exception;
41  import ca.uhn.hl7v2.HapiContext;
42  import ca.uhn.hl7v2.HapiContextSupport;
43  import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
44  import ca.uhn.hl7v2.llp.LowerLayerProtocol;
45  import ca.uhn.hl7v2.parser.Parser;
46  import ca.uhn.hl7v2.util.ReflectionUtil;
47  import ca.uhn.hl7v2.util.SocketFactory;
48  
49  /**
50   * <p>
51   * Provides access to shared HL7 Connections. The ConnectionHub has at most one connection to any
52   * given address at any time.
53   * </p>
54   * <p>
55   * <b>Synchronization Note:</b> This class should be safe to use in a multithreaded environment. A
56   * synchronization mutex is maintained for any given target host and port, so that if two threads
57   * are trying to connect to two separate destinations neither will block, but if two threads are
58   * trying to connect to the same destination, one will block until the other has finished trying.
59   * Use caution if this class is to be used in an environment where a very large (over 1000) number
60   * of target host/port destinations will be accessed at the same time.
61   * </p>
62   * 
63   * @author Bryan Tripp
64   */
65  public class ConnectionHub extends HapiContextSupport {
66  
67  	private static volatile ConnectionHub instance = null;
68  	private static final Logger log = LoggerFactory.getLogger(ConnectionHub.class);
69  	/**
70  	 * Set a system property with this key to a string containing an integer larger than the default
71  	 * ("1000") if you need to connect to a very large number of targets at the same time in a
72  	 * multithreaded environment.
73  	 */
74  	public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class.getName() + ".maxSize";
75  	private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<String, String>();
76  	private final CountingMap<ConnectionData, Connection> connections;
77  
78  	/** Creates a new instance of ConnectionHub */
79  	private ConnectionHub(ExecutorService executorService) {
80  		this(new DefaultHapiContext(executorService));
81  	}
82  
83  	private ConnectionHub(HapiContext context) {
84  		super(context);
85  		connections = new CountingMap<ConnectionData, Connection>() {
86  
87  			@Override
88  			protected void dispose(Connection connection) {
89  				connection.close();
90  			}
91  
92  			@Override
93  			protected Connection open(ConnectionData connectionData) throws Exception {
94  				return ConnectionFactory
95  						.open(connectionData, getHapiContext().getExecutorService());
96  			}
97  
98  		};
99  	}
100 
101 	public Set<? extends ConnectionData> allConnections() {
102 		return connections.keySet();
103 	}
104 
105 	/**
106 	 * @since 2.0
107 	 */
108 	public Connection attach(ConnectionData data) throws HL7Exception {
109 		try {
110 			Connection conn = null;
111 			// Disallow establishing same connection targets concurrently
112 			connectionMutexes.putIfAbsent(data.toString(), data.toString());
113 			String mutex = connectionMutexes.get(data.toString());
114 			synchronized (mutex) {
115 				discardConnectionIfStale(connections.get(data));
116 				// Create connection or increase counter
117 				conn = connections.put(data);
118 			}
119 			return conn;
120 		} catch (Exception e) {
121 			log.debug("Failed to attach", e);
122 			throw new HL7Exception("Cannot open connection to " + data.getHost() + ":"
123 					+ data.getPort() + "/" + data.getPort2(), e);
124 		}
125 	}
126 
127 	/**
128 	 * Returns a Connection to the given address, opening this Connection if necessary. The given
129 	 * Parser will only be used if a new Connection is opened, so there is no guarantee that the
130 	 * Connection returned will be using the Parser you provide. If you need explicit access to the
131 	 * Parser the Connection is using, call <code>Connection.getParser()</code>.
132 	 * 
133 	 * @since 2.1
134 	 */
135 	public Connection attach(String host, int port, boolean tls) throws HL7Exception {
136 		return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
137 				getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
138 						.getSocketFactory()));
139 	}
140 
141 	/**
142 	 * @since 2.1
143 	 */
144 	public Connection attach(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
145 		return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
146 				.getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
147 				getHapiContext().getSocketFactory()));
148 	}
149 
150 	/**
151 	 * @since 2.0
152 	 */
153 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
154 			Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
155 		return attach(host, outboundPort, inboundPort, parser, llpClass, false);
156 	}
157 
158 	/**
159 	 * @since 2.0
160 	 */
161 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
162 		Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
163 		LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
164 		return attach(host, outboundPort, inboundPort, parser, llp, tls);
165 	}
166 
167 	/**
168 	 * @since 2.0
169 	 */
170 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
171 			LowerLayerProtocol llp, boolean tls) throws HL7Exception {
172 		return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null));
173 	}
174 
175 	/**
176 	 * @since 2.1
177 	 */
178 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp, boolean tls, SocketFactory socketFactory) throws HL7Exception {
179 		return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory));
180 	}
181 
182 	/**
183 	 * @since 2.1
184 	 */
185 	public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp, boolean tls, SocketFactory socketFactory) throws HL7Exception {
186 		return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory));
187 	}
188 
189 	/**
190 	 * @since 2.1
191 	 */
192 	public Connection attach(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
193 		return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory()));
194 	}
195 
196 	/**
197 	 * @since 2.1
198 	 */
199 	public Connection attach(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
200 		return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory()));
201 	}
202 
203 	/**
204 	 * @since 1.2
205 	 */
206 	public Connection attach(String host, int port, Parser parser,
207 			Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
208 		return attach(host, port, parser, llpClass, false);
209 	}
210 
211 	/**
212 	 * @since 2.0
213 	 */
214 	public Connection attach(String host, int port, Parser parser,
215 			Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
216 		return attach(host, port, 0, parser, llpClass, tls);
217 	}
218 
219 	/**
220 	 * @since 2.0
221 	 */
222 	public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp)
223 			throws HL7Exception {
224 		return attach(host, port, 0, parser, llp, false);
225 	}
226 
227 
228 	/**
229 	 * @since 2.0
230 	 */
231 	public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
232 			boolean tls) throws HL7Exception {
233 		return attach(host, port, 0, parser, llp, tls);
234 	}
235 
236 	/**
237 	 * Informs the ConnectionHub that you are done with the given Connection - if no other code is
238 	 * using it, it will be closed, so you should not attempt to use a Connection after detaching
239 	 * from it. If the connection is not enlisted, this method does nothing.
240 	 */
241 	public void detach(Connection c) {
242 		ConnectionData cd = connections.find(c);
243 		if (cd != null)
244 			connections.remove(cd);
245 	}
246 
247 	/**
248 	 * Closes and discards the given Connection so that it can not be returned in subsequent calls
249 	 * to attach(). This method is to be used when there is a problem with a Connection, e.g. socket
250 	 * connection closed by remote host.
251 	 */
252 	public void discard(Connection c) {
253 		ConnectionData cd = connections.find(c);
254 		if (cd != null)
255 			connections.removeAllOf(cd);
256 	}
257 
258 	public void discardAll() {
259 		for (ConnectionData cd : allConnections()) {
260 			connections.removeAllOf(cd);
261 		}
262 	}
263 
264 	private void discardConnectionIfStale(Connection conn) {
265 		if (conn != null && !conn.isOpen()) {
266 			log.info("Discarding connection which appears to be closed. Remote addr: {}",
267 					conn.getRemoteAddress());
268 			discard(conn);
269 			conn = null;
270 		}
271 	}
272 
273 	public Connection getKnownConnection(ConnectionData key) {
274 		return connections.get(key);
275 	}
276 
277 	public boolean isOpen(ConnectionData key) {
278 		return getKnownConnection(key).isOpen();
279 	}
280 
281 	/**
282 	 * Returns the singleton instance of ConnectionHub
283 	 * 
284 	 * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
285 	 *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
286 	 */
287 	public static ConnectionHub getInstance() {
288 		return getInstance(DefaultExecutorService.getDefaultService());
289 	}
290 
291 	/**
292 	 * Returns the singleton instance of ConnectionHub.
293 	 * 
294 	 * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
295 	 *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
296 	 */
297 	public synchronized static ConnectionHub getInstance(ExecutorService service) {
298 		if (instance == null || service.isShutdown()) {
299 			instance = new ConnectionHub(service);
300 		}
301 		return instance;
302 	}
303 
304 	/**
305 	 * Returns the singleton instance of ConnectionHub.
306 	 * 
307 	 * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
308 	 *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
309 	 */
310 	public static ConnectionHub getInstance(HapiContext context) {
311 		if (instance == null || context.getExecutorService().isShutdown()) {
312 			instance = new ConnectionHub(context);
313 		}
314 		return instance;
315 	}
316 
317 	/**
318 	 * <p>
319 	 * Returns a new (non-singleton) instance of the ConnectionHub which uses the given executor
320 	 * service.
321 	 * </p>
322 	 * <p>
323 	 * See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a>
324 	 * for an example of how to use ConnectionHub.
325 	 * </p>
326 	 */
327 	public synchronized static ConnectionHub getNewInstance(HapiContext context) {
328 		return new ConnectionHub(context);
329 	}
330 
331 	/**
332 	 * @deprecated default executor service is shut down automatically
333 	 */
334 	public static void shutdown() {
335 		ConnectionHub hub = getInstance();
336 		if (DefaultExecutorService.isDefaultService(hub.getHapiContext().getExecutorService())) {
337 			hub.getHapiContext().getExecutorService().shutdown();
338 			instance = null;
339 		}
340 	}
341 
342 	/**
343 	 * Helper class that implements a map that increases/decreases a counter when an entry is
344 	 * added/removed. It is furthermore intended that an entry's value is derived from its key.
345 	 * 
346 	 * @param <K> key class
347 	 * @param <D> managed value class
348 	 */
349 	private abstract class CountingMap<K, D> {
350 		private Map<K, Count> content;
351 
352 		public CountingMap() {
353 			super();
354 			content = new ConcurrentHashMap<K, Count>();
355 		}
356 
357 		protected abstract void dispose(D value);
358 
359 		public K find(D value) {
360 			for (Map.Entry<K, Count> entry : content.entrySet()) {
361 				if (entry.getValue().getValue().equals(value)) {
362 					return entry.getKey();
363 				}
364 			}
365 			return null;
366 		}
367 
368 		public D get(K key) {
369 			return content.containsKey(key) ? content.get(key).getValue() : null;
370 		}
371 
372 		public Set<K> keySet() {
373 			return Collections.unmodifiableSet(content.keySet());
374 		}
375 
376 		protected abstract D open(K key) throws Exception;
377 
378 		/**
379 		 * If the key exists, the counter is increased. Otherwise, a value is created, and the
380 		 * key/value pair is added to the map.
381 		 */
382 		public D put(K key) throws Exception {
383 			if (content.containsKey(key)) {
384 				return content.put(key, content.get(key).increase()).getValue();
385 			} else {
386 				Count c = new Count(open(key));
387 				content.put(key, c);
388 				return c.getValue();
389 			}
390 		}
391 
392 		/**
393 		 * If the counter of the key/value is greater than one, the counter is decreased. Otherwise,
394 		 * the entry is removed and the value is cleaned up.
395 		 */
396 		public D remove(K key) {
397 			Count pair = content.get(key);
398 			if (pair == null)
399 				return null;
400 			if (pair.isLast()) {
401 				return removeAllOf(key);
402 			}
403 			return content.put(key, content.get(key).decrease()).getValue();
404 		}
405 
406 		/**
407 		 * The key/value entry is removed and the value is cleaned up.
408 		 */
409 		public D removeAllOf(K key) {
410 			D removed = content.remove(key).value;
411 			dispose(removed);
412 			return removed;
413 		}
414 
415 		private class Count {
416 			private int count;
417 			private D value;
418 
419 			public Count(D value) {
420 				this(value, 1);
421 			}
422 
423 			private Count(D value, int number) {
424 				this.value = value;
425 				this.count = number;
426 			}
427 
428 			Count decrease() {
429 				return !isLast() ? new Count(value, count - 1) : null;
430 			}
431 
432 			public D getValue() {
433 				return value;
434 			}
435 
436 			Count increase() {
437 				return new Count(value, count + 1);
438 			}
439 
440 			boolean isLast() {
441 				return count == 1;
442 			}
443 
444 		}
445 
446 	}
447 
448 }