/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Topic is a destination that sends a copy of a message to every active
 * Subscription registered.
 */
public class Topic extends BaseDestination implements Task {
    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);

    /** Store passed to the constructor; every use below is guarded by a null check. */
    private final TopicMessageStore topicStore;

    /** Subscriptions handed to the dispatch policy by {@link #dispatch}. */
    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();

    /**
     * {@link #dispatch} holds the read lock; subscription recovery
     * ({@link #addSubscription}, {@link #activate}) holds the write lock so
     * that no new message is dispatched while a subscription is recovering.
     */
    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;

    /** Durable subscriptions registered through {@link #addSubscription}, keyed by subscription key. */
    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
    private final TaskRunner taskRunner;

    /** Deferred sends queued by {@link #send} while memory usage is full; drained by {@link #iterate()}. */
    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();

    /** Wakes the task runner, which in turn calls {@link #iterate()}. */
    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
        @Override
        public void run() {
            try {
                Topic.this.taskRunner.wakeup();
            } catch (InterruptedException e) {
                // Do not lose the interruption: restore the flag so that code
                // further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
        }
    };

    /**
     * Creates the topic with a {@link RetainedMessageSubscriptionRecoveryPolicy}
     * as its initial recovery policy and a task runner named after the
     * destination's physical name.
     *
     * @param brokerService the owning broker service
     * @param destination the destination this topic represents
     * @param store the message store; null checks throughout this class
     *        indicate it may be absent
     * @param parentStats statistics passed through to the super class
     * @param taskFactory factory used to create this topic's task runner
     * @throws Exception if the super constructor or task runner creation fails
     */
    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
        super(brokerService, store, destination, parentStats);
        this.topicStore = store;
        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
        this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
    }

    /**
     * Initializes the destination, forces a last-image, always-retroactive
     * recovery policy on master broker advisory topics, and starts the store
     * when one is present.
     */
    @Override
    public void initialize() throws Exception {
        super.initialize();
        // set non default subscription recovery policy (override policyEntries)
        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
            setAlwaysRetroactive(true);
        }
        if (store != null) {
            // AMQ-2586: Better to leave this stat at zero than to give the user
            // misleading metrics.
            // int messageCount = store.getMessageCount();
            // destinationStatistics.getMessages().setCount(messageCount);
            store.start();
        }
    }

    /**
     * @return a copy of the current consumer list; changes to the returned
     *         list do not affect this topic
     */
    @Override
    public List<Subscription> getConsumers() {
        synchronized (consumers) {
            return new ArrayList<Subscription>(consumers);
        }
    }

    /**
     * Always grants the lock.
     *
     * @param node ignored
     * @param sub ignored
     * @return always {@code true}
     */
    public boolean lock(MessageReference node, LockOwner sub) {
        return true;
    }

    /**
     * Registers a subscription with this topic.
     * <p>
     * A non-durable subscription is added to the consumer list once; when it
     * is retroactive (or the topic is always retroactive) the recovery policy
     * is applied under the dispatch write lock. A durable subscription is
     * always recorded in the durable subscriber map and is added to the
     * consumer list only when it is active and no consumer with the same
     * subscription key is already present.
     *
     * @param context the connection context of the subscriber
     * @param sub the subscription to add
     * @throws Exception if any of the delegated calls fails
     */
    @Override
    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
        if (!sub.getConsumerInfo().isDurable()) {

            // Do a retroactive recovery if needed.
            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {

                // synchronize with dispatch method so that no new messages are sent
                // while we are recovering a subscription to avoid out of order messages.
                dispatchLock.writeLock().lock();
                try {
                    boolean applyRecovery = false;
                    synchronized (consumers) {
                        if (!consumers.contains(sub)) {
                            sub.add(context, this);
                            consumers.add(sub);
                            applyRecovery = true;
                            super.addSubscription(context, sub);
                        }
                    }
                    if (applyRecovery) {
                        subscriptionRecoveryPolicy.recover(context, this, sub);
                    }
                } finally {
                    dispatchLock.writeLock().unlock();
                }

            } else {
                synchronized (consumers) {
                    if (!consumers.contains(sub)) {
                        sub.add(context, this);
                        consumers.add(sub);
                        super.addSubscription(context, sub);
                    }
                }
            }
        } else {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            super.addSubscription(context, sub);
            sub.add(context, this);
            if (dsub.isActive()) {
                synchronized (consumers) {
                    // Look for an existing durable consumer with the same key;
                    // an empty list simply yields no match.
                    boolean hasSubscription = false;
                    for (Subscription currentSub : consumers) {
                        if (currentSub.getConsumerInfo().isDurable()) {
                            DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
                            if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
                                hasSubscription = true;
                                break;
                            }
                        }
                    }

                    if (!hasSubscription) {
                        consumers.add(sub);
                    }
                }
            }
            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
        }
    }

    /**
     * Removes a subscription. Only non-durable subscriptions are taken out of
     * the consumer list here; {@code sub.remove} is invoked in every case.
     *
     * @param context the connection context of the subscriber
     * @param sub the subscription to remove
     * @param lastDeliveredSequenceId passed through to the super class
     * @throws Exception if any of the delegated calls fails
     */
    @Override
    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
        if (!sub.getConsumerInfo().isDurable()) {
            super.removeSubscription(context, sub, lastDeliveredSequenceId);
            synchronized (consumers) {
                consumers.remove(sub);
            }
        }
        sub.remove(context, this);
    }

    /**
     * Deletes a durable subscription from the store and, when it was known to
     * this topic, deactivates it and drops it from the consumer list. Does
     * nothing when there is no store.
     *
     * @param context the connection context (not used by this method)
     * @param key identifies the durable subscription to delete
     * @throws Exception if the store or the deactivation fails
     */
    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
        if (topicStore != null) {
            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
            DurableTopicSubscription removed = durableSubscribers.remove(key);
            if (removed != null) {
                destinationStatistics.getConsumers().decrement();
                // deactivate and remove
                removed.deactivate(false, 0L);
                consumers.remove(removed);
            }
        }
    }

    /**
     * @return {@code true} when the selector or the noLocal flag of the stored
     *         subscription differs from the consumer's
     */
    private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) {
        if (hasSelectorChanged(info1, info2)) {
            return true;
        }

        return hasNoLocalChanged(info1, info2);
    }

    /**
     * @return {@code true} when the store's OpenWire version is at least 11
     *         and the two noLocal flags differ
     */
    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) {
        // Prior to V11 the broker did not store the noLocal value for durable subs.
        if (brokerService.getStoreOpenWireVersion() >= 11) {
            if (info1.isNoLocal() ^ info2.isNoLocal()) {
                return true;
            }
        }

        return false;
    }

    /**
     * @return {@code true} when exactly one selector is null, or both are
     *         non-null and not equal
     */
    private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
        if (info1.getSelector() != null ^ info2.getSelector() != null) {
            return true;
        }

        if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
            return true;
        }

        return false;
    }

    /**
     * Activates a durable subscription against the store while holding the
     * dispatch write lock.
     * <p>
     * If the stored subscription differs from the consumer's selector/noLocal
     * settings it is deleted and re-created; otherwise the subscription is
     * added to the consumer list if missing. When the subscription reports
     * that recovery is required, stored messages matching the subscription
     * are added to it. Returns immediately when there is no store.
     *
     * @param context the connection context (not used by this method)
     * @param subscription the durable subscription to activate
     * @throws Exception if the store or the subscription fails
     */
    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
        // synchronize with dispatch method so that no new messages are sent
        // while we are recovering a subscription to avoid out of order messages.
        dispatchLock.writeLock().lock();
        try {

            if (topicStore == null) {
                return;
            }

            // Recover the durable subscription.
            String clientId = subscription.getSubscriptionKey().getClientId();
            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
            if (info != null) {
                // Check to see if selector changed.
                if (hasDurableSubChanged(info, subscription.getConsumerInfo())) {
                    // Need to delete the subscription
                    topicStore.deleteSubscription(clientId, subscriptionName);
                    info = null;
                    // Force a rebuild of the selector chain for the subscription otherwise
                    // the stored subscription is updated but the selector expression is not
                    // and the subscription will not behave according to the new configuration.
                    subscription.setSelector(subscription.getConsumerInfo().getSelector());
                    synchronized (consumers) {
                        consumers.remove(subscription);
                    }
                } else {
                    synchronized (consumers) {
                        if (!consumers.contains(subscription)) {
                            consumers.add(subscription);
                        }
                    }
                }
            }

            // Do we need to create the subscription?
            if (info == null) {
                info = new SubscriptionInfo();
                info.setClientId(clientId);
                info.setSelector(subscription.getConsumerInfo().getSelector());
                info.setSubscriptionName(subscriptionName);
                // This destination is an actual destination id.
                info.setDestination(getActiveMQDestination());
                info.setNoLocal(subscription.getConsumerInfo().isNoLocal());
                // This destination might be a pattern
                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
                synchronized (consumers) {
                    consumers.add(subscription);
                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
                }
            }

            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
            msgContext.setDestination(destination);
            if (subscription.isRecoveryRequired()) {
                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
                    @Override
                    public boolean recoverMessage(Message message) throws Exception {
                        message.setRegionDestination(Topic.this);
                        try {
                            msgContext.setMessageReference(message);
                            if (subscription.matches(message, msgContext)) {
                                subscription.add(message);
                            }
                        } catch (IOException e) {
                            LOG.error("Failed to recover this message {}", message, e);
                        }
                        return true;
                    }

                    @Override
                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        throw new RuntimeException("Should not be called.");
                    }

                    @Override
                    public boolean hasSpace() {
                        return true;
                    }

                    @Override
                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }
                });
            }
        } finally {
            dispatchLock.writeLock().unlock();
        }
    }

    /**
     * Takes a durable subscription out of the consumer list and removes it
     * from this destination.
     *
     * @param context the connection context
     * @param sub the durable subscription to deactivate
     * @param dispatched passed through to {@code sub.remove}
     * @throws Exception if {@code sub.remove} fails
     */
    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
        synchronized (consumers) {
            consumers.remove(sub);
        }
        sub.remove(context, this, dispatched);
    }

    /**
     * Applies the subscription recovery policy, but only when the consumer is
     * flagged retroactive.
     *
     * @param context the connection context
     * @param subscription the subscription to recover messages for
     * @throws Exception if the recovery policy fails
     */
    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
        if (subscription.getConsumerInfo().isRetroactive()) {
            subscriptionRecoveryPolicy.recover(context, this, subscription);
        }
    }

    /**
     * Accepts a message from a producer.
     * <p>
     * Already expired messages are reported to the broker and dropped. When
     * memory usage is full and producer flow control applies, the send is
     * either rejected (send-fail-if-no-space on a non network connection),
     * queued for later execution (producer window or sync send) or blocks the
     * calling thread until space is available. Otherwise the message is sent
     * immediately and a {@link ProducerAck} is dispatched when applicable.
     *
     * @param producerExchange the exchange of the sending producer
     * @param message the message to send
     * @throws Exception if the send is rejected or fails
     */
    @Override
    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();

        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
        producerExchange.incrementSend();
        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                && !context.isInRecoveryMode();

        message.setRegionDestination(this);

        // There is delay between the client sending it and it arriving at the
        // destination.. it may have expired.
        if (message.isExpired()) {
            broker.messageExpired(context, message, null);
            getDestinationStatistics().getExpired().increment();
            if (sendProducerAck) {
                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                context.getConnection().dispatchAsync(ack);
            }
            return;
        }

        if (memoryUsage.isFull()) {
            isFull(context, memoryUsage);
            fastProducer(context, producerInfo);

            if (isProducerFlowControl() && context.isProducerFlowControl()) {

                if (warnOnProducerFlowControl) {
                    warnOnProducerFlowControl = false;
                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
                }

                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                }

                // We can avoid blocking due to low usage if the producer is sending a sync message or
                // if it is using a producer window
                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                    synchronized (messagesWaitingForSpace) {
                        messagesWaitingForSpace.add(new Runnable() {
                            @Override
                            public void run() {
                                try {

                                    // While waiting for space to free up... the
                                    // message may have expired.
                                    if (message.isExpired()) {
                                        broker.messageExpired(context, message, null);
                                        getDestinationStatistics().getExpired().increment();
                                    } else {
                                        doMessageSend(producerExchange, message);
                                    }

                                    if (sendProducerAck) {
                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
                                                .getSize());
                                        context.getConnection().dispatchAsync(ack);
                                    } else {
                                        Response response = new Response();
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }

                                } catch (Exception e) {
                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
                                        ExceptionResponse response = new ExceptionResponse(e);
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    } else {
                                        // No response can be sent back in this case; at least
                                        // leave a trace instead of dropping the failure silently.
                                        LOG.debug("unexpected exception on deferred send of: {}", message, e);
                                    }
                                }
                            }
                        });

                        registerCallbackForNotFullNotification();
                        context.setDontSendReponse(true);
                        return;
                    }

                } else {
                    // Producer flow control cannot be used, so we have do the flow control
                    // at the broker by blocking this thread until there is space available.

                    if (memoryUsage.isFull()) {
                        if (context.isInTransaction()) {

                            int count = 0;
                            while (!memoryUsage.waitForSpace(1000)) {
                                if (context.getStopping().get()) {
                                    throw new IOException("Connection closed, send aborted.");
                                }
                                if (count > 2 && context.isInTransaction()) {
                                    count = 0;
                                    int size = context.getTransaction().size();
                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
                                }
                                count++;
                            }
                        } else {
                            waitForSpace(
                                    context,
                                    producerExchange,
                                    memoryUsage,
                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
                                            + message.getProducerId()
                                            + ") to prevent flooding "
                                            + getActiveMQDestination().getQualifiedName()
                                            + "."
                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                        }
                    }

                    // The usage manager could have delayed us by the time
                    // we unblock the message could have expired..
                    if (message.isExpired()) {
                        getDestinationStatistics().getExpired().increment();
                        LOG.debug("Expired message: {}", message);
                        return;
                    }
                }
            }
        }

        doMessageSend(producerExchange, message);
        messageDelivered(context, message);
        if (sendProducerAck) {
            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
            context.getConnection().dispatchAsync(ack);
        }
    }

    /**
     * do send the message - this needs to be synchronized to ensure messages
     * are stored AND dispatched in the right order
     * <p>
     * Persistent messages are handed to the store asynchronously unless no
     * durable subscriber exists. Inside a transaction the dispatch is deferred
     * to {@code afterCommit}; otherwise the message is dispatched right away.
     * Finally the method waits for the pending store operation, if any.
     *
     * @param producerExchange the exchange of the sending producer
     * @param message the message to store and dispatch
     * @throws IOException
     * @throws Exception
     */
    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
            throws IOException, Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();
        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
        Future<Object> result = null;

        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException(logMessage);
                }

                waitForSpace(context, producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
            }
            result = topicStore.asyncAddTopicMessage(context, message, isOptimizeStorage());
        }

        // Balanced by a decrement on every path below (commit, rollback, direct dispatch).
        message.incrementReferenceCount();

        if (context.isInTransaction()) {
            context.getTransaction().addSynchronization(new Synchronization() {
                @Override
                public void afterCommit() throws Exception {
                    // It could take while before we receive the commit
                    // operation.. by that time the message could have
                    // expired..
                    if (broker.isExpired(message)) {
                        getDestinationStatistics().getExpired().increment();
                        broker.messageExpired(context, message, null);
                        message.decrementReferenceCount();
                        return;
                    }
                    try {
                        dispatch(context, message);
                    } finally {
                        message.decrementReferenceCount();
                    }
                }

                @Override
                public void afterRollback() throws Exception {
                    message.decrementReferenceCount();
                }
            });

        } else {
            try {
                dispatch(context, message);
            } finally {
                message.decrementReferenceCount();
            }
        }

        if (result != null && !result.isCancelled()) {
            try {
                result.get();
            } catch (CancellationException e) {
                // ignore - the task has been cancelled if the message
                // has already been deleted
            }
        }
    }

    /**
     * @return {@code true} when no durable subscriber is registered, in which
     *         case {@link #doMessageSend} skips the store
     */
    private boolean canOptimizeOutPersistence() {
        return durableSubscribers.size() == 0;
    }

    @Override
    public String toString() {
        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
    }

    /**
     * Acknowledges a message. For persistent messages with a store present the
     * acknowledgement is recorded in the store under the subscription's key;
     * note that {@code sub} is cast to {@link DurableTopicSubscription} on
     * that path. {@code messageConsumed} is invoked in every case.
     *
     * @param context the connection context
     * @param sub the acknowledging subscription
     * @param ack the acknowledgement
     * @param node the acknowledged message reference
     * @throws IOException if the store fails
     */
    @Override
    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
            final MessageReference node) throws IOException {
        if (topicStore != null && node.isPersistent()) {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            SubscriptionKey key = dsub.getSubscriptionKey();
            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
                    convertToNonRangedAck(ack, node));
        }
        messageConsumed(context, node);
    }

    /** Intentionally empty. */
    @Override
    public void gc() {
    }

    /**
     * @param messageId id of the message to load
     * @return the message from the store, or {@code null} when there is no store
     * @throws IOException if the store fails
     */
    public Message loadMessage(MessageId messageId) throws IOException {
        return topicStore != null ? topicStore.getMessage(messageId) : null;
    }

    /**
     * Starts the recovery policy and memory usage, and schedules the periodic
     * expiry task for non-advisory topics when an expiry period is configured.
     */
    @Override
    public void start() throws Exception {
        this.subscriptionRecoveryPolicy.start();
        if (memoryUsage != null) {
            memoryUsage.start();
        }

        if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
            scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
        }
    }

    /**
     * Shuts down the task runner, stops the recovery policy, memory usage and
     * store, and cancels the periodic expiry task.
     */
    @Override
    public void stop() throws Exception {
        if (taskRunner != null) {
            taskRunner.shutdown();
        }
        this.subscriptionRecoveryPolicy.stop();
        if (memoryUsage != null) {
            memoryUsage.stop();
        }
        if (this.topicStore != null) {
            this.topicStore.stop();
        }

        scheduler.cancel(expireMessagesTask);
    }

    /**
     * @return the messages collected by {@link #doBrowse}, limited by the
     *         maximum browse page size
     */
    @Override
    public Message[] browse() {
        final List<Message> result = new ArrayList<Message>();
        doBrowse(result, getMaxBrowsePageSize());
        return result.toArray(new Message[result.size()]);
    }

    /**
     * Fills {@code browseList} from the store and from the recovery policy.
     * Expired messages found in the store are additionally expired on behalf
     * of every inactive durable subscriber. Does nothing when there is no
     * store; any failure is logged and not propagated.
     *
     * @param browseList receives the browsed messages
     * @param max size limit reported to the store through {@code hasSpace()}
     *        and applied to the messages taken from the recovery policy
     */
    private void doBrowse(final List<Message> browseList, final int max) {
        try {
            if (topicStore != null) {
                final List<Message> toExpire = new ArrayList<Message>();
                topicStore.recover(new MessageRecoveryListener() {
                    @Override
                    public boolean recoverMessage(Message message) throws Exception {
                        if (message.isExpired()) {
                            toExpire.add(message);
                        }
                        browseList.add(message);
                        return true;
                    }

                    @Override
                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        return true;
                    }

                    @Override
                    public boolean hasSpace() {
                        return browseList.size() < max;
                    }

                    @Override
                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }
                });
                final ConnectionContext connectionContext = createConnectionContext();
                for (Message message : toExpire) {
                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
                        if (!sub.isActive()) {
                            messageExpired(connectionContext, sub, message);
                        }
                    }
                }
                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
                if (msgs != null) {
                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
                        browseList.add(msgs[i]);
                    }
                }
            }
        } catch (Throwable e) {
            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
        }
    }

    /**
     * Runs queued sends for as long as memory usage is not full, and
     * re-registers for the not-full notification when sends remain queued.
     *
     * @return always {@code false}
     */
    @Override
    public boolean iterate() {
        synchronized (messagesWaitingForSpace) {
            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
                Runnable op = messagesWaitingForSpace.removeFirst();
                op.run();
            }

            if (!messagesWaitingForSpace.isEmpty()) {
                registerCallbackForNotFullNotification();
            }
        }
        return false;
    }

    private void registerCallbackForNotFullNotification() {
        // If the usage manager is not full, then the task will not
        // get called..
        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
            // so call it directly here.
            sendMessagesWaitingForSpaceTask.run();
        }
    }

    // Properties
    // -------------------------------------------------------------------------

    public DispatchPolicy getDispatchPolicy() {
        return dispatchPolicy;
    }

    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
        return subscriptionRecoveryPolicy;
    }

    /**
     * Sets the recovery policy. While the current policy is a
     * {@link RetainedMessageSubscriptionRecoveryPolicy}, the given policy is
     * wrapped by it instead of replacing it.
     *
     * @param recoveryPolicy the policy to install or wrap
     */
    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
            // allow users to combine retained message policy with other ActiveMQ policies
            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
            policy.setWrapped(recoveryPolicy);
        } else {
            this.subscriptionRecoveryPolicy = recoveryPolicy;
        }
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /** Intentionally empty. */
    @Override
    public final void wakeup() {
    }

    /**
     * Updates the enqueue statistics and, under the dispatch read lock, offers
     * the message to the recovery policy and then to the dispatch policy.
     * {@code onMessageWithNoConsumers} is invoked when the consumer list is
     * empty or the dispatch policy reports that nothing was dispatched.
     *
     * @param context the connection context of the sender
     * @param message the message to dispatch
     * @throws Exception if a policy or the no-consumer handling fails
     */
    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
        // AMQ-2586: Better to leave this stat at zero than to give the user
        // misleading metrics.
        // destinationStatistics.getMessages().increment();
        destinationStatistics.getEnqueues().increment();
        destinationStatistics.getMessageSize().addSize(message.getSize());
        MessageEvaluationContext msgContext = null;

        dispatchLock.readLock().lock();
        try {
            if (!subscriptionRecoveryPolicy.add(context, message)) {
                return;
            }
            synchronized (consumers) {
                if (consumers.isEmpty()) {
                    onMessageWithNoConsumers(context, message);
                    return;
                }
            }
            msgContext = context.getMessageEvaluationContext();
            msgContext.setDestination(destination);
            msgContext.setMessageReference(message);
            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
                onMessageWithNoConsumers(context, message);
            }

        } finally {
            dispatchLock.readLock().unlock();
            if (msgContext != null) {
                msgContext.clear();
            }
        }
    }

    /**
     * Periodic task scheduled by {@link #start()}: browses the store so that
     * {@link #doBrowse} expires messages; the browsed list itself is discarded.
     */
    private final Runnable expireMessagesTask = new Runnable() {
        @Override
        public void run() {
            List<Message> browsedMessages = new InsertionCountList<Message>();
            doBrowse(browsedMessages, getMaxExpirePageSize());
        }
    };

    /**
     * Reports an expired message to the broker, counts it, removes it from a
     * durable subscription's pending messages and acknowledges it with a
     * standard ack. Failures of the removal or acknowledgement are logged and
     * not propagated.
     *
     * @param context the connection context
     * @param subs the subscription the message expired for
     * @param reference the expired message
     */
    @Override
    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
        broker.messageExpired(context, reference, subs);
        // AMQ-2586: Better to leave this stat at zero than to give the user
        // misleading metrics.
        // destinationStatistics.getMessages().decrement();
        destinationStatistics.getExpired().increment();
        MessageAck ack = new MessageAck();
        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
        ack.setDestination(destination);
        ack.setMessageID(reference.getMessageId());
        try {
            if (subs instanceof DurableTopicSubscription) {
                ((DurableTopicSubscription) subs).removePending(reference);
            }
            acknowledge(context, subs, ack, reference);
        } catch (Exception e) {
            LOG.error("Failed to remove expired Message from the store ", e);
        }
    }

    @Override
    protected Logger getLog() {
        return LOG;
    }

    /**
     * @return {@code true} only when message storage optimization is enabled,
     *         at least one durable subscriber exists, and every durable
     *         subscriber is active, has a non-zero prefetch, is not a slow
     *         consumer and is within the in-flight limit
     */
    protected boolean isOptimizeStorage() {
        if (!isDoOptimzeMessageStorage() || durableSubscribers.isEmpty()) {
            return false;
        }
        for (DurableTopicSubscription s : durableSubscribers.values()) {
            if (!s.isActive()) {
                return false;
            }
            if (s.getPrefetchSize() == 0) {
                return false;
            }
            if (s.isSlowConsumer()) {
                return false;
            }
            if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()) {
                return false;
            }
        }
        return true;
    }

    /**
     * force a reread of the store - after transaction recovery completion
     */
    @Override
    public void clearPendingMessages() {
        dispatchLock.readLock().lock();
        try {
            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
                clearPendingAndDispatch(durableTopicSubscription);
            }
        } finally {
            dispatchLock.readLock().unlock();
        }
    }

    /**
     * Clears the subscription's pending messages and asks it to dispatch
     * again, all while holding the subscription's pending lock. A failing
     * dispatch is logged and not propagated.
     */
    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
        synchronized (durableTopicSubscription.pendingLock) {
            durableTopicSubscription.pending.clear();
            try {
                durableTopicSubscription.dispatchPending();
            } catch (IOException exception) {
                // The exception must be the LAST element of the argument array:
                // passed as a separate argument after the array, the call binds
                // to warn(String, Object, Object) and the placeholders are
                // filled with the array and the exception instead.
                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
                        durableTopicSubscription,
                        destination,
                        durableTopicSubscription.pending,
                        exception });
            }
        }
    }

    /**
     * @return the live map of durable subscribers (not a copy)
     */
    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
        return durableSubscribers;
    }
}