001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.journal;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025
026/**
027 * Used to pool DataFileAccessors.
028 *
029 * @author chirino
030 */
031public class DataFileAccessorPool {
032
033    private final Journal journal;
034    private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
035    private boolean closed;
036    private int maxOpenReadersPerFile = 5;
037
038    class Pool {
039
040        private final DataFile file;
041        private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
042        private boolean used;
043        private int openCounter;
044        private boolean disposed;
045
046        public Pool(DataFile file) {
047            this.file = file;
048        }
049
050        public DataFileAccessor openDataFileReader() throws IOException {
051            DataFileAccessor rc = null;
052            if (pool.isEmpty()) {
053                rc = new DataFileAccessor(journal, file);
054            } else {
055                rc = pool.remove(pool.size() - 1);
056            }
057            used = true;
058            openCounter++;
059            return rc;
060        }
061
062        public synchronized void closeDataFileReader(DataFileAccessor reader) {
063            openCounter--;
064            if (pool.size() >= maxOpenReadersPerFile || disposed) {
065                reader.dispose();
066            } else {
067                pool.add(reader);
068            }
069        }
070
071        public synchronized void clearUsedMark() {
072            used = false;
073        }
074
075        public synchronized boolean isUsed() {
076            return used;
077        }
078
079        public synchronized void dispose() {
080            for (DataFileAccessor reader : pool) {
081                reader.dispose();
082            }
083            pool.clear();
084            disposed = true;
085        }
086
087        public synchronized int getOpenCounter() {
088            return openCounter;
089        }
090
091    }
092
093    public DataFileAccessorPool(Journal dataManager) {
094        this.journal = dataManager;
095    }
096
097    synchronized void clearUsedMark() {
098        for (Pool pool : pools.values()) {
099            pool.clearUsedMark();
100        }
101    }
102
103    synchronized void disposeUnused() {
104        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
105            Pool pool = iter.next();
106            if (!pool.isUsed()) {
107                pool.dispose();
108                iter.remove();
109            }
110        }
111    }
112
113    synchronized void disposeDataFileAccessors(DataFile dataFile) {
114        if (closed) {
115            throw new IllegalStateException("Closed.");
116        }
117        Pool pool = pools.get(dataFile.getDataFileId());
118        if (pool != null) {
119            if (pool.getOpenCounter() == 0) {
120                pool.dispose();
121                pools.remove(dataFile.getDataFileId());
122            } else {
123                throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
124            }
125        }
126    }
127
128    synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
129        if (closed) {
130            throw new IOException("Closed.");
131        }
132
133        Pool pool = pools.get(dataFile.getDataFileId());
134        if (pool == null) {
135            pool = new Pool(dataFile);
136            pools.put(dataFile.getDataFileId(), pool);
137        }
138        return pool.openDataFileReader();
139    }
140
141    synchronized void closeDataFileAccessor(DataFileAccessor reader) {
142        Pool pool = pools.get(reader.getDataFile().getDataFileId());
143        if (pool == null || closed) {
144            reader.dispose();
145        } else {
146            pool.closeDataFileReader(reader);
147        }
148    }
149
150    public synchronized void close() {
151        if (closed) {
152            return;
153        }
154        closed = true;
155        for (Pool pool : pools.values()) {
156            pool.dispose();
157        }
158        pools.clear();
159    }
160
161}