001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.kahadb.page;
018
019import java.io.*;
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.LinkedHashMap;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Properties;
029import java.util.TreeMap;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.zip.Adler32;
034import java.util.zip.Checksum;
035
036import org.apache.kahadb.util.DataByteArrayOutputStream;
037import org.apache.kahadb.util.IOExceptionSupport;
038import org.apache.kahadb.util.IOHelper;
039import org.apache.kahadb.util.IntrospectionSupport;
040import org.apache.kahadb.util.LFUCache;
041import org.apache.kahadb.util.LRUCache;
042import org.apache.kahadb.util.Sequence;
043import org.apache.kahadb.util.SequenceSet;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should
049 * be externally synchronized.
050 * <p/>
051 * The file has 3 parts:
052 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
053 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent
054 * Page Space: The pages in the page file.
055 */
056public class PageFile {
057
058    private static final String PAGEFILE_SUFFIX = ".data";
059    private static final String RECOVERY_FILE_SUFFIX = ".redo";
060    private static final String FREE_FILE_SUFFIX = ".free";
061
062    // 4k Default page size.
063    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
064    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
065    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);;
066
067    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
068    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;
069
070    // Recovery header is (long offset)
071    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);
072
073    // A PageFile will use a couple of files in this directory
074    private File directory;
075    // And the file names in that directory will be based on this name.
076    private final String name;
077
078    // File handle used for reading pages..
079    private RandomAccessFile readFile;
080    // File handle used for writing pages..
081    private RandomAccessFile writeFile;
082    // File handle used for writing pages..
083    private RandomAccessFile recoveryFile;
084
085    // The size of pages
086    private int pageSize = DEFAULT_PAGE_SIZE;
087
088    // The minimum number of space allocated to the recovery file in number of pages.
089    private int recoveryFileMinPageCount = 1000;
090    // The max size that we let the recovery file grow to.. ma exceed the max, but the file will get resize
091    // to this max size as soon as  possible.
092    private int recoveryFileMaxPageCount = 10000;
093    // The number of pages in the current recovery buffer
094    private int recoveryPageCount;
095
096    private AtomicBoolean loaded = new AtomicBoolean();
097    // The number of pages we are aiming to write every time we
098    // write to disk.
099    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
100
101    // We keep a cache of pages recently used?
102    private Map<Long, Page> pageCache;
103    // The cache of recently used pages.
104    private boolean enablePageCaching = true;
105    // How many pages will we keep in the cache?
106    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;
107
108    // Should first log the page write to the recovery buffer? Avoids partial
109    // page write failures..
110    private boolean enableRecoveryFile = true;
111    // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
112    private boolean enableDiskSyncs = true;
113    // Will writes be done in an async thread?
114    private boolean enabledWriteThread = false;
115
116    // These are used if enableAsyncWrites==true
117    private AtomicBoolean stopWriter = new AtomicBoolean();
118    private Thread writerThread;
119    private CountDownLatch checkpointLatch;
120
121    // Keeps track of writes that are being written to disk.
122    private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
123
124    // Keeps track of free pages.
125    private final AtomicLong nextFreePageId = new AtomicLong();
126    private SequenceSet freeList = new SequenceSet();
127
128    private AtomicLong nextTxid = new AtomicLong();
129
130    // Persistent settings stored in the page file.
131    private MetaData metaData;
132
133    private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
134
135    private boolean useLFRUEviction = false;
136    private float LFUEvictionFactor = 0.2f;
137
138    /**
139     * Use to keep track of updated pages which have not yet been committed.
140     */
141    static class PageWrite {
142        Page page;
143        byte[] current;
144        byte[] diskBound;
145        long currentLocation = -1;
146        long diskBoundLocation = -1;
147        File tmpFile;
148        int length;
149
150        public PageWrite(Page page, byte[] data) {
151            this.page = page;
152            current = data;
153        }
154
155        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
156            this.page = page;
157            this.currentLocation = currentLocation;
158            this.tmpFile = tmpFile;
159            this.length = length;
160        }
161
162        public void setCurrent(Page page, byte[] data) {
163            this.page = page;
164            current = data;
165            currentLocation = -1;
166            diskBoundLocation = -1;
167        }
168
169        public void setCurrentLocation(Page page, long location, int length) {
170            this.page = page;
171            this.currentLocation = location;
172            this.length = length;
173            this.current = null;
174        }
175
176        @Override
177        public String toString() {
178            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
179        }
180
181        @SuppressWarnings("unchecked")
182        public Page getPage() {
183            return page;
184        }
185
186        public byte[] getDiskBound() throws IOException {
187            if (diskBound == null && diskBoundLocation != -1) {
188                diskBound = new byte[length];
189                RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
190                file.seek(diskBoundLocation);
191                file.read(diskBound);
192                file.close();
193                diskBoundLocation = -1;
194            }
195            return diskBound;
196        }
197
198        void begin() {
199            if (currentLocation != -1) {
200                diskBoundLocation = currentLocation;
201            } else {
202                diskBound = current;
203            }
204            current = null;
205            currentLocation = -1;
206        }
207
208        /**
209         * @return true if there is no pending writes to do.
210         */
211        boolean done() {
212            diskBoundLocation = -1;
213            diskBound = null;
214            return current == null || currentLocation == -1;
215        }
216
217        boolean isDone() {
218            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
219        }
220    }
221
222    /**
223     * The MetaData object hold the persistent data associated with a PageFile object.
224     */
225    public static class MetaData {
226
227        String fileType;
228        String fileTypeVersion;
229
230        long metaDataTxId = -1;
231        int pageSize;
232        boolean cleanShutdown;
233        long lastTxId;
234        long freePages;
235
236        public String getFileType() {
237            return fileType;
238        }
239
240        public void setFileType(String fileType) {
241            this.fileType = fileType;
242        }
243
244        public String getFileTypeVersion() {
245            return fileTypeVersion;
246        }
247
248        public void setFileTypeVersion(String version) {
249            this.fileTypeVersion = version;
250        }
251
252        public long getMetaDataTxId() {
253            return metaDataTxId;
254        }
255
256        public void setMetaDataTxId(long metaDataTxId) {
257            this.metaDataTxId = metaDataTxId;
258        }
259
260        public int getPageSize() {
261            return pageSize;
262        }
263
264        public void setPageSize(int pageSize) {
265            this.pageSize = pageSize;
266        }
267
268        public boolean isCleanShutdown() {
269            return cleanShutdown;
270        }
271
272        public void setCleanShutdown(boolean cleanShutdown) {
273            this.cleanShutdown = cleanShutdown;
274        }
275
276        public long getLastTxId() {
277            return lastTxId;
278        }
279
280        public void setLastTxId(long lastTxId) {
281            this.lastTxId = lastTxId;
282        }
283
284        public long getFreePages() {
285            return freePages;
286        }
287
288        public void setFreePages(long value) {
289            this.freePages = value;
290        }
291    }
292
293    public Transaction tx() {
294        assertLoaded();
295        return new Transaction(this);
296    }
297
298    /**
     * Creates a PageFile in the specified directory whose data files are named by name.
300     */
301    public PageFile(File directory, String name) {
302        this.directory = directory;
303        this.name = name;
304    }
305
306    /**
307     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
308     *
309     * @throws IOException           if the files cannot be deleted.
310     * @throws IllegalStateException if this PageFile is loaded
311     */
312    public void delete() throws IOException {
313        if (loaded.get()) {
314            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
315        }
316        delete(getMainPageFile());
317        delete(getFreeFile());
318        delete(getRecoveryFile());
319    }
320
321    public void archive() throws IOException {
322        if (loaded.get()) {
323            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
324        }
325        long timestamp = System.currentTimeMillis();
326        archive(getMainPageFile(), String.valueOf(timestamp));
327        archive(getFreeFile(), String.valueOf(timestamp));
328        archive(getRecoveryFile(), String.valueOf(timestamp));
329    }
330
    /**
     * Deletes the given file if it exists.
     *
     * @param file the file to delete
     * @throws IOException if the file exists but could not be deleted
     */
335    private void delete(File file) throws IOException {
336        if (file.exists() && !file.delete()) {
337            throw new IOException("Could not delete: " + file.getPath());
338        }
339    }
340
341    private void archive(File file, String suffix) throws IOException {
342        if (file.exists()) {
343            File archive = new File(file.getPath() + "-" + suffix);
344            if (!file.renameTo(archive)) {
345                throw new IOException("Could not archive: " + file.getPath() + " to " + file.getPath());
346            }
347        }
348    }
349
350    /**
351     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
352     * first time the page file is loaded, then this creates the page file in the file system.
353     *
     * @throws IOException           If the page file cannot be loaded. This could be because the existing page file is corrupt, is a bad version, or
     *                               there was a disk error.
356     * @throws IllegalStateException If the page file was already loaded.
357     */
358    public void load() throws IOException, IllegalStateException {
359        if (loaded.compareAndSet(false, true)) {
360
361            if (enablePageCaching) {
362                if (isUseLFRUEviction()) {
363                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
364                } else {
365                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
366                }
367            }
368
369            File file = getMainPageFile();
370            IOHelper.mkdirs(file.getParentFile());
371            writeFile = new RandomAccessFile(file, "rw");
372            readFile = new RandomAccessFile(file, "r");
373
374            if (readFile.length() > 0) {
375                // Load the page size setting cause that can't change once the file is created.
376                loadMetaData();
377                pageSize = metaData.getPageSize();
378            } else {
379                // Store the page size setting cause that can't change once the file is created.
380                metaData = new MetaData();
381                metaData.setFileType(PageFile.class.getName());
382                metaData.setFileTypeVersion("1");
383                metaData.setPageSize(getPageSize());
384                metaData.setCleanShutdown(true);
385                metaData.setFreePages(-1);
386                metaData.setLastTxId(0);
387                storeMetaData();
388            }
389
390            if (enableRecoveryFile) {
391                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
392            }
393
394            if (metaData.isCleanShutdown()) {
395                nextTxid.set(metaData.getLastTxId() + 1);
396                if (metaData.getFreePages() > 0) {
397                    loadFreeList();
398                }
399            } else {
400                LOG.debug(toString() + ", Recovering page file...");
401                nextTxid.set(redoRecoveryUpdates());
402
403                // Scan all to find the free pages.
404                freeList = new SequenceSet();
405                for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
406                    Page page = i.next();
407                    if (page.getType() == Page.PAGE_FREE_TYPE) {
408                        freeList.add(page.getPageId());
409                    }
410                }
411            }
412
413            metaData.setCleanShutdown(false);
414            storeMetaData();
415            getFreeFile().delete();
416
417            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
418                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
419            }
420            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
421            startWriter();
422
423        } else {
424            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
425        }
426    }
427
428
429    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException           if a disk error occurred while closing down the page file.
434     * @throws IllegalStateException if the PageFile is not loaded
435     */
436    public void unload() throws IOException {
437        if (loaded.compareAndSet(true, false)) {
438            flush();
439            try {
440                stopWriter();
441            } catch (InterruptedException e) {
442                throw new InterruptedIOException();
443            }
444
445            if (freeList.isEmpty()) {
446                metaData.setFreePages(0);
447            } else {
448                storeFreeList();
449                metaData.setFreePages(freeList.size());
450            }
451
452            metaData.setLastTxId(nextTxid.get() - 1);
453            metaData.setCleanShutdown(true);
454            storeMetaData();
455
456            if (readFile != null) {
457                readFile.close();
458                readFile = null;
459                writeFile.close();
460                writeFile = null;
461                if (enableRecoveryFile) {
462                    recoveryFile.close();
463                    recoveryFile = null;
464                }
465                freeList.clear();
466                if (pageCache != null) {
467                    pageCache = null;
468                }
469                synchronized (writes) {
470                    writes.clear();
471                }
472            }
473        } else {
474            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
475        }
476    }
477
478    public boolean isLoaded() {
479        return loaded.get();
480    }
481
482    /**
483     * Flush and sync all write buffers to disk.
484     *
     * @throws IOException If a disk error occurred.
486     */
487    public void flush() throws IOException {
488
489        if (enabledWriteThread && stopWriter.get()) {
490            throw new IOException("Page file already stopped: checkpointing is not allowed");
491        }
492
493        // Setup a latch that gets notified when all buffered writes hits the disk.
494        CountDownLatch checkpointLatch;
495        synchronized (writes) {
496            if (writes.isEmpty()) {
497                return;
498            }
499            if (enabledWriteThread) {
500                if (this.checkpointLatch == null) {
501                    this.checkpointLatch = new CountDownLatch(1);
502                }
503                checkpointLatch = this.checkpointLatch;
504                writes.notify();
505            } else {
506                writeBatch();
507                return;
508            }
509        }
510        try {
511            checkpointLatch.await();
512        } catch (InterruptedException e) {
513            InterruptedIOException ioe = new InterruptedIOException();
514            ioe.initCause(e);
515            throw ioe;
516        }
517    }
518
519
520    public String toString() {
521        return "Page File: " + getMainPageFile();
522    }
523
524    ///////////////////////////////////////////////////////////////////
525    // Private Implementation Methods
526    ///////////////////////////////////////////////////////////////////
527    private File getMainPageFile() {
528        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
529    }
530
531    public File getFreeFile() {
532        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
533    }
534
535    public File getRecoveryFile() {
536        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
537    }
538
539    public long toOffset(long pageId) {
540        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
541    }
542
543    private void loadMetaData() throws IOException {
544
545        ByteArrayInputStream is;
546        MetaData v1 = new MetaData();
547        MetaData v2 = new MetaData();
548        try {
549            Properties p = new Properties();
550            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
551            readFile.seek(0);
552            readFile.readFully(d);
553            is = new ByteArrayInputStream(d);
554            p.load(is);
555            IntrospectionSupport.setProperties(v1, p);
556        } catch (IOException e) {
557            v1 = null;
558        }
559
560        try {
561            Properties p = new Properties();
562            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
563            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
564            readFile.readFully(d);
565            is = new ByteArrayInputStream(d);
566            p.load(is);
567            IntrospectionSupport.setProperties(v2, p);
568        } catch (IOException e) {
569            v2 = null;
570        }
571
572        if (v1 == null && v2 == null) {
573            throw new IOException("Could not load page file meta data");
574        }
575
576        if (v1 == null || v1.metaDataTxId < 0) {
577            metaData = v2;
578        } else if (v2 == null || v1.metaDataTxId < 0) {
579            metaData = v1;
580        } else if (v1.metaDataTxId == v2.metaDataTxId) {
581            metaData = v1; // use the first since the 2nd could be a partial..
582        } else {
583            metaData = v2; // use the second cause the first is probably a partial.
584        }
585    }
586
587    private void storeMetaData() throws IOException {
588        // Convert the metadata into a property format
589        metaData.metaDataTxId++;
590        Properties p = new Properties();
591        IntrospectionSupport.getProperties(metaData, p, null);
592
593        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
594        p.store(os, "");
595        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
596            throw new IOException("Configuation is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
597        }
598        // Fill the rest with space...
599        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
600        Arrays.fill(filler, (byte) ' ');
601        os.write(filler);
602        os.flush();
603
604        byte[] d = os.toByteArray();
605
606        // So we don't loose it.. write it 2 times...
607        writeFile.seek(0);
608        writeFile.write(d);
609        writeFile.getFD().sync();
610        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
611        writeFile.write(d);
612        writeFile.getFD().sync();
613    }
614
615    private void storeFreeList() throws IOException {
616        FileOutputStream os = new FileOutputStream(getFreeFile());
617        DataOutputStream dos = new DataOutputStream(os);
618        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
619        dos.close();
620    }
621
622    private void loadFreeList() throws IOException {
623        freeList.clear();
624        FileInputStream is = new FileInputStream(getFreeFile());
625        DataInputStream dis = new DataInputStream(is);
626        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
627        dis.close();
628    }
629
630    ///////////////////////////////////////////////////////////////////
631    // Property Accessors
632    ///////////////////////////////////////////////////////////////////
633
634    /**
635     * Is the recovery buffer used to double buffer page writes.  Enabled by default.
636     *
637     * @return is the recovery buffer enabled.
638     */
639    public boolean isEnableRecoveryFile() {
640        return enableRecoveryFile;
641    }
642
643    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
646     */
647    public void setEnableRecoveryFile(boolean doubleBuffer) {
648        assertNotLoaded();
649        this.enableRecoveryFile = doubleBuffer;
650    }
651
652    /**
653     * @return Are page writes synced to disk?
654     */
655    public boolean isEnableDiskSyncs() {
656        return enableDiskSyncs;
657    }
658
659    /**
     * Allows you to enable syncing writes to disk.
661     */
662    public void setEnableDiskSyncs(boolean syncWrites) {
663        assertNotLoaded();
664        this.enableDiskSyncs = syncWrites;
665    }
666
667    /**
668     * @return the page size
669     */
670    public int getPageSize() {
671        return this.pageSize;
672    }
673
674    /**
675     * @return the amount of content data that a page can hold.
676     */
677    public int getPageContentSize() {
678        return this.pageSize - Page.PAGE_HEADER_SIZE;
679    }
680
681    /**
682     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
683     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
684     * can no longer be changed.
685     *
686     * @param pageSize the pageSize to set
687     * @throws IllegalStateException once the page file is loaded.
688     */
689    public void setPageSize(int pageSize) throws IllegalStateException {
690        assertNotLoaded();
691        this.pageSize = pageSize;
692    }
693
694    /**
695     * @return true if read page caching is enabled
696     */
697    public boolean isEnablePageCaching() {
698        return this.enablePageCaching;
699    }
700
701    /**
702     * @param enablePageCaching allows you to enable read page caching
703     */
704    public void setEnablePageCaching(boolean enablePageCaching) {
705        assertNotLoaded();
706        this.enablePageCaching = enablePageCaching;
707    }
708
709    /**
710     * @return the maximum number of pages that will get stored in the read page cache.
711     */
712    public int getPageCacheSize() {
713        return this.pageCacheSize;
714    }
715
716    /**
717     * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
718     */
719    public void setPageCacheSize(int pageCacheSize) {
720        assertNotLoaded();
721        this.pageCacheSize = pageCacheSize;
722    }
723
724    public boolean isEnabledWriteThread() {
725        return enabledWriteThread;
726    }
727
728    public void setEnableWriteThread(boolean enableAsyncWrites) {
729        assertNotLoaded();
730        this.enabledWriteThread = enableAsyncWrites;
731    }
732
733    public long getDiskSize() throws IOException {
734        return toOffset(nextFreePageId.get());
735    }
736
737    /**
738     * @return the number of pages allocated in the PageFile
739     */
740    public long getPageCount() {
741        return nextFreePageId.get();
742    }
743
744    public int getRecoveryFileMinPageCount() {
745        return recoveryFileMinPageCount;
746    }
747
748    public long getFreePageCount() {
749        assertLoaded();
750        return freeList.rangeSize();
751    }
752
753    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
754        assertNotLoaded();
755        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
756    }
757
758    public int getRecoveryFileMaxPageCount() {
759        return recoveryFileMaxPageCount;
760    }
761
762    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
763        assertNotLoaded();
764        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
765    }
766
767    public int getWriteBatchSize() {
768        return writeBatchSize;
769    }
770
771    public void setWriteBatchSize(int writeBatchSize) {
772        this.writeBatchSize = writeBatchSize;
773    }
774
775    public float getLFUEvictionFactor() {
776        return LFUEvictionFactor;
777    }
778
779    public void setLFUEvictionFactor(float LFUEvictionFactor) {
780        this.LFUEvictionFactor = LFUEvictionFactor;
781    }
782
783    public boolean isUseLFRUEviction() {
784        return useLFRUEviction;
785    }
786
787    public void setUseLFRUEviction(boolean useLFRUEviction) {
788        this.useLFRUEviction = useLFRUEviction;
789    }
790
791    ///////////////////////////////////////////////////////////////////
792    // Package Protected Methods exposed to Transaction
793    ///////////////////////////////////////////////////////////////////
794
795    /**
796     * @throws IllegalStateException if the page file is not loaded.
797     */
798    void assertLoaded() throws IllegalStateException {
799        if (!loaded.get()) {
800            throw new IllegalStateException("PageFile is not loaded");
801        }
802    }
803
804    void assertNotLoaded() throws IllegalStateException {
805        if (loaded.get()) {
806            throw new IllegalStateException("PageFile is loaded");
807        }
808    }
809
810    /**
811     * Allocates a block of free pages that you can write data to.
812     *
813     * @param count the number of sequential pages to allocate
814     * @return the first page of the sequential set.
     * @throws IOException           If a disk error occurred.
816     * @throws IllegalStateException if the PageFile is not loaded
817     */
818    <T> Page<T> allocate(int count) throws IOException {
819        assertLoaded();
820        if (count <= 0) {
821            throw new IllegalArgumentException("The allocation count must be larger than zero");
822        }
823
824        Sequence seq = freeList.removeFirstSequence(count);
825
826        // We may need to create new free pages...
827        if (seq == null) {
828
829            Page<T> first = null;
830            int c = count;
831
832            // Perform the id's only once....
833            long pageId = nextFreePageId.getAndAdd(count);
834            long writeTxnId = nextTxid.getAndAdd(count);
835
836            while (c-- > 0) {
837                Page<T> page = new Page<T>(pageId++);
838                page.makeFree(writeTxnId++);
839
840                if (first == null) {
841                    first = page;
842                }
843
844                addToCache(page);
845                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
846                page.write(out);
847                write(page, out.getData());
848
849                // LOG.debug("allocate writing: "+page.getPageId());
850            }
851
852            return first;
853        }
854
855        Page<T> page = new Page<T>(seq.getFirst());
856        page.makeFree(0);
857        // LOG.debug("allocated: "+page.getPageId());
858        return page;
859    }
860
861    long getNextWriteTransactionId() {
862        return nextTxid.incrementAndGet();
863    }
864
865    void readPage(long pageId, byte[] data) throws IOException {
866        readFile.seek(toOffset(pageId));
867        readFile.readFully(data);
868    }
869
870    public void freePage(long pageId) {
871        freeList.add(pageId);
872        removeFromCache(pageId);
873    }
874
875    @SuppressWarnings("unchecked")
876    private <T> void write(Page<T> page, byte[] data) throws IOException {
877        final PageWrite write = new PageWrite(page, data);
878        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
879            public Long getKey() {
880                return write.getPage().getPageId();
881            }
882
883            public PageWrite getValue() {
884                return write;
885            }
886
887            public PageWrite setValue(PageWrite value) {
888                return null;
889            }
890        };
891        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
892        write(Arrays.asList(entries));
893    }
894
895    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
896        synchronized (writes) {
897            if (enabledWriteThread) {
898                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
899                    try {
900                        writes.wait();
901                    } catch (InterruptedException e) {
902                        Thread.currentThread().interrupt();
903                        throw new InterruptedIOException();
904                    }
905                }
906            }
907
908            boolean longTx = false;
909
910            for (Map.Entry<Long, PageWrite> entry : updates) {
911                Long key = entry.getKey();
912                PageWrite value = entry.getValue();
913                PageWrite write = writes.get(key);
914                if (write == null) {
915                    writes.put(key, value);
916                } else {
917                    if (value.currentLocation != -1) {
918                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
919                        write.tmpFile = value.tmpFile;
920                        longTx = true;
921                    } else {
922                        write.setCurrent(value.page, value.current);
923                    }
924                }
925            }
926
927            // Once we start approaching capacity, notify the writer to start writing
928            // sync immediately for long txs
929            if (longTx || canStartWriteBatch()) {
930
931                if (enabledWriteThread) {
932                    writes.notify();
933                } else {
934                    writeBatch();
935                }
936            }
937        }
938    }
939
940    private boolean canStartWriteBatch() {
941        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
942        if (enabledWriteThread) {
943            // The constant 10 here controls how soon write batches start going to disk..
944            // would be nice to figure out how to auto tune that value.  Make to small and
945            // we reduce through put because we are locking the write mutex too often doing writes
946            return capacityUsed >= 10 || checkpointLatch != null;
947        } else {
948            return capacityUsed >= 80 || checkpointLatch != null;
949        }
950    }
951
952    ///////////////////////////////////////////////////////////////////
953    // Cache Related operations
954    ///////////////////////////////////////////////////////////////////
955    @SuppressWarnings("unchecked")
956    <T> Page<T> getFromCache(long pageId) {
957        synchronized (writes) {
958            PageWrite pageWrite = writes.get(pageId);
959            if (pageWrite != null) {
960                return pageWrite.page;
961            }
962        }
963
964        Page<T> result = null;
965        if (enablePageCaching) {
966            result = pageCache.get(pageId);
967        }
968        return result;
969    }
970
971    void addToCache(Page page) {
972        if (enablePageCaching) {
973            pageCache.put(page.getPageId(), page);
974        }
975    }
976
977    void removeFromCache(long pageId) {
978        if (enablePageCaching) {
979            pageCache.remove(pageId);
980        }
981    }
982
983    ///////////////////////////////////////////////////////////////////
984    // Internal Double write implementation follows...
985    ///////////////////////////////////////////////////////////////////
986
987    private void pollWrites() {
988        try {
989            while (!stopWriter.get()) {
990                // Wait for a notification...
991                synchronized (writes) {
992                    writes.notifyAll();
993
994                    // If there is not enough to write, wait for a notification...
995                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
996                        writes.wait(100);
997                    }
998
999                    if (writes.isEmpty()) {
1000                        releaseCheckpointWaiter();
1001                    }
1002                }
1003                writeBatch();
1004            }
1005        } catch (Throwable e) {
1006            LOG.info("An exception was raised while performing poll writes", e);
1007        } finally {
1008            releaseCheckpointWaiter();
1009        }
1010    }
1011
1012    private void writeBatch() throws IOException {
1013
1014        CountDownLatch checkpointLatch;
1015        ArrayList<PageWrite> batch;
1016        synchronized (writes) {
1017            // If there is not enough to write, wait for a notification...
1018
1019            batch = new ArrayList<PageWrite>(writes.size());
1020            // build a write batch from the current write cache.
1021            for (PageWrite write : writes.values()) {
1022                batch.add(write);
1023                // Move the current write to the diskBound write, this lets folks update the
1024                // page again without blocking for this write.
1025                write.begin();
1026                if (write.diskBound == null && write.diskBoundLocation == -1) {
1027                    batch.remove(write);
1028                }
1029            }
1030
1031            // Grab on to the existing checkpoint latch cause once we do this write we can
1032            // release the folks that were waiting for those writes to hit disk.
1033            checkpointLatch = this.checkpointLatch;
1034            this.checkpointLatch = null;
1035        }
1036
1037        Checksum checksum = new Adler32();
1038        if (enableRecoveryFile) {
1039            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
1040        }
1041        for (PageWrite w : batch) {
1042            if (enableRecoveryFile) {
1043                try {
1044                    checksum.update(w.getDiskBound(), 0, pageSize);
1045                } catch (Throwable t) {
1046                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
1047                }
1048                recoveryFile.writeLong(w.page.getPageId());
1049                recoveryFile.write(w.getDiskBound(), 0, pageSize);
1050            }
1051
1052            writeFile.seek(toOffset(w.page.getPageId()));
1053            writeFile.write(w.getDiskBound(), 0, pageSize);
1054            w.done();
1055        }
1056
1057        try {
1058            if (enableRecoveryFile) {
1059                // Can we shrink the recovery buffer??
1060                if (recoveryPageCount > recoveryFileMaxPageCount) {
1061                    int t = Math.max(recoveryFileMinPageCount, batch.size());
1062                    recoveryFile.setLength(recoveryFileSizeForPages(t));
1063                }
1064
1065                // Record the page writes in the recovery buffer.
1066                recoveryFile.seek(0);
1067                // Store the next tx id...
1068                recoveryFile.writeLong(nextTxid.get());
1069                // Store the checksum for thw write batch so that on recovery we
1070                // know if we have a consistent
1071                // write batch on disk.
1072                recoveryFile.writeLong(checksum.getValue());
1073                // Write the # of pages that will follow
1074                recoveryFile.writeInt(batch.size());
1075            }
1076
1077            if (enableDiskSyncs) {
1078                // Sync to make sure recovery buffer writes land on disk..
1079                if (enableRecoveryFile) {
1080                    recoveryFile.getFD().sync();
1081                }
1082                writeFile.getFD().sync();
1083            }
1084        } finally {
1085            synchronized (writes) {
1086                for (PageWrite w : batch) {
1087                    // If there are no more pending writes, then remove it from
1088                    // the write cache.
1089                    if (w.isDone()) {
1090                        writes.remove(w.page.getPageId());
1091                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
1092                            if (!w.tmpFile.delete()) {
1093                                throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
1094                            }
1095                            tmpFilesForRemoval.remove(w.tmpFile);
1096                        }
1097                    }
1098                }
1099            }
1100
1101            if (checkpointLatch != null) {
1102                checkpointLatch.countDown();
1103            }
1104        }
1105    }
1106
1107    public void removeTmpFile(File file) {
1108        tmpFilesForRemoval.add(file);
1109    }
1110
1111    private long recoveryFileSizeForPages(int pageCount) {
1112        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
1113    }
1114
1115    private void releaseCheckpointWaiter() {
1116        if (checkpointLatch != null) {
1117            checkpointLatch.countDown();
1118            checkpointLatch = null;
1119        }
1120    }
1121
1122    /**
1123     * Inspects the recovery buffer and re-applies any
1124     * partially applied page writes.
1125     *
1126     * @return the next transaction id that can be used.
1127     */
1128    private long redoRecoveryUpdates() throws IOException {
1129        if (!enableRecoveryFile) {
1130            return 0;
1131        }
1132        recoveryPageCount = 0;
1133
1134        // Are we initializing the recovery file?
1135        if (recoveryFile.length() == 0) {
1136            // Write an empty header..
1137            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
1138            // Preallocate the minium size for better performance.
1139            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
1140            return 0;
1141        }
1142
1143        // How many recovery pages do we have in the recovery buffer?
1144        recoveryFile.seek(0);
1145        long nextTxId = recoveryFile.readLong();
1146        long expectedChecksum = recoveryFile.readLong();
1147        int pageCounter = recoveryFile.readInt();
1148
1149        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
1150        Checksum checksum = new Adler32();
1151        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
1152        try {
1153            for (int i = 0; i < pageCounter; i++) {
1154                long offset = recoveryFile.readLong();
1155                byte[] data = new byte[pageSize];
1156                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
1157                    // Invalid recovery record, Could not fully read the data". Probably due to a partial write to the recovery buffer
1158                    return nextTxId;
1159                }
1160                checksum.update(data, 0, pageSize);
1161                batch.put(offset, data);
1162            }
1163        } catch (Exception e) {
1164            // If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it.
1165            // as the pages should still be consistent.
1166            LOG.debug("Redo buffer was not fully intact: ", e);
1167            return nextTxId;
1168        }
1169
1170        recoveryPageCount = pageCounter;
1171
1172        // If the checksum is not valid then the recovery buffer was partially written to disk.
1173        if (checksum.getValue() != expectedChecksum) {
1174            return nextTxId;
1175        }
1176
1177        // Re-apply all the writes in the recovery buffer.
1178        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
1179            writeFile.seek(toOffset(e.getKey()));
1180            writeFile.write(e.getValue());
1181        }
1182
1183        // And sync it to disk
1184        writeFile.getFD().sync();
1185        return nextTxId;
1186    }
1187
1188    private void startWriter() {
1189        synchronized (writes) {
1190            if (enabledWriteThread) {
1191                stopWriter.set(false);
1192                writerThread = new Thread("KahaDB Page Writer") {
1193                    @Override
1194                    public void run() {
1195                        pollWrites();
1196                    }
1197                };
1198                writerThread.setPriority(Thread.MAX_PRIORITY);
1199                writerThread.setDaemon(true);
1200                writerThread.start();
1201            }
1202        }
1203    }
1204
1205    private void stopWriter() throws InterruptedException {
1206        if (enabledWriteThread) {
1207            stopWriter.set(true);
1208            writerThread.join();
1209        }
1210    }
1211
1212    public File getFile() {
1213        return getMainPageFile();
1214    }
1215
1216    public File getDirectory() {
1217        return directory;
1218    }
1219}