001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Iterator;
023import java.util.LinkedHashMap;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.MessageAck;
031import org.apache.activemq.command.MessageId;
032import org.apache.activemq.command.TransactionId;
033import org.apache.activemq.command.XATransactionId;
034import org.apache.activemq.store.InlineListenableFuture;
035import org.apache.activemq.store.ListenableFuture;
036import org.apache.activemq.store.MessageStore;
037import org.apache.activemq.store.PersistenceAdapter;
038import org.apache.activemq.store.ProxyMessageStore;
039import org.apache.activemq.store.ProxyTopicMessageStore;
040import org.apache.activemq.store.TopicMessageStore;
041import org.apache.activemq.store.TransactionRecoveryListener;
042import org.apache.activemq.store.TransactionStore;
043
044/**
045 * Provides a TransactionStore implementation that can create transaction aware
046 * MessageStore objects from non transaction aware MessageStore objects.
047 *
048 *
049 */
050public class MemoryTransactionStore implements TransactionStore {
051
052    protected ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
053    protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>());
054    protected final PersistenceAdapter persistenceAdapter;
055
056    private boolean doingRecover;
057
058    public class Tx {
059        public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
060
061        public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
062
063        public void add(AddMessageCommand msg) {
064            messages.add(msg);
065        }
066
067        public void add(RemoveMessageCommand ack) {
068            acks.add(ack);
069        }
070
071        public Message[] getMessages() {
072            Message rc[] = new Message[messages.size()];
073            int count = 0;
074            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
075                AddMessageCommand cmd = iter.next();
076                rc[count++] = cmd.getMessage();
077            }
078            return rc;
079        }
080
081        public MessageAck[] getAcks() {
082            MessageAck rc[] = new MessageAck[acks.size()];
083            int count = 0;
084            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
085                RemoveMessageCommand cmd = iter.next();
086                rc[count++] = cmd.getMessageAck();
087            }
088            return rc;
089        }
090
091        /**
092         * @throws IOException
093         */
094        public void commit() throws IOException {
095            ConnectionContext ctx = new ConnectionContext();
096            persistenceAdapter.beginTransaction(ctx);
097            try {
098
099                // Do all the message adds.
100                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
101                    AddMessageCommand cmd = iter.next();
102                    cmd.run(ctx);
103                }
104                // And removes..
105                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
106                    RemoveMessageCommand cmd = iter.next();
107                    cmd.run(ctx);
108                }
109
110            } catch ( IOException e ) {
111                persistenceAdapter.rollbackTransaction(ctx);
112                throw e;
113            }
114            persistenceAdapter.commitTransaction(ctx);
115        }
116    }
117
118    public interface AddMessageCommand {
119        Message getMessage();
120
121        MessageStore getMessageStore();
122
123        void run(ConnectionContext context) throws IOException;
124
125        void setMessageStore(MessageStore messageStore);
126    }
127
128    public interface RemoveMessageCommand {
129        MessageAck getMessageAck();
130
131        void run(ConnectionContext context) throws IOException;
132
133        MessageStore getMessageStore();
134    }
135
136    public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
137        this.persistenceAdapter=persistenceAdapter;
138    }
139
140    public MessageStore proxy(MessageStore messageStore) {
141        ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) {
142            @Override
143            public void addMessage(ConnectionContext context, final Message send) throws IOException {
144                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
145            }
146
147            @Override
148            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
149                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
150            }
151
152            @Override
153            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
154                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
155                return new InlineListenableFuture();
156             }
157
158            @Override
159            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
160                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
161                return new InlineListenableFuture();
162             }
163
164            @Override
165            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
166                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
167            }
168
169            @Override
170            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
171                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
172            }
173        };
174        onProxyQueueStore(proxyMessageStore);
175        return proxyMessageStore;
176    }
177
178    protected void onProxyQueueStore(ProxyMessageStore proxyMessageStore) {
179    }
180
181    public TopicMessageStore proxy(TopicMessageStore messageStore) {
182        ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
183            @Override
184            public void addMessage(ConnectionContext context, final Message send) throws IOException {
185                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
186            }
187
188            @Override
189            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
190                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
191            }
192
193            @Override
194            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
195                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
196                return new InlineListenableFuture();
197             }
198
199            @Override
200            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
201                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
202                return new InlineListenableFuture();
203             }
204
205            @Override
206            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
207                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
208            }
209
210            @Override
211            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
212                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
213            }
214
215            @Override
216            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
217                            MessageId messageId, MessageAck ack) throws IOException {
218                MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
219                        subscriptionName, messageId, ack);
220            }
221        };
222        onProxyTopicStore(proxyTopicMessageStore);
223        return proxyTopicMessageStore;
224    }
225
226    protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
227    }
228
229    /**
230     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
231     */
232    @Override
233    public void prepare(TransactionId txid) throws IOException {
234        Tx tx = inflightTransactions.remove(txid);
235        if (tx == null) {
236            return;
237        }
238        preparedTransactions.put(txid, tx);
239    }
240
241    public Tx getTx(Object txid) {
242        Tx tx = inflightTransactions.get(txid);
243        if (tx == null) {
244            tx = new Tx();
245            inflightTransactions.put(txid, tx);
246        }
247        return tx;
248    }
249
250    public Tx getPreparedTx(TransactionId txid) {
251        Tx tx = preparedTransactions.get(txid);
252        if (tx == null) {
253            tx = new Tx();
254            preparedTransactions.put(txid, tx);
255        }
256        return tx;
257    }
258
259    @Override
260    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
261        if (preCommit != null) {
262            preCommit.run();
263        }
264        Tx tx;
265        if (wasPrepared) {
266            tx = preparedTransactions.remove(txid);
267        } else {
268            tx = inflightTransactions.remove(txid);
269        }
270
271        if (tx != null) {
272            tx.commit();
273        }
274        if (postCommit != null) {
275            postCommit.run();
276        }
277    }
278
279    /**
280     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
281     */
282    @Override
283    public void rollback(TransactionId txid) throws IOException {
284        preparedTransactions.remove(txid);
285        inflightTransactions.remove(txid);
286    }
287
288    @Override
289    public void start() throws Exception {
290    }
291
292    @Override
293    public void stop() throws Exception {
294    }
295
296    @Override
297    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
298        // All the inflight transactions get rolled back..
299        inflightTransactions.clear();
300        this.doingRecover = true;
301        try {
302            for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
303                Object txid = iter.next();
304                Tx tx = preparedTransactions.get(txid);
305                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
306                onRecovered(tx);
307            }
308        } finally {
309            this.doingRecover = false;
310        }
311    }
312
313    protected void onRecovered(Tx tx) {
314    }
315
316    /**
317     * @param message
318     * @throws IOException
319     */
320    void addMessage(final ConnectionContext context, final MessageStore destination, final Message message) throws IOException {
321
322        if (doingRecover) {
323            return;
324        }
325
326        if (message.getTransactionId() != null) {
327            Tx tx = getTx(message.getTransactionId());
328            tx.add(new AddMessageCommand() {
329                MessageStore messageStore = destination;
330                @Override
331                public Message getMessage() {
332                    return message;
333                }
334
335                @Override
336                public MessageStore getMessageStore() {
337                    return destination;
338                }
339
340                @Override
341                public void run(ConnectionContext ctx) throws IOException {
342                    destination.addMessage(ctx, message);
343                }
344
345                @Override
346                public void setMessageStore(MessageStore messageStore) {
347                    this.messageStore = messageStore;
348                }
349
350            });
351        } else {
352            destination.addMessage(context, message);
353        }
354    }
355
356    /**
357     * @param ack
358     * @throws IOException
359     */
360    final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
361        if (doingRecover) {
362            return;
363        }
364
365        if (ack.isInTransaction()) {
366            Tx tx = getTx(ack.getTransactionId());
367            tx.add(new RemoveMessageCommand() {
368                @Override
369                public MessageAck getMessageAck() {
370                    return ack;
371                }
372
373                @Override
374                public void run(ConnectionContext ctx) throws IOException {
375                    destination.removeMessage(ctx, ack);
376                }
377
378                @Override
379                public MessageStore getMessageStore() {
380                    return destination;
381                }
382            });
383        } else {
384            destination.removeMessage(null, ack);
385        }
386    }
387
388    public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
389                           final MessageId messageId, final MessageAck ack) throws IOException {
390        if (doingRecover) {
391            return;
392        }
393
394        if (ack.isInTransaction()) {
395            Tx tx = getTx(ack.getTransactionId());
396            tx.add(new RemoveMessageCommand() {
397                @Override
398                public MessageAck getMessageAck() {
399                    return ack;
400                }
401
402                @Override
403                public void run(ConnectionContext ctx) throws IOException {
404                    destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
405                }
406
407                @Override
408                public MessageStore getMessageStore() {
409                    return destination;
410                }
411            });
412        } else {
413            destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
414        }
415    }
416
417
418    public void delete() {
419        inflightTransactions.clear();
420        preparedTransactions.clear();
421        doingRecover = false;
422    }
423
424}