001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.failover; 018 019import java.io.BufferedReader; 020import java.io.FileReader; 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.InterruptedIOException; 024import java.net.InetAddress; 025import java.net.MalformedURLException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.URL; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.LinkedHashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.StringTokenizer; 037import java.util.concurrent.CopyOnWriteArrayList; 038import java.util.concurrent.atomic.AtomicReference; 039 040import org.apache.activemq.broker.SslContext; 041import org.apache.activemq.command.Command; 042import org.apache.activemq.command.ConnectionControl; 043import org.apache.activemq.command.ConsumerControl; 044import org.apache.activemq.command.ConnectionId; 045import org.apache.activemq.command.MessageDispatch; 046import org.apache.activemq.command.MessagePull; 047import org.apache.activemq.command.RemoveInfo; 048import org.apache.activemq.command.Response; 049 050import org.apache.activemq.state.ConnectionStateTracker; 051import org.apache.activemq.state.Tracked; 052import org.apache.activemq.thread.Task; 053import org.apache.activemq.thread.TaskRunner; 054import org.apache.activemq.thread.TaskRunnerFactory; 055import org.apache.activemq.transport.CompositeTransport; 056import org.apache.activemq.transport.DefaultTransportListener; 057import org.apache.activemq.transport.FutureResponse; 058import org.apache.activemq.transport.ResponseCallback; 059import org.apache.activemq.transport.Transport; 060import org.apache.activemq.transport.TransportFactory; 061import org.apache.activemq.transport.TransportListener; 062import org.apache.activemq.util.IOExceptionSupport; 063import org.apache.activemq.util.ServiceSupport; 064import org.apache.activemq.util.URISupport; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068/** 069 * A Transport that is made reliable by being able to fail over to another 070 * transport when a transport failure is detected. 071 */ 072public class FailoverTransport implements CompositeTransport { 073 074 private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class); 075 private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10; 076 private static final int INFINITE = -1; 077 private TransportListener transportListener; 078 private boolean disposed; 079 private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>(); 080 private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>(); 081 082 private final Object reconnectMutex = new Object(); 083 private final Object backupMutex = new Object(); 084 private final Object sleepMutex = new Object(); 085 private final Object listenerMutex = new Object(); 086 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 087 private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>(); 088 089 private URI connectedTransportURI; 090 private URI failedConnectTransportURI; 091 private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>(); 092 private final TaskRunnerFactory reconnectTaskFactory; 093 private final TaskRunner reconnectTask; 094 private boolean started; 095 private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 096 private long maxReconnectDelay = 1000 * 30; 097 private double backOffMultiplier = 2d; 098 private long timeout = INFINITE; 099 private boolean useExponentialBackOff = true; 100 private boolean randomize = true; 101 private int maxReconnectAttempts = INFINITE; 102 private int startupMaxReconnectAttempts = INFINITE; 103 private int connectFailures; 104 private int warnAfterReconnectAttempts = 10; 105 private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 106 private Exception connectionFailure; 107 private boolean firstConnection = true; 108 // optionally always have a backup created 109 private boolean backup = false; 110 private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>(); 111 private int backupPoolSize = 1; 112 private boolean trackMessages = false; 113 private boolean trackTransactionProducers = true; 114 private int maxCacheSize = 128 * 1024; 115 private final TransportListener disposedListener = new DefaultTransportListener() { 116 }; 117 private final TransportListener myTransportListener = createTransportListener(); 118 private boolean updateURIsSupported = true; 119 private boolean reconnectSupported = true; 120 // remember for reconnect thread 121 private SslContext brokerSslContext; 122 private String updateURIsURL = null; 123 private boolean rebalanceUpdateURIs = true; 124 private boolean doRebalance = false; 125 private boolean connectedToPriority = false; 126 127 private boolean priorityBackup = false; 128 private final ArrayList<URI> priorityList = new ArrayList<URI>(); 129 private boolean priorityBackupAvailable = false; 130 private String nestedExtraQueryOptions; 131 private boolean shuttingDown = false; 132 133 public FailoverTransport() { 134 brokerSslContext = SslContext.getCurrentSslContext(); 135 stateTracker.setTrackTransactions(true); 136 // Setup a task that is used to reconnect the a connection async. 137 reconnectTaskFactory = new TaskRunnerFactory(); 138 reconnectTaskFactory.init(); 139 reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { 140 @Override 141 public boolean iterate() { 142 boolean result = false; 143 if (!started) { 144 return result; 145 } 146 boolean buildBackup = true; 147 synchronized (backupMutex) { 148 if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) { 149 result = doReconnect(); 150 buildBackup = false; 151 } 152 } 153 if (buildBackup) { 154 buildBackups(); 155 if (priorityBackup && !connectedToPriority) { 156 try { 157 doDelay(); 158 if (reconnectTask == null) { 159 return true; 160 } 161 reconnectTask.wakeup(); 162 } catch (InterruptedException e) { 163 LOG.debug("Reconnect task has been interrupted.", e); 164 } 165 } 166 } else { 167 // build backups on the next iteration 168 buildBackup = true; 169 try { 170 if (reconnectTask == null) { 171 return true; 172 } 173 reconnectTask.wakeup(); 174 } catch (InterruptedException e) { 175 LOG.debug("Reconnect task has been interrupted.", e); 176 } 177 } 178 return result; 179 } 180 181 }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); 182 } 183 184 TransportListener createTransportListener() { 185 return new TransportListener() { 186 @Override 187 public void onCommand(Object o) { 188 Command command = (Command) o; 189 if (command == null) { 190 return; 191 } 192 if (command.isResponse()) { 193 Object object = null; 194 synchronized (requestMap) { 195 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); 196 } 197 if (object != null && object.getClass() == Tracked.class) { 198 ((Tracked) object).onResponses(command); 199 } 200 } 201 202 if (command.isConnectionControl()) { 203 handleConnectionControl((ConnectionControl) command); 204 } 205 else if (command.isConsumerControl()) { 206 ConsumerControl consumerControl = (ConsumerControl)command; 207 if (consumerControl.isClose()) { 208 stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 209 } 210 211 } 212 if (transportListener != null) { 213 transportListener.onCommand(command); 214 } 215 } 216 217 @Override 218 public void onException(IOException error) { 219 try { 220 handleTransportFailure(error); 221 } catch (InterruptedException e) { 222 Thread.currentThread().interrupt(); 223 transportListener.onException(new InterruptedIOException()); 224 } 225 } 226 227 @Override 228 public void transportInterupted() { 229 if (transportListener != null) { 230 transportListener.transportInterupted(); 231 } 232 } 233 234 @Override 235 public void transportResumed() { 236 if (transportListener != null) { 237 transportListener.transportResumed(); 238 } 239 } 240 }; 241 } 242 243 public final void disposeTransport(Transport transport) { 244 transport.setTransportListener(disposedListener); 245 ServiceSupport.dispose(transport); 246 } 247 248 public final void handleTransportFailure(IOException e) throws InterruptedException { 249 if (shuttingDown) { 250 // shutdown info sent and remote socket closed and we see that before a local close 251 // let the close do the work 252 return; 253 } 254 255 if (LOG.isTraceEnabled()) { 256 LOG.trace(this + " handleTransportFailure: " + e, e); 257 } 258 259 // could be blocked in write with the reconnectMutex held, but still needs to be whacked 260 Transport transport = connectedTransport.getAndSet(null); 261 if (transport != null) { 262 disposeTransport(transport); 263 } 264 265 synchronized (reconnectMutex) { 266 if (transport != null && connectedTransport.get() == null) { 267 268 boolean reconnectOk = false; 269 270 if (canReconnect()) { 271 reconnectOk = true; 272 } 273 LOG.warn("Transport (" + connectedTransportURI + ") failed" 274 + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e); 275 276 failedConnectTransportURI = connectedTransportURI; 277 connectedTransportURI = null; 278 connectedToPriority = false; 279 280 if (reconnectOk) { 281 // notify before any reconnect attempt so ack state can be whacked 282 if (transportListener != null) { 283 transportListener.transportInterupted(); 284 } 285 286 updated.remove(failedConnectTransportURI); 287 reconnectTask.wakeup(); 288 } else if (!isDisposed()) { 289 propagateFailureToExceptionListener(e); 290 } 291 } 292 } 293 } 294 295 private boolean canReconnect() { 296 return started && 0 != calculateReconnectAttemptLimit(); 297 } 298 299 public final void handleConnectionControl(ConnectionControl control) { 300 String reconnectStr = control.getReconnectTo(); 301 if (LOG.isTraceEnabled()) { 302 LOG.trace("Received ConnectionControl: {}", control); 303 } 304 305 if (reconnectStr != null) { 306 reconnectStr = reconnectStr.trim(); 307 if (reconnectStr.length() > 0) { 308 try { 309 URI uri = new URI(reconnectStr); 310 if (isReconnectSupported()) { 311 reconnect(uri); 312 LOG.info("Reconnected to: " + uri); 313 } 314 } catch (Exception e) { 315 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e); 316 } 317 } 318 } 319 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers()); 320 } 321 322 private final void processNewTransports(boolean rebalance, String newTransports) { 323 if (newTransports != null) { 324 newTransports = newTransports.trim(); 325 if (newTransports.length() > 0 && isUpdateURIsSupported()) { 326 List<URI> list = new ArrayList<URI>(); 327 StringTokenizer tokenizer = new StringTokenizer(newTransports, ","); 328 while (tokenizer.hasMoreTokens()) { 329 String str = tokenizer.nextToken(); 330 try { 331 URI uri = new URI(str); 332 list.add(uri); 333 } catch (Exception e) { 334 LOG.error("Failed to parse broker address: " + str, e); 335 } 336 } 337 if (list.isEmpty() == false) { 338 try { 339 updateURIs(rebalance, list.toArray(new URI[list.size()])); 340 } catch (IOException e) { 341 LOG.error("Failed to update transport URI's from: " + newTransports, e); 342 } 343 } 344 } 345 } 346 } 347 348 @Override 349 public void start() throws Exception { 350 synchronized (reconnectMutex) { 351 if (LOG.isDebugEnabled()) { 352 LOG.debug("Started " + this); 353 } 354 if (started) { 355 return; 356 } 357 started = true; 358 stateTracker.setMaxCacheSize(getMaxCacheSize()); 359 stateTracker.setTrackMessages(isTrackMessages()); 360 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers()); 361 if (connectedTransport.get() != null) { 362 stateTracker.restore(connectedTransport.get()); 363 } else { 364 reconnect(false); 365 } 366 } 367 } 368 369 @Override 370 public void stop() throws Exception { 371 Transport transportToStop = null; 372 List<Transport> backupsToStop = new ArrayList<Transport>(backups.size()); 373 374 try { 375 synchronized (reconnectMutex) { 376 if (LOG.isDebugEnabled()) { 377 LOG.debug("Stopped " + this); 378 } 379 if (!started) { 380 return; 381 } 382 started = false; 383 disposed = true; 384 385 if (connectedTransport.get() != null) { 386 transportToStop = connectedTransport.getAndSet(null); 387 } 388 reconnectMutex.notifyAll(); 389 } 390 synchronized (sleepMutex) { 391 sleepMutex.notifyAll(); 392 } 393 } finally { 394 reconnectTask.shutdown(); 395 reconnectTaskFactory.shutdownNow(); 396 } 397 398 synchronized(backupMutex) { 399 for (BackupTransport backup : backups) { 400 backup.setDisposed(true); 401 Transport transport = backup.getTransport(); 402 if (transport != null) { 403 transport.setTransportListener(disposedListener); 404 backupsToStop.add(transport); 405 } 406 } 407 backups.clear(); 408 } 409 for (Transport transport : backupsToStop) { 410 try { 411 if (LOG.isTraceEnabled()) { 412 LOG.trace("Stopped backup: " + transport); 413 } 414 disposeTransport(transport); 415 } catch (Exception e) { 416 } 417 } 418 if (transportToStop != null) { 419 transportToStop.stop(); 420 } 421 } 422 423 public long getInitialReconnectDelay() { 424 return initialReconnectDelay; 425 } 426 427 public void setInitialReconnectDelay(long initialReconnectDelay) { 428 this.initialReconnectDelay = initialReconnectDelay; 429 } 430 431 public long getMaxReconnectDelay() { 432 return maxReconnectDelay; 433 } 434 435 public void setMaxReconnectDelay(long maxReconnectDelay) { 436 this.maxReconnectDelay = maxReconnectDelay; 437 } 438 439 public long getReconnectDelay() { 440 return reconnectDelay; 441 } 442 443 public void setReconnectDelay(long reconnectDelay) { 444 this.reconnectDelay = reconnectDelay; 445 } 446 447 public double getReconnectDelayExponent() { 448 return backOffMultiplier; 449 } 450 451 public void setReconnectDelayExponent(double reconnectDelayExponent) { 452 this.backOffMultiplier = reconnectDelayExponent; 453 } 454 455 public Transport getConnectedTransport() { 456 return connectedTransport.get(); 457 } 458 459 public URI getConnectedTransportURI() { 460 return connectedTransportURI; 461 } 462 463 public int getMaxReconnectAttempts() { 464 return maxReconnectAttempts; 465 } 466 467 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 468 this.maxReconnectAttempts = maxReconnectAttempts; 469 } 470 471 public int getStartupMaxReconnectAttempts() { 472 return this.startupMaxReconnectAttempts; 473 } 474 475 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) { 476 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts; 477 } 478 479 public long getTimeout() { 480 return timeout; 481 } 482 483 public void setTimeout(long timeout) { 484 this.timeout = timeout; 485 } 486 487 /** 488 * @return Returns the randomize. 489 */ 490 public boolean isRandomize() { 491 return randomize; 492 } 493 494 /** 495 * @param randomize The randomize to set. 496 */ 497 public void setRandomize(boolean randomize) { 498 this.randomize = randomize; 499 } 500 501 public boolean isBackup() { 502 return backup; 503 } 504 505 public void setBackup(boolean backup) { 506 this.backup = backup; 507 } 508 509 public int getBackupPoolSize() { 510 return backupPoolSize; 511 } 512 513 public void setBackupPoolSize(int backupPoolSize) { 514 this.backupPoolSize = backupPoolSize; 515 } 516 517 public int getCurrentBackups() { 518 return this.backups.size(); 519 } 520 521 public boolean isTrackMessages() { 522 return trackMessages; 523 } 524 525 public void setTrackMessages(boolean trackMessages) { 526 this.trackMessages = trackMessages; 527 } 528 529 public boolean isTrackTransactionProducers() { 530 return this.trackTransactionProducers; 531 } 532 533 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 534 this.trackTransactionProducers = trackTransactionProducers; 535 } 536 537 public int getMaxCacheSize() { 538 return maxCacheSize; 539 } 540 541 public void setMaxCacheSize(int maxCacheSize) { 542 this.maxCacheSize = maxCacheSize; 543 } 544 545 public boolean isPriorityBackup() { 546 return priorityBackup; 547 } 548 549 public void setPriorityBackup(boolean priorityBackup) { 550 this.priorityBackup = priorityBackup; 551 } 552 553 public void setPriorityURIs(String priorityURIs) { 554 StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ","); 555 while (tokenizer.hasMoreTokens()) { 556 String str = tokenizer.nextToken(); 557 try { 558 URI uri = new URI(str); 559 priorityList.add(uri); 560 } catch (Exception e) { 561 LOG.error("Failed to parse broker address: " + str, e); 562 } 563 } 564 } 565 566 @Override 567 public void oneway(Object o) throws IOException { 568 569 Command command = (Command) o; 570 Exception error = null; 571 try { 572 573 synchronized (reconnectMutex) { 574 575 if (command != null && connectedTransport.get() == null) { 576 if (command.isShutdownInfo()) { 577 // Skipping send of ShutdownInfo command when not connected. 578 return; 579 } else if (command instanceof RemoveInfo || command.isMessageAck()) { 580 // Simulate response to RemoveInfo command or MessageAck (as it will be stale) 581 stateTracker.track(command); 582 if (command.isResponseRequired()) { 583 Response response = new Response(); 584 response.setCorrelationId(command.getCommandId()); 585 myTransportListener.onCommand(response); 586 } 587 return; 588 } else if (command instanceof MessagePull) { 589 // Simulate response to MessagePull if timed as we can't honor that now. 590 MessagePull pullRequest = (MessagePull) command; 591 if (pullRequest.getTimeout() != 0) { 592 MessageDispatch dispatch = new MessageDispatch(); 593 dispatch.setConsumerId(pullRequest.getConsumerId()); 594 dispatch.setDestination(pullRequest.getDestination()); 595 myTransportListener.onCommand(dispatch); 596 } 597 return; 598 } 599 } 600 601 // Keep trying until the message is sent. 602 for (int i = 0; !disposed; i++) { 603 try { 604 605 // Wait for transport to be connected. 606 Transport transport = connectedTransport.get(); 607 long start = System.currentTimeMillis(); 608 boolean timedout = false; 609 while (transport == null && !disposed && connectionFailure == null 610 && !Thread.currentThread().isInterrupted() && willReconnect()) { 611 if (LOG.isTraceEnabled()) { 612 LOG.trace("Waiting for transport to reconnect..: " + command); 613 } 614 long end = System.currentTimeMillis(); 615 if (command.isMessage() && timeout > 0 && (end - start > timeout)) { 616 timedout = true; 617 if (LOG.isInfoEnabled()) { 618 LOG.info("Failover timed out after " + (end - start) + "ms"); 619 } 620 break; 621 } 622 try { 623 reconnectMutex.wait(100); 624 } catch (InterruptedException e) { 625 Thread.currentThread().interrupt(); 626 if (LOG.isDebugEnabled()) { 627 LOG.debug("Interupted: " + e, e); 628 } 629 } 630 transport = connectedTransport.get(); 631 } 632 633 if (transport == null) { 634 // Previous loop may have exited due to use being 635 // disposed. 636 if (disposed) { 637 error = new IOException("Transport disposed."); 638 } else if (connectionFailure != null) { 639 error = connectionFailure; 640 } else if (timedout == true) { 641 error = new IOException("Failover timeout of " + timeout + " ms reached."); 642 } else if (!willReconnect()) { 643 error = new IOException("Reconnect attempts of " + maxReconnectAttempts + " exceeded"); 644 } else { 645 error = new IOException("Unexpected failure."); 646 } 647 break; 648 } 649 650 Tracked tracked = null; 651 try { 652 tracked = stateTracker.track(command); 653 } catch (IOException ioe) { 654 LOG.debug("Cannot track the command " + command, ioe); 655 } 656 // If it was a request and it was not being tracked by 657 // the state tracker, 658 // then hold it in the requestMap so that we can replay 659 // it later. 660 synchronized (requestMap) { 661 if (tracked != null && tracked.isWaitingForResponse()) { 662 requestMap.put(Integer.valueOf(command.getCommandId()), tracked); 663 } else if (tracked == null && command.isResponseRequired()) { 664 requestMap.put(Integer.valueOf(command.getCommandId()), command); 665 } 666 } 667 668 // Send the message. 669 try { 670 transport.oneway(command); 671 stateTracker.trackBack(command); 672 if (command.isShutdownInfo()) { 673 shuttingDown = true; 674 } 675 } catch (IOException e) { 676 677 // If the command was not tracked.. we will retry in 678 // this method 679 if (tracked == null && canReconnect()) { 680 681 // since we will retry in this method.. take it 682 // out of the request 683 // map so that it is not sent 2 times on 684 // recovery 685 if (command.isResponseRequired()) { 686 requestMap.remove(Integer.valueOf(command.getCommandId())); 687 } 688 689 // Rethrow the exception so it will handled by 690 // the outer catch 691 throw e; 692 } else { 693 // Handle the error but allow the method to return since the 694 // tracked commands are replayed on reconnect. 695 if (LOG.isDebugEnabled()) { 696 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 697 } 698 handleTransportFailure(e); 699 } 700 } 701 702 return; 703 704 } catch (IOException e) { 705 if (LOG.isDebugEnabled()) { 706 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 707 } 708 handleTransportFailure(e); 709 } 710 } 711 } 712 } catch (InterruptedException e) { 713 // Some one may be trying to stop our thread. 714 Thread.currentThread().interrupt(); 715 throw new InterruptedIOException(); 716 } 717 718 if (!disposed) { 719 if (error != null) { 720 if (error instanceof IOException) { 721 throw (IOException) error; 722 } 723 throw IOExceptionSupport.create(error); 724 } 725 } 726 } 727 728 private boolean willReconnect() { 729 return firstConnection || 0 != calculateReconnectAttemptLimit(); 730 } 731 732 @Override 733 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 734 throw new AssertionError("Unsupported Method"); 735 } 736 737 @Override 738 public Object request(Object command) throws IOException { 739 throw new AssertionError("Unsupported Method"); 740 } 741 742 @Override 743 public Object request(Object command, int timeout) throws IOException { 744 throw new AssertionError("Unsupported Method"); 745 } 746 747 @Override 748 public void add(boolean rebalance, URI u[]) { 749 boolean newURI = false; 750 for (URI uri : u) { 751 if (!contains(uri)) { 752 uris.add(uri); 753 newURI = true; 754 } 755 } 756 if (newURI) { 757 reconnect(rebalance); 758 } 759 } 760 761 @Override 762 public void remove(boolean rebalance, URI u[]) { 763 for (URI uri : u) { 764 uris.remove(uri); 765 } 766 // rebalance is automatic if any connected to removed/stopped broker 767 } 768 769 public void add(boolean rebalance, String u) { 770 try { 771 URI newURI = new URI(u); 772 if (contains(newURI) == false) { 773 uris.add(newURI); 774 reconnect(rebalance); 775 } 776 777 } catch (Exception e) { 778 LOG.error("Failed to parse URI: " + u); 779 } 780 } 781 782 public void reconnect(boolean rebalance) { 783 synchronized (reconnectMutex) { 784 if (started) { 785 if (rebalance) { 786 doRebalance = true; 787 } 788 LOG.debug("Waking up reconnect task"); 789 try { 790 reconnectTask.wakeup(); 791 } catch (InterruptedException e) { 792 Thread.currentThread().interrupt(); 793 } 794 } else { 795 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport."); 796 } 797 } 798 } 799 800 private List<URI> getConnectList() { 801 if (!updated.isEmpty()) { 802 return updated; 803 } 804 ArrayList<URI> l = new ArrayList<URI>(uris); 805 boolean removed = false; 806 if (failedConnectTransportURI != null) { 807 removed = l.remove(failedConnectTransportURI); 808 } 809 if (randomize) { 810 // Randomly, reorder the list by random swapping 811 for (int i = 0; i < l.size(); i++) { 812 // meed parenthesis due other JDKs (see AMQ-4826) 813 int p = ((int) (Math.random() * 100)) % l.size(); 814 URI t = l.get(p); 815 l.set(p, l.get(i)); 816 l.set(i, t); 817 } 818 } 819 if (removed) { 820 l.add(failedConnectTransportURI); 821 } 822 if (LOG.isDebugEnabled()) { 823 LOG.debug("urlList connectionList:" + l + ", from: " + uris); 824 } 825 return l; 826 } 827 828 @Override 829 public TransportListener getTransportListener() { 830 return transportListener; 831 } 832 833 @Override 834 public void setTransportListener(TransportListener commandListener) { 835 synchronized (listenerMutex) { 836 this.transportListener = commandListener; 837 listenerMutex.notifyAll(); 838 } 839 } 840 841 @Override 842 public <T> T narrow(Class<T> target) { 843 844 if (target.isAssignableFrom(getClass())) { 845 return target.cast(this); 846 } 847 Transport transport = connectedTransport.get(); 848 if (transport != null) { 849 return transport.narrow(target); 850 } 851 return null; 852 853 } 854 855 protected void restoreTransport(Transport t) throws Exception, IOException { 856 t.start(); 857 // send information to the broker - informing it we are an ft client 858 ConnectionControl cc = new ConnectionControl(); 859 cc.setFaultTolerant(true); 860 t.oneway(cc); 861 stateTracker.restore(t); 862 Map<Integer, Command> tmpMap = null; 863 synchronized (requestMap) { 864 tmpMap = new LinkedHashMap<Integer, Command>(requestMap); 865 } 866 for (Command command : tmpMap.values()) { 867 if (LOG.isTraceEnabled()) { 868 LOG.trace("restore requestMap, replay: " + command); 869 } 870 t.oneway(command); 871 } 872 } 873 874 public boolean isUseExponentialBackOff() { 875 return useExponentialBackOff; 876 } 877 878 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 879 this.useExponentialBackOff = useExponentialBackOff; 880 } 881 882 @Override 883 public String toString() { 884 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); 885 } 886 887 @Override 888 public String getRemoteAddress() { 889 Transport transport = connectedTransport.get(); 890 if (transport != null) { 891 return transport.getRemoteAddress(); 892 } 893 return null; 894 } 895 896 @Override 897 public boolean isFaultTolerant() { 898 return true; 899 } 900 901 private void doUpdateURIsFromDisk() { 902 // If updateURIsURL is specified, read the file and add any new 903 // transport URI's to this FailOverTransport. 904 // Note: Could track file timestamp to avoid unnecessary reading. 905 String fileURL = getUpdateURIsURL(); 906 if (fileURL != null) { 907 BufferedReader in = null; 908 String newUris = null; 909 StringBuffer buffer = new StringBuffer(); 910 911 try { 912 in = new BufferedReader(getURLStream(fileURL)); 913 while (true) { 914 String line = in.readLine(); 915 if (line == null) { 916 break; 917 } 918 buffer.append(line); 919 } 920 newUris = buffer.toString(); 921 } catch (IOException ioe) { 922 LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); 923 } finally { 924 if (in != null) { 925 try { 926 in.close(); 927 } catch (IOException ioe) { 928 // ignore 929 } 930 } 931 } 932 933 processNewTransports(isRebalanceUpdateURIs(), newUris); 934 } 935 } 936 937 final boolean doReconnect() { 938 Exception failure = null; 939 synchronized (reconnectMutex) { 940 941 // First ensure we are up to date. 942 doUpdateURIsFromDisk(); 943 944 if (disposed || connectionFailure != null) { 945 reconnectMutex.notifyAll(); 946 } 947 if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) { 948 return false; 949 } else { 950 List<URI> connectList = getConnectList(); 951 if (connectList.isEmpty()) { 952 failure = new IOException("No uris available to connect to."); 953 } else { 954 if (doRebalance) { 955 if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) { 956 // already connected to first in the list, no need to rebalance 957 doRebalance = false; 958 return false; 959 } else { 960 if (LOG.isDebugEnabled()) { 961 LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); 962 } 963 964 try { 965 Transport transport = this.connectedTransport.getAndSet(null); 966 if (transport != null) { 967 disposeTransport(transport); 968 } 969 } catch (Exception e) { 970 if (LOG.isDebugEnabled()) { 971 LOG.debug("Caught an exception stopping existing transport for rebalance", e); 972 } 973 } 974 } 975 doRebalance = false; 976 } 977 978 resetReconnectDelay(); 979 980 Transport transport = null; 981 URI uri = null; 982 983 // If we have a backup already waiting lets try it. 984 synchronized (backupMutex) { 985 if ((priorityBackup || backup) && !backups.isEmpty()) { 986 ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups); 987 if (randomize) { 988 Collections.shuffle(l); 989 } 990 BackupTransport bt = l.remove(0); 991 backups.remove(bt); 992 transport = bt.getTransport(); 993 uri = bt.getUri(); 994 myTransportListener.onCommand(bt.getBrokerInfo()); 995 if (priorityBackup && priorityBackupAvailable) { 996 Transport old = this.connectedTransport.getAndSet(null); 997 if (old != null) { 998 disposeTransport(old); 999 } 1000 priorityBackupAvailable = false; 1001 } 1002 } 1003 } 1004 1005 // Sleep for the reconnectDelay if there's no backup and we aren't trying 1006 // for the first time, or we were disposed for some reason. 1007 if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) { 1008 synchronized (sleepMutex) { 1009 if (LOG.isDebugEnabled()) { 1010 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 1011 } 1012 try { 1013 sleepMutex.wait(reconnectDelay); 1014 } catch (InterruptedException e) { 1015 Thread.currentThread().interrupt(); 1016 } 1017 } 1018 } 1019 1020 Iterator<URI> iter = connectList.iterator(); 1021 while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) { 1022 1023 try { 1024 SslContext.setCurrentSslContext(brokerSslContext); 1025 1026 // We could be starting with a backup and if so we wait to grab a 1027 // URI from the pool until next time around. 1028 if (transport == null) { 1029 uri = addExtraQueryOptions(iter.next()); 1030 transport = TransportFactory.compositeConnect(uri); 1031 } 1032 1033 if (LOG.isDebugEnabled()) { 1034 LOG.debug("Attempting " + connectFailures + "th connect to: " + uri); 1035 } 1036 transport.setTransportListener(myTransportListener); 1037 transport.start(); 1038 1039 if (started && !firstConnection) { 1040 restoreTransport(transport); 1041 } 1042 1043 if (LOG.isDebugEnabled()) { 1044 LOG.debug("Connection established"); 1045 } 1046 reconnectDelay = initialReconnectDelay; 1047 connectedTransportURI = uri; 1048 connectedTransport.set(transport); 1049 connectedToPriority = isPriority(connectedTransportURI); 1050 reconnectMutex.notifyAll(); 1051 connectFailures = 0; 1052 1053 // Make sure on initial startup, that the transportListener 1054 // has been initialized for this instance. 1055 synchronized (listenerMutex) { 1056 if (transportListener == null) { 1057 try { 1058 // if it isn't set after 2secs - it probably never will be 1059 listenerMutex.wait(2000); 1060 } catch (InterruptedException ex) { 1061 } 1062 } 1063 } 1064 1065 if (transportListener != null) { 1066 transportListener.transportResumed(); 1067 } else { 1068 if (LOG.isDebugEnabled()) { 1069 LOG.debug("transport resumed by transport listener not set"); 1070 } 1071 } 1072 1073 if (firstConnection) { 1074 firstConnection = false; 1075 LOG.info("Successfully connected to " + uri); 1076 } else { 1077 LOG.info("Successfully reconnected to " + uri); 1078 } 1079 1080 return false; 1081 } catch (Exception e) { 1082 failure = e; 1083 if (LOG.isDebugEnabled()) { 1084 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 1085 } 1086 if (transport != null) { 1087 try { 1088 transport.stop(); 1089 transport = null; 1090 } catch (Exception ee) { 1091 if (LOG.isDebugEnabled()) { 1092 LOG.debug("Stop of failed transport: " + transport + 1093 " failed with reason: " + ee); 1094 } 1095 } 1096 } 1097 } finally { 1098 SslContext.setCurrentSslContext(null); 1099 } 1100 } 1101 } 1102 } 1103 1104 int reconnectLimit = calculateReconnectAttemptLimit(); 1105 1106 connectFailures++; 1107 if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { 1108 LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)"); 1109 connectionFailure = failure; 1110 1111 // Make sure on initial startup, that the transportListener has been 1112 // initialized for this instance. 1113 synchronized (listenerMutex) { 1114 if (transportListener == null) { 1115 try { 1116 listenerMutex.wait(2000); 1117 } catch (InterruptedException ex) { 1118 } 1119 } 1120 } 1121 1122 propagateFailureToExceptionListener(connectionFailure); 1123 return false; 1124 } 1125 1126 int warnInterval = getWarnAfterReconnectAttempts(); 1127 if (warnInterval > 0 && (connectFailures % warnInterval) == 0) { 1128 LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.", 1129 uris, connectFailures); 1130 } 1131 } 1132 1133 if (!disposed) { 1134 doDelay(); 1135 } 1136 1137 return !disposed; 1138 } 1139 1140 private void doDelay() { 1141 if (reconnectDelay > 0) { 1142 synchronized (sleepMutex) { 1143 if (LOG.isDebugEnabled()) { 1144 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection"); 1145 } 1146 try { 1147 sleepMutex.wait(reconnectDelay); 1148 } catch (InterruptedException e) { 1149 Thread.currentThread().interrupt(); 1150 } 1151 } 1152 } 1153 1154 if (useExponentialBackOff) { 1155 // Exponential increment of reconnect delay. 1156 reconnectDelay *= backOffMultiplier; 1157 if (reconnectDelay > maxReconnectDelay) { 1158 reconnectDelay = maxReconnectDelay; 1159 } 1160 } 1161 } 1162 1163 private void resetReconnectDelay() { 1164 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { 1165 reconnectDelay = initialReconnectDelay; 1166 } 1167 } 1168 1169 /* 1170 * called with reconnectMutex held 1171 */ 1172 private void propagateFailureToExceptionListener(Exception exception) { 1173 if (transportListener != null) { 1174 if (exception instanceof IOException) { 1175 transportListener.onException((IOException)exception); 1176 } else { 1177 transportListener.onException(IOExceptionSupport.create(exception)); 1178 } 1179 } 1180 reconnectMutex.notifyAll(); 1181 } 1182 1183 private int calculateReconnectAttemptLimit() { 1184 int maxReconnectValue = this.maxReconnectAttempts; 1185 if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) { 1186 maxReconnectValue = this.startupMaxReconnectAttempts; 1187 } 1188 return maxReconnectValue; 1189 } 1190 1191 private boolean shouldBuildBackups() { 1192 return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority)); 1193 } 1194 1195 final boolean buildBackups() { 1196 synchronized (backupMutex) { 1197 if (!disposed && shouldBuildBackups()) { 1198 ArrayList<URI> backupList = new ArrayList<URI>(priorityList); 1199 List<URI> connectList = getConnectList(); 1200 for (URI uri: connectList) { 1201 if (!backupList.contains(uri)) { 1202 backupList.add(uri); 1203 } 1204 } 1205 // removed disposed backups 1206 List<BackupTransport> disposedList = new ArrayList<BackupTransport>(); 1207 for (BackupTransport bt : backups) { 1208 if (bt.isDisposed()) { 1209 disposedList.add(bt); 1210 } 1211 } 1212 backups.removeAll(disposedList); 1213 disposedList.clear(); 1214 for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) { 1215 URI uri = addExtraQueryOptions(iter.next()); 1216 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { 1217 try { 1218 SslContext.setCurrentSslContext(brokerSslContext); 1219 BackupTransport bt = new BackupTransport(this); 1220 bt.setUri(uri); 1221 if (!backups.contains(bt)) { 1222 Transport t = TransportFactory.compositeConnect(uri); 1223 t.setTransportListener(bt); 1224 t.start(); 1225 bt.setTransport(t); 1226 if (priorityBackup && isPriority(uri)) { 1227 priorityBackupAvailable = true; 1228 backups.add(0, bt); 1229 // if this priority backup overflows the pool 1230 // remove the backup with the lowest priority 1231 if (backups.size() > backupPoolSize) { 1232 BackupTransport disposeTransport = backups.remove(backups.size() - 1); 1233 disposeTransport.setDisposed(true); 1234 Transport transport = disposeTransport.getTransport(); 1235 if (transport != null) { 1236 transport.setTransportListener(disposedListener); 1237 disposeTransport(transport); 1238 } 1239 } 1240 } else { 1241 backups.add(bt); 1242 } 1243 } 1244 } catch (Exception e) { 1245 LOG.debug("Failed to build backup ", e); 1246 } finally { 1247 SslContext.setCurrentSslContext(null); 1248 } 1249 } 1250 } 1251 } 1252 } 1253 return false; 1254 } 1255 1256 protected boolean isPriority(URI uri) { 1257 if (!priorityBackup) { 1258 return false; 1259 } 1260 1261 if (!priorityList.isEmpty()) { 1262 return priorityList.contains(uri); 1263 } 1264 return uris.indexOf(uri) == 0; 1265 } 1266 1267 @Override 1268 public boolean isDisposed() { 1269 return disposed; 1270 } 1271 1272 @Override 1273 public boolean isConnected() { 1274 return connectedTransport.get() != null; 1275 } 1276 1277 @Override 1278 public void reconnect(URI uri) throws IOException { 1279 add(true, new URI[]{uri}); 1280 } 1281 1282 @Override 1283 public boolean isReconnectSupported() { 1284 return this.reconnectSupported; 1285 } 1286 1287 public void setReconnectSupported(boolean value) { 1288 this.reconnectSupported = value; 1289 } 1290 1291 @Override 1292 public boolean isUpdateURIsSupported() { 1293 return this.updateURIsSupported; 1294 } 1295 1296 public void setUpdateURIsSupported(boolean value) { 1297 this.updateURIsSupported = value; 1298 } 1299 1300 @Override 1301 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { 1302 if (isUpdateURIsSupported()) { 1303 HashSet<URI> copy = new HashSet<URI>(); 1304 synchronized (reconnectMutex) { 1305 copy.addAll(this.updated); 1306 updated.clear(); 1307 if (updatedURIs != null && updatedURIs.length > 0) { 1308 for (URI uri : updatedURIs) { 1309 if (uri != null && !updated.contains(uri)) { 1310 updated.add(uri); 1311 } 1312 } 1313 } 1314 } 1315 if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) { 1316 buildBackups(); 1317 reconnect(rebalance); 1318 } 1319 } 1320 } 1321 1322 /** 1323 * @return the updateURIsURL 1324 */ 1325 public String getUpdateURIsURL() { 1326 return this.updateURIsURL; 1327 } 1328 1329 /** 1330 * @param updateURIsURL the updateURIsURL to set 1331 */ 1332 public void setUpdateURIsURL(String updateURIsURL) { 1333 this.updateURIsURL = updateURIsURL; 1334 } 1335 1336 /** 1337 * @return the rebalanceUpdateURIs 1338 */ 1339 public boolean isRebalanceUpdateURIs() { 1340 return this.rebalanceUpdateURIs; 1341 } 1342 1343 /** 1344 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set 1345 */ 1346 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) { 1347 this.rebalanceUpdateURIs = rebalanceUpdateURIs; 1348 } 1349 1350 @Override 1351 public int getReceiveCounter() { 1352 Transport transport = connectedTransport.get(); 1353 if (transport == null) { 1354 return 0; 1355 } 1356 return transport.getReceiveCounter(); 1357 } 1358 1359 public int getConnectFailures() { 1360 return connectFailures; 1361 } 1362 1363 public void connectionInterruptProcessingComplete(ConnectionId connectionId) { 1364 synchronized (reconnectMutex) { 1365 stateTracker.connectionInterruptProcessingComplete(this, connectionId); 1366 } 1367 } 1368 1369 public ConnectionStateTracker getStateTracker() { 1370 return stateTracker; 1371 } 1372 1373 private boolean contains(URI newURI) { 1374 boolean result = false; 1375 for (URI uri : uris) { 1376 if (compareURIs(newURI, uri)) { 1377 result = true; 1378 break; 1379 } 1380 } 1381 1382 return result; 1383 } 1384 1385 private boolean compareURIs(final URI first, final URI second) { 1386 1387 boolean result = false; 1388 if (first == null || second == null) { 1389 return result; 1390 } 1391 1392 if (first.getPort() == second.getPort()) { 1393 InetAddress firstAddr = null; 1394 InetAddress secondAddr = null; 1395 try { 1396 firstAddr = InetAddress.getByName(first.getHost()); 1397 secondAddr = InetAddress.getByName(second.getHost()); 1398 1399 if (firstAddr.equals(secondAddr)) { 1400 result = true; 1401 } 1402 1403 } catch(IOException e) { 1404 1405 if (firstAddr == null) { 1406 LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e); 1407 } else { 1408 LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e); 1409 } 1410 1411 if (first.getHost().equalsIgnoreCase(second.getHost())) { 1412 result = true; 1413 } 1414 } 1415 } 1416 1417 return result; 1418 } 1419 1420 private InputStreamReader getURLStream(String path) throws IOException { 1421 InputStreamReader result = null; 1422 URL url = null; 1423 try { 1424 url = new URL(path); 1425 result = new InputStreamReader(url.openStream()); 1426 } catch (MalformedURLException e) { 1427 // ignore - it could be a path to a a local file 1428 } 1429 if (result == null) { 1430 result = new FileReader(path); 1431 } 1432 return result; 1433 } 1434 1435 private URI addExtraQueryOptions(URI uri) { 1436 try { 1437 if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) { 1438 if( uri.getQuery() == null ) { 1439 uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions); 1440 } else { 1441 uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions); 1442 } 1443 } 1444 } catch (URISyntaxException e) { 1445 throw new RuntimeException(e); 1446 } 1447 return uri; 1448 } 1449 1450 public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) { 1451 this.nestedExtraQueryOptions = nestedExtraQueryOptions; 1452 } 1453 1454 public int getWarnAfterReconnectAttempts() { 1455 return warnAfterReconnectAttempts; 1456 } 1457 1458 /** 1459 * Sets the number of Connect / Reconnect attempts that must occur before a warn message 1460 * is logged indicating that the transport is not connected. This can be useful when the 1461 * client is running inside some container or service as it give an indication of some 1462 * problem with the client connection that might not otherwise be visible. To disable the 1463 * log messages this value should be set to a value @{code attempts <= 0} 1464 * 1465 * @param warnAfterReconnectAttempts 1466 * The number of failed connection attempts that must happen before a warning is logged. 1467 */ 1468 public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) { 1469 this.warnAfterReconnectAttempts = warnAfterReconnectAttempts; 1470 } 1471 1472}