1 /* 2 * Copyright (C) 2009-2021 Codership Oy <info@codership.com> 3 */ 4 5 #include "gcache_bh.hpp" 6 #include "GCache.hpp" 7 8 #include <cerrno> 9 #include <cassert> 10 11 #include <sched.h> // sched_yeild() 12 13 namespace gcache 14 { 15 /*! 16 * Reinitialize seqno sequence (after SST or such) 17 * Clears seqno->ptr map // and sets seqno_min to s. 18 */ 19 void seqno_reset(const gu::UUID & g,seqno_t const s)20 GCache::seqno_reset (const gu::UUID& g, seqno_t const s) 21 { 22 gu::Lock lock(mtx); 23 24 assert(seqno2ptr.empty() || seqno_max == seqno2ptr.index_back()); 25 26 if (g == gid && s != SEQNO_ILL && seqno_max >= s) 27 { 28 if (seqno_max > s) 29 { 30 discard_tail(s); 31 seqno_max = s; 32 seqno_released = s; 33 assert(seqno_max == seqno2ptr.index_back()); 34 } 35 return; 36 } 37 38 log_info << "GCache history reset: " << gid << ':' << seqno_max 39 << " -> " << g << ':' << s; 40 41 seqno_released = SEQNO_NONE; 42 gid = g; 43 44 /* order is significant here */ 45 rb.seqno_reset(); 46 mem.seqno_reset(); 47 48 seqno2ptr.clear(SEQNO_NONE); 49 seqno_max = SEQNO_NONE; 50 } 51 52 /*! 53 * Assign sequence number to buffer pointed to by ptr 54 */ 55 void seqno_assign(const void * const ptr,seqno_t const seqno_g,seqno_t const seqno_d)56 GCache::seqno_assign (const void* const ptr, 57 seqno_t const seqno_g, 58 seqno_t const seqno_d) 59 { 60 gu::Lock lock(mtx); 61 62 BufferHeader* bh = ptr2BH(ptr); 63 64 assert (SEQNO_NONE == bh->seqno_g); 65 assert (SEQNO_ILL == bh->seqno_d); 66 assert (!BH_is_released(bh)); 67 68 if (gu_likely(seqno_g > seqno_max)) 69 { 70 seqno_max = seqno_g; 71 } 72 else 73 { 74 seqno2ptr_iter_t const i(seqno2ptr.find(seqno_g)); 75 76 if (i != seqno2ptr.end()) 77 { 78 const void* const prev_ptr(*i); 79 80 if (!seqno2ptr_t::not_set(prev_ptr)) 81 { 82 const BufferHeader* const prev_bh(ptr2BH(prev_ptr)); 83 assert(0); 84 gu_throw_fatal << "Attempt to reuse the same seqno: " 85 << seqno_g <<". New buffer: " << bh 86 << ", previous buffer: " << prev_bh; 87 } 88 } 89 90 seqno_released = std::min(seqno_released, seqno_g - 1); 91 } 92 93 seqno2ptr.insert(seqno_g, ptr); 94 95 bh->seqno_g = seqno_g; 96 bh->seqno_d = seqno_d; 97 } 98 99 void seqno_release(seqno_t const seqno)100 GCache::seqno_release (seqno_t const seqno) 101 { 102 assert (seqno > 0); 103 /* The number of buffers scheduled for release is unpredictable, so 104 * we want to allow some concurrency in cache access by releasing 105 * buffers in small batches */ 106 static int const min_batch_size(32); 107 108 /* Although extremely unlikely, theoretically concurrent access may 109 * lead to elements being added faster than released. The following is 110 * to control and possibly disable concurrency in that case. We start 111 * with min_batch_size and increase it if necessary. */ 112 size_t old_gap(-1); 113 int batch_size(min_batch_size); 114 115 bool loop(false); 116 117 do 118 { 119 /* if we're doing this loop repeatedly, allow other threads to run*/ 120 if (loop) sched_yield(); 121 122 gu::Lock lock(mtx); 123 124 if (seqno < seqno_released || seqno >= seqno_locked) 125 { 126 #ifndef NDEBUG 127 if (params.debug()) 128 { 129 log_info << "GCache::seqno_release(" << seqno 130 << "): seqno_released: " << seqno_released 131 << ", seqno_locked: " << seqno_locked 132 << ": exiting."; 133 } 134 #endif 135 break; 136 } 137 138 seqno_t idx(seqno2ptr.upper_bound(seqno_released)); 139 140 if (gu_unlikely(idx == seqno2ptr.index_end())) 141 { 142 /* This means that there are no elements with 143 * seqno following seqno_released - and this should not 144 * generally happen. But it looks like stopcont test does it. */ 145 if (SEQNO_NONE != seqno_released) 146 { 147 log_debug << "Releasing seqno " << seqno << " before " 148 << seqno_released + 1 << " was assigned."; 149 } 150 return; 151 } 152 153 assert(seqno_max >= seqno_released); 154 155 /* here we check if (seqno_max - seqno_released) is decreasing 156 * and if not - increase the batch_size (linearly) */ 157 size_t const new_gap(seqno_max - seqno_released); 158 batch_size += (new_gap >= old_gap) * min_batch_size; 159 old_gap = new_gap; 160 161 seqno_t const start (idx - 1); 162 seqno_t const max_end(std::min(seqno, seqno_locked - 1)); 163 seqno_t const end (max_end - start >= 2*batch_size ? 164 start + batch_size : max_end); 165 #ifndef NDEBUG 166 if (params.debug()) 167 { 168 log_info << "GCache::seqno_release(" << seqno << "): " 169 << (seqno - start) << " buffers, batch_size: " 170 << batch_size << ", end: " << end; 171 } 172 seqno_t const old_sr(seqno_released); 173 #endif 174 while((loop = (idx < seqno2ptr.index_end())) && idx <= end) 175 { 176 assert(idx != SEQNO_NONE); 177 BufferHeader* const bh(ptr2BH(seqno2ptr[idx])); 178 assert (bh->seqno_g == idx); 179 #ifndef NDEBUG 180 if (!(seqno_released + 1 == idx || 181 seqno_released == SEQNO_NONE)) 182 { 183 log_info << "seqno_released: " << seqno_released 184 << "; seqno_locked: " << seqno_locked 185 << "; idx: " << idx 186 << "; seqno2ptr.begin: " <<seqno2ptr.index_begin() 187 << "\nstart: " << start << "; end: " << end 188 << " batch_size: " << batch_size << "; gap: " 189 << new_gap << "; seqno_max: " << seqno_max; 190 assert(seqno_released + 1 == idx || 191 seqno_released == SEQNO_NONE); 192 } 193 #endif 194 if (gu_likely(!BH_is_released(bh))) free_common(bh); 195 /* free_common() could modify a map, look for next unreleased 196 * seqno. */ 197 idx = seqno2ptr.upper_bound(idx); 198 } 199 200 assert (loop || seqno == seqno_released); 201 202 loop = (end < seqno) && loop; 203 204 #ifndef NDEBUG 205 if (params.debug()) 206 { 207 log_info << "GCache::seqno_release(" << seqno 208 << ") seqno_released: " 209 << old_sr << " -> " << seqno_released; 210 } 211 #endif 212 } 213 while(loop); 214 } 215 216 /*! 217 * Move lock to a given seqno. Throw gu::NotFound if seqno is not in cache. 218 * @throws NotFound 219 */ seqno_lock(seqno_t const seqno_g)220 void GCache::seqno_lock (seqno_t const seqno_g) 221 { 222 gu::Lock lock(mtx); 223 224 assert(seqno_g > 0); 225 226 assert(SEQNO_MAX == seqno_locked || seqno_locked_count > 0); 227 assert(0 == seqno_locked_count || seqno_locked < SEQNO_MAX); 228 229 seqno2ptr.at(seqno_g); /* check that the element exists */ 230 231 seqno_locked_count++; 232 233 if (seqno_g < seqno_locked) seqno_locked = seqno_g; 234 } 235 236 /*! 237 * Get pointer to buffer identified by seqno. 238 * Moves lock to the given seqno. 239 * @throws NotFound 240 */ seqno_get_ptr(seqno_t const seqno_g,seqno_t & seqno_d,ssize_t & size)241 const void* GCache::seqno_get_ptr (seqno_t const seqno_g, 242 seqno_t& seqno_d, 243 ssize_t& size) 244 { 245 const void* ptr; 246 247 { 248 gu::Lock lock(mtx); 249 ptr = seqno2ptr.at(seqno_g); 250 } 251 252 assert (ptr); 253 254 const BufferHeader* const bh (ptr2BH(ptr)); // this can result in IO 255 seqno_d = bh->seqno_d; 256 size = bh->size - sizeof(BufferHeader); 257 258 return ptr; 259 } 260 261 size_t seqno_get_buffers(std::vector<Buffer> & v,seqno_t const start)262 GCache::seqno_get_buffers (std::vector<Buffer>& v, 263 seqno_t const start) 264 { 265 size_t const max(v.size()); 266 267 assert (max > 0); 268 269 size_t found(0); 270 271 { 272 gu::Lock lock(mtx); 273 274 assert(seqno_locked <= start); 275 // the caller should have locked the range first 276 277 seqno2ptr_iter_t p = seqno2ptr.find(start); 278 279 if (p != seqno2ptr.end() && *p) 280 { 281 do { 282 assert(seqno2ptr.index(p) == seqno_t(start + found)); 283 assert(*p); 284 v[found].set_ptr(*p); 285 } 286 while (++found < max && ++p != seqno2ptr.end() && *p); 287 /* the last condition ensures seqno continuty, #643 */ 288 } 289 } 290 291 // the following may cause IO 292 for (size_t i(0); i < found; ++i) 293 { 294 const BufferHeader* const bh (ptr2BH(v[i].ptr())); 295 296 assert (bh->seqno_g == seqno_t(start + i)); 297 Limits::assert_size(bh->size); 298 299 v[i].set_other (bh->seqno_g, 300 bh->seqno_d, 301 bh->size - sizeof(BufferHeader)); 302 } 303 304 return found; 305 } 306 307 /*! 308 * Releases any history locks present. 309 */ seqno_unlock()310 void GCache::seqno_unlock () 311 { 312 gu::Lock lock(mtx); 313 314 if (seqno_locked_count > 0) 315 { 316 assert(seqno_locked < SEQNO_MAX); 317 seqno_locked_count--; 318 if (0 == seqno_locked_count) seqno_locked = SEQNO_MAX; 319 } 320 else 321 { 322 assert(SEQNO_MAX == seqno_locked); 323 assert(0); // something wrong with the caller's logic 324 seqno_locked = SEQNO_MAX; 325 } 326 } 327 } 328