1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 package org.rocksdb;
7 
8 import org.junit.Test;
9 
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.List;
13 
14 import static java.nio.charset.StandardCharsets.UTF_8;
15 import static org.assertj.core.api.Assertions.assertThat;
16 import static org.assertj.core.api.Assertions.fail;
17 
18 public class OptimisticTransactionTest extends AbstractTransactionTest {
19 
20   @Test
21   public void getForUpdate_cf_conflict() throws RocksDBException {
22     final byte k1[] = "key1".getBytes(UTF_8);
23     final byte v1[] = "value1".getBytes(UTF_8);
24     final byte v12[] = "value12".getBytes(UTF_8);
25     try(final DBContainer dbContainer = startDb();
26         final ReadOptions readOptions = new ReadOptions()) {
27       final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
28 
29       try(final Transaction txn = dbContainer.beginTransaction()) {
30         txn.put(testCf, k1, v1);
31         assertThat(txn.get(testCf, readOptions, k1)).isEqualTo(v1);
32         txn.commit();
33       }
34 
35       try(final Transaction txn2 = dbContainer.beginTransaction()) {
36         try(final Transaction txn3 = dbContainer.beginTransaction()) {
37           assertThat(txn3.getForUpdate(readOptions, testCf, k1, true)).isEqualTo(v1);
38 
39           // NOTE: txn2 updates k1, during txn3
40           txn2.put(testCf, k1, v12);
41           assertThat(txn2.get(testCf, readOptions, k1)).isEqualTo(v12);
42           txn2.commit();
43 
44           try {
45             txn3.commit(); // should cause an exception!
46           } catch(final RocksDBException e) {
47             assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy);
48             return;
49           }
50         }
51       }
52 
53       fail("Expected an exception for put after getForUpdate from conflicting" +
54           "transactions");
55     }
56   }
57 
58   @Test
59   public void getForUpdate_conflict() throws RocksDBException {
60     final byte k1[] = "key1".getBytes(UTF_8);
61     final byte v1[] = "value1".getBytes(UTF_8);
62     final byte v12[] = "value12".getBytes(UTF_8);
63     try(final DBContainer dbContainer = startDb();
64         final ReadOptions readOptions = new ReadOptions()) {
65 
66       try(final Transaction txn = dbContainer.beginTransaction()) {
67         txn.put(k1, v1);
68         assertThat(txn.get(readOptions, k1)).isEqualTo(v1);
69         txn.commit();
70       }
71 
72       try(final Transaction txn2 = dbContainer.beginTransaction()) {
73         try(final Transaction txn3 = dbContainer.beginTransaction()) {
74           assertThat(txn3.getForUpdate(readOptions, k1, true)).isEqualTo(v1);
75 
76           // NOTE: txn2 updates k1, during txn3
77           txn2.put(k1, v12);
78           assertThat(txn2.get(readOptions, k1)).isEqualTo(v12);
79           txn2.commit();
80 
81           try {
82             txn3.commit(); // should cause an exception!
83           } catch(final RocksDBException e) {
84             assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy);
85             return;
86           }
87         }
88       }
89 
90       fail("Expected an exception for put after getForUpdate from conflicting" +
91           "transactions");
92     }
93   }
94 
95   @Test
96   public void multiGetForUpdate_cf_conflict() throws RocksDBException {
97     final byte keys[][] = new byte[][] {
98         "key1".getBytes(UTF_8),
99         "key2".getBytes(UTF_8)};
100     final byte values[][] = new byte[][] {
101         "value1".getBytes(UTF_8),
102         "value2".getBytes(UTF_8)};
103     final byte[] otherValue = "otherValue".getBytes(UTF_8);
104 
105     try(final DBContainer dbContainer = startDb();
106         final ReadOptions readOptions = new ReadOptions()) {
107       final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
108       final List<ColumnFamilyHandle> cfList = Arrays.asList(testCf, testCf);
109 
110       try(final Transaction txn = dbContainer.beginTransaction()) {
111         txn.put(testCf, keys[0], values[0]);
112         txn.put(testCf, keys[1], values[1]);
113         assertThat(txn.multiGet(readOptions, cfList, keys)).isEqualTo(values);
114         txn.commit();
115       }
116 
117       try(final Transaction txn2 = dbContainer.beginTransaction()) {
118         try(final Transaction txn3 = dbContainer.beginTransaction()) {
119           assertThat(txn3.multiGetForUpdate(readOptions, cfList, keys))
120               .isEqualTo(values);
121 
122           // NOTE: txn2 updates k1, during txn3
123           txn2.put(testCf, keys[0], otherValue);
124           assertThat(txn2.get(testCf, readOptions, keys[0]))
125               .isEqualTo(otherValue);
126           txn2.commit();
127 
128           try {
129             txn3.commit(); // should cause an exception!
130           } catch(final RocksDBException e) {
131             assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy);
132             return;
133           }
134         }
135       }
136 
137       fail("Expected an exception for put after getForUpdate from conflicting" +
138           "transactions");
139     }
140   }
141 
142   @Test
143   public void multiGetForUpdate_conflict() throws RocksDBException {
144     final byte keys[][] = new byte[][] {
145         "key1".getBytes(UTF_8),
146         "key2".getBytes(UTF_8)};
147     final byte values[][] = new byte[][] {
148         "value1".getBytes(UTF_8),
149         "value2".getBytes(UTF_8)};
150     final byte[] otherValue = "otherValue".getBytes(UTF_8);
151 
152     try(final DBContainer dbContainer = startDb();
153         final ReadOptions readOptions = new ReadOptions()) {
154       try(final Transaction txn = dbContainer.beginTransaction()) {
155         txn.put(keys[0], values[0]);
156         txn.put(keys[1], values[1]);
157         assertThat(txn.multiGet(readOptions, keys)).isEqualTo(values);
158         txn.commit();
159       }
160 
161       try(final Transaction txn2 = dbContainer.beginTransaction()) {
162         try(final Transaction txn3 = dbContainer.beginTransaction()) {
163           assertThat(txn3.multiGetForUpdate(readOptions, keys))
164               .isEqualTo(values);
165 
166           // NOTE: txn2 updates k1, during txn3
167           txn2.put(keys[0], otherValue);
168           assertThat(txn2.get(readOptions, keys[0]))
169               .isEqualTo(otherValue);
170           txn2.commit();
171 
172           try {
173             txn3.commit(); // should cause an exception!
174           } catch(final RocksDBException e) {
175             assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy);
176             return;
177           }
178         }
179       }
180 
181       fail("Expected an exception for put after getForUpdate from conflicting" +
182           "transactions");
183     }
184   }
185 
186   @Test
187   public void undoGetForUpdate_cf_conflict() throws RocksDBException {
188     final byte k1[] = "key1".getBytes(UTF_8);
189     final byte v1[] = "value1".getBytes(UTF_8);
190     final byte v12[] = "value12".getBytes(UTF_8);
191     try(final DBContainer dbContainer = startDb();
192         final ReadOptions readOptions = new ReadOptions()) {
193       final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
194 
195       try(final Transaction txn = dbContainer.beginTransaction()) {
196         txn.put(testCf, k1, v1);
197         assertThat(txn.get(testCf, readOptions, k1)).isEqualTo(v1);
198         txn.commit();
199       }
200 
201       try(final Transaction txn2 = dbContainer.beginTransaction()) {
202         try(final Transaction txn3 = dbContainer.beginTransaction()) {
203           assertThat(txn3.getForUpdate(readOptions, testCf, k1, true)).isEqualTo(v1);
204 
205           // undo the getForUpdate
206           txn3.undoGetForUpdate(testCf, k1);
207 
208           // NOTE: txn2 updates k1, during txn3
209           txn2.put(testCf, k1, v12);
210           assertThat(txn2.get(testCf, readOptions, k1)).isEqualTo(v12);
211           txn2.commit();
212 
213           // should not cause an exception
214           // because we undid the getForUpdate above!
215           txn3.commit();
216         }
217       }
218     }
219   }
220 
221   @Test
222   public void undoGetForUpdate_conflict() throws RocksDBException {
223     final byte k1[] = "key1".getBytes(UTF_8);
224     final byte v1[] = "value1".getBytes(UTF_8);
225     final byte v12[] = "value12".getBytes(UTF_8);
226     try(final DBContainer dbContainer = startDb();
227         final ReadOptions readOptions = new ReadOptions()) {
228 
229       try(final Transaction txn = dbContainer.beginTransaction()) {
230         txn.put(k1, v1);
231         assertThat(txn.get(readOptions, k1)).isEqualTo(v1);
232         txn.commit();
233       }
234 
235       try(final Transaction txn2 = dbContainer.beginTransaction()) {
236         try(final Transaction txn3 = dbContainer.beginTransaction()) {
237           assertThat(txn3.getForUpdate(readOptions, k1, true)).isEqualTo(v1);
238 
239           // undo the getForUpdate
240           txn3.undoGetForUpdate(k1);
241 
242           // NOTE: txn2 updates k1, during txn3
243           txn2.put(k1, v12);
244           assertThat(txn2.get(readOptions, k1)).isEqualTo(v12);
245           txn2.commit();
246 
247           // should not cause an exception
248           // because we undid the getForUpdate above!
249           txn3.commit();
250         }
251       }
252     }
253   }
254 
255   @Test
256   public void name() throws RocksDBException {
257     try(final DBContainer dbContainer = startDb();
258         final Transaction txn = dbContainer.beginTransaction()) {
259       assertThat(txn.getName()).isEmpty();
260       final String name = "my-transaction-" + rand.nextLong();
261 
262       try {
263         txn.setName(name);
264       } catch(final RocksDBException e) {
265          assertThat(e.getStatus().getCode() == Status.Code.InvalidArgument);
266         return;
267       }
268 
269       fail("Optimistic transactions cannot be named.");
270     }
271   }
272 
273   @Override
274   public OptimisticTransactionDBContainer startDb()
275       throws RocksDBException {
276     final DBOptions options = new DBOptions()
277         .setCreateIfMissing(true)
278         .setCreateMissingColumnFamilies(true);
279 
280     final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
281     final List<ColumnFamilyDescriptor> columnFamilyDescriptors =
282         Arrays.asList(
283             new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
284             new ColumnFamilyDescriptor(TXN_TEST_COLUMN_FAMILY,
285                 columnFamilyOptions));
286     final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
287 
288     final OptimisticTransactionDB optimisticTxnDb;
289     try {
290       optimisticTxnDb = OptimisticTransactionDB.open(
291           options, dbFolder.getRoot().getAbsolutePath(),
292           columnFamilyDescriptors, columnFamilyHandles);
293     } catch(final RocksDBException e) {
294       columnFamilyOptions.close();
295       options.close();
296       throw e;
297     }
298 
299     final WriteOptions writeOptions = new WriteOptions();
300     final OptimisticTransactionOptions optimisticTxnOptions =
301              new OptimisticTransactionOptions();
302 
303     return new OptimisticTransactionDBContainer(optimisticTxnOptions,
304         writeOptions, columnFamilyHandles, optimisticTxnDb, columnFamilyOptions,
305         options);
306   }
307 
308   private static class OptimisticTransactionDBContainer
309       extends DBContainer {
310 
311     private final OptimisticTransactionOptions optimisticTxnOptions;
312     private final OptimisticTransactionDB optimisticTxnDb;
313 
314     public OptimisticTransactionDBContainer(
315         final OptimisticTransactionOptions optimisticTxnOptions,
316         final WriteOptions writeOptions,
317         final List<ColumnFamilyHandle> columnFamilyHandles,
318         final OptimisticTransactionDB optimisticTxnDb,
319         final ColumnFamilyOptions columnFamilyOptions,
320         final DBOptions options) {
321       super(writeOptions, columnFamilyHandles, columnFamilyOptions,
322           options);
323       this.optimisticTxnOptions = optimisticTxnOptions;
324       this.optimisticTxnDb = optimisticTxnDb;
325     }
326 
327     @Override
328     public Transaction beginTransaction() {
329       return optimisticTxnDb.beginTransaction(writeOptions,
330           optimisticTxnOptions);
331     }
332 
333     @Override
334     public Transaction beginTransaction(final WriteOptions writeOptions) {
335       return optimisticTxnDb.beginTransaction(writeOptions,
336           optimisticTxnOptions);
337     }
338 
339     @Override
340     public void close() {
341       optimisticTxnOptions.close();
342       writeOptions.close();
343       for(final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
344         columnFamilyHandle.close();
345       }
346       optimisticTxnDb.close();
347       options.close();
348     }
349   }
350 }
351