1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "util/concurrent_task_limiter_impl.h"
11 #include "rocksdb/concurrent_task_limiter.h"
12
13 namespace ROCKSDB_NAMESPACE {
14
ConcurrentTaskLimiterImpl(const std::string & name,int32_t max_outstanding_task)15 ConcurrentTaskLimiterImpl::ConcurrentTaskLimiterImpl(
16 const std::string& name, int32_t max_outstanding_task)
17 : name_(name),
18 max_outstanding_tasks_{max_outstanding_task},
19 outstanding_tasks_{0} {
20
21 }
22
~ConcurrentTaskLimiterImpl()23 ConcurrentTaskLimiterImpl::~ConcurrentTaskLimiterImpl() {
24 assert(outstanding_tasks_ == 0);
25 }
26
GetName() const27 const std::string& ConcurrentTaskLimiterImpl::GetName() const {
28 return name_;
29 }
30
SetMaxOutstandingTask(int32_t limit)31 void ConcurrentTaskLimiterImpl::SetMaxOutstandingTask(int32_t limit) {
32 max_outstanding_tasks_.store(limit, std::memory_order_relaxed);
33 }
34
ResetMaxOutstandingTask()35 void ConcurrentTaskLimiterImpl::ResetMaxOutstandingTask() {
36 max_outstanding_tasks_.store(-1, std::memory_order_relaxed);
37 }
38
GetOutstandingTask() const39 int32_t ConcurrentTaskLimiterImpl::GetOutstandingTask() const {
40 return outstanding_tasks_.load(std::memory_order_relaxed);
41 }
42
GetToken(bool force)43 std::unique_ptr<TaskLimiterToken> ConcurrentTaskLimiterImpl::GetToken(
44 bool force) {
45 int32_t limit = max_outstanding_tasks_.load(std::memory_order_relaxed);
46 int32_t tasks = outstanding_tasks_.load(std::memory_order_relaxed);
47 // force = true, bypass the throttle.
48 // limit < 0 means unlimited tasks.
49 while (force || limit < 0 || tasks < limit) {
50 if (outstanding_tasks_.compare_exchange_weak(tasks, tasks + 1)) {
51 return std::unique_ptr<TaskLimiterToken>(new TaskLimiterToken(this));
52 }
53 }
54 return nullptr;
55 }
56
NewConcurrentTaskLimiter(const std::string & name,int32_t limit)57 ConcurrentTaskLimiter* NewConcurrentTaskLimiter(
58 const std::string& name, int32_t limit) {
59 return new ConcurrentTaskLimiterImpl(name, limit);
60 }
61
~TaskLimiterToken()62 TaskLimiterToken::~TaskLimiterToken() {
63 --limiter_->outstanding_tasks_;
64 assert(limiter_->outstanding_tasks_ >= 0);
65 }
66
67 } // namespace ROCKSDB_NAMESPACE
68