001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.vm; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.net.URI; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.LinkedBlockingQueue; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicBoolean; 026import java.util.concurrent.atomic.AtomicLong; 027 028import org.apache.activemq.command.ShutdownInfo; 029import org.apache.activemq.thread.Task; 030import org.apache.activemq.thread.TaskRunner; 031import org.apache.activemq.thread.TaskRunnerFactory; 032import org.apache.activemq.transport.FutureResponse; 033import org.apache.activemq.transport.ResponseCallback; 034import org.apache.activemq.transport.Transport; 035import org.apache.activemq.transport.TransportDisposedIOException; 036import org.apache.activemq.transport.TransportListener; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * A Transport implementation that uses direct method invocations. 042 */ 043public class VMTransport implements Transport, Task { 044 protected static final Logger LOG = LoggerFactory.getLogger(VMTransport.class); 045 046 private static final AtomicLong NEXT_ID = new AtomicLong(0); 047 048 // Transport Configuration 049 protected VMTransport peer; 050 protected TransportListener transportListener; 051 protected boolean marshal; 052 protected boolean async = true; 053 protected int asyncQueueDepth = 2000; 054 protected final URI location; 055 protected final long id; 056 057 // Implementation 058 private volatile LinkedBlockingQueue<Object> messageQueue; 059 private volatile TaskRunnerFactory taskRunnerFactory; 060 private volatile TaskRunner taskRunner; 061 062 // Transport State 063 protected final AtomicBoolean started = new AtomicBoolean(); 064 protected final AtomicBoolean disposed = new AtomicBoolean(); 065 066 private volatile int receiveCounter; 067 068 public VMTransport(URI location) { 069 this.location = location; 070 this.id = NEXT_ID.getAndIncrement(); 071 } 072 073 public void setPeer(VMTransport peer) { 074 this.peer = peer; 075 } 076 077 @Override 078 public void oneway(Object command) throws IOException { 079 080 if (disposed.get()) { 081 throw new TransportDisposedIOException("Transport disposed."); 082 } 083 084 if (peer == null) { 085 throw new IOException("Peer not connected."); 086 } 087 088 try { 089 090 if (peer.disposed.get()) { 091 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); 092 } 093 094 if (peer.async) { 095 peer.getMessageQueue().put(command); 096 peer.wakeup(); 097 return; 098 } 099 100 if (!peer.started.get()) { 101 LinkedBlockingQueue<Object> pending = peer.getMessageQueue(); 102 int sleepTimeMillis; 103 boolean accepted = false; 104 do { 105 sleepTimeMillis = 0; 106 // the pending queue is drained on start so we need to ensure we add before 107 // the drain commences, otherwise we never get the command dispatched! 108 synchronized (peer.started) { 109 if (!peer.started.get()) { 110 accepted = pending.offer(command); 111 if (!accepted) { 112 sleepTimeMillis = 500; 113 } 114 } 115 } 116 // give start thread a chance if we will loop 117 TimeUnit.MILLISECONDS.sleep(sleepTimeMillis); 118 119 } while (!accepted && !peer.started.get()); 120 if (accepted) { 121 return; 122 } 123 } 124 } catch (InterruptedException e) { 125 Thread.currentThread().interrupt(); 126 InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); 127 iioe.initCause(e); 128 throw iioe; 129 } 130 131 dispatch(peer, peer.messageQueue, command); 132 } 133 134 public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) { 135 TransportListener transportListener = transport.getTransportListener(); 136 if (transportListener != null) { 137 // Lock here on the target transport's started since we want to wait for its start() 138 // method to finish dispatching out of the queue before we do our own. 139 synchronized (transport.started) { 140 141 // Ensure that no additional commands entered the queue in the small time window 142 // before the start method locks the dispatch lock and the oneway method was in 143 // an put operation. 144 while(pending != null && !pending.isEmpty() && !transport.isDisposed()) { 145 doDispatch(transport, transportListener, pending.poll()); 146 } 147 148 // We are now in sync mode and won't enqueue any more commands to the target 149 // transport so lets clean up its resources. 150 transport.messageQueue = null; 151 152 // Don't dispatch if either end was disposed already. 153 if (command != null && !this.disposed.get() && !transport.isDisposed()) { 154 doDispatch(transport, transportListener, command); 155 } 156 } 157 } 158 } 159 160 public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) { 161 transport.receiveCounter++; 162 transportListener.onCommand(command); 163 } 164 165 @Override 166 public void start() throws Exception { 167 168 if (transportListener == null) { 169 throw new IOException("TransportListener not set."); 170 } 171 172 // If we are not in async mode we lock the dispatch lock here and then start to 173 // prevent any sync dispatches from occurring until we dispatch the pending messages 174 // to maintain delivery order. When async this happens automatically so just set 175 // started and wakeup the task runner. 176 if (!async) { 177 synchronized (started) { 178 if (started.compareAndSet(false, true)) { 179 LinkedBlockingQueue<Object> mq = getMessageQueue(); 180 Object command; 181 while ((command = mq.poll()) != null && !disposed.get() ) { 182 receiveCounter++; 183 doDispatch(this, transportListener, command); 184 } 185 } 186 } 187 } else { 188 if (started.compareAndSet(false, true)) { 189 wakeup(); 190 } 191 } 192 } 193 194 @Override 195 public void stop() throws Exception { 196 // Only need to do this once, all future oneway calls will now 197 // fail as will any asnyc jobs in the task runner. 198 if (disposed.compareAndSet(false, true)) { 199 200 TaskRunner tr = taskRunner; 201 LinkedBlockingQueue<Object> mq = this.messageQueue; 202 203 taskRunner = null; 204 messageQueue = null; 205 206 if (mq != null) { 207 mq.clear(); 208 } 209 210 // don't wait for completion 211 if (tr != null) { 212 try { 213 tr.shutdown(1); 214 } catch(Exception e) { 215 } 216 tr = null; 217 } 218 219 if (peer.transportListener != null) { 220 // let the peer know that we are disconnecting after attempting 221 // to cleanly shutdown the async tasks so that this is the last 222 // command it see's. 223 try { 224 peer.transportListener.onCommand(new ShutdownInfo()); 225 } catch (Exception ignore) { 226 } 227 228 // let any requests pending a response see an exception 229 try { 230 peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped.")); 231 } catch (Exception ignore) { 232 } 233 } 234 235 // shutdown task runner factory 236 if (taskRunnerFactory != null) { 237 taskRunnerFactory.shutdownNow(); 238 taskRunnerFactory = null; 239 } 240 } 241 } 242 243 protected void wakeup() { 244 if (async && started.get()) { 245 try { 246 getTaskRunner().wakeup(); 247 } catch (InterruptedException e) { 248 Thread.currentThread().interrupt(); 249 } catch (TransportDisposedIOException e) { 250 } 251 } 252 } 253 254 /** 255 * @see org.apache.activemq.thread.Task#iterate() 256 */ 257 @Override 258 public boolean iterate() { 259 260 final TransportListener tl = transportListener; 261 262 LinkedBlockingQueue<Object> mq; 263 try { 264 mq = getMessageQueue(); 265 } catch (TransportDisposedIOException e) { 266 return false; 267 } 268 269 Object command = mq.poll(); 270 if (command != null && !disposed.get()) { 271 tl.onCommand(command); 272 return !mq.isEmpty() && !disposed.get(); 273 } else { 274 if(disposed.get()) { 275 mq.clear(); 276 } 277 return false; 278 } 279 } 280 281 @Override 282 public void setTransportListener(TransportListener commandListener) { 283 this.transportListener = commandListener; 284 } 285 286 public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException { 287 LinkedBlockingQueue<Object> result = messageQueue; 288 if (result == null) { 289 synchronized (this) { 290 result = messageQueue; 291 if (result == null) { 292 if (disposed.get()) { 293 throw new TransportDisposedIOException("The Transport has been disposed"); 294 } 295 296 messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth); 297 } 298 } 299 } 300 return result; 301 } 302 303 protected TaskRunner getTaskRunner() throws TransportDisposedIOException { 304 TaskRunner result = taskRunner; 305 if (result == null) { 306 synchronized (this) { 307 result = taskRunner; 308 if (result == null) { 309 if (disposed.get()) { 310 throw new TransportDisposedIOException("The Transport has been disposed"); 311 } 312 313 String name = "ActiveMQ VMTransport: " + toString(); 314 if (taskRunnerFactory == null) { 315 taskRunnerFactory = new TaskRunnerFactory(name); 316 taskRunnerFactory.init(); 317 } 318 taskRunner = result = taskRunnerFactory.createTaskRunner(this, name); 319 } 320 } 321 } 322 return result; 323 } 324 325 @Override 326 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 327 throw new AssertionError("Unsupported Method"); 328 } 329 330 @Override 331 public Object request(Object command) throws IOException { 332 throw new AssertionError("Unsupported Method"); 333 } 334 335 @Override 336 public Object request(Object command, int timeout) throws IOException { 337 throw new AssertionError("Unsupported Method"); 338 } 339 340 @Override 341 public TransportListener getTransportListener() { 342 return transportListener; 343 } 344 345 @Override 346 public <T> T narrow(Class<T> target) { 347 if (target.isAssignableFrom(getClass())) { 348 return target.cast(this); 349 } 350 return null; 351 } 352 353 public boolean isMarshal() { 354 return marshal; 355 } 356 357 public void setMarshal(boolean marshal) { 358 this.marshal = marshal; 359 } 360 361 @Override 362 public String toString() { 363 return location + "#" + id; 364 } 365 366 @Override 367 public String getRemoteAddress() { 368 if (peer != null) { 369 return peer.toString(); 370 } 371 return null; 372 } 373 374 /** 375 * @return the async 376 */ 377 public boolean isAsync() { 378 return async; 379 } 380 381 /** 382 * @param async the async to set 383 */ 384 public void setAsync(boolean async) { 385 this.async = async; 386 } 387 388 /** 389 * @return the asyncQueueDepth 390 */ 391 public int getAsyncQueueDepth() { 392 return asyncQueueDepth; 393 } 394 395 /** 396 * @param asyncQueueDepth the asyncQueueDepth to set 397 */ 398 public void setAsyncQueueDepth(int asyncQueueDepth) { 399 this.asyncQueueDepth = asyncQueueDepth; 400 } 401 402 @Override 403 public boolean isFaultTolerant() { 404 return false; 405 } 406 407 @Override 408 public boolean isDisposed() { 409 return disposed.get(); 410 } 411 412 @Override 413 public boolean isConnected() { 414 return !disposed.get(); 415 } 416 417 @Override 418 public void reconnect(URI uri) throws IOException { 419 throw new IOException("Transport reconnect is not supported"); 420 } 421 422 @Override 423 public boolean isReconnectSupported() { 424 return false; 425 } 426 427 @Override 428 public boolean isUpdateURIsSupported() { 429 return false; 430 } 431 432 @Override 433 public void updateURIs(boolean reblance,URI[] uris) throws IOException { 434 throw new IOException("URI update feature not supported"); 435 } 436 437 @Override 438 public int getReceiveCounter() { 439 return receiveCounter; 440 } 441}