1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "tbb/tbb_stddef.h"
18 #include "tbb/tbb_machine.h"
19 #include "tbb/tbb_exception.h"
20 // Define required to satisfy test in internal file.
21 #define __TBB_concurrent_queue_H
22 #include "tbb/internal/_concurrent_queue_impl.h"
23 #include "concurrent_monitor.h"
24 #include "itt_notify.h"
25 #include <new>
26 #include <cstring> // for memset()
27
28 #if defined(_MSC_VER) && defined(_Wp64)
29 // Workaround for overzealous compiler warnings in /Wp64 mode
30 #pragma warning (disable: 4267)
31 #endif
32
33 #define RECORD_EVENTS 0
34
35
36 namespace tbb {
37
38 namespace internal {
39
40 typedef concurrent_queue_base_v3 concurrent_queue_base;
41
42 typedef size_t ticket;
43
44 //! A queue using simple locking.
45 /** For efficiency, this class has no constructor.
46 The caller is expected to zero-initialize it. */
47 struct micro_queue {
48 typedef concurrent_queue_base::page page;
49
50 friend class micro_queue_pop_finalizer;
51
52 atomic<page*> head_page;
53 atomic<ticket> head_counter;
54
55 atomic<page*> tail_page;
56 atomic<ticket> tail_counter;
57
58 spin_mutex page_mutex;
59
60 void push( const void* item, ticket k, concurrent_queue_base& base,
61 concurrent_queue_base::copy_specifics op_type );
62
63 void abort_push( ticket k, concurrent_queue_base& base );
64
65 bool pop( void* dst, ticket k, concurrent_queue_base& base );
66
67 micro_queue& assign( const micro_queue& src, concurrent_queue_base& base,
68 concurrent_queue_base::copy_specifics op_type );
69
70 page* make_copy ( concurrent_queue_base& base, const page* src_page, size_t begin_in_page,
71 size_t end_in_page, ticket& g_index, concurrent_queue_base::copy_specifics op_type ) ;
72
73 void make_invalid( ticket k );
74 };
75
76 // we need to yank it out of micro_queue because of concurrent_queue_base::deallocate_page being virtual.
77 class micro_queue_pop_finalizer: no_copy {
78 typedef concurrent_queue_base::page page;
79 ticket my_ticket;
80 micro_queue& my_queue;
81 page* my_page;
82 concurrent_queue_base &base;
83 public:
micro_queue_pop_finalizer(micro_queue & queue,concurrent_queue_base & b,ticket k,page * p)84 micro_queue_pop_finalizer( micro_queue& queue, concurrent_queue_base& b, ticket k, page* p ) :
85 my_ticket(k), my_queue(queue), my_page(p), base(b)
86 {}
~micro_queue_pop_finalizer()87 ~micro_queue_pop_finalizer() {
88 page* p = my_page;
89 if( p ) {
90 spin_mutex::scoped_lock lock( my_queue.page_mutex );
91 page* q = p->next;
92 my_queue.head_page = q;
93 if( !q ) {
94 my_queue.tail_page = NULL;
95 }
96 }
97 my_queue.head_counter = my_ticket;
98 if( p )
99 base.deallocate_page( p );
100 }
101 };
102
103 struct predicate_leq {
104 ticket t;
predicate_leqtbb::internal::predicate_leq105 predicate_leq( ticket t_ ) : t(t_) {}
operator ()tbb::internal::predicate_leq106 bool operator() ( uintptr_t p ) const {return (ticket)p<=t;}
107 };
108
109 //! Internal representation of a ConcurrentQueue.
110 /** For efficiency, this class has no constructor.
111 The caller is expected to zero-initialize it. */
112 class concurrent_queue_rep {
113 public:
114 private:
115 friend struct micro_queue;
116
117 //! Approximately n_queue/golden ratio
118 static const size_t phi = 3;
119
120 public:
121 //! Must be power of 2
122 static const size_t n_queue = 8;
123
124 //! Map ticket to an array index
index(ticket k)125 static size_t index( ticket k ) {
126 return k*phi%n_queue;
127 }
128
129 atomic<ticket> head_counter;
130 concurrent_monitor items_avail;
131 atomic<size_t> n_invalid_entries;
132 char pad1[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor)+sizeof(atomic<size_t>))&(NFS_MaxLineSize-1))];
133
134 atomic<ticket> tail_counter;
135 concurrent_monitor slots_avail;
136 char pad2[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor))&(NFS_MaxLineSize-1))];
137 micro_queue array[n_queue];
138
choose(ticket k)139 micro_queue& choose( ticket k ) {
140 // The formula here approximates LRU in a cache-oblivious way.
141 return array[index(k)];
142 }
143
144 atomic<unsigned> abort_counter;
145
146 //! Value for effective_capacity that denotes unbounded queue.
147 static const ptrdiff_t infinite_capacity = ptrdiff_t(~size_t(0)/2);
148 };
149
150 #if _MSC_VER && !defined(__INTEL_COMPILER)
151 // unary minus operator applied to unsigned type, result still unsigned
152 #pragma warning( push )
153 #pragma warning( disable: 4146 )
154 #endif
155
//! Address of the sentinel page that marks an invalidated micro_queue; assigned in micro_queue::make_invalid.
static void* static_invalid_page;
157
158 //------------------------------------------------------------------------
159 // micro_queue
160 //------------------------------------------------------------------------
push(const void * item,ticket k,concurrent_queue_base & base,concurrent_queue_base::copy_specifics op_type)161 void micro_queue::push( const void* item, ticket k, concurrent_queue_base& base,
162 concurrent_queue_base::copy_specifics op_type ) {
163 k &= -concurrent_queue_rep::n_queue;
164 page* p = NULL;
165 // find index on page where we would put the data
166 size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
167 if( !index ) { // make a new page
168 __TBB_TRY {
169 p = base.allocate_page();
170 } __TBB_CATCH(...) {
171 ++base.my_rep->n_invalid_entries;
172 make_invalid( k );
173 __TBB_RETHROW();
174 }
175 p->mask = 0;
176 p->next = NULL;
177 }
178
179 // wait for my turn
180 if( tail_counter!=k ) // The developer insisted on keeping first check out of the backoff loop
181 for( atomic_backoff b(true);;b.pause() ) {
182 ticket tail = tail_counter;
183 if( tail==k ) break;
184 else if( tail&0x1 ) {
185 // no memory. throws an exception; assumes concurrent_queue_rep::n_queue>1
186 ++base.my_rep->n_invalid_entries;
187 throw_exception( eid_bad_last_alloc );
188 }
189 }
190
191 if( p ) { // page is newly allocated; insert in micro_queue
192 spin_mutex::scoped_lock lock( page_mutex );
193 if( page* q = tail_page )
194 q->next = p;
195 else
196 head_page = p;
197 tail_page = p;
198 }
199
200 if (item) {
201 p = tail_page;
202 ITT_NOTIFY( sync_acquired, p );
203 __TBB_TRY {
204 if( concurrent_queue_base::copy == op_type ) {
205 base.copy_item( *p, index, item );
206 } else {
207 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
208 static_cast<concurrent_queue_base_v8&>(base).move_item( *p, index, item );
209 }
210 } __TBB_CATCH(...) {
211 ++base.my_rep->n_invalid_entries;
212 tail_counter += concurrent_queue_rep::n_queue;
213 __TBB_RETHROW();
214 }
215 ITT_NOTIFY( sync_releasing, p );
216 // If no exception was thrown, mark item as present.
217 p->mask |= uintptr_t(1)<<index;
218 }
219 else // no item; this was called from abort_push
220 ++base.my_rep->n_invalid_entries;
221
222 tail_counter += concurrent_queue_rep::n_queue;
223 }
224
225
abort_push(ticket k,concurrent_queue_base & base)226 void micro_queue::abort_push( ticket k, concurrent_queue_base& base ) {
227 push(NULL, k, base, concurrent_queue_base::copy);
228 }
229
pop(void * dst,ticket k,concurrent_queue_base & base)230 bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base& base ) {
231 k &= -concurrent_queue_rep::n_queue;
232 spin_wait_until_eq( head_counter, k );
233 spin_wait_while_eq( tail_counter, k );
234 page *p = head_page;
235 __TBB_ASSERT( p, NULL );
236 size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
237 bool success = false;
238 {
239 micro_queue_pop_finalizer finalizer( *this, base, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? p : NULL );
240 if( p->mask & uintptr_t(1)<<index ) {
241 success = true;
242 ITT_NOTIFY( sync_acquired, dst );
243 ITT_NOTIFY( sync_acquired, head_page );
244 base.assign_and_destroy_item( dst, *p, index );
245 ITT_NOTIFY( sync_releasing, head_page );
246 } else {
247 --base.my_rep->n_invalid_entries;
248 }
249 }
250 return success;
251 }
252
//! Rebuild this micro_queue as a copy of src (or by moving from it, per op_type).
/** Counters are taken from src, then every source page is duplicated with
    make_copy. If anything throws during duplication, the micro_queue is
    invalidated at the ticket reached so far and the exception is rethrown.
    @return *this */
micro_queue& micro_queue::assign( const micro_queue& src, concurrent_queue_base& base,
                                  concurrent_queue_base::copy_specifics op_type )
{
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;

    const page* from = src.head_page;
    if( !from ) {
        // Source has no pages at all.
        head_page = tail_page = NULL;
        return *this;
    }

    ticket next_ticket = head_counter;
    __TBB_TRY {
        size_t pending = (tail_counter-head_counter)/concurrent_queue_rep::n_queue;
        size_t first_slot = modulo_power_of_two( head_counter/concurrent_queue_rep::n_queue, base.items_per_page );
        // The first page is copied from first_slot up to the end of the data or of the page.
        size_t first_end = base.items_per_page;
        if( first_slot+pending<base.items_per_page )
            first_end = first_slot+pending;

        head_page = make_copy( base, from, first_slot, first_end, next_ticket, op_type );
        page* last = head_page;

        if( from != src.tail_page ) {
            // Pages strictly between head and tail are copied in full.
            from = from->next;
            while( from!=src.tail_page ) {
                last->next = make_copy( base, from, 0, base.items_per_page, next_ticket, op_type );
                last = last->next;
                from = from->next;
            }

            __TBB_ASSERT( from==src.tail_page, NULL );

            // The tail page is copied up to the tail slot; 0 means the page is full.
            size_t tail_end = modulo_power_of_two( tail_counter/concurrent_queue_rep::n_queue, base.items_per_page );
            if( tail_end==0 ) tail_end = base.items_per_page;

            last->next = make_copy( base, from, 0, tail_end, next_ticket, op_type );
            last = last->next;
        }
        tail_page = last;
    } __TBB_CATCH(...) {
        make_invalid( next_ticket );
        __TBB_RETHROW();
    }
    return *this;
}
294
make_copy(concurrent_queue_base & base,const concurrent_queue_base::page * src_page,size_t begin_in_page,size_t end_in_page,ticket & g_index,concurrent_queue_base::copy_specifics op_type)295 concurrent_queue_base::page* micro_queue::make_copy( concurrent_queue_base& base,
296 const concurrent_queue_base::page* src_page, size_t begin_in_page, size_t end_in_page,
297 ticket& g_index, concurrent_queue_base::copy_specifics op_type )
298 {
299 page* new_page = base.allocate_page();
300 new_page->next = NULL;
301 new_page->mask = src_page->mask;
302 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
303 if( new_page->mask & uintptr_t(1)<<begin_in_page ) {
304 if( concurrent_queue_base::copy == op_type ) {
305 base.copy_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
306 } else {
307 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
308 static_cast<concurrent_queue_base_v8&>(base).move_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
309 }
310 }
311 return new_page;
312 }
313
make_invalid(ticket k)314 void micro_queue::make_invalid( ticket k )
315 {
316 static concurrent_queue_base::page dummy = {static_cast<page*>((void*)1), 0};
317 // mark it so that no more pushes are allowed.
318 static_invalid_page = &dummy;
319 {
320 spin_mutex::scoped_lock lock( page_mutex );
321 tail_counter = k+concurrent_queue_rep::n_queue+1;
322 if( page* q = tail_page )
323 q->next = static_cast<page*>(static_invalid_page);
324 else
325 head_page = static_cast<page*>(static_invalid_page);
326 tail_page = static_cast<page*>(static_invalid_page);
327 }
328 }
329
330 #if _MSC_VER && !defined(__INTEL_COMPILER)
331 #pragma warning( pop )
332 #endif // warning 4146 is back
333
334 //------------------------------------------------------------------------
335 // concurrent_queue_base
336 //------------------------------------------------------------------------
concurrent_queue_base_v3(size_t item_sz)337 concurrent_queue_base_v3::concurrent_queue_base_v3( size_t item_sz ) {
338 items_per_page = item_sz<= 8 ? 32 :
339 item_sz<= 16 ? 16 :
340 item_sz<= 32 ? 8 :
341 item_sz<= 64 ? 4 :
342 item_sz<=128 ? 2 :
343 1;
344 my_capacity = size_t(-1)/(item_sz>1 ? item_sz : 2);
345 my_rep = cache_aligned_allocator<concurrent_queue_rep>().allocate(1);
346 __TBB_ASSERT( is_aligned(my_rep, NFS_GetLineSize()), "alignment error" );
347 __TBB_ASSERT( is_aligned(&my_rep->head_counter, NFS_GetLineSize()), "alignment error" );
348 __TBB_ASSERT( is_aligned(&my_rep->tail_counter, NFS_GetLineSize()), "alignment error" );
349 __TBB_ASSERT( is_aligned(&my_rep->array, NFS_GetLineSize()), "alignment error" );
350 std::memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep));
351 new ( &my_rep->items_avail ) concurrent_monitor();
352 new ( &my_rep->slots_avail ) concurrent_monitor();
353 this->item_size = item_sz;
354 }
355
~concurrent_queue_base_v3()356 concurrent_queue_base_v3::~concurrent_queue_base_v3() {
357 size_t nq = my_rep->n_queue;
358 for( size_t i=0; i<nq; i++ )
359 __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
360 cache_aligned_allocator<concurrent_queue_rep>().deallocate(my_rep,1);
361 }
362
internal_push(const void * src)363 void concurrent_queue_base_v3::internal_push( const void* src ) {
364 internal_insert_item( src, copy );
365 }
366
internal_push_move(const void * src)367 void concurrent_queue_base_v8::internal_push_move( const void* src ) {
368 internal_insert_item( src, move );
369 }
370
internal_insert_item(const void * src,copy_specifics op_type)371 void concurrent_queue_base_v3::internal_insert_item( const void* src, copy_specifics op_type ) {
372 concurrent_queue_rep& r = *my_rep;
373 unsigned old_abort_counter = r.abort_counter;
374 ticket k = r.tail_counter++;
375 ptrdiff_t e = my_capacity;
376 #if DO_ITT_NOTIFY
377 bool sync_prepare_done = false;
378 #endif
379 if( (ptrdiff_t)(k-r.head_counter)>=e ) { // queue is full
380 #if DO_ITT_NOTIFY
381 if( !sync_prepare_done ) {
382 ITT_NOTIFY( sync_prepare, &sync_prepare_done );
383 sync_prepare_done = true;
384 }
385 #endif
386 bool slept = false;
387 concurrent_monitor::thread_context thr_ctx;
388 r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
389 while( (ptrdiff_t)(k-r.head_counter)>=const_cast<volatile ptrdiff_t&>(e = my_capacity) ) {
390 __TBB_TRY {
391 if( r.abort_counter!=old_abort_counter ) {
392 r.slots_avail.cancel_wait( thr_ctx );
393 throw_exception( eid_user_abort );
394 }
395 slept = r.slots_avail.commit_wait( thr_ctx );
396 } __TBB_CATCH( tbb::user_abort& ) {
397 r.choose(k).abort_push(k, *this);
398 __TBB_RETHROW();
399 } __TBB_CATCH(...) {
400 __TBB_RETHROW();
401 }
402 if (slept == true) break;
403 r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
404 }
405 if( !slept )
406 r.slots_avail.cancel_wait( thr_ctx );
407 }
408 ITT_NOTIFY( sync_acquired, &sync_prepare_done );
409 __TBB_ASSERT( (ptrdiff_t)(k-r.head_counter)<my_capacity, NULL);
410 r.choose( k ).push( src, k, *this, op_type );
411 r.items_avail.notify( predicate_leq(k) );
412 }
413
internal_pop(void * dst)414 void concurrent_queue_base_v3::internal_pop( void* dst ) {
415 concurrent_queue_rep& r = *my_rep;
416 ticket k;
417 #if DO_ITT_NOTIFY
418 bool sync_prepare_done = false;
419 #endif
420 unsigned old_abort_counter = r.abort_counter;
421 // This loop is a single pop operation; abort_counter should not be re-read inside
422 do {
423 k=r.head_counter++;
424 if ( (ptrdiff_t)(r.tail_counter-k)<=0 ) { // queue is empty
425 #if DO_ITT_NOTIFY
426 if( !sync_prepare_done ) {
427 ITT_NOTIFY( sync_prepare, dst );
428 sync_prepare_done = true;
429 }
430 #endif
431 bool slept = false;
432 concurrent_monitor::thread_context thr_ctx;
433 r.items_avail.prepare_wait( thr_ctx, k );
434 while( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
435 __TBB_TRY {
436 if( r.abort_counter!=old_abort_counter ) {
437 r.items_avail.cancel_wait( thr_ctx );
438 throw_exception( eid_user_abort );
439 }
440 slept = r.items_avail.commit_wait( thr_ctx );
441 } __TBB_CATCH( tbb::user_abort& ) {
442 r.head_counter--;
443 __TBB_RETHROW();
444 } __TBB_CATCH(...) {
445 __TBB_RETHROW();
446 }
447 if (slept == true) break;
448 r.items_avail.prepare_wait( thr_ctx, k );
449 }
450 if( !slept )
451 r.items_avail.cancel_wait( thr_ctx );
452 }
453 __TBB_ASSERT((ptrdiff_t)(r.tail_counter-k)>0, NULL);
454 } while( !r.choose(k).pop(dst,k,*this) );
455
456 // wake up a producer..
457 r.slots_avail.notify( predicate_leq(k) );
458 }
459
internal_abort()460 void concurrent_queue_base_v3::internal_abort() {
461 concurrent_queue_rep& r = *my_rep;
462 ++r.abort_counter;
463 r.items_avail.abort_all();
464 r.slots_avail.abort_all();
465 }
466
internal_pop_if_present(void * dst)467 bool concurrent_queue_base_v3::internal_pop_if_present( void* dst ) {
468 concurrent_queue_rep& r = *my_rep;
469 ticket k;
470 do {
471 k = r.head_counter;
472 for(;;) {
473 if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
474 // Queue is empty
475 return false;
476 }
477 // Queue had item with ticket k when we looked. Attempt to get that item.
478 ticket tk=k;
479 k = r.head_counter.compare_and_swap( tk+1, tk );
480 if( k==tk )
481 break;
482 // Another thread snatched the item, retry.
483 }
484 } while( !r.choose( k ).pop( dst, k, *this ) );
485
486 r.slots_avail.notify( predicate_leq(k) );
487
488 return true;
489 }
490
internal_push_if_not_full(const void * src)491 bool concurrent_queue_base_v3::internal_push_if_not_full( const void* src ) {
492 return internal_insert_if_not_full( src, copy );
493 }
494
internal_push_move_if_not_full(const void * src)495 bool concurrent_queue_base_v8::internal_push_move_if_not_full( const void* src ) {
496 return internal_insert_if_not_full( src, move );
497 }
498
internal_insert_if_not_full(const void * src,copy_specifics op_type)499 bool concurrent_queue_base_v3::internal_insert_if_not_full( const void* src, copy_specifics op_type ) {
500 concurrent_queue_rep& r = *my_rep;
501 ticket k = r.tail_counter;
502 for(;;) {
503 if( (ptrdiff_t)(k-r.head_counter)>=my_capacity ) {
504 // Queue is full
505 return false;
506 }
507 // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
508 ticket tk=k;
509 k = r.tail_counter.compare_and_swap( tk+1, tk );
510 if( k==tk )
511 break;
512 // Another thread claimed the slot, so retry.
513 }
514 r.choose(k).push(src, k, *this, op_type);
515 r.items_avail.notify( predicate_leq(k) );
516 return true;
517 }
518
internal_size() const519 ptrdiff_t concurrent_queue_base_v3::internal_size() const {
520 __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
521 return ptrdiff_t(my_rep->tail_counter-my_rep->head_counter-my_rep->n_invalid_entries);
522 }
523
internal_empty() const524 bool concurrent_queue_base_v3::internal_empty() const {
525 ticket tc = my_rep->tail_counter;
526 ticket hc = my_rep->head_counter;
527 // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
528 return ( tc==my_rep->tail_counter && ptrdiff_t(tc-hc-my_rep->n_invalid_entries)<=0 );
529 }
530
internal_set_capacity(ptrdiff_t capacity,size_t)531 void concurrent_queue_base_v3::internal_set_capacity( ptrdiff_t capacity, size_t /*item_sz*/ ) {
532 my_capacity = capacity<0 ? concurrent_queue_rep::infinite_capacity : capacity;
533 }
534
internal_finish_clear()535 void concurrent_queue_base_v3::internal_finish_clear() {
536 size_t nq = my_rep->n_queue;
537 for( size_t i=0; i<nq; ++i ) {
538 page* tp = my_rep->array[i].tail_page;
539 __TBB_ASSERT( my_rep->array[i].head_page==tp, "at most one page should remain" );
540 if( tp!=NULL) {
541 if( tp!=static_invalid_page ) deallocate_page( tp );
542 my_rep->array[i].tail_page = NULL;
543 }
544 }
545 }
546
internal_throw_exception() const547 void concurrent_queue_base_v3::internal_throw_exception() const {
548 throw_exception( eid_bad_alloc );
549 }
550
internal_assign(const concurrent_queue_base & src,copy_specifics op_type)551 void concurrent_queue_base_v3::internal_assign( const concurrent_queue_base& src, copy_specifics op_type ) {
552 items_per_page = src.items_per_page;
553 my_capacity = src.my_capacity;
554
555 // copy concurrent_queue_rep.
556 my_rep->head_counter = src.my_rep->head_counter;
557 my_rep->tail_counter = src.my_rep->tail_counter;
558 my_rep->n_invalid_entries = src.my_rep->n_invalid_entries;
559 my_rep->abort_counter = src.my_rep->abort_counter;
560
561 // copy micro_queues
562 for( size_t i = 0; i<my_rep->n_queue; ++i )
563 my_rep->array[i].assign( src.my_rep->array[i], *this, op_type );
564
565 __TBB_ASSERT( my_rep->head_counter==src.my_rep->head_counter && my_rep->tail_counter==src.my_rep->tail_counter,
566 "the source concurrent queue should not be concurrently modified." );
567 }
568
assign(const concurrent_queue_base & src)569 void concurrent_queue_base_v3::assign( const concurrent_queue_base& src ) {
570 internal_assign( src, copy );
571 }
572
move_content(concurrent_queue_base_v8 & src)573 void concurrent_queue_base_v8::move_content( concurrent_queue_base_v8& src ) {
574 internal_assign( src, move );
575 }
576
577 //------------------------------------------------------------------------
578 // concurrent_queue_iterator_rep
579 //------------------------------------------------------------------------
580 class concurrent_queue_iterator_rep: no_assign {
581 public:
582 ticket head_counter;
583 const concurrent_queue_base& my_queue;
584 const size_t offset_of_last;
585 concurrent_queue_base::page* array[concurrent_queue_rep::n_queue];
concurrent_queue_iterator_rep(const concurrent_queue_base & queue,size_t offset_of_last_)586 concurrent_queue_iterator_rep( const concurrent_queue_base& queue, size_t offset_of_last_ ) :
587 head_counter(queue.my_rep->head_counter),
588 my_queue(queue),
589 offset_of_last(offset_of_last_)
590 {
591 const concurrent_queue_rep& rep = *queue.my_rep;
592 for( size_t k=0; k<concurrent_queue_rep::n_queue; ++k )
593 array[k] = rep.array[k].head_page;
594 }
595 //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
get_item(void * & item,size_t k)596 bool get_item( void*& item, size_t k ) {
597 if( k==my_queue.my_rep->tail_counter ) {
598 item = NULL;
599 return true;
600 } else {
601 concurrent_queue_base::page* p = array[concurrent_queue_rep::index(k)];
602 __TBB_ASSERT(p,NULL);
603 size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, my_queue.items_per_page );
604 item = static_cast<unsigned char*>(static_cast<void*>(p)) + offset_of_last + my_queue.item_size*i;
605 return (p->mask & uintptr_t(1)<<i)!=0;
606 }
607 }
608 };
609
610 //------------------------------------------------------------------------
611 // concurrent_queue_iterator_base
612 //------------------------------------------------------------------------
613
initialize(const concurrent_queue_base & queue,size_t offset_of_last)614 void concurrent_queue_iterator_base_v3::initialize( const concurrent_queue_base& queue, size_t offset_of_last ) {
615 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep>().allocate(1);
616 new( my_rep ) concurrent_queue_iterator_rep(queue,offset_of_last);
617 size_t k = my_rep->head_counter;
618 if( !my_rep->get_item(my_item, k) ) advance();
619 }
620
concurrent_queue_iterator_base_v3(const concurrent_queue_base & queue)621 concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue ) {
622 initialize(queue,0);
623 }
624
concurrent_queue_iterator_base_v3(const concurrent_queue_base & queue,size_t offset_of_last)625 concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue, size_t offset_of_last ) {
626 initialize(queue,offset_of_last);
627 }
628
assign(const concurrent_queue_iterator_base & other)629 void concurrent_queue_iterator_base_v3::assign( const concurrent_queue_iterator_base& other ) {
630 if( my_rep!=other.my_rep ) {
631 if( my_rep ) {
632 cache_aligned_allocator<concurrent_queue_iterator_rep>().deallocate(my_rep, 1);
633 my_rep = NULL;
634 }
635 if( other.my_rep ) {
636 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep>().allocate(1);
637 new( my_rep ) concurrent_queue_iterator_rep( *other.my_rep );
638 }
639 }
640 my_item = other.my_item;
641 }
642
advance()643 void concurrent_queue_iterator_base_v3::advance() {
644 __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
645 size_t k = my_rep->head_counter;
646 const concurrent_queue_base& queue = my_rep->my_queue;
647 #if TBB_USE_ASSERT
648 void* tmp;
649 my_rep->get_item(tmp,k);
650 __TBB_ASSERT( my_item==tmp, NULL );
651 #endif /* TBB_USE_ASSERT */
652 size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, queue.items_per_page );
653 if( i==queue.items_per_page-1 ) {
654 concurrent_queue_base::page*& root = my_rep->array[concurrent_queue_rep::index(k)];
655 root = root->next;
656 }
657 // advance k
658 my_rep->head_counter = ++k;
659 if( !my_rep->get_item(my_item, k) ) advance();
660 }
661
~concurrent_queue_iterator_base_v3()662 concurrent_queue_iterator_base_v3::~concurrent_queue_iterator_base_v3() {
663 //delete my_rep;
664 cache_aligned_allocator<concurrent_queue_iterator_rep>().deallocate(my_rep, 1);
665 my_rep = NULL;
666 }
667
668 } // namespace internal
669
670 } // namespace tbb
671