1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "tbb/tbb_stddef.h"
18 #include "tbb/tbb_machine.h"
19 #include "tbb/tbb_exception.h"
20 // Define required to satisfy test in internal file.
21 #define  __TBB_concurrent_queue_H
22 #include "tbb/internal/_concurrent_queue_impl.h"
23 #include "concurrent_monitor.h"
24 #include "itt_notify.h"
25 #include <new>
26 #include <cstring>   // for memset()
27 
28 #if defined(_MSC_VER) && defined(_Wp64)
29     // Workaround for overzealous compiler warnings in /Wp64 mode
30     #pragma warning (disable: 4267)
31 #endif
32 
33 #define RECORD_EVENTS 0
34 
35 
36 namespace tbb {
37 
38 namespace internal {
39 
40 typedef concurrent_queue_base_v3 concurrent_queue_base;
41 
42 typedef size_t ticket;
43 
44 //! A queue using simple locking.
45 /** For efficiency, this class has no constructor.
46     The caller is expected to zero-initialize it. */
47 struct micro_queue {
48     typedef concurrent_queue_base::page page;
49 
50     friend class micro_queue_pop_finalizer;
51 
52     atomic<page*> head_page;
53     atomic<ticket> head_counter;
54 
55     atomic<page*> tail_page;
56     atomic<ticket> tail_counter;
57 
58     spin_mutex page_mutex;
59 
60     void push( const void* item, ticket k, concurrent_queue_base& base,
61                concurrent_queue_base::copy_specifics op_type );
62 
63     void abort_push( ticket k, concurrent_queue_base& base );
64 
65     bool pop( void* dst, ticket k, concurrent_queue_base& base );
66 
67     micro_queue& assign( const micro_queue& src, concurrent_queue_base& base,
68                          concurrent_queue_base::copy_specifics op_type );
69 
70     page* make_copy ( concurrent_queue_base& base, const page* src_page, size_t begin_in_page,
71                       size_t end_in_page, ticket& g_index, concurrent_queue_base::copy_specifics op_type ) ;
72 
73     void make_invalid( ticket k );
74 };
75 
76 // we need to yank it out of micro_queue because of concurrent_queue_base::deallocate_page being virtual.
77 class micro_queue_pop_finalizer: no_copy {
78     typedef concurrent_queue_base::page page;
79     ticket my_ticket;
80     micro_queue& my_queue;
81     page* my_page;
82     concurrent_queue_base &base;
83 public:
micro_queue_pop_finalizer(micro_queue & queue,concurrent_queue_base & b,ticket k,page * p)84     micro_queue_pop_finalizer( micro_queue& queue, concurrent_queue_base& b, ticket k, page* p ) :
85         my_ticket(k), my_queue(queue), my_page(p), base(b)
86     {}
~micro_queue_pop_finalizer()87     ~micro_queue_pop_finalizer() {
88         page* p = my_page;
89         if( p ) {
90             spin_mutex::scoped_lock lock( my_queue.page_mutex );
91             page* q = p->next;
92             my_queue.head_page = q;
93             if( !q ) {
94                 my_queue.tail_page = NULL;
95             }
96         }
97         my_queue.head_counter = my_ticket;
98         if( p )
99            base.deallocate_page( p );
100     }
101 };
102 
103 struct predicate_leq {
104     ticket t;
predicate_leqtbb::internal::predicate_leq105     predicate_leq( ticket t_ ) : t(t_) {}
operator ()tbb::internal::predicate_leq106     bool operator() ( uintptr_t p ) const {return (ticket)p<=t;}
107 };
108 
109 //! Internal representation of a ConcurrentQueue.
110 /** For efficiency, this class has no constructor.
111     The caller is expected to zero-initialize it. */
112 class concurrent_queue_rep {
113 public:
114 private:
115     friend struct micro_queue;
116 
117     //! Approximately n_queue/golden ratio
118     static const size_t phi = 3;
119 
120 public:
121     //! Must be power of 2
122     static const size_t n_queue = 8;
123 
124     //! Map ticket to an array index
index(ticket k)125     static size_t index( ticket k ) {
126         return k*phi%n_queue;
127     }
128 
129     atomic<ticket> head_counter;
130     concurrent_monitor items_avail;
131     atomic<size_t> n_invalid_entries;
132     char pad1[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor)+sizeof(atomic<size_t>))&(NFS_MaxLineSize-1))];
133 
134     atomic<ticket> tail_counter;
135     concurrent_monitor slots_avail;
136     char pad2[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor))&(NFS_MaxLineSize-1))];
137     micro_queue array[n_queue];
138 
choose(ticket k)139     micro_queue& choose( ticket k ) {
140         // The formula here approximates LRU in a cache-oblivious way.
141         return array[index(k)];
142     }
143 
144     atomic<unsigned> abort_counter;
145 
146     //! Value for effective_capacity that denotes unbounded queue.
147     static const ptrdiff_t infinite_capacity = ptrdiff_t(~size_t(0)/2);
148 };
149 
150 #if _MSC_VER && !defined(__INTEL_COMPILER)
151     // unary minus operator applied to unsigned type, result still unsigned
152     #pragma warning( push )
153     #pragma warning( disable: 4146 )
154 #endif
155 
//! Sentinel page address installed by micro_queue::make_invalid(); internal_finish_clear() never frees it.
static void* static_invalid_page;
157 
158 //------------------------------------------------------------------------
159 // micro_queue
160 //------------------------------------------------------------------------
push(const void * item,ticket k,concurrent_queue_base & base,concurrent_queue_base::copy_specifics op_type)161 void micro_queue::push( const void* item, ticket k, concurrent_queue_base& base,
162                         concurrent_queue_base::copy_specifics op_type ) {
163     k &= -concurrent_queue_rep::n_queue;
164     page* p = NULL;
165     // find index on page where we would put the data
166     size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
167     if( !index ) {  // make a new page
168         __TBB_TRY {
169             p = base.allocate_page();
170         } __TBB_CATCH(...) {
171             ++base.my_rep->n_invalid_entries;
172             make_invalid( k );
173             __TBB_RETHROW();
174         }
175         p->mask = 0;
176         p->next = NULL;
177     }
178 
179     // wait for my turn
180     if( tail_counter!=k ) // The developer insisted on keeping first check out of the backoff loop
181         for( atomic_backoff b(true);;b.pause() ) {
182             ticket tail = tail_counter;
183             if( tail==k ) break;
184             else if( tail&0x1 ) {
185                 // no memory. throws an exception; assumes concurrent_queue_rep::n_queue>1
186                 ++base.my_rep->n_invalid_entries;
187                 throw_exception( eid_bad_last_alloc );
188             }
189         }
190 
191     if( p ) { // page is newly allocated; insert in micro_queue
192         spin_mutex::scoped_lock lock( page_mutex );
193         if( page* q = tail_page )
194             q->next = p;
195         else
196             head_page = p;
197         tail_page = p;
198     }
199 
200     if (item) {
201         p = tail_page;
202         ITT_NOTIFY( sync_acquired, p );
203         __TBB_TRY {
204             if( concurrent_queue_base::copy == op_type ) {
205                 base.copy_item( *p, index, item );
206             } else {
207                 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
208                 static_cast<concurrent_queue_base_v8&>(base).move_item( *p, index, item );
209             }
210         }  __TBB_CATCH(...) {
211             ++base.my_rep->n_invalid_entries;
212             tail_counter += concurrent_queue_rep::n_queue;
213             __TBB_RETHROW();
214         }
215         ITT_NOTIFY( sync_releasing, p );
216         // If no exception was thrown, mark item as present.
217         p->mask |= uintptr_t(1)<<index;
218     }
219     else // no item; this was called from abort_push
220         ++base.my_rep->n_invalid_entries;
221 
222     tail_counter += concurrent_queue_rep::n_queue;
223 }
224 
225 
abort_push(ticket k,concurrent_queue_base & base)226 void micro_queue::abort_push( ticket k, concurrent_queue_base& base ) {
227     push(NULL, k, base, concurrent_queue_base::copy);
228 }
229 
pop(void * dst,ticket k,concurrent_queue_base & base)230 bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base& base ) {
231     k &= -concurrent_queue_rep::n_queue;
232     spin_wait_until_eq( head_counter, k );
233     spin_wait_while_eq( tail_counter, k );
234     page *p = head_page;
235     __TBB_ASSERT( p, NULL );
236     size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
237     bool success = false;
238     {
239         micro_queue_pop_finalizer finalizer( *this, base, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? p : NULL );
240         if( p->mask & uintptr_t(1)<<index ) {
241             success = true;
242             ITT_NOTIFY( sync_acquired, dst );
243             ITT_NOTIFY( sync_acquired, head_page );
244             base.assign_and_destroy_item( dst, *p, index );
245             ITT_NOTIFY( sync_releasing, head_page );
246         } else {
247             --base.my_rep->n_invalid_entries;
248         }
249     }
250     return success;
251 }
252 
//! Make this micro_queue a page-by-page copy of src.
/** Counters are taken from src; each source page is duplicated with make_copy().
    If copying throws, make_invalid() is called with the running ticket index and the exception is rethrown. */
micro_queue& micro_queue::assign( const micro_queue& src, concurrent_queue_base& base,
                                  concurrent_queue_base::copy_specifics op_type )
{
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;

    const page* from = src.head_page;
    if( !from ) {
        // Source has no pages.
        head_page = tail_page = NULL;
        return *this;
    }

    ticket g_index = head_counter;
    __TBB_TRY {
        size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep::n_queue;
        size_t first_slot = modulo_power_of_two( head_counter/concurrent_queue_rep::n_queue, base.items_per_page );
        // The first page is copied from first_slot up to the page end or the last item, whichever comes first.
        size_t first_end = first_slot+n_items;
        if( first_end>=base.items_per_page )
            first_end = base.items_per_page;

        head_page = make_copy( base, from, first_slot, first_end, g_index, op_type );
        page* last_copied = head_page;

        if( from != src.tail_page ) {
            // Pages strictly between head and tail are copied in full.
            from = from->next;
            while( from!=src.tail_page ) {
                last_copied->next = make_copy( base, from, 0, base.items_per_page, g_index, op_type );
                last_copied = last_copied->next;
                from = from->next;
            }

            __TBB_ASSERT( from==src.tail_page, NULL );

            // The tail page is copied up to the slot tail_counter maps to (0 means a full page).
            size_t tail_end = modulo_power_of_two( tail_counter/concurrent_queue_rep::n_queue, base.items_per_page );
            if( tail_end==0 )
                tail_end = base.items_per_page;

            last_copied->next = make_copy( base, from, 0, tail_end, g_index, op_type );
            last_copied = last_copied->next;
        }
        tail_page = last_copied;
    } __TBB_CATCH(...) {
        make_invalid( g_index );
        __TBB_RETHROW();
    }
    return *this;
}
294 
make_copy(concurrent_queue_base & base,const concurrent_queue_base::page * src_page,size_t begin_in_page,size_t end_in_page,ticket & g_index,concurrent_queue_base::copy_specifics op_type)295 concurrent_queue_base::page* micro_queue::make_copy( concurrent_queue_base& base,
296     const concurrent_queue_base::page* src_page, size_t begin_in_page, size_t end_in_page,
297     ticket& g_index, concurrent_queue_base::copy_specifics op_type )
298 {
299     page* new_page = base.allocate_page();
300     new_page->next = NULL;
301     new_page->mask = src_page->mask;
302     for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
303         if( new_page->mask & uintptr_t(1)<<begin_in_page ) {
304             if( concurrent_queue_base::copy == op_type ) {
305                 base.copy_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
306             } else {
307                 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
308                 static_cast<concurrent_queue_base_v8&>(base).move_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
309             }
310         }
311     return new_page;
312 }
313 
make_invalid(ticket k)314 void micro_queue::make_invalid( ticket k )
315 {
316     static concurrent_queue_base::page dummy = {static_cast<page*>((void*)1), 0};
317     // mark it so that no more pushes are allowed.
318     static_invalid_page = &dummy;
319     {
320         spin_mutex::scoped_lock lock( page_mutex );
321         tail_counter = k+concurrent_queue_rep::n_queue+1;
322         if( page* q = tail_page )
323             q->next = static_cast<page*>(static_invalid_page);
324         else
325             head_page = static_cast<page*>(static_invalid_page);
326         tail_page = static_cast<page*>(static_invalid_page);
327     }
328 }
329 
330 #if _MSC_VER && !defined(__INTEL_COMPILER)
331     #pragma warning( pop )
332 #endif // warning 4146 is back
333 
334 //------------------------------------------------------------------------
335 // concurrent_queue_base
336 //------------------------------------------------------------------------
concurrent_queue_base_v3(size_t item_sz)337 concurrent_queue_base_v3::concurrent_queue_base_v3( size_t item_sz ) {
338     items_per_page = item_sz<=  8 ? 32 :
339                      item_sz<= 16 ? 16 :
340                      item_sz<= 32 ?  8 :
341                      item_sz<= 64 ?  4 :
342                      item_sz<=128 ?  2 :
343                      1;
344     my_capacity = size_t(-1)/(item_sz>1 ? item_sz : 2);
345     my_rep = cache_aligned_allocator<concurrent_queue_rep>().allocate(1);
346     __TBB_ASSERT( is_aligned(my_rep, NFS_GetLineSize()), "alignment error" );
347     __TBB_ASSERT( is_aligned(&my_rep->head_counter, NFS_GetLineSize()), "alignment error" );
348     __TBB_ASSERT( is_aligned(&my_rep->tail_counter, NFS_GetLineSize()), "alignment error" );
349     __TBB_ASSERT( is_aligned(&my_rep->array, NFS_GetLineSize()), "alignment error" );
350     std::memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep));
351     new ( &my_rep->items_avail ) concurrent_monitor();
352     new ( &my_rep->slots_avail ) concurrent_monitor();
353     this->item_size = item_sz;
354 }
355 
~concurrent_queue_base_v3()356 concurrent_queue_base_v3::~concurrent_queue_base_v3() {
357     size_t nq = my_rep->n_queue;
358     for( size_t i=0; i<nq; i++ )
359         __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
360     cache_aligned_allocator<concurrent_queue_rep>().deallocate(my_rep,1);
361 }
362 
internal_push(const void * src)363 void concurrent_queue_base_v3::internal_push( const void* src ) {
364     internal_insert_item( src, copy );
365 }
366 
internal_push_move(const void * src)367 void concurrent_queue_base_v8::internal_push_move( const void* src ) {
368    internal_insert_item( src, move );
369 }
370 
internal_insert_item(const void * src,copy_specifics op_type)371 void concurrent_queue_base_v3::internal_insert_item( const void* src, copy_specifics op_type ) {
372     concurrent_queue_rep& r = *my_rep;
373     unsigned old_abort_counter = r.abort_counter;
374     ticket k = r.tail_counter++;
375     ptrdiff_t e = my_capacity;
376 #if DO_ITT_NOTIFY
377     bool sync_prepare_done = false;
378 #endif
379     if( (ptrdiff_t)(k-r.head_counter)>=e ) { // queue is full
380 #if DO_ITT_NOTIFY
381         if( !sync_prepare_done ) {
382             ITT_NOTIFY( sync_prepare, &sync_prepare_done );
383             sync_prepare_done = true;
384         }
385 #endif
386         bool slept = false;
387         concurrent_monitor::thread_context thr_ctx;
388         r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
389         while( (ptrdiff_t)(k-r.head_counter)>=const_cast<volatile ptrdiff_t&>(e = my_capacity) ) {
390             __TBB_TRY {
391                 if( r.abort_counter!=old_abort_counter ) {
392                     r.slots_avail.cancel_wait( thr_ctx );
393                     throw_exception( eid_user_abort );
394                 }
395                 slept = r.slots_avail.commit_wait( thr_ctx );
396             } __TBB_CATCH( tbb::user_abort& ) {
397                 r.choose(k).abort_push(k, *this);
398                 __TBB_RETHROW();
399             } __TBB_CATCH(...) {
400                 __TBB_RETHROW();
401             }
402             if (slept == true) break;
403             r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
404         }
405         if( !slept )
406             r.slots_avail.cancel_wait( thr_ctx );
407     }
408     ITT_NOTIFY( sync_acquired, &sync_prepare_done );
409     __TBB_ASSERT( (ptrdiff_t)(k-r.head_counter)<my_capacity, NULL);
410     r.choose( k ).push( src, k, *this, op_type );
411     r.items_avail.notify( predicate_leq(k) );
412 }
413 
internal_pop(void * dst)414 void concurrent_queue_base_v3::internal_pop( void* dst ) {
415     concurrent_queue_rep& r = *my_rep;
416     ticket k;
417 #if DO_ITT_NOTIFY
418     bool sync_prepare_done = false;
419 #endif
420     unsigned old_abort_counter = r.abort_counter;
421     // This loop is a single pop operation; abort_counter should not be re-read inside
422     do {
423         k=r.head_counter++;
424         if ( (ptrdiff_t)(r.tail_counter-k)<=0 ) { // queue is empty
425 #if DO_ITT_NOTIFY
426             if( !sync_prepare_done ) {
427                 ITT_NOTIFY( sync_prepare, dst );
428                 sync_prepare_done = true;
429             }
430 #endif
431             bool slept = false;
432             concurrent_monitor::thread_context thr_ctx;
433             r.items_avail.prepare_wait( thr_ctx, k );
434             while( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
435                 __TBB_TRY {
436                     if( r.abort_counter!=old_abort_counter ) {
437                         r.items_avail.cancel_wait( thr_ctx );
438                         throw_exception( eid_user_abort );
439                     }
440                     slept = r.items_avail.commit_wait( thr_ctx );
441                 } __TBB_CATCH( tbb::user_abort& ) {
442                     r.head_counter--;
443                     __TBB_RETHROW();
444                 } __TBB_CATCH(...) {
445                     __TBB_RETHROW();
446                 }
447                 if (slept == true) break;
448                 r.items_avail.prepare_wait( thr_ctx, k );
449             }
450             if( !slept )
451                 r.items_avail.cancel_wait( thr_ctx );
452         }
453         __TBB_ASSERT((ptrdiff_t)(r.tail_counter-k)>0, NULL);
454     } while( !r.choose(k).pop(dst,k,*this) );
455 
456     // wake up a producer..
457     r.slots_avail.notify( predicate_leq(k) );
458 }
459 
internal_abort()460 void concurrent_queue_base_v3::internal_abort() {
461     concurrent_queue_rep& r = *my_rep;
462     ++r.abort_counter;
463     r.items_avail.abort_all();
464     r.slots_avail.abort_all();
465 }
466 
internal_pop_if_present(void * dst)467 bool concurrent_queue_base_v3::internal_pop_if_present( void* dst ) {
468     concurrent_queue_rep& r = *my_rep;
469     ticket k;
470     do {
471         k = r.head_counter;
472         for(;;) {
473             if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
474                 // Queue is empty
475                 return false;
476             }
477             // Queue had item with ticket k when we looked.  Attempt to get that item.
478             ticket tk=k;
479             k = r.head_counter.compare_and_swap( tk+1, tk );
480             if( k==tk )
481                 break;
482             // Another thread snatched the item, retry.
483         }
484     } while( !r.choose( k ).pop( dst, k, *this ) );
485 
486     r.slots_avail.notify( predicate_leq(k) );
487 
488     return true;
489 }
490 
internal_push_if_not_full(const void * src)491 bool concurrent_queue_base_v3::internal_push_if_not_full( const void* src ) {
492     return internal_insert_if_not_full( src, copy );
493 }
494 
internal_push_move_if_not_full(const void * src)495 bool concurrent_queue_base_v8::internal_push_move_if_not_full( const void* src ) {
496     return internal_insert_if_not_full( src, move );
497 }
498 
internal_insert_if_not_full(const void * src,copy_specifics op_type)499 bool concurrent_queue_base_v3::internal_insert_if_not_full( const void* src, copy_specifics op_type ) {
500     concurrent_queue_rep& r = *my_rep;
501     ticket k = r.tail_counter;
502     for(;;) {
503         if( (ptrdiff_t)(k-r.head_counter)>=my_capacity ) {
504             // Queue is full
505             return false;
506         }
507         // Queue had empty slot with ticket k when we looked.  Attempt to claim that slot.
508         ticket tk=k;
509         k = r.tail_counter.compare_and_swap( tk+1, tk );
510         if( k==tk )
511             break;
512         // Another thread claimed the slot, so retry.
513     }
514     r.choose(k).push(src, k, *this, op_type);
515     r.items_avail.notify( predicate_leq(k) );
516     return true;
517 }
518 
internal_size() const519 ptrdiff_t concurrent_queue_base_v3::internal_size() const {
520     __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
521     return ptrdiff_t(my_rep->tail_counter-my_rep->head_counter-my_rep->n_invalid_entries);
522 }
523 
internal_empty() const524 bool concurrent_queue_base_v3::internal_empty() const {
525     ticket tc = my_rep->tail_counter;
526     ticket hc = my_rep->head_counter;
527     // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
528     return ( tc==my_rep->tail_counter && ptrdiff_t(tc-hc-my_rep->n_invalid_entries)<=0 );
529 }
530 
internal_set_capacity(ptrdiff_t capacity,size_t)531 void concurrent_queue_base_v3::internal_set_capacity( ptrdiff_t capacity, size_t /*item_sz*/ ) {
532     my_capacity = capacity<0 ? concurrent_queue_rep::infinite_capacity : capacity;
533 }
534 
internal_finish_clear()535 void concurrent_queue_base_v3::internal_finish_clear() {
536     size_t nq = my_rep->n_queue;
537     for( size_t i=0; i<nq; ++i ) {
538         page* tp = my_rep->array[i].tail_page;
539         __TBB_ASSERT( my_rep->array[i].head_page==tp, "at most one page should remain" );
540         if( tp!=NULL) {
541             if( tp!=static_invalid_page ) deallocate_page( tp );
542             my_rep->array[i].tail_page = NULL;
543         }
544     }
545 }
546 
internal_throw_exception() const547 void concurrent_queue_base_v3::internal_throw_exception() const {
548     throw_exception( eid_bad_alloc );
549 }
550 
internal_assign(const concurrent_queue_base & src,copy_specifics op_type)551 void concurrent_queue_base_v3::internal_assign( const concurrent_queue_base& src, copy_specifics op_type ) {
552     items_per_page = src.items_per_page;
553     my_capacity = src.my_capacity;
554 
555     // copy concurrent_queue_rep.
556     my_rep->head_counter = src.my_rep->head_counter;
557     my_rep->tail_counter = src.my_rep->tail_counter;
558     my_rep->n_invalid_entries = src.my_rep->n_invalid_entries;
559     my_rep->abort_counter = src.my_rep->abort_counter;
560 
561     // copy micro_queues
562     for( size_t i = 0; i<my_rep->n_queue; ++i )
563         my_rep->array[i].assign( src.my_rep->array[i], *this, op_type );
564 
565     __TBB_ASSERT( my_rep->head_counter==src.my_rep->head_counter && my_rep->tail_counter==src.my_rep->tail_counter,
566             "the source concurrent queue should not be concurrently modified." );
567 }
568 
assign(const concurrent_queue_base & src)569 void concurrent_queue_base_v3::assign( const concurrent_queue_base& src ) {
570     internal_assign( src, copy );
571 }
572 
move_content(concurrent_queue_base_v8 & src)573 void concurrent_queue_base_v8::move_content( concurrent_queue_base_v8& src ) {
574     internal_assign( src, move );
575 }
576 
577 //------------------------------------------------------------------------
578 // concurrent_queue_iterator_rep
579 //------------------------------------------------------------------------
580 class concurrent_queue_iterator_rep: no_assign {
581 public:
582     ticket head_counter;
583     const concurrent_queue_base& my_queue;
584     const size_t offset_of_last;
585     concurrent_queue_base::page* array[concurrent_queue_rep::n_queue];
concurrent_queue_iterator_rep(const concurrent_queue_base & queue,size_t offset_of_last_)586     concurrent_queue_iterator_rep( const concurrent_queue_base& queue, size_t offset_of_last_ ) :
587         head_counter(queue.my_rep->head_counter),
588         my_queue(queue),
589         offset_of_last(offset_of_last_)
590     {
591         const concurrent_queue_rep& rep = *queue.my_rep;
592         for( size_t k=0; k<concurrent_queue_rep::n_queue; ++k )
593             array[k] = rep.array[k].head_page;
594     }
595     //! Set item to point to kth element.  Return true if at end of queue or item is marked valid; false otherwise.
get_item(void * & item,size_t k)596     bool get_item( void*& item, size_t k ) {
597         if( k==my_queue.my_rep->tail_counter ) {
598             item = NULL;
599             return true;
600         } else {
601             concurrent_queue_base::page* p = array[concurrent_queue_rep::index(k)];
602             __TBB_ASSERT(p,NULL);
603             size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, my_queue.items_per_page );
604             item = static_cast<unsigned char*>(static_cast<void*>(p)) + offset_of_last + my_queue.item_size*i;
605             return (p->mask & uintptr_t(1)<<i)!=0;
606         }
607     }
608 };
609 
610 //------------------------------------------------------------------------
611 // concurrent_queue_iterator_base
612 //------------------------------------------------------------------------
613 
initialize(const concurrent_queue_base & queue,size_t offset_of_last)614 void concurrent_queue_iterator_base_v3::initialize( const concurrent_queue_base& queue, size_t offset_of_last ) {
615     my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep>().allocate(1);
616     new( my_rep ) concurrent_queue_iterator_rep(queue,offset_of_last);
617     size_t k = my_rep->head_counter;
618     if( !my_rep->get_item(my_item, k) ) advance();
619 }
620 
concurrent_queue_iterator_base_v3(const concurrent_queue_base & queue)621 concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue ) {
622     initialize(queue,0);
623 }
624 
concurrent_queue_iterator_base_v3(const concurrent_queue_base & queue,size_t offset_of_last)625 concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue, size_t offset_of_last ) {
626     initialize(queue,offset_of_last);
627 }
628 
assign(const concurrent_queue_iterator_base & other)629 void concurrent_queue_iterator_base_v3::assign( const concurrent_queue_iterator_base& other ) {
630     if( my_rep!=other.my_rep ) {
631         if( my_rep ) {
632             cache_aligned_allocator<concurrent_queue_iterator_rep>().deallocate(my_rep, 1);
633             my_rep = NULL;
634         }
635         if( other.my_rep ) {
636             my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep>().allocate(1);
637             new( my_rep ) concurrent_queue_iterator_rep( *other.my_rep );
638         }
639     }
640     my_item = other.my_item;
641 }
642 
advance()643 void concurrent_queue_iterator_base_v3::advance() {
644     __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
645     size_t k = my_rep->head_counter;
646     const concurrent_queue_base& queue = my_rep->my_queue;
647 #if TBB_USE_ASSERT
648     void* tmp;
649     my_rep->get_item(tmp,k);
650     __TBB_ASSERT( my_item==tmp, NULL );
651 #endif /* TBB_USE_ASSERT */
652     size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, queue.items_per_page );
653     if( i==queue.items_per_page-1 ) {
654         concurrent_queue_base::page*& root = my_rep->array[concurrent_queue_rep::index(k)];
655         root = root->next;
656     }
657     // advance k
658     my_rep->head_counter = ++k;
659     if( !my_rep->get_item(my_item, k) ) advance();
660 }
661 
~concurrent_queue_iterator_base_v3()662 concurrent_queue_iterator_base_v3::~concurrent_queue_iterator_base_v3() {
663     //delete my_rep;
664     cache_aligned_allocator<concurrent_queue_iterator_rep>().deallocate(my_rep, 1);
665     my_rep = NULL;
666 }
667 
668 } // namespace internal
669 
670 } // namespace tbb
671