/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.journal;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.kahadb.util.LinkedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LinkedNodeList;
import org.apache.kahadb.util.SchedulerTimerTask;
import org.apache.kahadb.util.Sequence;

/**
 * Manages the set of {@link DataFile}s that make up a journal.
 * <p>
 * The journal keeps its data files in three structures: a map keyed by data
 * file id, a map keyed by {@link File}, and a linked list in ascending id
 * order whose tail is the file currently being written to. Records are
 * appended through a {@link FileAppender} and read back through accessors
 * obtained from a {@link DataFileAccessorPool}.
 * <p>
 * Most operations that touch those structures are {@code synchronized} on the
 * journal instance.
 */
public class Journal {
    /** Name of the system property that selects the caller-buffering appender. */
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    /** Value of the {@link #CALLER_BUFFER_APPENDER} system property, read once at class load; defaults to {@code false}. */
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    /** Largest batch size accepted by {@link #checkBatchRecord(DataFileAccessor, int)}. */
    private static final int MAX_BATCH_SIZE = 32*1024*1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
    // NOTE: the three constants below depend on each other and must stay in this declaration order.
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();

    /**
     * Builds the fixed prefix shared by every batch control record: the record
     * size, the record type and the magic bytes.
     *
     * @return the header bytes, compacted into their own array
     */
    private static byte[] createBatchControlRecordHeader() {
        try {
            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int PREFERED_DIFF = 1024 * 512;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    /** Data files by id. */
    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    /** Data files by their file on disk. */
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    /** Data files linked in ascending id order; the tail is the current write file. */
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected Runnable cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private Timer timer;

    /**
     * Starts the journal: discovers the existing data files in the journal
     * directory, links them in id order, determines the last append location
     * and schedules the periodic cleanup task. Calling it on an already
     * started journal is a no-op.
     *
     * @throws IOException if a data file cannot be inspected
     */
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;
        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);

        appender = createAppender();

        File[] files = directory.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        // listFiles() yields null when the directory does not exist or cannot be read.
        if (files != null) {
            for (File file : files) {
                try {
                    // The data file id is the number between the prefix and the suffix.
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if( isCheckForCorruptionOnStartup() ) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        // Makes sure there is at least one data file to write to.
        getCurrentWriteFile();

        // Without a full corruption check only the tail file is scanned to find
        // where appending should resume.
        if( lastAppendLocation.get()==null ) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        cleanupTask = new Runnable() {
            public void run() {
                cleanup();
            }
        };
        this.timer = new Timer("KahaDB Scheduler", true);
        TimerTask task = new SchedulerTimerTask(cleanupTask);
        this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
        long end = System.currentTimeMillis();
        LOG.trace("Startup took: "+(end-start)+" ms");
    }

    /**
     * Creates the appender implementation selected by
     * {@link #callerBufferAppender}, so that every place that (re)creates the
     * appender picks the same implementation.
     */
    private FileAppender createAppender() {
        return callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
    }

    /**
     * Encodes a string as UTF-8.
     *
     * @param string the text to encode
     * @return the UTF-8 bytes of {@code string}
     */
    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Walks the batch records of a data file from offset 0, recording any
     * corrupt ranges on the data file and trimming the data file length to the
     * end of the last valid batch.
     *
     * @param dataFile the data file to scan
     * @return the location just past the last batch that could be validated
     * @throws IOException if an accessor for the data file cannot be opened
     */
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while( true ) {
                int size = checkBatchRecord(reader, location.getOffset());
                if ( size>=0 ) {
                    // Valid batch: skip over its control record and payload.
                    location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
                } else {

                    // Perhaps it's just some corruption... scan through the file to find the next valid batch record.  We
                    // may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
                    if( nextOffset >=0 ) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException ignored) {
            // Deliberately swallowed: the scan is best-effort and simply stops at
            // the first read failure, keeping the offset reached so far.
            // NOTE(review): presumably this is how running off the end of the file
            // terminates the loop (readFully failing at EOF) - confirm against
            // DataFileAccessor.
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        // Shrink the recorded length to what could be validated and keep the
        // journal-wide total in step.
        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if( !dataFile.corruptedBlocks.isEmpty() ) {
            // Is the end of the data file corrupted?
            if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
                // Yes: drop that trailing corrupt range and end the file where it began.
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

    /**
     * Searches forward from {@code offset} for the next occurrence of the
     * batch control record header, reading the file in 4 KiB chunks.
     *
     * @param reader accessor for the data file being scanned
     * @param offset file offset at which to start searching
     * @return the file offset of the next header, or -1 if none is found before a short read
     * @throws IOException if reading from the data file fails
     */
    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte data[] = new byte[1024*4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while( true ) {
            pos = bs.indexOf(header, pos);
            if( pos >= 0 ) {
                return offset+pos;
            } else {
                // need to load the next data chunk in..
                if( bs.length != data.length ) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                // Step back by the header length so that a header straddling two
                // chunks is still found.
                offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos=0;
            }
        }
    }


    /**
     * Validates the batch control record at {@code offset}.
     * <p>
     * The record must start with {@link #BATCH_CONTROL_RECORD_HEADER} and carry
     * a batch size between 0 and the maximum batch size. When checksums are
     * enabled and the record stores a non-zero checksum, the batch payload is
     * read and its Adler-32 checksum must match.
     *
     * @param reader accessor for the data file being checked
     * @param offset file offset of the candidate batch control record
     * @return the size of the batch payload if the record is valid, otherwise -1
     * @throws IOException if the control record or the payload cannot be read
     */
    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
        DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);

        reader.readFully(offset, controlRecord);

        // Assert that it's a batch record.
        for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
            if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
                return -1;
            }
        }

        int size = controlIs.readInt();
        // A negative size can only come from a corrupt record. It must be rejected
        // here: otherwise the payload buffer allocation below would fail with a
        // NegativeArraySizeException, which is not an IOException and would abort
        // recovery instead of being treated as corruption.
        if( size < 0 || size > MAX_BATCH_SIZE ) {
            return -1;
        }

        if( isChecksum() ) {

            long expectedChecksum = controlIs.readLong();
            if( expectedChecksum == 0 ) {
                // Checksuming was not enabled when the record was stored.
                // we can't validate the record :(
                return size;
            }

            byte data[] = new byte[size];
            reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);

            Checksum checksum = new Adler32();
            checksum.update(data, 0, data.length);

            if( expectedChecksum!=checksum.getValue() ) {
                return -1;
            }

        }
        return size;
    }


    /**
     * Adds {@code size} to the journal-wide length counter.
     */
    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    /**
     * @return the current value of the journal-wide length counter
     */
    public long length() {
        return totalLength.get();
    }

    /**
     * Returns the data file at the tail of the list, creating a first data
     * file when the journal has none.
     */
    synchronized DataFile getCurrentWriteFile() throws IOException {
        if (dataFiles.isEmpty()) {
            rotateWriteFile();
        }
        return dataFiles.getTail();
    }

    /**
     * Creates the next data file (id of the current tail plus one, or 1 for an
     * empty journal), registers it and makes it the new tail.
     *
     * @return the newly registered data file
     */
    synchronized DataFile rotateWriteFile() {
        int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
        // register the new file in every index and link it as the tail
        fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
        fileByFileMap.put(file, nextWriteFile);
        dataFiles.addLast(nextWriteFile);
        return nextWriteFile;
    }

    /**
     * Builds the file for a data file id: {@code <directory>/<prefix><id><suffix>}.
     *
     * @param nextNum the data file id
     * @return the corresponding file inside the journal directory
     */
    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    /**
     * Looks up the data file a location points into.
     *
     * @param item the location whose data file is wanted
     * @return the registered data file
     * @throws IOException if no data file with that id is registered
     */
    synchronized DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    /**
     * Looks up the file on disk a location points into.
     *
     * @param item the location whose file is wanted
     * @return the file of the registered data file
     * @throws IOException if no data file with that id is registered
     */
    synchronized File getFile(Location item) throws IOException {
        return getDataFile(item).getFile();
    }

    private DataFile getNextDataFile(DataFile dataFile) {
        return dataFile.getNext();
    }

    /**
     * Stops the journal: cancels the cleanup timer, closes the accessor pool
     * and the appender and forgets all data files. A no-op when the journal is
     * not started.
     *
     * @throws IOException if closing the appender fails
     */
    public synchronized void close() throws IOException {
        if (!started) {
            return;
        }
        if (this.timer != null) {
            this.timer.cancel();
        }
        accessorPool.close();
        appender.close();
        fileMap.clear();
        fileByFileMap.clear();
        dataFiles.clear();
        lastAppendLocation.set(null);
        started = false;
    }

    /**
     * Periodic housekeeping run by the cleanup timer: disposes unused accessors.
     */
    protected synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    /**
     * Deletes every registered data file and resets the journal to an empty
     * state with a fresh accessor pool and appender.
     *
     * @return {@code true} only if every data file was deleted
     * @throws IOException if closing the appender fails
     */
    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            totalLength.addAndGet(-dataFile.getLength());
            result &= dataFile.delete();
        }
        fileMap.clear();
        fileByFileMap.clear();
        lastAppendLocation.set(null);
        dataFiles = new LinkedNodeList<DataFile>();

        // reopen open file handles...
        accessorPool = new DataFileAccessorPool(this);
        // Use the same appender selection as start() so that a delete does not
        // silently switch the appender implementation.
        appender = createAppender();
        return result;
    }

    /**
     * Removes the given data files, skipping the file currently being written
     * to and any file after it.
     *
     * @param files ids of the data files to remove
     * @throws IOException if removing a data file fails
     */
    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
        Location lastAppend = lastAppendLocation.get();
        if( lastAppend == null ) {
            // No append location is known (journal closed, deleted or not yet
            // started), so there is no way to tell which files are safe to drop.
            return;
        }
        int currentWriteFileId = lastAppend.getDataFileId();
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if( key >= currentWriteFileId ) {
                continue;
            }
            DataFile dataFile = fileMap.get(key);
            if( dataFile!=null ) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    /**
     * Unregisters a data file and then either archives it (when archiving is
     * enabled) or deletes it.
     */
    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        fileByFileMap.remove(dataFile.getFile());
        fileMap.remove(dataFile.getDataFileId());
        totalLength.addAndGet(-dataFile.getLength());
        dataFile.unlink();
        if (archiveDataLogs) {
            dataFile.move(getDirectoryArchive());
            LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
        } else {
            if ( dataFile.delete() ) {
                LOG.debug("Discarded data file " + dataFile);
            } else {
                LOG.warn("Failed to discard data file " + dataFile.getFile());
            }
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

    /**
     * Records an append that was performed outside of this journal's appender.
     * The location must point into the current tail file, whose length is then
     * increased, or into the file immediately after it, which is then
     * registered as the new tail.
     *
     * @param loc    location of the external append
     * @param length number of bytes that were appended
     * @throws IOException if the location refers to any other data file
     */
    public synchronized void appendedExternally(Location loc, int length) throws IOException {
        DataFile dataFile = null;
        if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
            // It's an update to the current log file..
            dataFile = dataFiles.getTail();
            dataFile.incrementLength(length);
        } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
            // It's an update to the next log file.
            int nextNum = loc.getDataFileId();
            File file = getFile(nextNum);
            dataFile = new DataFile(file, nextNum, preferedFileLength);
            // register the new file in every index and link it as the tail
            fileMap.put(dataFile.getDataFileId(), dataFile);
            fileByFileMap.put(file, dataFile);
            dataFiles.addLast(dataFile);
        } else {
            throw new IOException("Invalid external append.");
        }
    }

    /**
     * Finds the next user record after {@code location}, skipping records of
     * other types.
     *
     * @param location the location to continue from, or {@code null} to start
     *                 at the beginning of the first data file
     * @return the next user record location, or {@code null} when there is none
     * @throws IOException if a data file cannot be located or read
     */
    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = dataFiles.getHead();
                    if( head == null ) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    if (location.getSize() == -1) {
                        // Size unknown: re-read the record at the given location itself.
                        cur = new Location(location);
                    } else {
                        cur = new Location(location);
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file??
            if (dataFile.getLength() <= cur.getOffset()) {
                dataFile = getNextDataFile(dataFile);
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    /**
     * Variant of {@link #getNextLocation(DataFile, Location, boolean)} that
     * identifies the data file by its file on disk.
     *
     * @param file         the file of a registered data file
     * @param lastLocation the location to continue from, or {@code null}
     * @param thisFileOnly whether to stop at the end of the given file
     * @return the next record location, or {@code null} when there is none
     * @throws IOException if {@code file} is not a registered data file or reading fails
     */
    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
        DataFile df = fileByFileMap.get(file);
        if (df == null) {
            // Fail with a meaningful error instead of a NullPointerException further down.
            throw new IOException("Could not locate data file " + file);
        }
        return getNextLocation(df, lastLocation, thisFileOnly);
    }

    /**
     * Finds the next record after {@code lastLocation}, reading from
     * {@code dataFile}. Unlike {@link #getNextLocation(Location)} this returns
     * the first record whose type is greater than zero, not only user records.
     *
     * @param dataFile     the data file to read from
     * @param lastLocation the location to continue from, or {@code null}
     * @param thisFileOnly when {@code true}, return {@code null} at the end of
     *                     {@code dataFile} instead of moving on to the next file
     * @return the next record location, or {@code null} when there is none
     * @throws IOException if reading the record details fails
     */
    public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (lastLocation == null) {
                    // NOTE(review): the starting location takes its id from the head
                    // of the list while the reads below use dataFile; these only
                    // agree when dataFile is the head - confirm this is intended.
                    DataFile head = dataFile.getHeadNode();
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(lastLocation);
                    cur.setOffset(cur.getOffset() + cur.getSize());
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            // Did it go into the next file??
            if (dataFile.getLength() <= cur.getOffset()) {
                if (thisFileOnly) {
                    return null;
                } else {
                    dataFile = getNextDataFile(dataFile);
                    if (dataFile == null) {
                        return null;
                    } else {
                        cur.setDataFileId(dataFile.getDataFileId().intValue());
                        cur.setOffset(0);
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() > 0) {
                // Return any record with a positive type.
                return cur;
            }
        }
    }

    /**
     * Reads the record stored at a location.
     *
     * @param location the location of the record
     * @return the record as returned by the data file accessor
     * @throws IOException if the data file cannot be located or read
     */
    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    /**
     * Appends a user record through the appender.
     *
     * @param data the record payload
     * @param sync passed through to the appender
     * @return the location the appender assigned to the record
     * @throws IOException if the appender fails to store the record
     */
    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    /**
     * Appends a user record through the appender, handing it a completion callback.
     *
     * @param data       the record payload
     * @param onComplete callback passed through to the appender
     * @return the location the appender assigned to the record
     * @throws IOException if the appender fails to store the record
     */
    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    /**
     * Overwrites the record at an existing location.
     *
     * @param location the location of the record to update
     * @param data     the new payload
     * @param sync     passed through to the data file accessor
     * @throws IOException if the data file cannot be located or updated
     */
    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    /**
     * @return the id of the tail data file, or {@code null} when the journal has no data files
     */
    synchronized public Integer getCurrentDataFileId() {
        if (dataFiles.isEmpty()) {
            return null;
        }
        return dataFiles.getTail().getDataFileId();
    }

    /**
     * Get a set of files - only valid after start()
     * <p>
     * The returned set is a live view of the journal's internal map and is
     * handed out without synchronization.
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        return fileByFileMap.keySet();
    }

    /**
     * @return a sorted snapshot copy of the data files keyed by id
     */
    public synchronized Map<Integer, DataFile> getFileMap() {
        return new TreeMap<Integer, DataFile>(fileMap);
    }

    /**
     * Estimates the space used on disk: the journal-wide length counter, with
     * the tail file counted as at least {@code preferedFileLength} bytes.
     *
     * @return the estimated disk usage in bytes
     */
    public long getDiskSize() {
        long tailLength=0;
        synchronized( this ) {
            if( !dataFiles.isEmpty() ) {
                tailLength = dataFiles.getTail().getLength();
            }
        }

        long rc = totalLength.get();

        // The last file is actually at a minimum preferedFileLength big.
        if( tailLength < preferedFileLength ) {
            rc -= tailLength;
            rc += preferedFileLength;
        }
        return rc;
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }
    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    /**
     * Replaces the counter used to accumulate the journal-wide length.
     *
     * @param storeSizeAccumulator the counter to use from now on
     */
    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
       this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    /**
     * A pending write: the target location, the payload and either a sync flag
     * or a completion callback.
     */
    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        /** Creates a write with an explicit sync flag and no completion callback. */
        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        /** Creates a write with a completion callback; its sync flag is {@code false}. */
        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    /**
     * Map key identifying a write by data file id and offset.
     */
    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int)(file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey)obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}