/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.memory;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PersistenceAdapter} that keeps one message store per queue and per
 * topic in in-memory maps. It reports no directory, a size of zero, and does
 * not provide a job scheduler store.
 *
 * @org.apache.xbean.XBean
 *
 */
public class MemoryPersistenceAdapter implements PersistenceAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);

    /** Lazily created by {@link #createTransactionStore()}; {@code null} until then. */
    MemoryTransactionStore transactionStore;
    /** Topic stores registered so far, keyed by destination. */
    ConcurrentMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
    /** Queue stores registered so far, keyed by destination. */
    ConcurrentMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
    private boolean useExternalMessageReferences;

    /**
     * Returns the destinations that currently have a registered store.
     *
     * @return a new set holding every queue and topic destination known to
     *         this adapter; changes to the returned set do not affect the adapter
     */
    @Override
    public Set<ActiveMQDestination> getDestinations() {
        Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
        rc.addAll(queues.keySet());
        rc.addAll(topics.keySet());
        return rc;
    }

    /**
     * Static factory returning a fresh adapter.
     *
     * @param file not used by this implementation
     * @return a new {@code MemoryPersistenceAdapter}
     */
    public static MemoryPersistenceAdapter newInstance(File file) {
        return new MemoryPersistenceAdapter();
    }

    /**
     * Returns the store registered for the given queue, creating and
     * registering one if none exists yet. When a transaction store has been
     * created, the new store is wrapped by its proxy before registration.
     *
     * @param destination the queue whose store is requested
     * @return the store registered for {@code destination}
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        MessageStore rc = queues.get(destination);
        if (rc == null) {
            MessageStore created = new MemoryMessageStore(destination);
            if (transactionStore != null) {
                created = transactionStore.proxy(created);
            }
            // Register atomically: if another caller registered a store for this
            // destination in the meantime, hand back that one so every caller
            // shares a single store per destination.
            MessageStore existing = queues.putIfAbsent(destination, created);
            rc = existing != null ? existing : created;
        }
        return rc;
    }

    /**
     * Returns the store registered for the given topic, creating and
     * registering one if none exists yet. When a transaction store has been
     * created, the new store is wrapped by its proxy before registration.
     *
     * @param destination the topic whose store is requested
     * @return the store registered for {@code destination}
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        TopicMessageStore rc = topics.get(destination);
        if (rc == null) {
            TopicMessageStore created = new MemoryTopicMessageStore(destination);
            if (transactionStore != null) {
                created = transactionStore.proxy(created);
            }
            // Register atomically: if another caller registered a store for this
            // destination in the meantime, hand back that one so every caller
            // shares a single store per destination.
            TopicMessageStore existing = topics.putIfAbsent(destination, created);
            rc = existing != null ? existing : created;
        }
        return rc;
    }

    /**
     * Cleanup method to remove any state associated with the given destination
     *
     * @param destination Destination to forget
     */
    @Override
    public void removeQueueMessageStore(ActiveMQQueue destination) {
        queues.remove(destination);
    }

    /**
     * Cleanup method to remove any state associated with the given destination
     *
     * @param destination Destination to forget
     */
    @Override
    public void removeTopicMessageStore(ActiveMQTopic destination) {
        topics.remove(destination);
    }

    /**
     * Returns the transaction store, creating it on first use.
     *
     * @return the transaction store owned by this adapter
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public TransactionStore createTransactionStore() throws IOException {
        if (transactionStore == null) {
            transactionStore = new MemoryTransactionStore(this);
        }
        return transactionStore;
    }

    /** No-op in this adapter. */
    @Override
    public void beginTransaction(ConnectionContext context) {
    }

    /** No-op in this adapter. */
    @Override
    public void commitTransaction(ConnectionContext context) {
    }

    /** No-op in this adapter. */
    @Override
    public void rollbackTransaction(ConnectionContext context) {
    }

    /** No-op in this adapter. */
    @Override
    public void start() throws Exception {
    }

    /** No-op in this adapter. */
    @Override
    public void stop() throws Exception {
    }

    /**
     * @return always {@code 0}
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }

    /**
     * Calls {@code delete()} on every registered topic and queue store that can
     * be resolved to a {@link MemoryMessageStore}, then on the transaction
     * store if one has been created. The stores stay registered.
     *
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public void deleteAllMessages() throws IOException {
        deleteStores(topics.values());
        deleteStores(queues.values());

        if (transactionStore != null) {
            transactionStore.delete();
        }
    }

    /** Deletes the content of each store that resolves to a {@link MemoryMessageStore}. */
    private void deleteStores(Iterable<?> stores) {
        for (Object candidate : stores) {
            MemoryMessageStore store = asMemoryMessageStore(candidate);
            if (store != null) {
                store.delete();
            }
        }
    }

    /**
     * @return the current value of the external-message-references flag
     */
    public boolean isUseExternalMessageReferences() {
        return useExternalMessageReferences;
    }

    /**
     * @param useExternalMessageReferences new value of the flag; this class only
     *        stores it and returns it from {@link #isUseExternalMessageReferences()}
     */
    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
        this.useExternalMessageReferences = useExternalMessageReferences;
    }

    /**
     * Resolves a value to a {@link MemoryMessageStore}, looking one level
     * through a {@link ProxyMessageStore} delegate if necessary.
     *
     * @param value the candidate store
     * @return the memory store, or {@code null} (after logging a warning) when
     *         {@code value} is neither a memory store nor a proxy directly
     *         wrapping one
     */
    protected MemoryMessageStore asMemoryMessageStore(Object value) {
        if (value instanceof MemoryMessageStore) {
            return (MemoryMessageStore) value;
        }
        if (value instanceof ProxyMessageStore) {
            MessageStore delegate = ((ProxyMessageStore) value).getDelegate();
            if (delegate instanceof MemoryMessageStore) {
                return (MemoryMessageStore) delegate;
            }
        }
        LOG.warn("Expected an instance of MemoryMessageStore but was: {}", value);
        return null;
    }

    /**
     * Ignored by this adapter.
     *
     * @param usageManager The UsageManager that is controlling the broker's
     *                memory usage.
     */
    @Override
    public void setUsageManager(SystemUsage usageManager) {
    }

    @Override
    public String toString() {
        return "MemoryPersistenceAdapter";
    }

    /** Ignored by this adapter. */
    @Override
    public void setBrokerName(String brokerName) {
    }

    /** Ignored by this adapter. */
    @Override
    public void setDirectory(File dir) {
    }

    /**
     * @return always {@code null}
     */
    @Override
    public File getDirectory() {
        return null;
    }

    /** No-op in this adapter. */
    @Override
    public void checkpoint(boolean sync) throws IOException {
    }

    /**
     * @return always {@code 0}
     */
    @Override
    public long size() {
        return 0;
    }

    /**
     * Setter-style hook that creates the transaction store when asked to.
     *
     * @param create {@code true} to create the transaction store now (if it
     *        does not exist yet); {@code false} does nothing
     * @throws IOException if {@link #createTransactionStore()} fails
     */
    public void setCreateTransactionStore(boolean create) throws IOException {
        if (create) {
            createTransactionStore();
        }
    }

    /**
     * @return always {@code -1}
     */
    @Override
    public long getLastProducerSequenceId(ProducerId id) {
        // memory map does duplicate suppression
        return -1;
    }

    /**
     * Not supported by this adapter.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
        // We could eventually implement an in memory scheduler.
        throw new UnsupportedOperationException();
    }
}