/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.FileFilter;
021import java.io.IOException;
022import java.nio.charset.Charset;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.CopyOnWriteArrayList;
030
031import javax.transaction.xa.Xid;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Lockable;
037import org.apache.activemq.broker.LockableServiceSupport;
038import org.apache.activemq.broker.Locker;
039import org.apache.activemq.broker.scheduler.JobSchedulerStore;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.command.ActiveMQQueue;
042import org.apache.activemq.command.ActiveMQTopic;
043import org.apache.activemq.command.LocalTransactionId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.command.TransactionId;
046import org.apache.activemq.command.XATransactionId;
047import org.apache.activemq.filter.AnyDestination;
048import org.apache.activemq.filter.DestinationMap;
049import org.apache.activemq.filter.DestinationMapEntry;
050import org.apache.activemq.store.MessageStore;
051import org.apache.activemq.store.PersistenceAdapter;
052import org.apache.activemq.store.SharedFileLocker;
053import org.apache.activemq.store.TopicMessageStore;
054import org.apache.activemq.store.TransactionIdTransformer;
055import org.apache.activemq.store.TransactionIdTransformerAware;
056import org.apache.activemq.store.TransactionStore;
057import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
058import org.apache.activemq.usage.SystemUsage;
059import org.apache.activemq.util.IOExceptionSupport;
060import org.apache.activemq.util.IOHelper;
061import org.apache.activemq.util.IntrospectionSupport;
062import org.apache.activemq.util.ServiceStopper;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
068 * distribution of destinations across multiple kahaDB persistence adapters
069 *
070 * @org.apache.xbean.XBean element="mKahaDB"
071 */
072public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, BrokerServiceAware {
073    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
074
075    final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
076    final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
077
078    final class DelegateDestinationMap extends DestinationMap {
079        @Override
080        public void setEntries(List<DestinationMapEntry>  entries) {
081            super.setEntries(entries);
082        }
083    };
084    final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
085
086    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<PersistenceAdapter>();
087    private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
088
089    MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
090
091    // all local store transactions are XA, 2pc if more than one adapter involved
092    TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
093        @Override
094        public TransactionId transform(TransactionId txid) {
095            if (txid == null) {
096                return null;
097            }
098            if (txid.isLocalTransaction()) {
099                final LocalTransactionId t = (LocalTransactionId) txid;
100                return new XATransactionId(new Xid() {
101                    @Override
102                    public int getFormatId() {
103                        return LOCAL_FORMAT_ID_MAGIC;
104                    }
105
106                    @Override
107                    public byte[] getGlobalTransactionId() {
108                        return t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"));
109                    }
110
111                    @Override
112                    public byte[] getBranchQualifier() {
113                        return Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"));
114                    }
115                });
116            } else {
117                return txid;
118            }
119        }
120    };
121
122    /**
123     * Sets the  FilteredKahaDBPersistenceAdapter entries
124     *
125     * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
126     */
127    @SuppressWarnings({ "rawtypes", "unchecked" })
128    public void setFilteredPersistenceAdapters(List entries) {
129        for (Object entry : entries) {
130            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
131            PersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
132            if (filteredAdapter.getDestination() == null) {
133                filteredAdapter.setDestination(matchAll);
134            }
135
136            if (filteredAdapter.isPerDestination()) {
137                configureDirectory(adapter, null);
138                // per destination adapters will be created on demand or during recovery
139                continue;
140            } else {
141                configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
142            }
143
144            configureAdapter(adapter);
145            adapters.add(adapter);
146        }
147        destinationMap.setEntries(entries);
148    }
149
150    private String nameFromDestinationFilter(ActiveMQDestination destination) {
151        if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) {
152            LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " +
153                     "potential problem with recovery can result from name truncation.");
154        }
155
156        return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
157    }
158
159    public boolean isLocalXid(TransactionId xid) {
160        return xid instanceof XATransactionId &&
161                ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
162    }
163
164    @Override
165    public void beginTransaction(ConnectionContext context) throws IOException {
166        throw new IllegalStateException();
167    }
168
169    @Override
170    public void checkpoint(final boolean sync) throws IOException {
171        for (PersistenceAdapter persistenceAdapter : adapters) {
172            persistenceAdapter.checkpoint(sync);
173        }
174    }
175
176    @Override
177    public void commitTransaction(ConnectionContext context) throws IOException {
178        throw new IllegalStateException();
179    }
180
181    @Override
182    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
183        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
184        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
185    }
186
187    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) throws IOException {
188        Object result = destinationMap.chooseValue(destination);
189        if (result == null) {
190            throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
191        }
192        FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
193        if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
194            filteredAdapter = addAdapter(filteredAdapter, destination);
195            if (LOG.isTraceEnabled()) {
196                LOG.info("created per destination adapter for: " + destination  + ", " + result);
197            }
198        }
199        startAdapter(filteredAdapter.getPersistenceAdapter(), destination.getQualifiedName());
200        LOG.debug("destination {} matched persistence adapter {}", new Object[]{destination.getQualifiedName(), filteredAdapter.getPersistenceAdapter()});
201        return filteredAdapter.getPersistenceAdapter();
202    }
203
204    private void startAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
205        try {
206            kahaDBPersistenceAdapter.start();
207        } catch (Exception e) {
208            RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
209            LOG.error(detail.toString(), e);
210            throw detail;
211        }
212    }
213
214    private void stopAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
215        try {
216            kahaDBPersistenceAdapter.stop();
217        } catch (Exception e) {
218            RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
219            LOG.error(detail.toString(), e);
220            throw detail;
221        }
222    }
223
224    @Override
225    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
226        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
227        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
228    }
229
230    @Override
231    public TransactionStore createTransactionStore() throws IOException {
232        return transactionStore;
233    }
234
235    @Override
236    public void deleteAllMessages() throws IOException {
237        for (PersistenceAdapter persistenceAdapter : adapters) {
238            persistenceAdapter.deleteAllMessages();
239        }
240        transactionStore.deleteAllMessages();
241        IOHelper.deleteChildren(getDirectory());
242    }
243
244    @Override
245    public Set<ActiveMQDestination> getDestinations() {
246        Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
247        for (PersistenceAdapter persistenceAdapter : adapters) {
248            results.addAll(persistenceAdapter.getDestinations());
249        }
250        return results;
251    }
252
253    @Override
254    public long getLastMessageBrokerSequenceId() throws IOException {
255        long maxId = -1;
256        for (PersistenceAdapter persistenceAdapter : adapters) {
257            maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
258        }
259        return maxId;
260    }
261
262    @Override
263    public long getLastProducerSequenceId(ProducerId id) throws IOException {
264        long maxId = -1;
265        for (PersistenceAdapter persistenceAdapter : adapters) {
266            maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
267        }
268        return maxId;
269    }
270
271    @Override
272    public void removeQueueMessageStore(ActiveMQQueue destination) {
273        PersistenceAdapter adapter = null;
274        try {
275            adapter = getMatchingPersistenceAdapter(destination);
276        } catch (IOException e) {
277            throw new RuntimeException(e);
278        }
279        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
280            adapter.removeQueueMessageStore(destination);
281            removeMessageStore(adapter, destination);
282            destinationMap.removeAll(destination);
283        }
284    }
285
286    @Override
287    public void removeTopicMessageStore(ActiveMQTopic destination) {
288        PersistenceAdapter adapter = null;
289        try {
290            adapter = getMatchingPersistenceAdapter(destination);
291        } catch (IOException e) {
292            throw new RuntimeException(e);
293        }
294        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
295            adapter.removeTopicMessageStore(destination);
296            removeMessageStore(adapter, destination);
297            destinationMap.removeAll(destination);
298        }
299    }
300
301    private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
302        stopAdapter(adapter, destination.toString());
303        File adapterDir = adapter.getDirectory();
304        if (adapterDir != null) {
305            if (IOHelper.deleteFile(adapterDir)) {
306                if (LOG.isTraceEnabled()) {
307                    LOG.info("deleted per destination adapter directory for: " + destination);
308                }
309            } else {
310                if (LOG.isTraceEnabled()) {
311                    LOG.info("failed to deleted per destination adapter directory for: " + destination);
312                }
313            }
314        }
315    }
316
317    @Override
318    public void rollbackTransaction(ConnectionContext context) throws IOException {
319        throw new IllegalStateException();
320    }
321
322    @Override
323    public void setBrokerName(String brokerName) {
324        for (PersistenceAdapter persistenceAdapter : adapters) {
325            persistenceAdapter.setBrokerName(brokerName);
326        }
327    }
328
329    @Override
330    public void setUsageManager(SystemUsage usageManager) {
331        for (PersistenceAdapter persistenceAdapter : adapters) {
332            persistenceAdapter.setUsageManager(usageManager);
333        }
334    }
335
336    @Override
337    public long size() {
338        long size = 0;
339        for (PersistenceAdapter persistenceAdapter : adapters) {
340            size += persistenceAdapter.size();
341        }
342        return size;
343    }
344
345    @Override
346    public void doStart() throws Exception {
347        Object result = destinationMap.chooseValue(matchAll);
348        if (result != null) {
349            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
350            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
351                findAndRegisterExistingAdapters(filteredAdapter);
352            }
353        }
354        for (PersistenceAdapter persistenceAdapter : adapters) {
355            persistenceAdapter.start();
356        }
357    }
358
359    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException {
360        FileFilter destinationNames = new FileFilter() {
361            @Override
362            public boolean accept(File file) {
363                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
364            }
365        };
366        File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
367        if (candidates != null) {
368            for (File candidate : candidates) {
369                registerExistingAdapter(template, candidate);
370            }
371        }
372    }
373
374    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) throws IOException {
375        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
376        startAdapter(adapter, candidate.getName());
377        Set<ActiveMQDestination> destinations = adapter.getDestinations();
378        if (destinations.size() != 0) {
379            registerAdapter(adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
380        } else {
381            stopAdapter(adapter, candidate.getName());
382        }
383    }
384
385    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) throws IOException {
386        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
387        return registerAdapter(adapter, destination);
388    }
389
390    private PersistenceAdapter adapterFromTemplate(PersistenceAdapter template, String destinationName) throws IOException {
391        PersistenceAdapter adapter = kahaDBFromTemplate(template);
392        configureAdapter(adapter);
393        configureDirectory(adapter, destinationName);
394        return adapter;
395    }
396
397    private void configureDirectory(PersistenceAdapter adapter, String fileName) {
398        File directory = null;
399        File defaultDir = MessageDatabase.DEFAULT_DIRECTORY;
400        try {
401            defaultDir = adapter.getClass().newInstance().getDirectory();
402        } catch (Exception e) {
403        }
404        if (defaultDir.equals(adapter.getDirectory())) {
405            // not set so inherit from mkahadb
406            directory = getDirectory();
407        } else {
408            directory = adapter.getDirectory();
409        }
410
411        if (fileName != null) {
412            directory = new File(directory, fileName);
413        }
414        adapter.setDirectory(directory);
415    }
416
417    private FilteredKahaDBPersistenceAdapter registerAdapter(PersistenceAdapter adapter, ActiveMQDestination destination) {
418        adapters.add(adapter);
419        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
420        destinationMap.put(destination, result);
421        return result;
422    }
423
424    private void configureAdapter(PersistenceAdapter adapter) {
425        // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
426        ((TransactionIdTransformerAware)adapter).setTransactionIdTransformer(transactionIdTransformer);
427        if (isUseLock()) {
428            if( adapter instanceof Lockable ) {
429                ((Lockable)adapter).setUseLock(false);
430            }
431        }
432        if( adapter instanceof BrokerServiceAware ) {
433            ((BrokerServiceAware)adapter).setBrokerService(getBrokerService());
434        }
435    }
436
437    private PersistenceAdapter kahaDBFromTemplate(PersistenceAdapter template) throws IOException {
438        try {
439            Map<String, Object> configuration = new HashMap<String, Object>();
440            IntrospectionSupport.getProperties(template, configuration, null);
441            PersistenceAdapter adapter = template.getClass().newInstance();
442            IntrospectionSupport.setProperties(adapter, configuration);
443            return adapter;
444        } catch (Exception e) {
445            throw IOExceptionSupport.create(e);
446        }
447    }
448
449    @Override
450    protected void doStop(ServiceStopper stopper) throws Exception {
451        for (PersistenceAdapter persistenceAdapter : adapters) {
452            stopper.stop(persistenceAdapter);
453        }
454    }
455
456    @Override
457    public File getDirectory() {
458        return this.directory;
459    }
460
461    @Override
462    public void setDirectory(File directory) {
463        this.directory = directory;
464    }
465
466    @Override
467    public void init() throws Exception {
468    }
469
470    @Override
471    public void setBrokerService(BrokerService brokerService) {
472        super.setBrokerService(brokerService);
473        for (PersistenceAdapter persistenceAdapter : adapters) {
474            if( persistenceAdapter instanceof BrokerServiceAware ) {
475                ((BrokerServiceAware)persistenceAdapter).setBrokerService(getBrokerService());
476            }
477        }
478    }
479
480    public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
481        this.transactionStore = transactionStore;
482    }
483
484    /**
485     * Set the max file length of the transaction journal
486     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
487     * be used
488     *
489     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
490     */
491    public void setJournalMaxFileLength(int maxFileLength) {
492        transactionStore.setJournalMaxFileLength(maxFileLength);
493    }
494
495    public int getJournalMaxFileLength() {
496        return transactionStore.getJournalMaxFileLength();
497    }
498
499    /**
500     * Set the max write batch size of  the transaction journal
501     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
502     * be used
503     *
504     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
505     */
506    public void setJournalWriteBatchSize(int journalWriteBatchSize) {
507        transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
508    }
509
510    public int getJournalWriteBatchSize() {
511        return transactionStore.getJournalMaxWriteBatchSize();
512    }
513
514    public List<PersistenceAdapter> getAdapters() {
515        return Collections.unmodifiableList(adapters);
516    }
517
518    @Override
519    public String toString() {
520        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
521        return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
522    }
523
524    @Override
525    public Locker createDefaultLocker() throws IOException {
526        SharedFileLocker locker = new SharedFileLocker();
527        locker.configure(this);
528        return locker;
529    }
530
531    @Override
532    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
533        return new JobSchedulerStoreImpl();
534    }
535}