001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.plist;
018
019import org.apache.activemq.broker.BrokerService;
020import org.apache.activemq.broker.BrokerServiceAware;
021import org.apache.activemq.openwire.OpenWireFormat;
022import org.apache.activemq.store.JournaledStore;
023import org.apache.activemq.store.PList;
024import org.apache.activemq.store.PListStore;
025import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
026import org.apache.activemq.store.kahadb.disk.journal.Journal;
027import org.apache.activemq.store.kahadb.disk.journal.Location;
028import org.apache.activemq.store.kahadb.disk.page.Page;
029import org.apache.activemq.store.kahadb.disk.page.PageFile;
030import org.apache.activemq.store.kahadb.disk.page.Transaction;
031import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
032import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
033import org.apache.activemq.thread.Scheduler;
034import org.apache.activemq.util.*;
035import org.apache.activemq.wireformat.WireFormat;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import java.io.DataInput;
040import java.io.DataOutput;
041import java.io.File;
042import java.io.IOException;
043import java.util.*;
044import java.util.Map.Entry;
045
046/**
047 * @org.apache.xbean.XBean
048 */
049public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore {
050    static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class);
051    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052
053    static final int CLOSED_STATE = 1;
054    static final int OPEN_STATE = 2;
055
056    private File directory;
057    PageFile pageFile;
058    private Journal journal;
059    private LockFile lockFile;
060    private boolean failIfDatabaseIsLocked;
061    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063    private boolean enableIndexWriteAsync = false;
064    private boolean initialized = false;
065    private boolean lazyInit = true;
066    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
067    MetaData metaData = new MetaData(this);
068    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
069    Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>();
070    final Object indexLock = new Object();
071    private Scheduler scheduler;
072    private long cleanupInterval = 30000;
073
074    private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
075    private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
076    private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
077    private boolean indexEnablePageCaching = true;
078
079    public Object getIndexLock() {
080        return indexLock;
081    }
082
083    @Override
084    public void setBrokerService(BrokerService brokerService) {
085        this.scheduler = brokerService.getScheduler();
086    }
087
088    public int getIndexPageSize() {
089        return indexPageSize;
090    }
091
092    public int getIndexCacheSize() {
093        return indexCacheSize;
094    }
095
096    public int getIndexWriteBatchSize() {
097        return indexWriteBatchSize;
098    }
099
100    public void setIndexPageSize(int indexPageSize) {
101        this.indexPageSize = indexPageSize;
102    }
103
104    public void setIndexCacheSize(int indexCacheSize) {
105        this.indexCacheSize = indexCacheSize;
106    }
107
108    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
109        this.indexWriteBatchSize = indexWriteBatchSize;
110    }
111
112    public boolean getIndexEnablePageCaching() {
113        return indexEnablePageCaching;
114    }
115
116    public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
117        this.indexEnablePageCaching = indexEnablePageCaching;
118    }
119
120    protected class MetaData {
121        protected MetaData(PListStoreImpl store) {
122            this.store = store;
123        }
124
125        private final PListStoreImpl store;
126        Page<MetaData> page;
127        BTreeIndex<String, PListImpl> lists;
128
129        void createIndexes(Transaction tx) throws IOException {
130            this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId());
131        }
132
133        void load(Transaction tx) throws IOException {
134            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
135            this.lists.setValueMarshaller(new PListMarshaller(this.store));
136            this.lists.load(tx);
137        }
138
139        void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException {
140            for (Iterator<Entry<String, PListImpl>> i = this.lists.iterator(tx); i.hasNext();) {
141                Entry<String, PListImpl> entry = i.next();
142                entry.getValue().load(tx);
143                lists.put(entry.getKey(), entry.getValue());
144            }
145        }
146
147        public void read(DataInput is) throws IOException {
148            this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong());
149            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
150            this.lists.setValueMarshaller(new PListMarshaller(this.store));
151        }
152
153        public void write(DataOutput os) throws IOException {
154            os.writeLong(this.lists.getPageId());
155        }
156    }
157
158    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
159        private final PListStoreImpl store;
160
161        MetaDataMarshaller(PListStoreImpl store) {
162            this.store = store;
163        }
164        public MetaData readPayload(DataInput dataIn) throws IOException {
165            MetaData rc = new MetaData(this.store);
166            rc.read(dataIn);
167            return rc;
168        }
169
170        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
171            object.write(dataOut);
172        }
173    }
174
175    class PListMarshaller extends VariableMarshaller<PListImpl> {
176        private final PListStoreImpl store;
177        PListMarshaller(PListStoreImpl store) {
178            this.store = store;
179        }
180        public PListImpl readPayload(DataInput dataIn) throws IOException {
181            PListImpl result = new PListImpl(this.store);
182            result.read(dataIn);
183            return result;
184        }
185
186        public void writePayload(PListImpl list, DataOutput dataOut) throws IOException {
187            list.write(dataOut);
188        }
189    }
190
191    public Journal getJournal() {
192        return this.journal;
193    }
194
195    @Override
196    public File getDirectory() {
197        return directory;
198    }
199
200    @Override
201    public void setDirectory(File directory) {
202        this.directory = directory;
203    }
204
205    public long size() {
206        synchronized (this) {
207            if (!initialized) {
208                return 0;
209            }
210        }
211        try {
212            return journal.getDiskSize() + pageFile.getDiskSize();
213        } catch (IOException e) {
214            throw new RuntimeException(e);
215        }
216    }
217
218    @Override
219    public PListImpl getPList(final String name) throws Exception {
220        if (!isStarted()) {
221            throw new IllegalStateException("Not started");
222        }
223        intialize();
224        synchronized (indexLock) {
225            synchronized (this) {
226                PListImpl result = this.persistentLists.get(name);
227                if (result == null) {
228                    final PListImpl pl = new PListImpl(this);
229                    pl.setName(name);
230                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
231                        public void execute(Transaction tx) throws IOException {
232                            pl.setHeadPageId(tx.allocate().getPageId());
233                            pl.load(tx);
234                            metaData.lists.put(tx, name, pl);
235                        }
236                    });
237                    result = pl;
238                    this.persistentLists.put(name, pl);
239                }
240                final PListImpl toLoad = result;
241                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
242                    public void execute(Transaction tx) throws IOException {
243                        toLoad.load(tx);
244                    }
245                });
246
247                return result;
248            }
249        }
250    }
251
252    @Override
253    public boolean removePList(final String name) throws Exception {
254        boolean result = false;
255        synchronized (indexLock) {
256            synchronized (this) {
257                final PList pl = this.persistentLists.remove(name);
258                result = pl != null;
259                if (result) {
260                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
261                        public void execute(Transaction tx) throws IOException {
262                            metaData.lists.remove(tx, name);
263                            pl.destroy();
264                        }
265                    });
266                }
267            }
268        }
269        return result;
270    }
271
272    protected synchronized void intialize() throws Exception {
273        if (isStarted()) {
274            if (this.initialized == false) {
275                if (this.directory == null) {
276                    this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
277                }
278                IOHelper.mkdirs(this.directory);
279                IOHelper.deleteChildren(this.directory);
280                lock();
281                this.journal = new Journal();
282                this.journal.setDirectory(directory);
283                this.journal.setMaxFileLength(getJournalMaxFileLength());
284                this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
285                this.journal.start();
286                this.pageFile = new PageFile(directory, "tmpDB");
287                this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
288                this.pageFile.setPageSize(getIndexPageSize());
289                this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
290                this.pageFile.setPageCacheSize(getIndexCacheSize());
291                this.pageFile.load();
292
293                this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
294                    public void execute(Transaction tx) throws IOException {
295                        if (pageFile.getPageCount() == 0) {
296                            Page<MetaData> page = tx.allocate();
297                            assert page.getPageId() == 0;
298                            page.set(metaData);
299                            metaData.page = page;
300                            metaData.createIndexes(tx);
301                            tx.store(metaData.page, metaDataMarshaller, true);
302
303                        } else {
304                            Page<MetaData> page = tx.load(0, metaDataMarshaller);
305                            metaData = page.get();
306                            metaData.page = page;
307                        }
308                        metaData.load(tx);
309                        metaData.loadLists(tx, persistentLists);
310                    }
311                });
312                this.pageFile.flush();
313
314                if (cleanupInterval > 0) {
315                    if (scheduler == null) {
316                        scheduler = new Scheduler(PListStoreImpl.class.getSimpleName());
317                        scheduler.start();
318                    }
319                    scheduler.executePeriodically(this, cleanupInterval);
320                }
321                this.initialized = true;
322                LOG.info(this + " initialized");
323            }
324        }
325    }
326
327    @Override
328    protected synchronized void doStart() throws Exception {
329        if (!lazyInit) {
330            intialize();
331        }
332        LOG.info(this + " started");
333    }
334
335    @Override
336    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
337        if (scheduler != null) {
338            if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) {
339                scheduler.stop();
340                scheduler = null;
341            }
342        }
343        for (PListImpl pl : this.persistentLists.values()) {
344            pl.unload(null);
345        }
346        if (this.pageFile != null) {
347            this.pageFile.unload();
348        }
349        if (this.journal != null) {
350            journal.close();
351        }
352        if (this.lockFile != null) {
353            this.lockFile.unlock();
354        }
355        this.lockFile = null;
356        this.initialized = false;
357        LOG.info(this + " stopped");
358
359    }
360
361    public void run() {
362        try {
363            if (isStopping()) {
364                return;
365            }
366            final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
367            final Set<Integer> candidates = journal.getFileMap().keySet();
368            LOG.trace("Full gc candidate set:" + candidates);
369            if (candidates.size() > 1) {
370                // prune current write
371                for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
372                    if (iterator.next() >= lastJournalFileId) {
373                        iterator.remove();
374                    }
375                }
376                List<PListImpl> plists = null;
377                synchronized (indexLock) {
378                    synchronized (this) {
379                        plists = new ArrayList<PListImpl>(persistentLists.values());
380                    }
381                }
382                for (PListImpl list : plists) {
383                    list.claimFileLocations(candidates);
384                    if (isStopping()) {
385                        return;
386                    }
387                    LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
388                }
389                LOG.trace("GC Candidate set:" + candidates);
390                this.journal.removeDataFiles(candidates);
391            }
392        } catch (IOException e) {
393            LOG.error("Exception on periodic cleanup: " + e, e);
394        }
395    }
396
397    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
398        ByteSequence result = null;
399        result = this.journal.read(location);
400        return result;
401    }
402
403    Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
404        return this.journal.write(payload, sync);
405    }
406
407    private void lock() throws IOException {
408        if (lockFile == null) {
409            File lockFileName = new File(directory, "lock");
410            lockFile = new LockFile(lockFileName, true);
411            if (failIfDatabaseIsLocked) {
412                lockFile.lock();
413            } else {
414                while (true) {
415                    try {
416                        lockFile.lock();
417                        break;
418                    } catch (IOException e) {
419                        LOG.info("Database " + lockFileName + " is locked... waiting "
420                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
421                                + " seconds for the database to be unlocked. Reason: " + e);
422                        try {
423                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
424                        } catch (InterruptedException e1) {
425                        }
426                    }
427                }
428            }
429        }
430    }
431
432    PageFile getPageFile() {
433        this.pageFile.isLoaded();
434        return this.pageFile;
435    }
436
437    public boolean isFailIfDatabaseIsLocked() {
438        return failIfDatabaseIsLocked;
439    }
440
441    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
442        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
443    }
444
445    public int getJournalMaxFileLength() {
446        return journalMaxFileLength;
447    }
448
449    public void setJournalMaxFileLength(int journalMaxFileLength) {
450        this.journalMaxFileLength = journalMaxFileLength;
451    }
452
453    public int getJournalMaxWriteBatchSize() {
454        return journalMaxWriteBatchSize;
455    }
456
457    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
458        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
459    }
460
461    public boolean isEnableIndexWriteAsync() {
462        return enableIndexWriteAsync;
463    }
464
465    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
466        this.enableIndexWriteAsync = enableIndexWriteAsync;
467    }
468
469    public long getCleanupInterval() {
470        return cleanupInterval;
471    }
472
473    public void setCleanupInterval(long cleanupInterval) {
474        this.cleanupInterval = cleanupInterval;
475    }
476
477    public boolean isLazyInit() {
478        return lazyInit;
479    }
480
481    public void setLazyInit(boolean lazyInit) {
482        this.lazyInit = lazyInit;
483    }
484
485    @Override
486    public String toString() {
487        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
488        return "PListStore:[" + path + "]";
489    }
490}