1 #include <dmlc/json.h>
2 #include <dmlc/io.h>
3 #include <dmlc/memory_io.h>
4 #include <dmlc/concurrentqueue.h>
5 #include <dmlc/blockingconcurrentqueue.h>
6 #include <dmlc/thread_group.h>
7 #include <gtest/gtest.h>
8 
9 template<typename TQueue>
10 struct LFQThreadData {
LFQThreadDataLFQThreadData11   LFQThreadData() : count_(0) {}
12   std::atomic<size_t> count_;
13   std::shared_ptr<TQueue> q_ = std::make_shared<TQueue>();
14   std::shared_ptr<dmlc::ManualEvent> ready_ = std::make_shared<dmlc::ManualEvent>();
15   std::mutex cs_map_;
16   std::set<int> thread_map_;
17 };
18 
19 template<typename TQueue>
PushThread(const int id,std::shared_ptr<LFQThreadData<TQueue>> data)20 static int PushThread(const int id, std::shared_ptr<LFQThreadData<TQueue>> data) {
21   ++data->count_;
22   data->ready_->wait();
23   data->q_->enqueue(id);
24   std::unique_lock<std::mutex> lk(data->cs_map_);
25   data->thread_map_.erase(id);
26   return 0;
27 }
28 
29 template<typename TQueue>
PullThread(const int id,std::shared_ptr<LFQThreadData<TQueue>> data)30 static int PullThread(const int id, std::shared_ptr<LFQThreadData<TQueue>> data) {
31   ++data->count_;
32   data->ready_->wait();
33   int val;
34   CHECK_EQ(data->q_->try_dequeue(val), true);
35   std::unique_lock<std::mutex> lk(data->cs_map_);
36   data->thread_map_.erase(id);
37   return 0;
38 }
39 
40 template<typename TQueue>
BlockingPullThread(const int id,std::shared_ptr<LFQThreadData<TQueue>> data)41 static int BlockingPullThread(const int id, std::shared_ptr<LFQThreadData<TQueue>> data) {
42   ++data->count_;
43   data->ready_->wait();
44   int val;
45   data->q_->wait_dequeue(val);
46   std::unique_lock<std::mutex> lk(data->cs_map_);
47   data->thread_map_.erase(id);
48   return 0;
49 }
50 
// Builds a thread name of the form "<s>-<x>".
static inline std::string TName(const std::string& s, int x) {
  std::string name(s);
  name += "-";
  name += std::to_string(x);
  return name;
}
52 
// Fills a moodycamel ConcurrentQueue from ITEM_COUNT producer threads, checks
// the approximate size, then empties it with ITEM_COUNT non-blocking consumer
// threads and checks that nothing is left.
TEST(Lockfree, ConcurrentQueue) {
  using Queue = dmlc::moodycamel::ConcurrentQueue<int>;

  dmlc::ThreadGroup threads;
  const size_t ITEM_COUNT = 100;
  auto data = std::make_shared<LFQThreadData<Queue>>();

  // Phase 1: producers (auto-remove threads).
  for (size_t x = 0; x < ITEM_COUNT; ++x) {
    // thread_map_ entries and worker ids are int; convert once, explicitly.
    const int id = static_cast<int>(x);
    std::unique_lock<std::mutex> lk(data->cs_map_);
    data->thread_map_.insert(id);
    threads.create(TName("PushThread", id), true, PushThread<Queue>, id, data);
  }
  // Wait until every producer has checked in via count_.
  while (data->count_ < ITEM_COUNT) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  data->ready_->signal();
  // Poll until every producer has erased its id from thread_map_.
  size_t remaining = ITEM_COUNT;
  do {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::unique_lock<std::mutex> lk(data->cs_map_);
    remaining = data->thread_map_.size();
  } while (remaining);

  size_t count = data->q_->size_approx();
  GTEST_ASSERT_EQ(count, ITEM_COUNT);

  threads.join_all();
  GTEST_ASSERT_EQ(threads.size(), 0U);

  // Phase 2: consumers.
  for (size_t x = 0; x < ITEM_COUNT; ++x) {
    const int id = static_cast<int>(x);
    std::unique_lock<std::mutex> lk(data->cs_map_);
    data->thread_map_.insert(id);
    // Just to mix things up, don't auto-remove
    threads.create(TName("PullThread", id), false, PullThread<Queue>, id, data);
  }
  // NOTE(review): ready_ was already signalled in phase 1 and is not reset in
  // this test, so this second signal() is presumably redundant -- confirm
  // against dmlc::ManualEvent semantics.
  data->ready_->signal();
  threads.join_all();
  GTEST_ASSERT_EQ(threads.size(), 0U);

  count = data->q_->size_approx();
  GTEST_ASSERT_EQ(count, 0UL);
}
92 
// Fills a moodycamel BlockingConcurrentQueue from ITEM_COUNT producer threads,
// checks the approximate size, then empties it with ITEM_COUNT blocking
// consumer threads and checks that nothing is left.
TEST(Lockfree, BlockingConcurrentQueue) {
  using BlockingQueue = dmlc::moodycamel::BlockingConcurrentQueue<
    int, dmlc::moodycamel::ConcurrentQueueDefaultTraits>;

  dmlc::ThreadGroup threads;
  const size_t ITEM_COUNT = 100;
  auto data = std::make_shared<LFQThreadData<BlockingQueue>>();

  // Phase 1: producers.
  for (size_t x = 0; x < ITEM_COUNT; ++x) {
    // thread_map_ entries and worker ids are int; convert once, explicitly.
    const int id = static_cast<int>(x);
    std::unique_lock<std::mutex> lk(data->cs_map_);
    data->thread_map_.insert(id);
    // Just to mix things up, don't auto-remove
    threads.create(TName("PushThread", id), false, PushThread<BlockingQueue>, id, data);
  }
  // Wait until every producer has checked in via count_.
  while (data->count_ < ITEM_COUNT) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  data->ready_->signal();
  // Poll until every producer has erased its id from thread_map_.
  size_t remaining = ITEM_COUNT;
  do {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::unique_lock<std::mutex> lk(data->cs_map_);
    remaining = data->thread_map_.size();
  } while (remaining);

  size_t count = data->q_->size_approx();
  GTEST_ASSERT_EQ(count, ITEM_COUNT);

  threads.join_all();
  GTEST_ASSERT_EQ(threads.size(), 0U);

  // Phase 2: blocking consumers (auto-remove threads).
  for (size_t x = 0; x < ITEM_COUNT; ++x) {
    const int id = static_cast<int>(x);
    std::unique_lock<std::mutex> lk(data->cs_map_);
    data->thread_map_.insert(id);
    threads.create(TName("BlockingPullThread", id), true,
                   BlockingPullThread<BlockingQueue>, id, data);
  }
  // NOTE(review): ready_ was already signalled in phase 1 and is not reset in
  // this test, so this second signal() is presumably redundant -- confirm
  // against dmlc::ManualEvent semantics.
  data->ready_->signal();
  threads.join_all();
  GTEST_ASSERT_EQ(threads.size(), 0U);

  count = data->q_->size_approx();
  GTEST_ASSERT_EQ(count, 0UL);
}
138 
139