1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 package org.rocksdb;
7 
8 import org.junit.ClassRule;
9 import org.junit.Rule;
10 import org.junit.Test;
11 import org.junit.rules.TemporaryFolder;
12 
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.List;
16 import java.util.Map;
17 
18 import static org.assertj.core.api.Assertions.assertThat;
19 import static org.rocksdb.util.ByteUtil.bytes;
20 import static org.rocksdb.util.TestUtil.*;
21 
22 public class WalFilterTest {
23 
24   @ClassRule
25   public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE =
26       new RocksNativeLibraryResource();
27 
28   @Rule
29   public TemporaryFolder dbFolder = new TemporaryFolder();
30 
31   @Test
walFilter()32   public void walFilter() throws RocksDBException {
33     // Create 3 batches with two keys each
34     final byte[][][] batchKeys = {
35         new byte[][] {
36             bytes("key1"),
37             bytes("key2")
38         },
39         new byte[][] {
40             bytes("key3"),
41             bytes("key4")
42         },
43         new byte[][] {
44             bytes("key5"),
45             bytes("key6")
46         }
47 
48     };
49 
50     final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList(
51         new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
52         new ColumnFamilyDescriptor(bytes("pikachu"))
53     );
54     final List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
55 
56     // Test with all WAL processing options
57     for (final WalProcessingOption option : WalProcessingOption.values()) {
58       try (final Options options = optionsForLogIterTest();
59            final DBOptions dbOptions = new DBOptions(options)
60                .setCreateMissingColumnFamilies(true);
61            final RocksDB db = RocksDB.open(dbOptions,
62                dbFolder.getRoot().getAbsolutePath(),
63                 cfDescriptors, cfHandles)) {
64         try (final WriteOptions writeOptions = new WriteOptions()) {
65           // Write given keys in given batches
66           for (int i = 0; i < batchKeys.length; i++) {
67             final WriteBatch batch = new WriteBatch();
68             for (int j = 0; j < batchKeys[i].length; j++) {
69               batch.put(cfHandles.get(0), batchKeys[i][j], dummyString(1024));
70             }
71             db.write(writeOptions, batch);
72           }
73         } finally {
74           for (final ColumnFamilyHandle cfHandle : cfHandles) {
75             cfHandle.close();
76           }
77           cfHandles.clear();
78         }
79       }
80 
81       // Create a test filter that would apply wal_processing_option at the first
82       // record
83       final int applyOptionForRecordIndex = 1;
84       try (final TestableWalFilter walFilter =
85                new TestableWalFilter(option, applyOptionForRecordIndex)) {
86 
87         try (final Options options = optionsForLogIterTest();
88              final DBOptions dbOptions = new DBOptions(options)
89                 .setWalFilter(walFilter)) {
90 
91           try (final RocksDB db = RocksDB.open(dbOptions,
92               dbFolder.getRoot().getAbsolutePath(),
93               cfDescriptors, cfHandles)) {
94 
95             try {
96               assertThat(walFilter.logNumbers).isNotEmpty();
97               assertThat(walFilter.logFileNames).isNotEmpty();
98             } finally {
99               for (final ColumnFamilyHandle cfHandle : cfHandles) {
100                 cfHandle.close();
101               }
102               cfHandles.clear();
103             }
104           } catch (final RocksDBException e) {
105             if (option != WalProcessingOption.CORRUPTED_RECORD) {
106               // exception is expected when CORRUPTED_RECORD!
107               throw e;
108             }
109           }
110         }
111       }
112     }
113   }
114 
115 
116   private static class TestableWalFilter extends AbstractWalFilter {
117     private final WalProcessingOption walProcessingOption;
118     private final int applyOptionForRecordIndex;
119     Map<Integer, Long> cfLognumber;
120     Map<String, Integer> cfNameId;
121     final List<Long> logNumbers = new ArrayList<>();
122     final List<String> logFileNames = new ArrayList<>();
123     private int currentRecordIndex = 0;
124 
TestableWalFilter(final WalProcessingOption walProcessingOption, final int applyOptionForRecordIndex)125     public TestableWalFilter(final WalProcessingOption walProcessingOption,
126         final int applyOptionForRecordIndex) {
127       super();
128       this.walProcessingOption = walProcessingOption;
129       this.applyOptionForRecordIndex = applyOptionForRecordIndex;
130     }
131 
132     @Override
columnFamilyLogNumberMap(final Map<Integer, Long> cfLognumber, final Map<String, Integer> cfNameId)133     public void columnFamilyLogNumberMap(final Map<Integer, Long> cfLognumber,
134         final Map<String, Integer> cfNameId) {
135       this.cfLognumber = cfLognumber;
136       this.cfNameId = cfNameId;
137     }
138 
139     @Override
logRecordFound( final long logNumber, final String logFileName, final WriteBatch batch, final WriteBatch newBatch)140     public LogRecordFoundResult logRecordFound(
141         final long logNumber, final String logFileName, final WriteBatch batch,
142         final WriteBatch newBatch) {
143 
144       logNumbers.add(logNumber);
145       logFileNames.add(logFileName);
146 
147       final WalProcessingOption optionToReturn;
148       if (currentRecordIndex == applyOptionForRecordIndex) {
149         optionToReturn = walProcessingOption;
150       }
151       else {
152         optionToReturn = WalProcessingOption.CONTINUE_PROCESSING;
153       }
154 
155       currentRecordIndex++;
156 
157       return new LogRecordFoundResult(optionToReturn, false);
158     }
159 
160     @Override
name()161     public String name() {
162       return "testable-wal-filter";
163     }
164   }
165 }
166