1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 package org.rocksdb; 7 8 import org.junit.ClassRule; 9 import org.junit.Rule; 10 import org.junit.Test; 11 import org.junit.rules.TemporaryFolder; 12 13 import java.util.ArrayList; 14 import java.util.Arrays; 15 import java.util.List; 16 import java.util.Map; 17 18 import static org.assertj.core.api.Assertions.assertThat; 19 import static org.rocksdb.util.ByteUtil.bytes; 20 import static org.rocksdb.util.TestUtil.*; 21 22 public class WalFilterTest { 23 24 @ClassRule 25 public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE = 26 new RocksNativeLibraryResource(); 27 28 @Rule 29 public TemporaryFolder dbFolder = new TemporaryFolder(); 30 31 @Test walFilter()32 public void walFilter() throws RocksDBException { 33 // Create 3 batches with two keys each 34 final byte[][][] batchKeys = { 35 new byte[][] { 36 bytes("key1"), 37 bytes("key2") 38 }, 39 new byte[][] { 40 bytes("key3"), 41 bytes("key4") 42 }, 43 new byte[][] { 44 bytes("key5"), 45 bytes("key6") 46 } 47 48 }; 49 50 final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList( 51 new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY), 52 new ColumnFamilyDescriptor(bytes("pikachu")) 53 ); 54 final List<ColumnFamilyHandle> cfHandles = new ArrayList<>(); 55 56 // Test with all WAL processing options 57 for (final WalProcessingOption option : WalProcessingOption.values()) { 58 try (final Options options = optionsForLogIterTest(); 59 final DBOptions dbOptions = new DBOptions(options) 60 .setCreateMissingColumnFamilies(true); 61 final RocksDB db = RocksDB.open(dbOptions, 62 dbFolder.getRoot().getAbsolutePath(), 63 cfDescriptors, cfHandles)) { 64 try (final WriteOptions writeOptions = new WriteOptions()) { 65 // Write given keys in given batches 66 for (int i = 0; i < batchKeys.length; i++) { 67 final WriteBatch batch = new WriteBatch(); 68 for (int j = 0; j < batchKeys[i].length; j++) { 69 batch.put(cfHandles.get(0), batchKeys[i][j], dummyString(1024)); 70 } 71 db.write(writeOptions, batch); 72 } 73 } finally { 74 for (final ColumnFamilyHandle cfHandle : cfHandles) { 75 cfHandle.close(); 76 } 77 cfHandles.clear(); 78 } 79 } 80 81 // Create a test filter that would apply wal_processing_option at the first 82 // record 83 final int applyOptionForRecordIndex = 1; 84 try (final TestableWalFilter walFilter = 85 new TestableWalFilter(option, applyOptionForRecordIndex)) { 86 87 try (final Options options = optionsForLogIterTest(); 88 final DBOptions dbOptions = new DBOptions(options) 89 .setWalFilter(walFilter)) { 90 91 try (final RocksDB db = RocksDB.open(dbOptions, 92 dbFolder.getRoot().getAbsolutePath(), 93 cfDescriptors, cfHandles)) { 94 95 try { 96 assertThat(walFilter.logNumbers).isNotEmpty(); 97 assertThat(walFilter.logFileNames).isNotEmpty(); 98 } finally { 99 for (final ColumnFamilyHandle cfHandle : cfHandles) { 100 cfHandle.close(); 101 } 102 cfHandles.clear(); 103 } 104 } catch (final RocksDBException e) { 105 if (option != WalProcessingOption.CORRUPTED_RECORD) { 106 // exception is expected when CORRUPTED_RECORD! 107 throw e; 108 } 109 } 110 } 111 } 112 } 113 } 114 115 116 private static class TestableWalFilter extends AbstractWalFilter { 117 private final WalProcessingOption walProcessingOption; 118 private final int applyOptionForRecordIndex; 119 Map<Integer, Long> cfLognumber; 120 Map<String, Integer> cfNameId; 121 final List<Long> logNumbers = new ArrayList<>(); 122 final List<String> logFileNames = new ArrayList<>(); 123 private int currentRecordIndex = 0; 124 TestableWalFilter(final WalProcessingOption walProcessingOption, final int applyOptionForRecordIndex)125 public TestableWalFilter(final WalProcessingOption walProcessingOption, 126 final int applyOptionForRecordIndex) { 127 super(); 128 this.walProcessingOption = walProcessingOption; 129 this.applyOptionForRecordIndex = applyOptionForRecordIndex; 130 } 131 132 @Override columnFamilyLogNumberMap(final Map<Integer, Long> cfLognumber, final Map<String, Integer> cfNameId)133 public void columnFamilyLogNumberMap(final Map<Integer, Long> cfLognumber, 134 final Map<String, Integer> cfNameId) { 135 this.cfLognumber = cfLognumber; 136 this.cfNameId = cfNameId; 137 } 138 139 @Override logRecordFound( final long logNumber, final String logFileName, final WriteBatch batch, final WriteBatch newBatch)140 public LogRecordFoundResult logRecordFound( 141 final long logNumber, final String logFileName, final WriteBatch batch, 142 final WriteBatch newBatch) { 143 144 logNumbers.add(logNumber); 145 logFileNames.add(logFileName); 146 147 final WalProcessingOption optionToReturn; 148 if (currentRecordIndex == applyOptionForRecordIndex) { 149 optionToReturn = walProcessingOption; 150 } 151 else { 152 optionToReturn = WalProcessingOption.CONTINUE_PROCESSING; 153 } 154 155 currentRecordIndex++; 156 157 return new LogRecordFoundResult(optionToReturn, false); 158 } 159 160 @Override name()161 public String name() { 162 return "testable-wal-filter"; 163 } 164 } 165 } 166