1 #include "sync.h"
2
3 using namespace rsimpl;
4
frame_archive(const std::vector<subdevice_mode_selection> & selection,rs_stream key_stream)5 frame_archive::frame_archive(const std::vector<subdevice_mode_selection> & selection, rs_stream key_stream) : key_stream(key_stream)
6 {
7 // Store the mode selection that pertains to each native stream
8 for(auto & mode : selection)
9 {
10 for(auto & o : mode.get_outputs())
11 {
12 modes[o.first] = mode;
13 }
14 }
15
16 // Enumerate all streams we need to keep synchronized with the key stream
17 for(auto s : {RS_STREAM_DEPTH, RS_STREAM_INFRARED, RS_STREAM_INFRARED2, RS_STREAM_COLOR})
18 {
19 if(is_stream_enabled(s) && s != key_stream) other_streams.push_back(s);
20 }
21
22 // Allocate an empty image for each stream, and move it to the frontbuffer
23 // This allows us to assume that get_frame_data/get_frame_timestamp always return valid data
24 alloc_frame(key_stream, 0);
25 frontbuffer[key_stream] = std::move(backbuffer[key_stream]);
26 for(auto s : other_streams)
27 {
28 alloc_frame(s, 0);
29 frontbuffer[s] = std::move(backbuffer[s]);
30 }
31 }
32
get_frame_data(rs_stream stream) const33 const byte * frame_archive::get_frame_data(rs_stream stream) const
34 {
35 return frontbuffer[stream].data.data();
36 }
37
get_frame_timestamp(rs_stream stream) const38 int frame_archive::get_frame_timestamp(rs_stream stream) const
39 {
40 return frontbuffer[stream].timestamp;
41 }
42
43 // Block until the next coherent frameset is available
wait_for_frames()44 void frame_archive::wait_for_frames()
45 {
46 std::unique_lock<std::mutex> lock(mutex);
47 const auto ready = [this]() { return !frames[key_stream].empty(); };
48 if(!ready() && !cv.wait_for(lock, std::chrono::seconds(5), ready)) throw std::runtime_error("Timeout waiting for frames.");
49 get_next_frames();
50 }
51
52 // If a coherent frameset is available, obtain it and return true, otherwise return false immediately
poll_for_frames()53 bool frame_archive::poll_for_frames()
54 {
55 // TODO: Implement a user-specifiable timeout for how long to wait before returning false?
56 std::unique_lock<std::mutex> lock(mutex);
57 if(frames[key_stream].empty()) return false;
58 get_next_frames();
59 return true;
60 }
61
62 // Move frames from the queues to the frontbuffers to form the next coherent frameset
get_next_frames()63 void frame_archive::get_next_frames()
64 {
65 // Always dequeue a frame from the key stream
66 dequeue_frame(key_stream);
67
68 // Dequeue from other streams if the new frame is closer to the timestamp of the key stream than the old frame
69 for(auto s : other_streams)
70 {
71 if(!frames[s].empty() && abs(frames[s].front().timestamp - frontbuffer[key_stream].timestamp) <= abs(frontbuffer[s].timestamp - frontbuffer[key_stream].timestamp))
72 {
73 dequeue_frame(s);
74 }
75 }
76 }
77
78 // Allocate a new frame in the backbuffer, potentially recycling a buffer from the freelist
alloc_frame(rs_stream stream,int timestamp)79 byte * frame_archive::alloc_frame(rs_stream stream, int timestamp)
80 {
81 const size_t size = modes[stream].get_image_size(stream);
82
83 {
84 std::lock_guard<std::mutex> guard(mutex);
85
86 // Attempt to obtain a buffer of the appropriate size from the freelist
87 for(auto it = begin(freelist); it != end(freelist); ++it)
88 {
89 if(it->data.size() == size)
90 {
91 backbuffer[stream] = std::move(*it);
92 freelist.erase(it);
93 break;
94 }
95 }
96
97 // Discard buffers that have been in the freelist for longer than 1s
98 for(auto it = begin(freelist); it != end(freelist); )
99 {
100 if(timestamp > it->timestamp + 1000) it = freelist.erase(it);
101 else ++it;
102 }
103 }
104
105 backbuffer[stream].data.resize(size); // TODO: Allow users to provide a custom allocator for frame buffers
106 backbuffer[stream].timestamp = timestamp;
107 return backbuffer[stream].data.data();
108 }
109
110 // Move a frame from the backbuffer to the back of the queue
commit_frame(rs_stream stream)111 void frame_archive::commit_frame(rs_stream stream)
112 {
113 std::unique_lock<std::mutex> lock(mutex);
114 frames[stream].push_back(std::move(backbuffer[stream]));
115 cull_frames();
116 lock.unlock();
117 if(!frames[key_stream].empty()) cv.notify_one();
118 }
119
120 // Discard all frames which are older than the most recent coherent frameset
cull_frames()121 void frame_archive::cull_frames()
122 {
123 // Never keep more than four frames around in any given stream, regardless of timestamps
124 for(auto s : {RS_STREAM_DEPTH, RS_STREAM_COLOR, RS_STREAM_INFRARED, RS_STREAM_INFRARED2})
125 {
126 while(frames[s].size() > 4)
127 {
128 discard_frame(s);
129 }
130 }
131
132 // Cannot do any culling unless at least one frame is enqueued for each enabled stream
133 if(frames[key_stream].empty()) return;
134 for(auto s : other_streams) if(frames[s].empty()) return;
135
136 // We can discard frames from the key stream if we have at least two and the latter is closer to the most recent frame of all other streams than the former
137 while(true)
138 {
139 if(frames[key_stream].size() < 2) break;
140 const int t0 = frames[key_stream][0].timestamp, t1 = frames[key_stream][1].timestamp;
141
142 bool valid_to_skip = true;
143 for(auto s : other_streams)
144 {
145 if(abs(t0 - frames[s].back().timestamp) < abs(t1 - frames[s].back().timestamp))
146 {
147 valid_to_skip = false;
148 break;
149 }
150 }
151 if(!valid_to_skip) break;
152
153 discard_frame(key_stream);
154 }
155
156 // We can discard frames for other streams if we have at least two and the latter is closer to the next key stream frame than the former
157 for(auto s : other_streams)
158 {
159 while(true)
160 {
161 if(frames[s].size() < 2) break;
162 const int t0 = frames[s][0].timestamp, t1 = frames[s][1].timestamp;
163
164 if(abs(t0 - frames[key_stream].front().timestamp) < abs(t1 - frames[key_stream].front().timestamp)) break;
165 discard_frame(s);
166 }
167 }
168 }
169
170 // Move a single frame from the head of the queue to the front buffer, while recycling the front buffer into the freelist
dequeue_frame(rs_stream stream)171 void frame_archive::dequeue_frame(rs_stream stream)
172 {
173 if(!frontbuffer[stream].data.empty()) freelist.push_back(std::move(frontbuffer[stream]));
174 frontbuffer[stream] = std::move(frames[stream].front());
175 frames[stream].erase(begin(frames[stream]));
176 }
177
178 // Move a single frame from the head of the queue directly to the freelist
discard_frame(rs_stream stream)179 void frame_archive::discard_frame(rs_stream stream)
180 {
181 freelist.push_back(std::move(frames[stream].front()));
182 frames[stream].erase(begin(frames[stream]));
183 }