001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.virtual; 018 019import org.apache.activemq.broker.Broker; 020import org.apache.activemq.broker.ProducerBrokerExchange; 021import org.apache.activemq.broker.region.Destination; 022import org.apache.activemq.broker.region.Subscription; 023import org.apache.activemq.command.ActiveMQDestination; 024import org.apache.activemq.command.Message; 025import org.apache.activemq.filter.BooleanExpression; 026import org.apache.activemq.filter.MessageEvaluationContext; 027import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 028import org.apache.activemq.plugin.SubQueueSelectorCacheBroker; 029import org.apache.activemq.selector.SelectorParser; 030import org.apache.activemq.util.LRUCache; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import java.io.IOException; 035import java.util.List; 036import java.util.Set; 037 038public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor { 039 private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class); 040 LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>(); 041 private SubQueueSelectorCacheBroker selectorCachePlugin; 042 043 public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) { 044 super(next, virtualTopic); 045 } 046 047 /** 048 * Respect the selectors of the subscriptions to ensure only matched messages are dispatched to 049 * the virtual queues, hence there is no build up of unmatched messages on these destinations 050 */ 051 @Override 052 protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException { 053 //first validate that the prefix matches in the super class 054 if (super.shouldDispatch(broker, message, dest)) { 055 boolean matches = false; 056 MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 057 msgContext.setDestination(dest.getActiveMQDestination()); 058 msgContext.setMessageReference(message); 059 List<Subscription> subs = dest.getConsumers(); 060 for (Subscription sub : subs) { 061 if (sub.matches(message, msgContext)) { 062 matches = true; 063 break; 064 } 065 } 066 if (matches == false) { 067 matches = tryMatchingCachedSubs(broker, dest, msgContext); 068 } 069 return matches; 070 } 071 return false; 072 } 073 074 private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) { 075 boolean matches = false; 076 LOG.debug("No active consumer match found. Will try cache if configured..."); 077 078 //retrieve the specific plugin class and lookup the selector for the destination. 079 final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker); 080 081 if (cache != null) { 082 final Set<String> selectors = cache.getSelector(dest.getActiveMQDestination().getQualifiedName()); 083 if (selectors != null) { 084 for (String selector : selectors) { 085 try { 086 final BooleanExpression expression = getExpression(selector); 087 matches = expression.matches(msgContext); 088 if (matches) { 089 return true; 090 } 091 } catch (Exception e) { 092 LOG.error(e.getMessage(), e); 093 } 094 } 095 } 096 } 097 return matches; 098 } 099 100 private BooleanExpression getExpression(String selector) throws Exception{ 101 BooleanExpression result; 102 synchronized(expressionCache){ 103 result = expressionCache.get(selector); 104 if (result == null){ 105 result = compileSelector(selector); 106 expressionCache.put(selector,result); 107 } 108 } 109 return result; 110 } 111 112 /** 113 * @return The SubQueueSelectorCacheBroker instance or null if no such broker is available. 114 */ 115 private SubQueueSelectorCacheBroker getSubQueueSelectorCacheBrokerPlugin(final Broker broker) { 116 if (selectorCachePlugin == null) { 117 selectorCachePlugin = (SubQueueSelectorCacheBroker) broker.getAdaptor(SubQueueSelectorCacheBroker.class); 118 } //if 119 120 return selectorCachePlugin; 121 } 122 123 /** 124 * Pre-compile the JMS selector. 125 * 126 * @param selectorExpression The non-null JMS selector expression. 127 */ 128 private BooleanExpression compileSelector(final String selectorExpression) throws Exception { 129 return SelectorParser.parse(selectorExpression); 130 } 131}