View Javadoc

1   package ca.uhn.hl7v2.concurrent;
2   
3   import java.util.Collection;
4   import java.util.Map;
5   import java.util.Set;
6   import java.util.concurrent.Callable;
7   import java.util.concurrent.ConcurrentHashMap;
8   import java.util.concurrent.ConcurrentMap;
9   import java.util.concurrent.CountDownLatch;
10  import java.util.concurrent.ExecutorService;
11  import java.util.concurrent.Executors;
12  import java.util.concurrent.Future;
13  import java.util.concurrent.TimeUnit;
14  
15  /**
16   * Default Implementation of a {@link BlockingMap}.
17   * <p>
18   * Note: While it is not actively prevented that more than one thread waits for
19   * an entry, it is not guaranteed that all waiting threads will receive the
20   * entry once it becomes available. Other implementations may choose to count the
21   * waiting threads and/or to remove an available value after a grace period.
22   * 
23   * @param <K> type of the keys
24   * @param <V> type of the values
25   */
26  public class BlockingHashMap<K, V> implements BlockingMap<K, V> {
27  
28  	private final ConcurrentMap<K, V> map = new ConcurrentHashMap<K, V>();
29  	private final ConcurrentMap<K, CountDownLatch> latches = new ConcurrentHashMap<K, CountDownLatch>();
30  	private final ExecutorService executor;
31  	
32  	public BlockingHashMap() {
33  		this(Executors.newCachedThreadPool());
34  	}
35  	
36  	public BlockingHashMap(ExecutorService executor) {
37  		super();
38  		this.executor = executor;
39  	}
40  
41  	/**
42  	 * Returns the keys of available entries
43  	 * 
44  	 * @see java.util.Map#keySet()
45  	 */
46  	public Set<K> keySet() {
47  		return map.keySet();
48  	}
49  
50  	/**
51  	 * Returns an available entry without removing it from the map
52  	 * 
53  	 * @see java.util.Map#get(java.lang.Object)
54  	 */
55  	public V get(Object key) {
56  		return map.get(key);
57  	}
58  
59  	/**
60  	 * Returns <code>true</code> if an entry with the given key is available
61  	 * 
62  	 * @see java.util.Map#containsKey(java.lang.Object)
63  	 */
64  	public boolean containsKey(Object key) {
65  		return map.containsKey(key);
66  	}
67  
68  	/**
69  	 * @see java.util.Map#put(java.lang.Object, java.lang.Object)
70  	 */
71  	synchronized public V put(K key, V value) {
72  		V result = map.put(key, value);
73  		latchFor(key).countDown();
74  		return result;
75  	}
76  
77  	/**
78  	 * @see ca.uhn.hl7v2.concurrent.BlockingMap#give(java.lang.Object,
79  	 *      java.lang.Object)
80  	 */
81  	synchronized public boolean give(K key, V value) {
82  		if (!latches.containsKey(key)) {
83  			return false;
84  		}
85  		put(key, value);
86  		return true;
87  	}
88  
89  	public V take(K key) throws InterruptedException {
90  		latchFor(key).await();
91  		latches.remove(key);
92  		return map.remove(key); // likely to fail there are n > 1 consumers
93  	}
94  	
95  
96  	public Future<V> asyncTake(final K key) throws InterruptedException {
97  		latchFor(key);
98  		return executor.submit(new Callable<V>() {
99  
100 			public V call() throws Exception {
101 				return take(key);
102 			}
103 		});
104 	}
105 
106 	public V poll(K key, long timeout, TimeUnit unit)
107 			throws InterruptedException {
108 		if (latchFor(key).await(timeout, unit)) {
109 			latches.remove(key);
110 			return map.remove(key);
111 		}
112 		return null;
113 	}
114 	
115 	public Future<V> asyncPoll(final K key, final long timeout, final TimeUnit unit) {
116 		latchFor(key);
117 		return executor.submit(new Callable<V>() {
118 
119 			public V call() throws Exception {
120 				return poll(key, timeout, unit);
121 			}
122 		});		
123 	}
124 	
125 
126 	/**
127 	 * Returns true if no entry is available for consumers
128 	 * 
129 	 * @see java.util.Map#isEmpty()
130 	 */
131 	public boolean isEmpty() {
132 		return map.isEmpty();
133 	}
134 
135 	/**
136 	 * Returns the number of available values
137 	 * 
138 	 * @see java.util.Map#size()
139 	 */
140 	public int size() {
141 		return map.size();
142 	}
143 
144 	/**
145 	 * Removes an entry, regardless whether a value has been set or not. Waiting
146 	 * consumers will receive a null value.
147 	 * 
148 	 * @see java.util.Map#remove(java.lang.Object)
149 	 */
150 	synchronized public V remove(Object key) {
151 		V result = map.remove(key);
152 		CountDownLatch latch = latches.remove(key);
153 		if (latch != null)
154 			latch.countDown();
155 		return result;
156 	}
157 
158 	/**
159 	 * Clears all existing entries. Waiting consumers will receive a null value
160 	 * for each removed entry.
161 	 * 
162 	 * @see java.util.Map#clear()
163 	 */
164 	public void clear() {
165 		for (K key : latches.keySet()) {
166 			remove(key);
167 		}
168 	}
169 
170 	public Collection<V> values() {
171 		return map.values();
172 	}
173 
174 	public Set<java.util.Map.Entry<K, V>> entrySet() {
175 		return map.entrySet();
176 	}
177 
178 	public void putAll(Map<? extends K, ? extends V> t) {
179 		for (Entry<? extends K, ? extends V> entry : t.entrySet()) {
180 			put(entry.getKey(), entry.getValue());
181 		}
182 	}
183 
184 	public boolean containsValue(Object value) {
185 		return map.containsValue(value);
186 	}
187 
188 	private synchronized CountDownLatch latchFor(K key) {
189 		CountDownLatch latch = latches.get(key);
190 		if (latch == null) {
191 			latch = new CountDownLatch(1);
192 			latches.put(key, latch);
193 		}
194 		return latch;
195 	}
196 
197 }