View Javadoc

1   /**
2   The contents of this file are subject to the Mozilla Public License Version 1.1 
3   (the "License"); you may not use this file except in compliance with the License. 
4   You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
5   Software distributed under the License is distributed on an "AS IS" basis, 
6   WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
7   specific language governing rights and limitations under the License. 
8   
9   The Original Code is "Initiator.java".  Description: 
10  "Performs the initiation role of a message exchange accorging to HL7's original 
11   mode rules." 
12  
13  The Initial Developer of the Original Code is University Health Network. Copyright (C) 
14  2002.  All Rights Reserved. 
15  
16  Contributor(s): ______________________________________. 
17  
18  Alternatively, the contents of this file may be used under the terms of the 
19  GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
20  applicable instead of those above.  If you wish to allow use of your version of this 
21  file only under the terms of the GPL and not to allow others to use your version 
22  of this file under the MPL, indicate your decision by deleting  the provisions above 
23  and replace  them with the notice and other provisions required by the GPL License.  
24  If you do not delete the provisions above, a recipient may use your version of 
25  this file under either the MPL or the GPL. 
26  
27   */
28  
29  package ca.uhn.hl7v2.app;
30  
31  import java.io.IOException;
32  import java.net.Socket;
33  import java.util.concurrent.ExecutionException;
34  import java.util.concurrent.Future;
35  
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import ca.uhn.hl7v2.ErrorCode;
40  import ca.uhn.hl7v2.HL7Exception;
41  import ca.uhn.hl7v2.llp.LLPException;
42  import ca.uhn.hl7v2.llp.LowerLayerProtocol;
43  import ca.uhn.hl7v2.llp.MinLowerLayerProtocol;
44  import ca.uhn.hl7v2.model.Message;
45  import ca.uhn.hl7v2.parser.Parser;
46  import ca.uhn.hl7v2.parser.PipeParser;
47  import ca.uhn.hl7v2.util.Terser;
48  import ca.uhn.hl7v2.util.idgenerator.IDGenerator;
49  import ca.uhn.hl7v2.util.idgenerator.InMemoryIDGenerator;
50  
51  /**
52   * <p>
53   * Performs the initiation role of a message exchange (i.e sender of the first
54   * message; analogous to the client in a client-server interaction), according
55   * to HL7's original mode processing rules.
56   * </p>
57   * <p>
58   * The <code>sendAndReceive(...)</code> method blocks until either a response is
59   * received with the matching message ID, or until a timeout period has passed.
60   * The timeout defaults to 10000 ms (10 sec) but can be configured by setting
61   * the system property "ca.uhn.hl7v2.app.initiator.timeout" to an integer value
62   * representing the number of ms after which to time out.
63   * </p>
64   * <p>
65   * At the time of writing, enhanced mode, two-phase reply, continuation
66   * messages, and batch processing are unsupported.
67   * </p>
68   * 
69   * @author Bryan Tripp
70   */
71  public class Initiator {
72  
73  	private static final Logger log = LoggerFactory.getLogger(Initiator.class);
74  	private static final Logger rawOutbound = LoggerFactory
75  			.getLogger("ca.uhn.hl7v2.raw.outbound");
76  	private static final Logger rawInbound = LoggerFactory
77  			.getLogger("ca.uhn.hl7v2.raw.inbound");
78  	private Connection conn;
79  	private int timeoutMillis = 10000;
80  
81  	/**
82  	 * Creates a new instance of Initiator.
83  	 * 
84  	 * @param conn
85  	 *            the Connection associated with this Initiator.
86  	 */
87  	Initiator(Connection conn) throws LLPException {
88  		this.conn = conn;
89  
90  		// see if timeout has been set
91  		String timeout = System
92  				.getProperty("ca.uhn.hl7v2.app.initiator.timeout");
93  		if (timeout != null) {
94  			try {
95  				timeoutMillis = Integer.parseInt(timeout);
96  				log.debug("Setting Initiator timeout to {} ms", timeout);
97  			} catch (NumberFormatException e) {
98  				log.warn(timeout
99  						+ " is not a valid integer - Initiator is using default timeout");
100 			}
101 		}
102 	}
103 
104 	/**
105 	 * Sends a message to a responder system, receives the reply, and returns
106 	 * the reply as a Message object. This method is thread-safe - multiple
107 	 * threads can share an Initiator and call this method. Responses are
108 	 * returned to the calling thread on the basis of message ID.
109 	 */
110 	public Message sendAndReceive(Message out) throws HL7Exception,
111 			LLPException, IOException {
112 		if (out == null) {
113 			throw new HL7Exception("Can't encode null message",
114 					ErrorCode.REQUIRED_FIELD_MISSING);
115 		}
116 
117 		// register message with response Receiver(s) (by message ID)
118 		Terser t = new Terser(out);
119 		String messID = t.get("/MSH-10");
120 
121 		if (messID == null || messID.length() == 0) {
122 			throw new HL7Exception(
123 					"MSH segment missing required field Control ID (MSH-10)",
124 					ErrorCode.REQUIRED_FIELD_MISSING);
125 		}
126 
127 		// log and send message
128 		String outbound = conn.getParser().encode(out);
129 		rawOutbound.debug(outbound);
130 		Future<String> inbound = null;
131 		try {
132 			String message;
133 			inbound = conn.waitForResponse(messID, timeoutMillis);
134 			conn.getSendWriter().writeMessage(outbound);
135 			if (inbound != null && (message = inbound.get()) != null) {
136 				// log that we got the message
137 				log.debug("Initiator received message: {}", message);
138 				rawInbound.debug(message);
139 				Message response = conn.getParser().parse(message);
140 				log.debug("response parsed");
141 				return response;
142 			}
143 		} catch (IOException e) {
144 			if (inbound != null)
145 				inbound.cancel(true);
146 			conn.close();
147 			throw e;
148 		} catch (InterruptedException e) {
149 		} catch (ExecutionException e) {
150 		}
151 
152 		throw new HL7Exception(
153 				"Timeout waiting for response to message with control ID "
154 						+ messID);
155 	}
156 
157 	/**
158 	 * Sets the time (in milliseconds) that the initiator will wait for a
159 	 * response for a given message before timing out and throwing an exception
160 	 * (default is 10 seconds).
161 	 */
162 	public void setTimeoutMillis(int timeout) {
163 		this.timeoutMillis = timeout;
164 	}
165 
166 	/**
167 	 * Test harness
168 	 */
169 	public static void main(String args[]) {
170 		if (args.length != 2) {
171 			System.out.println("Usage: ca.uhn.hl7v2.app.Initiator host port");
172 		}
173 
174 		try {
175 
176 			// set up connection to server
177 			String host = args[0];
178 			int port = Integer.parseInt(args[1]);
179 
180 			final Parser parser = new PipeParser();
181 			LowerLayerProtocol llp = new MinLowerLayerProtocol();
182 			Connection connection = new Connection(parser, llp, new Socket(
183 					host, port));
184 			final Initiator initiator = connection.getInitiator();
185 			connection.activate();
186 			final String outText = "MSH|^~\\&|||||||ACK^^ACK|||R|2.4|\rMSA|AA";
187 			final IDGenerator generator = new InMemoryIDGenerator();
188 
189 			// get a bunch of threads to send messages
190 			for (int i = 0; i < 1000; i++) {
191 				Thread sender = new Thread(new Runnable() {
192 					
193 					public void run() {
194 						try {
195 							// get message ID
196 							String ID = generator.getID();
197 							Message out = parser.parse(outText);
198 							Terser tOut = new Terser(out);
199 							tOut.set("/MSH-10", ID);
200 
201 							// send, get response
202 							Message in = initiator.sendAndReceive(out);
203 							// get ACK ID
204 							Terser tIn = new Terser(in);
205 							String ackID = tIn.get("/MSA-2");
206 							if (ID.equals(ackID)) {
207 								System.out.println("OK - ack ID matches");
208 							} else {
209 								throw new RuntimeException(
210 										"Ack ID for message " + ID + " is "
211 												+ ackID);
212 							}
213 
214 						} catch (Exception e) {
215 							e.printStackTrace();
216 						}
217 					}
218 				});
219 				sender.start();
220 			}
221 
222 		} catch (Exception e) {
223 			e.printStackTrace();
224 		}
225 	}
226 
227 }