001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.Map.Entry;
025
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.Message;
029import org.apache.activemq.command.MessageAck;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.command.SubscriptionInfo;
032import org.apache.activemq.store.MessageRecoveryListener;
033import org.apache.activemq.store.MessageStoreStatistics;
034import org.apache.activemq.store.TopicMessageStore;
035import org.apache.activemq.util.LRUCache;
036import org.apache.activemq.util.SubscriptionKey;
037
038/**
039 *
040 */
041public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
042
043    private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
044    private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
045    private final Map<MessageId, Message> originalMessageTable;
046
047    public MemoryTopicMessageStore(ActiveMQDestination destination) {
048        this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
049
050        //Set the messageStoreStatistics after the super class is initialized so that the stats can be
051        //properly updated on cache eviction
052        MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
053        cache.setMessageStoreStatistics(messageStoreStatistics);
054    }
055
056    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
057        super(destination, messageTable);
058        this.subscriberDatabase = subscriberDatabase;
059        this.topicSubMap = makeSubMap();
060        //this is only necessary so that messageStoreStatistics can be set if necessary
061        //We need the original reference since messageTable is wrapped in a synchronized map in the parent class
062        this.originalMessageTable = messageTable;
063    }
064
065    protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
066        return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
067    }
068
069    protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
070        return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
071    }
072
073    @Override
074    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
075        super.addMessage(context, message);
076        for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
077            MemoryTopicSub sub = i.next();
078            sub.addMessage(message.getMessageId(), message);
079        }
080    }
081
082    @Override
083    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
084                                         MessageId messageId, MessageAck ack) throws IOException {
085        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
086        MemoryTopicSub sub = topicSubMap.get(key);
087        if (sub != null) {
088            sub.removeMessage(messageId);
089        }
090    }
091
092    @Override
093    public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
094        return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
095    }
096
097    @Override
098    public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException {
099        SubscriptionKey key = new SubscriptionKey(info);
100        MemoryTopicSub sub = new MemoryTopicSub();
101        topicSubMap.put(key, sub);
102        if (retroactive) {
103            for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
104                Map.Entry entry = (Entry)i.next();
105                sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
106            }
107        }
108        subscriberDatabase.put(key, info);
109    }
110
111    @Override
112    public synchronized void deleteSubscription(String clientId, String subscriptionName) {
113        org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
114        subscriberDatabase.remove(key);
115        topicSubMap.remove(key);
116    }
117
118    @Override
119    public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
120        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
121        if (sub != null) {
122            sub.recoverSubscription(listener);
123        }
124    }
125
126    @Override
127    public synchronized void delete() {
128        super.delete();
129        subscriberDatabase.clear();
130        topicSubMap.clear();
131    }
132
133    @Override
134    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
135        return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
136    }
137
138    @Override
139    public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
140        int result = 0;
141        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
142        if (sub != null) {
143            result = sub.size();
144        }
145        return result;
146    }
147
148    @Override
149    public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException {
150        long result = 0;
151        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
152        if (sub != null) {
153            result = sub.messageSize();
154        }
155        return result;
156    }
157
158    @Override
159    public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
160        MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
161        if (sub != null) {
162            sub.recoverNextMessages(maxReturned, listener);
163        }
164    }
165
166    @Override
167    public void resetBatching(String clientId, String subscriptionName) {
168        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
169        if (sub != null) {
170            sub.resetBatching();
171        }
172    }
173
174    /**
175     * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions
176     * when computing the message store statistics.
177     *
178     */
179    private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> {
180        private static final long serialVersionUID = -342098639681884413L;
181        private MessageStoreStatistics messageStoreStatistics;
182
183        public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
184                float loadFactor, boolean accessOrder) {
185            super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
186        }
187
188        public void setMessageStoreStatistics(
189                MessageStoreStatistics messageStoreStatistics) {
190            this.messageStoreStatistics = messageStoreStatistics;
191        }
192
193        @Override
194        protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
195            decMessageStoreStatistics(messageStoreStatistics, eldest.getValue());
196        }
197    }
198}