Coverage Report - ca.uhn.hl7v2.app.ConnectionHub
 
Classes in this File Line Coverage Branch Coverage Complexity
ConnectionHub
62%
42/67
45%
9/20
1.533
ConnectionHub$1
100%
4/4
N/A
1.533
ConnectionHub$CountingMap
92%
23/25
83%
10/12
1.533
ConnectionHub$CountingMap$Count
100%
11/11
75%
3/4
1.533
 
 1  
 /**
 2  
 The contents of this file are subject to the Mozilla Public License Version 1.1 
 3  
 (the "License"); you may not use this file except in compliance with the License. 
 4  
 You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
 5  
 Software distributed under the License is distributed on an "AS IS" basis, 
 6  
 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
 7  
 specific language governing rights and limitations under the License. 
 8  
 
 9  
 The Original Code is "ConnectionHub.java".  Description: 
 10  
 "Provides access to shared HL7 Connections" 
 11  
 
 12  
 The Initial Developer of the Original Code is University Health Network. Copyright (C) 
 13  
 2001.  All Rights Reserved. 
 14  
 
 15  
 Contributor(s): ______________________________________. 
 16  
 
 17  
 Alternatively, the contents of this file may be used under the terms of the 
 18  
 GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
 19  
 applicable instead of those above.  If you wish to allow use of your version of this 
 20  
 file only under the terms of the GPL and not to allow others to use your version 
 21  
 of this file under the MPL, indicate your decision by deleting  the provisions above 
 22  
 and replace  them with the notice and other provisions required by the GPL License.  
 23  
 If you do not delete the provisions above, a recipient may use your version of 
 24  
 this file under either the MPL or the GPL. 
 25  
  */
 26  
 
 27  
 package ca.uhn.hl7v2.app;
 28  
 
 29  
 import java.util.Collections;
 30  
 import java.util.Map;
 31  
 import java.util.Set;
 32  
 import java.util.concurrent.ConcurrentHashMap;
 33  
 import java.util.concurrent.ConcurrentMap;
 34  
 import java.util.concurrent.ExecutorService;
 35  
 
 36  
 import org.slf4j.Logger;
 37  
 import org.slf4j.LoggerFactory;
 38  
 
 39  
 import ca.uhn.hl7v2.DefaultHapiContext;
 40  
 import ca.uhn.hl7v2.HL7Exception;
 41  
 import ca.uhn.hl7v2.HapiContext;
 42  
 import ca.uhn.hl7v2.HapiContextSupport;
 43  
 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
 44  
 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
 45  
 import ca.uhn.hl7v2.parser.Parser;
 46  
 import ca.uhn.hl7v2.util.ReflectionUtil;
 47  
 import ca.uhn.hl7v2.util.SocketFactory;
 48  
 
 49  
 /**
 50  
  * <p>
 51  
  * Provides access to shared HL7 Connections. The ConnectionHub has at most one connection to any
 52  
  * given address at any time.
 53  
  * </p>
 54  
  * <p>
 55  
  * <b>Synchronization Note:</b> This class should be safe to use in a multithreaded environment. A
 56  
  * synchronization mutex is maintained for any given target host and port, so that if two threads
 57  
  * are trying to connect to two separate destinations neither will block, but if two threads are
 58  
  * trying to connect to the same destination, one will block until the other has finished trying.
 59  
  * Use caution if this class is to be used in an environment where a very large (over 1000) number
 60  
  * of target host/port destinations will be accessed at the same time.
 61  
  * </p>
 62  
  * 
 63  
  * @author Bryan Tripp
 64  
  */
 65  
 public class ConnectionHub extends HapiContextSupport {
 66  
 
 67  1
         private static volatile ConnectionHub instance = null;
 68  1
         private static final Logger log = LoggerFactory.getLogger(ConnectionHub.class);
 69  
         /**
 70  
          * Set a system property with this key to a string containing an integer larger than the default
 71  
          * ("1000") if you need to connect to a very large number of targets at the same time in a
 72  
          * multithreaded environment.
 73  
          */
 74  1
         public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class.getName() + ".maxSize";
 75  9
         private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<String, String>();
 76  
         private final CountingMap<ConnectionData, Connection> connections;
 77  
 
 78  
         /** Creates a new instance of ConnectionHub */
 79  
         private ConnectionHub(ExecutorService executorService) {
 80  0
                 this(new DefaultHapiContext(executorService));
 81  0
         }
 82  
 
 83  
         private ConnectionHub(HapiContext context) {
 84  9
                 super(context);
 85  86
                 connections = new CountingMap<ConnectionData, Connection>() {
 86  
 
 87  
                         @Override
 88  
                         protected void dispose(Connection connection) {
 89  21
                                 connection.close();
 90  21
                         }
 91  
 
 92  
                         @Override
 93  
                         protected Connection open(ConnectionData connectionData) throws Exception {
 94  56
                                 return ConnectionFactory
 95  
                                                 .open(connectionData, getHapiContext().getExecutorService());
 96  
                         }
 97  
 
 98  
                 };
 99  9
         }
 100  
 
 101  
         public Set<? extends ConnectionData> allConnections() {
 102  37
                 return connections.keySet();
 103  
         }
 104  
 
 105  
         /**
 106  
          * @since 2.0
 107  
          */
 108  
         public Connection attach(ConnectionData data) throws HL7Exception {
 109  
                 try {
 110  259
                         Connection conn = null;
 111  
                         // Disallow establishing same connection targets concurrently
 112  259
                         connectionMutexes.putIfAbsent(data.toString(), data.toString());
 113  259
                         String mutex = connectionMutexes.get(data.toString());
 114  259
                         synchronized (mutex) {
 115  259
                                 discardConnectionIfStale(connections.get(data));
 116  
                                 // Create connection or increase counter
 117  259
                                 conn = connections.put(data);
 118  231
                         }
 119  231
                         return conn;
 120  28
                 } catch (Exception e) {
 121  28
                         log.debug("Failed to attach", e);
 122  28
                         throw new HL7Exception("Cannot open connection to " + data.getHost() + ":"
 123  
                                         + data.getPort() + "/" + data.getPort2(), e);
 124  
                 }
 125  
         }
 126  
 
 127  
         /**
 128  
          * Returns a Connection to the given address, opening this Connection if necessary. The given
 129  
          * Parser will only be used if a new Connection is opened, so there is no guarantee that the
 130  
          * Connection returned will be using the Parser you provide. If you need explicit access to the
 131  
          * Parser the Connection is using, call <code>Connection.getParser()</code>.
 132  
          * 
 133  
          * @since 2.1
 134  
          */
 135  
         public Connection attach(String host, int port, boolean tls) throws HL7Exception {
 136  140
                 return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
 137  
                                 getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
 138  
                                                 .getSocketFactory()));
 139  
         }
 140  
 
 141  
         /**
 142  
          * @since 2.1
 143  
          */
 144  
         public Connection attach(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
 145  111
                 return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
 146  
                                 .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
 147  
                                 getHapiContext().getSocketFactory()));
 148  
         }
 149  
 
 150  
         /**
 151  
          * @since 2.0
 152  
          */
 153  
         public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
 154  
                         Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
 155  0
                 return attach(host, outboundPort, inboundPort, parser, llpClass, false);
 156  
         }
 157  
 
 158  
         /**
 159  
          * @since 2.0
 160  
          */
 161  
         public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
 162  
                 Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
 163  0
                 LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
 164  0
                 return attach(host, outboundPort, inboundPort, parser, llp, tls);
 165  
         }
 166  
 
 167  
         /**
 168  
          * @since 2.0
 169  
          */
 170  
         public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
 171  
                         LowerLayerProtocol llp, boolean tls) throws HL7Exception {
 172  2
                 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null));
 173  
         }
 174  
 
 175  
         /**
 176  
          * @since 2.1
 177  
          */
 178  
         public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp, boolean tls, SocketFactory socketFactory) throws HL7Exception {
 179  0
                 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory));
 180  
         }
 181  
 
 182  
         /**
 183  
          * @since 2.1
 184  
          */
 185  
         public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp, boolean tls, SocketFactory socketFactory) throws HL7Exception {
 186  0
                 return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory));
 187  
         }
 188  
 
 189  
         /**
 190  
          * @since 2.1
 191  
          */
 192  
         public Connection attach(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
 193  6
                 return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory()));
 194  
         }
 195  
 
 196  
         /**
 197  
          * @since 2.1
 198  
          */
 199  
         public Connection attach(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
 200  0
                 return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory()));
 201  
         }
 202  
 
 203  
         /**
 204  
          * @since 1.2
 205  
          */
 206  
         public Connection attach(String host, int port, Parser parser,
 207  
                         Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
 208  0
                 return attach(host, port, parser, llpClass, false);
 209  
         }
 210  
 
 211  
         /**
 212  
          * @since 2.0
 213  
          */
 214  
         public Connection attach(String host, int port, Parser parser,
 215  
                         Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
 216  0
                 return attach(host, port, 0, parser, llpClass, tls);
 217  
         }
 218  
 
 219  
         /**
 220  
          * @since 2.0
 221  
          */
 222  
         public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp)
 223  
                         throws HL7Exception {
 224  0
                 return attach(host, port, 0, parser, llp, false);
 225  
         }
 226  
 
 227  
 
 228  
         /**
 229  
          * @since 2.0
 230  
          */
 231  
         public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
 232  
                         boolean tls) throws HL7Exception {
 233  2
                 return attach(host, port, 0, parser, llp, tls);
 234  
         }
 235  
 
 236  
         /**
 237  
          * Informs the ConnectionHub that you are done with the given Connection - if no other code is
 238  
          * using it, it will be closed, so you should not attempt to use a Connection after detaching
 239  
          * from it. If the connection is not enlisted, this method does nothing.
 240  
          */
 241  
         public void detach(Connection c) {
 242  11
                 ConnectionData cd = connections.find(c);
 243  11
                 if (cd != null)
 244  9
                         connections.remove(cd);
 245  11
         }
 246  
 
 247  
         /**
 248  
          * Closes and discards the given Connection so that it can not be returned in subsequent calls
 249  
          * to attach(). This method is to be used when there is a problem with a Connection, e.g. socket
 250  
          * connection closed by remote host.
 251  
          */
 252  
         public void discard(Connection c) {
 253  7
                 ConnectionData cd = connections.find(c);
 254  7
                 if (cd != null)
 255  7
                         connections.removeAllOf(cd);
 256  7
         }
 257  
 
 258  
         public void discardAll() {
 259  20
                 for (ConnectionData cd : allConnections()) {
 260  7
                         connections.removeAllOf(cd);
 261  7
                 }
 262  20
         }
 263  
 
 264  
         private void discardConnectionIfStale(Connection conn) {
 265  259
                 if (conn != null && !conn.isOpen()) {
 266  3
                         log.info("Discarding connection which appears to be closed. Remote addr: {}",
 267  
                                         conn.getRemoteAddress());
 268  3
                         discard(conn);
 269  3
                         conn = null;
 270  
                 }
 271  259
         }
 272  
 
 273  
         public Connection getKnownConnection(ConnectionData key) {
 274  0
                 return connections.get(key);
 275  
         }
 276  
 
 277  
         public boolean isOpen(ConnectionData key) {
 278  0
                 return getKnownConnection(key).isOpen();
 279  
         }
 280  
 
 281  
         /**
 282  
          * Returns the singleton instance of ConnectionHub
 283  
          * 
 284  
          * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
 285  
          *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
 286  
          */
 287  
         public static ConnectionHub getInstance() {
 288  0
                 return getInstance(DefaultExecutorService.getDefaultService());
 289  
         }
 290  
 
 291  
         /**
 292  
          * Returns the singleton instance of ConnectionHub.
 293  
          * 
 294  
          * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
 295  
          *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
 296  
          */
 297  
         public synchronized static ConnectionHub getInstance(ExecutorService service) {
 298  0
                 if (instance == null || service.isShutdown()) {
 299  0
                         instance = new ConnectionHub(service);
 300  
                 }
 301  0
                 return instance;
 302  
         }
 303  
 
 304  
         /**
 305  
          * Returns the singleton instance of ConnectionHub.
 306  
          * 
 307  
          * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
 308  
          *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
 309  
          */
 310  
         public static ConnectionHub getInstance(HapiContext context) {
 311  0
                 if (instance == null || context.getExecutorService().isShutdown()) {
 312  0
                         instance = new ConnectionHub(context);
 313  
                 }
 314  0
                 return instance;
 315  
         }
 316  
 
 317  
         /**
 318  
          * <p>
 319  
          * Returns a new (non-singleton) instance of the ConnectionHub which uses the given executor
 320  
          * service.
 321  
          * </p>
 322  
          * <p>
 323  
          * See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a>
 324  
          * for an example of how to use ConnectionHub.
 325  
          * </p>
 326  
          */
 327  
         public synchronized static ConnectionHub getNewInstance(HapiContext context) {
 328  9
                 return new ConnectionHub(context);
 329  
         }
 330  
 
 331  
         /**
 332  
          * @deprecated default executor service is shut down automatically
 333  
          */
 334  
         public static void shutdown() {
 335  0
                 ConnectionHub hub = getInstance();
 336  0
                 if (DefaultExecutorService.isDefaultService(hub.getHapiContext().getExecutorService())) {
 337  0
                         hub.getHapiContext().getExecutorService().shutdown();
 338  0
                         instance = null;
 339  
                 }
 340  0
         }
 341  
 
 342  
         /**
 343  
          * Helper class that implements a map that increases/decreases a counter when an entry is
 344  
          * added/removed. It is furthermore intended that an entry's value is derived from its key.
 345  
          * 
 346  
          * @param <K> key class
 347  
          * @param <D> managed value class
 348  
          */
 349  
         private abstract class CountingMap<K, D> {
 350  
                 private Map<K, Count> content;
 351  
 
 352  9
                 public CountingMap() {
 353  9
                         super();
 354  9
                         content = new ConcurrentHashMap<K, Count>();
 355  9
                 }
 356  
 
 357  
                 protected abstract void dispose(D value);
 358  
 
 359  
                 public K find(D value) {
 360  18
                         for (Map.Entry<K, Count> entry : content.entrySet()) {
 361  16
                                 if (entry.getValue().getValue().equals(value)) {
 362  16
                                         return entry.getKey();
 363  
                                 }
 364  0
                         }
 365  2
                         return null;
 366  
                 }
 367  
 
 368  
                 public D get(K key) {
 369  259
                         return content.containsKey(key) ? content.get(key).getValue() : null;
 370  
                 }
 371  
 
 372  
                 public Set<K> keySet() {
 373  37
                         return Collections.unmodifiableSet(content.keySet());
 374  
                 }
 375  
 
 376  
                 protected abstract D open(K key) throws Exception;
 377  
 
 378  
                 /**
 379  
                  * If the key exists, the counter is increased. Otherwise, a value is created, and the
 380  
                  * key/value pair is added to the map.
 381  
                  */
 382  
                 public D put(K key) throws Exception {
 383  259
                         if (content.containsKey(key)) {
 384  203
                                 return content.put(key, content.get(key).increase()).getValue();
 385  
                         } else {
 386  56
                                 Count c = new Count(open(key));
 387  28
                                 content.put(key, c);
 388  28
                                 return c.getValue();
 389  
                         }
 390  
                 }
 391  
 
 392  
                 /**
 393  
                  * If the counter of the key/value is greater than one, the counter is decreased. Otherwise,
 394  
                  * the entry is removed and the value is cleaned up.
 395  
                  */
 396  
                 public D remove(K key) {
 397  9
                         Count pair = content.get(key);
 398  9
                         if (pair == null)
 399  0
                                 return null;
 400  9
                         if (pair.isLast()) {
 401  7
                                 return removeAllOf(key);
 402  
                         }
 403  2
                         return content.put(key, content.get(key).decrease()).getValue();
 404  
                 }
 405  
 
 406  
                 /**
 407  
                  * The key/value entry is removed and the value is cleaned up.
 408  
                  */
 409  
                 public D removeAllOf(K key) {
 410  21
                         D removed = content.remove(key).value;
 411  21
                         dispose(removed);
 412  21
                         return removed;
 413  
                 }
 414  
 
 415  21
                 private class Count {
 416  
                         private int count;
 417  
                         private D value;
 418  
 
 419  
                         public Count(D value) {
 420  28
                                 this(value, 1);
 421  28
                         }
 422  
 
 423  233
                         private Count(D value, int number) {
 424  233
                                 this.value = value;
 425  233
                                 this.count = number;
 426  233
                         }
 427  
 
 428  
                         Count decrease() {
 429  2
                                 return !isLast() ? new Count(value, count - 1) : null;
 430  
                         }
 431  
 
 432  
                         public D getValue() {
 433  455
                                 return value;
 434  
                         }
 435  
 
 436  
                         Count increase() {
 437  203
                                 return new Count(value, count + 1);
 438  
                         }
 439  
 
 440  
                         boolean isLast() {
 441  11
                                 return count == 1;
 442  
                         }
 443  
 
 444  
                 }
 445  
 
 446  
         }
 447  
 
 448  
 }