1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // This file implements the callback "bridge" between Java and C++ for
7 // ROCKSDB_NAMESPACE::Comparator.
8 
9 #include "rocksjni/writebatchhandlerjnicallback.h"
10 #include "rocksjni/portal.h"
11 
12 namespace ROCKSDB_NAMESPACE {
WriteBatchHandlerJniCallback(JNIEnv * env,jobject jWriteBatchHandler)13 WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback(
14     JNIEnv* env, jobject jWriteBatchHandler)
15     : JniCallback(env, jWriteBatchHandler), m_env(env) {
16 
17   m_jPutCfMethodId = WriteBatchHandlerJni::getPutCfMethodId(env);
18   if(m_jPutCfMethodId == nullptr) {
19     // exception thrown
20     return;
21   }
22 
23   m_jPutMethodId = WriteBatchHandlerJni::getPutMethodId(env);
24   if(m_jPutMethodId == nullptr) {
25     // exception thrown
26     return;
27   }
28 
29   m_jMergeCfMethodId = WriteBatchHandlerJni::getMergeCfMethodId(env);
30   if(m_jMergeCfMethodId == nullptr) {
31     // exception thrown
32     return;
33   }
34 
35   m_jMergeMethodId = WriteBatchHandlerJni::getMergeMethodId(env);
36   if(m_jMergeMethodId == nullptr) {
37     // exception thrown
38     return;
39   }
40 
41   m_jDeleteCfMethodId = WriteBatchHandlerJni::getDeleteCfMethodId(env);
42   if(m_jDeleteCfMethodId == nullptr) {
43     // exception thrown
44     return;
45   }
46 
47   m_jDeleteMethodId = WriteBatchHandlerJni::getDeleteMethodId(env);
48   if(m_jDeleteMethodId == nullptr) {
49     // exception thrown
50     return;
51   }
52 
53   m_jSingleDeleteCfMethodId =
54       WriteBatchHandlerJni::getSingleDeleteCfMethodId(env);
55   if(m_jSingleDeleteCfMethodId == nullptr) {
56     // exception thrown
57     return;
58   }
59 
60   m_jSingleDeleteMethodId = WriteBatchHandlerJni::getSingleDeleteMethodId(env);
61   if(m_jSingleDeleteMethodId == nullptr) {
62     // exception thrown
63     return;
64   }
65 
66   m_jDeleteRangeCfMethodId =
67       WriteBatchHandlerJni::getDeleteRangeCfMethodId(env);
68   if (m_jDeleteRangeCfMethodId == nullptr) {
69     // exception thrown
70     return;
71   }
72 
73   m_jDeleteRangeMethodId = WriteBatchHandlerJni::getDeleteRangeMethodId(env);
74   if (m_jDeleteRangeMethodId == nullptr) {
75     // exception thrown
76     return;
77   }
78 
79   m_jLogDataMethodId = WriteBatchHandlerJni::getLogDataMethodId(env);
80   if(m_jLogDataMethodId == nullptr) {
81     // exception thrown
82     return;
83   }
84 
85   m_jPutBlobIndexCfMethodId =
86       WriteBatchHandlerJni::getPutBlobIndexCfMethodId(env);
87   if(m_jPutBlobIndexCfMethodId == nullptr) {
88     // exception thrown
89     return;
90   }
91 
92   m_jMarkBeginPrepareMethodId =
93       WriteBatchHandlerJni::getMarkBeginPrepareMethodId(env);
94   if(m_jMarkBeginPrepareMethodId == nullptr) {
95     // exception thrown
96     return;
97   }
98 
99   m_jMarkEndPrepareMethodId =
100       WriteBatchHandlerJni::getMarkEndPrepareMethodId(env);
101   if(m_jMarkEndPrepareMethodId == nullptr) {
102     // exception thrown
103     return;
104   }
105 
106   m_jMarkNoopMethodId = WriteBatchHandlerJni::getMarkNoopMethodId(env);
107   if(m_jMarkNoopMethodId == nullptr) {
108     // exception thrown
109     return;
110   }
111 
112   m_jMarkRollbackMethodId = WriteBatchHandlerJni::getMarkRollbackMethodId(env);
113   if(m_jMarkRollbackMethodId == nullptr) {
114     // exception thrown
115     return;
116   }
117 
118   m_jMarkCommitMethodId = WriteBatchHandlerJni::getMarkCommitMethodId(env);
119   if(m_jMarkCommitMethodId == nullptr) {
120     // exception thrown
121     return;
122   }
123 
124   m_jContinueMethodId = WriteBatchHandlerJni::getContinueMethodId(env);
125   if(m_jContinueMethodId == nullptr) {
126     // exception thrown
127     return;
128   }
129 }
130 
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)131 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::PutCF(
132     uint32_t column_family_id, const Slice& key, const Slice& value) {
133   auto put = [this, column_family_id] (
134       jbyteArray j_key, jbyteArray j_value) {
135     m_env->CallVoidMethod(
136       m_jcallback_obj,
137       m_jPutCfMethodId,
138       static_cast<jint>(column_family_id),
139       j_key,
140       j_value);
141   };
142   auto status = WriteBatchHandlerJniCallback::kv_op(key, value, put);
143   if(status == nullptr) {
144     return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) what to do if there is
145                                              // an Exception but we don't know
146                                              // the ROCKSDB_NAMESPACE::Status?
147   } else {
148     return ROCKSDB_NAMESPACE::Status(*status);
149   }
150 }
151 
Put(const Slice & key,const Slice & value)152 void WriteBatchHandlerJniCallback::Put(const Slice& key, const Slice& value) {
153   auto put = [this] (
154         jbyteArray j_key, jbyteArray j_value) {
155     m_env->CallVoidMethod(
156       m_jcallback_obj,
157       m_jPutMethodId,
158       j_key,
159       j_value);
160   };
161   WriteBatchHandlerJniCallback::kv_op(key, value, put);
162 }
163 
MergeCF(uint32_t column_family_id,const Slice & key,const Slice & value)164 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MergeCF(
165     uint32_t column_family_id, const Slice& key, const Slice& value) {
166   auto merge = [this, column_family_id] (
167         jbyteArray j_key, jbyteArray j_value) {
168     m_env->CallVoidMethod(
169       m_jcallback_obj,
170       m_jMergeCfMethodId,
171       static_cast<jint>(column_family_id),
172       j_key,
173       j_value);
174   };
175   auto status = WriteBatchHandlerJniCallback::kv_op(key, value, merge);
176   if(status == nullptr) {
177     return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) what to do if there is
178                                              // an Exception but we don't know
179                                              // the ROCKSDB_NAMESPACE::Status?
180   } else {
181     return ROCKSDB_NAMESPACE::Status(*status);
182   }
183 }
184 
Merge(const Slice & key,const Slice & value)185 void WriteBatchHandlerJniCallback::Merge(const Slice& key, const Slice& value) {
186   auto merge = [this] (
187         jbyteArray j_key, jbyteArray j_value) {
188     m_env->CallVoidMethod(
189       m_jcallback_obj,
190       m_jMergeMethodId,
191       j_key,
192       j_value);
193   };
194   WriteBatchHandlerJniCallback::kv_op(key, value, merge);
195 }
196 
DeleteCF(uint32_t column_family_id,const Slice & key)197 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::DeleteCF(
198     uint32_t column_family_id, const Slice& key) {
199   auto remove = [this, column_family_id] (jbyteArray j_key) {
200     m_env->CallVoidMethod(
201       m_jcallback_obj,
202       m_jDeleteCfMethodId,
203       static_cast<jint>(column_family_id),
204       j_key);
205   };
206   auto status = WriteBatchHandlerJniCallback::k_op(key, remove);
207   if(status == nullptr) {
208     return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) what to do if there is
209                                              // an Exception but we don't know
210                                              // the ROCKSDB_NAMESPACE::Status?
211   } else {
212     return ROCKSDB_NAMESPACE::Status(*status);
213   }
214 }
215 
Delete(const Slice & key)216 void WriteBatchHandlerJniCallback::Delete(const Slice& key) {
217   auto remove = [this] (jbyteArray j_key) {
218     m_env->CallVoidMethod(
219       m_jcallback_obj,
220       m_jDeleteMethodId,
221       j_key);
222   };
223   WriteBatchHandlerJniCallback::k_op(key, remove);
224 }
225 
SingleDeleteCF(uint32_t column_family_id,const Slice & key)226 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::SingleDeleteCF(
227     uint32_t column_family_id, const Slice& key) {
228   auto singleDelete = [this, column_family_id] (jbyteArray j_key) {
229     m_env->CallVoidMethod(
230       m_jcallback_obj,
231       m_jSingleDeleteCfMethodId,
232       static_cast<jint>(column_family_id),
233       j_key);
234   };
235   auto status = WriteBatchHandlerJniCallback::k_op(key, singleDelete);
236   if(status == nullptr) {
237     return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) what to do if there is
238                                              // an Exception but we don't know
239                                              // the ROCKSDB_NAMESPACE::Status?
240   } else {
241     return ROCKSDB_NAMESPACE::Status(*status);
242   }
243 }
244 
SingleDelete(const Slice & key)245 void WriteBatchHandlerJniCallback::SingleDelete(const Slice& key) {
246   auto singleDelete = [this] (jbyteArray j_key) {
247     m_env->CallVoidMethod(
248       m_jcallback_obj,
249       m_jSingleDeleteMethodId,
250       j_key);
251   };
252   WriteBatchHandlerJniCallback::k_op(key, singleDelete);
253 }
254 
DeleteRangeCF(uint32_t column_family_id,const Slice & beginKey,const Slice & endKey)255 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::DeleteRangeCF(
256     uint32_t column_family_id, const Slice& beginKey, const Slice& endKey) {
257   auto deleteRange = [this, column_family_id] (
258         jbyteArray j_beginKey, jbyteArray j_endKey) {
259     m_env->CallVoidMethod(
260       m_jcallback_obj,
261       m_jDeleteRangeCfMethodId,
262       static_cast<jint>(column_family_id),
263       j_beginKey,
264       j_endKey);
265   };
266   auto status = WriteBatchHandlerJniCallback::kv_op(beginKey, endKey, deleteRange);
267   if(status == nullptr) {
268     return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) what to do if there is
269                                              // an Exception but we don't know
270                                              // the ROCKSDB_NAMESPACE::Status?
271   } else {
272     return ROCKSDB_NAMESPACE::Status(*status);
273   }
274 }
275 
DeleteRange(const Slice & beginKey,const Slice & endKey)276 void WriteBatchHandlerJniCallback::DeleteRange(const Slice& beginKey,
277     const Slice& endKey) {
278   auto deleteRange = [this] (
279         jbyteArray j_beginKey, jbyteArray j_endKey) {
280     m_env->CallVoidMethod(
281       m_jcallback_obj,
282       m_jDeleteRangeMethodId,
283       j_beginKey,
284       j_endKey);
285   };
286   WriteBatchHandlerJniCallback::kv_op(beginKey, endKey, deleteRange);
287 }
288 
LogData(const Slice & blob)289 void WriteBatchHandlerJniCallback::LogData(const Slice& blob) {
290   auto logData = [this] (jbyteArray j_blob) {
291     m_env->CallVoidMethod(
292       m_jcallback_obj,
293       m_jLogDataMethodId,
294       j_blob);
295   };
296   WriteBatchHandlerJniCallback::k_op(blob, logData);
297 }
298 
PutBlobIndexCF(uint32_t column_family_id,const Slice & key,const Slice & value)299 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::PutBlobIndexCF(
300     uint32_t column_family_id, const Slice& key, const Slice& value) {
301   auto putBlobIndex = [this, column_family_id] (
302       jbyteArray j_key, jbyteArray j_value) {
303     m_env->CallVoidMethod(
304       m_jcallback_obj,
305       m_jPutBlobIndexCfMethodId,
306       static_cast<jint>(column_family_id),
307       j_key,
308       j_value);
309   };
310   auto status = WriteBatchHandlerJniCallback::kv_op(key, value, putBlobIndex);
311   if(status == nullptr) {
312     return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) what to do if there is
313                                              // an Exception but we don't know
314                                              // the ROCKSDB_NAMESPACE::Status?
315   } else {
316     return ROCKSDB_NAMESPACE::Status(*status);
317   }
318 }
319 
MarkBeginPrepare(bool unprepare)320 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MarkBeginPrepare(
321     bool unprepare) {
322 #ifndef DEBUG
323   (void) unprepare;
324 #else
325   assert(!unprepare);
326 #endif
327   m_env->CallVoidMethod(m_jcallback_obj, m_jMarkBeginPrepareMethodId);
328 
329   // check for Exception, in-particular RocksDBException
330   if (m_env->ExceptionCheck()) {
331     // exception thrown
332     jthrowable exception = m_env->ExceptionOccurred();
333     std::unique_ptr<ROCKSDB_NAMESPACE::Status> status =
334         ROCKSDB_NAMESPACE::RocksDBExceptionJni::toCppStatus(m_env, exception);
335     if (status == nullptr) {
336       // unkown status or exception occurred extracting status
337       m_env->ExceptionDescribe();
338       return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) probably need a
339                                                // better error code here
340 
341     } else {
342       m_env->ExceptionClear();  // clear the exception, as we have extracted the status
343       return ROCKSDB_NAMESPACE::Status(*status);
344     }
345   }
346 
347   return ROCKSDB_NAMESPACE::Status::OK();
348 }
349 
MarkEndPrepare(const Slice & xid)350 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MarkEndPrepare(
351     const Slice& xid) {
352   auto markEndPrepare = [this] (
353       jbyteArray j_xid) {
354     m_env->CallVoidMethod(
355       m_jcallback_obj,
356       m_jMarkEndPrepareMethodId,
357       j_xid);
358   };
359   auto status = WriteBatchHandlerJniCallback::k_op(xid, markEndPrepare);
360   if(status == nullptr) {
361     return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) what to do if there is
362                                              // an Exception but we don't know
363                                              // the ROCKSDB_NAMESPACE::Status?
364   } else {
365     return ROCKSDB_NAMESPACE::Status(*status);
366   }
367 }
368 
MarkNoop(bool empty_batch)369 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MarkNoop(
370     bool empty_batch) {
371   m_env->CallVoidMethod(m_jcallback_obj, m_jMarkNoopMethodId, static_cast<jboolean>(empty_batch));
372 
373   // check for Exception, in-particular RocksDBException
374   if (m_env->ExceptionCheck()) {
375     // exception thrown
376     jthrowable exception = m_env->ExceptionOccurred();
377     std::unique_ptr<ROCKSDB_NAMESPACE::Status> status =
378         ROCKSDB_NAMESPACE::RocksDBExceptionJni::toCppStatus(m_env, exception);
379     if (status == nullptr) {
380       // unkown status or exception occurred extracting status
381       m_env->ExceptionDescribe();
382       return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) probably need a
383                                                // better error code here
384 
385     } else {
386       m_env->ExceptionClear();  // clear the exception, as we have extracted the status
387       return ROCKSDB_NAMESPACE::Status(*status);
388     }
389   }
390 
391   return ROCKSDB_NAMESPACE::Status::OK();
392 }
393 
MarkRollback(const Slice & xid)394 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MarkRollback(
395     const Slice& xid) {
396   auto markRollback = [this] (
397       jbyteArray j_xid) {
398     m_env->CallVoidMethod(
399       m_jcallback_obj,
400       m_jMarkRollbackMethodId,
401       j_xid);
402   };
403   auto status = WriteBatchHandlerJniCallback::k_op(xid, markRollback);
404   if(status == nullptr) {
405     return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) what to do if there is
406                                              // an Exception but we don't know
407                                              // the ROCKSDB_NAMESPACE::Status?
408   } else {
409     return ROCKSDB_NAMESPACE::Status(*status);
410   }
411 }
412 
MarkCommit(const Slice & xid)413 ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MarkCommit(
414     const Slice& xid) {
415   auto markCommit = [this] (
416       jbyteArray j_xid) {
417     m_env->CallVoidMethod(
418       m_jcallback_obj,
419       m_jMarkCommitMethodId,
420       j_xid);
421   };
422   auto status = WriteBatchHandlerJniCallback::k_op(xid, markCommit);
423   if(status == nullptr) {
424     return ROCKSDB_NAMESPACE::Status::OK();  // TODO(AR) what to do if there is
425                                              // an Exception but we don't know
426                                              // the ROCKSDB_NAMESPACE::Status?
427   } else {
428     return ROCKSDB_NAMESPACE::Status(*status);
429   }
430 }
431 
Continue()432 bool WriteBatchHandlerJniCallback::Continue() {
433   jboolean jContinue = m_env->CallBooleanMethod(
434       m_jcallback_obj,
435       m_jContinueMethodId);
436   if(m_env->ExceptionCheck()) {
437     // exception thrown
438     m_env->ExceptionDescribe();
439   }
440 
441   return static_cast<bool>(jContinue == JNI_TRUE);
442 }
443 
kv_op(const Slice & key,const Slice & value,std::function<void (jbyteArray,jbyteArray)> kvFn)444 std::unique_ptr<ROCKSDB_NAMESPACE::Status> WriteBatchHandlerJniCallback::kv_op(
445     const Slice& key, const Slice& value,
446     std::function<void(jbyteArray, jbyteArray)> kvFn) {
447   const jbyteArray j_key = JniUtil::copyBytes(m_env, key);
448   if (j_key == nullptr) {
449     // exception thrown
450     if (m_env->ExceptionCheck()) {
451       m_env->ExceptionDescribe();
452     }
453     return nullptr;
454   }
455 
456   const jbyteArray j_value = JniUtil::copyBytes(m_env, value);
457   if (j_value == nullptr) {
458     // exception thrown
459     if (m_env->ExceptionCheck()) {
460       m_env->ExceptionDescribe();
461     }
462     if (j_key != nullptr) {
463       m_env->DeleteLocalRef(j_key);
464     }
465     return nullptr;
466   }
467 
468   kvFn(j_key, j_value);
469 
470   // check for Exception, in-particular RocksDBException
471   if (m_env->ExceptionCheck()) {
472     if (j_value != nullptr) {
473       m_env->DeleteLocalRef(j_value);
474     }
475     if (j_key != nullptr) {
476       m_env->DeleteLocalRef(j_key);
477     }
478 
479     // exception thrown
480     jthrowable exception = m_env->ExceptionOccurred();
481     std::unique_ptr<ROCKSDB_NAMESPACE::Status> status =
482         ROCKSDB_NAMESPACE::RocksDBExceptionJni::toCppStatus(m_env, exception);
483     if (status == nullptr) {
484       // unkown status or exception occurred extracting status
485       m_env->ExceptionDescribe();
486       return nullptr;
487 
488     } else {
489       m_env->ExceptionClear();  // clear the exception, as we have extracted the status
490       return status;
491     }
492   }
493 
494   if (j_value != nullptr) {
495     m_env->DeleteLocalRef(j_value);
496   }
497   if (j_key != nullptr) {
498     m_env->DeleteLocalRef(j_key);
499   }
500 
501   // all OK
502   return std::unique_ptr<ROCKSDB_NAMESPACE::Status>(
503       new ROCKSDB_NAMESPACE::Status(ROCKSDB_NAMESPACE::Status::OK()));
504 }
505 
k_op(const Slice & key,std::function<void (jbyteArray)> kFn)506 std::unique_ptr<ROCKSDB_NAMESPACE::Status> WriteBatchHandlerJniCallback::k_op(
507     const Slice& key, std::function<void(jbyteArray)> kFn) {
508   const jbyteArray j_key = JniUtil::copyBytes(m_env, key);
509   if (j_key == nullptr) {
510     // exception thrown
511     if (m_env->ExceptionCheck()) {
512       m_env->ExceptionDescribe();
513     }
514     return nullptr;
515   }
516 
517   kFn(j_key);
518 
519   // check for Exception, in-particular RocksDBException
520   if (m_env->ExceptionCheck()) {
521     if (j_key != nullptr) {
522       m_env->DeleteLocalRef(j_key);
523     }
524 
525     // exception thrown
526     jthrowable exception = m_env->ExceptionOccurred();
527     std::unique_ptr<ROCKSDB_NAMESPACE::Status> status =
528         ROCKSDB_NAMESPACE::RocksDBExceptionJni::toCppStatus(m_env, exception);
529     if (status == nullptr) {
530       // unkown status or exception occurred extracting status
531       m_env->ExceptionDescribe();
532       return nullptr;
533 
534     } else {
535       m_env->ExceptionClear();  // clear the exception, as we have extracted the status
536       return status;
537     }
538   }
539 
540   if (j_key != nullptr) {
541     m_env->DeleteLocalRef(j_key);
542   }
543 
544   // all OK
545   return std::unique_ptr<ROCKSDB_NAMESPACE::Status>(
546       new ROCKSDB_NAMESPACE::Status(ROCKSDB_NAMESPACE::Status::OK()));
547 }
548 }  // namespace ROCKSDB_NAMESPACE
549