001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.List; 021import java.util.Set; 022 023import org.apache.activemq.broker.Broker; 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.broker.ProducerBrokerExchange; 026import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 027import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 028import org.apache.activemq.command.ActiveMQDestination; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.command.MessageAck; 031import org.apache.activemq.command.MessageDispatchNotification; 032import org.apache.activemq.command.ProducerInfo; 033import org.apache.activemq.store.MessageStore; 034import org.apache.activemq.usage.MemoryUsage; 035import org.apache.activemq.usage.Usage; 036import org.apache.activemq.util.SubscriptionKey; 037 038/** 039 * 040 * 041 */ 042public class DestinationFilter implements Destination { 043 044 protected final Destination next; 045 046 public DestinationFilter(Destination next) { 047 this.next = next; 048 } 049 050 @Override 051 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { 052 next.acknowledge(context, sub, ack, node); 053 } 054 055 @Override 056 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { 057 next.addSubscription(context, sub); 058 } 059 060 @Override 061 public Message[] browse() { 062 return next.browse(); 063 } 064 065 @Override 066 public void dispose(ConnectionContext context) throws IOException { 067 next.dispose(context); 068 } 069 070 @Override 071 public boolean isDisposed() { 072 return next.isDisposed(); 073 } 074 075 @Override 076 public void gc() { 077 next.gc(); 078 } 079 080 @Override 081 public void markForGC(long timeStamp) { 082 next.markForGC(timeStamp); 083 } 084 085 @Override 086 public boolean canGC() { 087 return next.canGC(); 088 } 089 090 @Override 091 public long getInactiveTimeoutBeforeGC() { 092 return next.getInactiveTimeoutBeforeGC(); 093 } 094 095 @Override 096 public ActiveMQDestination getActiveMQDestination() { 097 return next.getActiveMQDestination(); 098 } 099 100 @Override 101 public DeadLetterStrategy getDeadLetterStrategy() { 102 return next.getDeadLetterStrategy(); 103 } 104 105 @Override 106 public DestinationStatistics getDestinationStatistics() { 107 return next.getDestinationStatistics(); 108 } 109 110 @Override 111 public String getName() { 112 return next.getName(); 113 } 114 115 @Override 116 public MemoryUsage getMemoryUsage() { 117 return next.getMemoryUsage(); 118 } 119 120 @Override 121 public void setMemoryUsage(MemoryUsage memoryUsage) { 122 next.setMemoryUsage(memoryUsage); 123 } 124 125 @Override 126 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 127 next.removeSubscription(context, sub, lastDeliveredSequenceId); 128 } 129 130 @Override 131 public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { 132 next.send(context, messageSend); 133 } 134 135 @Override 136 public void start() throws Exception { 137 next.start(); 138 } 139 140 @Override 141 public void stop() throws Exception { 142 next.stop(); 143 } 144 145 @Override 146 public List<Subscription> getConsumers() { 147 return next.getConsumers(); 148 } 149 150 /** 151 * Sends a message to the given destination which may be a wildcard 152 * 153 * @param context broker context 154 * @param message message to send 155 * @param destination possibly wildcard destination to send the message to 156 * @throws Exception on error 157 */ 158 protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception { 159 Broker broker = context.getConnectionContext().getBroker(); 160 Set<Destination> destinations = broker.getDestinations(destination); 161 162 for (Destination dest : destinations) { 163 dest.send(context, message.copy()); 164 } 165 } 166 167 @Override 168 public MessageStore getMessageStore() { 169 return next.getMessageStore(); 170 } 171 172 @Override 173 public boolean isProducerFlowControl() { 174 return next.isProducerFlowControl(); 175 } 176 177 @Override 178 public void setProducerFlowControl(boolean value) { 179 next.setProducerFlowControl(value); 180 } 181 182 @Override 183 public boolean isAlwaysRetroactive() { 184 return next.isAlwaysRetroactive(); 185 } 186 187 @Override 188 public void setAlwaysRetroactive(boolean value) { 189 next.setAlwaysRetroactive(value); 190 } 191 192 @Override 193 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 194 next.setBlockedProducerWarningInterval(blockedProducerWarningInterval); 195 } 196 197 @Override 198 public long getBlockedProducerWarningInterval() { 199 return next.getBlockedProducerWarningInterval(); 200 } 201 202 @Override 203 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 204 next.addProducer(context, info); 205 } 206 207 @Override 208 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 209 next.removeProducer(context, info); 210 } 211 212 @Override 213 public int getMaxAuditDepth() { 214 return next.getMaxAuditDepth(); 215 } 216 217 @Override 218 public int getMaxProducersToAudit() { 219 return next.getMaxProducersToAudit(); 220 } 221 222 @Override 223 public boolean isEnableAudit() { 224 return next.isEnableAudit(); 225 } 226 227 @Override 228 public void setEnableAudit(boolean enableAudit) { 229 next.setEnableAudit(enableAudit); 230 } 231 232 @Override 233 public void setMaxAuditDepth(int maxAuditDepth) { 234 next.setMaxAuditDepth(maxAuditDepth); 235 } 236 237 @Override 238 public void setMaxProducersToAudit(int maxProducersToAudit) { 239 next.setMaxProducersToAudit(maxProducersToAudit); 240 } 241 242 @Override 243 public boolean isActive() { 244 return next.isActive(); 245 } 246 247 @Override 248 public int getMaxPageSize() { 249 return next.getMaxPageSize(); 250 } 251 252 @Override 253 public void setMaxPageSize(int maxPageSize) { 254 next.setMaxPageSize(maxPageSize); 255 } 256 257 @Override 258 public boolean isUseCache() { 259 return next.isUseCache(); 260 } 261 262 @Override 263 public void setUseCache(boolean useCache) { 264 next.setUseCache(useCache); 265 } 266 267 @Override 268 public int getMinimumMessageSize() { 269 return next.getMinimumMessageSize(); 270 } 271 272 @Override 273 public void setMinimumMessageSize(int minimumMessageSize) { 274 next.setMinimumMessageSize(minimumMessageSize); 275 } 276 277 @Override 278 public void wakeup() { 279 next.wakeup(); 280 } 281 282 @Override 283 public boolean isLazyDispatch() { 284 return next.isLazyDispatch(); 285 } 286 287 @Override 288 public void setLazyDispatch(boolean value) { 289 next.setLazyDispatch(value); 290 } 291 292 public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) { 293 next.messageExpired(context, prefetchSubscription, node); 294 } 295 296 @Override 297 public boolean iterate() { 298 return next.iterate(); 299 } 300 301 @Override 302 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 303 next.fastProducer(context, producerInfo); 304 } 305 306 @Override 307 public void isFull(ConnectionContext context, Usage<?> usage) { 308 next.isFull(context, usage); 309 } 310 311 @Override 312 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 313 next.messageConsumed(context, messageReference); 314 } 315 316 @Override 317 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 318 next.messageDelivered(context, messageReference); 319 } 320 321 @Override 322 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 323 next.messageDiscarded(context, sub, messageReference); 324 } 325 326 @Override 327 public void slowConsumer(ConnectionContext context, Subscription subs) { 328 next.slowConsumer(context, subs); 329 } 330 331 @Override 332 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) { 333 next.messageExpired(context, subs, node); 334 } 335 336 @Override 337 public int getMaxBrowsePageSize() { 338 return next.getMaxBrowsePageSize(); 339 } 340 341 @Override 342 public void setMaxBrowsePageSize(int maxPageSize) { 343 next.setMaxBrowsePageSize(maxPageSize); 344 } 345 346 @Override 347 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 348 next.processDispatchNotification(messageDispatchNotification); 349 } 350 351 @Override 352 public int getCursorMemoryHighWaterMark() { 353 return next.getCursorMemoryHighWaterMark(); 354 } 355 356 @Override 357 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 358 next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); 359 } 360 361 @Override 362 public boolean isPrioritizedMessages() { 363 return next.isPrioritizedMessages(); 364 } 365 366 @Override 367 public SlowConsumerStrategy getSlowConsumerStrategy() { 368 return next.getSlowConsumerStrategy(); 369 } 370 371 @Override 372 public boolean isDoOptimzeMessageStorage() { 373 return next.isDoOptimzeMessageStorage(); 374 } 375 376 @Override 377 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 378 next.setDoOptimzeMessageStorage(doOptimzeMessageStorage); 379 } 380 381 @Override 382 public void clearPendingMessages() { 383 next.clearPendingMessages(); 384 } 385 386 @Override 387 public void duplicateFromStore(Message message, Subscription subscription) { 388 next.duplicateFromStore(message, subscription); 389 } 390 391 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 392 if (next instanceof DestinationFilter) { 393 DestinationFilter filter = (DestinationFilter) next; 394 filter.deleteSubscription(context, key); 395 } else if (next instanceof Topic) { 396 Topic topic = (Topic)next; 397 topic.deleteSubscription(context, key); 398 } 399 } 400 401 public Destination getNext() { 402 return next; 403 } 404 405 public <T> T getAdaptor(Class <? extends T> clazz) { 406 if (clazz.isInstance(this)) { 407 return clazz.cast(this); 408 } else if (next != null && clazz.isInstance(next)) { 409 return clazz.cast(next); 410 } else if (next instanceof DestinationFilter) { 411 return ((DestinationFilter)next).getAdaptor(clazz); 412 } 413 return null; 414 } 415}