1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "util/concurrent_task_limiter_impl.h"
11 #include "rocksdb/concurrent_task_limiter.h"
12 
13 namespace ROCKSDB_NAMESPACE {
14 
ConcurrentTaskLimiterImpl(const std::string & name,int32_t max_outstanding_task)15 ConcurrentTaskLimiterImpl::ConcurrentTaskLimiterImpl(
16     const std::string& name, int32_t max_outstanding_task)
17     : name_(name),
18       max_outstanding_tasks_{max_outstanding_task},
19       outstanding_tasks_{0} {
20 
21 }
22 
~ConcurrentTaskLimiterImpl()23 ConcurrentTaskLimiterImpl::~ConcurrentTaskLimiterImpl() {
24   assert(outstanding_tasks_ == 0);
25 }
26 
GetName() const27 const std::string& ConcurrentTaskLimiterImpl::GetName() const {
28   return name_;
29 }
30 
SetMaxOutstandingTask(int32_t limit)31 void ConcurrentTaskLimiterImpl::SetMaxOutstandingTask(int32_t limit) {
32   max_outstanding_tasks_.store(limit, std::memory_order_relaxed);
33 }
34 
ResetMaxOutstandingTask()35 void ConcurrentTaskLimiterImpl::ResetMaxOutstandingTask() {
36   max_outstanding_tasks_.store(-1, std::memory_order_relaxed);
37 }
38 
GetOutstandingTask() const39 int32_t ConcurrentTaskLimiterImpl::GetOutstandingTask() const {
40   return outstanding_tasks_.load(std::memory_order_relaxed);
41 }
42 
GetToken(bool force)43 std::unique_ptr<TaskLimiterToken> ConcurrentTaskLimiterImpl::GetToken(
44     bool force) {
45   int32_t limit = max_outstanding_tasks_.load(std::memory_order_relaxed);
46   int32_t tasks = outstanding_tasks_.load(std::memory_order_relaxed);
47   // force = true, bypass the throttle.
48   // limit < 0 means unlimited tasks.
49   while (force || limit < 0 || tasks < limit) {
50     if (outstanding_tasks_.compare_exchange_weak(tasks, tasks + 1)) {
51       return std::unique_ptr<TaskLimiterToken>(new TaskLimiterToken(this));
52     }
53   }
54   return nullptr;
55 }
56 
NewConcurrentTaskLimiter(const std::string & name,int32_t limit)57 ConcurrentTaskLimiter* NewConcurrentTaskLimiter(
58     const std::string& name, int32_t limit) {
59   return new ConcurrentTaskLimiterImpl(name, limit);
60 }
61 
~TaskLimiterToken()62 TaskLimiterToken::~TaskLimiterToken() {
63   --limiter_->outstanding_tasks_;
64   assert(limiter_->outstanding_tasks_ >= 0);
65 }
66 
67 }  // namespace ROCKSDB_NAMESPACE
68