1 #include "sync.h"
2 
3 using namespace rsimpl;
4 
frame_archive(const std::vector<subdevice_mode_selection> & selection,rs_stream key_stream)5 frame_archive::frame_archive(const std::vector<subdevice_mode_selection> & selection, rs_stream key_stream) : key_stream(key_stream)
6 {
7     // Store the mode selection that pertains to each native stream
8     for(auto & mode : selection)
9     {
10         for(auto & o : mode.get_outputs())
11         {
12             modes[o.first] = mode;
13         }
14     }
15 
16     // Enumerate all streams we need to keep synchronized with the key stream
17     for(auto s : {RS_STREAM_DEPTH, RS_STREAM_INFRARED, RS_STREAM_INFRARED2, RS_STREAM_COLOR})
18     {
19         if(is_stream_enabled(s) && s != key_stream) other_streams.push_back(s);
20     }
21 
22     // Allocate an empty image for each stream, and move it to the frontbuffer
23     // This allows us to assume that get_frame_data/get_frame_timestamp always return valid data
24     alloc_frame(key_stream, 0);
25     frontbuffer[key_stream] = std::move(backbuffer[key_stream]);
26     for(auto s : other_streams)
27     {
28         alloc_frame(s, 0);
29         frontbuffer[s] = std::move(backbuffer[s]);
30     }
31 }
32 
get_frame_data(rs_stream stream) const33 const byte * frame_archive::get_frame_data(rs_stream stream) const
34 {
35     return frontbuffer[stream].data.data();
36 }
37 
get_frame_timestamp(rs_stream stream) const38 int frame_archive::get_frame_timestamp(rs_stream stream) const
39 {
40     return frontbuffer[stream].timestamp;
41 }
42 
43 // Block until the next coherent frameset is available
wait_for_frames()44 void frame_archive::wait_for_frames()
45 {
46     std::unique_lock<std::mutex> lock(mutex);
47     const auto ready = [this]() { return !frames[key_stream].empty(); };
48     if(!ready() && !cv.wait_for(lock, std::chrono::seconds(5), ready)) throw std::runtime_error("Timeout waiting for frames.");
49     get_next_frames();
50 }
51 
52 // If a coherent frameset is available, obtain it and return true, otherwise return false immediately
poll_for_frames()53 bool frame_archive::poll_for_frames()
54 {
55     // TODO: Implement a user-specifiable timeout for how long to wait before returning false?
56     std::unique_lock<std::mutex> lock(mutex);
57     if(frames[key_stream].empty()) return false;
58     get_next_frames();
59     return true;
60 }
61 
62 // Move frames from the queues to the frontbuffers to form the next coherent frameset
get_next_frames()63 void frame_archive::get_next_frames()
64 {
65     // Always dequeue a frame from the key stream
66     dequeue_frame(key_stream);
67 
68     // Dequeue from other streams if the new frame is closer to the timestamp of the key stream than the old frame
69     for(auto s : other_streams)
70     {
71         if(!frames[s].empty() && abs(frames[s].front().timestamp - frontbuffer[key_stream].timestamp) <= abs(frontbuffer[s].timestamp - frontbuffer[key_stream].timestamp))
72         {
73             dequeue_frame(s);
74         }
75     }
76 }
77 
78 // Allocate a new frame in the backbuffer, potentially recycling a buffer from the freelist
alloc_frame(rs_stream stream,int timestamp)79 byte * frame_archive::alloc_frame(rs_stream stream, int timestamp)
80 {
81     const size_t size = modes[stream].get_image_size(stream);
82 
83     {
84         std::lock_guard<std::mutex> guard(mutex);
85 
86         // Attempt to obtain a buffer of the appropriate size from the freelist
87         for(auto it = begin(freelist); it != end(freelist); ++it)
88         {
89             if(it->data.size() == size)
90             {
91                 backbuffer[stream] = std::move(*it);
92                 freelist.erase(it);
93                 break;
94             }
95         }
96 
97         // Discard buffers that have been in the freelist for longer than 1s
98         for(auto it = begin(freelist); it != end(freelist); )
99         {
100             if(timestamp > it->timestamp + 1000) it = freelist.erase(it);
101             else ++it;
102         }
103     }
104 
105     backbuffer[stream].data.resize(size); // TODO: Allow users to provide a custom allocator for frame buffers
106     backbuffer[stream].timestamp = timestamp;
107     return backbuffer[stream].data.data();
108 }
109 
110 // Move a frame from the backbuffer to the back of the queue
commit_frame(rs_stream stream)111 void frame_archive::commit_frame(rs_stream stream)
112 {
113     std::unique_lock<std::mutex> lock(mutex);
114     frames[stream].push_back(std::move(backbuffer[stream]));
115     cull_frames();
116     lock.unlock();
117     if(!frames[key_stream].empty()) cv.notify_one();
118 }
119 
120 // Discard all frames which are older than the most recent coherent frameset
cull_frames()121 void frame_archive::cull_frames()
122 {
123     // Never keep more than four frames around in any given stream, regardless of timestamps
124     for(auto s : {RS_STREAM_DEPTH, RS_STREAM_COLOR, RS_STREAM_INFRARED, RS_STREAM_INFRARED2})
125     {
126         while(frames[s].size() > 4)
127         {
128             discard_frame(s);
129         }
130     }
131 
132     // Cannot do any culling unless at least one frame is enqueued for each enabled stream
133     if(frames[key_stream].empty()) return;
134     for(auto s : other_streams) if(frames[s].empty()) return;
135 
136     // We can discard frames from the key stream if we have at least two and the latter is closer to the most recent frame of all other streams than the former
137     while(true)
138     {
139         if(frames[key_stream].size() < 2) break;
140         const int t0 = frames[key_stream][0].timestamp, t1 = frames[key_stream][1].timestamp;
141 
142         bool valid_to_skip = true;
143         for(auto s : other_streams)
144         {
145             if(abs(t0 - frames[s].back().timestamp) < abs(t1 - frames[s].back().timestamp))
146             {
147                 valid_to_skip = false;
148                 break;
149             }
150         }
151         if(!valid_to_skip) break;
152 
153         discard_frame(key_stream);
154     }
155 
156     // We can discard frames for other streams if we have at least two and the latter is closer to the next key stream frame than the former
157     for(auto s : other_streams)
158     {
159         while(true)
160         {
161             if(frames[s].size() < 2) break;
162             const int t0 = frames[s][0].timestamp, t1 = frames[s][1].timestamp;
163 
164             if(abs(t0 - frames[key_stream].front().timestamp) < abs(t1 - frames[key_stream].front().timestamp)) break;
165             discard_frame(s);
166         }
167     }
168 }
169 
170 // Move a single frame from the head of the queue to the front buffer, while recycling the front buffer into the freelist
dequeue_frame(rs_stream stream)171 void frame_archive::dequeue_frame(rs_stream stream)
172 {
173     if(!frontbuffer[stream].data.empty()) freelist.push_back(std::move(frontbuffer[stream]));
174     frontbuffer[stream] = std::move(frames[stream].front());
175     frames[stream].erase(begin(frames[stream]));
176 }
177 
178 // Move a single frame from the head of the queue directly to the freelist
discard_frame(rs_stream stream)179 void frame_archive::discard_frame(rs_stream stream)
180 {
181     freelist.push_back(std::move(frames[stream].front()));
182     frames[stream].erase(begin(frames[stream]));
183 }