001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.net.URI; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Locale; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.CopyOnWriteArrayList; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.locks.ReentrantReadWriteLock; 032 033import javax.jms.InvalidClientIDException; 034import javax.jms.JMSException; 035 036import org.apache.activemq.broker.Broker; 037import org.apache.activemq.broker.BrokerService; 038import org.apache.activemq.broker.Connection; 039import org.apache.activemq.broker.ConnectionContext; 040import org.apache.activemq.broker.ConsumerBrokerExchange; 041import org.apache.activemq.broker.EmptyBroker; 042import org.apache.activemq.broker.ProducerBrokerExchange; 043import org.apache.activemq.broker.TransportConnection; 044import org.apache.activemq.broker.TransportConnector; 045import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 046import org.apache.activemq.broker.region.policy.PolicyMap; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.ActiveMQMessage; 049import org.apache.activemq.command.BrokerId; 050import org.apache.activemq.command.BrokerInfo; 051import org.apache.activemq.command.ConnectionId; 052import org.apache.activemq.command.ConnectionInfo; 053import org.apache.activemq.command.ConsumerControl; 054import org.apache.activemq.command.ConsumerInfo; 055import org.apache.activemq.command.DestinationInfo; 056import org.apache.activemq.command.Message; 057import org.apache.activemq.command.MessageAck; 058import org.apache.activemq.command.MessageDispatch; 059import org.apache.activemq.command.MessageDispatchNotification; 060import org.apache.activemq.command.MessagePull; 061import org.apache.activemq.command.ProducerInfo; 062import org.apache.activemq.command.RemoveSubscriptionInfo; 063import org.apache.activemq.command.Response; 064import org.apache.activemq.command.TransactionId; 065import org.apache.activemq.state.ConnectionState; 066import org.apache.activemq.store.PListStore; 067import org.apache.activemq.thread.Scheduler; 068import org.apache.activemq.thread.TaskRunnerFactory; 069import org.apache.activemq.transport.TransmitCallback; 070import org.apache.activemq.usage.SystemUsage; 071import org.apache.activemq.util.BrokerSupport; 072import org.apache.activemq.util.IdGenerator; 073import org.apache.activemq.util.InetAddressUtil; 074import org.apache.activemq.util.LongSequenceGenerator; 075import org.apache.activemq.util.ServiceStopper; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079/** 080 * Routes Broker operations to the correct messaging regions for processing. 081 */ 082public class RegionBroker extends EmptyBroker { 083 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 084 private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class); 085 private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); 086 087 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 088 protected DestinationFactory destinationFactory; 089 protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); 090 091 private final Region queueRegion; 092 private final Region topicRegion; 093 private final Region tempQueueRegion; 094 private final Region tempTopicRegion; 095 protected final BrokerService brokerService; 096 private boolean started; 097 private boolean keepDurableSubsActive; 098 099 private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>(); 100 private final Map<ActiveMQDestination, ActiveMQDestination> destinationGate = new HashMap<ActiveMQDestination, ActiveMQDestination>(); 101 private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 102 private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>(); 103 104 private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 105 private BrokerId brokerId; 106 private String brokerName; 107 private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); 108 private final DestinationInterceptor destinationInterceptor; 109 private ConnectionContext adminConnectionContext; 110 private final Scheduler scheduler; 111 private final ThreadPoolExecutor executor; 112 private boolean allowTempAutoCreationOnSend; 113 114 private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock(); 115 private final Runnable purgeInactiveDestinationsTask = new Runnable() { 116 @Override 117 public void run() { 118 purgeInactiveDestinations(); 119 } 120 }; 121 122 public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, 123 DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException { 124 this.brokerService = brokerService; 125 this.executor = executor; 126 this.scheduler = scheduler; 127 if (destinationFactory == null) { 128 throw new IllegalArgumentException("null destinationFactory"); 129 } 130 this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId()); 131 this.destinationFactory = destinationFactory; 132 queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 133 topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 134 this.destinationInterceptor = destinationInterceptor; 135 tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 136 tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 137 } 138 139 @Override 140 public Map<ActiveMQDestination, Destination> getDestinationMap() { 141 Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap()); 142 answer.putAll(getTopicRegion().getDestinationMap()); 143 return answer; 144 } 145 146 @Override 147 public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination) { 148 try { 149 return getRegion(destination).getDestinationMap(); 150 } catch (JMSException jmse) { 151 return Collections.emptyMap(); 152 } 153 } 154 155 @Override 156 public Set<Destination> getDestinations(ActiveMQDestination destination) { 157 try { 158 return getRegion(destination).getDestinations(destination); 159 } catch (JMSException jmse) { 160 return Collections.emptySet(); 161 } 162 } 163 164 @Override 165 @SuppressWarnings("rawtypes") 166 public Broker getAdaptor(Class type) { 167 if (type.isInstance(this)) { 168 return this; 169 } 170 return null; 171 } 172 173 public Region getQueueRegion() { 174 return queueRegion; 175 } 176 177 public Region getTempQueueRegion() { 178 return tempQueueRegion; 179 } 180 181 public Region getTempTopicRegion() { 182 return tempTopicRegion; 183 } 184 185 public Region getTopicRegion() { 186 return topicRegion; 187 } 188 189 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 190 return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 191 } 192 193 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 194 return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 195 } 196 197 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 198 return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 199 } 200 201 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 202 return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 203 } 204 205 @Override 206 public void start() throws Exception { 207 started = true; 208 queueRegion.start(); 209 topicRegion.start(); 210 tempQueueRegion.start(); 211 tempTopicRegion.start(); 212 int period = this.brokerService.getSchedulePeriodForDestinationPurge(); 213 if (period > 0) { 214 this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period); 215 } 216 } 217 218 @Override 219 public void stop() throws Exception { 220 started = false; 221 this.scheduler.cancel(purgeInactiveDestinationsTask); 222 ServiceStopper ss = new ServiceStopper(); 223 doStop(ss); 224 ss.throwFirstException(); 225 // clear the state 226 clientIdSet.clear(); 227 connections.clear(); 228 destinations.clear(); 229 brokerInfos.clear(); 230 } 231 232 public PolicyMap getDestinationPolicy() { 233 return brokerService != null ? brokerService.getDestinationPolicy() : null; 234 } 235 236 public ConnectionContext getConnectionContext(String clientId) { 237 return clientIdSet.get(clientId); 238 } 239 240 @Override 241 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 242 String clientId = info.getClientId(); 243 if (clientId == null) { 244 throw new InvalidClientIDException("No clientID specified for connection request"); 245 } 246 247 ConnectionContext oldContext = null; 248 249 synchronized (clientIdSet) { 250 oldContext = clientIdSet.get(clientId); 251 if (oldContext != null) { 252 if (context.isAllowLinkStealing()) { 253 clientIdSet.put(clientId, context); 254 } else { 255 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " 256 + oldContext.getConnection().getRemoteAddress()); 257 } 258 } else { 259 clientIdSet.put(clientId, context); 260 } 261 } 262 263 if (oldContext != null) { 264 if (oldContext.getConnection() != null) { 265 Connection connection = oldContext.getConnection(); 266 LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection()); 267 if (connection instanceof TransportConnection) { 268 TransportConnection transportConnection = (TransportConnection) connection; 269 transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId())); 270 } else { 271 connection.stop(); 272 } 273 } else { 274 LOG.error("No Connection found for {}", oldContext); 275 } 276 } 277 278 connections.add(context.getConnection()); 279 } 280 281 @Override 282 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 283 String clientId = info.getClientId(); 284 if (clientId == null) { 285 throw new InvalidClientIDException("No clientID specified for connection disconnect request"); 286 } 287 synchronized (clientIdSet) { 288 ConnectionContext oldValue = clientIdSet.get(clientId); 289 // we may be removing the duplicate connection, not the first connection to be created 290 // so lets check that their connection IDs are the same 291 if (oldValue == context) { 292 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) { 293 clientIdSet.remove(clientId); 294 } 295 } 296 } 297 connections.remove(context.getConnection()); 298 } 299 300 protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) { 301 return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2)); 302 } 303 304 @Override 305 public Connection[] getClients() throws Exception { 306 ArrayList<Connection> l = new ArrayList<Connection>(connections); 307 Connection rc[] = new Connection[l.size()]; 308 l.toArray(rc); 309 return rc; 310 } 311 312 @Override 313 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception { 314 315 Destination answer; 316 317 answer = destinations.get(destination); 318 if (answer != null) { 319 return answer; 320 } 321 322 synchronized (destinationGate) { 323 answer = destinations.get(destination); 324 if (answer != null) { 325 return answer; 326 } 327 328 if (destinationGate.get(destination) != null) { 329 // Guard against spurious wakeup. 330 while (destinationGate.containsKey(destination)) { 331 destinationGate.wait(); 332 } 333 answer = destinations.get(destination); 334 if (answer != null) { 335 return answer; 336 } else { 337 // In case of intermediate remove or add failure 338 destinationGate.put(destination, destination); 339 } 340 } 341 } 342 343 try { 344 boolean create = true; 345 if (destination.isTemporary()) { 346 create = createIfTemp; 347 } 348 answer = getRegion(destination).addDestination(context, destination, create); 349 destinations.put(destination, answer); 350 } finally { 351 synchronized (destinationGate) { 352 destinationGate.remove(destination); 353 destinationGate.notifyAll(); 354 } 355 } 356 357 return answer; 358 } 359 360 @Override 361 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 362 if (destinations.containsKey(destination)) { 363 getRegion(destination).removeDestination(context, destination, timeout); 364 destinations.remove(destination); 365 } 366 } 367 368 @Override 369 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 370 addDestination(context, info.getDestination(), true); 371 372 } 373 374 @Override 375 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 376 removeDestination(context, info.getDestination(), info.getTimeout()); 377 } 378 379 @Override 380 public ActiveMQDestination[] getDestinations() throws Exception { 381 ArrayList<ActiveMQDestination> l; 382 383 l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet()); 384 385 ActiveMQDestination rc[] = new ActiveMQDestination[l.size()]; 386 l.toArray(rc); 387 return rc; 388 } 389 390 @Override 391 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 392 ActiveMQDestination destination = info.getDestination(); 393 if (destination != null) { 394 inactiveDestinationsPurgeLock.readLock().lock(); 395 try { 396 // This seems to cause the destination to be added but without 397 // advisories firing... 398 context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend()); 399 getRegion(destination).addProducer(context, info); 400 } finally { 401 inactiveDestinationsPurgeLock.readLock().unlock(); 402 } 403 } 404 } 405 406 @Override 407 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 408 ActiveMQDestination destination = info.getDestination(); 409 if (destination != null) { 410 inactiveDestinationsPurgeLock.readLock().lock(); 411 try { 412 getRegion(destination).removeProducer(context, info); 413 } finally { 414 inactiveDestinationsPurgeLock.readLock().unlock(); 415 } 416 } 417 } 418 419 @Override 420 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 421 ActiveMQDestination destination = info.getDestination(); 422 if (destinationInterceptor != null) { 423 destinationInterceptor.create(this, context, destination); 424 } 425 inactiveDestinationsPurgeLock.readLock().lock(); 426 try { 427 return getRegion(destination).addConsumer(context, info); 428 } finally { 429 inactiveDestinationsPurgeLock.readLock().unlock(); 430 } 431 } 432 433 @Override 434 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 435 ActiveMQDestination destination = info.getDestination(); 436 inactiveDestinationsPurgeLock.readLock().lock(); 437 try { 438 getRegion(destination).removeConsumer(context, info); 439 } finally { 440 inactiveDestinationsPurgeLock.readLock().unlock(); 441 } 442 } 443 444 @Override 445 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 446 inactiveDestinationsPurgeLock.readLock().lock(); 447 try { 448 topicRegion.removeSubscription(context, info); 449 } finally { 450 inactiveDestinationsPurgeLock.readLock().unlock(); 451 452 } 453 } 454 455 @Override 456 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 457 ActiveMQDestination destination = message.getDestination(); 458 message.setBrokerInTime(System.currentTimeMillis()); 459 if (producerExchange.isMutable() || producerExchange.getRegion() == null 460 || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) { 461 // ensure the destination is registered with the RegionBroker 462 producerExchange.getConnectionContext().getBroker() 463 .addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend()); 464 producerExchange.setRegion(getRegion(destination)); 465 producerExchange.setRegionDestination(null); 466 } 467 468 producerExchange.getRegion().send(producerExchange, message); 469 470 // clean up so these references aren't kept (possible leak) in the producer exchange 471 // especially since temps are transitory 472 if (producerExchange.isMutable()) { 473 producerExchange.setRegionDestination(null); 474 producerExchange.setRegion(null); 475 } 476 } 477 478 @Override 479 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 480 if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) { 481 ActiveMQDestination destination = ack.getDestination(); 482 consumerExchange.setRegion(getRegion(destination)); 483 } 484 consumerExchange.getRegion().acknowledge(consumerExchange, ack); 485 } 486 487 protected Region getRegion(ActiveMQDestination destination) throws JMSException { 488 switch (destination.getDestinationType()) { 489 case ActiveMQDestination.QUEUE_TYPE: 490 return queueRegion; 491 case ActiveMQDestination.TOPIC_TYPE: 492 return topicRegion; 493 case ActiveMQDestination.TEMP_QUEUE_TYPE: 494 return tempQueueRegion; 495 case ActiveMQDestination.TEMP_TOPIC_TYPE: 496 return tempTopicRegion; 497 default: 498 throw createUnknownDestinationTypeException(destination); 499 } 500 } 501 502 @Override 503 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 504 ActiveMQDestination destination = pull.getDestination(); 505 return getRegion(destination).messagePull(context, pull); 506 } 507 508 @Override 509 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 510 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 511 } 512 513 @Override 514 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 515 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 516 } 517 518 @Override 519 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 520 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 521 } 522 523 @Override 524 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 525 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 526 } 527 528 @Override 529 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 530 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 531 } 532 533 @Override 534 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 535 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 536 } 537 538 @Override 539 public void gc() { 540 queueRegion.gc(); 541 topicRegion.gc(); 542 } 543 544 @Override 545 public BrokerId getBrokerId() { 546 if (brokerId == null) { 547 brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId()); 548 } 549 return brokerId; 550 } 551 552 public void setBrokerId(BrokerId brokerId) { 553 this.brokerId = brokerId; 554 } 555 556 @Override 557 public String getBrokerName() { 558 if (brokerName == null) { 559 try { 560 brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH); 561 } catch (Exception e) { 562 brokerName = "localhost"; 563 } 564 } 565 return brokerName; 566 } 567 568 public void setBrokerName(String brokerName) { 569 this.brokerName = brokerName; 570 } 571 572 public DestinationStatistics getDestinationStatistics() { 573 return destinationStatistics; 574 } 575 576 protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) { 577 return new JMSException("Unknown destination type: " + destination.getDestinationType()); 578 } 579 580 @Override 581 public synchronized void addBroker(Connection connection, BrokerInfo info) { 582 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 583 if (existing == null) { 584 existing = info.copy(); 585 existing.setPeerBrokerInfos(null); 586 brokerInfos.put(info.getBrokerId(), existing); 587 } 588 existing.incrementRefCount(); 589 LOG.debug("{} addBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size() }); 590 addBrokerInClusterUpdate(info); 591 } 592 593 @Override 594 public synchronized void removeBroker(Connection connection, BrokerInfo info) { 595 if (info != null) { 596 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 597 if (existing != null && existing.decrementRefCount() == 0) { 598 brokerInfos.remove(info.getBrokerId()); 599 } 600 LOG.debug("{} removeBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size()}); 601 // When stopping don't send cluster updates since we are the one's tearing down 602 // our own bridges. 603 if (!brokerService.isStopping()) { 604 removeBrokerInClusterUpdate(info); 605 } 606 } 607 } 608 609 @Override 610 public synchronized BrokerInfo[] getPeerBrokerInfos() { 611 BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; 612 result = brokerInfos.values().toArray(result); 613 return result; 614 } 615 616 @Override 617 public void preProcessDispatch(final MessageDispatch messageDispatch) { 618 final Message message = messageDispatch.getMessage(); 619 if (message != null) { 620 long endTime = System.currentTimeMillis(); 621 message.setBrokerOutTime(endTime); 622 if (getBrokerService().isEnableStatistics()) { 623 long totalTime = endTime - message.getBrokerInTime(); 624 ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime); 625 } 626 if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered()) { 627 final int originalValue = message.getRedeliveryCounter(); 628 message.incrementRedeliveryCounter(); 629 try { 630 if (message.isPersistent()) { 631 ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message); 632 } 633 messageDispatch.setTransmitCallback(new TransmitCallback() { 634 // dispatch is considered a delivery, so update sub state post dispatch otherwise 635 // on a disconnect/reconnect cached messages will not reflect initial delivery attempt 636 final TransmitCallback delegate = messageDispatch.getTransmitCallback(); 637 @Override 638 public void onSuccess() { 639 message.incrementRedeliveryCounter(); 640 if (delegate != null) { 641 delegate.onSuccess(); 642 } 643 } 644 645 @Override 646 public void onFailure() { 647 if (delegate != null) { 648 delegate.onFailure(); 649 } 650 } 651 }); 652 } catch (IOException error) { 653 RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error); 654 LOG.warn(runtimeException.getLocalizedMessage(), runtimeException); 655 throw runtimeException; 656 } finally { 657 message.setRedeliveryCounter(originalValue); 658 } 659 } 660 } 661 } 662 663 @Override 664 public void postProcessDispatch(MessageDispatch messageDispatch) { 665 } 666 667 @Override 668 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 669 ActiveMQDestination destination = messageDispatchNotification.getDestination(); 670 getRegion(destination).processDispatchNotification(messageDispatchNotification); 671 } 672 673 @Override 674 public boolean isStopped() { 675 return !started; 676 } 677 678 @Override 679 public Set<ActiveMQDestination> getDurableDestinations() { 680 return destinationFactory.getDestinations(); 681 } 682 683 protected void doStop(ServiceStopper ss) { 684 ss.stop(queueRegion); 685 ss.stop(topicRegion); 686 ss.stop(tempQueueRegion); 687 ss.stop(tempTopicRegion); 688 } 689 690 public boolean isKeepDurableSubsActive() { 691 return keepDurableSubsActive; 692 } 693 694 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 695 this.keepDurableSubsActive = keepDurableSubsActive; 696 ((TopicRegion) topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); 697 } 698 699 public DestinationInterceptor getDestinationInterceptor() { 700 return destinationInterceptor; 701 } 702 703 @Override 704 public ConnectionContext getAdminConnectionContext() { 705 return adminConnectionContext; 706 } 707 708 @Override 709 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { 710 this.adminConnectionContext = adminConnectionContext; 711 } 712 713 public Map<ConnectionId, ConnectionState> getConnectionStates() { 714 return connectionStates; 715 } 716 717 @Override 718 public PListStore getTempDataStore() { 719 return brokerService.getTempDataStore(); 720 } 721 722 @Override 723 public URI getVmConnectorURI() { 724 return brokerService.getVmConnectorURI(); 725 } 726 727 @Override 728 public void brokerServiceStarted() { 729 } 730 731 @Override 732 public BrokerService getBrokerService() { 733 return brokerService; 734 } 735 736 @Override 737 public boolean isExpired(MessageReference messageReference) { 738 boolean expired = false; 739 if (messageReference.isExpired()) { 740 try { 741 // prevent duplicate expiry processing 742 Message message = messageReference.getMessage(); 743 synchronized (message) { 744 expired = stampAsExpired(message); 745 } 746 } catch (IOException e) { 747 LOG.warn("unexpected exception on message expiry determination for: {}", messageReference, e); 748 } 749 } 750 return expired; 751 } 752 753 private boolean stampAsExpired(Message message) throws IOException { 754 boolean stamped = false; 755 if (message.getProperty(ORIGINAL_EXPIRATION) == null) { 756 long expiration = message.getExpiration(); 757 message.setProperty(ORIGINAL_EXPIRATION, new Long(expiration)); 758 stamped = true; 759 } 760 return stamped; 761 } 762 763 @Override 764 public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) { 765 LOG.debug("Message expired {}", node); 766 getRoot().sendToDeadLetterQueue(context, node, subscription, new Throwable("Message Expired. Expiration:" + node.getExpiration())); 767 } 768 769 @Override 770 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription, Throwable poisonCause) { 771 try { 772 if (node != null) { 773 Message message = node.getMessage(); 774 if (message != null && node.getRegionDestination() != null) { 775 DeadLetterStrategy deadLetterStrategy = ((Destination) node.getRegionDestination()).getDeadLetterStrategy(); 776 if (deadLetterStrategy != null) { 777 if (deadLetterStrategy.isSendToDeadLetterQueue(message)) { 778 ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription); 779 // Prevent a DLQ loop where same message is sent from a DLQ back to itself 780 if (deadLetterDestination.equals(message.getDestination())) { 781 LOG.debug("Not re-adding to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); 782 return false; 783 } 784 785 // message may be inflight to other subscriptions so do not modify 786 message = message.copy(); 787 long dlqExpiration = deadLetterStrategy.getExpiration(); 788 if (dlqExpiration > 0) { 789 dlqExpiration += System.currentTimeMillis(); 790 } else { 791 stampAsExpired(message); 792 } 793 message.setExpiration(dlqExpiration); 794 if (!message.isPersistent()) { 795 message.setPersistent(true); 796 message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); 797 } 798 if (poisonCause != null) { 799 message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, 800 poisonCause.toString()); 801 } 802 // The original destination and transaction id do 803 // not get filled when the message is first sent, 804 // it is only populated if the message is routed to 805 // another destination like the DLQ 806 ConnectionContext adminContext = context; 807 if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) { 808 adminContext = BrokerSupport.getConnectionContext(this); 809 } 810 addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ(); 811 BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination); 812 return true; 813 } 814 } else { 815 LOG.debug("Dead Letter message with no DLQ strategy in place, message id: {}, destination: {}", message.getMessageId(), message.getDestination()); 816 } 817 } 818 } 819 } catch (Exception e) { 820 LOG.warn("Caught an exception sending to DLQ: {}", node, e); 821 } 822 823 return false; 824 } 825 826 @Override 827 public Broker getRoot() { 828 try { 829 return getBrokerService().getBroker(); 830 } catch (Exception e) { 831 LOG.error("Trying to get Root Broker", e); 832 throw new RuntimeException("The broker from the BrokerService should not throw an exception"); 833 } 834 } 835 836 /** 837 * @return the broker sequence id 838 */ 839 @Override 840 public long getBrokerSequenceId() { 841 synchronized (sequenceGenerator) { 842 return sequenceGenerator.getNextSequenceId(); 843 } 844 } 845 846 @Override 847 public Scheduler getScheduler() { 848 return this.scheduler; 849 } 850 851 @Override 852 public ThreadPoolExecutor getExecutor() { 853 return this.executor; 854 } 855 856 @Override 857 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { 858 ActiveMQDestination destination = control.getDestination(); 859 try { 860 getRegion(destination).processConsumerControl(consumerExchange, control); 861 } catch (JMSException jmse) { 862 LOG.warn("unmatched destination: {}, in consumerControl: {}", destination, control); 863 } 864 } 865 866 protected void addBrokerInClusterUpdate(BrokerInfo info) { 867 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 868 for (TransportConnector connector : connectors) { 869 if (connector.isUpdateClusterClients()) { 870 connector.addPeerBroker(info); 871 connector.updateClientClusterInfo(); 872 } 873 } 874 } 875 876 protected void removeBrokerInClusterUpdate(BrokerInfo info) { 877 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 878 for (TransportConnector connector : connectors) { 879 if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) { 880 connector.removePeerBroker(info); 881 connector.updateClientClusterInfo(); 882 } 883 } 884 } 885 886 protected void purgeInactiveDestinations() { 887 inactiveDestinationsPurgeLock.writeLock().lock(); 888 try { 889 List<Destination> list = new ArrayList<Destination>(); 890 Map<ActiveMQDestination, Destination> map = getDestinationMap(); 891 if (isAllowTempAutoCreationOnSend()) { 892 map.putAll(tempQueueRegion.getDestinationMap()); 893 map.putAll(tempTopicRegion.getDestinationMap()); 894 } 895 long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep(); 896 long timeStamp = System.currentTimeMillis(); 897 for (Destination d : map.values()) { 898 d.markForGC(timeStamp); 899 if (d.canGC()) { 900 list.add(d); 901 if (maxPurgedDests > 0 && list.size() == maxPurgedDests) { 902 break; 903 } 904 } 905 } 906 907 if (!list.isEmpty()) { 908 ConnectionContext context = BrokerSupport.getConnectionContext(this); 909 context.setBroker(this); 910 911 for (Destination dest : list) { 912 Logger log = LOG; 913 if (dest instanceof BaseDestination) { 914 log = ((BaseDestination) dest).getLog(); 915 } 916 log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimeoutBeforeGC()); 917 try { 918 getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0); 919 } catch (Exception e) { 920 LOG.error("Failed to remove inactive destination {}", dest, e); 921 } 922 } 923 } 924 } finally { 925 inactiveDestinationsPurgeLock.writeLock().unlock(); 926 } 927 } 928 929 public boolean isAllowTempAutoCreationOnSend() { 930 return allowTempAutoCreationOnSend; 931 } 932 933 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 934 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 935 } 936 937 @Override 938 public void reapplyInterceptor() { 939 queueRegion.reapplyInterceptor(); 940 topicRegion.reapplyInterceptor(); 941 tempQueueRegion.reapplyInterceptor(); 942 tempTopicRegion.reapplyInterceptor(); 943 } 944}