View Javadoc

1   package ca.uhn.hl7v2.hoh.encoder;
2   
3   import java.io.ByteArrayOutputStream;
4   import java.io.IOException;
5   import java.io.InputStream;
6   import java.net.SocketException;
7   import java.net.SocketTimeoutException;
8   import java.nio.charset.Charset;
9   import java.nio.charset.UnsupportedCharsetException;
10  import java.util.ArrayList;
11  import java.util.LinkedHashMap;
12  import java.util.List;
13  import java.util.Map;
14  import java.util.regex.Pattern;
15  
16  import ca.uhn.hl7v2.hoh.api.DecodeException;
17  import ca.uhn.hl7v2.hoh.api.NonHl7ResponseException;
18  import ca.uhn.hl7v2.hoh.sign.SignatureFailureException;
19  import ca.uhn.hl7v2.hoh.sign.SignatureVerificationException;
20  import ca.uhn.hl7v2.hoh.util.GZipUtils;
21  import ca.uhn.hl7v2.hoh.util.IOUtils;
22  import ca.uhn.hl7v2.hoh.util.StringUtils;
23  import ca.uhn.hl7v2.hoh.util.repackage.Base64;
24  
25  public abstract class AbstractHl7OverHttpDecoder extends AbstractHl7OverHttp {
26  
27  	private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+");
28  
29  	/**
30  	 * Default amount of time that the decoder will attempt to read before
31  	 * timing out and throwing an IOException (30000ms)
32  	 * 
33  	 * @see #setReadTimeout(long)
34  	 */
35  	public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;
36  
37  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(AbstractHl7OverHttpDecoder.class);
38  
39  	private byte[] myBytes;
40  	private List<String> myConformanceProblems;
41  	private int myContentLength = -1;
42  	private String myContentType;
43  	private boolean myGzipCoding;
44  	private long myLastStartedReading;
45  	private long myReadTimeout = DEFAULT_READ_TIMEOUT;
46  	private String myResponseName;
47  	private Integer myResponseStatus;
48  	private TransferEncoding myTransferEncoding;
49  	private String mySignature;
50  	private EncodingStyle myEncodingStyle;
51  
52  	private void addConformanceProblem(String theString) {
53  		ourLog.debug("Conformance problem detected: {}", theString);
54  		if (myConformanceProblems == null) {
55  			myConformanceProblems = new ArrayList<String>();
56  		}
57  		myConformanceProblems.add(theString);
58  	}
59  
60  	protected abstract void authorize() throws AuthorizationFailureException;
61  
62  	public void decode() throws DecodeException, SignatureVerificationException {
63  		ourLog.trace("Entering decode()");
64  		
65  		verifyNotUsed();
66  
67  		decodeHeaders();
68  		authorize();
69  		decodeBody();
70  		verifySignature();
71  
72  		ourLog.trace("Exiting decode()");
73  	}
74  
75  	private void decodeBody() throws DecodeException {
76  		byte[] bytes = myBytes;
77  
78  		if (myGzipCoding) {
79  			ourLog.debug("Decoding message contents using GZIP encoding style");
80  			try {
81  				bytes = GZipUtils.uncompress(bytes);
82  			} catch (IOException e) {
83  				throw new DecodeException("Failed to uncompress GZip content", e);
84  			}
85  		}
86  
87  		Charset charset = getCharset();
88  		
89  		ourLog.debug("Message is {} bytes with charset {}", bytes.length, charset.name());
90  		if (ourLog.isTraceEnabled()) {
91  			ourLog.trace("Raw message: {}", StringUtils.asciiEscape(bytes, charset));
92  		}
93  		
94  		String messageString = new String(bytes, charset);
95  		setMessage(messageString);
96  	}
97  
98  	private void decodeHeaders() throws DecodeException {
99  
100 		ourLog.trace("Headers are: {}", getHeaders());
101 
102 		for (Map.Entry<String, String> nextEntry : getHeaders().entrySet()) {
103 			String nextHeader = nextEntry.getKey().toLowerCase();
104 			String nextValue = nextEntry.getValue();
105 
106 			if ("transfer-encoding".equals(nextHeader)) {
107 				if ("chunked".equalsIgnoreCase(nextValue)) {
108 					myTransferEncoding = TransferEncoding.CHUNKED;
109 				} else {
110 					throw new DecodeException("Unknown transfer encoding: " + nextValue);
111 				}
112 			} else if ("content-length".equals(nextHeader)) {
113 				try {
114 					myContentLength = Integer.parseInt(nextValue);
115 				} catch (NumberFormatException e) {
116 					addConformanceProblem("Could not parse Content-Length header value: " + nextHeader);
117 				}
118 			} else if ("content-type".equals(nextHeader)) {
119 				int colonIndex = nextValue.indexOf(';');
120 				if (colonIndex == -1) {
121 					myContentType = nextValue;
122 				} else {
123 					myContentType = nextValue.substring(0, colonIndex);
124 					myEncodingStyle = EncodingStyle.withNameCaseInsensitive(myContentType);
125 					String charsetDef = nextValue.substring(colonIndex + 1).trim();
126 					if (charsetDef.startsWith("charset=")) {
127 						String charsetName = charsetDef.substring(8);
128 						Charset charset;
129 						try {
130 							charset = Charset.forName(charsetName);
131 						} catch (UnsupportedCharsetException e) {
132 							addConformanceProblem("Unsupported or invalid charset: " + charsetName);
133 							continue;
134 						}
135 						setCharset(charset);
136 					}
137 				}
138 
139 				myContentType = myContentType.trim();
140 
141 			} else if ("authorization".equals(nextHeader)) {
142 				int spaceIndex = nextValue.indexOf(' ');
143 				if (spaceIndex == -1) {
144 					throw new DecodeException("Invalid authorization header. No authorization style detected");
145 				}
146 				String type = nextValue.substring(0, spaceIndex);
147 				if ("basic".equalsIgnoreCase(type)) {
148 					String encodedCredentials = nextValue.substring(spaceIndex + 1);
149 					byte[] decodedCredentials = Base64.decodeBase64(encodedCredentials);
150 					String credentialsString = new String(decodedCredentials, getDefaultCharset());
151 					int colonIndex = credentialsString.indexOf(':');
152 					if (colonIndex == -1) {
153 						setUsername(credentialsString);
154 					} else {
155 						setUsername(credentialsString.substring(0, colonIndex));
156 						setPassword(credentialsString.substring(colonIndex + 1));
157 					}
158 				} else {
159 					addConformanceProblem("Invalid authorization type. Only basic authorization is supported.");
160 				}
161 			} else if ("content-coding".equals(nextHeader)) {
162 				if (StringUtils.isNotBlank(nextValue)) {
163 					if ("gzip".equals(nextValue)) {
164 						myGzipCoding = true;
165 					} else {
166 						throw new DecodeException("Unknown content-coding: " + nextValue);
167 					}
168 				}
169 			} else if (HTTP_HEADER_HL7_SIGNATURE_LC.equals(nextHeader)) {
170 				mySignature = nextValue;
171 			}
172 
173 		}
174 
175 	}
176 
177 	/**
178 	 * Returns the {@link EncodingStyle} associated with the incoming message,
179 	 * or <code>null</code>. This will be set automatically based on the value
180 	 * of the <code>Content-Type</code> header, and will be set to
181 	 * <code>null</code> if the content type is not provided, or if the content
182 	 * type does not correspond to an HL7 type.
183 	 * 
184 	 * @see {@link EncodingStyle} for a list of appropriate content types
185 	 */
186 	public EncodingStyle getEncodingStyle() {
187 		return myEncodingStyle;
188 	}
189 
190 	private void doReadContentsFromInputStreamAndDecode(InputStream theInputStream) throws DecodeException, AuthorizationFailureException, IOException, SignatureVerificationException {
191 		decodeHeaders();
192 		authorize();
193 		if (myTransferEncoding == TransferEncoding.CHUNKED) {
194 			myBytes = readBytesChunked(theInputStream);
195 		} else {
196 			myBytes = readBytesNonChunked(theInputStream);
197 		}
198 
199 		decodeBody();
200 		
201 		if (getContentType() == null) {
202 			throw new DecodeException("Content-Type not specified");
203 		}
204 		if (getEncodingStyle() == null) {
205 			throw new NonHl7ResponseException("Invalid Content-Type: " + getContentType(), getContentType(), getMessage());
206 		}
207 		
208 		verifySignature();
209 	}
210 
211 	private byte[] readBytesChunked(InputStream theInputStream) throws DecodeException, IOException {
212 		ourLog.debug("Decoding message bytes using CHUNKED encoding style");
213 		byte[] byteBuffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
214 		ByteArrayOutputStream bos = new ByteArrayOutputStream(IOUtils.DEFAULT_BUFFER_SIZE);
215 
216 		while (true) {
217 			String nextSize;
218 			try {
219 				nextSize = readLine(theInputStream);
220 			} catch (IOException e) {
221 				throw new DecodeException("Failed to decode CHUNKED encoding", e);
222 			}
223 
224 			ourLog.trace("Going to interpret CHUNKED size value: {}", nextSize);
225 			
226 			if (nextSize.length() == 0) {
227 				break;
228 			}
229 
230 			int nextSizeInt;
231 			try {
232 				nextSizeInt = Integer.parseInt(nextSize, 16);
233 			} catch (NumberFormatException e) {
234 				throw new DecodeException("Failed to decode CHUNKED encoding", e);
235 			}
236 
237 			ourLog.debug("Next CHUNKED size: {}", nextSizeInt);
238 
239 			if (nextSizeInt < 0) {
240 				throw new DecodeException("Received invalid octet count in chunked transfer encoding: " + nextSize);
241 			}
242 
243 			if (nextSizeInt > 0) {
244 				int totalRead = 0;
245 				myLastStartedReading = System.currentTimeMillis();
246 				do {
247 					int nextRead = Math.min(nextSizeInt, byteBuffer.length);
248 					int bytesRead = theInputStream.read(byteBuffer, 0, nextRead);
249 					if (bytesRead == -1) {
250 						ourLog.debug("Exception in readBytesChunked(InputStream): Reached EOF. Buffer has {} bytes", bos.size());
251 						throw new DecodeException("Reached EOF while reading in message chunk");
252 					}
253 					if (bytesRead == 0 && totalRead < nextSizeInt) {
254 						pauseDuringTimedOutRead();
255 					}
256 					totalRead += bytesRead;
257 
258 					ourLog.debug("Read {} byte chunk", bytesRead);
259 					bos.write(byteBuffer, 0, bytesRead);
260 
261 				} while (totalRead < nextSizeInt);
262 			}
263 
264 			// Try to read a trailing CRLF
265 			int nextChar;
266 			boolean had13 = false;
267 			boolean had10 = false;
268 			boolean trailing = false;
269 			while (true) {
270 				try {
271 					nextChar = theInputStream.read();
272 				} catch (SocketTimeoutException e) {
273 					break;
274 				}
275 
276 				if (nextChar == -1) {
277 					break;
278 				} else if (nextChar == 13) {
279 					if (had13) {
280 						/* 
281 						 * This is an attempt to be tolerant of people using the wrong
282 						 * end of line sequence (it should be CRLF), as is the 
283 						 * had10 below 
284 						 */
285 						trailing = true;
286 					}
287 					had13 = true;
288 					continue;
289 				} else if (nextChar == 10) {
290 					if (had10) {
291 						trailing = true;
292 					}
293 					continue;
294 				} else {
295 					break;
296 				}
297 			}
298 			
299 			if (trailing) {
300 				break;
301 			}
302 
303 		} // while
304 
305 		return bos.toByteArray();
306 	}
307 
308 	private void verifySignature() throws SignatureVerificationException, DecodeException {
309 		if (getSigner() != null && StringUtils.isBlank(mySignature)) {
310 			String mode = (this instanceof Hl7OverHttpRequestDecoder) ? "request" : "response";
311 			throw new SignatureVerificationException("No HL7 Signature found in " + mode);
312 		}
313 		if (getSigner() != null) {
314 			try {
315 				getSigner().verify(myBytes, mySignature);
316 			} catch (SignatureFailureException e) {
317 				throw new DecodeException("Failed to verify signature due to an error (signature may possibly be valid, but verification failed)", e);
318 			}
319 		}
320 	}
321 
322 	public List<String> getConformanceProblems() {
323 		if (myConformanceProblems == null) {
324 			myConformanceProblems = new ArrayList<String>();
325 		}
326 		return myConformanceProblems;
327 	}
328 
329 	/**
330 	 * @return Returns the content type associated with the message (e.g. application/hl7-v2)
331 	 */
332 	public String getContentType() {
333 		return myContentType;
334 	}
335 
336 	/**
337 	 * @return the responseName
338 	 */
339 	public String getResponseName() {
340 		return myResponseName;
341 	}
342 
343 	/**
344 	 * @return the responseStatus
345 	 */
346 	public Integer getResponseStatus() {
347 		return myResponseStatus;
348 	}
349 
350 	protected abstract String readActionLineAndDecode(InputStream theInputStream) throws IOException, NoMessageReceivedException, DecodeException;
351 
352 	private byte[] readBytesNonChunked(InputStream theInputStream) throws IOException {
353 		int length = myContentLength > 0 ? myContentLength : IOUtils.DEFAULT_BUFFER_SIZE;
354 		ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
355 
356 		byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
357 		myLastStartedReading = System.currentTimeMillis();
358 		while ((myContentLength < 0 || bos.size() < myContentLength)) {
359 			if (myContentLength < 0) {
360 				try {
361 					if (theInputStream.available() <= 0) {
362 						break;
363 					}
364 				} catch (IOException e) {
365 					ourLog.debug("Received IOException while calling inputStream#available()", e);
366 					throw e;
367 				}
368 			}
369 
370 			int max = buffer.length;
371 			if (myContentLength > 0) {
372 				max = myContentLength - bos.size();
373 			}
374 			try {
375 				int bytesRead = theInputStream.read(buffer, 0, max);
376 				myLastStartedReading = System.currentTimeMillis();
377 				if (bytesRead == -1) {
378 					break;
379 				}
380 				bos.write(buffer, 0, bytesRead);
381 			} catch (SocketTimeoutException e) {
382 				long elapsed = System.currentTimeMillis() - myLastStartedReading;
383 				if (elapsed > myReadTimeout) {
384 					throw e;
385 				} else {
386 					ourLog.debug("Trying to read for {} / {}ms, going to keep trying", elapsed, myReadTimeout);
387 					try {
388 						Thread.sleep(100);
389 					} catch (InterruptedException e1) {
390 						// ignore
391 					}
392 				}
393 			} catch (IOException e) {
394 				ourLog.debug("Received IOException while calling inputStream#available()", e);
395 				throw e;
396 			}
397 		}
398 
399 		return bos.toByteArray();
400 	}
401 
402 	/**
403 	 * Read in the contents of the raw message from the input stream and decode
404 	 * entire the message. This method assumes that the headers have been
405 	 * provided using {@link #setHeaders(LinkedHashMap)}
406 	 * 
407 	 * @param theInputStream
408 	 *            The inputstream to read the raw message from
409 	 * @throws AuthorizationFailureException
410 	 *             If the authorization check fails. This will only be thrown if
411 	 *             this decoder is decoding a request message, and an
412 	 *             authorization callback has been provided, and the
413 	 *             authorization fails.
414 	 * @throws DecodeException
415 	 *             If the message can not be decoded for any reason
416 	 * @throws IOException
417 	 *             If there is a failure while reading from the inputstream
418 	 * @throws SignatureVerificationException
419 	 *             If the signature verification fails. This will only occur if
420 	 *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
421 	 *             has been provided.
422 	 */
423 	public void readContentsFromInputStreamAndDecode(InputStream theInputStream) throws AuthorizationFailureException, DecodeException, IOException, SignatureVerificationException {
424 		verifyNotUsed();
425 
426 		doReadContentsFromInputStreamAndDecode(theInputStream);
427 	}
428 
429 	protected String readFirstLine(InputStream theInputStream) throws IOException, NoMessageReceivedException {
430 		ourLog.trace("Entering readFirstLine(InputStream) with IS: {}", theInputStream);
431 		String retVal = readLine(theInputStream, true);
432 		ourLog.trace("Exiting readFirstLine(InputStream) with result: {}", retVal);
433 		return retVal;
434 	}
435 
436 	/**
437 	 * Note that if {@link #setPath(String)} is called, this method will assume
438 	 * that the first line of the HTTP request has already been read from the
439 	 * input stream. If {@link #setHeaders(java.util.LinkedHashMap)} has been
440 	 * called, this method will assume that the HTTP headers have already been
441 	 * read from the input stream as well as the double-LF (ASCII-10) that
442 	 * proceeds the headers.
443 	 * 
444 	 * 
445 	 * @param theInputStream
446 	 *            The inputstream to read the raw message from
447 	 * @throws AuthorizationFailureException
448 	 *             If the authorization check fails. This will only be thrown if
449 	 *             this decoder is decoding a request message, and an
450 	 *             authorization callback has been provided, and the
451 	 *             authorization fails.
452 	 * @throws DecodeException
453 	 *             If the message can not be decoded for any reason
454 	 * @throws IOException
455 	 *             If there is a failure while reading from the inputstream
456 	 * @throws SignatureVerificationException
457 	 *             If the signature verification fails. This will only occur if
458 	 *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
459 	 *             has been provided.
460 	 */
461 	public void readHeadersAndContentsFromInputStreamAndDecode(InputStream theInputStream) throws IOException, DecodeException, NoMessageReceivedException, SignatureVerificationException {
462 		verifyNotUsed();
463 
464 		String actionLine = readActionLineAndDecode(theInputStream);
465 
466 		ourLog.debug("Read action line: {}", actionLine);
467 
468 		if (getHeaders() == null) {
469 			setHeaders(new LinkedHashMap<String, String>());
470 
471 			while (true) {
472 				String nextLine = readLine(theInputStream);
473 				if (nextLine.length() == 0) {
474 					break;
475 				}
476 
477 				int colonIndex = nextLine.indexOf(':');
478 				if (colonIndex == -1) {
479 					throw new DecodeException("Invalid HTTP header line detected. Value is: " + nextLine);
480 				}
481 
482 				String key = nextLine.substring(0, colonIndex);
483 				String value = nextLine.substring(colonIndex + 1).trim();
484 				getHeaders().put(key, value);
485 			}
486 		}
487 
488 		doReadContentsFromInputStreamAndDecode(theInputStream);
489 
490 	}
491 
492 	private String readLine(InputStream theInputStream) throws IOException {
493 		try {
494 			return readLine(theInputStream, false);
495 		} catch (NoMessageReceivedException e) {
496 			throw new Error("Threw a NoMessageReceivedException. This should not happen.", e);
497 		}
498 	}
499 
500 	private String readLine(InputStream theInputStream, boolean theFirstLine) throws IOException, NoMessageReceivedException {
501 		
502 		myLastStartedReading = System.currentTimeMillis();
503 
504 		StringBuilder retVal = new StringBuilder();
505 		while (true) {
506 
507 			int b;
508 			try {
509 				b = theInputStream.read();
510 			} catch (SocketTimeoutException e) {
511 				if (retVal.length() == 0 && theFirstLine) {
512 					ourLog.trace("No message received, aborting readLine(InputStream, boolean)");
513 					throw new NoMessageReceivedException();
514 				}
515 				ourLog.trace("No message received in readLine(InputStream, boolean), going to wait and continue");
516 				pauseDuringTimedOutRead();
517 				continue;
518 			}
519 
520 			if (b == 13) {
521 				continue;
522 			} else if (b == 10) {
523 				break;
524 			} else if (b == -1) {
525 				ourLog.debug("Current read line is: {}", retVal);
526 				ourLog.info("Read -1 from input stream, closing it");
527 				theInputStream.close();
528 				if (retVal.length() == 0) {
529 					throw new SocketException("Received EOF from input stream");
530 				}
531 				break;
532 			} else if (b < ' ') {
533 				continue;
534 			} else {
535 				retVal.append((char) b);
536 			}
537 		}
538 
539 		ourLog.debug("Current read line is: {}", retVal);
540 
541 		return WHITESPACE_PATTERN.matcher(retVal.toString()).replaceAll(" ").trim();
542 	}
543 
544 	private void pauseDuringTimedOutRead() throws SocketTimeoutException {
545 		long elapsed = System.currentTimeMillis() - myLastStartedReading;
546 		if (elapsed > myReadTimeout) {
547 			ourLog.trace("Elapsed time of {} exceeds max {}, throwing SocketTimeoutException", elapsed, myReadTimeout);
548 			throw new SocketTimeoutException();
549 		}
550 		try {
551 			Thread.sleep(100);
552 		} catch (InterruptedException e1) {
553 			// ignore
554 		}
555 	}
556 
557 	/**
558 	 * Sets the number of milliseconds that the decoder will attempt to read
559 	 * from an InputStream before timing out and throwing an exception
560 	 */
561 	public void setReadTimeout(long theReadTimeout) {
562 		myReadTimeout = theReadTimeout;
563 	}
564 
565 	/**
566 	 * @param theResponseName
567 	 *            the responseName to set
568 	 */
569 	public void setResponseName(String theResponseName) {
570 		myResponseName = theResponseName;
571 	}
572 
573 	/**
574 	 * @param theResponseStatus
575 	 *            the responseStatus to set
576 	 */
577 	public void setResponseStatus(Integer theResponseStatus) {
578 		myResponseStatus = theResponseStatus;
579 	}
580 
581 }