001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.disk.journal; 018 019import java.io.*; 020import java.nio.ByteBuffer; 021import java.nio.channels.FileChannel; 022import java.util.*; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.atomic.AtomicLong; 025import java.util.concurrent.atomic.AtomicReference; 026import java.util.zip.Adler32; 027import java.util.zip.Checksum; 028import org.apache.activemq.store.kahadb.disk.util.LinkedNode; 029import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 030import org.apache.activemq.util.*; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; 034import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask; 035import org.apache.activemq.store.kahadb.disk.util.Sequence; 036 037/** 038 * Manages DataFiles 039 * 040 * 041 */ 042public class Journal { 043 public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER"; 044 public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false")); 045 046 private static final int MAX_BATCH_SIZE = 32*1024*1024; 047 048 // ITEM_HEAD_SPACE = length + type+ reserved space + SOR 049 public static final int RECORD_HEAD_SPACE = 4 + 1; 050 051 public static final byte USER_RECORD_TYPE = 1; 052 public static final byte BATCH_CONTROL_RECORD_TYPE = 2; 053 // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch. 054 public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH"); 055 public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8; 056 public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader(); 057 058 // tackle corruption when checksum is disabled or corrupt with zeros, minimise data loss 059 public void corruptRecoveryLocation(Location recoveryPosition) throws IOException { 060 DataFile dataFile = getDataFile(recoveryPosition); 061 // with corruption on recovery we have no faith in the content - slip to the next batch record or eof 062 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 063 try { 064 int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1); 065 Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1); 066 LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence); 067 068 // skip corruption on getNextLocation 069 recoveryPosition.setOffset((int) sequence.getLast() + 1); 070 recoveryPosition.setSize(-1); 071 072 dataFile.corruptedBlocks.add(sequence); 073 074 } catch (IOException e) { 075 } finally { 076 accessorPool.closeDataFileAccessor(reader); 077 } 078 } 079 080 public enum PreallocationStrategy { 081 SPARSE_FILE, 082 OS_KERNEL_COPY, 083 ZEROS; 084 } 085 086 public enum PreallocationScope { 087 ENTIRE_JOURNAL; 088 } 089 090 private static byte[] createBatchControlRecordHeader() { 091 try { 092 DataByteArrayOutputStream os = new DataByteArrayOutputStream(); 093 os.writeInt(BATCH_CONTROL_RECORD_SIZE); 094 os.writeByte(BATCH_CONTROL_RECORD_TYPE); 095 os.write(BATCH_CONTROL_RECORD_MAGIC); 096 ByteSequence sequence = os.toByteSequence(); 097 sequence.compact(); 098 return sequence.getData(); 099 } catch (IOException e) { 100 throw new RuntimeException("Could not create batch control record header.", e); 101 } 102 } 103 104 public static final String DEFAULT_DIRECTORY = "."; 105 public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; 106 public static final String DEFAULT_FILE_PREFIX = "db-"; 107 public static final String DEFAULT_FILE_SUFFIX = ".log"; 108 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 109 public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30; 110 public static final int PREFERED_DIFF = 1024 * 512; 111 public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4; 112 113 private static final Logger LOG = LoggerFactory.getLogger(Journal.class); 114 115 protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>(); 116 117 protected File directory = new File(DEFAULT_DIRECTORY); 118 protected File directoryArchive; 119 private boolean directoryArchiveOverridden = false; 120 121 protected String filePrefix = DEFAULT_FILE_PREFIX; 122 protected String fileSuffix = DEFAULT_FILE_SUFFIX; 123 protected boolean started; 124 125 protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH; 126 protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE; 127 128 protected FileAppender appender; 129 protected DataFileAccessorPool accessorPool; 130 131 protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>(); 132 protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>(); 133 protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>(); 134 135 protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>(); 136 protected Runnable cleanupTask; 137 protected AtomicLong totalLength = new AtomicLong(); 138 protected boolean archiveDataLogs; 139 private ReplicationTarget replicationTarget; 140 protected boolean checksum; 141 protected boolean checkForCorruptionOnStartup; 142 protected boolean enableAsyncDiskSync = true; 143 private Timer timer; 144 145 protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL; 146 protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE; 147 148 public interface DataFileRemovedListener { 149 void fileRemoved(DataFile datafile); 150 } 151 152 private DataFileRemovedListener dataFileRemovedListener; 153 154 public synchronized void start() throws IOException { 155 if (started) { 156 return; 157 } 158 159 long start = System.currentTimeMillis(); 160 accessorPool = new DataFileAccessorPool(this); 161 started = true; 162 163 appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this); 164 165 File[] files = directory.listFiles(new FilenameFilter() { 166 public boolean accept(File dir, String n) { 167 return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix); 168 } 169 }); 170 171 if (files != null) { 172 for (File file : files) { 173 try { 174 String n = file.getName(); 175 String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length()); 176 int num = Integer.parseInt(numStr); 177 DataFile dataFile = new DataFile(file, num); 178 fileMap.put(dataFile.getDataFileId(), dataFile); 179 totalLength.addAndGet(dataFile.getLength()); 180 } catch (NumberFormatException e) { 181 // Ignore file that do not match the pattern. 182 } 183 } 184 185 // Sort the list so that we can link the DataFiles together in the 186 // right order. 187 List<DataFile> l = new ArrayList<DataFile>(fileMap.values()); 188 Collections.sort(l); 189 for (DataFile df : l) { 190 if (df.getLength() == 0) { 191 // possibly the result of a previous failed write 192 LOG.info("ignoring zero length, partially initialised journal data file: " + df); 193 continue; 194 } 195 dataFiles.addLast(df); 196 fileByFileMap.put(df.getFile(), df); 197 198 if( isCheckForCorruptionOnStartup() ) { 199 lastAppendLocation.set(recoveryCheck(df)); 200 } 201 } 202 } 203 204 getCurrentWriteFile(); 205 206 if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) { 207 LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies."); 208 } 209 210 if( lastAppendLocation.get()==null ) { 211 DataFile df = dataFiles.getTail(); 212 lastAppendLocation.set(recoveryCheck(df)); 213 } 214 215 // ensure we don't report unused space of last journal file in size metric 216 if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) { 217 totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength); 218 } 219 220 221 cleanupTask = new Runnable() { 222 public void run() { 223 cleanup(); 224 } 225 }; 226 this.timer = new Timer("KahaDB Scheduler", true); 227 TimerTask task = new SchedulerTimerTask(cleanupTask); 228 this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL); 229 long end = System.currentTimeMillis(); 230 LOG.trace("Startup took: "+(end-start)+" ms"); 231 } 232 233 234 public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) { 235 236 if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) { 237 238 if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) { 239 doPreallocationKernelCopy(file); 240 241 }else if (PreallocationStrategy.ZEROS == preallocationStrategy) { 242 doPreallocationZeros(file); 243 } 244 else { 245 doPreallocationSparseFile(file); 246 } 247 }else { 248 LOG.info("Using journal preallocation scope of batch allocation"); 249 } 250 } 251 252 private void doPreallocationSparseFile(RecoverableRandomAccessFile file) { 253 try { 254 file.seek(maxFileLength - 1); 255 file.write((byte)0x00); 256 } catch (IOException e) { 257 LOG.error("Could not preallocate journal file with sparse file! Will continue without preallocation", e); 258 } 259 } 260 261 private void doPreallocationZeros(RecoverableRandomAccessFile file) { 262 ByteBuffer buffer = ByteBuffer.allocate(maxFileLength); 263 for (int i = 0; i < maxFileLength; i++) { 264 buffer.put((byte) 0x00); 265 } 266 buffer.flip(); 267 268 try { 269 FileChannel channel = file.getChannel(); 270 channel.write(buffer); 271 channel.force(false); 272 channel.position(0); 273 } catch (IOException e) { 274 LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e); 275 } 276 } 277 278 private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) { 279 280 // create a template file that will be used to pre-allocate the journal files 281 File templateFile = createJournalTemplateFile(); 282 283 RandomAccessFile templateRaf = null; 284 try { 285 templateRaf = new RandomAccessFile(templateFile, "rw"); 286 templateRaf.setLength(maxFileLength); 287 templateRaf.getChannel().force(true); 288 templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel()); 289 templateRaf.close(); 290 templateFile.delete(); 291 } catch (FileNotFoundException e) { 292 LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e); 293 } catch (IOException e) { 294 LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e); 295 } 296 } 297 298 private File createJournalTemplateFile() { 299 String fileName = "db-log.template"; 300 File rc = new File(directory, fileName); 301 if (rc.exists()) { 302 System.out.println("deleting file because it already exists..."); 303 rc.delete(); 304 305 } 306 return rc; 307 } 308 309 private static byte[] bytes(String string) { 310 try { 311 return string.getBytes("UTF-8"); 312 } catch (UnsupportedEncodingException e) { 313 throw new RuntimeException(e); 314 } 315 } 316 317 protected Location recoveryCheck(DataFile dataFile) throws IOException { 318 Location location = new Location(); 319 location.setDataFileId(dataFile.getDataFileId()); 320 location.setOffset(0); 321 322 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 323 try { 324 while( true ) { 325 int size = checkBatchRecord(reader, location.getOffset()); 326 if ( size>=0 && location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size <= dataFile.getLength()) { 327 location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size); 328 } else { 329 330 // Perhaps it's just some corruption... scan through the file to find the next valid batch record. We 331 // may have subsequent valid batch records. 332 int nextOffset = findNextBatchRecord(reader, location.getOffset()+1); 333 if( nextOffset >=0 ) { 334 Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1); 335 LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence); 336 dataFile.corruptedBlocks.add(sequence); 337 location.setOffset(nextOffset); 338 } else { 339 break; 340 } 341 } 342 } 343 344 } catch (IOException e) { 345 } finally { 346 accessorPool.closeDataFileAccessor(reader); 347 } 348 349 int existingLen = dataFile.getLength(); 350 dataFile.setLength(location.getOffset()); 351 if (existingLen > dataFile.getLength()) { 352 totalLength.addAndGet(dataFile.getLength() - existingLen); 353 } 354 355 if( !dataFile.corruptedBlocks.isEmpty() ) { 356 // Is the end of the data file corrupted? 357 if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) { 358 dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst()); 359 } 360 } 361 362 return location; 363 } 364 365 private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException { 366 ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER); 367 byte data[] = new byte[1024*4]; 368 ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data)); 369 370 int pos = 0; 371 while( true ) { 372 pos = bs.indexOf(header, pos); 373 if( pos >= 0 ) { 374 return offset+pos; 375 } else { 376 // need to load the next data chunck in.. 377 if( bs.length != data.length ) { 378 // If we had a short read then we were at EOF 379 return -1; 380 } 381 offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length; 382 bs = new ByteSequence(data, 0, reader.read(offset, data)); 383 pos=0; 384 } 385 } 386 } 387 388 389 public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException { 390 byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE]; 391 DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord); 392 393 reader.readFully(offset, controlRecord); 394 395 // Assert that it's a batch record. 396 for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) { 397 if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) { 398 return -1; 399 } 400 } 401 402 int size = controlIs.readInt(); 403 if( size > MAX_BATCH_SIZE ) { 404 return -1; 405 } 406 407 if( isChecksum() ) { 408 409 long expectedChecksum = controlIs.readLong(); 410 if( expectedChecksum == 0 ) { 411 // Checksuming was not enabled when the record was stored. 412 // we can't validate the record :( 413 return size; 414 } 415 416 byte data[] = new byte[size]; 417 reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data); 418 419 Checksum checksum = new Adler32(); 420 checksum.update(data, 0, data.length); 421 422 if( expectedChecksum!=checksum.getValue() ) { 423 return -1; 424 } 425 426 } 427 return size; 428 } 429 430 431 void addToTotalLength(int size) { 432 totalLength.addAndGet(size); 433 } 434 435 public long length() { 436 return totalLength.get(); 437 } 438 439 synchronized DataFile getCurrentWriteFile() throws IOException { 440 if (dataFiles.isEmpty()) { 441 rotateWriteFile(); 442 } 443 return dataFiles.getTail(); 444 } 445 446 synchronized DataFile rotateWriteFile() { 447 int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1; 448 File file = getFile(nextNum); 449 DataFile nextWriteFile = new DataFile(file, nextNum); 450 fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile); 451 fileByFileMap.put(file, nextWriteFile); 452 dataFiles.addLast(nextWriteFile); 453 return nextWriteFile; 454 } 455 456 public File getFile(int nextNum) { 457 String fileName = filePrefix + nextNum + fileSuffix; 458 File file = new File(directory, fileName); 459 return file; 460 } 461 462 synchronized DataFile getDataFile(Location item) throws IOException { 463 Integer key = Integer.valueOf(item.getDataFileId()); 464 DataFile dataFile = fileMap.get(key); 465 if (dataFile == null) { 466 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 467 throw new IOException("Could not locate data file " + getFile(item.getDataFileId())); 468 } 469 return dataFile; 470 } 471 472 synchronized File getFile(Location item) throws IOException { 473 Integer key = Integer.valueOf(item.getDataFileId()); 474 DataFile dataFile = fileMap.get(key); 475 if (dataFile == null) { 476 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 477 throw new IOException("Could not locate data file " + getFile(item.getDataFileId())); 478 } 479 return dataFile.getFile(); 480 } 481 482 private DataFile getNextDataFile(DataFile dataFile) { 483 return dataFile.getNext(); 484 } 485 486 public void close() throws IOException { 487 synchronized (this) { 488 if (!started) { 489 return; 490 } 491 if (this.timer != null) { 492 this.timer.cancel(); 493 } 494 accessorPool.close(); 495 } 496 // the appender can be calling back to to the journal blocking a close AMQ-5620 497 appender.close(); 498 synchronized (this) { 499 fileMap.clear(); 500 fileByFileMap.clear(); 501 dataFiles.clear(); 502 lastAppendLocation.set(null); 503 started = false; 504 } 505 } 506 507 protected synchronized void cleanup() { 508 if (accessorPool != null) { 509 accessorPool.disposeUnused(); 510 } 511 } 512 513 public synchronized boolean delete() throws IOException { 514 515 // Close all open file handles... 516 appender.close(); 517 accessorPool.close(); 518 519 boolean result = true; 520 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { 521 DataFile dataFile = i.next(); 522 result &= dataFile.delete(); 523 } 524 totalLength.set(0); 525 fileMap.clear(); 526 fileByFileMap.clear(); 527 lastAppendLocation.set(null); 528 dataFiles = new LinkedNodeList<DataFile>(); 529 530 // reopen open file handles... 531 accessorPool = new DataFileAccessorPool(this); 532 appender = new DataFileAppender(this); 533 return result; 534 } 535 536 public synchronized void removeDataFiles(Set<Integer> files) throws IOException { 537 for (Integer key : files) { 538 // Can't remove the data file (or subsequent files) that is currently being written to. 539 if( key >= lastAppendLocation.get().getDataFileId() ) { 540 continue; 541 } 542 DataFile dataFile = fileMap.get(key); 543 if( dataFile!=null ) { 544 forceRemoveDataFile(dataFile); 545 } 546 } 547 } 548 549 private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException { 550 accessorPool.disposeDataFileAccessors(dataFile); 551 fileByFileMap.remove(dataFile.getFile()); 552 fileMap.remove(dataFile.getDataFileId()); 553 totalLength.addAndGet(-dataFile.getLength()); 554 dataFile.unlink(); 555 if (archiveDataLogs) { 556 File directoryArchive = getDirectoryArchive(); 557 if (directoryArchive.exists()) { 558 LOG.debug("Archive directory exists: {}", directoryArchive); 559 } else { 560 if (directoryArchive.isAbsolute()) 561 if (LOG.isDebugEnabled()) { 562 LOG.debug("Archive directory [{}] does not exist - creating it now", 563 directoryArchive.getAbsolutePath()); 564 } 565 IOHelper.mkdirs(directoryArchive); 566 } 567 LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath()); 568 dataFile.move(directoryArchive); 569 LOG.debug("Successfully moved data file"); 570 } else { 571 LOG.debug("Deleting data file: {}", dataFile); 572 if ( dataFile.delete() ) { 573 LOG.debug("Discarded data file: {}", dataFile); 574 } else { 575 LOG.warn("Failed to discard data file : {}", dataFile.getFile()); 576 } 577 } 578 if (dataFileRemovedListener != null) { 579 dataFileRemovedListener.fileRemoved(dataFile); 580 } 581 } 582 583 /** 584 * @return the maxFileLength 585 */ 586 public int getMaxFileLength() { 587 return maxFileLength; 588 } 589 590 /** 591 * @param maxFileLength the maxFileLength to set 592 */ 593 public void setMaxFileLength(int maxFileLength) { 594 this.maxFileLength = maxFileLength; 595 } 596 597 @Override 598 public String toString() { 599 return directory.toString(); 600 } 601 602 public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException { 603 604 Location cur = null; 605 while (true) { 606 if (cur == null) { 607 if (location == null) { 608 DataFile head = dataFiles.getHead(); 609 if( head == null ) { 610 return null; 611 } 612 cur = new Location(); 613 cur.setDataFileId(head.getDataFileId()); 614 cur.setOffset(0); 615 } else { 616 // Set to the next offset.. 617 if (location.getSize() == -1) { 618 cur = new Location(location); 619 } else { 620 cur = new Location(location); 621 cur.setOffset(location.getOffset() + location.getSize()); 622 } 623 } 624 } else { 625 cur.setOffset(cur.getOffset() + cur.getSize()); 626 } 627 628 DataFile dataFile = getDataFile(cur); 629 630 // Did it go into the next file?? 631 if (dataFile.getLength() <= cur.getOffset()) { 632 dataFile = getNextDataFile(dataFile); 633 if (dataFile == null) { 634 return null; 635 } else { 636 cur.setDataFileId(dataFile.getDataFileId().intValue()); 637 cur.setOffset(0); 638 } 639 } 640 641 // Load in location size and type. 642 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 643 try { 644 reader.readLocationDetails(cur); 645 } finally { 646 accessorPool.closeDataFileAccessor(reader); 647 } 648 649 Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset()); 650 if (corruptedRange != null) { 651 // skip corruption 652 cur.setSize((int) corruptedRange.range()); 653 } else if (cur.getType() == 0) { 654 // eof - jump to next datafile 655 cur.setOffset(maxFileLength); 656 } else if (cur.getType() == USER_RECORD_TYPE) { 657 // Only return user records. 658 return cur; 659 } 660 } 661 } 662 663 public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException { 664 DataFile dataFile = getDataFile(location); 665 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 666 ByteSequence rc = null; 667 try { 668 rc = reader.readRecord(location); 669 } finally { 670 accessorPool.closeDataFileAccessor(reader); 671 } 672 return rc; 673 } 674 675 public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { 676 Location loc = appender.storeItem(data, Location.USER_TYPE, sync); 677 return loc; 678 } 679 680 public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException { 681 Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete); 682 return loc; 683 } 684 685 public void update(Location location, ByteSequence data, boolean sync) throws IOException { 686 DataFile dataFile = getDataFile(location); 687 DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile); 688 try { 689 updater.updateRecord(location, data, sync); 690 } finally { 691 accessorPool.closeDataFileAccessor(updater); 692 } 693 } 694 695 public PreallocationStrategy getPreallocationStrategy() { 696 return preallocationStrategy; 697 } 698 699 public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) { 700 this.preallocationStrategy = preallocationStrategy; 701 } 702 703 public PreallocationScope getPreallocationScope() { 704 return preallocationScope; 705 } 706 707 public void setPreallocationScope(PreallocationScope preallocationScope) { 708 this.preallocationScope = preallocationScope; 709 } 710 711 public File getDirectory() { 712 return directory; 713 } 714 715 public void setDirectory(File directory) { 716 this.directory = directory; 717 } 718 719 public String getFilePrefix() { 720 return filePrefix; 721 } 722 723 public void setFilePrefix(String filePrefix) { 724 this.filePrefix = filePrefix; 725 } 726 727 public Map<WriteKey, WriteCommand> getInflightWrites() { 728 return inflightWrites; 729 } 730 731 public Location getLastAppendLocation() { 732 return lastAppendLocation.get(); 733 } 734 735 public void setLastAppendLocation(Location lastSyncedLocation) { 736 this.lastAppendLocation.set(lastSyncedLocation); 737 } 738 739 public File getDirectoryArchive() { 740 if (!directoryArchiveOverridden && (directoryArchive == null)) { 741 // create the directoryArchive relative to the journal location 742 directoryArchive = new File(directory.getAbsolutePath() + 743 File.separator + DEFAULT_ARCHIVE_DIRECTORY); 744 } 745 return directoryArchive; 746 } 747 748 public void setDirectoryArchive(File directoryArchive) { 749 directoryArchiveOverridden = true; 750 this.directoryArchive = directoryArchive; 751 } 752 753 public boolean isArchiveDataLogs() { 754 return archiveDataLogs; 755 } 756 757 public void setArchiveDataLogs(boolean archiveDataLogs) { 758 this.archiveDataLogs = archiveDataLogs; 759 } 760 761 synchronized public Integer getCurrentDataFileId() { 762 if (dataFiles.isEmpty()) 763 return null; 764 return dataFiles.getTail().getDataFileId(); 765 } 766 767 /** 768 * Get a set of files - only valid after start() 769 * 770 * @return files currently being used 771 */ 772 public Set<File> getFiles() { 773 return fileByFileMap.keySet(); 774 } 775 776 public synchronized Map<Integer, DataFile> getFileMap() { 777 return new TreeMap<Integer, DataFile>(fileMap); 778 } 779 780 public long getDiskSize() { 781 return totalLength.get(); 782 } 783 784 public void setReplicationTarget(ReplicationTarget replicationTarget) { 785 this.replicationTarget = replicationTarget; 786 } 787 public ReplicationTarget getReplicationTarget() { 788 return replicationTarget; 789 } 790 791 public String getFileSuffix() { 792 return fileSuffix; 793 } 794 795 public void setFileSuffix(String fileSuffix) { 796 this.fileSuffix = fileSuffix; 797 } 798 799 public boolean isChecksum() { 800 return checksum; 801 } 802 803 public void setChecksum(boolean checksumWrites) { 804 this.checksum = checksumWrites; 805 } 806 807 public boolean isCheckForCorruptionOnStartup() { 808 return checkForCorruptionOnStartup; 809 } 810 811 public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) { 812 this.checkForCorruptionOnStartup = checkForCorruptionOnStartup; 813 } 814 815 public void setWriteBatchSize(int writeBatchSize) { 816 this.writeBatchSize = writeBatchSize; 817 } 818 819 public int getWriteBatchSize() { 820 return writeBatchSize; 821 } 822 823 public void setSizeAccumulator(AtomicLong storeSizeAccumulator) { 824 this.totalLength = storeSizeAccumulator; 825 } 826 827 public void setEnableAsyncDiskSync(boolean val) { 828 this.enableAsyncDiskSync = val; 829 } 830 831 public boolean isEnableAsyncDiskSync() { 832 return enableAsyncDiskSync; 833 } 834 835 public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) { 836 this.dataFileRemovedListener = dataFileRemovedListener; 837 } 838 839 public static class WriteCommand extends LinkedNode<WriteCommand> { 840 public final Location location; 841 public final ByteSequence data; 842 final boolean sync; 843 public final Runnable onComplete; 844 845 public WriteCommand(Location location, ByteSequence data, boolean sync) { 846 this.location = location; 847 this.data = data; 848 this.sync = sync; 849 this.onComplete = null; 850 } 851 852 public WriteCommand(Location location, ByteSequence data, Runnable onComplete) { 853 this.location = location; 854 this.data = data; 855 this.onComplete = onComplete; 856 this.sync = false; 857 } 858 } 859 860 public static class WriteKey { 861 private final int file; 862 private final long offset; 863 private final int hash; 864 865 public WriteKey(Location item) { 866 file = item.getDataFileId(); 867 offset = item.getOffset(); 868 // TODO: see if we can build a better hash 869 hash = (int)(file ^ offset); 870 } 871 872 public int hashCode() { 873 return hash; 874 } 875 876 public boolean equals(Object obj) { 877 if (obj instanceof WriteKey) { 878 WriteKey di = (WriteKey)obj; 879 return di.file == file && di.offset == offset; 880 } 881 return false; 882 } 883 } 884}