001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.Map.Entry;
026import java.util.Set;
027
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.broker.BrokerServiceAware;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.scheduler.JobSchedulerStore;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQQueue;
034import org.apache.activemq.command.ActiveMQTempQueue;
035import org.apache.activemq.command.ActiveMQTempTopic;
036import org.apache.activemq.command.ActiveMQTopic;
037import org.apache.activemq.command.Message;
038import org.apache.activemq.command.MessageAck;
039import org.apache.activemq.command.MessageId;
040import org.apache.activemq.command.ProducerId;
041import org.apache.activemq.command.SubscriptionInfo;
042import org.apache.activemq.command.TransactionId;
043import org.apache.activemq.command.XATransactionId;
044import org.apache.activemq.openwire.OpenWireFormat;
045import org.apache.activemq.protobuf.Buffer;
046import org.apache.activemq.store.AbstractMessageStore;
047import org.apache.activemq.store.MessageRecoveryListener;
048import org.apache.activemq.store.MessageStore;
049import org.apache.activemq.store.PersistenceAdapter;
050import org.apache.activemq.store.TopicMessageStore;
051import org.apache.activemq.store.TransactionRecoveryListener;
052import org.apache.activemq.store.TransactionStore;
053import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
054import org.apache.activemq.store.kahadb.data.KahaDestination;
055import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
056import org.apache.activemq.store.kahadb.data.KahaLocation;
057import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
058import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
059import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
060import org.apache.activemq.store.kahadb.disk.journal.Location;
061import org.apache.activemq.store.kahadb.disk.page.Transaction;
062import org.apache.activemq.usage.MemoryUsage;
063import org.apache.activemq.usage.SystemUsage;
064import org.apache.activemq.util.ByteSequence;
065import org.apache.activemq.wireformat.WireFormat;
066
067public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
068
069    private final WireFormat wireFormat = new OpenWireFormat();
070    private BrokerService brokerService;
071
072    @Override
073    public void setBrokerName(String brokerName) {
074    }
075    @Override
076    public void setUsageManager(SystemUsage usageManager) {
077    }
078
079    @Override
080    public TransactionStore createTransactionStore() throws IOException {
081        return new TransactionStore(){
082
083            @Override
084            public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
085                if (preCommit != null) {
086                    preCommit.run();
087                }
088                processCommit(txid);
089                if (postCommit != null) {
090                    postCommit.run();
091                }
092            }
093            @Override
094            public void prepare(TransactionId txid) throws IOException {
095                processPrepare(txid);
096            }
097            @Override
098            public void rollback(TransactionId txid) throws IOException {
099                processRollback(txid);
100            }
101            @Override
102            public void recover(TransactionRecoveryListener listener) throws IOException {
103                for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
104                    XATransactionId xid = (XATransactionId)entry.getKey();
105                    ArrayList<Message> messageList = new ArrayList<Message>();
106                    ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
107
108                    for (Operation op : entry.getValue()) {
109                        if( op.getClass() == AddOpperation.class ) {
110                            AddOpperation addOp = (AddOpperation)op;
111                            Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
112                            messageList.add(msg);
113                        } else {
114                            RemoveOpperation rmOp = (RemoveOpperation)op;
115                            MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
116                            ackList.add(ack);
117                        }
118                    }
119
120                    Message[] addedMessages = new Message[messageList.size()];
121                    MessageAck[] acks = new MessageAck[ackList.size()];
122                    messageList.toArray(addedMessages);
123                    ackList.toArray(acks);
124                    listener.recover(xid, addedMessages, acks);
125                }
126            }
127            @Override
128            public void start() throws Exception {
129            }
130            @Override
131            public void stop() throws Exception {
132            }
133        };
134    }
135
136    public class KahaDBMessageStore extends AbstractMessageStore {
137        protected KahaDestination dest;
138
139        public KahaDBMessageStore(ActiveMQDestination destination) {
140            super(destination);
141            this.dest = convert( destination );
142        }
143
144        @Override
145        public ActiveMQDestination getDestination() {
146            return destination;
147        }
148
149        @Override
150        public void addMessage(ConnectionContext context, Message message) throws IOException {
151            KahaAddMessageCommand command = new KahaAddMessageCommand();
152            command.setDestination(dest);
153            command.setMessageId(message.getMessageId().toProducerKey());
154            processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
155        }
156
157        @Override
158        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
159            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
160            command.setDestination(dest);
161            command.setMessageId(ack.getLastMessageId().toProducerKey());
162            processRemove(command, ack.getTransactionId());
163        }
164
165        @Override
166        public void removeAllMessages(ConnectionContext context) throws IOException {
167            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
168            command.setDestination(dest);
169            process(command);
170        }
171
172        @Override
173        public Message getMessage(MessageId identity) throws IOException {
174            final String key = identity.toProducerKey();
175
176            // Hopefully one day the page file supports concurrent read operations... but for now we must
177            // externally synchronize...
178            ByteSequence data;
179            synchronized(indexMutex) {
180                data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
181                    @Override
182                    public ByteSequence execute(Transaction tx) throws IOException {
183                        StoredDestination sd = getStoredDestination(dest, tx);
184                        Long sequence = sd.messageIdIndex.get(tx, key);
185                        if( sequence ==null ) {
186                            return null;
187                        }
188                        return sd.orderIndex.get(tx, sequence).data;
189                    }
190                });
191            }
192            if( data == null ) {
193                return null;
194            }
195
196            Message msg = (Message)wireFormat.unmarshal( data );
197            return msg;
198        }
199
200        @Override
201        public void recover(final MessageRecoveryListener listener) throws Exception {
202            synchronized(indexMutex) {
203                pageFile.tx().execute(new Transaction.Closure<Exception>(){
204                    @Override
205                    public void execute(Transaction tx) throws Exception {
206                        StoredDestination sd = getStoredDestination(dest, tx);
207                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
208                            Entry<Long, MessageRecord> entry = iterator.next();
209                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) );
210                        }
211                    }
212                });
213            }
214        }
215
216        long cursorPos=0;
217
218        @Override
219        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
220            synchronized(indexMutex) {
221                pageFile.tx().execute(new Transaction.Closure<Exception>(){
222                    @Override
223                    public void execute(Transaction tx) throws Exception {
224                        StoredDestination sd = getStoredDestination(dest, tx);
225                        Entry<Long, MessageRecord> entry=null;
226                        int counter = 0;
227                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
228                            entry = iterator.next();
229                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
230                            counter++;
231                            if( counter >= maxReturned ) {
232                                break;
233                            }
234                        }
235                        if( entry!=null ) {
236                            cursorPos = entry.getKey()+1;
237                        }
238                    }
239                });
240            }
241        }
242
243        @Override
244        public void resetBatching() {
245            cursorPos=0;
246        }
247
248
249        @Override
250        public void setBatch(MessageId identity) throws IOException {
251            final String key = identity.toProducerKey();
252
253            // Hopefully one day the page file supports concurrent read operations... but for now we must
254            // externally synchronize...
255            Long location;
256            synchronized(indexMutex) {
257                location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
258                    @Override
259                    public Long execute(Transaction tx) throws IOException {
260                        StoredDestination sd = getStoredDestination(dest, tx);
261                        return sd.messageIdIndex.get(tx, key);
262                    }
263                });
264            }
265            if( location!=null ) {
266                cursorPos=location+1;
267            }
268
269        }
270
271        @Override
272        public void setMemoryUsage(MemoryUsage memoryUsage) {
273        }
274        @Override
275        public void start() throws Exception {
276        }
277        @Override
278        public void stop() throws Exception {
279        }
280
281        @Override
282        public void recoverMessageStoreStatistics() throws IOException {
283            int count = 0;
284            synchronized(indexMutex) {
285                count = pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
286                    @Override
287                    public Integer execute(Transaction tx) throws IOException {
288                        // Iterate through all index entries to get a count of messages in the destination.
289                        StoredDestination sd = getStoredDestination(dest, tx);
290                        int rc=0;
291                        for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
292                            iterator.next();
293                            rc++;
294                        }
295                        return rc;
296                    }
297                });
298            }
299            getMessageStoreStatistics().getMessageCount().setCount(count);
300        }
301
302    }
303
304    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
305        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
306            super(destination);
307        }
308
309        @Override
310        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
311                                MessageId messageId, MessageAck ack) throws IOException {
312            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
313            command.setDestination(dest);
314            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
315            command.setMessageId(messageId.toProducerKey());
316            // We are not passed a transaction info.. so we can't participate in a transaction.
317            // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
318            // to pass back to the XA recover method.
319            // command.setTransactionInfo();
320            processRemove(command, null);
321        }
322
323        @Override
324        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
325            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
326            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
327            command.setDestination(dest);
328            command.setSubscriptionKey(subscriptionKey);
329            command.setRetroactive(retroactive);
330            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
331            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
332            process(command);
333        }
334
335        @Override
336        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
337            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
338            command.setDestination(dest);
339            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
340            process(command);
341        }
342
343        @Override
344        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
345
346            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
347            synchronized(indexMutex) {
348                pageFile.tx().execute(new Transaction.Closure<IOException>(){
349                    @Override
350                    public void execute(Transaction tx) throws IOException {
351                        StoredDestination sd = getStoredDestination(dest, tx);
352                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
353                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
354                            SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
355                            subscriptions.add(info);
356
357                        }
358                    }
359                });
360            }
361
362            SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
363            subscriptions.toArray(rc);
364            return rc;
365        }
366
367        @Override
368        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
369            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
370            synchronized(indexMutex) {
371                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
372                    @Override
373                    public SubscriptionInfo execute(Transaction tx) throws IOException {
374                        StoredDestination sd = getStoredDestination(dest, tx);
375                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
376                        if( command ==null ) {
377                            return null;
378                        }
379                        return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
380                    }
381                });
382            }
383        }
384
385        @Override
386        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
387            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
388            synchronized(indexMutex) {
389                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
390                    @Override
391                    public Integer execute(Transaction tx) throws IOException {
392                        StoredDestination sd = getStoredDestination(dest, tx);
393                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
394                        if ( cursorPos==null ) {
395                            // The subscription might not exist.
396                            return 0;
397                        }
398                        cursorPos += 1;
399
400                        int counter = 0;
401                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
402                            iterator.next();
403                            counter++;
404                        }
405                        return counter;
406                    }
407                });
408            }
409        }
410
411        @Override
412        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
413            return 0;
414        }
415
416        @Override
417        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
418            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
419            synchronized(indexMutex) {
420                pageFile.tx().execute(new Transaction.Closure<Exception>(){
421                    @Override
422                    public void execute(Transaction tx) throws Exception {
423                        StoredDestination sd = getStoredDestination(dest, tx);
424                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
425                        cursorPos += 1;
426
427                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
428                            Entry<Long, MessageRecord> entry = iterator.next();
429                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
430                        }
431                    }
432                });
433            }
434        }
435
436        @Override
437        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
438            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
439            synchronized(indexMutex) {
440                pageFile.tx().execute(new Transaction.Closure<Exception>(){
441                    @Override
442                    public void execute(Transaction tx) throws Exception {
443                        StoredDestination sd = getStoredDestination(dest, tx);
444                        Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
445                        if( cursorPos == null ) {
446                            cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
447                            cursorPos += 1;
448                        }
449
450                        Entry<Long, MessageRecord> entry=null;
451                        int counter = 0;
452                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
453                            entry = iterator.next();
454                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
455                            counter++;
456                            if( counter >= maxReturned ) {
457                                break;
458                            }
459                        }
460                        if( entry!=null ) {
461                            sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
462                        }
463                    }
464                });
465            }
466        }
467
468        @Override
469        public void resetBatching(String clientId, String subscriptionName) {
470            try {
471                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
472                synchronized(indexMutex) {
473                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
474                        @Override
475                        public void execute(Transaction tx) throws IOException {
476                            StoredDestination sd = getStoredDestination(dest, tx);
477                            sd.subscriptionCursors.remove(subscriptionKey);
478                        }
479                    });
480                }
481            } catch (IOException e) {
482                throw new RuntimeException(e);
483            }
484        }
485    }
486
487    String subscriptionKey(String clientId, String subscriptionName){
488        return clientId+":"+subscriptionName;
489    }
490
491    @Override
492    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
493        return new KahaDBMessageStore(destination);
494    }
495
496    @Override
497    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
498        return new KahaDBTopicMessageStore(destination);
499    }
500
501    /**
502     * Cleanup method to remove any state associated with the given destination.
503     * This method does not stop the message store (it might not be cached).
504     *
505     * @param destination Destination to forget
506     */
507    @Override
508    public void removeQueueMessageStore(ActiveMQQueue destination) {
509    }
510
511    /**
512     * Cleanup method to remove any state associated with the given destination
513     * This method does not stop the message store (it might not be cached).
514     *
515     * @param destination Destination to forget
516     */
517    @Override
518    public void removeTopicMessageStore(ActiveMQTopic destination) {
519    }
520
521    @Override
522    public void deleteAllMessages() throws IOException {
523    }
524
525
526    @Override
527    public Set<ActiveMQDestination> getDestinations() {
528        try {
529            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
530            synchronized(indexMutex) {
531                pageFile.tx().execute(new Transaction.Closure<IOException>(){
532                    @Override
533                    public void execute(Transaction tx) throws IOException {
534                        for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
535                            Entry<String, StoredDestination> entry = iterator.next();
536                            rc.add(convert(entry.getKey()));
537                        }
538                    }
539                });
540            }
541            return rc;
542        } catch (IOException e) {
543            throw new RuntimeException(e);
544        }
545    }
546
547    @Override
548    public long getLastMessageBrokerSequenceId() throws IOException {
549        return 0;
550    }
551
552    @Override
553    public long size() {
554        if ( !started.get() ) {
555            return 0;
556        }
557        try {
558            return pageFile.getDiskSize();
559        } catch (IOException e) {
560            throw new RuntimeException(e);
561        }
562    }
563
564    @Override
565    public void beginTransaction(ConnectionContext context) throws IOException {
566        throw new IOException("Not yet implemented.");
567    }
568    @Override
569    public void commitTransaction(ConnectionContext context) throws IOException {
570        throw new IOException("Not yet implemented.");
571    }
572    @Override
573    public void rollbackTransaction(ConnectionContext context) throws IOException {
574        throw new IOException("Not yet implemented.");
575    }
576
577    @Override
578    public void checkpoint(boolean sync) throws IOException {
579    }
580
581    ///////////////////////////////////////////////////////////////////
582    // Internal conversion methods.
583    ///////////////////////////////////////////////////////////////////
584
585
586
587    KahaLocation convert(Location location) {
588        KahaLocation rc = new KahaLocation();
589        rc.setLogId(location.getDataFileId());
590        rc.setOffset(location.getOffset());
591        return rc;
592    }
593
594    KahaDestination convert(ActiveMQDestination dest) {
595        KahaDestination rc = new KahaDestination();
596        rc.setName(dest.getPhysicalName());
597        switch( dest.getDestinationType() ) {
598        case ActiveMQDestination.QUEUE_TYPE:
599            rc.setType(DestinationType.QUEUE);
600            return rc;
601        case ActiveMQDestination.TOPIC_TYPE:
602            rc.setType(DestinationType.TOPIC);
603            return rc;
604        case ActiveMQDestination.TEMP_QUEUE_TYPE:
605            rc.setType(DestinationType.TEMP_QUEUE);
606            return rc;
607        case ActiveMQDestination.TEMP_TOPIC_TYPE:
608            rc.setType(DestinationType.TEMP_TOPIC);
609            return rc;
610        default:
611            return null;
612        }
613    }
614
615    ActiveMQDestination convert(String dest) {
616        int p = dest.indexOf(":");
617        if( p<0 ) {
618            throw new IllegalArgumentException("Not in the valid destination format");
619        }
620        int type = Integer.parseInt(dest.substring(0, p));
621        String name = dest.substring(p+1);
622
623        switch( KahaDestination.DestinationType.valueOf(type) ) {
624        case QUEUE:
625            return new ActiveMQQueue(name);
626        case TOPIC:
627            return new ActiveMQTopic(name);
628        case TEMP_QUEUE:
629            return new ActiveMQTempQueue(name);
630        case TEMP_TOPIC:
631            return new ActiveMQTempTopic(name);
632        default:
633            throw new IllegalArgumentException("Not in the valid destination format");
634        }
635    }
636
637    @Override
638    public long getLastProducerSequenceId(ProducerId id) {
639        return -1;
640    }
641
642    @Override
643    public void setBrokerService(BrokerService brokerService) {
644        this.brokerService = brokerService;
645    }
646
647    @Override
648    public void load() throws IOException {
649        if( brokerService!=null ) {
650            wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
651        }
652        super.load();
653    }
654    @Override
655    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
656        throw new UnsupportedOperationException();
657    }
658}