001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.net.URI; 020import java.util.Map; 021import java.util.Set; 022import java.util.concurrent.ThreadPoolExecutor; 023 024import org.apache.activemq.broker.region.Destination; 025import org.apache.activemq.broker.region.MessageReference; 026import org.apache.activemq.broker.region.Subscription; 027import org.apache.activemq.broker.region.virtual.VirtualDestination; 028import org.apache.activemq.command.ActiveMQDestination; 029import org.apache.activemq.command.BrokerId; 030import org.apache.activemq.command.BrokerInfo; 031import org.apache.activemq.command.ConnectionInfo; 032import org.apache.activemq.command.ConsumerControl; 033import org.apache.activemq.command.ConsumerInfo; 034import org.apache.activemq.command.DestinationInfo; 035import org.apache.activemq.command.Message; 036import org.apache.activemq.command.MessageAck; 037import org.apache.activemq.command.MessageDispatch; 038import org.apache.activemq.command.MessageDispatchNotification; 039import org.apache.activemq.command.MessagePull; 040import org.apache.activemq.command.ProducerInfo; 041import org.apache.activemq.command.RemoveSubscriptionInfo; 042import org.apache.activemq.command.Response; 043import org.apache.activemq.command.SessionInfo; 044import org.apache.activemq.command.TransactionId; 045import org.apache.activemq.store.PListStore; 046import org.apache.activemq.thread.Scheduler; 047import org.apache.activemq.usage.Usage; 048 049/** 050 * Allows you to intercept broker operation so that features such as security 051 * can be implemented as a pluggable filter. 052 * 053 * 054 */ 055public class BrokerFilter implements Broker { 056 057 protected final Broker next; 058 059 public BrokerFilter(Broker next) { 060 this.next = next; 061 } 062 063 @Override 064 public Broker getAdaptor(Class type) { 065 if (type.isInstance(this)) { 066 return this; 067 } 068 return next.getAdaptor(type); 069 } 070 071 @Override 072 public Map<ActiveMQDestination, Destination> getDestinationMap() { 073 return next.getDestinationMap(); 074 } 075 076 @Override 077 public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination) { 078 return next.getDestinationMap(destination); 079 } 080 081 @Override 082 public Set <Destination>getDestinations(ActiveMQDestination destination) { 083 return next.getDestinations(destination); 084 } 085 086 @Override 087 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 088 next.acknowledge(consumerExchange, ack); 089 } 090 091 @Override 092 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 093 return next.messagePull(context, pull); 094 } 095 096 @Override 097 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 098 next.addConnection(context, info); 099 } 100 101 @Override 102 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 103 return next.addConsumer(context, info); 104 } 105 106 @Override 107 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 108 next.addProducer(context, info); 109 } 110 111 @Override 112 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 113 next.commitTransaction(context, xid, onePhase); 114 } 115 116 @Override 117 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 118 next.removeSubscription(context, info); 119 } 120 121 @Override 122 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 123 return next.getPreparedTransactions(context); 124 } 125 126 @Override 127 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 128 return next.prepareTransaction(context, xid); 129 } 130 131 @Override 132 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 133 next.removeConnection(context, info, error); 134 } 135 136 @Override 137 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 138 next.removeConsumer(context, info); 139 } 140 141 @Override 142 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 143 next.removeProducer(context, info); 144 } 145 146 @Override 147 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 148 next.rollbackTransaction(context, xid); 149 } 150 151 @Override 152 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 153 next.send(producerExchange, messageSend); 154 } 155 156 @Override 157 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 158 next.beginTransaction(context, xid); 159 } 160 161 @Override 162 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 163 next.forgetTransaction(context, transactionId); 164 } 165 166 @Override 167 public Connection[] getClients() throws Exception { 168 return next.getClients(); 169 } 170 171 @Override 172 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { 173 return next.addDestination(context, destination,createIfTemporary); 174 } 175 176 @Override 177 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 178 next.removeDestination(context, destination, timeout); 179 } 180 181 @Override 182 public ActiveMQDestination[] getDestinations() throws Exception { 183 return next.getDestinations(); 184 } 185 186 @Override 187 public void start() throws Exception { 188 next.start(); 189 } 190 191 @Override 192 public void stop() throws Exception { 193 next.stop(); 194 } 195 196 @Override 197 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 198 next.addSession(context, info); 199 } 200 201 @Override 202 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 203 next.removeSession(context, info); 204 } 205 206 @Override 207 public BrokerId getBrokerId() { 208 return next.getBrokerId(); 209 } 210 211 @Override 212 public String getBrokerName() { 213 return next.getBrokerName(); 214 } 215 216 @Override 217 public void gc() { 218 next.gc(); 219 } 220 221 @Override 222 public void addBroker(Connection connection, BrokerInfo info) { 223 next.addBroker(connection, info); 224 } 225 226 @Override 227 public void removeBroker(Connection connection, BrokerInfo info) { 228 next.removeBroker(connection, info); 229 } 230 231 @Override 232 public BrokerInfo[] getPeerBrokerInfos() { 233 return next.getPeerBrokerInfos(); 234 } 235 236 @Override 237 public void preProcessDispatch(MessageDispatch messageDispatch) { 238 next.preProcessDispatch(messageDispatch); 239 } 240 241 @Override 242 public void postProcessDispatch(MessageDispatch messageDispatch) { 243 next.postProcessDispatch(messageDispatch); 244 } 245 246 @Override 247 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 248 next.processDispatchNotification(messageDispatchNotification); 249 } 250 251 @Override 252 public boolean isStopped() { 253 return next.isStopped(); 254 } 255 256 @Override 257 public Set<ActiveMQDestination> getDurableDestinations() { 258 return next.getDurableDestinations(); 259 } 260 261 @Override 262 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 263 next.addDestinationInfo(context, info); 264 } 265 266 @Override 267 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 268 next.removeDestinationInfo(context, info); 269 } 270 271 @Override 272 public boolean isFaultTolerantConfiguration() { 273 return next.isFaultTolerantConfiguration(); 274 } 275 276 @Override 277 public ConnectionContext getAdminConnectionContext() { 278 return next.getAdminConnectionContext(); 279 } 280 281 @Override 282 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { 283 next.setAdminConnectionContext(adminConnectionContext); 284 } 285 286 @Override 287 public PListStore getTempDataStore() { 288 return next.getTempDataStore(); 289 } 290 291 @Override 292 public URI getVmConnectorURI() { 293 return next.getVmConnectorURI(); 294 } 295 296 @Override 297 public void brokerServiceStarted() { 298 next.brokerServiceStarted(); 299 } 300 301 @Override 302 public BrokerService getBrokerService() { 303 return next.getBrokerService(); 304 } 305 306 @Override 307 public boolean isExpired(MessageReference messageReference) { 308 return next.isExpired(messageReference); 309 } 310 311 @Override 312 public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { 313 next.messageExpired(context, message, subscription); 314 } 315 316 @Override 317 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 318 Subscription subscription, Throwable poisonCause) { 319 return next.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); 320 } 321 322 @Override 323 public Broker getRoot() { 324 return next.getRoot(); 325 } 326 327 @Override 328 public long getBrokerSequenceId() { 329 return next.getBrokerSequenceId(); 330 } 331 332 333 @Override 334 public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { 335 next.fastProducer(context, producerInfo, destination); 336 } 337 338 @Override 339 public void isFull(ConnectionContext context,Destination destination, Usage usage) { 340 next.isFull(context,destination, usage); 341 } 342 343 @Override 344 public void messageConsumed(ConnectionContext context,MessageReference messageReference) { 345 next.messageConsumed(context, messageReference); 346 } 347 348 @Override 349 public void messageDelivered(ConnectionContext context,MessageReference messageReference) { 350 next.messageDelivered(context, messageReference); 351 } 352 353 @Override 354 public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) { 355 next.messageDiscarded(context, sub, messageReference); 356 } 357 358 @Override 359 public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { 360 next.slowConsumer(context, destination,subs); 361 } 362 363 @Override 364 public void virtualDestinationAdded(ConnectionContext context, 365 VirtualDestination virtualDestination) { 366 next.virtualDestinationAdded(context, virtualDestination); 367 } 368 369 @Override 370 public void virtualDestinationRemoved(ConnectionContext context, 371 VirtualDestination virtualDestination) { 372 next.virtualDestinationRemoved(context, virtualDestination); 373 } 374 375 @Override 376 public void nowMasterBroker() { 377 next.nowMasterBroker(); 378 } 379 380 @Override 381 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, 382 ConsumerControl control) { 383 next.processConsumerControl(consumerExchange, control); 384 } 385 386 @Override 387 public void reapplyInterceptor() { 388 next.reapplyInterceptor(); 389 } 390 391 @Override 392 public Scheduler getScheduler() { 393 return next.getScheduler(); 394 } 395 396 @Override 397 public ThreadPoolExecutor getExecutor() { 398 return next.getExecutor(); 399 } 400 401 @Override 402 public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { 403 next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp); 404 } 405 406 @Override 407 public void networkBridgeStopped(BrokerInfo brokerInfo) { 408 next.networkBridgeStopped(brokerInfo); 409 } 410}