001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.apache.activemq.broker.ConnectionContext;
044import org.apache.activemq.broker.region.BaseDestination;
045import org.apache.activemq.broker.scheduler.JobSchedulerStore;
046import org.apache.activemq.command.ActiveMQDestination;
047import org.apache.activemq.command.ActiveMQQueue;
048import org.apache.activemq.command.ActiveMQTempQueue;
049import org.apache.activemq.command.ActiveMQTempTopic;
050import org.apache.activemq.command.ActiveMQTopic;
051import org.apache.activemq.command.Message;
052import org.apache.activemq.command.MessageAck;
053import org.apache.activemq.command.MessageId;
054import org.apache.activemq.command.ProducerId;
055import org.apache.activemq.command.SubscriptionInfo;
056import org.apache.activemq.command.TransactionId;
057import org.apache.activemq.openwire.OpenWireFormat;
058import org.apache.activemq.protobuf.Buffer;
059import org.apache.activemq.store.AbstractMessageStore;
060import org.apache.activemq.store.IndexListener;
061import org.apache.activemq.store.ListenableFuture;
062import org.apache.activemq.store.MessageRecoveryListener;
063import org.apache.activemq.store.MessageStore;
064import org.apache.activemq.store.MessageStoreStatistics;
065import org.apache.activemq.store.PersistenceAdapter;
066import org.apache.activemq.store.TopicMessageStore;
067import org.apache.activemq.store.TransactionIdTransformer;
068import org.apache.activemq.store.TransactionStore;
069import org.apache.activemq.store.kahadb.MessageDatabase.Metadata;
070import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
071import org.apache.activemq.store.kahadb.data.KahaDestination;
072import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
073import org.apache.activemq.store.kahadb.data.KahaLocation;
074import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
075import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
076import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
077import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
078import org.apache.activemq.store.kahadb.disk.journal.Location;
079import org.apache.activemq.store.kahadb.disk.page.Transaction;
080import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
081import org.apache.activemq.usage.MemoryUsage;
082import org.apache.activemq.usage.SystemUsage;
083import org.apache.activemq.util.ServiceStopper;
084import org.apache.activemq.util.ThreadPoolUtils;
085import org.apache.activemq.wireformat.WireFormat;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
    /** Logger used by this store and by the nested message store classes. */
    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
    /** Default for {@link #maxAsyncJobs}, taken from {@link BaseDestination#MAX_AUDIT_DEPTH}. */
    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;

    /** Name of the system property parsed into {@link #cancelledTaskModMetric}. */
    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
    /** Base-10 integer value of {@link #PROPERTY_CANCELED_TASK_MOD_METRIC}; 0 when the property is unset. */
    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
    /** Name of the system property parsed into {@link #asyncExecutorMaxThreads}. */
    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
    /** Maximum pool size handed to both async executors in {@link #doStart()}; 1 when the property is unset. */
    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;

    /** Executor that runs queue store tasks; created in {@link #doStart()}, shut down and nulled in doStop. */
    protected ExecutorService queueExecutor;
    /** Executor that runs topic store tasks; created in {@link #doStart()}, shut down and nulled in doStop. */
    protected ExecutorService topicExecutor;
    /** Async task maps for queues; every task they hold is cancelled and the list cleared in doStop. */
    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    /** Async task maps for topics; every task they hold is cancelled and the list cleared in doStop. */
    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    /** Wire format used to marshal messages and acks before they are stored. */
    final WireFormat wireFormat = new OpenWireFormat();
    /** Usage manager supplied through {@link #setUsageManager(SystemUsage)}. */
    private SystemUsage usageManager;
    /** Bounded work queue backing {@link #queueExecutor}; sized with {@link #getMaxAsyncJobs()} in doStart. */
    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
    /** Bounded work queue backing {@link #topicExecutor}; sized with {@link #getMaxAsyncJobs()} in doStart. */
    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
    /** Semaphore created in doStart with {@link #getMaxAsyncJobs()} permits; acquired and drained in doStop. */
    Semaphore globalQueueSemaphore;
    /** Semaphore created in doStart with {@link #getMaxAsyncJobs()} permits; acquired and drained in doStop. */
    Semaphore globalTopicSemaphore;
    /** When true, queue adds are handed to {@link #queueExecutor} as async store tasks. */
    private boolean concurrentStoreAndDispatchQueues = true;
    // when true, message order may be compromised when the cache is exhausted if the store
    // is out of order w.r.t. the cache
    private boolean concurrentStoreAndDispatchTopics = false;
    /** Fixed at false; exposed read-only through {@link #isConcurrentStoreAndDispatchTransactions()}. */
    private final boolean concurrentStoreAndDispatchTransactions = false;
    /** Permit count and queue capacity used for async jobs; see {@link #setMaxAsyncJobs(int)}. */
    private int maxAsyncJobs = MAX_ASYNC_JOBS;
    /** Transaction store created in the constructor and returned by {@link #createTransactionStore()}. */
    private final KahaDBTransactionStore transactionStore;
    /** Applied to transaction ids before they are written into add/remove commands; identity by default. */
    private TransactionIdTransformer transactionIdTransformer;
118
119    public KahaDBStore() {
120        this.transactionStore = new KahaDBTransactionStore(this);
121        this.transactionIdTransformer = new TransactionIdTransformer() {
122            @Override
123            public TransactionId transform(TransactionId txid) {
124                return txid;
125            }
126        };
127    }
128
129    @Override
130    public String toString() {
131        return "KahaDB:[" + directory.getAbsolutePath() + "]";
132    }
133
    /**
     * No-op: the supplied broker name is ignored by this store.
     *
     * @param brokerName
     *            unused
     */
    @Override
    public void setBrokerName(String brokerName) {
    }
137
    /**
     * Records the usage manager so it can later be retrieved with
     * {@link #getUsageManager()}.
     *
     * @param usageManager
     *            the usage manager to remember
     */
    @Override
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
    }
142
143    public SystemUsage getUsageManager() {
144        return this.usageManager;
145    }
146
147    /**
148     * @return the concurrentStoreAndDispatch
149     */
150    public boolean isConcurrentStoreAndDispatchQueues() {
151        return this.concurrentStoreAndDispatchQueues;
152    }
153
    /**
     * Enables or disables concurrent store and dispatch for queues.
     *
     * @param concurrentStoreAndDispatch
     *            {@code true} to let queue adds run as asynchronous store tasks
     */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
    }
161
162    /**
163     * @return the concurrentStoreAndDispatch
164     */
165    public boolean isConcurrentStoreAndDispatchTopics() {
166        return this.concurrentStoreAndDispatchTopics;
167    }
168
    /**
     * Enables or disables concurrent store and dispatch for topics.
     *
     * @param concurrentStoreAndDispatch
     *            the new topic setting
     */
    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
    }
176
177    public boolean isConcurrentStoreAndDispatchTransactions() {
178        return this.concurrentStoreAndDispatchTransactions;
179    }
180
181    /**
182     * @return the maxAsyncJobs
183     */
184    public int getMaxAsyncJobs() {
185        return this.maxAsyncJobs;
186    }
187
    /**
     * Sets the limit used to size the async semaphores and job queues. The
     * semaphores and queues are built from this value in {@code doStart()}.
     *
     * @param maxAsyncJobs
     *            the maximum number of async jobs
     */
    public void setMaxAsyncJobs(int maxAsyncJobs) {
        this.maxAsyncJobs = maxAsyncJobs;
    }
195
196
197    @Override
198    protected void configureMetadata() {
199        if (brokerService != null) {
200            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
201            wireFormat.setVersion(metadata.openwireVersion);
202
203            if (LOG.isDebugEnabled()) {
204                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
205            }
206
207        }
208    }
209
210    @Override
211    public void doStart() throws Exception {
212        //configure the metadata before start, right now
213        //this is just the open wire version
214        configureMetadata();
215
216        super.doStart();
217
218        if (brokerService != null) {
219            // In case the recovered store used a different OpenWire version log a warning
220            // to assist in determining why journal reads fail.
221            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
222                LOG.warn("Existing Store uses a different OpenWire version[{}] " +
223                         "than the version configured[{}] reverting to the version " +
224                         "used by this store, some newer broker features may not work" +
225                         "as expected.",
226                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
227
228                // Update the broker service instance to the actual version in use.
229                wireFormat.setVersion(metadata.openwireVersion);
230                brokerService.setStoreOpenWireVersion(metadata.openwireVersion);
231            }
232        }
233
234        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
235        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
236        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
237        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
238        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
239            asyncQueueJobQueue, new ThreadFactory() {
240                @Override
241                public Thread newThread(Runnable runnable) {
242                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
243                    thread.setDaemon(true);
244                    return thread;
245                }
246            });
247        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
248            asyncTopicJobQueue, new ThreadFactory() {
249                @Override
250                public Thread newThread(Runnable runnable) {
251                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
252                    thread.setDaemon(true);
253                    return thread;
254                }
255            });
256    }
257
258    @Override
259    public void doStop(ServiceStopper stopper) throws Exception {
260        // drain down async jobs
261        LOG.info("Stopping async queue tasks");
262        if (this.globalQueueSemaphore != null) {
263            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
264        }
265        synchronized (this.asyncQueueMaps) {
266            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
267                synchronized (m) {
268                    for (StoreTask task : m.values()) {
269                        task.cancel();
270                    }
271                }
272            }
273            this.asyncQueueMaps.clear();
274        }
275        LOG.info("Stopping async topic tasks");
276        if (this.globalTopicSemaphore != null) {
277            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
278        }
279        synchronized (this.asyncTopicMaps) {
280            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
281                synchronized (m) {
282                    for (StoreTask task : m.values()) {
283                        task.cancel();
284                    }
285                }
286            }
287            this.asyncTopicMaps.clear();
288        }
289        if (this.globalQueueSemaphore != null) {
290            this.globalQueueSemaphore.drainPermits();
291        }
292        if (this.globalTopicSemaphore != null) {
293            this.globalTopicSemaphore.drainPermits();
294        }
295        if (this.queueExecutor != null) {
296            ThreadPoolUtils.shutdownNow(queueExecutor);
297            queueExecutor = null;
298        }
299        if (this.topicExecutor != null) {
300            ThreadPoolUtils.shutdownNow(topicExecutor);
301            topicExecutor = null;
302        }
303        LOG.info("Stopped KahaDB");
304        super.doStop(stopper);
305    }
306
307    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
308        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
309            @Override
310            public Location execute(Transaction tx) throws IOException {
311                StoredDestination sd = getStoredDestination(destination, tx);
312                Long sequence = sd.messageIdIndex.get(tx, key);
313                if (sequence == null) {
314                    return null;
315                }
316                return sd.orderIndex.get(tx, sequence).location;
317            }
318        });
319    }
320
321    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
322        StoreQueueTask task = null;
323        synchronized (store.asyncTaskMap) {
324            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
325        }
326        return task;
327    }
328
329    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
330        synchronized (store.asyncTaskMap) {
331            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
332        }
333        this.queueExecutor.execute(task);
334    }
335
336    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
337        StoreTopicTask task = null;
338        synchronized (store.asyncTaskMap) {
339            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
340        }
341        return task;
342    }
343
344    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
345        synchronized (store.asyncTaskMap) {
346            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
347        }
348        this.topicExecutor.execute(task);
349    }
350
351    @Override
352    public TransactionStore createTransactionStore() throws IOException {
353        return this.transactionStore;
354    }
355
356    public boolean getForceRecoverIndex() {
357        return this.forceRecoverIndex;
358    }
359
    /**
     * Sets the {@code forceRecoverIndex} flag.
     *
     * @param forceRecoverIndex
     *            the new flag value
     */
    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        this.forceRecoverIndex = forceRecoverIndex;
    }
363
364    public class KahaDBMessageStore extends AbstractMessageStore {
        /** Pending async store tasks keyed by message id and destination; guarded by synchronizing on the map. */
        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
        /** KahaDB form of this store's destination, produced by {@code convert(destination)}. */
        protected KahaDestination dest;
        /** Snapshot of the outer store's max async jobs taken at construction time. */
        private final int maxAsyncJobs;
        /** Per-destination semaphore created with {@link #maxAsyncJobs} permits. */
        private final Semaphore localDestinationSemaphore;

        // Task counters; how they are updated is not visible here (NOTE(review): confirm against StoreTask).
        double doneTasks, canceledTasks = 0;
371
372        public KahaDBMessageStore(ActiveMQDestination destination) {
373            super(destination);
374            this.dest = convert(destination);
375            this.maxAsyncJobs = getMaxAsyncJobs();
376            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
377        }
378
379        @Override
380        public ActiveMQDestination getDestination() {
381            return destination;
382        }
383
384        @Override
385        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
386                throws IOException {
387            if (isConcurrentStoreAndDispatchQueues()) {
388                StoreQueueTask result = new StoreQueueTask(this, context, message);
389                ListenableFuture<Object> future = result.getFuture();
390                message.getMessageId().setFutureOrSequenceLong(future);
391                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
392                result.aquireLocks();
393                addQueueTask(this, result);
394                if (indexListener != null) {
395                    indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
396                }
397                return future;
398            } else {
399                return super.asyncAddQueueMessage(context, message);
400            }
401        }
402
403        @Override
404        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
405            if (isConcurrentStoreAndDispatchQueues()) {
406                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
407                StoreQueueTask task = null;
408                synchronized (asyncTaskMap) {
409                    task = (StoreQueueTask) asyncTaskMap.get(key);
410                }
411                if (task != null) {
412                    if (ack.isInTransaction() || !task.cancel()) {
413                        try {
414                            task.future.get();
415                        } catch (InterruptedException e) {
416                            throw new InterruptedIOException(e.toString());
417                        } catch (Exception ignored) {
418                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
419                        }
420                        removeMessage(context, ack);
421                    } else {
422                        synchronized (asyncTaskMap) {
423                            asyncTaskMap.remove(key);
424                        }
425                    }
426                } else {
427                    removeMessage(context, ack);
428                }
429            } else {
430                removeMessage(context, ack);
431            }
432        }
433
434        @Override
435        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
436            final KahaAddMessageCommand command = new KahaAddMessageCommand();
437            command.setDestination(dest);
438            command.setMessageId(message.getMessageId().toProducerKey());
439            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
440            command.setPriority(message.getPriority());
441            command.setPrioritySupported(isPrioritizedMessages());
442            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
443            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
444            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
445                // sync add? (for async, future present from getFutureOrSequenceLong)
446                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
447
448                @Override
449                public void sequenceAssignedWithIndexLocked(final long sequence) {
450                    message.getMessageId().setFutureOrSequenceLong(sequence);
451                    if (indexListener != null) {
452                        if (possibleFuture == null) {
453                            trackPendingAdd(dest, sequence);
454                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
455                                @Override
456                                public void run() {
457                                    trackPendingAddComplete(dest, sequence);
458                                }
459                            }));
460                        }
461                    }
462                }
463            }, null);
464        }
465
466        @Override
467        public void updateMessage(Message message) throws IOException {
468            if (LOG.isTraceEnabled()) {
469                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
470            }
471            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
472            KahaAddMessageCommand command = new KahaAddMessageCommand();
473            command.setDestination(dest);
474            command.setMessageId(message.getMessageId().toProducerKey());
475            command.setPriority(message.getPriority());
476            command.setPrioritySupported(prioritizedMessages);
477            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
478            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
479            updateMessageCommand.setMessage(command);
480            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
481        }
482
483        @Override
484        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
485            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
486            command.setDestination(dest);
487            command.setMessageId(ack.getLastMessageId().toProducerKey());
488            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
489
490            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
491            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
492            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
493        }
494
495        @Override
496        public void removeAllMessages(ConnectionContext context) throws IOException {
497            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
498            command.setDestination(dest);
499            store(command, true, null, null);
500        }
501
502        @Override
503        public Message getMessage(MessageId identity) throws IOException {
504            final String key = identity.toProducerKey();
505
506            // Hopefully one day the page file supports concurrent read
507            // operations... but for now we must
508            // externally synchronize...
509            Location location;
510            indexLock.writeLock().lock();
511            try {
512                location = findMessageLocation(key, dest);
513            } finally {
514                indexLock.writeLock().unlock();
515            }
516            if (location == null) {
517                return null;
518            }
519
520            return loadMessage(location);
521        }
522
523        @Override
524        public boolean isEmpty() throws IOException {
525            indexLock.writeLock().lock();
526            try {
527                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
528                    @Override
529                    public Boolean execute(Transaction tx) throws IOException {
530                        // Iterate through all index entries to get a count of
531                        // messages in the destination.
532                        StoredDestination sd = getStoredDestination(dest, tx);
533                        return sd.locationIndex.isEmpty(tx);
534                    }
535                });
536            } finally {
537                indexLock.writeLock().unlock();
538            }
539        }
540
541        @Override
542        public void recover(final MessageRecoveryListener listener) throws Exception {
543            // recovery may involve expiry which will modify
544            indexLock.writeLock().lock();
545            try {
546                pageFile.tx().execute(new Transaction.Closure<Exception>() {
547                    @Override
548                    public void execute(Transaction tx) throws Exception {
549                        StoredDestination sd = getStoredDestination(dest, tx);
550                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
551                        sd.orderIndex.resetCursorPosition();
552                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
553                                .hasNext(); ) {
554                            Entry<Long, MessageKeys> entry = iterator.next();
555                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
556                                continue;
557                            }
558                            Message msg = loadMessage(entry.getValue().location);
559                            listener.recoverMessage(msg);
560                        }
561                    }
562                });
563            } finally {
564                indexLock.writeLock().unlock();
565            }
566        }
567
568        @Override
569        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
570            indexLock.writeLock().lock();
571            try {
572                pageFile.tx().execute(new Transaction.Closure<Exception>() {
573                    @Override
574                    public void execute(Transaction tx) throws Exception {
575                        StoredDestination sd = getStoredDestination(dest, tx);
576                        Entry<Long, MessageKeys> entry = null;
577                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
578                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
579                            entry = iterator.next();
580                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
581                                continue;
582                            }
583                            Message msg = loadMessage(entry.getValue().location);
584                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
585                            listener.recoverMessage(msg);
586                            counter++;
587                            if (counter >= maxReturned) {
588                                break;
589                            }
590                        }
591                        sd.orderIndex.stoppedIterating();
592                    }
593                });
594            } finally {
595                indexLock.writeLock().unlock();
596            }
597        }
598
599        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
600            int counter = 0;
601            String id;
602            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
603                id = iterator.next();
604                iterator.remove();
605                Long sequence = sd.messageIdIndex.get(tx, id);
606                if (sequence != null) {
607                    if (sd.orderIndex.alreadyDispatched(sequence)) {
608                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
609                        counter++;
610                        if (counter >= maxReturned) {
611                            break;
612                        }
613                    } else {
614                        LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
615                    }
616                } else {
617                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
618                }
619            }
620            return counter;
621        }
622
623
624        @Override
625        public void resetBatching() {
626            if (pageFile.isLoaded()) {
627                indexLock.writeLock().lock();
628                try {
629                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
630                        @Override
631                        public void execute(Transaction tx) throws Exception {
632                            StoredDestination sd = getExistingStoredDestination(dest, tx);
633                            if (sd != null) {
634                                sd.orderIndex.resetCursorPosition();}
635                            }
636                        });
637                } catch (Exception e) {
638                    LOG.error("Failed to reset batching",e);
639                } finally {
640                    indexLock.writeLock().unlock();
641                }
642            }
643        }
644
645        @Override
646        public void setBatch(final MessageId identity) throws IOException {
647            indexLock.writeLock().lock();
648            try {
649                pageFile.tx().execute(new Transaction.Closure<IOException>() {
650                    @Override
651                    public void execute(Transaction tx) throws IOException {
652                        StoredDestination sd = getStoredDestination(dest, tx);
653                        Long location = (Long) identity.getFutureOrSequenceLong();
654                        Long pending = sd.orderIndex.minPendingAdd();
655                        if (pending != null) {
656                            location = Math.min(location, pending-1);
657                        }
658                        sd.orderIndex.setBatch(tx, location);
659                    }
660                });
661            } finally {
662                indexLock.writeLock().unlock();
663            }
664        }
665
        /**
         * No-op: the supplied memory usage is ignored by this store.
         *
         * @param memoryUsage
         *            unused
         */
        @Override
        public void setMemoryUsage(MemoryUsage memoryUsage) {
        }
        /**
         * Starts this message store by delegating to the superclass.
         *
         * @throws Exception
         *             if the superclass fails to start
         */
        @Override
        public void start() throws Exception {
            super.start();
        }
        /**
         * Stops this message store by delegating to the superclass.
         *
         * @throws Exception
         *             if the superclass fails to stop
         */
        @Override
        public void stop() throws Exception {
            super.stop();
        }
677
        /**
         * Tries for up to 60 seconds to acquire every permit of the local
         * destination semaphore. A timeout or an interrupt is logged as an
         * error and the method returns normally, so the caller cannot tell
         * whether the permits were actually obtained.
         */
        protected void lockAsyncJobQueue() {
            try {
                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
                    // Raised only so the timeout is logged with a stack trace by the catch below.
                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
                }
            } catch (Exception e) {
                // NOTE(review): this also catches InterruptedException without restoring the interrupt flag — confirm that is intended.
                LOG.error("Failed to lock async jobs for " + this.destination, e);
            }
        }
687
        /**
         * Returns {@code maxAsyncJobs} permits to the per-destination semaphore; the counterpart
         * of {@code lockAsyncJobQueue()}, which tries to acquire the same number of permits.
         */
        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }
691
692        protected void acquireLocalAsyncLock() {
693            try {
694                this.localDestinationSemaphore.acquire();
695            } catch (InterruptedException e) {
696                LOG.error("Failed to aquire async lock for " + this.destination, e);
697            }
698        }
699
        /**
         * Returns a single permit to the per-destination semaphore; the counterpart of
         * {@code acquireLocalAsyncLock()}.
         */
        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }
703
704        @Override
705        public String toString(){
706            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
707        }
708
709        @Override
710        protected void recoverMessageStoreStatistics() throws IOException {
711            try {
712                MessageStoreStatistics recoveredStatistics;
713                lockAsyncJobQueue();
714                indexLock.writeLock().lock();
715                try {
716                    recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
717                        @Override
718                        public MessageStoreStatistics execute(Transaction tx) throws IOException {
719                            MessageStoreStatistics statistics = new MessageStoreStatistics();
720
721                            // Iterate through all index entries to get the size of each message
722                            StoredDestination sd = getStoredDestination(dest, tx);
723                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
724                                int locationSize = iterator.next().getKey().getSize();
725                                statistics.getMessageCount().increment();
726                                statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
727                            }
728                           return statistics;
729                        }
730                    });
731                    getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
732                    getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
733                } finally {
734                    indexLock.writeLock().unlock();
735                }
736            } finally {
737                unlockAsyncJobQueue();
738            }
739        }
740    }
741
742    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
743        private final AtomicInteger subscriptionCount = new AtomicInteger();
744        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
745            super(destination);
746            this.subscriptionCount.set(getAllSubscriptions().length);
747            if (isConcurrentStoreAndDispatchTopics()) {
748                asyncTopicMaps.add(asyncTaskMap);
749            }
750        }
751
752        @Override
753        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
754                throws IOException {
755            if (isConcurrentStoreAndDispatchTopics()) {
756                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
757                result.aquireLocks();
758                addTopicTask(this, result);
759                return result.getFuture();
760            } else {
761                return super.asyncAddTopicMessage(context, message);
762            }
763        }
764
765        @Override
766        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
767                                MessageId messageId, MessageAck ack) throws IOException {
768            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
769            if (isConcurrentStoreAndDispatchTopics()) {
770                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
771                StoreTopicTask task = null;
772                synchronized (asyncTaskMap) {
773                    task = (StoreTopicTask) asyncTaskMap.get(key);
774                }
775                if (task != null) {
776                    if (task.addSubscriptionKey(subscriptionKey)) {
777                        removeTopicTask(this, messageId);
778                        if (task.cancel()) {
779                            synchronized (asyncTaskMap) {
780                                asyncTaskMap.remove(key);
781                            }
782                        }
783                    }
784                } else {
785                    doAcknowledge(context, subscriptionKey, messageId, ack);
786                }
787            } else {
788                doAcknowledge(context, subscriptionKey, messageId, ack);
789            }
790        }
791
792        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
793                throws IOException {
794            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
795            command.setDestination(dest);
796            command.setSubscriptionKey(subscriptionKey);
797            command.setMessageId(messageId.toProducerKey());
798            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
799            if (ack != null && ack.isUnmatchedAck()) {
800                command.setAck(UNMATCHED);
801            } else {
802                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
803                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
804            }
805            store(command, false, null, null);
806        }
807
808        @Override
809        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
810            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
811                    .getSubscriptionName());
812            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
813            command.setDestination(dest);
814            command.setSubscriptionKey(subscriptionKey.toString());
815            command.setRetroactive(retroactive);
816            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
817            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
818            store(command, isEnableJournalDiskSyncs() && true, null, null);
819            this.subscriptionCount.incrementAndGet();
820        }
821
822        @Override
823        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
824            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
825            command.setDestination(dest);
826            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
827            store(command, isEnableJournalDiskSyncs() && true, null, null);
828            this.subscriptionCount.decrementAndGet();
829        }
830
831        @Override
832        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
833
834            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
835            indexLock.writeLock().lock();
836            try {
837                pageFile.tx().execute(new Transaction.Closure<IOException>() {
838                    @Override
839                    public void execute(Transaction tx) throws IOException {
840                        StoredDestination sd = getStoredDestination(dest, tx);
841                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
842                                .hasNext();) {
843                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
844                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
845                                    .getValue().getSubscriptionInfo().newInput()));
846                            subscriptions.add(info);
847
848                        }
849                    }
850                });
851            } finally {
852                indexLock.writeLock().unlock();
853            }
854
855            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
856            subscriptions.toArray(rc);
857            return rc;
858        }
859
860        @Override
861        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
862            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
863            indexLock.writeLock().lock();
864            try {
865                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
866                    @Override
867                    public SubscriptionInfo execute(Transaction tx) throws IOException {
868                        StoredDestination sd = getStoredDestination(dest, tx);
869                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
870                        if (command == null) {
871                            return null;
872                        }
873                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
874                                .getSubscriptionInfo().newInput()));
875                    }
876                });
877            } finally {
878                indexLock.writeLock().unlock();
879            }
880        }
881
882        @Override
883        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
884            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
885            indexLock.writeLock().lock();
886            try {
887                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
888                    @Override
889                    public Integer execute(Transaction tx) throws IOException {
890                        StoredDestination sd = getStoredDestination(dest, tx);
891                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
892                        if (cursorPos == null) {
893                            // The subscription might not exist.
894                            return 0;
895                        }
896
897                        return (int) getStoredMessageCount(tx, sd, subscriptionKey);
898                    }
899                });
900            } finally {
901                indexLock.writeLock().unlock();
902            }
903        }
904
905
906        @Override
907        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
908            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
909            indexLock.writeLock().lock();
910            try {
911                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
912                    @Override
913                    public Integer execute(Transaction tx) throws IOException {
914                        StoredDestination sd = getStoredDestination(dest, tx);
915                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
916                        if (cursorPos == null) {
917                            // The subscription might not exist.
918                            return 0;
919                        }
920
921                        return (int) getStoredMessageSize(tx, sd, subscriptionKey);
922                    }
923                });
924            } finally {
925                indexLock.writeLock().unlock();
926            }
927        }
928
929        @Override
930        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
931                throws Exception {
932            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
933            @SuppressWarnings("unused")
934            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
935            indexLock.writeLock().lock();
936            try {
937                pageFile.tx().execute(new Transaction.Closure<Exception>() {
938                    @Override
939                    public void execute(Transaction tx) throws Exception {
940                        StoredDestination sd = getStoredDestination(dest, tx);
941                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
942                        sd.orderIndex.setBatch(tx, cursorPos);
943                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
944                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
945                                .hasNext();) {
946                            Entry<Long, MessageKeys> entry = iterator.next();
947                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
948                                continue;
949                            }
950                            listener.recoverMessage(loadMessage(entry.getValue().location));
951                        }
952                        sd.orderIndex.resetCursorPosition();
953                    }
954                });
955            } finally {
956                indexLock.writeLock().unlock();
957            }
958        }
959
960        @Override
961        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
962                final MessageRecoveryListener listener) throws Exception {
963            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
964            @SuppressWarnings("unused")
965            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
966            indexLock.writeLock().lock();
967            try {
968                pageFile.tx().execute(new Transaction.Closure<Exception>() {
969                    @Override
970                    public void execute(Transaction tx) throws Exception {
971                        StoredDestination sd = getStoredDestination(dest, tx);
972                        sd.orderIndex.resetCursorPosition();
973                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
974                        if (moc == null) {
975                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
976                            if (pos == null) {
977                                // sub deleted
978                                return;
979                            }
980                            sd.orderIndex.setBatch(tx, pos);
981                            moc = sd.orderIndex.cursor;
982                        } else {
983                            sd.orderIndex.cursor.sync(moc);
984                        }
985
986                        Entry<Long, MessageKeys> entry = null;
987                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
988                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
989                                .hasNext();) {
990                            entry = iterator.next();
991                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
992                                continue;
993                            }
994                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
995                                counter++;
996                            }
997                            if (counter >= maxReturned || listener.hasSpace() == false) {
998                                break;
999                            }
1000                        }
1001                        sd.orderIndex.stoppedIterating();
1002                        if (entry != null) {
1003                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
1004                            sd.subscriptionCursors.put(subscriptionKey, copy);
1005                        }
1006                    }
1007                });
1008            } finally {
1009                indexLock.writeLock().unlock();
1010            }
1011        }
1012
1013        @Override
1014        public void resetBatching(String clientId, String subscriptionName) {
1015            try {
1016                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1017                indexLock.writeLock().lock();
1018                try {
1019                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1020                        @Override
1021                        public void execute(Transaction tx) throws IOException {
1022                            StoredDestination sd = getStoredDestination(dest, tx);
1023                            sd.subscriptionCursors.remove(subscriptionKey);
1024                        }
1025                    });
1026                }finally {
1027                    indexLock.writeLock().unlock();
1028                }
1029            } catch (IOException e) {
1030                throw new RuntimeException(e);
1031            }
1032        }
1033    }
1034
1035    String subscriptionKey(String clientId, String subscriptionName) {
1036        return clientId + ":" + subscriptionName;
1037    }
1038
1039    @Override
1040    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
1041        String key = key(convert(destination));
1042        MessageStore store = storeCache.get(key(convert(destination)));
1043        if (store == null) {
1044            final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
1045            store = storeCache.putIfAbsent(key, queueStore);
1046            if (store == null) {
1047                store = queueStore;
1048            }
1049        }
1050
1051        return store;
1052    }
1053
1054    @Override
1055    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1056        String key = key(convert(destination));
1057        MessageStore store = storeCache.get(key(convert(destination)));
1058        if (store == null) {
1059            final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1060            store = storeCache.putIfAbsent(key, topicStore);
1061            if (store == null) {
1062                store = topicStore;
1063            }
1064        }
1065
1066        return (TopicMessageStore) store;
1067    }
1068
1069    /**
1070     * Cleanup method to remove any state associated with the given destination.
1071     * This method does not stop the message store (it might not be cached).
1072     *
1073     * @param destination
1074     *            Destination to forget
1075     */
1076    @Override
1077    public void removeQueueMessageStore(ActiveMQQueue destination) {
1078    }
1079
1080    /**
1081     * Cleanup method to remove any state associated with the given destination
1082     * This method does not stop the message store (it might not be cached).
1083     *
1084     * @param destination
1085     *            Destination to forget
1086     */
1087    @Override
1088    public void removeTopicMessageStore(ActiveMQTopic destination) {
1089    }
1090
    /**
     * Sets the {@code deleteAllMessages} flag. Nothing is deleted by this method itself.
     * NOTE(review): presumably the flag is acted on elsewhere (e.g. when the store is loaded) —
     * confirm against the code that reads it.
     *
     * @throws IOException declared by the interface; not thrown by this body
     */
    @Override
    public void deleteAllMessages() throws IOException {
        deleteAllMessages = true;
    }
1095
1096    @Override
1097    public Set<ActiveMQDestination> getDestinations() {
1098        try {
1099            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1100            indexLock.writeLock().lock();
1101            try {
1102                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1103                    @Override
1104                    public void execute(Transaction tx) throws IOException {
1105                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1106                                .hasNext();) {
1107                            Entry<String, StoredDestination> entry = iterator.next();
1108                            //Removing isEmpty topic check - see AMQ-5875
1109                            rc.add(convert(entry.getKey()));
1110                        }
1111                    }
1112                });
1113            }finally {
1114                indexLock.writeLock().unlock();
1115            }
1116            return rc;
1117        } catch (IOException e) {
1118            throw new RuntimeException(e);
1119        }
1120    }
1121
    /**
     * Always returns 0; this implementation does not look anything up.
     *
     * @return 0
     * @throws IOException declared by the interface; not thrown by this body
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }
1126
1127    @Override
1128    public long getLastProducerSequenceId(ProducerId id) {
1129        indexLock.writeLock().lock();
1130        try {
1131            return metadata.producerSequenceIdTracker.getLastSeqId(id);
1132        } finally {
1133            indexLock.writeLock().unlock();
1134        }
1135    }
1136
1137    @Override
1138    public long size() {
1139        try {
1140            return journalSize.get() + getPageFile().getDiskSize();
1141        } catch (IOException e) {
1142            throw new RuntimeException(e);
1143        }
1144    }
1145
    /**
     * Not supported by this store.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this store.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this store.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
1158
    /**
     * Runs a checkpoint by delegating to {@code super.checkpointCleanup(sync)}.
     *
     * @param sync passed through unchanged to {@code checkpointCleanup}
     * @throws IOException if the delegated call fails
     */
    @Override
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }
1163
1164    // /////////////////////////////////////////////////////////////////
1165    // Internal helper methods.
1166    // /////////////////////////////////////////////////////////////////
1167
1168    /**
1169     * @param location
1170     * @return
1171     * @throws IOException
1172     */
1173    Message loadMessage(Location location) throws IOException {
1174        JournalCommand<?> command = load(location);
1175        KahaAddMessageCommand addMessage = null;
1176        switch (command.type()) {
1177            case KAHA_UPDATE_MESSAGE_COMMAND:
1178                addMessage = ((KahaUpdateMessageCommand)command).getMessage();
1179                break;
1180            default:
1181                addMessage = (KahaAddMessageCommand) command;
1182        }
1183        Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1184        return msg;
1185    }
1186
1187    // /////////////////////////////////////////////////////////////////
1188    // Internal conversion methods.
1189    // /////////////////////////////////////////////////////////////////
1190
1191    KahaLocation convert(Location location) {
1192        KahaLocation rc = new KahaLocation();
1193        rc.setLogId(location.getDataFileId());
1194        rc.setOffset(location.getOffset());
1195        return rc;
1196    }
1197
1198    KahaDestination convert(ActiveMQDestination dest) {
1199        KahaDestination rc = new KahaDestination();
1200        rc.setName(dest.getPhysicalName());
1201        switch (dest.getDestinationType()) {
1202        case ActiveMQDestination.QUEUE_TYPE:
1203            rc.setType(DestinationType.QUEUE);
1204            return rc;
1205        case ActiveMQDestination.TOPIC_TYPE:
1206            rc.setType(DestinationType.TOPIC);
1207            return rc;
1208        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1209            rc.setType(DestinationType.TEMP_QUEUE);
1210            return rc;
1211        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1212            rc.setType(DestinationType.TEMP_TOPIC);
1213            return rc;
1214        default:
1215            return null;
1216        }
1217    }
1218
1219    ActiveMQDestination convert(String dest) {
1220        int p = dest.indexOf(":");
1221        if (p < 0) {
1222            throw new IllegalArgumentException("Not in the valid destination format");
1223        }
1224        int type = Integer.parseInt(dest.substring(0, p));
1225        String name = dest.substring(p + 1);
1226        return convert(type, name);
1227    }
1228
    /**
     * Converts a {@code KahaDestination} back into a broker destination using the numeric value
     * of its type and its name.
     *
     * @param commandDestination destination as stored in a journal command
     * @return the corresponding {@code ActiveMQDestination}
     */
    private ActiveMQDestination convert(KahaDestination commandDestination) {
        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
    }
1232
1233    private ActiveMQDestination convert(int type, String name) {
1234        switch (KahaDestination.DestinationType.valueOf(type)) {
1235        case QUEUE:
1236            return new ActiveMQQueue(name);
1237        case TOPIC:
1238            return new ActiveMQTopic(name);
1239        case TEMP_QUEUE:
1240            return new ActiveMQTempQueue(name);
1241        case TEMP_TOPIC:
1242            return new ActiveMQTempTopic(name);
1243        default:
1244            throw new IllegalArgumentException("Not in the valid destination format");
1245        }
1246    }
1247
    /**
     * @return the transformer currently applied to transaction ids before they are journaled
     */
    public TransactionIdTransformer getTransactionIdTransformer() {
        return transactionIdTransformer;
    }
1251
    /**
     * Replaces the transaction id transformer used by this store.
     *
     * @param transactionIdTransformer the transformer to use from now on
     */
    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }
1255
1256    static class AsyncJobKey {
1257        MessageId id;
1258        ActiveMQDestination destination;
1259
1260        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1261            this.id = id;
1262            this.destination = destination;
1263        }
1264
1265        @Override
1266        public boolean equals(Object obj) {
1267            if (obj == this) {
1268                return true;
1269            }
1270            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1271                    && destination.equals(((AsyncJobKey) obj).destination);
1272        }
1273
1274        @Override
1275        public int hashCode() {
1276            return id.hashCode() + destination.hashCode();
1277        }
1278
1279        @Override
1280        public String toString() {
1281            return destination.getPhysicalName() + "-" + id;
1282        }
1283    }
1284
1285    public interface StoreTask {
1286        public boolean cancel();
1287
1288        public void aquireLocks();
1289
1290        public void releaseLocks();
1291    }
1292
1293    class StoreQueueTask implements Runnable, StoreTask {
1294        protected final Message message;
1295        protected final ConnectionContext context;
1296        protected final KahaDBMessageStore store;
1297        protected final InnerFutureTask future;
1298        protected final AtomicBoolean done = new AtomicBoolean();
1299        protected final AtomicBoolean locked = new AtomicBoolean();
1300
1301        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1302            this.store = store;
1303            this.context = context;
1304            this.message = message;
1305            this.future = new InnerFutureTask(this);
1306        }
1307
1308        public ListenableFuture<Object> getFuture() {
1309            return this.future;
1310        }
1311
1312        @Override
1313        public boolean cancel() {
1314            if (this.done.compareAndSet(false, true)) {
1315                return this.future.cancel(false);
1316            }
1317            return false;
1318        }
1319
1320        @Override
1321        public void aquireLocks() {
1322            if (this.locked.compareAndSet(false, true)) {
1323                try {
1324                    globalQueueSemaphore.acquire();
1325                    store.acquireLocalAsyncLock();
1326                    message.incrementReferenceCount();
1327                } catch (InterruptedException e) {
1328                    LOG.warn("Failed to aquire lock", e);
1329                }
1330            }
1331
1332        }
1333
1334        @Override
1335        public void releaseLocks() {
1336            if (this.locked.compareAndSet(true, false)) {
1337                store.releaseLocalAsyncLock();
1338                globalQueueSemaphore.release();
1339                message.decrementReferenceCount();
1340            }
1341        }
1342
1343        @Override
1344        public void run() {
1345            this.store.doneTasks++;
1346            try {
1347                if (this.done.compareAndSet(false, true)) {
1348                    this.store.addMessage(context, message);
1349                    removeQueueTask(this.store, this.message.getMessageId());
1350                    this.future.complete();
1351                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1352                    System.err.println(this.store.dest.getName() + " cancelled: "
1353                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1354                    this.store.canceledTasks = this.store.doneTasks = 0;
1355                }
1356            } catch (Exception e) {
1357                this.future.setException(e);
1358            }
1359        }
1360
1361        protected Message getMessage() {
1362            return this.message;
1363        }
1364
1365        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1366
1367            private Runnable listener;
1368            public InnerFutureTask(Runnable runnable) {
1369                super(runnable, null);
1370
1371            }
1372
1373            public void setException(final Exception e) {
1374                super.setException(e);
1375            }
1376
1377            public void complete() {
1378                super.set(null);
1379            }
1380
1381            @Override
1382            public void done() {
1383                fireListener();
1384            }
1385
1386            @Override
1387            public void addListener(Runnable listener) {
1388                this.listener = listener;
1389                if (isDone()) {
1390                    fireListener();
1391                }
1392            }
1393
1394            private void fireListener() {
1395                if (listener != null) {
1396                    try {
1397                        listener.run();
1398                    } catch (Exception ignored) {
1399                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1400                    }
1401                }
1402            }
1403        }
1404    }
1405
1406    class StoreTopicTask extends StoreQueueTask {
1407        private final int subscriptionCount;
1408        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1409        private final KahaDBTopicMessageStore topicStore;
1410        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1411                int subscriptionCount) {
1412            super(store, context, message);
1413            this.topicStore = store;
1414            this.subscriptionCount = subscriptionCount;
1415
1416        }
1417
1418        @Override
1419        public void aquireLocks() {
1420            if (this.locked.compareAndSet(false, true)) {
1421                try {
1422                    globalTopicSemaphore.acquire();
1423                    store.acquireLocalAsyncLock();
1424                    message.incrementReferenceCount();
1425                } catch (InterruptedException e) {
1426                    LOG.warn("Failed to aquire lock", e);
1427                }
1428            }
1429        }
1430
1431        @Override
1432        public void releaseLocks() {
1433            if (this.locked.compareAndSet(true, false)) {
1434                message.decrementReferenceCount();
1435                store.releaseLocalAsyncLock();
1436                globalTopicSemaphore.release();
1437            }
1438        }
1439
1440        /**
1441         * add a key
1442         *
1443         * @param key
1444         * @return true if all acknowledgements received
1445         */
1446        public boolean addSubscriptionKey(String key) {
1447            synchronized (this.subscriptionKeys) {
1448                this.subscriptionKeys.add(key);
1449            }
1450            return this.subscriptionKeys.size() >= this.subscriptionCount;
1451        }
1452
1453        @Override
1454        public void run() {
1455            this.store.doneTasks++;
1456            try {
1457                if (this.done.compareAndSet(false, true)) {
1458                    this.topicStore.addMessage(context, message);
1459                    // apply any acks we have
1460                    synchronized (this.subscriptionKeys) {
1461                        for (String key : this.subscriptionKeys) {
1462                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1463
1464                        }
1465                    }
1466                    removeTopicTask(this.topicStore, this.message.getMessageId());
1467                    this.future.complete();
1468                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1469                    System.err.println(this.store.dest.getName() + " cancelled: "
1470                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1471                    this.store.canceledTasks = this.store.doneTasks = 0;
1472                }
1473            } catch (Exception e) {
1474                this.future.setException(e);
1475            }
1476        }
1477    }
1478
1479    public class StoreTaskExecutor extends ThreadPoolExecutor {
1480
1481        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1482            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1483        }
1484
1485        @Override
1486        protected void afterExecute(Runnable runnable, Throwable throwable) {
1487            super.afterExecute(runnable, throwable);
1488
1489            if (runnable instanceof StoreTask) {
1490               ((StoreTask)runnable).releaseLocks();
1491            }
1492        }
1493    }
1494
1495    @Override
1496    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
1497        return new JobSchedulerStoreImpl();
1498    }
1499}