#include <dmlc/json.h>
#include <dmlc/io.h>
#include <dmlc/memory_io.h>
#include <dmlc/concurrentqueue.h>
#include <dmlc/blockingconcurrentqueue.h>
#include <dmlc/thread_group.h>
#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
8
9 template<typename TQueue>
10 struct LFQThreadData {
LFQThreadDataLFQThreadData11 LFQThreadData() : count_(0) {}
12 std::atomic<size_t> count_;
13 std::shared_ptr<TQueue> q_ = std::make_shared<TQueue>();
14 std::shared_ptr<dmlc::ManualEvent> ready_ = std::make_shared<dmlc::ManualEvent>();
15 std::mutex cs_map_;
16 std::set<int> thread_map_;
17 };
18
19 template<typename TQueue>
PushThread(const int id,std::shared_ptr<LFQThreadData<TQueue>> data)20 static int PushThread(const int id, std::shared_ptr<LFQThreadData<TQueue>> data) {
21 ++data->count_;
22 data->ready_->wait();
23 data->q_->enqueue(id);
24 std::unique_lock<std::mutex> lk(data->cs_map_);
25 data->thread_map_.erase(id);
26 return 0;
27 }
28
29 template<typename TQueue>
PullThread(const int id,std::shared_ptr<LFQThreadData<TQueue>> data)30 static int PullThread(const int id, std::shared_ptr<LFQThreadData<TQueue>> data) {
31 ++data->count_;
32 data->ready_->wait();
33 int val;
34 CHECK_EQ(data->q_->try_dequeue(val), true);
35 std::unique_lock<std::mutex> lk(data->cs_map_);
36 data->thread_map_.erase(id);
37 return 0;
38 }
39
40 template<typename TQueue>
BlockingPullThread(const int id,std::shared_ptr<LFQThreadData<TQueue>> data)41 static int BlockingPullThread(const int id, std::shared_ptr<LFQThreadData<TQueue>> data) {
42 ++data->count_;
43 data->ready_->wait();
44 int val;
45 data->q_->wait_dequeue(val);
46 std::unique_lock<std::mutex> lk(data->cs_map_);
47 data->thread_map_.erase(id);
48 return 0;
49 }
50
TName(const std::string & s,int x)51 static inline std::string TName(const std::string& s, int x) { return s + "-" + std::to_string(x); }
52
/*!
 * \brief Stress the non-blocking ConcurrentQueue: ITEM_COUNT concurrent
 *        producers enqueue one item each, then ITEM_COUNT concurrent
 *        consumers drain it with try_dequeue.
 */
TEST(Lockfree, ConcurrentQueue) {
  dmlc::ThreadGroup threads;
  const size_t ITEM_COUNT = 100;
  auto data = std::make_shared<LFQThreadData<dmlc::moodycamel::ConcurrentQueue<int>>>();
  // Phase 1: spawn one producer per item; all of them block on ready_.
  for (size_t x = 0; x < ITEM_COUNT; ++x) {
    // Thread ids are ints throughout — cast explicitly instead of relying
    // on implicit size_t -> int narrowing (matches the blocking-queue test).
    const int id = static_cast<int>(x);
    std::unique_lock<std::mutex> lk(data->cs_map_);
    data->thread_map_.insert(id);
    threads.create(TName("PushThread", id), true,
                   PushThread<dmlc::moodycamel::ConcurrentQueue<int>>, id, data);
  }
  // Wait until every producer has started, then release them all at once
  // so the enqueues actually race.
  while (data->count_ < ITEM_COUNT) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  data->ready_->signal();
  // Poll until every producer has checked itself off the in-flight set.
  size_t remaining = ITEM_COUNT;
  do {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::unique_lock<std::mutex> lk(data->cs_map_);
    remaining = data->thread_map_.size();
  } while (remaining);

  // Every producer finished, so exactly ITEM_COUNT items must be queued.
  size_t count = data->q_->size_approx();
  GTEST_ASSERT_EQ(count, ITEM_COUNT);

  threads.join_all();
  GTEST_ASSERT_EQ(threads.size(), 0U);

  // Phase 2: one consumer per item; each must succeed with try_dequeue.
  for (size_t x = 0; x < ITEM_COUNT; ++x) {
    const int id = static_cast<int>(x);
    std::unique_lock<std::mutex> lk(data->cs_map_);
    data->thread_map_.insert(id);
    // Just to mix things up, don't auto-remove
    threads.create(TName("PullThread", id), false,
                   PullThread<dmlc::moodycamel::ConcurrentQueue<int>>, id, data);
  }
  data->ready_->signal();
  threads.join_all();
  GTEST_ASSERT_EQ(threads.size(), 0U);

  // The consumers must have drained the queue completely.
  count = data->q_->size_approx();
  GTEST_ASSERT_EQ(count, 0UL);
}
92
/*!
 * \brief Stress the BlockingConcurrentQueue: ITEM_COUNT concurrent
 *        producers enqueue one item each, then ITEM_COUNT concurrent
 *        consumers drain it with the blocking wait_dequeue.
 */
TEST(Lockfree, BlockingConcurrentQueue) {
  // NOTE: the alias was previously declared twice back-to-back; the
  // redundant duplicate has been removed.
  using BlockingQueue = dmlc::moodycamel::BlockingConcurrentQueue<
    int, dmlc::moodycamel::ConcurrentQueueDefaultTraits>;

  dmlc::ThreadGroup threads;
  const size_t ITEM_COUNT = 100;
  auto data = std::make_shared<LFQThreadData<BlockingQueue>>();
  // Phase 1: spawn one producer per item; all of them block on ready_.
  for (size_t x = 0; x < ITEM_COUNT; ++x) {
    // Explicit cast: thread ids are ints (consistent with the pull loop below).
    const int id = static_cast<int>(x);
    std::unique_lock<std::mutex> lk(data->cs_map_);
    data->thread_map_.insert(id);
    // Just to mix things up, don't auto-remove
    threads.create(TName("PushThread", id), false, PushThread<BlockingQueue>, id, data);
  }
  // Wait until every producer has started, then release them all at once.
  while (data->count_ < ITEM_COUNT) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  data->ready_->signal();
  // Poll until every producer has checked itself off the in-flight set.
  size_t remaining = ITEM_COUNT;
  do {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::unique_lock<std::mutex> lk(data->cs_map_);
    remaining = data->thread_map_.size();
  } while (remaining);

  // Every producer finished, so exactly ITEM_COUNT items must be queued.
  size_t count = data->q_->size_approx();
  GTEST_ASSERT_EQ(count, ITEM_COUNT);

  threads.join_all();
  GTEST_ASSERT_EQ(threads.size(), 0U);

  // Phase 2: one blocking consumer per item.
  for (size_t x = 0; x < ITEM_COUNT; ++x) {
    const int id = static_cast<int>(x);
    std::unique_lock<std::mutex> lk(data->cs_map_);
    data->thread_map_.insert(id);
    threads.create(TName("BlockingPullThread", id), true,
                   BlockingPullThread<BlockingQueue>, id, data);
  }
  data->ready_->signal();
  threads.join_all();
  GTEST_ASSERT_EQ(threads.size(), 0U);

  // The consumers must have drained the queue completely.
  count = data->q_->size_approx();
  GTEST_ASSERT_EQ(count, 0UL);
}
138
139