001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.Iterator; 023import java.util.LinkedHashMap; 024import java.util.Map; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.command.MessageAck; 031import org.apache.activemq.command.MessageId; 032import org.apache.activemq.command.TransactionId; 033import org.apache.activemq.command.XATransactionId; 034import org.apache.activemq.store.InlineListenableFuture; 035import org.apache.activemq.store.ListenableFuture; 036import org.apache.activemq.store.MessageStore; 037import org.apache.activemq.store.PersistenceAdapter; 038import org.apache.activemq.store.ProxyMessageStore; 039import org.apache.activemq.store.ProxyTopicMessageStore; 040import org.apache.activemq.store.TopicMessageStore; 041import org.apache.activemq.store.TransactionRecoveryListener; 042import org.apache.activemq.store.TransactionStore; 043 044/** 045 * Provides a TransactionStore implementation that can create transaction aware 046 * MessageStore objects from non transaction aware MessageStore objects. 047 * 048 * 049 */ 050public class MemoryTransactionStore implements TransactionStore { 051 052 protected ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 053 protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>()); 054 protected final PersistenceAdapter persistenceAdapter; 055 056 private boolean doingRecover; 057 058 public class Tx { 059 public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); 060 061 public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); 062 063 public void add(AddMessageCommand msg) { 064 messages.add(msg); 065 } 066 067 public void add(RemoveMessageCommand ack) { 068 acks.add(ack); 069 } 070 071 public Message[] getMessages() { 072 Message rc[] = new Message[messages.size()]; 073 int count = 0; 074 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 075 AddMessageCommand cmd = iter.next(); 076 rc[count++] = cmd.getMessage(); 077 } 078 return rc; 079 } 080 081 public MessageAck[] getAcks() { 082 MessageAck rc[] = new MessageAck[acks.size()]; 083 int count = 0; 084 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 085 RemoveMessageCommand cmd = iter.next(); 086 rc[count++] = cmd.getMessageAck(); 087 } 088 return rc; 089 } 090 091 /** 092 * @throws IOException 093 */ 094 public void commit() throws IOException { 095 ConnectionContext ctx = new ConnectionContext(); 096 persistenceAdapter.beginTransaction(ctx); 097 try { 098 099 // Do all the message adds. 100 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 101 AddMessageCommand cmd = iter.next(); 102 cmd.run(ctx); 103 } 104 // And removes.. 105 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 106 RemoveMessageCommand cmd = iter.next(); 107 cmd.run(ctx); 108 } 109 110 } catch ( IOException e ) { 111 persistenceAdapter.rollbackTransaction(ctx); 112 throw e; 113 } 114 persistenceAdapter.commitTransaction(ctx); 115 } 116 } 117 118 public interface AddMessageCommand { 119 Message getMessage(); 120 121 MessageStore getMessageStore(); 122 123 void run(ConnectionContext context) throws IOException; 124 125 void setMessageStore(MessageStore messageStore); 126 } 127 128 public interface RemoveMessageCommand { 129 MessageAck getMessageAck(); 130 131 void run(ConnectionContext context) throws IOException; 132 133 MessageStore getMessageStore(); 134 } 135 136 public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { 137 this.persistenceAdapter=persistenceAdapter; 138 } 139 140 public MessageStore proxy(MessageStore messageStore) { 141 ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) { 142 @Override 143 public void addMessage(ConnectionContext context, final Message send) throws IOException { 144 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 145 } 146 147 @Override 148 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 149 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 150 } 151 152 @Override 153 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 154 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 155 return new InlineListenableFuture(); 156 } 157 158 @Override 159 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException { 160 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 161 return new InlineListenableFuture(); 162 } 163 164 @Override 165 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 166 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 167 } 168 169 @Override 170 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 171 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 172 } 173 }; 174 onProxyQueueStore(proxyMessageStore); 175 return proxyMessageStore; 176 } 177 178 protected void onProxyQueueStore(ProxyMessageStore proxyMessageStore) { 179 } 180 181 public TopicMessageStore proxy(TopicMessageStore messageStore) { 182 ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) { 183 @Override 184 public void addMessage(ConnectionContext context, final Message send) throws IOException { 185 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 186 } 187 188 @Override 189 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 190 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 191 } 192 193 @Override 194 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 195 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 196 return new InlineListenableFuture(); 197 } 198 199 @Override 200 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 201 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 202 return new InlineListenableFuture(); 203 } 204 205 @Override 206 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 207 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 208 } 209 210 @Override 211 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 212 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 213 } 214 215 @Override 216 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 217 MessageId messageId, MessageAck ack) throws IOException { 218 MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, 219 subscriptionName, messageId, ack); 220 } 221 }; 222 onProxyTopicStore(proxyTopicMessageStore); 223 return proxyTopicMessageStore; 224 } 225 226 protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) { 227 } 228 229 /** 230 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 231 */ 232 @Override 233 public void prepare(TransactionId txid) throws IOException { 234 Tx tx = inflightTransactions.remove(txid); 235 if (tx == null) { 236 return; 237 } 238 preparedTransactions.put(txid, tx); 239 } 240 241 public Tx getTx(Object txid) { 242 Tx tx = inflightTransactions.get(txid); 243 if (tx == null) { 244 tx = new Tx(); 245 inflightTransactions.put(txid, tx); 246 } 247 return tx; 248 } 249 250 public Tx getPreparedTx(TransactionId txid) { 251 Tx tx = preparedTransactions.get(txid); 252 if (tx == null) { 253 tx = new Tx(); 254 preparedTransactions.put(txid, tx); 255 } 256 return tx; 257 } 258 259 @Override 260 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 261 if (preCommit != null) { 262 preCommit.run(); 263 } 264 Tx tx; 265 if (wasPrepared) { 266 tx = preparedTransactions.remove(txid); 267 } else { 268 tx = inflightTransactions.remove(txid); 269 } 270 271 if (tx != null) { 272 tx.commit(); 273 } 274 if (postCommit != null) { 275 postCommit.run(); 276 } 277 } 278 279 /** 280 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 281 */ 282 @Override 283 public void rollback(TransactionId txid) throws IOException { 284 preparedTransactions.remove(txid); 285 inflightTransactions.remove(txid); 286 } 287 288 @Override 289 public void start() throws Exception { 290 } 291 292 @Override 293 public void stop() throws Exception { 294 } 295 296 @Override 297 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 298 // All the inflight transactions get rolled back.. 299 inflightTransactions.clear(); 300 this.doingRecover = true; 301 try { 302 for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { 303 Object txid = iter.next(); 304 Tx tx = preparedTransactions.get(txid); 305 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); 306 onRecovered(tx); 307 } 308 } finally { 309 this.doingRecover = false; 310 } 311 } 312 313 protected void onRecovered(Tx tx) { 314 } 315 316 /** 317 * @param message 318 * @throws IOException 319 */ 320 void addMessage(final ConnectionContext context, final MessageStore destination, final Message message) throws IOException { 321 322 if (doingRecover) { 323 return; 324 } 325 326 if (message.getTransactionId() != null) { 327 Tx tx = getTx(message.getTransactionId()); 328 tx.add(new AddMessageCommand() { 329 MessageStore messageStore = destination; 330 @Override 331 public Message getMessage() { 332 return message; 333 } 334 335 @Override 336 public MessageStore getMessageStore() { 337 return destination; 338 } 339 340 @Override 341 public void run(ConnectionContext ctx) throws IOException { 342 destination.addMessage(ctx, message); 343 } 344 345 @Override 346 public void setMessageStore(MessageStore messageStore) { 347 this.messageStore = messageStore; 348 } 349 350 }); 351 } else { 352 destination.addMessage(context, message); 353 } 354 } 355 356 /** 357 * @param ack 358 * @throws IOException 359 */ 360 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException { 361 if (doingRecover) { 362 return; 363 } 364 365 if (ack.isInTransaction()) { 366 Tx tx = getTx(ack.getTransactionId()); 367 tx.add(new RemoveMessageCommand() { 368 @Override 369 public MessageAck getMessageAck() { 370 return ack; 371 } 372 373 @Override 374 public void run(ConnectionContext ctx) throws IOException { 375 destination.removeMessage(ctx, ack); 376 } 377 378 @Override 379 public MessageStore getMessageStore() { 380 return destination; 381 } 382 }); 383 } else { 384 destination.removeMessage(null, ack); 385 } 386 } 387 388 public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName, 389 final MessageId messageId, final MessageAck ack) throws IOException { 390 if (doingRecover) { 391 return; 392 } 393 394 if (ack.isInTransaction()) { 395 Tx tx = getTx(ack.getTransactionId()); 396 tx.add(new RemoveMessageCommand() { 397 @Override 398 public MessageAck getMessageAck() { 399 return ack; 400 } 401 402 @Override 403 public void run(ConnectionContext ctx) throws IOException { 404 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 405 } 406 407 @Override 408 public MessageStore getMessageStore() { 409 return destination; 410 } 411 }); 412 } else { 413 destination.acknowledge(null, clientId, subscriptionName, messageId, ack); 414 } 415 } 416 417 418 public void delete() { 419 inflightTransactions.clear(); 420 preparedTransactions.clear(); 421 doingRecover = false; 422 } 423 424}