1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 package org.rocksdb; 7 8 import org.junit.Test; 9 10 import java.util.ArrayList; 11 import java.util.Arrays; 12 import java.util.List; 13 14 import static java.nio.charset.StandardCharsets.UTF_8; 15 import static org.assertj.core.api.Assertions.assertThat; 16 import static org.assertj.core.api.Assertions.fail; 17 18 public class OptimisticTransactionTest extends AbstractTransactionTest { 19 20 @Test 21 public void getForUpdate_cf_conflict() throws RocksDBException { 22 final byte k1[] = "key1".getBytes(UTF_8); 23 final byte v1[] = "value1".getBytes(UTF_8); 24 final byte v12[] = "value12".getBytes(UTF_8); 25 try(final DBContainer dbContainer = startDb(); 26 final ReadOptions readOptions = new ReadOptions()) { 27 final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); 28 29 try(final Transaction txn = dbContainer.beginTransaction()) { 30 txn.put(testCf, k1, v1); 31 assertThat(txn.get(testCf, readOptions, k1)).isEqualTo(v1); 32 txn.commit(); 33 } 34 35 try(final Transaction txn2 = dbContainer.beginTransaction()) { 36 try(final Transaction txn3 = dbContainer.beginTransaction()) { 37 assertThat(txn3.getForUpdate(readOptions, testCf, k1, true)).isEqualTo(v1); 38 39 // NOTE: txn2 updates k1, during txn3 40 txn2.put(testCf, k1, v12); 41 assertThat(txn2.get(testCf, readOptions, k1)).isEqualTo(v12); 42 txn2.commit(); 43 44 try { 45 txn3.commit(); // should cause an exception! 46 } catch(final RocksDBException e) { 47 assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy); 48 return; 49 } 50 } 51 } 52 53 fail("Expected an exception for put after getForUpdate from conflicting" + 54 "transactions"); 55 } 56 } 57 58 @Test 59 public void getForUpdate_conflict() throws RocksDBException { 60 final byte k1[] = "key1".getBytes(UTF_8); 61 final byte v1[] = "value1".getBytes(UTF_8); 62 final byte v12[] = "value12".getBytes(UTF_8); 63 try(final DBContainer dbContainer = startDb(); 64 final ReadOptions readOptions = new ReadOptions()) { 65 66 try(final Transaction txn = dbContainer.beginTransaction()) { 67 txn.put(k1, v1); 68 assertThat(txn.get(readOptions, k1)).isEqualTo(v1); 69 txn.commit(); 70 } 71 72 try(final Transaction txn2 = dbContainer.beginTransaction()) { 73 try(final Transaction txn3 = dbContainer.beginTransaction()) { 74 assertThat(txn3.getForUpdate(readOptions, k1, true)).isEqualTo(v1); 75 76 // NOTE: txn2 updates k1, during txn3 77 txn2.put(k1, v12); 78 assertThat(txn2.get(readOptions, k1)).isEqualTo(v12); 79 txn2.commit(); 80 81 try { 82 txn3.commit(); // should cause an exception! 83 } catch(final RocksDBException e) { 84 assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy); 85 return; 86 } 87 } 88 } 89 90 fail("Expected an exception for put after getForUpdate from conflicting" + 91 "transactions"); 92 } 93 } 94 95 @Test 96 public void multiGetForUpdate_cf_conflict() throws RocksDBException { 97 final byte keys[][] = new byte[][] { 98 "key1".getBytes(UTF_8), 99 "key2".getBytes(UTF_8)}; 100 final byte values[][] = new byte[][] { 101 "value1".getBytes(UTF_8), 102 "value2".getBytes(UTF_8)}; 103 final byte[] otherValue = "otherValue".getBytes(UTF_8); 104 105 try(final DBContainer dbContainer = startDb(); 106 final ReadOptions readOptions = new ReadOptions()) { 107 final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); 108 final List<ColumnFamilyHandle> cfList = Arrays.asList(testCf, testCf); 109 110 try(final Transaction txn = dbContainer.beginTransaction()) { 111 txn.put(testCf, keys[0], values[0]); 112 txn.put(testCf, keys[1], values[1]); 113 assertThat(txn.multiGet(readOptions, cfList, keys)).isEqualTo(values); 114 txn.commit(); 115 } 116 117 try(final Transaction txn2 = dbContainer.beginTransaction()) { 118 try(final Transaction txn3 = dbContainer.beginTransaction()) { 119 assertThat(txn3.multiGetForUpdate(readOptions, cfList, keys)) 120 .isEqualTo(values); 121 122 // NOTE: txn2 updates k1, during txn3 123 txn2.put(testCf, keys[0], otherValue); 124 assertThat(txn2.get(testCf, readOptions, keys[0])) 125 .isEqualTo(otherValue); 126 txn2.commit(); 127 128 try { 129 txn3.commit(); // should cause an exception! 130 } catch(final RocksDBException e) { 131 assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy); 132 return; 133 } 134 } 135 } 136 137 fail("Expected an exception for put after getForUpdate from conflicting" + 138 "transactions"); 139 } 140 } 141 142 @Test 143 public void multiGetForUpdate_conflict() throws RocksDBException { 144 final byte keys[][] = new byte[][] { 145 "key1".getBytes(UTF_8), 146 "key2".getBytes(UTF_8)}; 147 final byte values[][] = new byte[][] { 148 "value1".getBytes(UTF_8), 149 "value2".getBytes(UTF_8)}; 150 final byte[] otherValue = "otherValue".getBytes(UTF_8); 151 152 try(final DBContainer dbContainer = startDb(); 153 final ReadOptions readOptions = new ReadOptions()) { 154 try(final Transaction txn = dbContainer.beginTransaction()) { 155 txn.put(keys[0], values[0]); 156 txn.put(keys[1], values[1]); 157 assertThat(txn.multiGet(readOptions, keys)).isEqualTo(values); 158 txn.commit(); 159 } 160 161 try(final Transaction txn2 = dbContainer.beginTransaction()) { 162 try(final Transaction txn3 = dbContainer.beginTransaction()) { 163 assertThat(txn3.multiGetForUpdate(readOptions, keys)) 164 .isEqualTo(values); 165 166 // NOTE: txn2 updates k1, during txn3 167 txn2.put(keys[0], otherValue); 168 assertThat(txn2.get(readOptions, keys[0])) 169 .isEqualTo(otherValue); 170 txn2.commit(); 171 172 try { 173 txn3.commit(); // should cause an exception! 174 } catch(final RocksDBException e) { 175 assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy); 176 return; 177 } 178 } 179 } 180 181 fail("Expected an exception for put after getForUpdate from conflicting" + 182 "transactions"); 183 } 184 } 185 186 @Test 187 public void undoGetForUpdate_cf_conflict() throws RocksDBException { 188 final byte k1[] = "key1".getBytes(UTF_8); 189 final byte v1[] = "value1".getBytes(UTF_8); 190 final byte v12[] = "value12".getBytes(UTF_8); 191 try(final DBContainer dbContainer = startDb(); 192 final ReadOptions readOptions = new ReadOptions()) { 193 final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); 194 195 try(final Transaction txn = dbContainer.beginTransaction()) { 196 txn.put(testCf, k1, v1); 197 assertThat(txn.get(testCf, readOptions, k1)).isEqualTo(v1); 198 txn.commit(); 199 } 200 201 try(final Transaction txn2 = dbContainer.beginTransaction()) { 202 try(final Transaction txn3 = dbContainer.beginTransaction()) { 203 assertThat(txn3.getForUpdate(readOptions, testCf, k1, true)).isEqualTo(v1); 204 205 // undo the getForUpdate 206 txn3.undoGetForUpdate(testCf, k1); 207 208 // NOTE: txn2 updates k1, during txn3 209 txn2.put(testCf, k1, v12); 210 assertThat(txn2.get(testCf, readOptions, k1)).isEqualTo(v12); 211 txn2.commit(); 212 213 // should not cause an exception 214 // because we undid the getForUpdate above! 215 txn3.commit(); 216 } 217 } 218 } 219 } 220 221 @Test 222 public void undoGetForUpdate_conflict() throws RocksDBException { 223 final byte k1[] = "key1".getBytes(UTF_8); 224 final byte v1[] = "value1".getBytes(UTF_8); 225 final byte v12[] = "value12".getBytes(UTF_8); 226 try(final DBContainer dbContainer = startDb(); 227 final ReadOptions readOptions = new ReadOptions()) { 228 229 try(final Transaction txn = dbContainer.beginTransaction()) { 230 txn.put(k1, v1); 231 assertThat(txn.get(readOptions, k1)).isEqualTo(v1); 232 txn.commit(); 233 } 234 235 try(final Transaction txn2 = dbContainer.beginTransaction()) { 236 try(final Transaction txn3 = dbContainer.beginTransaction()) { 237 assertThat(txn3.getForUpdate(readOptions, k1, true)).isEqualTo(v1); 238 239 // undo the getForUpdate 240 txn3.undoGetForUpdate(k1); 241 242 // NOTE: txn2 updates k1, during txn3 243 txn2.put(k1, v12); 244 assertThat(txn2.get(readOptions, k1)).isEqualTo(v12); 245 txn2.commit(); 246 247 // should not cause an exception 248 // because we undid the getForUpdate above! 249 txn3.commit(); 250 } 251 } 252 } 253 } 254 255 @Test 256 public void name() throws RocksDBException { 257 try(final DBContainer dbContainer = startDb(); 258 final Transaction txn = dbContainer.beginTransaction()) { 259 assertThat(txn.getName()).isEmpty(); 260 final String name = "my-transaction-" + rand.nextLong(); 261 262 try { 263 txn.setName(name); 264 } catch(final RocksDBException e) { 265 assertThat(e.getStatus().getCode() == Status.Code.InvalidArgument); 266 return; 267 } 268 269 fail("Optimistic transactions cannot be named."); 270 } 271 } 272 273 @Override 274 public OptimisticTransactionDBContainer startDb() 275 throws RocksDBException { 276 final DBOptions options = new DBOptions() 277 .setCreateIfMissing(true) 278 .setCreateMissingColumnFamilies(true); 279 280 final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions(); 281 final List<ColumnFamilyDescriptor> columnFamilyDescriptors = 282 Arrays.asList( 283 new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY), 284 new ColumnFamilyDescriptor(TXN_TEST_COLUMN_FAMILY, 285 columnFamilyOptions)); 286 final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); 287 288 final OptimisticTransactionDB optimisticTxnDb; 289 try { 290 optimisticTxnDb = OptimisticTransactionDB.open( 291 options, dbFolder.getRoot().getAbsolutePath(), 292 columnFamilyDescriptors, columnFamilyHandles); 293 } catch(final RocksDBException e) { 294 columnFamilyOptions.close(); 295 options.close(); 296 throw e; 297 } 298 299 final WriteOptions writeOptions = new WriteOptions(); 300 final OptimisticTransactionOptions optimisticTxnOptions = 301 new OptimisticTransactionOptions(); 302 303 return new OptimisticTransactionDBContainer(optimisticTxnOptions, 304 writeOptions, columnFamilyHandles, optimisticTxnDb, columnFamilyOptions, 305 options); 306 } 307 308 private static class OptimisticTransactionDBContainer 309 extends DBContainer { 310 311 private final OptimisticTransactionOptions optimisticTxnOptions; 312 private final OptimisticTransactionDB optimisticTxnDb; 313 314 public OptimisticTransactionDBContainer( 315 final OptimisticTransactionOptions optimisticTxnOptions, 316 final WriteOptions writeOptions, 317 final List<ColumnFamilyHandle> columnFamilyHandles, 318 final OptimisticTransactionDB optimisticTxnDb, 319 final ColumnFamilyOptions columnFamilyOptions, 320 final DBOptions options) { 321 super(writeOptions, columnFamilyHandles, columnFamilyOptions, 322 options); 323 this.optimisticTxnOptions = optimisticTxnOptions; 324 this.optimisticTxnDb = optimisticTxnDb; 325 } 326 327 @Override 328 public Transaction beginTransaction() { 329 return optimisticTxnDb.beginTransaction(writeOptions, 330 optimisticTxnOptions); 331 } 332 333 @Override 334 public Transaction beginTransaction(final WriteOptions writeOptions) { 335 return optimisticTxnDb.beginTransaction(writeOptions, 336 optimisticTxnOptions); 337 } 338 339 @Override 340 public void close() { 341 optimisticTxnOptions.close(); 342 writeOptions.close(); 343 for(final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { 344 columnFamilyHandle.close(); 345 } 346 optimisticTxnDb.close(); 347 options.close(); 348 } 349 } 350 } 351