1 /* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
22 
23 #include <ndb_global.h>
24 
25 #include <VMSignal.hpp>
26 #include <kernel_types.h>
27 #include <Prio.hpp>
28 #include <SignalLoggerManager.hpp>
29 #include <SimulatedBlock.hpp>
30 #include <ErrorHandlingMacros.hpp>
31 #include <GlobalData.hpp>
32 #include <WatchDog.hpp>
33 #include <TransporterDefinitions.hpp>
34 #include "FastScheduler.hpp"
35 #include "mt.hpp"
36 #include <DebuggerNames.hpp>
37 #include <signaldata/StopForCrash.hpp>
38 #include "TransporterCallbackKernel.hpp"
39 #include <NdbSleep.h>
40 #include <portlib/ndb_prefetch.h>
41 
42 #include "mt-asm.h"
43 
44 inline
45 SimulatedBlock*
GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
47 {
48   SimulatedBlock* b = getBlock(blockNo);
49   if (b != 0 && instanceNo != 0)
50     b = b->getInstance(instanceNo);
51   return b;
52 }
53 
54 #ifdef __GNUC__
55 /* Provides a small (but noticeable) speedup in benchmarks. */
56 #define memcpy __builtin_memcpy
57 #endif
58 
59 /* size of a cacheline */
60 #define NDB_CL 64
61 
62 /* Constants found by benchmarks to be reasonable values. */
63 
64 /* Maximum number of signals to execute before sending to remote nodes. */
65 static const Uint32 MAX_SIGNALS_BEFORE_SEND = 200;
66 
67 /*
 * Max. signals to execute from one job buffer before considering other
 * possible work to do.
70  */
71 static const Uint32 MAX_SIGNALS_PER_JB = 100;
72 
73 /**
74  * Max signals written to other thread before calling flush_jbb_write_state
75  */
76 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER = 2;
77 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_OTHER = 20;
78 static const Uint32 MAX_SIGNALS_BEFORE_WAKEUP = 128;
79 
80 //#define NDB_MT_LOCK_TO_CPU
81 
82 #define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS + 1) //main+lqh+extra
83 #define NUM_MAIN_THREADS 2 // except receiver
84 #define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)
85 
/* If this is too small, it crashes before the first signal. */
87 #define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
88 
89 static Uint32 num_lqh_workers = 0;
90 static Uint32 num_lqh_threads = 0;
91 static Uint32 num_threads = 0;
92 static Uint32 receiver_thread_no = 0;
93 
94 #define NO_SEND_THREAD (MAX_THREADS + 1)
95 
/* A signal is at most 32 words: 7 words of signal header and 25 data words. */
97 #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
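/*
 * Note (derived from the constants in this file): with thr_job_buffer::SIZE
 * == 8190 words and at most 32 words per signal, this evaluates to 255
 * signals per page; a conservative lower bound, since most signals are
 * shorter than 32 words.
 */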
98 
99 struct mt_lock_stat
100 {
101   const void * m_ptr;
102   char * m_name;
103   Uint32 m_contended_count;
104   Uint32 m_spin_count;
105 };
106 static void register_lock(const void * ptr, const char * name);
107 static mt_lock_stat * lookup_lock(const void * ptr);
108 
109 #if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
110 #define USE_FUTEX
111 #endif
112 
113 #ifdef USE_FUTEX
114 #ifndef _GNU_SOURCE
115 #define _GNU_SOURCE
116 #endif
117 #include <unistd.h>
118 #include <sys/syscall.h>
119 #include <sys/types.h>
120 
121 #define FUTEX_WAIT              0
122 #define FUTEX_WAKE              1
123 #define FUTEX_FD                2
124 #define FUTEX_REQUEUE           3
125 #define FUTEX_CMP_REQUEUE       4
126 #define FUTEX_WAKE_OP           5
127 
128 static inline
129 int
futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
131 {
132   return syscall(SYS_futex,
133                  addr, FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
134 }
135 
136 static inline
137 int
futex_wake(volatile unsigned * addr)
139 {
140   return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
141 }
142 
143 struct thr_wait
144 {
145   volatile unsigned m_futex_state;
146   enum {
147     FS_RUNNING = 0,
148     FS_SLEEPING = 1
149   };
  thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
  void init () {}
152 };
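/*
 * Sketch of the futex-based sleep/wake protocol implemented by yield() and
 * wakeup() below (assuming xcng() is an atomic exchange with full barrier
 * semantics, as it is used throughout this file):
 *
 *   sleeper: xcng(state, FS_SLEEPING); re-check the wait condition;
 *            futex_wait(state, FS_SLEEPING) only if there is still nothing
 *            to do; xcng(state, FS_RUNNING) on return.
 *   waker:   publish new data; if xcng(state, FS_RUNNING) returned
 *            FS_SLEEPING, call futex_wake(state).
 *
 * The kernel re-checks that the futex word still equals FS_SLEEPING before
 * blocking, so a wakeup racing with the sleeper cannot be lost.
 */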
153 
154 /**
155  * Sleep until woken up or timeout occurs.
156  *
 * Will call check_callback(check_arg) after proper synchronisation, and only
 * if that returns true will it actually sleep; otherwise it returns
 * immediately. This is needed to avoid races with wakeup.
160  *
161  * Returns 'true' if it actually did sleep.
162  */
163 static inline
164 bool
yield(struct thr_wait* wait, const Uint32 nsec,
166       bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
167 {
168   volatile unsigned * val = &wait->m_futex_state;
169 #ifndef NDEBUG
170   int old =
171 #endif
172     xcng(val, thr_wait::FS_SLEEPING);
173   assert(old == thr_wait::FS_RUNNING);
174 
175   /**
   * At this point, we need to re-check the condition that made us decide to
   * sleep, and skip sleeping if it has changed.
   *
   * Otherwise, the condition may have changed just before we set our state to
   * FS_SLEEPING, and the thread making the change may already have decided
   * not to wake us, as our state was still FS_RUNNING at the time.
   *
   * We also need a memory barrier to make this extra check race-free, but
   * that is already provided by xcng.
185    */
186   bool waited = (*check_callback)(check_arg);
187   if (waited)
188   {
189     struct timespec timeout;
190     timeout.tv_sec = 0;
191     timeout.tv_nsec = nsec;
192     futex_wait(val, thr_wait::FS_SLEEPING, &timeout);
193   }
194   xcng(val, thr_wait::FS_RUNNING);
195   return waited;
196 }
197 
198 static inline
199 int
wakeup(struct thr_wait* wait)
201 {
202   volatile unsigned * val = &wait->m_futex_state;
203   /**
   * We must ensure that any state updates (new data in buffers...) are visible
205    * to the other thread before we can look at the sleep state of that other
206    * thread.
207    */
208   if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
209   {
210     return futex_wake(val);
211   }
212   return 0;
213 }
214 #else
215 #include <NdbMutex.h>
216 #include <NdbCondition.h>
217 
218 struct thr_wait
219 {
220   bool m_need_wakeup;
221   NdbMutex *m_mutex;
222   NdbCondition *m_cond;
  thr_wait() : m_need_wakeup(false), m_mutex(0), m_cond(0) {}
224 
  void init() {
226     m_mutex = NdbMutex_Create();
227     m_cond = NdbCondition_Create();
228   }
229 };
230 
231 static inline
232 bool
yield(struct thr_wait* wait, const Uint32 nsec,
234       bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
235 {
236   struct timespec end;
237   NdbCondition_ComputeAbsTime(&end, nsec/1000000);
238   NdbMutex_Lock(wait->m_mutex);
239 
240   Uint32 waits = 0;
241   /* May have spurious wakeups: Always recheck condition predicate */
242   while ((*check_callback)(check_arg))
243   {
244     wait->m_need_wakeup = true;
245     waits++;
246     if (NdbCondition_WaitTimeoutAbs(wait->m_cond,
247                                     wait->m_mutex, &end) == ETIMEDOUT)
248     {
249       wait->m_need_wakeup = false;
250       break;
251     }
252   }
253   NdbMutex_Unlock(wait->m_mutex);
254   return (waits > 0);
255 }
256 
257 
258 static inline
259 int
wakeup(struct thr_wait* wait)
261 {
262   NdbMutex_Lock(wait->m_mutex);
263   // We should avoid signaling when not waiting for wakeup
264   if (wait->m_need_wakeup)
265   {
266     wait->m_need_wakeup = false;
267     NdbCondition_Signal(wait->m_cond);
268   }
269   NdbMutex_Unlock(wait->m_mutex);
270   return 0;
271 }
272 
273 #endif
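/*
 * Typical usage of the two primitives above, regardless of which
 * implementation is compiled in (illustrative sketch only;
 * check_queues_empty is a hypothetical callback):
 *
 *   // consumer side, when it found nothing to do:
 *   yield(&selfptr->m_waiter, 10 * 1000000, check_queues_empty, selfptr);
 *
 *   // producer side, after publishing new signals:
 *   wakeup(&dstptr->m_waiter);
 */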
274 
275 #ifdef NDB_HAVE_XCNG
276 template <unsigned SZ>
277 struct thr_spin_lock
278 {
  thr_spin_lock(const char * name = 0)
280   {
281     m_lock = 0;
282     register_lock(this, name);
283   }
284 
285   union {
286     volatile Uint32 m_lock;
287     char pad[SZ];
288   };
289 };
290 
291 static
292 ATTRIBUTE_NOINLINE
293 void
lock_slow(void * sl, volatile unsigned * val)
295 {
296   mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
297 
298 loop:
299   Uint32 spins = 0;
300   do {
301     spins++;
302     cpu_pause();
303   } while (* val == 1);
304 
305   if (unlikely(xcng(val, 1) != 0))
306     goto loop;
307 
308   if (s)
309   {
310     s->m_spin_count += spins;
311     Uint32 count = ++s->m_contended_count;
312     Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
313 
314     if ((count % freq) == 0)
315       printf("%s waiting for lock, contentions: %u spins: %u\n",
316              s->m_name, count, s->m_spin_count);
317   }
318 }
319 
320 template <unsigned SZ>
321 static
322 inline
323 void
lock(struct thr_spin_lock<SZ>* sl)
325 {
326   volatile unsigned* val = &sl->m_lock;
327   if (likely(xcng(val, 1) == 0))
328     return;
329 
330   lock_slow(sl, val);
331 }
332 
333 template <unsigned SZ>
334 static
335 inline
336 void
unlock(struct thr_spin_lock<SZ>* sl)
338 {
339   /**
340    * Memory barrier here, to make sure all of our stores are visible before
341    * the lock release is.
342    */
343   mb();
344   sl->m_lock = 0;
345 }
346 
347 template <unsigned SZ>
348 static
349 inline
350 int
trylock(struct thr_spin_lock<SZ>* sl)
352 {
353   volatile unsigned* val = &sl->m_lock;
354   return xcng(val, 1);
355 }
356 #else
357 #define thr_spin_lock thr_mutex
358 #endif
359 
360 template <unsigned SZ>
361 struct thr_mutex
362 {
  thr_mutex(const char * name = 0) {
364     NdbMutex_Init(&m_mutex);
365     register_lock(this, name);
366   }
367 
368   union {
369     NdbMutex m_mutex;
370     char pad[SZ];
371   };
372 };
373 
374 template <unsigned SZ>
375 static
376 inline
377 void
lock(struct thr_mutex<SZ>* sl)
379 {
380   NdbMutex_Lock(&sl->m_mutex);
381 }
382 
383 template <unsigned SZ>
384 static
385 inline
386 void
unlock(struct thr_mutex<SZ>* sl)
388 {
389   NdbMutex_Unlock(&sl->m_mutex);
390 }
391 
392 template <unsigned SZ>
393 static
394 inline
395 int
trylock(struct thr_mutex<SZ> * sl)
397 {
398   return NdbMutex_Trylock(&sl->m_mutex);
399 }
400 
401 /**
402  * thr_safe_pool
403  */
404 template<typename T>
405 struct thr_safe_pool
406 {
  thr_safe_pool(const char * name) : m_free_list(0), m_cnt(0), m_lock(name) {}
408 
409   T* m_free_list;
410   Uint32 m_cnt;
411   thr_spin_lock<NDB_CL - (sizeof(void*) + sizeof(Uint32))> m_lock;
412 
  T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
414     T* ret = 0;
415     lock(&m_lock);
416     if (m_free_list)
417     {
418       assert(m_cnt);
419       m_cnt--;
420       ret = m_free_list;
421       m_free_list = ret->m_next;
422       unlock(&m_lock);
423     }
424     else
425     {
426       Uint32 dummy;
427       unlock(&m_lock);
428       ret = reinterpret_cast<T*>
429         (mm->alloc_page(rg, &dummy,
430                         Ndbd_mem_manager::NDB_ZONE_ANY));
431       // ToDo: How to deal with failed allocation?!?
432       // I think in this case we need to start grabbing buffers kept for signal
433       // trace.
434     }
435     return ret;
436   }
437 
  void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
439     lock(&m_lock);
440     t->m_next = m_free_list;
441     m_free_list = t;
442     m_cnt++;
443     unlock(&m_lock);
444   }
445 
  void release_list(Ndbd_mem_manager *mm, Uint32 rg,
447                     T* head, T* tail, Uint32 cnt) {
448     lock(&m_lock);
449     tail->m_next = m_free_list;
450     m_free_list = head;
451     m_cnt += cnt;
452     unlock(&m_lock);
453   }
454 };
455 
456 /**
457  * thread_local_pool
458  */
459 template<typename T>
460 class thread_local_pool
461 {
462 public:
  thread_local_pool(thr_safe_pool<T> *global_pool, unsigned max_free) :
464     m_max_free(max_free),
465     m_free(0),
466     m_freelist(0),
467     m_global_pool(global_pool)
468   {
469   }
470 
  T *seize(Ndbd_mem_manager *mm, Uint32 rg) {
472     T *tmp = m_freelist;
473     if (tmp)
474     {
475       m_freelist = tmp->m_next;
476       assert(m_free > 0);
477       m_free--;
478     }
479     else
480       tmp = m_global_pool->seize(mm, rg);
481 
482     validate();
483     return tmp;
484   }
485 
  void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
487     unsigned free = m_free;
488     if (free < m_max_free)
489     {
490       m_free = free + 1;
491       t->m_next = m_freelist;
492       m_freelist = t;
493     }
494     else
495       m_global_pool->release(mm, rg, t);
496 
497     validate();
498   }
499 
500   /**
   * Release to the local pool even if it gets "too" full
   *   (w.r.t. m_max_free)
503    */
  void release_local(T *t) {
505     m_free++;
506     t->m_next = m_freelist;
507     m_freelist = t;
508 
509     validate();
510   }
511 
  void validate() const {
513 #ifdef VM_TRACE
514     Uint32 cnt = 0;
515     T* t = m_freelist;
516     while (t)
517     {
518       cnt++;
519       t = t->m_next;
520     }
521     assert(cnt == m_free);
522 #endif
523   }
524 
525   /**
526    * Release entries so that m_max_free is honored
527    *   (likely used together with release_local)
528    */
  void release_global(Ndbd_mem_manager *mm, Uint32 rg) {
530     validate();
531     unsigned cnt = 0;
532     unsigned free = m_free;
533     Uint32 maxfree = m_max_free;
534     assert(maxfree > 0);
535 
536     T* head = m_freelist;
537     T* tail = m_freelist;
538     if (free > maxfree)
539     {
540       cnt++;
541       free--;
542 
543       while (free > maxfree)
544       {
545         cnt++;
546         free--;
547         tail = tail->m_next;
548       }
549 
550       assert(free == maxfree);
551 
552       m_free = free;
553       m_freelist = tail->m_next;
554       m_global_pool->release_list(mm, rg, head, tail, cnt);
555     }
556     validate();
557   }
558 
  void release_all(Ndbd_mem_manager *mm, Uint32 rg) {
560     validate();
561     T* head = m_freelist;
562     T* tail = m_freelist;
563     if (tail)
564     {
565       unsigned cnt = 1;
566       while (tail->m_next != 0)
567       {
568         cnt++;
569         tail = tail->m_next;
570       }
571       m_global_pool->release_list(mm, rg, head, tail, cnt);
572       m_free = 0;
573       m_freelist = 0;
574     }
575     validate();
576   }
577 
  void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }
579 
580 private:
581   unsigned m_max_free;
582   unsigned m_free;
583   T *m_freelist;
584   thr_safe_pool<T> *m_global_pool;
585 };
586 
587 /**
588  * Signal buffers.
589  *
590  * Each thread job queue contains a list of these buffers with signals.
591  *
592  * There is an underlying assumption that the size of this structure is the
593  * same as the global memory manager page size.
594  */
595 struct thr_job_buffer // 32k
596 {
597   static const unsigned SIZE = 8190;
598 
599   /*
600    * Amount of signal data currently in m_data buffer.
601    * Read/written by producer, read by consumer.
602    */
603   Uint32 m_len;
604   /*
605    * Whether this buffer contained prio A or prio B signals, used when dumping
606    * signals from released buffers.
607    */
608   Uint32 m_prioa;
609   union {
610     Uint32 m_data[SIZE];
611 
612     thr_job_buffer * m_next; // For free-list
613   };
614 };
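/*
 * Size check (derived from the definitions above): SIZE (8190) data words
 * plus the two header words m_len and m_prioa is 8192 * 4 = 32768 bytes,
 * i.e. exactly the 32k page size noted in the struct comment.
 */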
615 
616 static
617 inline
618 Uint32
calc_fifo_used(Uint32 ri, Uint32 wi, Uint32 sz)
620 {
621   return (wi >= ri) ? wi - ri : (sz - ri) + wi;
622 }
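/*
 * Worked example (hypothetical values): for a FIFO of sz == 31, ri == 28 and
 * wi == 2 gives (31 - 28) + 2 == 5 used entries, while ri == 2 and wi == 28
 * gives 26.
 */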
623 
624 /**
625  * thr_job_queue is shared between consumer / producer.
626  *
 * The hot spot of the thr_job_queue is the read/write indexes.
 * As they are updated and read frequently, they have been placed in
 * their own thr_job_queue_head[] so that they fit inside a single cache
 * line (or a few), thereby avoiding complete L1-cache replacement every
 * time the job_queue is scanned.
632  */
633 struct thr_job_queue_head
634 {
635   unsigned m_read_index;  // Read/written by consumer, read by producer
636   unsigned m_write_index; // Read/written by producer, read by consumer
637 
638   Uint32 used() const;
639 };
640 
641 struct thr_job_queue
642 {
643   static const unsigned SIZE = 31;
644 
645   struct thr_job_queue_head* m_head;
646   struct thr_job_buffer* m_buffers[SIZE];
647 };
648 
649 inline
650 Uint32
thr_job_queue_head::used() const
652 {
653   return calc_fifo_used(m_read_index, m_write_index, thr_job_queue::SIZE);
654 }
655 
656 /*
657  * Two structures tightly associated with thr_job_queue.
658  *
659  * There will generally be exactly one thr_jb_read_state and one
660  * thr_jb_write_state associated with each thr_job_queue.
661  *
662  * The reason they are kept separate is to avoid unnecessary inter-CPU
663  * cache line pollution. All fields shared among producer and consumer
664  * threads are in thr_job_queue, thr_jb_write_state fields are only
665  * accessed by the producer thread(s), and thr_jb_read_state fields are
666  * only accessed by the consumer thread.
667  *
668  * For example, on Intel core 2 quad processors, there is a ~33%
669  * penalty for two cores accessing the same 64-byte cacheline.
670  */
671 struct thr_jb_write_state
672 {
673   /*
674    * The position to insert the next signal into the queue.
675    *
676    * m_write_index is the index into thr_job_queue::m_buffers[] of the buffer
677    * to insert into, and m_write_pos is the index into thr_job_buffer::m_data[]
678    * at which to store the next signal.
679    */
680   Uint32 m_write_index;
681   Uint32 m_write_pos;
682 
683   /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
684   thr_job_buffer *m_write_buffer;
685 
686   /* Number of signals inserted since last flush to thr_job_queue. */
687   Uint32 m_pending_signals;
688 
689   /* Number of signals inserted since last wakeup */
690   Uint32 m_pending_signals_wakeup;
691 };
692 
693 /*
694  * This structure is also used when dumping signal traces, to dump executed
695  * signals from the buffer(s) currently being processed.
696  */
697 struct thr_jb_read_state
698 {
699   /*
700    * Index into thr_job_queue::m_buffers[] of the buffer that we are currently
701    * executing signals from.
702    */
703   Uint32 m_read_index;
704   /*
705    * Index into m_read_buffer->m_data[] of the next signal to execute from the
706    * current buffer.
707    */
708   Uint32 m_read_pos;
709   /*
710    * Thread local copy of thr_job_queue::m_buffers[m_read_index].
711    */
712   thr_job_buffer *m_read_buffer;
713   /*
714    * These are thread-local copies of thr_job_queue::m_write_index and
715    * thr_job_buffer::m_len. They are read once at the start of the signal
716    * execution loop and used to determine when the end of available signals is
717    * reached.
718    */
719   Uint32 m_read_end;    // End within current thr_job_buffer. (*m_read_buffer)
720 
721   Uint32 m_write_index; // Last available thr_job_buffer.
722 
  bool is_empty() const
724   {
725     assert(m_read_index != m_write_index  ||  m_read_pos <= m_read_end);
726     return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
727   }
728 };
729 
730 /**
731  * time-queue
732  */
733 struct thr_tq
734 {
735   static const unsigned SQ_SIZE = 512;
736   static const unsigned LQ_SIZE = 512;
737   static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;
738 
739   Uint32 * m_delayed_signals[PAGES];
740   Uint32 m_next_free;
741   Uint32 m_next_timer;
742   Uint32 m_current_time;
743   Uint32 m_cnt[2];
744   Uint32 m_short_queue[SQ_SIZE];
745   Uint32 m_long_queue[LQ_SIZE];
746 };
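/*
 * Layout notes (derived from senddelay(), get_free_slot() and scan_queue()
 * below): delayed signals are stored in 32-word slots inside PAGES job-buffer
 * pages (32 * (512 + 512) / 8192 == 4 pages). A queue entry packs the slot
 * index in the upper 16 bits and the expiry time in the lower 16 bits
 * (wrapped at 32767 by handle_time_wrap()); the slot index itself is
 * (page << 8) | slot-within-page.
 */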
747 
748 /*
749  * Max number of thread-local job buffers to keep before releasing to
750  * global pool.
751  */
752 #define THR_FREE_BUF_MAX 32
753 /* Minimum number of buffers (to ensure useful trace dumps). */
754 #define THR_FREE_BUF_MIN 12
755 /*
756  * 1/THR_FREE_BUF_BATCH is the fraction of job buffers to allocate/free
757  * at a time from/to global pool.
758  */
759 #define THR_FREE_BUF_BATCH 6
760 
761 /**
762  * a page with send data
763  */
764 struct thr_send_page
765 {
766   static const Uint32 PGSIZE = 32768;
767 #if SIZEOF_CHARP == 4
768   static const Uint32 HEADER_SIZE = 8;
769 #else
770   static const Uint32 HEADER_SIZE = 12;
771 #endif
772 
  static Uint32 max_bytes() {
774     return PGSIZE - offsetof(thr_send_page, m_data);
775   }
776 
777   /* Next page */
778   thr_send_page* m_next;
779 
780   /* Bytes of send data available in this page. */
781   Uint16 m_bytes;
782 
783   /* Start of unsent data */
784   Uint16 m_start;
785 
786   /* Data; real size is to the end of one page. */
787   char m_data[2];
788 };
789 
790 /**
791  * a linked list with thr_send_page
792  */
793 struct thr_send_buffer
794 {
795   thr_send_page* m_first_page;
796   thr_send_page* m_last_page;
797 };
798 
799 /**
800  * a ring buffer with linked list of thr_send_page
801  */
802 struct thr_send_queue
803 {
804   unsigned m_write_index;
805 #if SIZEOF_CHARP == 8
806   unsigned m_unused;
807   thr_send_page* m_buffers[7];
808   static const unsigned SIZE = 7;
809 #else
810   thr_send_page* m_buffers[15];
811   static const unsigned SIZE = 15;
812 #endif
813 };
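/*
 * The odd-looking SIZE values appear chosen to keep sizeof(thr_send_queue) at
 * exactly one cache line (NDB_CL == 64 bytes): 8 bytes of indexes plus 7
 * eight-byte pointers on 64-bit platforms, or 4 bytes plus 15 four-byte
 * pointers on 32-bit platforms.
 */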
814 
815 struct thr_data
816 {
  thr_data() : m_jba_write_lock("jbalock"),
818                m_send_buffer_pool(0, THR_FREE_BUF_MAX) {}
819 
820   thr_wait m_waiter;
821   unsigned m_thr_no;
822 
823   /**
824    * max signals to execute per JBB buffer
825    */
826   unsigned m_max_signals_per_jb;
827 
828   /**
829    * max signals to execute before recomputing m_max_signals_per_jb
830    */
831   unsigned m_max_exec_signals;
832 
833   Uint64 m_time;
834   struct thr_tq m_tq;
835 
836   /* Prio A signal incoming queue. */
837   struct thr_spin_lock<64> m_jba_write_lock;
838   struct thr_job_queue m_jba;
839 
840   struct thr_job_queue_head m_jba_head;
841 
842   /* Thread-local read state of prio A buffer. */
843   struct thr_jb_read_state m_jba_read_state;
844   /*
845    * There is no m_jba_write_state, as we have multiple writers to the prio A
846    * queue, so local state becomes invalid as soon as we release the lock.
847    */
848 
849   /*
850    * In m_next_buffer we keep a free buffer at all times, so that when
   * we hold the lock and find we need a new buffer, we can use this one and
   * thus defer allocation until after releasing the lock.
853    */
854   struct thr_job_buffer* m_next_buffer;
855 
856   /*
857    * We keep a small number of buffers in a thread-local cyclic FIFO, so that
858    * we can avoid going to the global pool in most cases, and so that we have
859    * recent buffers available for dumping in trace files.
860    */
861   struct thr_job_buffer *m_free_fifo[THR_FREE_BUF_MAX];
862   /* m_first_free is the index of the entry to return next from seize(). */
863   Uint32 m_first_free;
864   /* m_first_unused is the first unused entry in m_free_fifo. */
865   Uint32 m_first_unused;
866 
867   /*
868    * These are the thread input queues, where other threads deliver signals
869    * into.
870    */
871   struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
872   struct thr_job_queue m_in_queue[MAX_THREADS];
873   /* These are the write states of m_in_queue[self] in each thread. */
874   struct thr_jb_write_state m_write_states[MAX_THREADS];
875   /* These are the read states of all of our own m_in_queue[]. */
876   struct thr_jb_read_state m_read_states[MAX_THREADS];
877 
878   /* Jam buffers for making trace files at crashes. */
879   EmulatedJamBuffer m_jam;
880   /* Watchdog counter for this thread. */
881   Uint32 m_watchdog_counter;
882   /* Signal delivery statistics. */
883   Uint32 m_prioa_count;
884   Uint32 m_prioa_size;
885   Uint32 m_priob_count;
886   Uint32 m_priob_size;
887 
888   /* Array of node ids with pending remote send data. */
889   Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
890   /* Number of node ids in m_pending_send_nodes. */
891   Uint32 m_pending_send_count;
892 
893   /**
894    * Bitmap of pending node ids with send data.
895    * Used to quickly check if a node id is already in m_pending_send_nodes.
896    */
897   Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
898 
899   /* pool for send buffers */
900   class thread_local_pool<thr_send_page> m_send_buffer_pool;
901 
902   /* Send buffer for this thread, these are not touched by any other thread */
903   struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];
904 
905   /* Block instances (main and worker) handled by this thread. */
906   /* Used for sendpacked (send-at-job-buffer-end). */
907   Uint32 m_instance_count;
908   BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];
909 
910   SectionSegmentPool::Cache m_sectionPoolCache;
911 
912   Uint32 m_cpu;
913   pthread_t m_thr_id;
914   NdbThread* m_thread;
915 };
916 
917 struct mt_send_handle  : public TransporterSendBufferHandle
918 {
919   struct thr_data * m_selfptr;
  mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
  virtual ~mt_send_handle() {}
922 
923   virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
924   virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
925   virtual bool forceSend(NodeId node);
926 };
927 
928 struct trp_callback : public TransporterCallbackKernel
929 {
  trp_callback() {}
931 
932   /* Callback interface. */
933   int checkJobBuffer();
934   void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
935   void lock_transporter(NodeId node);
936   void unlock_transporter(NodeId node);
937   Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
938   Uint32 bytes_sent(NodeId node, Uint32 bytes);
939   bool has_data_to_send(NodeId node);
940   void reset_send_buffer(NodeId node, bool should_be_empty);
941 };
942 
943 extern trp_callback g_trp_callback;             // Forward declaration
944 extern struct thr_repository g_thr_repository;
945 
946 #include <NdbMutex.h>
947 #include <NdbCondition.h>
948 
949 struct thr_repository
950 {
  thr_repository()
952     : m_receive_lock("recvlock"),
953       m_section_lock("sectionlock"),
954       m_mem_manager_lock("memmanagerlock"),
955       m_jb_pool("jobbufferpool"),
956       m_sb_pool("sendbufferpool")
957     {}
958 
959   struct thr_spin_lock<64> m_receive_lock;
960   struct thr_spin_lock<64> m_section_lock;
961   struct thr_spin_lock<64> m_mem_manager_lock;
962   struct thr_safe_pool<thr_job_buffer> m_jb_pool;
963   struct thr_safe_pool<thr_send_page> m_sb_pool;
964   Ndbd_mem_manager * m_mm;
965   unsigned m_thread_count;
966   struct thr_data m_thread[MAX_THREADS];
967 
968   /**
969    * send buffer handling
970    */
971 
972   /* The buffers that are to be sent */
973   struct send_buffer
974   {
975     /**
976      * lock
977      */
978     struct thr_spin_lock<8> m_send_lock;
979 
980     /**
981      * pending data
982      */
983     struct thr_send_buffer m_buffer;
984 
985     /**
986      * Flag used to coordinate sending to same remote node from different
987      * threads.
988      *
989      * If two threads need to send to the same node at the same time, the
990      * second thread, rather than wait for the first to finish, will just
991      * set this flag, and the first thread will do an extra send when done
992      * with the first.
993      */
994     Uint32 m_force_send;
995 
996     /**
997      * Which thread is currently holding the m_send_lock
998      */
999     Uint32 m_send_thread;
1000 
1001     /**
1002      * bytes pending for this node
1003      */
1004     Uint32 m_bytes;
1005 
1006     /* read index(es) in thr_send_queue */
1007     Uint32 m_read_index[MAX_THREADS];
1008   } m_send_buffers[MAX_NTRANSPORTERS];
1009 
1010   /* The buffers published by threads */
1011   thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];
1012 
1013   /*
1014    * These are used to synchronize during crash / trace dumps.
1015    *
1016    */
1017   NdbMutex stop_for_crash_mutex;
1018   NdbCondition stop_for_crash_cond;
1019   Uint32 stopped_threads;
1020 };
1021 
1022 #if 0
1023 static
1024 Uint32
1025 fifo_used_pages(struct thr_data* selfptr)
1026 {
1027   return calc_fifo_used(selfptr->m_first_unused,
1028                         selfptr->m_first_free,
1029                         THR_FREE_BUF_MAX);
1030 }
1031 #endif
1032 
1033 static
1034 void
job_buffer_full(struct thr_data* selfptr)
1036 {
1037   ndbout_c("job buffer full");
1038   abort();
1039 }
1040 
1041 static
1042 void
out_of_job_buffer(struct thr_data* selfptr)
1044 {
1045   ndbout_c("out of job buffer");
1046   abort();
1047 }
1048 
1049 static
1050 thr_job_buffer*
seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
1052 {
1053   thr_job_buffer* jb;
1054   thr_data* selfptr = rep->m_thread + thr_no;
1055   Uint32 first_free = selfptr->m_first_free;
1056   Uint32 first_unused = selfptr->m_first_unused;
1057 
1058   /*
1059    * An empty FIFO is denoted by m_first_free == m_first_unused.
   * So we will never have a completely full FIFO array; at least one entry
   * will always be unused. But the code is simpler as a result.
1062    */
1063 
1064   /*
1065    * We never allow the fifo to become completely empty, as we want to have
1066    * a good number of signals available for trace files in case of a forced
1067    * shutdown.
1068    */
1069   Uint32 buffers = (first_free > first_unused ?
1070                     first_unused + THR_FREE_BUF_MAX - first_free :
1071                     first_unused - first_free);
1072   if (unlikely(buffers <= THR_FREE_BUF_MIN))
1073   {
1074     /*
1075      * All used, allocate another batch from global pool.
1076      *
1077      * Put the new buffers at the head of the fifo, so as not to needlessly
     * push out any existing buffers from the fifo (that would lose useful
1079      * data for signal dumps in trace files).
1080      */
1081     Uint32 cnt = 0;
1082     Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
1083     assert(batch > 0);
1084     assert(batch + THR_FREE_BUF_MIN < THR_FREE_BUF_MAX);
1085     do {
1086       jb = rep->m_jb_pool.seize(rep->m_mm, RG_JOBBUFFER);
1087       if (unlikely(jb == 0))
1088       {
1089         if (unlikely(cnt == 0))
1090         {
1091           out_of_job_buffer(selfptr);
1092         }
1093         break;
1094       }
1095       jb->m_len = 0;
1096       jb->m_prioa = false;
1097       first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
1098       selfptr->m_free_fifo[first_free] = jb;
1099       batch--;
1100     } while (cnt < batch);
1101     selfptr->m_first_free = first_free;
1102   }
1103 
1104   jb= selfptr->m_free_fifo[first_free];
1105   selfptr->m_first_free = (first_free + 1) % THR_FREE_BUF_MAX;
1106   /* Init here rather than in release_buffer() so signal dump will work. */
1107   jb->m_len = 0;
1108   jb->m_prioa = prioa;
1109   return jb;
1110 }
1111 
1112 static
1113 void
release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
1115 {
1116   struct thr_data* selfptr = rep->m_thread + thr_no;
1117   Uint32 first_free = selfptr->m_first_free;
1118   Uint32 first_unused = selfptr->m_first_unused;
1119 
1120   /*
1121    * Pack near-empty signals, to get more info in the signal traces.
1122    *
1123    * This is not currently used, as we only release full job buffers, hence
1124    * the #if 0.
1125    */
1126 #if 0
1127   Uint32 last_free = (first_unused ? first_unused : THR_FREE_BUF_MAX) - 1;
1128   thr_job_buffer *last_jb = selfptr->m_free_fifo[last_free];
1129   Uint32 len1, len2;
1130 
1131   if (!jb->m_prioa &&
1132       first_free != first_unused &&
1133       !last_jb->m_prioa &&
1134       (len2 = jb->m_len) <= (thr_job_buffer::SIZE / 4) &&
1135       (len1 = last_jb->m_len) + len2 <= thr_job_buffer::SIZE)
1136   {
1137     /*
     * The buffer being released is fairly empty, and the data it contains
     * fits in the previously released buffer.
1140      *
1141      * We want to avoid too many almost-empty buffers in the free fifo, as that
1142      * makes signal traces less useful due to too little data available. So in
1143      * this case we move the data from the buffer to be released into the
1144      * previous buffer, and place the to-be-released buffer at the head of the
1145      * fifo (to be immediately reused).
1146      *
1147      * This is only done for prio B buffers, as we must not merge prio A and B
1148      * data (or dumps would be incorrect), and prio A buffers are in any case
1149      * full when released.
1150      */
1151     memcpy(last_jb->m_data + len1, jb->m_data, len2*sizeof(jb->m_data[0]));
1152     last_jb->m_len = len1 + len2;
1153     jb->m_len = 0;
1154     first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
1155     selfptr->m_free_fifo[first_free] = jb;
1156     selfptr->m_first_free = first_free;
1157   }
1158   else
1159 #endif
1160   {
1161     /* Just insert at the end of the fifo. */
1162     selfptr->m_free_fifo[first_unused] = jb;
1163     first_unused = (first_unused + 1) % THR_FREE_BUF_MAX;
1164     selfptr->m_first_unused = first_unused;
1165   }
1166 
1167   if (unlikely(first_unused == first_free))
1168   {
1169     /* FIFO full, need to release to global pool. */
1170     Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
1171     assert(batch > 0);
1172     assert(batch < THR_FREE_BUF_MAX);
1173     do {
1174       rep->m_jb_pool.release(rep->m_mm, RG_JOBBUFFER,
1175                              selfptr->m_free_fifo[first_free]);
1176       first_free = (first_free + 1) % THR_FREE_BUF_MAX;
1177       batch--;
1178     } while (batch > 0);
1179     selfptr->m_first_free = first_free;
1180   }
1181 }
1182 
1183 static
1184 inline
1185 Uint32
scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
1187 {
1188   Uint32 thr_no = selfptr->m_thr_no;
1189   Uint32 **pages = selfptr->m_tq.m_delayed_signals;
1190   Uint32 free = selfptr->m_tq.m_next_free;
1191   Uint32* save = ptr;
1192   for (Uint32 i = 0; i < cnt; i++, ptr++)
1193   {
1194     Uint32 val = * ptr;
1195     if ((val & 0xFFFF) <= end)
1196     {
1197       Uint32 idx = val >> 16;
1198       Uint32 buf = idx >> 8;
1199       Uint32 pos = 32 * (idx & 0xFF);
1200 
1201       Uint32* page = * (pages + buf);
1202 
1203       const SignalHeader *s = reinterpret_cast<SignalHeader*>(page + pos);
1204       const Uint32 *data = page + pos + (sizeof(*s)>>2);
1205       if (0)
1206 	ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
1207       /*
1208        * ToDo: Do measurements of the frequency of these prio A timed signals.
1209        *
1210        * If they are frequent, we may want to optimize, as sending one prio A
1211        * signal is somewhat expensive compared to sending one prio B.
1212        */
1213       sendprioa(thr_no, s, data,
1214                 data + s->theLength);
1215       * (page + pos) = free;
1216       free = idx;
1217     }
1218     else if (i > 0)
1219     {
1220       selfptr->m_tq.m_next_free = free;
1221       memmove(save, ptr, 4 * (cnt - i));
1222       return i;
1223     }
1224     else
1225     {
1226       return 0;
1227     }
1228   }
1229   selfptr->m_tq.m_next_free = free;
1230   return cnt;
1231 }
1232 
1233 static
1234 void
handle_time_wrap(struct thr_data* selfptr)
1236 {
1237   Uint32 i;
1238   struct thr_tq * tq = &selfptr->m_tq;
1239   Uint32 cnt0 = tq->m_cnt[0];
1240   Uint32 cnt1 = tq->m_cnt[1];
1241   Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
1242   Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
1243   cnt0 -= tmp0;
1244   cnt1 -= tmp1;
1245   tq->m_cnt[0] = cnt0;
1246   tq->m_cnt[1] = cnt1;
1247   for (i = 0; i<cnt0; i++)
1248   {
1249     assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
1250     tq->m_short_queue[i] -= 32767;
1251   }
1252   for (i = 0; i<cnt1; i++)
1253   {
1254     assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
1255     tq->m_long_queue[i] -= 32767;
1256   }
1257 }
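/*
 * The time-queue clock wraps at 32767: when scan_time_queues_impl() would
 * push m_current_time past that limit, handle_time_wrap() first fires
 * everything that is due (end == 32767) and then rebases the remaining
 * entries by subtracting 32767, keeping the 16-bit time fields in range.
 */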
1258 
1259 static
1260 void
scan_time_queues_impl(struct thr_data* selfptr, NDB_TICKS now)
1262 {
1263   struct thr_tq * tq = &selfptr->m_tq;
1264   NDB_TICKS last = selfptr->m_time;
1265 
1266   Uint32 curr = tq->m_current_time;
1267   Uint32 cnt0 = tq->m_cnt[0];
1268   Uint32 cnt1 = tq->m_cnt[1];
1269 
1270   assert(now > last);
1271   Uint64 diff = now - last;
1272   Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
1273   Uint32 end = (curr + step);
1274   if (end >= 32767)
1275   {
1276     handle_time_wrap(selfptr);
1277     cnt0 = tq->m_cnt[0];
1278     cnt1 = tq->m_cnt[1];
1279     end -= 32767;
1280   }
1281 
1282   Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
1283   Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
1284 
1285   tq->m_current_time = end;
1286   tq->m_cnt[0] = cnt0 - tmp0;
1287   tq->m_cnt[1] = cnt1 - tmp1;
1288   selfptr->m_time = last + step;
1289 }
1290 
1291 static inline
1292 void
scan_time_queues(struct thr_data* selfptr, NDB_TICKS now)
1294 {
1295   if (selfptr->m_time != now)
1296     scan_time_queues_impl(selfptr, now);
1297 }
1298 
1299 static
1300 inline
1301 Uint32*
get_free_slot(struct thr_repository* rep,
1303 	      struct thr_data* selfptr,
1304 	      Uint32* idxptr)
1305 {
1306   struct thr_tq * tq = &selfptr->m_tq;
1307   Uint32 idx = tq->m_next_free;
1308 retry:
1309   Uint32 buf = idx >> 8;
1310   Uint32 pos = idx & 0xFF;
1311 
1312   if (idx != RNIL)
1313   {
1314     Uint32* page = * (tq->m_delayed_signals + buf);
1315     Uint32* ptr = page + (32 * pos);
1316     tq->m_next_free = * ptr;
1317     * idxptr = idx;
1318     return ptr;
1319   }
1320 
1321   Uint32 thr_no = selfptr->m_thr_no;
1322   for (Uint32 i = 0; i<thr_tq::PAGES; i++)
1323   {
1324     if (tq->m_delayed_signals[i] == 0)
1325     {
1326       struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
1327       Uint32 * page = reinterpret_cast<Uint32*>(jb);
1328       tq->m_delayed_signals[i] = page;
1329 
1330       ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
1331 
1332       /**
1333        * Init page
1334        */
1335       for (Uint32 j = 0; j<255; j ++)
1336       {
1337 	page[j * 32] = (i << 8) + (j + 1);
1338       }
1339       page[255*32] = RNIL;
1340       idx = (i << 8);
1341       goto retry;
1342     }
1343   }
1344   abort();
1345   return NULL;
1346 }
1347 
1348 void
senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
1350 {
1351   struct thr_repository* rep = &g_thr_repository;
1352   struct thr_data * selfptr = rep->m_thread + thr_no;
1353   assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
1354   unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
1355 
1356   Uint32 max;
1357   Uint32 * cntptr;
1358   Uint32 * queueptr;
1359 
1360   Uint32 alarm = selfptr->m_tq.m_current_time + delay;
1361   Uint32 nexttimer = selfptr->m_tq.m_next_timer;
1362   if (delay < 100)
1363   {
1364     cntptr = selfptr->m_tq.m_cnt + 0;
1365     queueptr = selfptr->m_tq.m_short_queue;
1366     max = thr_tq::SQ_SIZE;
1367   }
1368   else
1369   {
1370     cntptr = selfptr->m_tq.m_cnt + 1;
1371     queueptr = selfptr->m_tq.m_long_queue;
1372     max = thr_tq::LQ_SIZE;
1373   }
1374 
1375   Uint32 idx;
1376   Uint32* ptr = get_free_slot(rep, selfptr, &idx);
1377   memcpy(ptr, s, 4*siglen);
1378 
1379   if (0)
1380     ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
1381 	     selfptr->m_tq.m_current_time,
1382 	     alarm,
1383 	     getSignalName(s->theVerId_signalNumber),
1384 	     getBlockName(refToBlock(s->theSendersBlockRef)),
1385 	     getBlockName(s->theReceiversBlockNumber),
1386 	     delay,
1387 	     idx, ptr);
1388 
1389   Uint32 i;
1390   Uint32 cnt = *cntptr;
1391   Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
1392 
1393   * cntptr = cnt + 1;
1394   selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
1395 
1396   if (cnt == 0)
1397   {
1398     queueptr[0] = newentry;
1399     return;
1400   }
1401   else if (cnt < max)
1402   {
1403     for (i = 0; i<cnt; i++)
1404     {
1405       Uint32 save = queueptr[i];
1406       if ((save & 0xFFFF) > alarm)
1407       {
1408 	memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
1409 	queueptr[i] = newentry;
1410 	return;
1411       }
1412     }
1413     assert(i == cnt);
1414     queueptr[i] = newentry;
1415     return;
1416   }
1417   else
1418   {
1419     abort();
1420   }
1421 }
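/*
 * Sketch of the structure maintained above: the short queue (delays below
 * 100 time units) and the long queue are kept sorted by expiry time, so
 * insertion is O(n) with a memmove, while scan_queue() can stop at the first
 * entry that is not yet due. Overflowing either queue (SQ_SIZE / LQ_SIZE
 * entries) is treated as a fatal error (abort()).
 */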
1422 
1423 /*
1424  * Flush the write state to the job queue, making any new signals available to
1425  * receiving threads.
1426  *
1427  * Two versions:
1428  *    - The general version flush_write_state_other() which may flush to
1429  *      any thread, and possibly signal any waiters.
1430  *    - The special version flush_write_state_self() which should only be used
1431  *      to flush messages to itself.
1432  *
1433  * Call to these functions are encapsulated through flush_write_state
1434  * which decides which of these functions to call.
1435  */
1436 static inline
1437 void
flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
1439 {
1440   /*
   * We can simplify flush_write_state when writing to ourselves:
   * simply update the write references w/o mutex, memory barrier or signalling.
1443    */
1444   w->m_write_buffer->m_len = w->m_write_pos;
1445   q_head->m_write_index = w->m_write_index;
1446   w->m_pending_signals_wakeup = 0;
1447   w->m_pending_signals = 0;
1448 }
1449 
1450 static inline
1451 void
flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head,
1453                         thr_jb_write_state *w)
1454 {
1455   /*
1456    * Two write memory barriers here, as assigning m_len may make signal data
1457    * available to other threads, and assigning m_write_index may make new
1458    * buffers available.
1459    *
1460    * We could optimize this by only doing it as needed, and only doing it
1461    * once before setting all m_len, and once before setting all m_write_index.
1462    *
   * But wmb() is a no-op anyway on x86 ...
1464    */
1465   wmb();
1466   w->m_write_buffer->m_len = w->m_write_pos;
1467   wmb();
1468   q_head->m_write_index = w->m_write_index;
1469 
1470   w->m_pending_signals_wakeup += w->m_pending_signals;
1471   w->m_pending_signals = 0;
1472 
1473   if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
1474   {
1475     w->m_pending_signals_wakeup = 0;
1476     wakeup(&(dstptr->m_waiter));
1477   }
1478 }
1479 
1480 static inline
1481 void
flush_write_state(const thr_data *selfptr, thr_data *dstptr,
1483                   thr_job_queue_head *q_head, thr_jb_write_state *w)
1484 {
1485   if (dstptr == selfptr)
1486   {
1487     flush_write_state_self(q_head, w);
1488   }
1489   else
1490   {
1491     flush_write_state_other(dstptr, q_head, w);
1492   }
1493 }
1494 
1495 
1496 static
1497 void
flush_jbb_write_state(thr_data *selfptr)
1499 {
1500   Uint32 thr_count = g_thr_repository.m_thread_count;
1501   Uint32 self = selfptr->m_thr_no;
1502 
1503   thr_jb_write_state *w = selfptr->m_write_states;
1504   thr_data *thrptr = g_thr_repository.m_thread;
1505   for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
1506   {
1507     if (w->m_pending_signals || w->m_pending_signals_wakeup)
1508     {
1509       w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
1510       thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
1511       flush_write_state(selfptr, thrptr, q_head, w);
1512     }
1513   }
1514 }
1515 
1516 /**
1517  * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS)
1518  * before running check_job_buffers
1519  *
 * This function returns 0 if there is space to receive this amount of
 * signals, and 1 otherwise.
1523  */
1524 static int
check_job_buffers(struct thr_repository* rep)
1526 {
1527   const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
1528   unsigned thr_no = receiver_thread_no;
1529   const thr_data *thrptr = rep->m_thread;
1530   for (unsigned i = 0; i<num_threads; i++, thrptr++)
1531   {
1532     /**
     * NOTE: m_read_index is read w/o lock (and updated by a different
     *       thread), but since that thread can only consume signals, the
     *       value computed here is always conservative (i.e. the real
     *       situation can be better than what we see, if the read index has
     *       moved but we did not see it yet).
1538      */
1539     const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
1540     unsigned ri = q_head->m_read_index;
1541     unsigned wi = q_head->m_write_index;
1542     unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
1543     if (1 + minfree + busy >= thr_job_queue::SIZE)
1544     {
1545       return 1;
1546     }
1547   }
1548 
1549   return 0;
1550 }
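/*
 * With MIN_SIGNALS_PER_PAGE == 255, minfree above evaluates to 5 job-buffer
 * pages, so the check fails (returns 1) as soon as any destination in-queue
 * has 25 or more of its 31 slots in use.
 */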
1551 
1552 /**
 * Compute max signals that thr_no can execute w/o risking
 *   job-buffer-full
 *
 *  see also update_sched_config
1557  *
1558  *
1559  * 1) compute free-slots in ring-buffer from self to each thread in system
1560  * 2) pick smallest value
1561  * 3) compute how many signals this corresponds to
1562  * 4) compute how many signals self can execute if all were to be to
1563  *    the thread with the fullest ring-buffer (i.e the worst case)
1564  *
1565  *   Assumption: each signal may send *at most* 4 signals
 *     - this assumption is the same in ndbd and ndbmtd and is
 *       mostly followed by block code, although not in all places :-(
1568  */
1569 static
1570 Uint32
compute_max_signals_to_execute(Uint32 thr_no)
1572 {
1573   Uint32 minfree = thr_job_queue::SIZE;
1574   const struct thr_repository* rep = &g_thr_repository;
1575   const thr_data *thrptr = rep->m_thread;
1576 
1577   for (unsigned i = 0; i<num_threads; i++, thrptr++)
1578   {
1579     /**
     * NOTE: m_read_index is read w/o lock (and updated by a different
     *       thread), but since that thread can only consume signals, the
     *       value computed here is always conservative (i.e. the real
     *       situation can be better than what we see, if the read index has
     *       moved but we did not see it yet).
1585      */
1586     const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
1587     unsigned ri = q_head->m_read_index;
1588     unsigned wi = q_head->m_write_index;
1589     unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
1590 
1591     assert(free <= thr_job_queue::SIZE);
1592 
1593     if (free < minfree)
1594       minfree = free;
1595   }
1596 
1597 #define SAFETY 2
1598 
1599   if (minfree >= (1 + SAFETY))
1600   {
1601     return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
1602   }
1603   else
1604   {
1605     return 0;
1606   }
1607 }
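/*
 * Example (hypothetical state): if the fullest in-queue written by this
 * thread has minfree == 10 free slots, the thread may execute
 * (3 + (10 - 3) * 255) / 4 == 447 signals, since each executed signal is
 * assumed to produce at most 4 new ones and 1 + SAFETY slots are kept in
 * reserve.
 */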
1608 
1609 //#define NDBMT_RAND_YIELD
1610 #ifdef NDBMT_RAND_YIELD
1611 static Uint32 g_rand_yield = 0;
1612 static
1613 void
rand_yield(Uint32 limit, void* ptr0, void * ptr1)
1615 {
1616   return;
1617   UintPtr tmp = UintPtr(ptr0) + UintPtr(ptr1);
1618   Uint8* tmpptr = (Uint8*)&tmp;
1619   Uint32 sum = g_rand_yield;
1620   for (Uint32 i = 0; i<sizeof(tmp); i++)
1621     sum = 33 * sum + tmpptr[i];
1622 
1623   if ((sum % 100) < limit)
1624   {
1625     g_rand_yield++;
1626     sched_yield();
1627   }
1628 }
1629 #else
static inline void rand_yield(Uint32 limit, void* ptr0, void * ptr1) {}
1631 #endif
1632 
1633 
1634 
1635 void
trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
1637 {
1638   SignalT<3> signalT;
1639   Signal &signal = * new (&signalT) Signal(0);
1640   memset(&signal.header, 0, sizeof(signal.header));
1641 
1642   signal.header.theLength = 3;
1643   signal.header.theSendersSignalId = 0;
1644   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
1645   signal.theData[0] = NDB_LE_SendBytesStatistic;
1646   signal.theData[1] = nodeId;
1647   signal.theData[2] = (Uint32)(bytes/count);
1648   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
1649   signal.header.theReceiversBlockNumber = CMVMI;
1650   sendlocal(g_thr_repository.m_send_buffers[nodeId].m_send_thread,
1651             &signalT.header, signalT.theData, NULL);
1652 }
1653 
1654 /**
1655  * To lock during connect/disconnect, we take both the send lock for the node
 * (to protect performSend()), and the global receive lock (to protect
1657  * performReceive()). By having two locks, we avoid contention between the
1658  * common send and receive operations.
1659  *
1660  * We can have contention between connect/disconnect of one transporter and
1661  * receive for the others. But the transporter code should try to keep this
 * lock only briefly, i.e. only to set the state to DISCONNECTING / socket fd to
1663  * NDB_INVALID_SOCKET, not for the actual close() syscall.
1664  */
1665 void
trp_callback::lock_transporter(NodeId node)
1667 {
1668   struct thr_repository* rep = &g_thr_repository;
1669   /**
1670    * Note: take the send lock _first_, so that we will not hold the receive
1671    * lock while blocking on the send lock.
1672    *
1673    * The reverse case, blocking send lock for one transporter while waiting
1674    * for receive lock, is not a problem, as the transporter being blocked is
1675    * in any case disconnecting/connecting at this point in time, and sends are
1676    * non-waiting (so we will not block sending on other transporters).
1677    */
1678   lock(&rep->m_send_buffers[node].m_send_lock);
1679   lock(&rep->m_receive_lock);
1680 }
1681 
1682 void
trp_callback::unlock_transporter(NodeId node)
1684 {
1685   struct thr_repository* rep = &g_thr_repository;
1686   unlock(&rep->m_receive_lock);
1687   unlock(&rep->m_send_buffers[node].m_send_lock);
1688 }
1689 
1690 int
trp_callback::checkJobBuffer()
1692 {
1693   struct thr_repository* rep = &g_thr_repository;
1694   if (unlikely(check_job_buffers(rep)))
1695   {
1696     do
1697     {
1698       /**
       * Theoretically (or when we run single-threaded by using ndbmtd with
       * everything in the same thread) we should execute signals here to
       * prevent deadlock. But with the current ndbmtd only CMVMI runs in
       * this thread, and no other thread waits for CMVMI except for QMGR
       * open/close connection, which is not (I think) sufficient to create
       * a deadlock.
1705        */
1706 
1707       /** FIXME:
1708        *  On a CMT chip where #CPU >= #NDB-threads sched_yield() is
1709        *  effectively a NOOP as there will normally be an idle CPU available
1710        *  to immediately resume thread execution.
1711        *  On a Niagara chip this may severely impact performance as the CPUs
1712        *  are virtualized by timemultiplexing the physical core.
1713        *  The thread should really be 'parked' on
1714        *  a condition to free its execution resources.
1715        */
1716 //    usleep(a-few-usec);  /* A micro-sleep would likely have been better... */
1717 #if defined HAVE_SCHED_YIELD
1718       sched_yield();
1719 #elif defined _WIN32
1720       SwitchToThread();
1721 #else
1722       NdbSleep_MilliSleep(0);
1723 #endif
1724 
1725     } while (check_job_buffers(rep));
1726   }
1727 
1728   return 0;
1729 }
1730 
1731 /**
1732  * Link all send-buffer-pages into *one*
1733  *   single linked list of buffers
1734  *
 * TODO: This is not completely fair;
1736  *       it would be better to get one entry from each thr_send_queue
1737  *       per thread instead (until empty)
1738  */
1739 static
1740 Uint32
link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
1742 {
1743   Uint32 ri[MAX_THREADS];
1744   Uint32 wi[MAX_THREADS];
1745   thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
1746   for (unsigned thr = 0; thr < num_threads; thr++)
1747   {
1748     ri[thr] = sb->m_read_index[thr];
1749     wi[thr] = src[thr].m_write_index;
1750   }
1751 
1752   Uint64 sentinel[thr_send_page::HEADER_SIZE >> 1];
1753   thr_send_page* sentinel_page = new (&sentinel[0]) thr_send_page;
1754   sentinel_page->m_next = 0;
1755 
1756   struct thr_send_buffer tmp;
1757   tmp.m_first_page = sentinel_page;
1758   tmp.m_last_page = sentinel_page;
1759 
1760   Uint32 bytes = 0;
1761   for (unsigned thr = 0; thr < num_threads; thr++, src++)
1762   {
1763     Uint32 r = ri[thr];
1764     Uint32 w = wi[thr];
1765     if (r != w)
1766     {
1767       rmb();
1768       while (r != w)
1769       {
1770         thr_send_page * p = src->m_buffers[r];
1771         assert(p->m_start == 0);
1772         bytes += p->m_bytes;
1773         tmp.m_last_page->m_next = p;
1774         while (p->m_next != 0)
1775         {
1776           p = p->m_next;
1777           assert(p->m_start == 0);
1778           bytes += p->m_bytes;
1779         }
1780         tmp.m_last_page = p;
1781         assert(tmp.m_last_page != 0);
1782         r = (r + 1) % thr_send_queue::SIZE;
1783       }
1784       sb->m_read_index[thr] = r;
1785     }
1786   }
1787 
1788   if (bytes)
1789   {
1790     if (sb->m_bytes)
1791     {
1792       assert(sb->m_buffer.m_first_page != 0);
1793       assert(sb->m_buffer.m_last_page != 0);
1794       sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
1795       sb->m_buffer.m_last_page = tmp.m_last_page;
1796     }
1797     else
1798     {
1799       assert(sb->m_buffer.m_first_page == 0);
1800       assert(sb->m_buffer.m_last_page == 0);
1801       sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
1802       sb->m_buffer.m_last_page = tmp.m_last_page;
1803     }
1804     sb->m_bytes += bytes;
1805   }
1806 
1807   return sb->m_bytes;
1808 }
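
/*
 * Illustrative sketch only: the sentinel trick used above. A stack-allocated
 * dummy node acts as the initial tail, so the append loop never needs to
 * special-case an empty result list; the real head is sentinel.m_next once
 * the loop is done. Simplified stand-in type, not the kernel's thr_send_page.
 */
#if 0
struct ExamplePage { ExamplePage* m_next; };

static ExamplePage*
concat_lists(ExamplePage* const lists[], unsigned count)
{
  ExamplePage sentinel;
  sentinel.m_next = 0;
  ExamplePage* tail = &sentinel;
  for (unsigned i = 0; i < count; i++)
  {
    for (ExamplePage* p = lists[i]; p != 0; p = p->m_next)
    {
      tail->m_next = p;   /* no empty-list special case needed */
      tail = p;
    }
  }
  tail->m_next = 0;
  return sentinel.m_next; /* 0 if all input lists were empty */
}
#endif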
1809 
1810 Uint32
1811 trp_callback::get_bytes_to_send_iovec(NodeId node,
1812                                       struct iovec *dst, Uint32 max)
1813 {
1814   thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
1815 
1816   Uint32 bytes = link_thread_send_buffers(sb, node);
1817   if (max == 0 || bytes == 0)
1818     return 0;
1819 
1820   /**
1821    * Process linked-list and put into iovecs
1822    * TODO: Here we would also pack stuff to get better utilization
1823    */
1824   Uint32 tot = 0;
1825   Uint32 pos = 0;
1826   thr_send_page * p = sb->m_buffer.m_first_page;
1827   do {
1828     dst[pos].iov_len = p->m_bytes;
1829     dst[pos].iov_base = p->m_data + p->m_start;
1830     assert(p->m_start + p->m_bytes <= p->max_bytes());
1831     tot += p->m_bytes;
1832     pos++;
1833     max--;
1834     p = p->m_next;
1835   } while (max && p != 0);
1836 
1837   return pos;
1838 }
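
/*
 * Illustrative sketch only: the conversion above fills one iovec entry per
 * page, pointing at the unsent part (data + start, bytes long), capped at
 * 'max' entries. Simplified stand-in page type, not the kernel's
 * thr_send_page.
 */
#if 0
#include <sys/uio.h>

struct IovExamplePage
{
  IovExamplePage* m_next;
  unsigned char* m_data;
  unsigned m_start;
  unsigned m_bytes;
};

static unsigned
fill_iovec(const IovExamplePage* p, struct iovec* dst, unsigned max)
{
  unsigned pos = 0;
  while (p != 0 && pos < max)
  {
    dst[pos].iov_base = p->m_data + p->m_start; /* unsent part of the page */
    dst[pos].iov_len = p->m_bytes;
    pos++;
    p = p->m_next;
  }
  return pos; /* number of iovec entries filled */
}
#endif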
1839 
1840 static
1841 void
1842 release_list(thread_local_pool<thr_send_page>* pool,
1843              thr_send_page* head, thr_send_page * tail)
1844 {
1845   while (head != tail)
1846   {
1847     thr_send_page * tmp = head;
1848     head = head->m_next;
1849     pool->release_local(tmp);
1850   }
1851   pool->release_local(tail);
1852 }
1853 
1854 
1855 static
1856 Uint32
1857 bytes_sent(thread_local_pool<thr_send_page>* pool,
1858            thr_repository::send_buffer* sb, Uint32 bytes)
1859 {
1860   assert(bytes);
1861 
1862   Uint32 remain = bytes;
1863   thr_send_page * prev = 0;
1864   thr_send_page * curr = sb->m_buffer.m_first_page;
1865 
1866   assert(sb->m_bytes >= bytes);
1867   while (remain && remain >= curr->m_bytes)
1868   {
1869     remain -= curr->m_bytes;
1870     prev = curr;
1871     curr = curr->m_next;
1872   }
1873 
1874   Uint32 total_bytes = sb->m_bytes;
1875   if (total_bytes == bytes)
1876   {
1877     /**
1878      * Everything was released
1879      */
1880     release_list(pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
1881     sb->m_buffer.m_first_page = 0;
1882     sb->m_buffer.m_last_page = 0;
1883     sb->m_bytes = 0;
1884     return 0;
1885   }
1886   else if (remain)
1887   {
1888     /**
1889      * A partial page was released
1890      */
1891     curr->m_start += remain;
1892     assert(curr->m_bytes > remain);
1893     curr->m_bytes -= remain;
1894     if (prev)
1895     {
1896       release_list(pool, sb->m_buffer.m_first_page, prev);
1897     }
1898   }
1899   else
1900   {
1901     /**
1902      * One or more full pages were released
1903      */
1904     if (prev)
1905     {
1906       release_list(pool, sb->m_buffer.m_first_page, prev);
1907     }
1908     else
1909     {
1910       pool->release_local(sb->m_buffer.m_first_page);
1911     }
1912   }
1913 
1914   sb->m_buffer.m_first_page = curr;
1915   assert(sb->m_bytes > bytes);
1916   sb->m_bytes -= bytes;
1917   return sb->m_bytes;
1918 }
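
/*
 * Worked example for the release logic above, with made-up numbers: assume
 * pages of 1000, 1000 and 500 bytes are buffered (sb->m_bytes == 2500) and
 * the transporter reports 1700 bytes sent. Page 0 is fully sent and released,
 * page 1 becomes a partial page (m_start += 700, m_bytes -= 700), and
 * sb->m_bytes drops to 800. The hypothetical check below only mirrors the
 * arithmetic; it does not touch any scheduler state.
 */
#if 0
#include <assert.h>

static void
bytes_sent_arithmetic_example()
{
  unsigned page_bytes[3] = { 1000, 1000, 500 };
  unsigned total = 2500;
  unsigned sent = 1700;
  unsigned remain = sent;
  unsigned i = 0;
  while (remain >= page_bytes[i])
  {
    remain -= page_bytes[i];
    i++;
  }
  assert(i == 1 && remain == 700); /* page 0 released, page 1 trimmed */
  page_bytes[i] -= remain;         /* curr->m_bytes -= remain          */
  total -= sent;                   /* sb->m_bytes   -= bytes           */
  assert(page_bytes[1] == 300 && total == 800);
}
#endif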
1919 
1920 Uint32
1921 trp_callback::bytes_sent(NodeId node, Uint32 bytes)
1922 {
1923   thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
1924   Uint32 thr_no = sb->m_send_thread;
1925   assert(thr_no != NO_SEND_THREAD);
1926   return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
1927                       sb, bytes);
1928 }
1929 
1930 bool
1931 trp_callback::has_data_to_send(NodeId node)
1932 {
1933   return true;
1934 
1935   thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
1936   Uint32 thr_no = sb->m_send_thread;
1937   assert(thr_no != NO_SEND_THREAD);
1938   assert((sb->m_bytes > 0) == (sb->m_buffer.m_first_page != 0));
1939   if (sb->m_bytes > 0 || sb->m_force_send)
1940     return true;
1941 
1942   thr_send_queue * dst = g_thr_repository.m_thread_send_buffers[node]+thr_no;
1943 
1944   return sb->m_read_index[thr_no] != dst->m_write_index;
1945 }
1946 
1947 void
1948 trp_callback::reset_send_buffer(NodeId node, bool should_be_empty)
1949 {
1950   struct thr_repository *rep = &g_thr_repository;
1951   thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
1952   struct iovec v[32];
1953 
1954   thread_local_pool<thr_send_page> pool(&rep->m_sb_pool, 0);
1955 
1956   lock(&sb->m_send_lock);
1957 
1958   for (;;)
1959   {
1960     Uint32 count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
1961     if (count == 0)
1962       break;
1963     assert(!should_be_empty); // Got data when it should be empty
1964     int bytes = 0;
1965     for (Uint32 i = 0; i < count; i++)
1966       bytes += v[i].iov_len;
1967 
1968     ::bytes_sent(&pool, sb, bytes);
1969   }
1970 
1971   unlock(&sb->m_send_lock);
1972 
1973   pool.release_all(rep->m_mm, RG_TRANSPORTER_BUFFERS);
1974 }
1975 
1976 static inline
1977 void
1978 register_pending_send(thr_data *selfptr, Uint32 nodeId)
1979 {
1980   /* Mark that this node has pending send data. */
1981   if (!selfptr->m_pending_send_mask.get(nodeId))
1982   {
1983     selfptr->m_pending_send_mask.set(nodeId, 1);
1984     Uint32 i = selfptr->m_pending_send_count;
1985     selfptr->m_pending_send_nodes[i] = nodeId;
1986     selfptr->m_pending_send_count = i + 1;
1987   }
1988 }
1989 
1990 /**
1991  * publish thread-locally prepared send-buffer
1992  */
1993 static
1994 void
1995 flush_send_buffer(thr_data* selfptr, Uint32 node)
1996 {
1997   Uint32 thr_no = selfptr->m_thr_no;
1998   thr_send_buffer * src = selfptr->m_send_buffers + node;
1999   thr_repository* rep = &g_thr_repository;
2000 
2001   if (src->m_first_page == 0)
2002   {
2003     return;
2004   }
2005   assert(src->m_last_page != 0);
2006 
2007   thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
2008   thr_repository::send_buffer* sb = rep->m_send_buffers+node;
2009 
2010   Uint32 wi = dst->m_write_index;
2011   Uint32 next = (wi + 1) % thr_send_queue::SIZE;
2012   Uint32 ri = sb->m_read_index[thr_no];
2013 
2014   if (unlikely(next == ri))
2015   {
2016     lock(&sb->m_send_lock);
2017     link_thread_send_buffers(sb, node);
2018     unlock(&sb->m_send_lock);
2019   }
2020 
2021   dst->m_buffers[wi] = src->m_first_page;
2022   wmb();
2023   dst->m_write_index = next;
2024 
2025   src->m_first_page = 0;
2026   src->m_last_page = 0;
2027 }
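
/*
 * Illustrative sketch only: flush_send_buffer() publishes into a single-
 * producer / single-consumer ring by writing the slot first and the write
 * index afterwards (wmb() above, a release store below); the consumer pairs
 * that with an acquire/rmb() before reading the slot. Simplified stand-in
 * using std::atomic, not the kernel's thr_send_queue; the real code also
 * drains the ring under the send lock instead of failing when full.
 */
#if 0
#include <atomic>

template <typename T, unsigned SIZE>
struct SpscRing
{
  T m_slots[SIZE];
  std::atomic<unsigned> m_write_index{0};
  std::atomic<unsigned> m_read_index{0};

  bool publish(const T& item)              /* producer thread only */
  {
    unsigned wi = m_write_index.load(std::memory_order_relaxed);
    unsigned next = (wi + 1) % SIZE;
    if (next == m_read_index.load(std::memory_order_acquire))
      return false;                        /* full: one slot kept free */
    m_slots[wi] = item;                    /* write payload first...   */
    m_write_index.store(next, std::memory_order_release); /* ...then publish */
    return true;
  }

  bool consume(T& out)                     /* consumer thread only */
  {
    unsigned ri = m_read_index.load(std::memory_order_relaxed);
    if (ri == m_write_index.load(std::memory_order_acquire))
      return false;                        /* empty */
    out = m_slots[ri];
    m_read_index.store((ri + 1) % SIZE, std::memory_order_release);
    return true;
  }
};
#endif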
2028 
2029 /**
2030  * This is used in case the send buffer gets full, to force an emergency send,
2031  * hopefully freeing up some buffer space for the next signal.
2032  */
2033 bool
2034 mt_send_handle::forceSend(NodeId nodeId)
2035 {
2036   struct thr_repository *rep = &g_thr_repository;
2037   struct thr_data *selfptr = m_selfptr;
2038   struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
2039 
2040   do
2041   {
2042     sb->m_force_send = 0;
2043     lock(&sb->m_send_lock);
2044     sb->m_send_thread = selfptr->m_thr_no;
2045     globalTransporterRegistry.performSend(nodeId);
2046     sb->m_send_thread = NO_SEND_THREAD;
2047     unlock(&sb->m_send_lock);
2048   } while (sb->m_force_send);
2049 
2050   selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
2051 
2052   return true;
2053 }
2054 
2055 /**
2056  * try sending data
2057  */
2058 static
2059 void
2060 try_send(thr_data * selfptr, Uint32 node)
2061 {
2062   struct thr_repository *rep = &g_thr_repository;
2063   struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
2064 
2065   do
2066   {
2067     if (trylock(&sb->m_send_lock) != 0)
2068     {
2069       return;
2070     }
2071 
2072     sb->m_force_send = 0;
2073     mb();
2074 
2075     sb->m_send_thread = selfptr->m_thr_no;
2076     globalTransporterRegistry.performSend(node);
2077     sb->m_send_thread = NO_SEND_THREAD;
2078     unlock(&sb->m_send_lock);
2079   } while (sb->m_force_send);
2080 
2081   selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
2082 }
2083 
2084 /**
2085  * Flush send buffers and append them to the destination nodes' send queues.
2086  *
2087  * Flushed buffer contents are piggybacked when another thread
2088  * calls do_send() to the same destination node. This makes it possible to
2089  * include more data in each message, and thereby reduces the total number
2090  * of messages handled by the OS, which really impacts performance!
2091  */
2092 static
2093 void
2094 do_flush(struct thr_data* selfptr)
2095 {
2096   Uint32 i;
2097   Uint32 count = selfptr->m_pending_send_count;
2098   Uint8 *nodes = selfptr->m_pending_send_nodes;
2099 
2100   for (i = 0; i < count; i++)
2101   {
2102     flush_send_buffer(selfptr, nodes[i]);
2103   }
2104 }
2105 
2106 /**
2107  * Send any pending data to remote nodes.
2108  *
2109  * If MUST_SEND is false, will only try to lock the send lock, but if it would
2110  * block, that node is skipped, to be tried again next time round.
2111  *
2112  * If MUST_SEND is true, will always take the lock, waiting on it if needed.
2113  *
2114  * The list of pending nodes to send to is thread-local, but the per-node send
2115  * buffer is shared by all threads. Thus we might skip a node for which
2116  * another thread has pending send data, and we might send pending data also
2117  * for another thread without clearing the node from the pending list of that
2118  * other thread (but we will never lose signals due to this).
2119  */
2120 static
2121 Uint32
2122 do_send(struct thr_data* selfptr, bool must_send)
2123 {
2124   Uint32 i;
2125   Uint32 count = selfptr->m_pending_send_count;
2126   Uint8 *nodes = selfptr->m_pending_send_nodes;
2127   struct thr_repository* rep = &g_thr_repository;
2128 
2129   if (count == 0)
2130   {
2131     return 0; // send-buffers empty
2132   }
2133 
2134   /* Clear the pending list. */
2135   selfptr->m_pending_send_mask.clear();
2136   selfptr->m_pending_send_count = 0;
2137 
2138   for (i = 0; i < count; i++)
2139   {
2140     Uint32 node = nodes[i];
2141     selfptr->m_watchdog_counter = 6;
2142 
2143     flush_send_buffer(selfptr, node);
2144 
2145     thr_repository::send_buffer * sb = rep->m_send_buffers + node;
2146 
2147     /**
2148      * If we must send now, set the force_send flag.
2149      *
2150      * This will ensure that if we do not get the send lock, the thread
2151      * holding the lock will try sending again for us when it has released
2152      * the lock.
2153      *
2154      * The lock/unlock pair works as a memory barrier to ensure that the
2155      * flag update is flushed to the other thread.
2156      */
2157     if (must_send)
2158     {
2159       sb->m_force_send = 1;
2160     }
2161 
2162     do
2163     {
2164       if (trylock(&sb->m_send_lock) != 0)
2165       {
2166         if (!must_send)
2167         {
2168           /**
2169            * Not doing this node now, re-add to pending list.
2170            *
2171            * As we only add from the start of an empty list, we are safe from
2172            * overwriting the list while we are iterating over it.
2173            */
2174           register_pending_send(selfptr, node);
2175         }
2176         else
2177         {
2178           /* Other thread will send for us as we set m_force_send. */
2179         }
2180         break;
2181       }
2182 
2183       /**
2184        * Now clear the flag, and start sending all data available to this node.
2185        *
2186        * Put a memory barrier here, so that if another thread tries to grab
2187        * the send lock but fails due to us holding it here, we either
2188        * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
2189        * 2) We clear here the flag just set by the other thread, but then we
2190        * will (thanks to mb()) be able to see and send all of the data already
2191        * in the first send iteration.
2192        */
2193       sb->m_force_send = 0;
2194       mb();
2195 
2196       /**
2197        * Set m_send_thread so that our transporter callback can know which thread
2198        * holds the send lock for this remote node.
2199        */
2200       sb->m_send_thread = selfptr->m_thr_no;
2201       int res = globalTransporterRegistry.performSend(node);
2202       sb->m_send_thread = NO_SEND_THREAD;
2203       unlock(&sb->m_send_lock);
2204       if (res)
2205       {
2206         register_pending_send(selfptr, node);
2207       }
2208     } while (sb->m_force_send);
2209   }
2210 
2211   selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
2212 
2213   return selfptr->m_pending_send_count;
2214 }
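
/*
 * Illustrative sketch only: the force-send handoff used in do_send() and
 * forceSend(). A thread that must send but cannot take the lock sets a flag;
 * the current lock holder re-checks the flag after unlocking and loops, so
 * the data is sent either by us or by the holder. Simplified stand-in types
 * (std::mutex, std::atomic, a function pointer instead of the transporter
 * registry), not the kernel's spinlocks.
 */
#if 0
#include <atomic>
#include <mutex>

struct ExampleSendSlot
{
  std::mutex m_lock;
  std::atomic<int> m_force_send{0};
};

static void
send_or_delegate(ExampleSendSlot& s, bool must_send, void (*do_the_send)())
{
  if (must_send)
    s.m_force_send.store(1);      /* whoever holds the lock will see this */
  do
  {
    if (!s.m_lock.try_lock())
      return;                     /* lock holder (or a later caller) sends */
    s.m_force_send.store(0);      /* clear before sending, cf. mb() above  */
    do_the_send();
    s.m_lock.unlock();
  } while (s.m_force_send.load()); /* someone asked while we were sending  */
}
#endif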
2215 
2216 Uint32 *
2217 mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
2218 {
2219   struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
2220   thr_send_page * p = b->m_last_page;
2221   if ((p != 0) && (p->m_bytes + p->m_start + len <= thr_send_page::max_bytes()))
2222   {
2223     return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
2224   }
2225   else if (p != 0)
2226   {
2227     // TODO: maybe don't always flush on page boundary ???
2228     flush_send_buffer(m_selfptr, node);
2229     try_send(m_selfptr, node);
2230   }
2231 
2232   if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository.m_mm,
2233                                                RG_TRANSPORTER_BUFFERS)) != 0)
2234   {
2235     p->m_bytes = 0;
2236     p->m_start = 0;
2237     p->m_next = 0;
2238     b->m_first_page = b->m_last_page = p;
2239     return (Uint32*)p->m_data;
2240   }
2241   return 0;
2242 }
2243 
2244 Uint32
2245 mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
2246 {
2247   struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
2248   thr_send_page * p = b->m_last_page;
2249   p->m_bytes += lenBytes;
2250   return p->m_bytes;
2251 }
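
/*
 * Illustrative usage sketch only: the two functions above form a
 * reserve-then-commit API. A caller reserves space with getWritePtr(),
 * writes the signal words into the returned area, and commits the byte
 * count with updateWritePtr(). The helper below is hypothetical (the real
 * callers are the transporter prepareSend() paths) and passes 0 for the
 * unused 'max' argument.
 */
#if 0
static bool
append_words_example(mt_send_handle& h, NodeId node,
                     const Uint32* words, Uint32 len, Uint32 prio)
{
  Uint32* dst = h.getWritePtr(node, 4 * len, prio, 0);
  if (dst == 0)
    return false;                        /* no send-buffer page available */
  for (Uint32 i = 0; i < len; i++)
    dst[i] = words[i];                   /* fill the reserved area */
  h.updateWritePtr(node, 4 * len, prio); /* commit the bytes just written */
  return true;
}
#endif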
2252 
2253 /*
2254  * Insert a signal in a job queue.
2255  *
2256  * The signal is not visible to consumers yet after return from this function,
2257  * only recorded in the thr_jb_write_state. It is necessary to first call
2258  * flush_write_state() for this.
2259  *
2260  * The new_buffer is a job buffer to use if the current one gets full. If used,
2261  * we return true, indicating that the caller should allocate a new one for
2262  * the next call. (This is done to allow inserting under the lock, while doing
2263  * the allocation outside the lock.)
2264  */
2265 static inline
2266 bool
2267 insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
2268               const SignalHeader* sh, const Uint32 *data,
2269               const Uint32 secPtr[3], thr_job_buffer *new_buffer)
2270 {
2271   Uint32 write_pos = w->m_write_pos;
2272   Uint32 datalen = sh->theLength;
2273   assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
2274   memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
2275   write_pos += (sizeof(*sh) >> 2);
2276   memcpy(w->m_write_buffer->m_data + write_pos, data, 4*datalen);
2277   write_pos += datalen;
2278   const Uint32 *p= secPtr;
2279   for (Uint32 i = 0; i < sh->m_noOfSections; i++)
2280     w->m_write_buffer->m_data[write_pos++] = *p++;
2281   w->m_pending_signals++;
2282 
2283 #if SIZEOF_CHARP == 8
2284   /* Align to 8-byte boundary, to ensure aligned copies. */
2285   write_pos= (write_pos+1) & ~((Uint32)1);
2286 #endif
2287 
2288   /*
2289    * We make sure that there is always room for at least one signal in the
2290    * current buffer in the queue, so one insert is always possible without
2291    * adding a new buffer.
2292    */
2293   if (likely(write_pos + 32 <= thr_job_buffer::SIZE))
2294   {
2295     w->m_write_pos = write_pos;
2296     return false;
2297   }
2298   else
2299   {
2300     /*
2301      * Need a write memory barrier here, as this might make signal data visible
2302      * to other threads.
2303      *
2304      * ToDo: We actually only need the wmb() here if we already make this
2305      * buffer visible to the other thread. So we might optimize it a bit. But
2306      * wmb() is a no-op on x86 anyway...
2307      */
2308     wmb();
2309     w->m_write_buffer->m_len = write_pos;
2310     Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
2311 
2312     /**
2313      * Full job buffer is fatal.
2314      *
2315      * ToDo: should we wait for it to become non-full? There is no guarantee
2316      * that this will actually happen...
2317      *
2318      * Or alternatively, ndbrequire() ?
2319      */
2320     if (unlikely(write_index == q->m_head->m_read_index))
2321     {
2322       job_buffer_full(0);
2323     }
2324     new_buffer->m_len = 0;
2325     new_buffer->m_prioa = prioa;
2326     q->m_buffers[write_index] = new_buffer;
2327     w->m_write_index = write_index;
2328     w->m_write_pos = 0;
2329     w->m_write_buffer = new_buffer;
2330     return true;                // Buffer new_buffer used
2331   }
2332 
2333   return false;                 // Buffer new_buffer not used
2334 }
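
/*
 * Illustrative sketch only: how many words one signal occupies in the job
 * buffer, matching the layout written by insert_signal() (header, then
 * theLength data words, then one word per section pointer, then rounding to
 * an even word count on 64-bit builds). Hypothetical helper, not used by the
 * scheduler itself.
 */
#if 0
static Uint32
job_buffer_words_example(const SignalHeader* sh)
{
  Uint32 words = (sizeof(SignalHeader) >> 2) /* header           */
               + sh->theLength               /* signal data      */
               + sh->m_noOfSections;         /* section i-values */
#if SIZEOF_CHARP == 8
  words = (words + 1) & ~((Uint32)1);        /* keep 8-byte alignment */
#endif
  return words;
}
#endif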
2335 
2336 static
2337 void
2338 read_jbb_state(thr_data *selfptr, Uint32 count)
2339 {
2340 
2341   thr_jb_read_state *r = selfptr->m_read_states;
2342   const thr_job_queue *q = selfptr->m_in_queue;
2343   for (Uint32 i = 0; i < count; i++,r++,q++)
2344   {
2345     Uint32 read_index = r->m_read_index;
2346 
2347     /**
2348      * Optimization: Only reload when possibly empty.
2349      * Avoid cache reload of shared thr_job_queue_head
2350      */
2351     if (r->m_write_index == read_index)
2352     {
2353       r->m_write_index = q->m_head->m_write_index;
2354       read_barrier_depends();
2355       r->m_read_end = q->m_buffers[read_index]->m_len;
2356     }
2357   }
2358 }
2359 
2360 static
2361 bool
2362 read_jba_state(thr_data *selfptr)
2363 {
2364   thr_jb_read_state *r = &(selfptr->m_jba_read_state);
2365   r->m_write_index = selfptr->m_jba_head.m_write_index;
2366   read_barrier_depends();
2367   r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
2368   return r->is_empty();
2369 }
2370 
2371 /* Check all job queues, return true only if all are empty. */
2372 static bool
2373 check_queues_empty(thr_data *selfptr)
2374 {
2375   Uint32 thr_count = g_thr_repository.m_thread_count;
2376   bool empty = read_jba_state(selfptr);
2377   if (!empty)
2378     return false;
2379 
2380   read_jbb_state(selfptr, thr_count);
2381   const thr_jb_read_state *r = selfptr->m_read_states;
2382   for (Uint32 i = 0; i < thr_count; i++,r++)
2383   {
2384     if (!r->is_empty())
2385       return false;
2386   }
2387   return true;
2388 }
2389 
2390 /*
2391  * Execute at most MAX_SIGNALS signals from one job queue, updating local read
2392  * state as appropriate.
2393  *
2394  * Returns number of signals actually executed.
2395  */
2396 static
2397 Uint32
2398 execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
2399                 Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter)
2400 {
2401   Uint32 num_signals;
2402   Uint32 read_index = r->m_read_index;
2403   Uint32 write_index = r->m_write_index;
2404   Uint32 read_pos = r->m_read_pos;
2405   Uint32 read_end = r->m_read_end;
2406   Uint32 *watchDogCounter = &selfptr->m_watchdog_counter;
2407 
2408   if (read_index == write_index && read_pos >= read_end)
2409     return 0;          // empty read_state
2410 
2411   thr_job_buffer *read_buffer = r->m_read_buffer;
2412 
2413   for (num_signals = 0; num_signals < max_signals; num_signals++)
2414   {
2415     while (read_pos >= read_end)
2416     {
2417       if (read_index == write_index)
2418       {
2419         /* No more available now. */
2420         return num_signals;
2421       }
2422       else
2423       {
2424         /* Move to next buffer. */
2425         read_index = (read_index + 1) % thr_job_queue::SIZE;
2426         release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
2427         read_buffer = q->m_buffers[read_index];
2428         read_pos = 0;
2429         read_end = read_buffer->m_len;
2430         /* Update thread-local read state. */
2431         r->m_read_index = q->m_head->m_read_index = read_index;
2432         r->m_read_buffer = read_buffer;
2433         r->m_read_pos = read_pos;
2434         r->m_read_end = read_end;
2435       }
2436     }
2437 
2438     /*
2439      * These prefetches were found using OProfile to reduce cache misses.
2440      * (Though on Intel Core 2, they do not give much speedup, as apparently
2441      * the hardware prefetcher is already doing a fairly good job).
2442      */
2443     NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 16);
2444     NDB_PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
2445 
2446     /* Now execute the signal. */
2447     SignalHeader* s =
2448       reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
2449     Uint32 seccnt = s->m_noOfSections;
2450     Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
2451     if(siglen>16)
2452     {
2453       NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
2454     }
2455     Uint32 bno = blockToMain(s->theReceiversBlockNumber);
2456     Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
2457     SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
2458     assert(block != 0);
2459 
2460     Uint32 gsn = s->theVerId_signalNumber;
2461     *watchDogCounter = 1;
2462     /* Must update original buffer so signal dump will see it. */
2463     s->theSignalId = (*signalIdCounter)++;
2464     memcpy(&sig->header, s, 4*siglen);
2465     sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
2466     sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
2467     sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];
2468 
2469     read_pos += siglen + seccnt;
2470 #if SIZEOF_CHARP == 8
2471     /* Handle 8-byte alignment. */
2472     read_pos = (read_pos + 1) & ~((Uint32)1);
2473 #endif
2474 
2475     /* Update just before execute so signal dump can know how far we are. */
2476     r->m_read_pos = read_pos;
2477 
2478 #ifdef VM_TRACE
2479     if (globalData.testOn)
2480     { //wl4391_todo segments
2481       SegmentedSectionPtr ptr[3];
2482       ptr[0].i = sig->m_sectionPtrI[0];
2483       ptr[1].i = sig->m_sectionPtrI[1];
2484       ptr[2].i = sig->m_sectionPtrI[2];
2485       ::getSections(seccnt, ptr);
2486       globalSignalLoggers.executeSignal(*s,
2487                                         0,
2488                                         &sig->theData[0],
2489                                         globalData.ownId,
2490                                         ptr, seccnt);
2491     }
2492 #endif
2493 
2494     block->executeFunction(gsn, sig);
2495   }
2496 
2497   return num_signals;
2498 }
2499 
2500 static
2501 Uint32
2502 run_job_buffers(thr_data *selfptr, Signal *sig, Uint32 *signalIdCounter)
2503 {
2504   Uint32 thr_count = g_thr_repository.m_thread_count;
2505   Uint32 signal_count = 0;
2506   Uint32 perjb = selfptr->m_max_signals_per_jb;
2507 
2508   read_jbb_state(selfptr, thr_count);
2509   /*
2510    * A load memory barrier to ensure that we see any prio A signal sent later
2511    * than loaded prio B signals.
2512    */
2513   rmb();
2514 
2515   thr_job_queue *queue = selfptr->m_in_queue;
2516   thr_jb_read_state *read_state = selfptr->m_read_states;
2517   for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
2518        send_thr_no++,queue++,read_state++)
2519   {
2520     /* Read the prio A state often, to avoid starvation of prio A. */
2521     bool jba_empty = read_jba_state(selfptr);
2522     if (!jba_empty)
2523     {
2524       static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
2525       signal_count += execute_signals(selfptr, &(selfptr->m_jba),
2526                                       &(selfptr->m_jba_read_state), sig,
2527                                       max_prioA, signalIdCounter);
2528     }
2529 
2530     /* Now execute prio B signals from one thread. */
2531     signal_count += execute_signals(selfptr, queue, read_state,
2532                                     sig, perjb, signalIdCounter);
2533   }
2534 
2535   return signal_count;
2536 }
2537 
2538 struct thr_map_entry {
2539   enum { NULL_THR_NO = 0xFF };
2540   Uint8 thr_no;
2541   thr_map_entry() : thr_no(NULL_THR_NO) {}
2542 };
2543 
2544 static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
2545 
2546 static inline Uint32
2547 block2ThreadId(Uint32 block, Uint32 instance)
2548 {
2549   assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
2550   Uint32 index = block - MIN_BLOCK_NO;
2551   assert(instance < MAX_BLOCK_INSTANCES);
2552   const thr_map_entry& entry = thr_map[index][instance];
2553   assert(entry.thr_no < num_threads);
2554   return entry.thr_no;
2555 }
2556 
2557 void
2558 add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
2559 {
2560   assert(main == blockToMain(main));
2561   Uint32 index = main - MIN_BLOCK_NO;
2562   assert(index < NO_OF_BLOCKS);
2563   assert(instance < MAX_BLOCK_INSTANCES);
2564 
2565   SimulatedBlock* b = globalData.getBlock(main, instance);
2566   require(b != 0);
2567 
2568   /* Block number including instance. */
2569   Uint32 block = numberToBlock(main, instance);
2570 
2571   require(thr_no < num_threads);
2572   struct thr_repository* rep = &g_thr_repository;
2573   thr_data* thr_ptr = rep->m_thread + thr_no;
2574 
2575   /* Add to list. */
2576   {
2577     Uint32 i;
2578     for (i = 0; i < thr_ptr->m_instance_count; i++)
2579       require(thr_ptr->m_instance_list[i] != block);
2580   }
2581   require(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
2582   thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;
2583 
2584   SimulatedBlock::ThreadContext ctx;
2585   ctx.threadId = thr_no;
2586   ctx.jamBuffer = &thr_ptr->m_jam;
2587   ctx.watchDogCounter = &thr_ptr->m_watchdog_counter;
2588   ctx.sectionPoolCache = &thr_ptr->m_sectionPoolCache;
2589   b->assignToThread(ctx);
2590 
2591   /* Create entry mapping block to thread. */
2592   thr_map_entry& entry = thr_map[index][instance];
2593   require(entry.thr_no == thr_map_entry::NULL_THR_NO);
2594   entry.thr_no = thr_no;
2595 }
2596 
2597 /* Static assignment of main instances (before first signal). */
2598 void
2599 add_main_thr_map()
2600 {
2601   /* Keep mt-classic assignments in MT LQH. */
2602   const Uint32 thr_GLOBAL = 0;
2603   const Uint32 thr_LOCAL = 1;
2604   const Uint32 thr_RECEIVER = receiver_thread_no;
2605 
2606   add_thr_map(BACKUP, 0, thr_LOCAL);
2607   add_thr_map(DBTC, 0, thr_GLOBAL);
2608   add_thr_map(DBDIH, 0, thr_GLOBAL);
2609   add_thr_map(DBLQH, 0, thr_LOCAL);
2610   add_thr_map(DBACC, 0, thr_LOCAL);
2611   add_thr_map(DBTUP, 0, thr_LOCAL);
2612   add_thr_map(DBDICT, 0, thr_GLOBAL);
2613   add_thr_map(NDBCNTR, 0, thr_GLOBAL);
2614   add_thr_map(QMGR, 0, thr_GLOBAL);
2615   add_thr_map(NDBFS, 0, thr_GLOBAL);
2616   add_thr_map(CMVMI, 0, thr_RECEIVER);
2617   add_thr_map(TRIX, 0, thr_GLOBAL);
2618   add_thr_map(DBUTIL, 0, thr_GLOBAL);
2619   add_thr_map(SUMA, 0, thr_LOCAL);
2620   add_thr_map(DBTUX, 0, thr_LOCAL);
2621   add_thr_map(TSMAN, 0, thr_LOCAL);
2622   add_thr_map(LGMAN, 0, thr_LOCAL);
2623   add_thr_map(PGMAN, 0, thr_LOCAL);
2624   add_thr_map(RESTORE, 0, thr_LOCAL);
2625   add_thr_map(DBINFO, 0, thr_LOCAL);
2626   add_thr_map(DBSPJ, 0, thr_GLOBAL);
2627 }
2628 
2629 /* Workers added by LocalProxy (before first signal). */
2630 void
2631 add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
2632 {
2633   require(instance != 0);
2634   Uint32 i = instance - 1;
2635   Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
2636   add_thr_map(block, instance, thr_no);
2637 }
2638 
2639 /* Extra workers run in the proxy thread. */
2640 void
2641 add_extra_worker_thr_map(Uint32 block, Uint32 instance)
2642 {
2643   require(instance != 0);
2644   Uint32 thr_no = block2ThreadId(block, 0);
2645   add_thr_map(block, instance, thr_no);
2646 }
2647 
2648 /**
2649  * Create the duplicate entries needed so that
2650  *   the sender doesn't need to know how many instances there
2651  *   actually are in this node...
2652  *
2653  * If there is only 1 instance, then duplicate that one for all slots;
2654  * otherwise assume instance 0 is the proxy and duplicate the workers (modulo)
2655  *
2656  * NOTE: extra pgman worker is instance 5
2657  */
2658 void
2659 finalize_thr_map()
2660 {
2661   for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
2662   {
2663     Uint32 bno = b + MIN_BLOCK_NO;
2664     Uint32 cnt = 0;
2665     while (cnt < MAX_BLOCK_INSTANCES &&
2666            thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
2667       cnt++;
2668 
2669     if (cnt != MAX_BLOCK_INSTANCES)
2670     {
2671       SimulatedBlock * main = globalData.getBlock(bno, 0);
2672       for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++)
2673       {
2674         Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1));
2675         if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
2676         {
2677           thr_map[b][i] = thr_map[b][dup];
2678           main->addInstance(globalData.getBlock(bno, dup), i);
2679         }
2680         else
2681         {
2682           /**
2683            * extra pgman instance
2684            */
2685           require(bno == PGMAN);
2686         }
2687       }
2688     }
2689   }
2690 }
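
/*
 * Worked example for the duplication rule above, with made-up numbers: with
 * cnt == 5 filled entries (proxy instance 0 plus 4 workers), the remaining
 * slots reuse the workers round-robin via 1 + ((i - 1) % (cnt - 1)), so slot
 * 6 maps to worker 2 and slot 7 to worker 3. Hypothetical check only.
 */
#if 0
#include <assert.h>

static void
thr_map_duplication_example()
{
  const Uint32 cnt = 5;
  Uint32 dup6 = (cnt == 1) ? 0 : 1 + ((6 - 1) % (cnt - 1));
  Uint32 dup7 = (cnt == 1) ? 0 : 1 + ((7 - 1) % (cnt - 1));
  assert(dup6 == 2 && dup7 == 3);
}
#endif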
2691 
2692 static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
2693                               Uint32 b_count, Uint32 b_size)
2694 {
2695   SignalT<6> sT;
2696   Signal *s= new (&sT) Signal(0);
2697 
2698   memset(&s->header, 0, sizeof(s->header));
2699   s->header.theLength = 6;
2700   s->header.theSendersSignalId = 0;
2701   s->header.theSendersBlockRef = numberToRef(0, 0);
2702   s->header.theVerId_signalNumber = GSN_EVENT_REP;
2703   s->header.theReceiversBlockNumber = CMVMI;
2704   s->theData[0] = NDB_LE_MTSignalStatistics;
2705   s->theData[1] = self;
2706   s->theData[2] = a_count;
2707   s->theData[3] = a_size;
2708   s->theData[4] = b_count;
2709   s->theData[5] = b_size;
2710   /* ToDo: does this really need to be prio A like in the old code? */
2711   sendlocal(self, &s->header, s->theData,
2712             NULL);
2713 }
2714 
2715 static inline void
2716 update_sched_stats(thr_data *selfptr)
2717 {
2718   if(selfptr->m_prioa_count + selfptr->m_priob_count >= 2000000)
2719   {
2720     reportSignalStats(selfptr->m_thr_no,
2721                       selfptr->m_prioa_count,
2722                       selfptr->m_prioa_size,
2723                       selfptr->m_priob_count,
2724                       selfptr->m_priob_size);
2725     selfptr->m_prioa_count = 0;
2726     selfptr->m_prioa_size = 0;
2727     selfptr->m_priob_count = 0;
2728     selfptr->m_priob_size = 0;
2729 
2730 #if 0
2731     Uint32 thr_no = selfptr->m_thr_no;
2732     ndbout_c("--- %u fifo: %u jba: %u global: %u",
2733              thr_no,
2734              fifo_used_pages(selfptr),
2735              selfptr->m_jba_head.used(),
2736              g_thr_repository.m_free_list.m_cnt);
2737     for (Uint32 i = 0; i<num_threads; i++)
2738     {
2739       ndbout_c("  %u-%u : %u",
2740                thr_no, i, selfptr->m_in_queue_head[i].used());
2741     }
2742 #endif
2743   }
2744 }
2745 
2746 static void
2747 init_thread(thr_data *selfptr)
2748 {
2749   selfptr->m_waiter.init();
2750   selfptr->m_jam.theEmulatedJamIndex = 0;
2751   selfptr->m_jam.theEmulatedJamBlockNumber = 0;
2752   bzero(selfptr->m_jam.theEmulatedJam, sizeof(selfptr->m_jam.theEmulatedJam));
2753   NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
2754   NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
2755 
2756   unsigned thr_no = selfptr->m_thr_no;
2757   globalEmulatorData.theWatchDog->
2758     registerWatchedThread(&selfptr->m_watchdog_counter, thr_no);
2759   {
2760     while(selfptr->m_thread == 0)
2761       NdbSleep_MilliSleep(30);
2762   }
2763 
2764   THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
2765   BaseString tmp;
2766   tmp.appfmt("thr: %u ", thr_no);
2767 
2768   int tid = NdbThread_GetTid(selfptr->m_thread);
2769   if (tid != -1)
2770   {
2771     tmp.appfmt("tid: %u ", tid);
2772   }
2773 
2774   conf.appendInfo(tmp,
2775                   selfptr->m_instance_list, selfptr->m_instance_count);
2776   int res = conf.do_bind(selfptr->m_thread,
2777                          selfptr->m_instance_list, selfptr->m_instance_count);
2778   if (res < 0)
2779   {
2780     tmp.appfmt("err: %d ", -res);
2781   }
2782   else if (res > 0)
2783   {
2784     tmp.appfmt("OK ");
2785   }
2786 
2787   selfptr->m_thr_id = pthread_self();
2788 
2789   for (Uint32 i = 0; i < selfptr->m_instance_count; i++)
2790   {
2791     BlockReference block = selfptr->m_instance_list[i];
2792     Uint32 main = blockToMain(block);
2793     Uint32 instance = blockToInstance(block);
2794     tmp.appfmt("%s(%u) ", getBlockName(main), instance);
2795   }
2796   printf("%s\n", tmp.c_str());
2797   fflush(stdout);
2798 }
2799 
2800 /**
2801  * Align signal buffer for better cache performance.
2802  * Also skew it a little for each thread to avoid cache pollution.
2803  */
2804 #define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_THREADS)
2805 static Signal *
2806 aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
2807 {
2808   UintPtr sigtmp= (UintPtr)signal_buf;
2809   sigtmp= (sigtmp+63) & (~(UintPtr)63);
2810   sigtmp+= thr_no*256;
2811   return (Signal *)sigtmp;
2812 }
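
/*
 * Worked example for the alignment arithmetic above, with a made-up address:
 * rounding 1000 up to the next 64-byte boundary gives 1024, and thread 2 is
 * then skewed by 2 * 256 bytes so that different threads' Signal objects do
 * not compete for the same cache sets. Hypothetical check, assumes 64-byte
 * cache lines.
 */
#if 0
#include <assert.h>

static void
aligned_signal_example()
{
  UintPtr base = 1000;
  UintPtr aligned = (base + 63) & (~(UintPtr)63);
  assert(aligned == 1024);
  assert(aligned + 2 * 256 == 1536); /* skew for thr_no == 2 */
}
#endif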
2813 
2814 Uint32 receiverThreadId;
2815 
2816 /*
2817  * We only do receive in thread 2, no other threads do receive.
2818  *
2819  * As part of the receive loop, we also periodically call update_connections()
2820  * (this way we are similar to single-threaded ndbd).
2821  *
2822  * The CMVMI block (and no other blocks) run in the same thread as this
2823  * receive loop; this way we avoid races between update_connections() and
2824  * CMVMI calls into the transporters.
2825  *
2826  * Note that with this setup, local signals to CMVMI cannot wake up the thread
2827  * if it is sleeping on the receive sockets. Thus CMVMI local signal processing
2828  * can be (slightly) delayed, however CMVMI is not really performance critical
2829  * (hopefully).
2830  */
2831 extern "C"
2832 void *
2833 mt_receiver_thread_main(void *thr_arg)
2834 {
2835   unsigned char signal_buf[SIGBUF_SIZE];
2836   Signal *signal;
2837   struct thr_repository* rep = &g_thr_repository;
2838   struct thr_data* selfptr = (struct thr_data *)thr_arg;
2839   unsigned thr_no = selfptr->m_thr_no;
2840   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
2841   Uint32 thrSignalId = 0;
2842   bool has_received = false;
2843 
2844   init_thread(selfptr);
2845   receiverThreadId = thr_no;
2846   signal = aligned_signal(signal_buf, thr_no);
2847 
2848   while (globalData.theRestartFlag != perform_stop)
2849   {
2850     static int cnt = 0;
2851 
2852     update_sched_stats(selfptr);
2853 
2854     if (cnt == 0)
2855     {
2856       watchDogCounter = 5;
2857       globalTransporterRegistry.update_connections();
2858     }
2859     cnt = (cnt + 1) & 15;
2860 
2861     watchDogCounter = 2;
2862 
2863     NDB_TICKS now = NdbTick_CurrentMillisecond();
2864     scan_time_queues(selfptr, now);
2865 
2866     Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
2867 
2868     if (sum || has_received)
2869     {
2870       watchDogCounter = 6;
2871       flush_jbb_write_state(selfptr);
2872     }
2873 
2874     do_send(selfptr, TRUE);
2875 
2876     watchDogCounter = 7;
2877 
2878     has_received = false;
2879     if (globalTransporterRegistry.pollReceive(1))
2880     {
2881       if (check_job_buffers(rep) == 0)
2882       {
2883 	watchDogCounter = 8;
2884         lock(&rep->m_receive_lock);
2885         globalTransporterRegistry.performReceive();
2886         unlock(&rep->m_receive_lock);
2887         has_received = true;
2888       }
2889     }
2890   }
2891 
2892   globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
2893   return NULL;                  // Return value not currently used
2894 }
2895 
2896 static
2897 inline
2898 void
2899 sendpacked(struct thr_data* thr_ptr, Signal* signal)
2900 {
2901   Uint32 i;
2902   for (i = 0; i < thr_ptr->m_instance_count; i++)
2903   {
2904     BlockReference block = thr_ptr->m_instance_list[i];
2905     Uint32 main = blockToMain(block);
2906     Uint32 instance = blockToInstance(block);
2907     SimulatedBlock* b = globalData.getBlock(main, instance);
2908     // wl4391_todo remove useless assert
2909     assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
2910     /* b->send_at_job_buffer_end(); */
2911     b->executeFunction(GSN_SEND_PACKED, signal);
2912   }
2913 }
2914 
2915 /**
2916  * Check if the out-queues of selfptr are full;
2917  * return true if so
2918  */
2919 static bool
2920 check_job_buffer_full(thr_data *selfptr)
2921 {
2922   Uint32 thr_no = selfptr->m_thr_no;
2923   Uint32 tmp = compute_max_signals_to_execute(thr_no);
2924 #if 0
2925   Uint32 perjb = tmp / g_thr_repository.m_thread_count;
2926 
2927   if (perjb == 0)
2928   {
2929     return true;
2930   }
2931 
2932   return false;
2933 #else
2934   if (tmp < g_thr_repository.m_thread_count)
2935     return true;
2936   return false;
2937 #endif
2938 }
2939 
2940 /**
2941  * update_sched_config
2942  *
2943  *   In order to prevent "job-buffer-full", i.e.
2944  *     that one thread (T1) produces so many signals to another thread (T2)
2945  *     that the ring-buffer from T1 to T2 gets full,
2946  *     the main loop has 2 "config" variables
2947  *   - m_max_exec_signals
2948  *     This is the *total* number of signals T1 can execute before calling
2949  *     this method again
2950  *   - m_max_signals_per_jb
2951  *     This is the max number of signals T1 can execute from each other thread
2952  *     in the system
2953  *
2954  *   Assumption: each signal may send *at most* 4 signals
2955  *     - this assumption is made the same in ndbd and ndbmtd and is
2956  *       mostly followed by block code, although not in all places :-(
2957  *
2958  *   This function returns true if it slept
2959  *     (i.e. it concluded that it could not execute *any* signals without
2960  *      risking job-buffer-full)
2961  */
2962 static
2963 bool
2964 update_sched_config(struct thr_data* selfptr, Uint32 pending_send)
2965 {
2966   Uint32 sleeploop = 0;
2967   Uint32 thr_no = selfptr->m_thr_no;
2968 loop:
2969   Uint32 tmp = compute_max_signals_to_execute(thr_no);
2970   Uint32 perjb = tmp / g_thr_repository.m_thread_count;
2971 
2972   if (perjb > MAX_SIGNALS_PER_JB)
2973     perjb = MAX_SIGNALS_PER_JB;
2974 
2975   selfptr->m_max_exec_signals = tmp;
2976   selfptr->m_max_signals_per_jb = perjb;
2977 
2978   if (unlikely(perjb == 0))
2979   {
2980     sleeploop++;
2981     if (sleeploop == 10)
2982     {
2983       /**
2984        * we've slept for 10ms...try running anyway
2985        */
2986       selfptr->m_max_signals_per_jb = 1;
2987       ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
2988       return true;
2989     }
2990 
2991     if (pending_send)
2992     {
2993       /* About to sleep, _must_ send now. */
2994       pending_send = do_send(selfptr, TRUE);
2995     }
2996 
2997     const Uint32 wait = 1000000;    /* 1 ms */
2998     yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
2999     goto loop;
3000   }
3001 
3002   return sleeploop > 0;
3003 }
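
/*
 * Worked example for the budget split above, with made-up numbers: if
 * compute_max_signals_to_execute() returned 800 on a 4-thread system, each
 * source thread could contribute 200 signals per round, which is then
 * clamped to MAX_SIGNALS_PER_JB (100). Hypothetical check only; the real
 * budget depends on free job-buffer pages.
 */
#if 0
#include <assert.h>

static void
sched_budget_example()
{
  Uint32 tmp = 800;                  /* assumed total budget       */
  Uint32 thread_count = 4;           /* assumed number of threads  */
  Uint32 perjb = tmp / thread_count; /* 200 per source thread      */
  if (perjb > MAX_SIGNALS_PER_JB)
    perjb = MAX_SIGNALS_PER_JB;
  assert(perjb == MAX_SIGNALS_PER_JB);
}
#endif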
3004 
3005 extern "C"
3006 void *
3007 mt_job_thread_main(void *thr_arg)
3008 {
3009   unsigned char signal_buf[SIGBUF_SIZE];
3010   Signal *signal;
3011   const Uint32 nowait = 10 * 1000000;    /* 10 ms */
3012   Uint32 thrSignalId = 0;
3013 
3014   struct thr_data* selfptr = (struct thr_data *)thr_arg;
3015   init_thread(selfptr);
3016   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
3017 
3018   unsigned thr_no = selfptr->m_thr_no;
3019   signal = aligned_signal(signal_buf, thr_no);
3020 
3021   /* Avoid false watchdog alarms caused by race condition. */
3022   watchDogCounter = 1;
3023 
3024   Uint32 pending_send = 0;
3025   Uint32 send_sum = 0;
3026   int loops = 0;
3027   int maxloops = 10;/* Loops before reading clock, fuzzy adapted to 1ms freq. */
3028   NDB_TICKS now = selfptr->m_time;
3029 
3030   while (globalData.theRestartFlag != perform_stop)
3031   {
3032     loops++;
3033     update_sched_stats(selfptr);
3034 
3035     watchDogCounter = 2;
3036     scan_time_queues(selfptr, now);
3037 
3038     Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
3039 
3040     watchDogCounter = 1;
3041     signal->header.m_noOfSections = 0; /* valgrind */
3042     sendpacked(selfptr, signal);
3043 
3044     if (sum)
3045     {
3046       watchDogCounter = 6;
3047       flush_jbb_write_state(selfptr);
3048       send_sum += sum;
3049 
3050       if (send_sum > MAX_SIGNALS_BEFORE_SEND)
3051       {
3052         /* Try to send, but skip for now in case of lock contention. */
3053         pending_send = do_send(selfptr, FALSE);
3054         send_sum = 0;
3055       }
3056       else
3057       {
3058         /* Append send buffers to the send queues of the destination nodes. */
3059         do_flush(selfptr);
3060       }
3061     }
3062     else
3063     {
3064       /* No signals processed, prepare to sleep to wait for more */
3065       if (pending_send || send_sum > 0)
3066       {
3067         /* About to sleep, _must_ send now. */
3068         pending_send = do_send(selfptr, TRUE);
3069         send_sum = 0;
3070       }
3071 
3072       if (pending_send == 0)
3073       {
3074         bool waited = yield(&selfptr->m_waiter, nowait, check_queues_empty,
3075                             selfptr);
3076         if (waited)
3077         {
3078           /* Update current time after sleeping */
3079           now = NdbTick_CurrentMillisecond();
3080           loops = 0;
3081         }
3082       }
3083     }
3084 
3085     /**
3086      * Check if we executed enough signals,
3087      *   and if so recompute how many signals to execute
3088      */
3089     if (sum >= selfptr->m_max_exec_signals)
3090     {
3091       if (update_sched_config(selfptr, pending_send))
3092       {
3093         /* Update current time after sleeping */
3094         now = NdbTick_CurrentMillisecond();
3095         loops = 0;
3096       }
3097     }
3098     else
3099     {
3100       selfptr->m_max_exec_signals -= sum;
3101     }
3102 
3103     /**
3104      * Adaptive reading frequency of the system time: read every time 1ms
3105      * is likely to have passed
3106      */
3107     if (loops > maxloops)
3108     {
3109       now = NdbTick_CurrentMillisecond();
3110       Uint64 diff = now - selfptr->m_time;
3111 
3112       /* Adjust 'maxloops' to achieve a clock reading frequency of 1ms */
3113       if (diff < 1)
3114         maxloops += ((maxloops/10) + 1); /* No change: less frequent reading */
3115       else if (diff > 1 && maxloops > 1)
3116         maxloops -= ((maxloops/10) + 1); /* Overslept: Need more frequent read*/
3117 
3118       loops = 0;
3119     }
3120   }
3121 
3122   globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
3123   return NULL;                  // Return value not currently used
3124 }
3125 
3126 void
3127 sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
3128           const Uint32 secPtr[3])
3129 {
3130   Uint32 block = blockToMain(s->theReceiversBlockNumber);
3131   Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
3132 
3133   /*
3134    * Max number of signals to put into job buffer before flushing the buffer
3135    * to the other thread.
3136    * This parameter was found to be reasonable by benchmarking.
3137    */
3138   Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no) ?
3139     MAX_SIGNALS_BEFORE_FLUSH_RECEIVER :
3140     MAX_SIGNALS_BEFORE_FLUSH_OTHER;
3141 
3142   Uint32 dst = block2ThreadId(block, instance);
3143   struct thr_repository* rep = &g_thr_repository;
3144   struct thr_data * selfptr = rep->m_thread + self;
3145   assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
3146   struct thr_data * dstptr = rep->m_thread + dst;
3147 
3148   selfptr->m_priob_count++;
3149   Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
3150   selfptr->m_priob_size += siglen;
3151 
3152   thr_job_queue *q = dstptr->m_in_queue + self;
3153   thr_jb_write_state *w = selfptr->m_write_states + dst;
3154   if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
3155   {
3156     selfptr->m_next_buffer = seize_buffer(rep, self, false);
3157   }
3158   if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
3159     flush_write_state(selfptr, dstptr, q->m_head, w);
3160 }
3161 
3162 void
3163 sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data,
3164           const Uint32 secPtr[3])
3165 {
3166   Uint32 block = blockToMain(s->theReceiversBlockNumber);
3167   Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
3168 
3169   Uint32 dst = block2ThreadId(block, instance);
3170   struct thr_repository* rep = &g_thr_repository;
3171   struct thr_data *selfptr = rep->m_thread + self;
3172   assert(s->theVerId_signalNumber == GSN_START_ORD ||
3173          pthread_equal(selfptr->m_thr_id, pthread_self()));
3174   struct thr_data *dstptr = rep->m_thread + dst;
3175 
3176   selfptr->m_prioa_count++;
3177   Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
3178   selfptr->m_prioa_size += siglen;
3179 
3180   thr_job_queue *q = &(dstptr->m_jba);
3181   thr_jb_write_state w;
3182 
3183   lock(&dstptr->m_jba_write_lock);
3184 
3185   Uint32 index = q->m_head->m_write_index;
3186   w.m_write_index = index;
3187   thr_job_buffer *buffer = q->m_buffers[index];
3188   w.m_write_buffer = buffer;
3189   w.m_write_pos = buffer->m_len;
3190   w.m_pending_signals = 0;
3191   w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
3192   bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
3193                                 selfptr->m_next_buffer);
3194   flush_write_state(selfptr, dstptr, q->m_head, &w);
3195 
3196   unlock(&dstptr->m_jba_write_lock);
3197 
3198   if (buf_used)
3199     selfptr->m_next_buffer = seize_buffer(rep, self, true);
3200 }
3201 
3202 /**
3203  * Send a signal to a remote node.
3204  *
3205  * (The signal is only queued here, and actually sent later in do_send()).
3206  */
3207 SendStatus
3208 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
3209                const Uint32 * data, NodeId nodeId,
3210                const LinearSectionPtr ptr[3])
3211 {
3212   thr_repository *rep = &g_thr_repository;
3213   thr_data *selfptr = rep->m_thread + self;
3214   SendStatus ss;
3215 
3216   mt_send_handle handle(selfptr);
3217   register_pending_send(selfptr, nodeId);
3218   /* prepareSend() is lock-free, as we have per-thread send buffers. */
3219   ss = globalTransporterRegistry.prepareSend(&handle,
3220                                              sh, prio, data, nodeId, ptr);
3221   return ss;
3222 }
3223 
3224 SendStatus
3225 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
3226                const Uint32 *data, NodeId nodeId,
3227                class SectionSegmentPool *thePool,
3228                const SegmentedSectionPtr ptr[3])
3229 {
3230   thr_repository *rep = &g_thr_repository;
3231   thr_data *selfptr = rep->m_thread + self;
3232   SendStatus ss;
3233 
3234   mt_send_handle handle(selfptr);
3235   register_pending_send(selfptr, nodeId);
3236   ss = globalTransporterRegistry.prepareSend(&handle,
3237                                              sh, prio, data, nodeId,
3238                                              *thePool, ptr);
3239   return ss;
3240 }
3241 
3242 /*
3243  * This function sends a prio A STOP_FOR_CRASH signal to a thread.
3244  *
3245  * It works when called from any other thread, not just from job processing
3246  * threads. But note that this signal will be the last signal to be executed by
3247  * the other thread, as it will exit immediately.
3248  */
3249 static
3250 void
3251 sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
3252 {
3253   SignalT<StopForCrash::SignalLength> signalT;
3254   struct thr_repository* rep = &g_thr_repository;
3255   /* As this signal will be the last one executed by the other thread, it does
3256      not matter which buffer we use in case the current buffer is filled up by
3257      the STOP_FOR_CRASH signal; the data in it will never be read.
3258   */
3259   static thr_job_buffer dummy_buffer;
3260 
3261   /**
3262    * Pick any instance running in this thread
3263    */
3264   struct thr_data * dstptr = rep->m_thread + dst;
3265   Uint32 bno = dstptr->m_instance_list[0];
3266 
3267   memset(&signalT.header, 0, sizeof(SignalHeader));
3268   signalT.header.theVerId_signalNumber   = GSN_STOP_FOR_CRASH;
3269   signalT.header.theReceiversBlockNumber = bno;
3270   signalT.header.theSendersBlockRef      = 0;
3271   signalT.header.theTrace                = 0;
3272   signalT.header.theSendersSignalId      = 0;
3273   signalT.header.theSignalId             = 0;
3274   signalT.header.theLength               = StopForCrash::SignalLength;
3275   StopForCrash * stopForCrash = CAST_PTR(StopForCrash, &signalT.theData[0]);
3276   stopForCrash->flags = 0;
3277 
3278   thr_job_queue *q = &(dstptr->m_jba);
3279   thr_jb_write_state w;
3280 
3281   lock(&dstptr->m_jba_write_lock);
3282 
3283   Uint32 index = q->m_head->m_write_index;
3284   w.m_write_index = index;
3285   thr_job_buffer *buffer = q->m_buffers[index];
3286   w.m_write_buffer = buffer;
3287   w.m_write_pos = buffer->m_len;
3288   w.m_pending_signals = 0;
3289   w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
3290   insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
3291                 &dummy_buffer);
3292   flush_write_state(selfptr, dstptr, q->m_head, &w);
3293 
3294   unlock(&dstptr->m_jba_write_lock);
3295 }
3296 
3297 /**
3298  * init functions
3299  */
3300 static
3301 void
3302 queue_init(struct thr_tq* tq)
3303 {
3304   tq->m_next_timer = 0;
3305   tq->m_current_time = 0;
3306   tq->m_next_free = RNIL;
3307   tq->m_cnt[0] = tq->m_cnt[1] = 0;
3308   bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
3309 }
3310 
3311 static
3312 void
3313 thr_init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
3314          unsigned thr_no)
3315 {
3316   Uint32 i;
3317 
3318   selfptr->m_thr_no = thr_no;
3319   selfptr->m_max_signals_per_jb = MAX_SIGNALS_PER_JB;
3320   selfptr->m_max_exec_signals = 0;
3321   selfptr->m_first_free = 0;
3322   selfptr->m_first_unused = 0;
3323 
3324   {
3325     char buf[100];
3326     BaseString::snprintf(buf, sizeof(buf), "jbalock thr: %u", thr_no);
3327     register_lock(&selfptr->m_jba_write_lock, buf);
3328   }
3329   selfptr->m_jba_head.m_read_index = 0;
3330   selfptr->m_jba_head.m_write_index = 0;
3331   selfptr->m_jba.m_head = &selfptr->m_jba_head;
3332   thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
3333   selfptr->m_jba.m_buffers[0] = buffer;
3334   selfptr->m_jba_read_state.m_read_index = 0;
3335   selfptr->m_jba_read_state.m_read_buffer = buffer;
3336   selfptr->m_jba_read_state.m_read_pos = 0;
3337   selfptr->m_jba_read_state.m_read_end = 0;
3338   selfptr->m_jba_read_state.m_write_index = 0;
3339   selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
3340   selfptr->m_send_buffer_pool.set_pool(&rep->m_sb_pool);
3341 
3342   for (i = 0; i<cnt; i++)
3343   {
3344     selfptr->m_in_queue_head[i].m_read_index = 0;
3345     selfptr->m_in_queue_head[i].m_write_index = 0;
3346     selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
3347     buffer = seize_buffer(rep, thr_no, false);
3348     selfptr->m_in_queue[i].m_buffers[0] = buffer;
3349     selfptr->m_read_states[i].m_read_index = 0;
3350     selfptr->m_read_states[i].m_read_buffer = buffer;
3351     selfptr->m_read_states[i].m_read_pos = 0;
3352     selfptr->m_read_states[i].m_read_end = 0;
3353     selfptr->m_read_states[i].m_write_index = 0;
3354   }
3355   queue_init(&selfptr->m_tq);
3356 
3357   selfptr->m_prioa_count = 0;
3358   selfptr->m_prioa_size = 0;
3359   selfptr->m_priob_count = 0;
3360   selfptr->m_priob_size = 0;
3361 
3362   selfptr->m_pending_send_count = 0;
3363   selfptr->m_pending_send_mask.clear();
3364 
3365   selfptr->m_instance_count = 0;
3366   for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
3367     selfptr->m_instance_list[i] = 0;
3368 
3369   bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
3370 
3371   selfptr->m_thread = 0;
3372   selfptr->m_cpu = NO_LOCK_CPU;
3373 }
3374 
3375 /* Have to do this after init of all m_in_queues is done. */
3376 static
3377 void
3378 thr_init2(struct thr_repository* rep, struct thr_data *selfptr,
3379           unsigned int cnt, unsigned thr_no)
3380 {
3381   for (Uint32 i = 0; i<cnt; i++)
3382   {
3383     selfptr->m_write_states[i].m_write_index = 0;
3384     selfptr->m_write_states[i].m_write_pos = 0;
3385     selfptr->m_write_states[i].m_write_buffer =
3386       rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
3387     selfptr->m_write_states[i].m_pending_signals = 0;
3388     selfptr->m_write_states[i].m_pending_signals_wakeup = 0;
3389   }
3390 }
3391 
3392 static
3393 void
3394 send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
3395 {
3396   char buf[100];
3397   BaseString::snprintf(buf, sizeof(buf), "send lock node %d", node);
3398   register_lock(&sb->m_send_lock, buf);
3399   sb->m_force_send = 0;
3400   sb->m_send_thread = NO_SEND_THREAD;
3401   bzero(&sb->m_buffer, sizeof(sb->m_buffer));
3402   sb->m_bytes = 0;
3403   bzero(sb->m_read_index, sizeof(sb->m_read_index));
3404 }
3405 
3406 static
3407 void
3408 rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
3409 {
3410   rep->m_mm = mm;
3411 
3412   rep->m_thread_count = cnt;
3413   for (unsigned int i = 0; i<cnt; i++)
3414   {
3415     thr_init(rep, rep->m_thread + i, cnt, i);
3416   }
3417   for (unsigned int i = 0; i<cnt; i++)
3418   {
3419     thr_init2(rep, rep->m_thread + i, cnt, i);
3420   }
3421 
3422   rep->stopped_threads = 0;
3423   NdbMutex_Init(&rep->stop_for_crash_mutex);
3424   NdbCondition_Init(&rep->stop_for_crash_cond);
3425 
3426   for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
3427   {
3428     send_buffer_init(i, rep->m_send_buffers+i);
3429   }
3430 
3431   bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
3432 }
3433 
3434 
3435 /**
3436  * Thread Config
3437  */
3438 
3439 #include "ThreadConfig.hpp"
3440 #include <signaldata/StartOrd.hpp>
3441 
3442 Uint32
3443 compute_jb_pages(struct EmulatorData * ed)
3444 {
3445   Uint32 cnt = NUM_MAIN_THREADS + globalData.ndbMtLqhThreads + 1;
3446 
3447   Uint32 perthread = 0;
3448 
3449   /**
3450    * Each thread can have thr_job_queue::SIZE pages in out-queues
3451    *   to each other thread
3452    */
3453   perthread += cnt * (1 + thr_job_queue::SIZE);
3454 
3455   /**
3456    * And thr_job_queue::SIZE prio A signals
3457    */
3458   perthread += (1 + thr_job_queue::SIZE);
3459 
3460   /**
3461    * And a reserve for time-queue signals
3462    */
3463   perthread += 32; // Say 1M for now
3464 
3465   /**
3466    * Each thread also keeps its own cache of at most THR_FREE_BUF_MAX pages
3467    */
3468   perthread += THR_FREE_BUF_MAX;
3469 
3470   /**
3471    * Multiply by no of threads
3472    */
3473   Uint32 tot = cnt * perthread;
3474 
3475   return tot;
3476 }
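/*
 * A worked example of the accounting above (illustrative only; the values of
 * thr_job_queue::SIZE and THR_FREE_BUF_MAX are defined elsewhere and kept
 * symbolic here).  Assuming, for illustration, cnt = 4 threads in total,
 * each thread reserves
 *
 *   perthread = 4 * (1 + thr_job_queue::SIZE)   // out-queues to every thread
 *             + (1 + thr_job_queue::SIZE)       // prio A queue
 *             + 32                              // time-queue reserve
 *             + THR_FREE_BUF_MAX                // local free-buffer cache
 *
 * and compute_jb_pages() returns 4 * perthread job-buffer pages.
 */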
3477 
3478 ThreadConfig::ThreadConfig()
3479 {
3480 }
3481 
3482 ThreadConfig::~ThreadConfig()
3483 {
3484 }
3485 
3486 /*
3487  * We must do the init here rather than in the constructor, since at
3488  * constructor time the global memory manager is not available.
3489  */
3490 void
3491 ThreadConfig::init()
3492 {
3493   num_lqh_workers = globalData.ndbMtLqhWorkers;
3494   num_lqh_threads = globalData.ndbMtLqhThreads;
3495   num_threads = NUM_MAIN_THREADS + num_lqh_threads + 1;
3496   require(num_threads <= MAX_THREADS);
3497   receiver_thread_no = num_threads - 1;
3498 
3499   ndbout << "NDBMT: num_threads=" << num_threads << endl;
3500 
3501   ::rep_init(&g_thr_repository, num_threads,
3502              globalEmulatorData.m_mem_manager);
3503 }
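/*
 * With the configuration above, thread numbers 0 .. num_threads-2 are the
 * execution (main and LQH) threads, and the last thread
 * (receiver_thread_no == num_threads - 1) is the receiver, which runs in
 * the thread that calls ipControlLoop() below.
 */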
3504 
3505 static
3506 void
3507 setcpuaffinity(struct thr_repository* rep)
3508 {
3509   THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
3510   conf.create_cpusets();
3511   if (conf.getInfoMessage())
3512   {
3513     printf("%s", conf.getInfoMessage());
3514     fflush(stdout);
3515   }
3516 }
3517 
3518 void
3519 ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
3520 {
3521   unsigned int thr_no;
3522   struct thr_repository* rep = &g_thr_repository;
3523 
3524   /**
3525    * Assign threads to CPUs
3526    */
3527   setcpuaffinity(rep);
3528 
3529   /*
3530    * Start all execution threads except the receiver thread, which
3531    * runs in the main thread.
3532    */
3533   for (thr_no = 0; thr_no < num_threads; thr_no++)
3534   {
3535     rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
3536 
3537     if (thr_no == receiver_thread_no)
3538       continue;                 // Will run in the main thread.
3539 
3540     /*
3541      * NdbThread_Create() takes a void **, but that pointer is cast to
3542      * void * when passed to the thread function, which is somewhat odd.
3543      */
3544     rep->m_thread[thr_no].m_thread =
3545       NdbThread_Create(mt_job_thread_main,
3546                        (void **)(rep->m_thread + thr_no),
3547                        1024*1024,
3548                        "execute thread", //ToDo add number
3549                        NDB_THREAD_PRIO_MEAN);
3550     require(rep->m_thread[thr_no].m_thread != NULL);
3551   }
3552 
3553   /* Now run the main loop for thread 0 directly. */
3554   rep->m_thread[receiver_thread_no].m_thread = pThis;
3555   mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
3556 
3557   /* Wait for all threads to shutdown. */
3558   for (thr_no = 0; thr_no < num_threads; thr_no++)
3559   {
3560     if (thr_no == receiver_thread_no)
3561       continue;
3562     void *dummy_return_status;
3563     NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
3564     NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
3565   }
3566 }
3567 
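/*
 * Kick off the start sequence: build a GSN_START_ORD signal addressed to
 * CMVMI and queue it at prio A on the thread hosting CMVMI instance 0.
 */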
3568 int
3569 ThreadConfig::doStart(NodeState::StartLevel startLevel)
3570 {
3571   SignalT<3> signalT;
3572   memset(&signalT.header, 0, sizeof(SignalHeader));
3573 
3574   signalT.header.theVerId_signalNumber   = GSN_START_ORD;
3575   signalT.header.theReceiversBlockNumber = CMVMI;
3576   signalT.header.theSendersBlockRef      = 0;
3577   signalT.header.theTrace                = 0;
3578   signalT.header.theSignalId             = 0;
3579   signalT.header.theLength               = StartOrd::SignalLength;
3580 
3581   StartOrd * startOrd = CAST_PTR(StartOrd, &signalT.theData[0]);
3582   startOrd->restartInfo = 0;
3583 
3584   sendprioa(block2ThreadId(CMVMI, 0), &signalT.header, signalT.theData, 0);
3585   return 0;
3586 }
3587 
3588 /*
3589  * Compare signal ids, taking into account overflow/wraparound.
3590  * Returns negative, zero, or positive, like strcmp().
3591  * Eg.
3592  *   wrap_compare(0x10,0x20) -> -1
3593  *   wrap_compare(0x10,0xffffff20) -> 1
3594  *   wrap_compare(0xffffff80,0xffffff20) -> 1
3595  *   wrap_compare(0x7fffffff, 0x80000001) -> -1
3596  */
3597 static
3598 inline
3599 int
3600 wrap_compare(Uint32 a, Uint32 b)
3601 {
3602   /* Avoid relying on undefined C/C++ signed integer overflow semantics. */
3603   if (a >= 0x80000000)
3604     if (b >= 0x80000000)
3605       return (int)(a & 0x7fffffff) - (int)(b & 0x7fffffff);
3606     else
3607       return (a - b) >= 0x80000000 ? -1 : 1;
3608   else
3609     if (b >= 0x80000000)
3610       return (b - a) >= 0x80000000 ? 1 : -1;
3611     else
3612       return (int)a - (int)b;
3613 }
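/*
 * A minimal sanity sketch of the cases documented above (illustration only,
 * not part of the kernel):
 *
 *   assert(wrap_compare(0x10, 0x20) < 0);
 *   assert(wrap_compare(0x10, 0xffffff20) > 0);       // 0x10 is newer after wrap
 *   assert(wrap_compare(0xffffff80, 0xffffff20) > 0);
 *   assert(wrap_compare(0x7fffffff, 0x80000001) < 0);
 *
 * Splitting the range at 0x80000000 keeps every subtraction either in
 * unsigned arithmetic or between values below 2^31, so no signed overflow
 * (and hence no undefined behaviour) can occur.
 */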
3614 
3615 Uint32
3616 FastScheduler::traceDumpGetNumThreads()
3617 {
3618   /* The last thread is only for receiver -> no trace file. */
3619   return num_threads;
3620 }
3621 
3622 bool
3623 FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
3624                                const Uint32 * & thrdTheEmulatedJam,
3625                                Uint32 & thrdTheEmulatedJamIndex)
3626 {
3627   if (thr_no >= num_threads)
3628     return false;
3629 
3630 #ifdef NO_EMULATED_JAM
3631   jamBlockNumber = 0;
3632   thrdTheEmulatedJam = NULL;
3633   thrdTheEmulatedJamIndex = 0;
3634 #else
3635   const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
3636   thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
3637   thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
3638   jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
3639 #endif
3640   return true;
3641 }
3642 
3643 void
3644 FastScheduler::traceDumpPrepare(NdbShutdownType& nst)
3645 {
3646   /*
3647    * We are about to generate trace files for all threads.
3648    *
3649    * We want to stop all threads processing before we dump, as otherwise the
3650    * signal buffers could change while dumping, leading to inconsistent
3651    * results.
3652    *
3653    * To stop threads, we send the GSN_STOP_FOR_CRASH signal as prio A to each
3654    * thread. We then wait for threads to signal they are done (but not forever,
3655    * so as to not have one hanging thread prevent the generation of trace
3656    * dumps). We must also be careful not to send to ourselves if the crash is
3657    * being processed by one of the signal-processing threads.
3658    *
3659    * We do not stop the transporter thread, as it cannot receive signals (but
3660    * because it does not receive signals it does not really influence dumps in
3661    * any case).
3662    */
3663   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3664   const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
3665   /* The selfptr might be NULL, or pointer to thread that crashed. */
3666 
3667   Uint32 waitFor_count = 0;
3668   NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
3669   g_thr_repository.stopped_threads = 0;
3670 
3671   for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
3672   {
3673     if (selfptr != NULL && selfptr->m_thr_no == thr_no)
3674     {
3675       /* This is own thread; we have already stopped processing. */
3676       continue;
3677     }
3678 
3679     sendprioa_STOP_FOR_CRASH(selfptr, thr_no);
3680 
3681     waitFor_count++;
3682   }
3683 
3684   static const Uint32 max_wait_seconds = 2;
3685   NDB_TICKS start = NdbTick_CurrentMillisecond();
3686   while (g_thr_repository.stopped_threads < waitFor_count)
3687   {
3688     NdbCondition_WaitTimeout(&g_thr_repository.stop_for_crash_cond,
3689                              &g_thr_repository.stop_for_crash_mutex,
3690                              10);
3691     NDB_TICKS now = NdbTick_CurrentMillisecond();
3692     if (now > start + max_wait_seconds * 1000)
3693       break;                    // Give up
3694   }
3695   if (g_thr_repository.stopped_threads < waitFor_count)
3696   {
3697     if (nst != NST_ErrorInsert)
3698     {
3699       nst = NST_Watchdog; // Make this abort fast
3700     }
3701     ndbout_c("Warning: %d thread(s) did not stop before starting crash dump.",
3702              waitFor_count - g_thr_repository.stopped_threads);
3703   }
3704   NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
3705 
3706   /* Now we are ready (or as ready as can be) for doing crash dump. */
3707 }
3708 
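/*
 * Executed by a signal-processing thread when it receives the
 * GSN_STOP_FOR_CRASH prio A signal sent from traceDumpPrepare(): report
 * this thread as stopped under stop_for_crash_mutex, wake the waiting
 * thread, unregister from the watchdog and exit so the crash dump can
 * proceed on a quiesced system.
 */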
3709 void mt_execSTOP_FOR_CRASH()
3710 {
3711   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3712   const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
3713   require(selfptr != NULL);
3714 
3715   NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
3716   g_thr_repository.stopped_threads++;
3717   NdbCondition_Signal(&g_thr_repository.stop_for_crash_cond);
3718   NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
3719 
3720   /* ToDo: is this correct? */
3721   globalEmulatorData.theWatchDog->unregisterWatchedThread(selfptr->m_thr_no);
3722 
3723   pthread_exit(NULL);
3724 }
3725 
3726 void
3727 FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
3728 {
3729   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3730   thr_data *selfptr = reinterpret_cast<thr_data *>(value);
3731   const thr_repository *rep = &g_thr_repository;
3732   /*
3733    * The selfptr might be NULL, or pointer to thread that is doing the crash
3734    * jump.
3735    * If non-null, we should update the watchdog counter while dumping.
3736    */
3737   Uint32 *watchDogCounter;
3738   if (selfptr)
3739     watchDogCounter = &selfptr->m_watchdog_counter;
3740   else
3741     watchDogCounter = NULL;
3742 
3743   /*
3744    * We want to dump the signal buffers from last executed to first executed.
3745    * So we first need to find the correct sequence to output signals in, stored
3746    * in this array.
3747    *
3748    * We will check any buffers in the cyclic m_free_fifo. In addition,
3749    * we also need to scan the already executed part of the current
3750    * buffer in m_jba.
3751    *
3752    * Due to partial execution of prio A buffers, we will use signal ids to know
3753    * where to interleave prio A signals into the stream of prio B signals
3754    * read. So we will keep a pointer to a prio A buffer around; and while
3755    * scanning prio B buffers we will interleave prio A buffers from that buffer
3756    * when the signal id fits the sequence.
3757    *
3758    * This also means that we may have to discard the earliest part of available
3759    * prio A signal data due to too little prio B data present, or vice versa.
3760    */
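  /*
   * In outline, the code below does a merge by signal id: collect every
   * candidate job buffer (free-list pages, active prio B pages, the active
   * prio A page), repeatedly pick the buffer whose next signal has the
   * smallest id according to wrap_compare(), append it to the cyclic
   * signalSequence array, and finally print the collected sequence from
   * newest to oldest.
   */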
3761   static const Uint32 MAX_SIGNALS_TO_DUMP = 4096;
3762   struct {
3763     const SignalHeader *ptr;
3764     bool prioa;
3765   } signalSequence[MAX_SIGNALS_TO_DUMP];
3766   Uint32 seq_start = 0;
3767   Uint32 seq_end = 0;
3768 
3769   const thr_data *thr_ptr = &rep->m_thread[thr_no];
3770   if (watchDogCounter)
3771     *watchDogCounter = 4;
3772 
3773   /*
3774    * ToDo: Might add a sanity check to avoid crashing on a not-yet-initialised
3775    * thread.
3776    */
3777 
3778   /* Scan all available buffers with already executed signals. */
3779 
3780   /*
3781    * Keep track of all available buffers, so that we can pick out signals in
3782    * the same order they were executed (order obtained from signal id).
3783    *
3784    * We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
3785    * (and freed) buffers, plus MAX_THREADS buffers for currently active
3786    * prio B buffers, plus one active prio A buffer.
3787    */
3788   struct {
3789     const thr_job_buffer *m_jb;
3790     Uint32 m_pos;
3791     Uint32 m_max;
3792   } jbs[THR_FREE_BUF_MAX + MAX_THREADS + 1];
3793 
3794   Uint32 num_jbs = 0;
3795 
3796   /* Load released buffers. */
3797   Uint32 idx = thr_ptr->m_first_free;
3798   while (idx != thr_ptr->m_first_unused)
3799   {
3800     const thr_job_buffer *q = thr_ptr->m_free_fifo[idx];
3801     if (q->m_len > 0)
3802     {
3803       jbs[num_jbs].m_jb = q;
3804       jbs[num_jbs].m_pos = 0;
3805       jbs[num_jbs].m_max = q->m_len;
3806       num_jbs++;
3807     }
3808     idx = (idx + 1) % THR_FREE_BUF_MAX;
3809   }
3810   /* Load any active prio B buffers. */
3811   for (Uint32 thr_no = 0; thr_no < rep->m_thread_count; thr_no++)
3812   {
3813     const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
3814     const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
3815     Uint32 read_pos = r->m_read_pos;
3816     if (read_pos > 0)
3817     {
3818       jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
3819       jbs[num_jbs].m_pos = 0;
3820       jbs[num_jbs].m_max = read_pos;
3821       num_jbs++;
3822     }
3823   }
3824   /* Load any active prio A buffer. */
3825   const thr_jb_read_state *r = &thr_ptr->m_jba_read_state;
3826   Uint32 read_pos = r->m_read_pos;
3827   if (read_pos > 0)
3828   {
3829     jbs[num_jbs].m_jb = thr_ptr->m_jba.m_buffers[r->m_read_index];
3830     jbs[num_jbs].m_pos = 0;
3831     jbs[num_jbs].m_max = read_pos;
3832     num_jbs++;
3833   }
3834 
3835   /* Now pick out one signal at a time, in signal id order. */
3836   while (num_jbs > 0)
3837   {
3838     if (watchDogCounter)
3839       *watchDogCounter = 4;
3840 
3841     /* Search out the smallest signal id remaining. */
3842     Uint32 idx_min = 0;
3843     const Uint32 *p = jbs[idx_min].m_jb->m_data + jbs[idx_min].m_pos;
3844     const SignalHeader *s_min = reinterpret_cast<const SignalHeader*>(p);
3845     Uint32 sid_min = s_min->theSignalId;
3846 
3847     for (Uint32 i = 1; i < num_jbs; i++)
3848     {
3849       p = jbs[i].m_jb->m_data + jbs[i].m_pos;
3850       const SignalHeader *s = reinterpret_cast<const SignalHeader*>(p);
3851       Uint32 sid = s->theSignalId;
3852       if (wrap_compare(sid, sid_min) < 0)
3853       {
3854         idx_min = i;
3855         s_min = s;
3856         sid_min = sid;
3857       }
3858     }
3859 
3860     /* We found the next signal, now put it in the ordered cyclic buffer. */
3861     signalSequence[seq_end].ptr = s_min;
3862     signalSequence[seq_end].prioa = jbs[idx_min].m_jb->m_prioa;
3863     Uint32 siglen =
3864       (sizeof(SignalHeader)>>2) + s_min->m_noOfSections + s_min->theLength;
3865 #if SIZEOF_CHARP == 8
3866     /* Align to 8-byte boundary, to ensure aligned copies. */
3867     siglen= (siglen+1) & ~((Uint32)1);
3868 #endif
3869     jbs[idx_min].m_pos += siglen;
3870     if (jbs[idx_min].m_pos >= jbs[idx_min].m_max)
3871     {
3872       /* We are done with this job buffer. */
3873       num_jbs--;
3874       jbs[idx_min] = jbs[num_jbs];
3875     }
3876     seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
3877     /* Drop old signals if too many available in history. */
3878     if (seq_end == seq_start)
3879       seq_start = (seq_start + 1) % MAX_SIGNALS_TO_DUMP;
3880   }
3881 
3882   /* Now, having built the correct signal sequence, we can dump them all. */
3883   fprintf(out, "\n");
3884   bool first_one = true;
3885   bool out_of_signals = false;
3886   Uint32 lastSignalId = 0;
3887   while (seq_end != seq_start)
3888   {
3889     if (watchDogCounter)
3890       *watchDogCounter = 4;
3891 
3892     if (seq_end == 0)
3893       seq_end = MAX_SIGNALS_TO_DUMP;
3894     seq_end--;
3895     SignalT<25> signal;
3896     const SignalHeader *s = signalSequence[seq_end].ptr;
3897     unsigned siglen = (sizeof(*s)>>2) + s->theLength;
3898     if (siglen > 25)
3899       siglen = 25;              // Sanity check
3900     memcpy(&signal.header, s, 4*siglen);
3901     // instance number in trace file is confusing if not MT LQH
3902     if (num_lqh_workers == 0)
3903       signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
3904 
3905     const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
3906     signal.m_sectionPtrI[0] = posptr[siglen + 0];
3907     signal.m_sectionPtrI[1] = posptr[siglen + 1];
3908     signal.m_sectionPtrI[2] = posptr[siglen + 2];
3909     bool prioa = signalSequence[seq_end].prioa;
3910 
3911     /* Make sure to display clearly when there is a gap in the dump. */
3912     if (!first_one && !out_of_signals && (s->theSignalId + 1) != lastSignalId)
3913     {
3914       out_of_signals = true;
3915       fprintf(out, "\n\n\nNo more prio %s signals, rest of dump will be "
3916               "incomplete.\n\n\n\n", prioa ? "B" : "A");
3917     }
3918     first_one = false;
3919     lastSignalId = s->theSignalId;
3920 
3921     fprintf(out, "--------------- Signal ----------------\n");
3922     Uint32 prio = (prioa ? JBA : JBB);
3923     SignalLoggerManager::printSignalHeader(out,
3924                                            signal.header,
3925                                            prio,
3926                                            globalData.ownId,
3927                                            true);
3928     SignalLoggerManager::printSignalData  (out,
3929                                            signal.header,
3930                                            &signal.theData[0]);
3931   }
3932   fflush(out);
3933 }
3934 
3935 int
3936 FastScheduler::traceDumpGetCurrentThread()
3937 {
3938   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3939   const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
3940 
3941   /* The selfptr might be NULL, or pointer to thread that crashed. */
3942   if (selfptr == 0)
3943   {
3944     return -1;
3945   }
3946   else
3947   {
3948     return (int)selfptr->m_thr_no;
3949   }
3950 }
3951 
3952 void
3953 mt_section_lock()
3954 {
3955   lock(&(g_thr_repository.m_section_lock));
3956 }
3957 
3958 void
3959 mt_section_unlock()
3960 {
3961   unlock(&(g_thr_repository.m_section_lock));
3962 }
3963 
3964 void
3965 mt_mem_manager_init()
3966 {
3967 }
3968 
3969 void
3970 mt_mem_manager_lock()
3971 {
3972   lock(&(g_thr_repository.m_mem_manager_lock));
3973 }
3974 
3975 void
3976 mt_mem_manager_unlock()
3977 {
3978   unlock(&(g_thr_repository.m_mem_manager_lock));
3979 }
3980 
3981 Vector<mt_lock_stat> g_locks;
3982 template class Vector<mt_lock_stat>;
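/*
 * g_locks maps each registered lock's address to a human-readable name plus
 * contention/spin counters, so lock statistics can be reported per lock.
 */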
3983 
3984 static
3985 void
3986 register_lock(const void * ptr, const char * name)
3987 {
3988   if (name == 0)
3989     return;
3990 
3991   mt_lock_stat* arr = g_locks.getBase();
3992   for (size_t i = 0; i<g_locks.size(); i++)
3993   {
3994     if (arr[i].m_ptr == ptr)
3995     {
3996       if (arr[i].m_name)
3997       {
3998         free(arr[i].m_name);
3999       }
4000       arr[i].m_name = strdup(name);
4001       return;
4002     }
4003   }
4004 
4005   mt_lock_stat ln;
4006   ln.m_ptr = ptr;
4007   ln.m_name = strdup(name);
4008   ln.m_contended_count = 0;
4009   ln.m_spin_count = 0;
4010   g_locks.push_back(ln);
4011 }
4012 
4013 static
4014 mt_lock_stat *
4015 lookup_lock(const void * ptr)
4016 {
4017   mt_lock_stat* arr = g_locks.getBase();
4018   for (size_t i = 0; i<g_locks.size(); i++)
4019   {
4020     if (arr[i].m_ptr == ptr)
4021       return arr + i;
4022   }
4023 
4024   return 0;
4025 }
4026 
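/*
 * For each block in the NULL-terminated 'blocks' list, write into dst one
 * block reference per additional thread (other than threadId, and not
 * already represented) that hosts an instance of that block.  Returns the
 * number of references written; len is the capacity of dst.
 */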
4027 Uint32
4028 mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
4029                                     Uint32 dst[], Uint32 len)
4030 {
4031   Uint32 cnt = 0;
4032   Bitmask<(MAX_THREADS+31)/32> mask;
4033   mask.set(threadId);
4034   for (Uint32 i = 0; blocks[i] != 0; i++)
4035   {
4036     Uint32 block = blocks[i];
4037     /**
4038      * Find each thread that has instance of block
4039      */
4040     assert(block == blockToMain(block));
4041     Uint32 index = block - MIN_BLOCK_NO;
4042     for (Uint32 instance = 0; instance < MAX_BLOCK_INSTANCES; instance++)
4043     {
4044       Uint32 thr_no = thr_map[index][instance].thr_no;
4045       if (thr_no == thr_map_entry::NULL_THR_NO)
4046         break;
4047 
4048       if (mask.get(thr_no))
4049         continue;
4050 
4051       mask.set(thr_no);
4052       require(cnt < len);
4053       dst[cnt++] = numberToRef(block, instance, 0);
4054     }
4055   }
4056   return cnt;
4057 }
4058 
4059 void
4060 mt_wakeup(class SimulatedBlock* block)
4061 {
4062   Uint32 thr_no = block->getThreadId();
4063   thr_data *thrptr = g_thr_repository.m_thread + thr_no;
4064   wakeup(&thrptr->m_waiter);
4065 }
4066 
4067 #ifdef VM_TRACE
4068 void
4069 mt_assert_own_thread(SimulatedBlock* block)
4070 {
4071   Uint32 thr_no = block->getThreadId();
4072   thr_data *thrptr = g_thr_repository.m_thread + thr_no;
4073 
4074   if (unlikely(pthread_equal(thrptr->m_thr_id, pthread_self()) == 0))
4075   {
4076     fprintf(stderr, "mt_assert_own_thread() - assertion-failure\n");
4077     fflush(stderr);
4078     abort();
4079   }
4080 }
4081 #endif
4082 
4083 /**
4084  * Global data
4085  */
4086 struct thr_repository g_thr_repository;
4087 
4088 struct trp_callback g_trp_callback;
4089 
4090 TransporterRegistry globalTransporterRegistry(&g_trp_callback, false);
4091