001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.security.GeneralSecurityException; 021import java.security.cert.X509Certificate; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Properties; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.ExecutionException; 031import java.util.concurrent.ExecutorService; 032import java.util.concurrent.Executors; 033import java.util.concurrent.Future; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.TimeoutException; 036import java.util.concurrent.atomic.AtomicBoolean; 037 038import javax.management.ObjectName; 039 040import org.apache.activemq.DestinationDoesNotExistException; 041import org.apache.activemq.Service; 042import org.apache.activemq.advisory.AdvisoryBroker; 043import org.apache.activemq.advisory.AdvisorySupport; 044import org.apache.activemq.broker.BrokerService; 045import org.apache.activemq.broker.BrokerServiceAware; 046import org.apache.activemq.broker.ConnectionContext; 047import org.apache.activemq.broker.TransportConnection; 048import org.apache.activemq.broker.region.AbstractRegion; 049import org.apache.activemq.broker.region.DurableTopicSubscription; 050import org.apache.activemq.broker.region.Region; 051import org.apache.activemq.broker.region.RegionBroker; 052import org.apache.activemq.broker.region.Subscription; 053import org.apache.activemq.broker.region.policy.PolicyEntry; 054import org.apache.activemq.command.ActiveMQDestination; 055import org.apache.activemq.command.ActiveMQMessage; 056import org.apache.activemq.command.ActiveMQTempDestination; 057import org.apache.activemq.command.ActiveMQTopic; 058import org.apache.activemq.command.BrokerId; 059import org.apache.activemq.command.BrokerInfo; 060import org.apache.activemq.command.Command; 061import org.apache.activemq.command.ConnectionError; 062import org.apache.activemq.command.ConnectionId; 063import org.apache.activemq.command.ConnectionInfo; 064import org.apache.activemq.command.ConsumerId; 065import org.apache.activemq.command.ConsumerInfo; 066import org.apache.activemq.command.DataStructure; 067import org.apache.activemq.command.DestinationInfo; 068import org.apache.activemq.command.ExceptionResponse; 069import org.apache.activemq.command.KeepAliveInfo; 070import org.apache.activemq.command.Message; 071import org.apache.activemq.command.MessageAck; 072import org.apache.activemq.command.MessageDispatch; 073import org.apache.activemq.command.MessageId; 074import org.apache.activemq.command.NetworkBridgeFilter; 075import org.apache.activemq.command.ProducerInfo; 076import org.apache.activemq.command.RemoveInfo; 077import org.apache.activemq.command.RemoveSubscriptionInfo; 078import org.apache.activemq.command.Response; 079import org.apache.activemq.command.SessionInfo; 080import org.apache.activemq.command.ShutdownInfo; 081import org.apache.activemq.command.SubscriptionInfo; 082import org.apache.activemq.command.WireFormatInfo; 083import org.apache.activemq.filter.DestinationFilter; 084import org.apache.activemq.filter.MessageEvaluationContext; 085import org.apache.activemq.security.SecurityContext; 086import org.apache.activemq.transport.DefaultTransportListener; 087import org.apache.activemq.transport.FutureResponse; 088import org.apache.activemq.transport.ResponseCallback; 089import org.apache.activemq.transport.Transport; 090import org.apache.activemq.transport.TransportDisposedIOException; 091import org.apache.activemq.transport.TransportFilter; 092import org.apache.activemq.transport.tcp.SslTransport; 093import org.apache.activemq.util.IdGenerator; 094import org.apache.activemq.util.IntrospectionSupport; 095import org.apache.activemq.util.LongSequenceGenerator; 096import org.apache.activemq.util.MarshallingSupport; 097import org.apache.activemq.util.ServiceStopper; 098import org.apache.activemq.util.ServiceSupport; 099import org.slf4j.Logger; 100import org.slf4j.LoggerFactory; 101 102/** 103 * A useful base class for implementing demand forwarding bridges. 104 */ 105public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { 106 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); 107 protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; 108 protected final Transport localBroker; 109 protected final Transport remoteBroker; 110 protected IdGenerator idGenerator = new IdGenerator(); 111 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 112 protected ConnectionInfo localConnectionInfo; 113 protected ConnectionInfo remoteConnectionInfo; 114 protected SessionInfo localSessionInfo; 115 protected ProducerInfo producerInfo; 116 protected String remoteBrokerName = "Unknown"; 117 protected String localClientId; 118 protected ConsumerInfo demandConsumerInfo; 119 protected int demandConsumerDispatched; 120 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); 121 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); 122 protected final AtomicBoolean bridgeFailed = new AtomicBoolean(); 123 protected final AtomicBoolean disposed = new AtomicBoolean(); 124 protected BrokerId localBrokerId; 125 protected ActiveMQDestination[] excludedDestinations; 126 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 127 protected ActiveMQDestination[] staticallyIncludedDestinations; 128 protected ActiveMQDestination[] durableDestinations; 129 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 130 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 131 protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; 132 protected final CountDownLatch startedLatch = new CountDownLatch(2); 133 protected final CountDownLatch localStartedLatch = new CountDownLatch(1); 134 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); 135 protected NetworkBridgeConfiguration configuration; 136 protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); 137 138 protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null}; 139 protected BrokerId remoteBrokerId; 140 141 protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics(); 142 143 private NetworkBridgeListener networkBridgeListener; 144 private boolean createdByDuplex; 145 private BrokerInfo localBrokerInfo; 146 private BrokerInfo remoteBrokerInfo; 147 148 private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed); 149 private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed); 150 151 private final AtomicBoolean started = new AtomicBoolean(); 152 private TransportConnection duplexInitiatingConnection; 153 private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean(); 154 protected BrokerService brokerService = null; 155 private ObjectName mbeanObjectName; 156 private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); 157 private Transport duplexInboundLocalBroker = null; 158 private ProducerInfo duplexInboundLocalProducerInfo; 159 160 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 161 this.configuration = configuration; 162 this.localBroker = localBroker; 163 this.remoteBroker = remoteBroker; 164 } 165 166 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { 167 this.localBrokerInfo = localBrokerInfo; 168 this.remoteBrokerInfo = remoteBrokerInfo; 169 this.duplexInitiatingConnection = connection; 170 start(); 171 serviceRemoteCommand(remoteBrokerInfo); 172 } 173 174 @Override 175 public void start() throws Exception { 176 if (started.compareAndSet(false, true)) { 177 178 if (brokerService == null) { 179 throw new IllegalArgumentException("BrokerService is null on " + this); 180 } 181 182 networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics()); 183 184 if (isDuplex()) { 185 duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker()); 186 duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() { 187 188 @Override 189 public void onCommand(Object o) { 190 Command command = (Command) o; 191 serviceLocalCommand(command); 192 } 193 194 @Override 195 public void onException(IOException error) { 196 serviceLocalException(error); 197 } 198 }); 199 duplexInboundLocalBroker.start(); 200 } 201 202 localBroker.setTransportListener(new DefaultTransportListener() { 203 204 @Override 205 public void onCommand(Object o) { 206 Command command = (Command) o; 207 serviceLocalCommand(command); 208 } 209 210 @Override 211 public void onException(IOException error) { 212 if (!futureLocalBrokerInfo.isDone()) { 213 futureLocalBrokerInfo.cancel(true); 214 return; 215 } 216 serviceLocalException(error); 217 } 218 }); 219 220 remoteBroker.setTransportListener(new DefaultTransportListener() { 221 222 @Override 223 public void onCommand(Object o) { 224 Command command = (Command) o; 225 serviceRemoteCommand(command); 226 } 227 228 @Override 229 public void onException(IOException error) { 230 if (!futureRemoteBrokerInfo.isDone()) { 231 futureRemoteBrokerInfo.cancel(true); 232 return; 233 } 234 serviceRemoteException(error); 235 } 236 }); 237 238 remoteBroker.start(); 239 localBroker.start(); 240 241 if (!disposed.get()) { 242 try { 243 triggerStartAsyncNetworkBridgeCreation(); 244 } catch (IOException e) { 245 LOG.warn("Caught exception from remote start", e); 246 } 247 } else { 248 LOG.warn("Bridge was disposed before the start() method was fully executed."); 249 throw new TransportDisposedIOException(); 250 } 251 } 252 } 253 254 @Override 255 public void stop() throws Exception { 256 if (started.compareAndSet(true, false)) { 257 if (disposed.compareAndSet(false, true)) { 258 LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName); 259 260 futureRemoteBrokerInfo.cancel(true); 261 futureLocalBrokerInfo.cancel(true); 262 263 NetworkBridgeListener l = this.networkBridgeListener; 264 if (l != null) { 265 l.onStop(this); 266 } 267 try { 268 // local start complete 269 if (startedLatch.getCount() < 2) { 270 LOG.trace("{} unregister bridge ({}) to {}", new Object[]{ 271 configuration.getBrokerName(), this, remoteBrokerName 272 }); 273 brokerService.getBroker().removeBroker(null, remoteBrokerInfo); 274 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); 275 } 276 277 remoteBridgeStarted.set(false); 278 final CountDownLatch sendShutdown = new CountDownLatch(1); 279 280 brokerService.getTaskRunnerFactory().execute(new Runnable() { 281 @Override 282 public void run() { 283 try { 284 serialExecutor.shutdown(); 285 if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 286 List<Runnable> pendingTasks = serialExecutor.shutdownNow(); 287 LOG.info("pending tasks on stop {}", pendingTasks); 288 } 289 localBroker.oneway(new ShutdownInfo()); 290 remoteBroker.oneway(new ShutdownInfo()); 291 } catch (Throwable e) { 292 LOG.debug("Caught exception sending shutdown", e); 293 } finally { 294 sendShutdown.countDown(); 295 } 296 297 } 298 }, "ActiveMQ ForwardingBridge StopTask"); 299 300 if (!sendShutdown.await(10, TimeUnit.SECONDS)) { 301 LOG.info("Network Could not shutdown in a timely manner"); 302 } 303 } finally { 304 ServiceStopper ss = new ServiceStopper(); 305 ss.stop(remoteBroker); 306 ss.stop(localBroker); 307 ss.stop(duplexInboundLocalBroker); 308 // Release the started Latch since another thread could be 309 // stuck waiting for it to start up. 310 startedLatch.countDown(); 311 startedLatch.countDown(); 312 localStartedLatch.countDown(); 313 314 ss.throwFirstException(); 315 } 316 } 317 318 LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName); 319 } 320 } 321 322 protected void triggerStartAsyncNetworkBridgeCreation() throws IOException { 323 brokerService.getTaskRunnerFactory().execute(new Runnable() { 324 @Override 325 public void run() { 326 final String originalName = Thread.currentThread().getName(); 327 Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " + 328 "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker); 329 330 try { 331 // First we collect the info data from both the local and remote ends 332 collectBrokerInfos(); 333 334 // Once we have all required broker info we can attempt to start 335 // the local and then remote sides of the bridge. 336 doStartLocalAndRemoteBridges(); 337 } finally { 338 Thread.currentThread().setName(originalName); 339 } 340 } 341 }); 342 } 343 344 private void collectBrokerInfos() { 345 346 // First wait for the remote to feed us its BrokerInfo, then we can check on 347 // the LocalBrokerInfo and decide is this is a loop. 348 try { 349 remoteBrokerInfo = futureRemoteBrokerInfo.get(); 350 if (remoteBrokerInfo == null) { 351 serviceLocalException(new Throwable("remoteBrokerInfo is null")); 352 return; 353 } 354 } catch (Exception e) { 355 serviceRemoteException(e); 356 return; 357 } 358 359 try { 360 localBrokerInfo = futureLocalBrokerInfo.get(); 361 if (localBrokerInfo == null) { 362 serviceLocalException(new Throwable("localBrokerInfo is null")); 363 return; 364 } 365 366 // Before we try and build the bridge lets check if we are in a loop 367 // and if so just stop now before registering anything. 368 remoteBrokerId = remoteBrokerInfo.getBrokerId(); 369 if (localBrokerId.equals(remoteBrokerId)) { 370 LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{ 371 configuration.getBrokerName(), remoteBrokerName, remoteBrokerId 372 }); 373 ServiceSupport.dispose(localBroker); 374 ServiceSupport.dispose(remoteBroker); 375 // the bridge is left in a bit of limbo, but it won't get retried 376 // in this state. 377 return; 378 } 379 380 // Fill in the remote broker's information now. 381 remoteBrokerPath[0] = remoteBrokerId; 382 remoteBrokerName = remoteBrokerInfo.getBrokerName(); 383 if (configuration.isUseBrokerNamesAsIdSeed()) { 384 idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName); 385 } 386 } catch (Throwable e) { 387 serviceLocalException(e); 388 } 389 } 390 391 private void doStartLocalAndRemoteBridges() { 392 393 if (disposed.get()) { 394 return; 395 } 396 397 if (isCreatedByDuplex()) { 398 // apply remote (propagated) configuration to local duplex bridge before start 399 Properties props = null; 400 try { 401 props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); 402 IntrospectionSupport.getProperties(configuration, props, null); 403 if (configuration.getExcludedDestinations() != null) { 404 excludedDestinations = configuration.getExcludedDestinations().toArray( 405 new ActiveMQDestination[configuration.getExcludedDestinations().size()]); 406 } 407 if (configuration.getStaticallyIncludedDestinations() != null) { 408 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( 409 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); 410 } 411 if (configuration.getDynamicallyIncludedDestinations() != null) { 412 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( 413 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); 414 } 415 } catch (Throwable t) { 416 LOG.error("Error mapping remote configuration: {}", props, t); 417 } 418 } 419 420 try { 421 startLocalBridge(); 422 } catch (Throwable e) { 423 serviceLocalException(e); 424 return; 425 } 426 427 try { 428 startRemoteBridge(); 429 } catch (Throwable e) { 430 serviceRemoteException(e); 431 return; 432 } 433 434 try { 435 if (safeWaitUntilStarted()) { 436 setupStaticDestinations(); 437 } 438 } catch (Throwable e) { 439 serviceLocalException(e); 440 } 441 } 442 443 private void startLocalBridge() throws Throwable { 444 if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) { 445 synchronized (this) { 446 LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker); 447 if (!disposed.get()) { 448 449 if (idGenerator == null) { 450 throw new IllegalStateException("Id Generator cannot be null"); 451 } 452 453 localConnectionInfo = new ConnectionInfo(); 454 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 455 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); 456 localConnectionInfo.setClientId(localClientId); 457 localConnectionInfo.setUserName(configuration.getUserName()); 458 localConnectionInfo.setPassword(configuration.getPassword()); 459 Transport originalTransport = remoteBroker; 460 while (originalTransport instanceof TransportFilter) { 461 originalTransport = ((TransportFilter) originalTransport).getNext(); 462 } 463 if (originalTransport instanceof SslTransport) { 464 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 465 localConnectionInfo.setTransportContext(peerCerts); 466 } 467 // sync requests that may fail 468 Object resp = localBroker.request(localConnectionInfo); 469 if (resp instanceof ExceptionResponse) { 470 throw ((ExceptionResponse) resp).getException(); 471 } 472 localSessionInfo = new SessionInfo(localConnectionInfo, 1); 473 localBroker.oneway(localSessionInfo); 474 475 if (configuration.isDuplex()) { 476 // separate in-bound channel for forwards so we don't 477 // contend with out-bound dispatch on same connection 478 remoteBrokerInfo.setNetworkConnection(true); 479 duplexInboundLocalBroker.oneway(remoteBrokerInfo); 480 481 ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); 482 duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 483 duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" 484 + configuration.getBrokerName()); 485 duplexLocalConnectionInfo.setUserName(configuration.getUserName()); 486 duplexLocalConnectionInfo.setPassword(configuration.getPassword()); 487 488 if (originalTransport instanceof SslTransport) { 489 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 490 duplexLocalConnectionInfo.setTransportContext(peerCerts); 491 } 492 // sync requests that may fail 493 resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo); 494 if (resp instanceof ExceptionResponse) { 495 throw ((ExceptionResponse) resp).getException(); 496 } 497 SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1); 498 duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1); 499 duplexInboundLocalBroker.oneway(duplexInboundSession); 500 duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo); 501 } 502 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString()); 503 NetworkBridgeListener l = this.networkBridgeListener; 504 if (l != null) { 505 l.onStart(this); 506 } 507 508 // Let the local broker know the remote broker's ID. 509 localBroker.oneway(remoteBrokerInfo); 510 // new peer broker (a consumer can work with remote broker also) 511 brokerService.getBroker().addBroker(null, remoteBrokerInfo); 512 513 LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{ 514 localBroker, remoteBroker, remoteBrokerName 515 }); 516 LOG.trace("{} register bridge ({}) to {}", new Object[]{ 517 configuration.getBrokerName(), this, remoteBrokerName 518 }); 519 } else { 520 LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed."); 521 } 522 startedLatch.countDown(); 523 localStartedLatch.countDown(); 524 } 525 } 526 } 527 528 protected void startRemoteBridge() throws Exception { 529 if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) { 530 LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker); 531 synchronized (this) { 532 if (!isCreatedByDuplex()) { 533 BrokerInfo brokerInfo = new BrokerInfo(); 534 brokerInfo.setBrokerName(configuration.getBrokerName()); 535 brokerInfo.setBrokerURL(configuration.getBrokerURL()); 536 brokerInfo.setNetworkConnection(true); 537 brokerInfo.setDuplexConnection(configuration.isDuplex()); 538 // set our properties 539 Properties props = new Properties(); 540 IntrospectionSupport.getProperties(configuration, props, null); 541 props.remove("networkTTL"); 542 String str = MarshallingSupport.propertiesToString(props); 543 brokerInfo.setNetworkProperties(str); 544 brokerInfo.setBrokerId(this.localBrokerId); 545 remoteBroker.oneway(brokerInfo); 546 } 547 if (remoteConnectionInfo != null) { 548 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 549 } 550 remoteConnectionInfo = new ConnectionInfo(); 551 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 552 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); 553 remoteConnectionInfo.setUserName(configuration.getUserName()); 554 remoteConnectionInfo.setPassword(configuration.getPassword()); 555 remoteBroker.oneway(remoteConnectionInfo); 556 557 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); 558 remoteBroker.oneway(remoteSessionInfo); 559 producerInfo = new ProducerInfo(remoteSessionInfo, 1); 560 producerInfo.setResponseRequired(false); 561 remoteBroker.oneway(producerInfo); 562 // Listen to consumer advisory messages on the remote broker to determine demand. 563 if (!configuration.isStaticBridge()) { 564 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); 565 // always dispatch advisory message asynchronously so that 566 // we never block the producer broker if we are slow 567 demandConsumerInfo.setDispatchAsync(true); 568 String advisoryTopic = configuration.getDestinationFilter(); 569 if (configuration.isBridgeTempDestinations()) { 570 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 571 } 572 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 573 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 574 remoteBroker.oneway(demandConsumerInfo); 575 } 576 startedLatch.countDown(); 577 } 578 } 579 } 580 581 @Override 582 public void serviceRemoteException(Throwable error) { 583 if (!disposed.get()) { 584 if (error instanceof SecurityException || error instanceof GeneralSecurityException) { 585 LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 586 localBroker, remoteBroker, error 587 }); 588 } else { 589 LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 590 localBroker, remoteBroker, error 591 }); 592 } 593 LOG.debug("The remote Exception was: {}", error, error); 594 brokerService.getTaskRunnerFactory().execute(new Runnable() { 595 @Override 596 public void run() { 597 ServiceSupport.dispose(getControllingService()); 598 } 599 }); 600 fireBridgeFailed(error); 601 } 602 } 603 604 protected void serviceRemoteCommand(Command command) { 605 if (!disposed.get()) { 606 try { 607 if (command.isMessageDispatch()) { 608 safeWaitUntilStarted(); 609 MessageDispatch md = (MessageDispatch) command; 610 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 611 ackAdvisory(md.getMessage()); 612 } else if (command.isBrokerInfo()) { 613 futureRemoteBrokerInfo.set((BrokerInfo) command); 614 } else if (command.getClass() == ConnectionError.class) { 615 ConnectionError ce = (ConnectionError) command; 616 serviceRemoteException(ce.getException()); 617 } else { 618 if (isDuplex()) { 619 LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType()); 620 if (command.isMessage()) { 621 final ActiveMQMessage message = (ActiveMQMessage) command; 622 if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 623 serviceRemoteConsumerAdvisory(message.getDataStructure()); 624 ackAdvisory(message); 625 } else { 626 if (!isPermissableDestination(message.getDestination(), true)) { 627 return; 628 } 629 // message being forwarded - we need to 630 // propagate the response to our local send 631 if (canDuplexDispatch(message)) { 632 message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); 633 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 634 duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { 635 final int correlationId = message.getCommandId(); 636 637 @Override 638 public void onCompletion(FutureResponse resp) { 639 try { 640 Response reply = resp.getResult(); 641 reply.setCorrelationId(correlationId); 642 remoteBroker.oneway(reply); 643 //increment counter when messages are received in duplex mode 644 networkBridgeStatistics.getReceivedCount().increment(); 645 } catch (IOException error) { 646 LOG.error("Exception: {} on duplex forward of: {}", error, message); 647 serviceRemoteException(error); 648 } 649 } 650 }); 651 } else { 652 duplexInboundLocalBroker.oneway(message); 653 networkBridgeStatistics.getReceivedCount().increment(); 654 } 655 serviceInboundMessage(message); 656 } else { 657 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 658 Response reply = new Response(); 659 reply.setCorrelationId(message.getCommandId()); 660 remoteBroker.oneway(reply); 661 } 662 } 663 } 664 } else { 665 switch (command.getDataStructureType()) { 666 case ConnectionInfo.DATA_STRUCTURE_TYPE: 667 if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) { 668 // end of initiating connection setup - propogate to initial connection to get mbean by clientid 669 duplexInitiatingConnection.processAddConnection((ConnectionInfo) command); 670 } else { 671 localBroker.oneway(command); 672 } 673 break; 674 case SessionInfo.DATA_STRUCTURE_TYPE: 675 localBroker.oneway(command); 676 break; 677 case ProducerInfo.DATA_STRUCTURE_TYPE: 678 // using duplexInboundLocalProducerInfo 679 break; 680 case MessageAck.DATA_STRUCTURE_TYPE: 681 MessageAck ack = (MessageAck) command; 682 DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId()); 683 if (localSub != null) { 684 ack.setConsumerId(localSub.getLocalInfo().getConsumerId()); 685 localBroker.oneway(ack); 686 } else { 687 LOG.warn("Matching local subscription not found for ack: {}", ack); 688 } 689 break; 690 case ConsumerInfo.DATA_STRUCTURE_TYPE: 691 localStartedLatch.await(); 692 if (started.get()) { 693 addConsumerInfo((ConsumerInfo) command); 694 } else { 695 // received a subscription whilst stopping 696 LOG.warn("Stopping - ignoring ConsumerInfo: {}", command); 697 } 698 break; 699 case ShutdownInfo.DATA_STRUCTURE_TYPE: 700 // initiator is shutting down, controlled case 701 // abortive close dealt with by inactivity monitor 702 LOG.info("Stopping network bridge on shutdown of remote broker"); 703 serviceRemoteException(new IOException(command.toString())); 704 break; 705 default: 706 LOG.debug("Ignoring remote command: {}", command); 707 } 708 } 709 } else { 710 switch (command.getDataStructureType()) { 711 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 712 case WireFormatInfo.DATA_STRUCTURE_TYPE: 713 case ShutdownInfo.DATA_STRUCTURE_TYPE: 714 break; 715 default: 716 LOG.warn("Unexpected remote command: {}", command); 717 } 718 } 719 } 720 } catch (Throwable e) { 721 LOG.debug("Exception processing remote command: {}", command, e); 722 serviceRemoteException(e); 723 } 724 } 725 } 726 727 private void ackAdvisory(Message message) throws IOException { 728 demandConsumerDispatched++; 729 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) { 730 MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched); 731 ack.setConsumerId(demandConsumerInfo.getConsumerId()); 732 remoteBroker.oneway(ack); 733 demandConsumerDispatched = 0; 734 } 735 } 736 737 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 738 final int networkTTL = configuration.getConsumerTTL(); 739 if (data.getClass() == ConsumerInfo.class) { 740 // Create a new local subscription 741 ConsumerInfo info = (ConsumerInfo) data; 742 BrokerId[] path = info.getBrokerPath(); 743 744 if (info.isBrowser()) { 745 LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName); 746 return; 747 } 748 749 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 750 LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{ 751 configuration.getBrokerName(), remoteBrokerName, networkTTL, info 752 }); 753 return; 754 } 755 756 if (contains(path, localBrokerPath[0])) { 757 // Ignore this consumer as it's a consumer we locally sent to the broker. 758 LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{ 759 configuration.getBrokerName(), remoteBrokerName, info 760 }); 761 return; 762 } 763 764 if (!isPermissableDestination(info.getDestination())) { 765 // ignore if not in the permitted or in the excluded list 766 LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{ 767 configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info 768 }); 769 return; 770 } 771 772 // in a cyclic network there can be multiple bridges per broker that can propagate 773 // a network subscription so there is a need to synchronize on a shared entity 774 synchronized (brokerService.getVmConnectorURI()) { 775 addConsumerInfo(info); 776 } 777 } else if (data.getClass() == DestinationInfo.class) { 778 // It's a destination info - we want to pass up information about temporary destinations 779 final DestinationInfo destInfo = (DestinationInfo) data; 780 BrokerId[] path = destInfo.getBrokerPath(); 781 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 782 LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{ 783 configuration.getBrokerName(), destInfo, networkTTL 784 }); 785 return; 786 } 787 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { 788 LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo); 789 return; 790 } 791 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 792 if (destInfo.getDestination() instanceof ActiveMQTempDestination) { 793 // re-set connection id so comes from here 794 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 795 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 796 } 797 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); 798 LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{ 799 configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo 800 }); 801 if (destInfo.isRemoveOperation()) { 802 // Serialize with removeSub operations such that all removeSub advisories 803 // are generated 804 serialExecutor.execute(new Runnable() { 805 @Override 806 public void run() { 807 try { 808 localBroker.oneway(destInfo); 809 } catch (IOException e) { 810 LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e); 811 } 812 } 813 }); 814 } else { 815 localBroker.oneway(destInfo); 816 } 817 } else if (data.getClass() == RemoveInfo.class) { 818 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); 819 removeDemandSubscription(id); 820 } else if (data.getClass() == RemoveSubscriptionInfo.class) { 821 RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); 822 SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); 823 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 824 DemandSubscription ds = i.next(); 825 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); 826 if (removed) { 827 if (ds.getDurableRemoteSubs().isEmpty()) { 828 829 // deactivate subscriber 830 RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); 831 localBroker.oneway(removeInfo); 832 833 // remove subscriber 834 RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); 835 sending.setClientId(localClientId); 836 sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName()); 837 sending.setConnectionId(this.localConnectionInfo.getConnectionId()); 838 localBroker.oneway(sending); 839 840 //remove subscriber from map 841 i.remove(); 842 } 843 } 844 } 845 } 846 } 847 848 @Override 849 public void serviceLocalException(Throwable error) { 850 serviceLocalException(null, error); 851 } 852 853 public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) { 854 LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error); 855 if (!disposed.get()) { 856 if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) { 857 // not a reason to terminate the bridge - temps can disappear with 858 // pending sends as the demand sub may outlive the remote dest 859 if (messageDispatch != null) { 860 LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error); 861 try { 862 MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1); 863 poisonAck.setPoisonCause(error); 864 localBroker.oneway(poisonAck); 865 } catch (IOException ioe) { 866 LOG.error("Failed to posion ack message following forward failure: ", ioe); 867 } 868 fireFailedForwardAdvisory(messageDispatch, error); 869 } else { 870 LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error); 871 } 872 return; 873 } 874 875 LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error}); 876 LOG.debug("The local Exception was: {}", error, error); 877 878 brokerService.getTaskRunnerFactory().execute(new Runnable() { 879 @Override 880 public void run() { 881 ServiceSupport.dispose(getControllingService()); 882 } 883 }); 884 fireBridgeFailed(error); 885 } 886 } 887 888 private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) { 889 if (configuration.isAdvisoryForFailedForward()) { 890 AdvisoryBroker advisoryBroker = null; 891 try { 892 advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); 893 894 if (advisoryBroker != null) { 895 ConnectionContext context = new ConnectionContext(); 896 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 897 context.setBroker(brokerService.getBroker()); 898 899 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 900 advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); 901 advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null, 902 advisoryMessage); 903 904 } 905 } catch (Exception e) { 906 LOG.warn("failed to fire forward failure advisory, cause: {}", e); 907 LOG.debug("detail", e); 908 } 909 } 910 } 911 912 protected Service getControllingService() { 913 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; 914 } 915 916 protected void addSubscription(DemandSubscription sub) throws IOException { 917 if (sub != null) { 918 if (isDuplex()) { 919 // async vm transport, need to wait for completion 920 localBroker.request(sub.getLocalInfo()); 921 } else { 922 localBroker.oneway(sub.getLocalInfo()); 923 } 924 } 925 } 926 927 protected void removeSubscription(final DemandSubscription sub) throws IOException { 928 if (sub != null) { 929 LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()}); 930 931 // ensure not available for conduit subs pending removal 932 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 933 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 934 935 // continue removal in separate thread to free up this thread for outstanding responses 936 // Serialize with removeDestination operations so that removeSubs are serialized with 937 // removeDestinations such that all removeSub advisories are generated 938 serialExecutor.execute(new Runnable() { 939 @Override 940 public void run() { 941 sub.waitForCompletion(); 942 try { 943 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 944 } catch (IOException e) { 945 LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e); 946 } 947 } 948 }); 949 } 950 } 951 952 protected Message configureMessage(MessageDispatch md) throws IOException { 953 Message message = md.getMessage().copy(); 954 // Update the packet to show where it came from. 955 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath)); 956 message.setProducerId(producerInfo.getProducerId()); 957 message.setDestination(md.getDestination()); 958 message.setMemoryUsage(null); 959 if (message.getOriginalTransactionId() == null) { 960 message.setOriginalTransactionId(message.getTransactionId()); 961 } 962 message.setTransactionId(null); 963 if (configuration.isUseCompression()) { 964 message.compress(); 965 } 966 return message; 967 } 968 969 protected void serviceLocalCommand(Command command) { 970 if (!disposed.get()) { 971 try { 972 if (command.isMessageDispatch()) { 973 safeWaitUntilStarted(); 974 networkBridgeStatistics.getEnqueues().increment(); 975 final MessageDispatch md = (MessageDispatch) command; 976 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); 977 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { 978 979 if (suppressMessageDispatch(md, sub)) { 980 LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{ 981 configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage() 982 }); 983 // still ack as it may be durable 984 try { 985 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 986 } finally { 987 sub.decrementOutstandingResponses(); 988 } 989 return; 990 } 991 992 Message message = configureMessage(md); 993 LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{ 994 configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message 995 }); 996 997 if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 998 try { 999 // never request b/c they are eventually acked async 1000 remoteBroker.oneway(message); 1001 } finally { 1002 sub.decrementOutstandingResponses(); 1003 } 1004 return; 1005 } 1006 1007 if (message.isPersistent() || configuration.isAlwaysSyncSend()) { 1008 1009 // The message was not sent using async send, so we should only 1010 // ack the local broker when we get confirmation that the remote 1011 // broker has received the message. 1012 remoteBroker.asyncRequest(message, new ResponseCallback() { 1013 @Override 1014 public void onCompletion(FutureResponse future) { 1015 try { 1016 Response response = future.getResult(); 1017 if (response.isException()) { 1018 ExceptionResponse er = (ExceptionResponse) response; 1019 serviceLocalException(md, er.getException()); 1020 } else { 1021 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1022 networkBridgeStatistics.getDequeues().increment(); 1023 } 1024 } catch (IOException e) { 1025 serviceLocalException(md, e); 1026 } finally { 1027 sub.decrementOutstandingResponses(); 1028 } 1029 } 1030 }); 1031 1032 } else { 1033 // If the message was originally sent using async send, we will 1034 // preserve that QOS by bridging it using an async send (small chance 1035 // of message loss). 1036 try { 1037 remoteBroker.oneway(message); 1038 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1039 networkBridgeStatistics.getDequeues().increment(); 1040 } finally { 1041 sub.decrementOutstandingResponses(); 1042 } 1043 } 1044 serviceOutbound(message); 1045 } else { 1046 LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage()); 1047 } 1048 } else if (command.isBrokerInfo()) { 1049 futureLocalBrokerInfo.set((BrokerInfo) command); 1050 } else if (command.isShutdownInfo()) { 1051 LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName()); 1052 stop(); 1053 } else if (command.getClass() == ConnectionError.class) { 1054 ConnectionError ce = (ConnectionError) command; 1055 serviceLocalException(ce.getException()); 1056 } else { 1057 switch (command.getDataStructureType()) { 1058 case WireFormatInfo.DATA_STRUCTURE_TYPE: 1059 break; 1060 default: 1061 LOG.warn("Unexpected local command: {}", command); 1062 } 1063 } 1064 } catch (Throwable e) { 1065 LOG.warn("Caught an exception processing local command", e); 1066 serviceLocalException(e); 1067 } 1068 } 1069 } 1070 1071 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { 1072 boolean suppress = false; 1073 // for durable subs, suppression via filter leaves dangling acks so we 1074 // need to check here and allow the ack irrespective 1075 if (sub.getLocalInfo().isDurable()) { 1076 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); 1077 messageEvalContext.setMessageReference(md.getMessage()); 1078 messageEvalContext.setDestination(md.getDestination()); 1079 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); 1080 } 1081 return suppress; 1082 } 1083 1084 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 1085 if (brokerPath != null) { 1086 for (BrokerId id : brokerPath) { 1087 if (brokerId.equals(id)) { 1088 return true; 1089 } 1090 } 1091 } 1092 return false; 1093 } 1094 1095 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { 1096 if (brokerPath == null || brokerPath.length == 0) { 1097 return pathsToAppend; 1098 } 1099 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; 1100 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1101 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); 1102 return rc; 1103 } 1104 1105 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { 1106 if (brokerPath == null || brokerPath.length == 0) { 1107 return new BrokerId[]{idToAppend}; 1108 } 1109 BrokerId rc[] = new BrokerId[brokerPath.length + 1]; 1110 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1111 rc[brokerPath.length] = idToAppend; 1112 return rc; 1113 } 1114 1115 protected boolean isPermissableDestination(ActiveMQDestination destination) { 1116 return isPermissableDestination(destination, false); 1117 } 1118 1119 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { 1120 // Are we not bridging temporary destinations? 1121 if (destination.isTemporary()) { 1122 if (allowTemporary) { 1123 return true; 1124 } else { 1125 return configuration.isBridgeTempDestinations(); 1126 } 1127 } 1128 1129 ActiveMQDestination[] dests = staticallyIncludedDestinations; 1130 if (dests != null && dests.length > 0) { 1131 for (ActiveMQDestination dest : dests) { 1132 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1133 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1134 return true; 1135 } 1136 } 1137 } 1138 1139 dests = excludedDestinations; 1140 if (dests != null && dests.length > 0) { 1141 for (ActiveMQDestination dest : dests) { 1142 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest); 1143 if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1144 return false; 1145 } 1146 } 1147 } 1148 1149 dests = dynamicallyIncludedDestinations; 1150 if (dests != null && dests.length > 0) { 1151 for (ActiveMQDestination dest : dests) { 1152 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1153 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1154 return true; 1155 } 1156 } 1157 1158 return false; 1159 } 1160 return true; 1161 } 1162 1163 /** 1164 * Subscriptions for these destinations are always created 1165 */ 1166 protected void setupStaticDestinations() { 1167 ActiveMQDestination[] dests = staticallyIncludedDestinations; 1168 if (dests != null) { 1169 for (ActiveMQDestination dest : dests) { 1170 DemandSubscription sub = createDemandSubscription(dest); 1171 sub.setStaticallyIncluded(true); 1172 try { 1173 addSubscription(sub); 1174 } catch (IOException e) { 1175 LOG.error("Failed to add static destination {}", dest, e); 1176 } 1177 LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest); 1178 } 1179 } 1180 } 1181 1182 protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { 1183 ConsumerInfo info = consumerInfo.copy(); 1184 addRemoteBrokerToBrokerPath(info); 1185 DemandSubscription sub = createDemandSubscription(info); 1186 if (sub != null) { 1187 if (duplicateSuppressionIsRequired(sub)) { 1188 undoMapRegistration(sub); 1189 } else { 1190 if (consumerInfo.isDurable()) { 1191 sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); 1192 } 1193 addSubscription(sub); 1194 LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub); 1195 } 1196 } 1197 } 1198 1199 private void undoMapRegistration(DemandSubscription sub) { 1200 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 1201 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 1202 } 1203 1204 /* 1205 * check our existing subs networkConsumerIds against the list of network 1206 * ids in this subscription A match means a duplicate which we suppress for 1207 * topics and maybe for queues 1208 */ 1209 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { 1210 final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); 1211 boolean suppress = false; 1212 1213 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic() 1214 && !configuration.isSuppressDuplicateTopicSubscriptions()) { 1215 return suppress; 1216 } 1217 1218 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); 1219 Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination()); 1220 for (Subscription sub : currentSubs) { 1221 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); 1222 if (!networkConsumers.isEmpty()) { 1223 if (matchFound(candidateConsumers, networkConsumers)) { 1224 if (isInActiveDurableSub(sub)) { 1225 suppress = false; 1226 } else { 1227 suppress = hasLowerPriority(sub, candidate.getLocalInfo()); 1228 } 1229 break; 1230 } 1231 } 1232 } 1233 return suppress; 1234 } 1235 1236 private boolean isInActiveDurableSub(Subscription sub) { 1237 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive()); 1238 } 1239 1240 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { 1241 boolean suppress = false; 1242 1243 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { 1244 LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{ 1245 configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds() 1246 }); 1247 suppress = true; 1248 } else { 1249 // remove the existing lower priority duplicate and allow this candidate 1250 try { 1251 removeDuplicateSubscription(existingSub); 1252 1253 LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{ 1254 configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds() 1255 }); 1256 } catch (IOException e) { 1257 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e); 1258 } 1259 } 1260 return suppress; 1261 } 1262 1263 private void removeDuplicateSubscription(Subscription existingSub) throws IOException { 1264 for (NetworkConnector connector : brokerService.getNetworkConnectors()) { 1265 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { 1266 break; 1267 } 1268 } 1269 } 1270 1271 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) { 1272 boolean found = false; 1273 for (ConsumerId aliasConsumer : networkConsumers) { 1274 if (candidateConsumers.contains(aliasConsumer)) { 1275 found = true; 1276 break; 1277 } 1278 } 1279 return found; 1280 } 1281 1282 protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) { 1283 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker(); 1284 Region region; 1285 Collection<Subscription> subs; 1286 1287 region = null; 1288 switch (dest.getDestinationType()) { 1289 case ActiveMQDestination.QUEUE_TYPE: 1290 region = region_broker.getQueueRegion(); 1291 break; 1292 case ActiveMQDestination.TOPIC_TYPE: 1293 region = region_broker.getTopicRegion(); 1294 break; 1295 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1296 region = region_broker.getTempQueueRegion(); 1297 break; 1298 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1299 region = region_broker.getTempTopicRegion(); 1300 break; 1301 } 1302 1303 if (region instanceof AbstractRegion) { 1304 subs = ((AbstractRegion) region).getSubscriptions().values(); 1305 } else { 1306 subs = null; 1307 } 1308 1309 return subs; 1310 } 1311 1312 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 1313 // add our original id to ourselves 1314 info.addNetworkConsumerId(info.getConsumerId()); 1315 return doCreateDemandSubscription(info); 1316 } 1317 1318 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { 1319 DemandSubscription result = new DemandSubscription(info); 1320 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1321 if (info.getDestination().isTemporary()) { 1322 // reset the local connection Id 1323 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); 1324 dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); 1325 } 1326 1327 if (configuration.isDecreaseNetworkConsumerPriority()) { 1328 byte priority = (byte) configuration.getConsumerPriorityBase(); 1329 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { 1330 // The longer the path to the consumer, the less it's consumer priority. 1331 priority -= info.getBrokerPath().length + 1; 1332 } 1333 result.getLocalInfo().setPriority(priority); 1334 LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info}); 1335 } 1336 configureDemandSubscription(info, result); 1337 return result; 1338 } 1339 1340 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { 1341 ConsumerInfo info = new ConsumerInfo(); 1342 info.setNetworkSubscription(true); 1343 info.setDestination(destination); 1344 1345 // Indicate that this subscription is being made on behalf of the remote broker. 1346 info.setBrokerPath(new BrokerId[]{remoteBrokerId}); 1347 1348 // the remote info held by the DemandSubscription holds the original 1349 // consumerId, the local info get's overwritten 1350 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1351 DemandSubscription result = null; 1352 try { 1353 result = createDemandSubscription(info); 1354 } catch (IOException e) { 1355 LOG.error("Failed to create DemandSubscription ", e); 1356 } 1357 return result; 1358 } 1359 1360 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { 1361 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) || 1362 AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { 1363 sub.getLocalInfo().setDispatchAsync(true); 1364 } else { 1365 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); 1366 } 1367 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize()); 1368 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); 1369 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); 1370 1371 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info)); 1372 if (!info.isDurable()) { 1373 // This works for now since we use a VM connection to the local broker. 1374 // may need to change if we ever subscribe to a remote broker. 1375 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); 1376 } else { 1377 sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); 1378 } 1379 } 1380 1381 protected void removeDemandSubscription(ConsumerId id) throws IOException { 1382 DemandSubscription sub = subscriptionMapByRemoteId.remove(id); 1383 LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{ 1384 configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub 1385 }); 1386 if (sub != null) { 1387 removeSubscription(sub); 1388 LOG.debug("{} removed sub on {} from {}: {}", new Object[]{ 1389 configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo() 1390 }); 1391 } 1392 } 1393 1394 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { 1395 boolean removeDone = false; 1396 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); 1397 if (sub != null) { 1398 try { 1399 removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); 1400 removeDone = true; 1401 } catch (IOException e) { 1402 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e); 1403 } 1404 } 1405 return removeDone; 1406 } 1407 1408 /** 1409 * Performs a timed wait on the started latch and then checks for disposed 1410 * before performing another wait each time the the started wait times out. 1411 */ 1412 protected boolean safeWaitUntilStarted() throws InterruptedException { 1413 while (!disposed.get()) { 1414 if (startedLatch.await(1, TimeUnit.SECONDS)) { 1415 break; 1416 } 1417 } 1418 return !disposed.get(); 1419 } 1420 1421 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { 1422 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory; 1423 if (brokerService != null && brokerService.getDestinationPolicy() != null) { 1424 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination()); 1425 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) { 1426 filterFactory = entry.getNetworkBridgeFilterFactory(); 1427 } 1428 } 1429 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL()); 1430 } 1431 1432 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { 1433 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); 1434 } 1435 1436 protected BrokerId[] getRemoteBrokerPath() { 1437 return remoteBrokerPath; 1438 } 1439 1440 @Override 1441 public void setNetworkBridgeListener(NetworkBridgeListener listener) { 1442 this.networkBridgeListener = listener; 1443 } 1444 1445 private void fireBridgeFailed(Throwable reason) { 1446 LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason); 1447 NetworkBridgeListener l = this.networkBridgeListener; 1448 if (l != null && this.bridgeFailed.compareAndSet(false, true)) { 1449 l.bridgeFailed(); 1450 } 1451 } 1452 1453 /** 1454 * @return Returns the dynamicallyIncludedDestinations. 1455 */ 1456 public ActiveMQDestination[] getDynamicallyIncludedDestinations() { 1457 return dynamicallyIncludedDestinations; 1458 } 1459 1460 /** 1461 * @param dynamicallyIncludedDestinations 1462 * The dynamicallyIncludedDestinations to set. 1463 */ 1464 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { 1465 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; 1466 } 1467 1468 /** 1469 * @return Returns the excludedDestinations. 1470 */ 1471 public ActiveMQDestination[] getExcludedDestinations() { 1472 return excludedDestinations; 1473 } 1474 1475 /** 1476 * @param excludedDestinations The excludedDestinations to set. 1477 */ 1478 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { 1479 this.excludedDestinations = excludedDestinations; 1480 } 1481 1482 /** 1483 * @return Returns the staticallyIncludedDestinations. 1484 */ 1485 public ActiveMQDestination[] getStaticallyIncludedDestinations() { 1486 return staticallyIncludedDestinations; 1487 } 1488 1489 /** 1490 * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set. 1491 */ 1492 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { 1493 this.staticallyIncludedDestinations = staticallyIncludedDestinations; 1494 } 1495 1496 /** 1497 * @return Returns the durableDestinations. 1498 */ 1499 public ActiveMQDestination[] getDurableDestinations() { 1500 return durableDestinations; 1501 } 1502 1503 /** 1504 * @param durableDestinations The durableDestinations to set. 1505 */ 1506 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { 1507 this.durableDestinations = durableDestinations; 1508 } 1509 1510 /** 1511 * @return Returns the localBroker. 1512 */ 1513 public Transport getLocalBroker() { 1514 return localBroker; 1515 } 1516 1517 /** 1518 * @return Returns the remoteBroker. 1519 */ 1520 public Transport getRemoteBroker() { 1521 return remoteBroker; 1522 } 1523 1524 /** 1525 * @return the createdByDuplex 1526 */ 1527 public boolean isCreatedByDuplex() { 1528 return this.createdByDuplex; 1529 } 1530 1531 /** 1532 * @param createdByDuplex the createdByDuplex to set 1533 */ 1534 public void setCreatedByDuplex(boolean createdByDuplex) { 1535 this.createdByDuplex = createdByDuplex; 1536 } 1537 1538 @Override 1539 public String getRemoteAddress() { 1540 return remoteBroker.getRemoteAddress(); 1541 } 1542 1543 @Override 1544 public String getLocalAddress() { 1545 return localBroker.getRemoteAddress(); 1546 } 1547 1548 @Override 1549 public String getRemoteBrokerName() { 1550 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 1551 } 1552 1553 @Override 1554 public String getRemoteBrokerId() { 1555 return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString(); 1556 } 1557 1558 @Override 1559 public String getLocalBrokerName() { 1560 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); 1561 } 1562 1563 @Override 1564 public long getDequeueCounter() { 1565 return networkBridgeStatistics.getDequeues().getCount(); 1566 } 1567 1568 @Override 1569 public long getEnqueueCounter() { 1570 return networkBridgeStatistics.getEnqueues().getCount(); 1571 } 1572 1573 @Override 1574 public NetworkBridgeStatistics getNetworkBridgeStatistics() { 1575 return networkBridgeStatistics; 1576 } 1577 1578 protected boolean isDuplex() { 1579 return configuration.isDuplex() || createdByDuplex; 1580 } 1581 1582 public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() { 1583 return subscriptionMapByRemoteId; 1584 } 1585 1586 @Override 1587 public void setBrokerService(BrokerService brokerService) { 1588 this.brokerService = brokerService; 1589 this.localBrokerId = brokerService.getRegionBroker().getBrokerId(); 1590 localBrokerPath[0] = localBrokerId; 1591 } 1592 1593 @Override 1594 public void setMbeanObjectName(ObjectName objectName) { 1595 this.mbeanObjectName = objectName; 1596 } 1597 1598 @Override 1599 public ObjectName getMbeanObjectName() { 1600 return mbeanObjectName; 1601 } 1602 1603 @Override 1604 public void resetStats() { 1605 networkBridgeStatistics.reset(); 1606 } 1607 1608 /* 1609 * Used to allow for async tasks to await receipt of the BrokerInfo from the local and 1610 * remote sides of the network bridge. 1611 */ 1612 private static class FutureBrokerInfo implements Future<BrokerInfo> { 1613 1614 private final CountDownLatch slot = new CountDownLatch(1); 1615 private final AtomicBoolean disposed; 1616 private volatile BrokerInfo info = null; 1617 1618 public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) { 1619 this.info = info; 1620 this.disposed = disposed; 1621 } 1622 1623 @Override 1624 public boolean cancel(boolean mayInterruptIfRunning) { 1625 slot.countDown(); 1626 return true; 1627 } 1628 1629 @Override 1630 public boolean isCancelled() { 1631 return slot.getCount() == 0 && info == null; 1632 } 1633 1634 @Override 1635 public boolean isDone() { 1636 return info != null; 1637 } 1638 1639 @Override 1640 public BrokerInfo get() throws InterruptedException, ExecutionException { 1641 try { 1642 if (info == null) { 1643 while (!disposed.get()) { 1644 if (slot.await(1, TimeUnit.SECONDS)) { 1645 break; 1646 } 1647 } 1648 } 1649 return info; 1650 } catch (InterruptedException e) { 1651 Thread.currentThread().interrupt(); 1652 LOG.debug("Operation interrupted: {}", e, e); 1653 throw new InterruptedException("Interrupted."); 1654 } 1655 } 1656 1657 @Override 1658 public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 1659 try { 1660 if (info == null) { 1661 long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 1662 1663 while (!disposed.get() || System.currentTimeMillis() < deadline) { 1664 if (slot.await(1, TimeUnit.MILLISECONDS)) { 1665 break; 1666 } 1667 } 1668 if (info == null) { 1669 throw new TimeoutException(); 1670 } 1671 } 1672 return info; 1673 } catch (InterruptedException e) { 1674 throw new InterruptedException("Interrupted."); 1675 } 1676 } 1677 1678 public void set(BrokerInfo info) { 1679 this.info = info; 1680 this.slot.countDown(); 1681 } 1682 } 1683 1684 protected void serviceOutbound(Message message) { 1685 NetworkBridgeListener l = this.networkBridgeListener; 1686 if (l != null) { 1687 l.onOutboundMessage(this, message); 1688 } 1689 } 1690 1691 protected void serviceInboundMessage(Message message) { 1692 NetworkBridgeListener l = this.networkBridgeListener; 1693 if (l != null) { 1694 l.onInboundMessage(this, message); 1695 } 1696 } 1697 1698 protected boolean canDuplexDispatch(Message message) { 1699 boolean result = true; 1700 if (configuration.isCheckDuplicateMessagesOnDuplex()){ 1701 final long producerSequenceId = message.getMessageId().getProducerSequenceId(); 1702 // messages are multiplexed on this producer so we need to query the persistenceAdapter 1703 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId()); 1704 if (producerSequenceId <= lastStoredForMessageProducer) { 1705 result = false; 1706 LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ 1707 (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer 1708 }); 1709 } 1710 } 1711 return result; 1712 } 1713 1714 protected long getStoredSequenceIdForMessage(MessageId messageId) { 1715 try { 1716 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); 1717 } catch (IOException ignored) { 1718 LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); 1719 } 1720 return -1; 1721 } 1722 1723}