1 /*
2  * Copyright (C) 2009-2021 Codership Oy <info@codership.com>
3  */
4 
5 #include "gcache_bh.hpp"
6 #include "GCache.hpp"
7 
8 #include <cerrno>
9 #include <cassert>
10 
#include <sched.h> // sched_yield()
12 
13 namespace gcache
14 {
15     /*!
16      * Reinitialize seqno sequence (after SST or such)
17      * Clears seqno->ptr map // and sets seqno_min to s.
18      */
19     void
seqno_reset(const gu::UUID & g,seqno_t const s)20     GCache::seqno_reset (const gu::UUID& g, seqno_t const s)
21     {
22         gu::Lock lock(mtx);
23 
24         assert(seqno2ptr.empty() || seqno_max == seqno2ptr.index_back());
25 
26         if (g == gid && s != SEQNO_ILL && seqno_max >= s)
27         {
28             if (seqno_max > s)
29             {
30                 discard_tail(s);
31                 seqno_max = s;
32                 seqno_released = s;
33                 assert(seqno_max == seqno2ptr.index_back());
34             }
35             return;
36         }
37 
38         log_info << "GCache history reset: " << gid << ':' << seqno_max
39                  << " -> " << g << ':' << s;
40 
41         seqno_released = SEQNO_NONE;
42         gid = g;
43 
44         /* order is significant here */
45         rb.seqno_reset();
46         mem.seqno_reset();
47 
48         seqno2ptr.clear(SEQNO_NONE);
49         seqno_max = SEQNO_NONE;
50     }
51 
52     /*!
53      * Assign sequence number to buffer pointed to by ptr
54      */
55     void
seqno_assign(const void * const ptr,seqno_t const seqno_g,seqno_t const seqno_d)56     GCache::seqno_assign (const void* const ptr,
57                           seqno_t     const seqno_g,
58                           seqno_t     const seqno_d)
59     {
60         gu::Lock lock(mtx);
61 
62         BufferHeader* bh = ptr2BH(ptr);
63 
64         assert (SEQNO_NONE == bh->seqno_g);
65         assert (SEQNO_ILL  == bh->seqno_d);
66         assert (!BH_is_released(bh));
67 
68         if (gu_likely(seqno_g > seqno_max))
69         {
70             seqno_max = seqno_g;
71         }
72         else
73         {
74             seqno2ptr_iter_t const i(seqno2ptr.find(seqno_g));
75 
76             if (i != seqno2ptr.end())
77             {
78                 const void* const prev_ptr(*i);
79 
80                 if (!seqno2ptr_t::not_set(prev_ptr))
81                 {
82                     const BufferHeader* const prev_bh(ptr2BH(prev_ptr));
83                     assert(0);
84                     gu_throw_fatal << "Attempt to reuse the same seqno: "
85                                    << seqno_g <<". New buffer: " << bh
86                                    << ", previous buffer: " << prev_bh;
87                 }
88             }
89 
90             seqno_released = std::min(seqno_released, seqno_g - 1);
91         }
92 
93         seqno2ptr.insert(seqno_g, ptr);
94 
95         bh->seqno_g = seqno_g;
96         bh->seqno_d = seqno_d;
97     }
98 
99     void
seqno_release(seqno_t const seqno)100     GCache::seqno_release (seqno_t const seqno)
101     {
102         assert (seqno > 0);
103         /* The number of buffers scheduled for release is unpredictable, so
104          * we want to allow some concurrency in cache access by releasing
105          * buffers in small batches */
106         static int const min_batch_size(32);
107 
108         /* Although extremely unlikely, theoretically concurrent access may
109          * lead to elements being added faster than released. The following is
110          * to control and possibly disable concurrency in that case. We start
111          * with min_batch_size and increase it if necessary. */
112         size_t old_gap(-1);
113         int    batch_size(min_batch_size);
114 
115         bool   loop(false);
116 
117         do
118         {
119             /* if we're doing this loop repeatedly, allow other threads to run*/
120             if (loop) sched_yield();
121 
122             gu::Lock lock(mtx);
123 
124             if (seqno < seqno_released || seqno >= seqno_locked)
125             {
126 #ifndef NDEBUG
127                 if (params.debug())
128                 {
129                     log_info << "GCache::seqno_release(" << seqno
130                              << "): seqno_released: " << seqno_released
131                              << ", seqno_locked: " << seqno_locked
132                              << ": exiting.";
133                 }
134 #endif
135                 break;
136             }
137 
138             seqno_t idx(seqno2ptr.upper_bound(seqno_released));
139 
140             if (gu_unlikely(idx == seqno2ptr.index_end()))
141             {
142                 /* This means that there are no elements with
143                  * seqno following seqno_released - and this should not
144                  * generally happen. But it looks like stopcont test does it. */
145                 if (SEQNO_NONE != seqno_released)
146                 {
147                     log_debug << "Releasing seqno " << seqno << " before "
148                               << seqno_released + 1 << " was assigned.";
149                 }
150                 return;
151             }
152 
153             assert(seqno_max >= seqno_released);
154 
155             /* here we check if (seqno_max - seqno_released) is decreasing
156              * and if not - increase the batch_size (linearly) */
157             size_t const new_gap(seqno_max - seqno_released);
158             batch_size += (new_gap >= old_gap) * min_batch_size;
159             old_gap = new_gap;
160 
161             seqno_t const start  (idx - 1);
162             seqno_t const max_end(std::min(seqno, seqno_locked - 1));
163             seqno_t const end    (max_end - start >= 2*batch_size ?
164                                   start + batch_size : max_end);
165 #ifndef NDEBUG
166             if (params.debug())
167             {
168                 log_info << "GCache::seqno_release(" << seqno << "): "
169                          << (seqno - start) << " buffers, batch_size: "
170                          << batch_size << ", end: " << end;
171             }
172             seqno_t const old_sr(seqno_released);
173 #endif
174             while((loop = (idx < seqno2ptr.index_end())) && idx <= end)
175             {
176                 assert(idx != SEQNO_NONE);
177                 BufferHeader* const bh(ptr2BH(seqno2ptr[idx]));
178                 assert (bh->seqno_g == idx);
179 #ifndef NDEBUG
180                 if (!(seqno_released + 1 == idx ||
181                       seqno_released == SEQNO_NONE))
182                 {
183                     log_info << "seqno_released: " << seqno_released
184                              << "; seqno_locked: " << seqno_locked
185                              << "; idx: " << idx
186                              << "; seqno2ptr.begin: " <<seqno2ptr.index_begin()
187                              << "\nstart: " << start << "; end: " << end
188                              << " batch_size: " << batch_size << "; gap: "
189                              << new_gap << "; seqno_max: " << seqno_max;
190                     assert(seqno_released + 1 == idx ||
191                            seqno_released == SEQNO_NONE);
192                 }
193 #endif
194                 if (gu_likely(!BH_is_released(bh))) free_common(bh);
195                 /* free_common() could modify a map, look for next unreleased
196                  * seqno. */
197                 idx = seqno2ptr.upper_bound(idx);
198             }
199 
200             assert (loop || seqno == seqno_released);
201 
202             loop = (end < seqno) && loop;
203 
204 #ifndef NDEBUG
205             if (params.debug())
206             {
207                 log_info << "GCache::seqno_release(" << seqno
208                          << ") seqno_released: "
209                          << old_sr << " -> " << seqno_released;
210             }
211 #endif
212         }
213         while(loop);
214     }
215 
216     /*!
217      * Move lock to a given seqno. Throw gu::NotFound if seqno is not in cache.
218      * @throws NotFound
219      */
seqno_lock(seqno_t const seqno_g)220     void GCache::seqno_lock (seqno_t const seqno_g)
221     {
222         gu::Lock lock(mtx);
223 
224         assert(seqno_g > 0);
225 
226         assert(SEQNO_MAX == seqno_locked || seqno_locked_count > 0);
227         assert(0   == seqno_locked_count || seqno_locked < SEQNO_MAX);
228 
229         seqno2ptr.at(seqno_g); /* check that the element exists */
230 
231         seqno_locked_count++;
232 
233         if (seqno_g < seqno_locked) seqno_locked = seqno_g;
234     }
235 
236     /*!
237      * Get pointer to buffer identified by seqno.
238      * Moves lock to the given seqno.
239      * @throws NotFound
240      */
seqno_get_ptr(seqno_t const seqno_g,seqno_t & seqno_d,ssize_t & size)241     const void* GCache::seqno_get_ptr (seqno_t const seqno_g,
242                                        seqno_t&      seqno_d,
243                                        ssize_t&      size)
244     {
245         const void* ptr;
246 
247         {
248             gu::Lock lock(mtx);
249             ptr = seqno2ptr.at(seqno_g);
250         }
251 
252         assert (ptr);
253 
254         const BufferHeader* const bh (ptr2BH(ptr)); // this can result in IO
255         seqno_d = bh->seqno_d;
256         size    = bh->size - sizeof(BufferHeader);
257 
258         return ptr;
259     }
260 
261     size_t
seqno_get_buffers(std::vector<Buffer> & v,seqno_t const start)262     GCache::seqno_get_buffers (std::vector<Buffer>& v,
263                                seqno_t const start)
264     {
265         size_t const max(v.size());
266 
267         assert (max > 0);
268 
269         size_t found(0);
270 
271         {
272             gu::Lock lock(mtx);
273 
274             assert(seqno_locked <= start);
275             // the caller should have locked the range first
276 
277             seqno2ptr_iter_t p = seqno2ptr.find(start);
278 
279             if (p != seqno2ptr.end() && *p)
280             {
281                 do {
282                     assert(seqno2ptr.index(p) == seqno_t(start + found));
283                     assert(*p);
284                     v[found].set_ptr(*p);
285                 }
286                 while (++found < max && ++p != seqno2ptr.end() && *p);
287                 /* the last condition ensures seqno continuty, #643 */
288             }
289         }
290 
291         // the following may cause IO
292         for (size_t i(0); i < found; ++i)
293         {
294             const BufferHeader* const bh (ptr2BH(v[i].ptr()));
295 
296             assert (bh->seqno_g == seqno_t(start + i));
297             Limits::assert_size(bh->size);
298 
299             v[i].set_other (bh->seqno_g,
300                             bh->seqno_d,
301                             bh->size - sizeof(BufferHeader));
302         }
303 
304         return found;
305     }
306 
307     /*!
308      * Releases any history locks present.
309      */
seqno_unlock()310     void GCache::seqno_unlock ()
311     {
312         gu::Lock lock(mtx);
313 
314         if (seqno_locked_count > 0)
315         {
316             assert(seqno_locked < SEQNO_MAX);
317             seqno_locked_count--;
318             if (0 == seqno_locked_count) seqno_locked = SEQNO_MAX;
319         }
320         else
321         {
322             assert(SEQNO_MAX == seqno_locked);
323             assert(0); // something wrong with the caller's logic
324             seqno_locked = SEQNO_MAX;
325         }
326     }
327 }
328