public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware
Modifier and Type | Class and Description |
---|---|
protected class |
MessageDatabase.LastAckMarshaller |
protected class |
MessageDatabase.MessageKeysMarshaller |
protected class |
MessageDatabase.Metadata |
protected class |
MessageDatabase.StoredDestinationMarshaller |
Modifier and Type | Field and Description |
---|---|
protected Set<String> |
ackedAndPrepared |
protected boolean |
archiveDataLogs |
protected BrokerService |
brokerService |
protected Thread |
checkpointThread |
static File |
DEFAULT_DIRECTORY |
protected boolean |
deleteAllMessages |
protected File |
directory |
protected File |
directoryArchive |
protected boolean |
enableJournalDiskSyncs |
protected boolean |
failIfDatabaseIsLocked |
protected boolean |
forceRecoverIndex |
protected File |
indexDirectory |
protected ReentrantReadWriteLock |
indexLock |
protected Journal |
journal |
protected AtomicLong |
journalSize |
static int |
LOG_SLOW_ACCESS_TIME |
protected MessageDatabase.Metadata |
metadata |
protected org.apache.activemq.store.kahadb.MessageDatabase.MetadataMarshaller |
metadataMarshaller |
protected AtomicBoolean |
opened |
protected PageFile |
pageFile |
protected LinkedHashMap<TransactionId,List<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> |
preparedTransactions |
static String |
PROPERTY_LOG_SLOW_ACCESS_TIME |
protected Set<String> |
rolledBackAcks |
protected ConcurrentMap<String,MessageStore> |
storeCache
This is a map to cache MessageStore instances for a specific
KahaDestination key
|
protected HashMap<String,org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination> |
storedDestinations |
protected static org.apache.activemq.protobuf.Buffer |
UNMATCHED |
Constructor and Description |
---|
MessageDatabase() |
Modifier and Type | Method and Description |
---|---|
protected void |
checkpointCleanup(boolean cleanup) |
protected void |
clearStoreStats(KahaDestination kahaDestination)
Clear the counter for the destination, if one exists.
|
void |
close() |
protected abstract void |
configureMetadata() |
protected void |
decrementAndSubSizeToStoreStat(KahaDestination kahaDestination,
long size) |
protected void |
decrementAndSubSizeToStoreStat(String kahaDestKey,
long size) |
void |
doStart() |
void |
doStop(ServiceStopper stopper) |
void |
forgetRecoveredAcks(ArrayList<MessageAck> acks,
boolean rollback) |
long |
getCheckpointInterval() |
long |
getCleanupInterval() |
File |
getDirectory() |
File |
getDirectoryArchive() |
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination |
getExistingStoredDestination(KahaDestination destination,
Transaction tx) |
int |
getFailoverProducersAuditDepth() |
int |
getIndexCacheSize() |
File |
getIndexDirectory() |
float |
getIndexLFUEvictionFactor() |
int |
getIndexWriteBatchSize() |
Location[] |
getInProgressTxLocationRange() |
Journal |
getJournal() |
HashSet<Integer> |
getJournalFilesBeingReplicated() |
int |
getJournalMaxFileLength() |
int |
getJournalMaxWriteBatchSize() |
org.apache.activemq.store.kahadb.MessageDatabase.LastAck |
getLastAck(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
Location |
getLastUpdatePosition() |
int |
getMaxFailoverProducersToTrack() |
PageFile |
getPageFile() |
String |
getPreallocationScope() |
String |
getPreallocationStrategy() |
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination |
getStoredDestination(KahaDestination destination,
Transaction tx) |
long |
getStoredMessageCount(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
long |
getStoredMessageSize(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
protected MessageStoreStatistics |
getStoreStats(String kahaDestKey)
Locate the storeMessageSize counter for this KahaDestination
|
String |
getTransactions() |
void |
incrementalRecover() |
protected void |
incrementAndAddSizeToStoreStat(KahaDestination kahaDestination,
long size)
Update MessageStoreStatistics
|
protected void |
incrementAndAddSizeToStoreStat(String kahaDestKey,
long size) |
boolean |
isArchiveCorruptedIndex() |
boolean |
isArchiveDataLogs() |
boolean |
isCheckForCorruptJournalFiles() |
boolean |
isChecksumJournalFiles() |
boolean |
isDeleteAllMessages() |
boolean |
isEnableIndexDiskSyncs() |
boolean |
isEnableIndexPageCaching() |
boolean |
isEnableIndexRecoveryFile() |
boolean |
isEnableJournalDiskSyncs() |
boolean |
isFailIfDatabaseIsLocked() |
boolean |
isIgnoreMissingJournalfiles() |
boolean |
isUseIndexLFRUEviction() |
protected String |
key(KahaDestination destination) |
void |
load() |
JournalCommand<?> |
load(Location location)
Loads a previously stored JournalMessage
|
protected boolean |
matchType(Destination destination,
KahaDestination.DestinationType type)
Determine whether this Destination matches the DestinationType
|
void |
open() |
protected void |
process(KahaAddMessageCommand command,
Location location,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware runWithIndexLock) |
protected void |
process(KahaCommitCommand command,
Location location,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before) |
protected void |
process(KahaPrepareCommand command,
Location location) |
protected void |
process(KahaRemoveDestinationCommand command,
Location location) |
protected void |
process(KahaRemoveMessageCommand command,
Location location) |
protected void |
process(KahaRollbackCommand command,
Location location) |
protected void |
process(KahaSubscriptionCommand command,
Location location) |
protected void |
process(KahaUpdateMessageCommand command,
Location location) |
protected void |
processLocation(Location location) |
protected void |
recoverIndex(Transaction tx) |
void |
setArchiveCorruptedIndex(boolean archiveCorruptedIndex) |
void |
setArchiveDataLogs(boolean archiveDataLogs) |
void |
setBrokerService(BrokerService brokerService) |
void |
setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) |
void |
setCheckpointInterval(long checkpointInterval) |
void |
setChecksumJournalFiles(boolean checksumJournalFiles) |
void |
setCleanupInterval(long cleanupInterval) |
void |
setDeleteAllMessages(boolean deleteAllMessages) |
void |
setDirectory(File directory) |
void |
setDirectoryArchive(File directoryArchive) |
void |
setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) |
void |
setEnableIndexPageCaching(boolean enableIndexPageCaching) |
void |
setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) |
void |
setEnableIndexWriteAsync(boolean enableIndexWriteAsync) |
void |
setEnableJournalDiskSyncs(boolean syncWrites) |
void |
setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) |
void |
setFailoverProducersAuditDepth(int failoverProducersAuditDepth) |
void |
setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) |
void |
setIndexCacheSize(int indexCacheSize) |
void |
setIndexDirectory(File indexDirectory) |
void |
setIndexLFUEvictionFactor(float indexLFUEvictionFactor) |
void |
setIndexWriteBatchSize(int setIndexWriteBatchSize) |
void |
setJournalMaxFileLength(int journalMaxFileLength) |
void |
setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) |
void |
setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) |
void |
setPreallocationScope(String preallocationScope) |
void |
setPreallocationStrategy(String preallocationStrategy) |
void |
setUseIndexLFRUEviction(boolean useIndexLFRUEviction) |
Location |
store(JournalCommand<?> data) |
Location |
store(JournalCommand<?> data,
boolean sync,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before,
Runnable after) |
Location |
store(JournalCommand<?> data,
boolean sync,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before,
Runnable after,
Runnable onJournalStoreComplete)
All updates are funneled through this method.
|
Location |
store(JournalCommand<?> data,
Runnable onJournalStoreComplete) |
ByteSequence |
toByteSequence(JournalCommand<?> data) |
void |
trackRecoveredAcks(ArrayList<MessageAck> acks) |
void |
unload() |
addServiceListener, dispose, isStarted, isStopped, isStopping, postStop, preStart, removeServiceListener, start, stop
protected BrokerService brokerService
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME
public static final int LOG_SLOW_ACCESS_TIME
public static final File DEFAULT_DIRECTORY
protected static final org.apache.activemq.protobuf.Buffer UNMATCHED
protected MessageDatabase.Metadata metadata
protected org.apache.activemq.store.kahadb.MessageDatabase.MetadataMarshaller metadataMarshaller
protected boolean failIfDatabaseIsLocked
protected boolean deleteAllMessages
protected File indexDirectory
protected Thread checkpointThread
protected boolean enableJournalDiskSyncs
protected boolean archiveDataLogs
protected File directoryArchive
protected AtomicLong journalSize
protected AtomicBoolean opened
protected boolean forceRecoverIndex
protected final ReentrantReadWriteLock indexLock
protected final HashMap<String,org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination> storedDestinations
protected final ConcurrentMap<String,MessageStore> storeCache
protected final LinkedHashMap<TransactionId,List<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> preparedTransactions
protected final Set<String> ackedAndPrepared
protected final Set<String> rolledBackAcks
public MessageDatabase()
public void doStart() throws Exception
doStart
in class ServiceSupport
Exception
public void doStop(ServiceStopper stopper) throws Exception
doStop
in class ServiceSupport
Exception
public void open() throws IOException
IOException
public void load() throws IOException
IOException
public void close() throws IOException, InterruptedException
IOException
InterruptedException
public void unload() throws IOException, InterruptedException
IOException
InterruptedException
public Location[] getInProgressTxLocationRange()
public String getTransactions()
protected void recoverIndex(Transaction tx) throws IOException
IOException
public void incrementalRecover() throws IOException
IOException
public Location getLastUpdatePosition() throws IOException
IOException
protected void checkpointCleanup(boolean cleanup) throws IOException
IOException
public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException
IOException
public Location store(JournalCommand<?> data) throws IOException
IOException
public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException
IOException
public Location store(JournalCommand<?> data, boolean sync, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before, Runnable after) throws IOException
IOException
public Location store(JournalCommand<?> data, boolean sync, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException
IOException
public JournalCommand<?> load(Location location) throws IOException
location -
IOException
protected void process(KahaAddMessageCommand command, Location location, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware runWithIndexLock) throws IOException
IOException
protected void process(KahaUpdateMessageCommand command, Location location) throws IOException
IOException
protected void process(KahaRemoveMessageCommand command, Location location) throws IOException
IOException
protected void process(KahaRemoveDestinationCommand command, Location location) throws IOException
IOException
protected void process(KahaSubscriptionCommand command, Location location) throws IOException
IOException
protected void processLocation(Location location)
protected void process(KahaCommitCommand command, Location location, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before) throws IOException
IOException
protected void process(KahaPrepareCommand command, Location location)
protected void process(KahaRollbackCommand command, Location location) throws IOException
IOException
public HashSet<Integer> getJournalFilesBeingReplicated()
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException
IOException
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException
IOException
protected void clearStoreStats(KahaDestination kahaDestination)
kahaDestination -
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size)
kahaDestination -
size -
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size)
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size)
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size)
protected MessageStoreStatistics getStoreStats(String kahaDestKey)
kahaDestKey -
protected boolean matchType(Destination destination, KahaDestination.DestinationType type)
destination -
type -
public org.apache.activemq.store.kahadb.MessageDatabase.LastAck getLastAck(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
IOException
public long getStoredMessageCount(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
IOException
public long getStoredMessageSize(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
IOException
protected String key(KahaDestination destination)
public void trackRecoveredAcks(ArrayList<MessageAck> acks)
public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException
IOException
protected abstract void configureMetadata()
public int getJournalMaxWriteBatchSize()
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
public File getDirectory()
public void setDirectory(File directory)
public boolean isDeleteAllMessages()
public void setDeleteAllMessages(boolean deleteAllMessages)
public void setIndexWriteBatchSize(int setIndexWriteBatchSize)
public int getIndexWriteBatchSize()
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
public boolean isEnableJournalDiskSyncs()
public void setEnableJournalDiskSyncs(boolean syncWrites)
public long getCheckpointInterval()
public void setCheckpointInterval(long checkpointInterval)
public long getCleanupInterval()
public void setCleanupInterval(long cleanupInterval)
public void setJournalMaxFileLength(int journalMaxFileLength)
public int getJournalMaxFileLength()
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack)
public int getMaxFailoverProducersToTrack()
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth)
public int getFailoverProducersAuditDepth()
public PageFile getPageFile() throws IOException
IOException
public Journal getJournal() throws IOException
IOException
public boolean isFailIfDatabaseIsLocked()
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)
public boolean isIgnoreMissingJournalfiles()
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
public int getIndexCacheSize()
public void setIndexCacheSize(int indexCacheSize)
public boolean isCheckForCorruptJournalFiles()
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
public boolean isChecksumJournalFiles()
public void setChecksumJournalFiles(boolean checksumJournalFiles)
public void setBrokerService(BrokerService brokerService)
setBrokerService
in interface BrokerServiceAware
public boolean isArchiveDataLogs()
public void setArchiveDataLogs(boolean archiveDataLogs)
archiveDataLogs
- the archiveDataLogs to set
public File getDirectoryArchive()
public void setDirectoryArchive(File directoryArchive)
directoryArchive
- the directoryArchive to set
public boolean isArchiveCorruptedIndex()
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex)
public float getIndexLFUEvictionFactor()
public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor)
public boolean isUseIndexLFRUEviction()
public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction)
public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs)
public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile)
public void setEnableIndexPageCaching(boolean enableIndexPageCaching)
public boolean isEnableIndexDiskSyncs()
public boolean isEnableIndexRecoveryFile()
public boolean isEnableIndexPageCaching()
public File getIndexDirectory()
public void setIndexDirectory(File indexDirectory)
public String getPreallocationScope()
public void setPreallocationScope(String preallocationScope)
public String getPreallocationStrategy()
public void setPreallocationStrategy(String preallocationStrategy)
Copyright © 2005–2016 The Apache Software Foundation. All rights reserved.