View Javadoc

1   package ca.uhn.hl7v2.hoh.raw.client;
2   
3   import java.io.IOException;
4   import java.net.Socket;
5   import java.net.URL;
6   import java.text.SimpleDateFormat;
7   import java.util.ArrayList;
8   import java.util.Date;
9   import java.util.IdentityHashMap;
10  import java.util.Iterator;
11  import java.util.List;
12  import java.util.Map;
13  import java.util.Map.Entry;
14  import java.util.concurrent.Executors;
15  import java.util.concurrent.ScheduledExecutorService;
16  import java.util.concurrent.TimeUnit;
17  
18  import ca.uhn.hl7v2.hoh.api.IClientMultithreaded;
19  import ca.uhn.hl7v2.hoh.util.Validate;
20  
21  /**
22   * <p>
23   * Raw message sender using the HL7 over HTTP specification which uses a
24   * {@link ScheduledExecutorService} to provide advanced functionality such as
25   * persistent connections which time out and close automatically.
26   * </p>
27   * <p>
28   * This connector uses an executor service which can start worker threads, so
29   * use caution if embedding within a J2EE container.
30   * </p>
31   */
32  public class HohRawClientMultithreaded extends AbstractRawClient implements IClientMultithreaded {
33  
34  	/**
35  	 * Default {@link #setSocketTimeout(long) Socket Timeout}, 10000ms
36  	 */
37  	public static final long DEFAULT_SOCKET_TIMEOUT = 10000;
38  
39  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(HohRawClientMultithreaded.class);
40  
41  	private final ScheduledExecutorService myExecutorService;
42  	private Map<Socket, Long> myIdleSocketsToTimeBecameIdle = new IdentityHashMap<Socket, Long>();
43  	private final SimpleDateFormat myLogTimeFormat = new SimpleDateFormat("HH:mm:ss,SSS");
44  	private boolean myReapingScheduled;
45  	private long mySocketTimeout = DEFAULT_SOCKET_TIMEOUT;
46  
47  	/**
48  	 * Constructor
49  	 */
50  	public HohRawClientMultithreaded() {
51  		myExecutorService = Executors.newScheduledThreadPool(1);
52  	}
53  
54  	/**
55  	 * Constructor
56  	 * 
57  	 * @param theHost
58  	 *            The HOST (name/address). E.g. "192.168.1.1"
59  	 * @param thePort
60  	 *            The PORT. E.g. "8080"
61  	 * @param thePath
62  	 *            The path being requested (must either be blank or start with
63  	 *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
64  	 */
65  	public HohRawClientMultithreaded(String theHost, int thePort, String thePath) {
66  		this();
67  
68  		setHost(theHost);
69  		setPort(thePort);
70  		setUriPath(thePath);
71  	}
72  
73  	/**
74  	 * Constructor
75  	 * 
76  	 * @param theHost
77  	 *            The HOST (name/address). E.g. "192.168.1.1"
78  	 * @param thePort
79  	 *            The PORT. E.g. "8080"
80  	 * @param theUriPath
81  	 *            The URI path being requested (must either be blank or start with
82  	 *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
83  	 * @param theExecutorService
84  	 *            The executor service to use for detecting stale sockets
85  	 */
86  	public HohRawClientMultithreaded(String theHost, int thePort, String theUriPath, ScheduledExecutorService theExecutorService) {
87  		super(theHost, thePort, theUriPath);
88  		Validate.notNull(theExecutorService, "executorService");
89  
90  		myExecutorService = theExecutorService;
91  	}
92  
93  	/**
94  	 * Constructor
95  	 * 
96  	 * @param theUrl
97  	 *            The URL to connect to
98  	 * @param theExecutorService
99  	 *            The executor service to use for detecting stale sockets
100 	 */
101 	public HohRawClientMultithreaded(URL theUrl) {
102 		this();
103 		setUrl(theUrl);
104 	}
105 
106 	/**
107 	 * Constructor
108 	 * 
109 	 * @param theUrl
110 	 *            The URL to connect to
111 	 * @param theExecutorService
112 	 *            The executor service to use for detecting stale sockets
113 	 */
114 	public HohRawClientMultithreaded(URL theUrl, ScheduledExecutorService theExecutorService) {
115 		super(theUrl);
116 		Validate.notNull(theExecutorService, "executorService");
117 
118 		myExecutorService = theExecutorService;
119 	}
120 
121 	@Override
122 	protected synchronized Socket provideSocket() throws IOException {
123 		Socket retVal;
124 		if (myIdleSocketsToTimeBecameIdle.size() == 0) {
125 			ourLog.info("Creating new remote connection to {}:{}", getHost(), getPort());
126 			retVal = connect();
127 		} else {
128 			retVal = myIdleSocketsToTimeBecameIdle.keySet().iterator().next();
129 			myIdleSocketsToTimeBecameIdle.remove(retVal);
130 		}
131 		return retVal;
132 	}
133 
134 	/**
135 	 * Returns a socket to the pool. If the socket is closed, it will
136 	 * not be returned.
137 	 */
138 	@Override
139 	protected synchronized void returnSocket(Socket theSocket) {
140 		if (theSocket.isClosed()) {
141 			return;
142 		}
143 		
144 		long now = System.currentTimeMillis();
145 
146 		// TODO: reap immediately if timeout is 0
147 		
148 		if (ourLog.isDebugEnabled()) {
149 			if (mySocketTimeout == -1) {
150 				ourLog.debug("Returning socket, will not attempt to reap");
151 			} else {
152 				ourLog.debug("Returning socket, will be eligible for reaping at " + myLogTimeFormat.format(new Date(now + mySocketTimeout)));
153 			}
154 		}
155 
156 		myIdleSocketsToTimeBecameIdle.put(theSocket, now);
157 		scheduleReaping();
158 	}
159 
160 	private void scheduleReaping() {
161 		long now = System.currentTimeMillis();
162 		if (myReapingScheduled) {
163 			ourLog.debug("Reaping already scheduled");
164 			return;
165 		}
166 
167 		if (myIdleSocketsToTimeBecameIdle.size() < 1) {
168 			return;
169 		}
170 
171 		if (mySocketTimeout == -1) {
172 			return;
173 		}
174 		
175 		long earliestReapingTime = Long.MAX_VALUE;
176 		for (Long next : myIdleSocketsToTimeBecameIdle.values()) {
177 			long nextReapingTime = next + mySocketTimeout;
178 			if (nextReapingTime < earliestReapingTime) {
179 				earliestReapingTime = nextReapingTime;
180 			}
181 		}
182 
183 		long delay = earliestReapingTime - now;
184 		if (ourLog.isDebugEnabled()) {
185 			ourLog.debug("Scheduling socket reaping in {} ms at {}", delay, myLogTimeFormat.format(new Date(earliestReapingTime)));
186 		}
187 
188 		myExecutorService.schedule(new TimeoutTask(), delay, TimeUnit.MILLISECONDS);
189 		myReapingScheduled = true;
190 	}
191 
192 	/**
193 	 * {@inheritDoc}
194 	 */
195 	public long getSocketTimeout() {
196 		return mySocketTimeout;
197 	}
198 
199 	/**
200 	 * {@inheritDoc}
201 	 */
202 	public synchronized void setSocketTimeout(long theSocketTimeout) {
203 		if (mySocketTimeout < -1) {
204 			throw new IllegalArgumentException("Socket timeout must be -1, 0, or a positive integer");
205 		}
206 		mySocketTimeout = theSocketTimeout;
207 		myReapingScheduled = false;
208 		scheduleReaping();
209 	}
210 
211 	private class TimeoutTask implements Runnable {
212 		public void run() {
213 
214 			if (mySocketTimeout == -1) {
215 				return;
216 			}
217 			
218 			ourLog.debug("Beginning socket reaping pass");
219 			try {
220 
221 				List<Socket> socketsToClose = new ArrayList<Socket>();
222 				long closeIfActiveBefore = System.currentTimeMillis() - mySocketTimeout;
223 				synchronized (HohRawClientMultithreaded.this) {
224 
225 					for (Iterator<Map.Entry<Socket, Long>> iter = myIdleSocketsToTimeBecameIdle.entrySet().iterator(); iter.hasNext();) {
226 						Entry<Socket, Long> nextEntry = iter.next();
227 						if (nextEntry.getValue() < closeIfActiveBefore) {
228 							Socket key = nextEntry.getKey();
229 							socketsToClose.add(key);
230 							ourLog.info("Closing idle socket with local port {} because it has been idle since {}", key.getLocalPort(), new Date(nextEntry.getValue()));
231 							iter.remove();
232 						} else {
233 							if (ourLog.isDebugEnabled()) {
234 								ourLog.debug("Next socket has " + (nextEntry.getValue() - closeIfActiveBefore) + "ms remaining");
235 							}
236 						}
237 					}
238 
239 					myReapingScheduled = false;
240 					scheduleReaping();
241 				}
242 
243 				for (Socket next : socketsToClose) {
244 					closeSocket(next);
245 				}
246 			} catch (Throwable e) {
247 				ourLog.error("Failure during reaper pass", e);
248 			}
249 		}
250 	}
251 
252 }