1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
22 
23 #include <ndb_global.h>
24 
25 #define NDBD_MULTITHREADED
26 
27 #include <VMSignal.hpp>
28 #include <kernel_types.h>
29 #include <Prio.hpp>
30 #include <SignalLoggerManager.hpp>
31 #include <SimulatedBlock.hpp>
32 #include <ErrorHandlingMacros.hpp>
33 #include <GlobalData.hpp>
34 #include <WatchDog.hpp>
35 #include <TransporterDefinitions.hpp>
36 #include <TransporterRegistry.hpp>
37 #include "FastScheduler.hpp"
38 #include "mt.hpp"
39 #include <DebuggerNames.hpp>
40 #include <signaldata/StopForCrash.hpp>
41 #include "TransporterCallbackKernel.hpp"
42 #include <NdbSleep.h>
43 #include <portlib/ndb_prefetch.h>
44 
45 #include "mt-asm.h"
46 #include "mt-lock.hpp"
47 
48 #include "ThreadConfig.hpp"
49 #include <signaldata/StartOrd.hpp>
50 
51 #include <NdbTick.h>
52 #include <NdbMutex.h>
53 #include <NdbCondition.h>
54 #include <ErrorReporter.hpp>
55 #include <EventLogger.hpp>
56 
57 extern EventLogger * g_eventLogger;
58 
59 
60 static void dumpJobQueues(void);
61 
62 inline
63 SimulatedBlock*
64 GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
65 {
66   SimulatedBlock* b = getBlock(blockNo);
67   if (b != 0 && instanceNo != 0)
68     b = b->getInstance(instanceNo);
69   return b;
70 }
71 
72 #ifdef __GNUC__
73 /* Provides a small (but noticeable) speedup in benchmarks. */
74 #define memcpy __builtin_memcpy
75 #endif
76 
77 /* Constants found by benchmarks to be reasonable values. */
78 
79 /* Maximum number of signals to execute before sending to remote nodes. */
80 static const Uint32 MAX_SIGNALS_BEFORE_SEND = 200;
81 
82 /*
83  * Max. signals to execute from one job buffer before considering other
84  * possible stuff to do.
85  */
86 static const Uint32 MAX_SIGNALS_PER_JB = 75;
87 
88 /**
89  * Max signals written to other thread before calling flush_jbb_write_state
90  */
91 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER = 2;
92 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_OTHER = 20;
93 static const Uint32 MAX_SIGNALS_BEFORE_WAKEUP = 128;
94 
95 //#define NDB_MT_LOCK_TO_CPU
96 
97 #define NUM_MAIN_THREADS 2 // except receiver
98 /*
99   MAX_BLOCK_THREADS need not include the send threads since it's
100   used to set the size of arrays used by all threads that contain a
101   job buffer and execute signals. The send threads only send
102   messages directed to other nodes, contain no blocks, and
103   thus execute no signals.
104 */
105 #define MAX_BLOCK_THREADS (NUM_MAIN_THREADS +       \
106                            MAX_NDBMT_LQH_THREADS +  \
107                            MAX_NDBMT_TC_THREADS +   \
108                            MAX_NDBMT_RECEIVE_THREADS)
109 
110 /* If this is too small it crashes before first signal. */
111 #define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
112 
113 static Uint32 num_threads = 0;
114 static Uint32 first_receiver_thread_no = 0;
115 static Uint32 max_send_delay = 0;
116 
117 #define NO_SEND_THREAD (MAX_BLOCK_THREADS + MAX_NDBMT_SEND_THREADS + 1)
118 
119 /* max signal is 32 words, 7 for signal header and 25 datawords */
120 #define MAX_SIGNAL_SIZE 32
121 #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / MAX_SIGNAL_SIZE) //255
122 
123 #if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
124 #define USE_FUTEX
125 #endif
126 
127 #ifdef USE_FUTEX
128 #ifndef _GNU_SOURCE
129 #define _GNU_SOURCE
130 #endif
131 #include <unistd.h>
132 #include <sys/syscall.h>
133 #include <sys/types.h>
134 
135 #define FUTEX_WAIT              0
136 #define FUTEX_WAKE              1
137 #define FUTEX_FD                2
138 #define FUTEX_REQUEUE           3
139 #define FUTEX_CMP_REQUEUE       4
140 #define FUTEX_WAKE_OP           5
141 
142 static inline
143 int
144 futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
145 {
146   return syscall(SYS_futex,
147                  addr, FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
148 }
149 
150 static inline
151 int
152 futex_wake(volatile unsigned * addr)
153 {
154   return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
155 }
156 
157 struct MY_ALIGNED(NDB_CL) thr_wait
158 {
159   volatile unsigned m_futex_state;
160   enum {
161     FS_RUNNING = 0,
162     FS_SLEEPING = 1
163   };
164   thr_wait() {
165     assert((sizeof(*this) % NDB_CL) == 0); //Maintain any CL-alignment
166     xcng(&m_futex_state, FS_RUNNING);
167   }
168   void init () {}
169 };
170 
171 /**
172  * Sleep until woken up or timeout occurs.
173  *
174  * Will call check_callback(check_arg) after proper synchronisation, and only
175  * if that returns true will it actually sleep, else it will return
176  * immediately. This is needed to avoid races with wakeup.
177  *
178  * Returns 'true' if it actually did sleep.
179  */
180 template<typename T>
181 static inline
182 bool
183 yield(struct thr_wait* wait, const Uint32 nsec,
184       bool (*check_callback)(T*), T* check_arg)
185 {
186   volatile unsigned * val = &wait->m_futex_state;
187 #ifndef NDEBUG
188   int old =
189 #endif
190     xcng(val, thr_wait::FS_SLEEPING);
191   assert(old == thr_wait::FS_RUNNING);
192 
193   /**
194    * At this point, we need to re-check the condition that made us decide to
195    * sleep, and skip sleeping if it has changed.
196    *
197    * Otherwise we might miss a wakeup: the condition may already have changed,
198    * and the thread that changed it may have decided not to wake us, as our
199    * state was still FS_RUNNING at the time.
200    *
201    * Also need a memory barrier to ensure this extra check is race-free,
202    *   but that is already provided by xcng.
203    */
204   const bool waited = (*check_callback)(check_arg);
205   if (waited)
206   {
207     struct timespec timeout;
208     timeout.tv_sec = 0;
209     timeout.tv_nsec = nsec;
210     futex_wait(val, thr_wait::FS_SLEEPING, &timeout);
211   }
212   xcng(val, thr_wait::FS_RUNNING);
213   return waited;
214 }
215 
216 static inline
217 int
218 wakeup(struct thr_wait* wait)
219 {
220   volatile unsigned * val = &wait->m_futex_state;
221   /**
222    * We must ensure that any state update (new data in buffers...) are visible
223    * to the other thread before we can look at the sleep state of that other
224    * thread.
225    */
226   if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
227   {
228     return futex_wake(val);
229   }
230   return 0;
231 }
232 #else
233 
234 struct MY_ALIGNED(NDB_CL) thr_wait
235 {
236   NdbMutex *m_mutex;
237   NdbCondition *m_cond;
238   bool m_need_wakeup;
239   thr_wait() : m_mutex(0), m_cond(0), m_need_wakeup(false) {
240     assert((sizeof(*this) % NDB_CL) == 0); //Maintain any CL-alignment
241   }
242 
243   void init() {
244     m_mutex = NdbMutex_Create();
245     m_cond = NdbCondition_Create();
246   }
247 };
248 
249 template<typename T>
250 static inline
251 bool
252 yield(struct thr_wait* wait, const Uint32 nsec,
253       bool (*check_callback)(T*), T* check_arg)
254 {
255   struct timespec end;
256   NdbCondition_ComputeAbsTime(&end, (nsec >= 1000000) ? nsec/1000000 : 1);
257   NdbMutex_Lock(wait->m_mutex);
258 
259   Uint32 waits = 0;
260   /* May have spurious wakeups: Always recheck condition predicate */
261   while ((*check_callback)(check_arg))
262   {
263     wait->m_need_wakeup = true;
264     waits++;
265     if (NdbCondition_WaitTimeoutAbs(wait->m_cond,
266                                     wait->m_mutex, &end) == ETIMEDOUT)
267     {
268       wait->m_need_wakeup = false;
269       break;
270     }
271   }
272   NdbMutex_Unlock(wait->m_mutex);
273   return (waits > 0);
274 }
275 
276 
277 static inline
278 int
279 wakeup(struct thr_wait* wait)
280 {
281   NdbMutex_Lock(wait->m_mutex);
282   // We should avoid signaling when not waiting for wakeup
283   if (wait->m_need_wakeup)
284   {
285     wait->m_need_wakeup = false;
286     NdbCondition_Signal(wait->m_cond);
287   }
288   NdbMutex_Unlock(wait->m_mutex);
289   return 0;
290 }
291 
292 #endif
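/**
 * Illustrative sketch (not part of the original file) of how the two
 * yield()/wakeup() implementations above are intended to be paired by a
 * producer/consumer. 'MyQueue', 'my_queue_is_empty', 'consumer_idle' and
 * 'producer_publish' are hypothetical placeholder names; real callers pass
 * their own thread-state predicate as check_callback.
 */
#if 0
struct MyQueue
{
  volatile unsigned m_pending;  // amount of unconsumed work
  thr_wait m_waiter;            // m_waiter.init() assumed to have been called
};

static bool my_queue_is_empty(MyQueue *q) { return q->m_pending == 0; }

static void consumer_idle(MyQueue *q)
{
  /* Sleep at most 1 millisecond, but only if the queue is still empty
   * after the synchronised re-check performed inside yield(). */
  yield(&q->m_waiter, 1000 * 1000, my_queue_is_empty, q);
}

static void producer_publish(MyQueue *q)
{
  q->m_pending++;        // publish the new work first ...
  wakeup(&q->m_waiter);  // ... then wake the consumer only if it went to sleep
}
#endif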
293 
294 #define JAM_FILE_ID 236
295 
296 
297 /**
298  * thr_safe_pool
299  */
300 template<typename T>
301 struct MY_ALIGNED(NDB_CL) thr_safe_pool
302 {
303   thr_safe_pool(const char * name) : m_lock(name), m_free_list(0), m_cnt(0) {
304     assert((sizeof(*this) % NDB_CL) == 0); //Maintain any CL-alignment
305   }
306 
307   struct thr_spin_lock m_lock;
308 
309   T* m_free_list;
310   Uint32 m_cnt;
311 
312   T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
313     T* ret = 0;
314     lock(&m_lock);
315     if (m_free_list)
316     {
317       assert(m_cnt);
318       m_cnt--;
319       ret = m_free_list;
320       m_free_list = ret->m_next;
321       unlock(&m_lock);
322     }
323     else
324     {
325       unlock(&m_lock);
326       Uint32 dummy;
327       ret = reinterpret_cast<T*>
328         (mm->alloc_page(rg, &dummy,
329                         Ndbd_mem_manager::NDB_ZONE_ANY));
330       // ToDo: How to deal with failed allocation?!?
331       // I think in this case we need to start grabbing buffers kept for signal
332       // trace.
333     }
334     return ret;
335   }
336 
337   Uint32 seize_list(Ndbd_mem_manager *mm, Uint32 rg,
338                     Uint32 requested, T** head, T** tail)
339   {
340     lock(&m_lock);
341     if (m_cnt == 0)
342     {
343       unlock(&m_lock);
344       Uint32 dummy;
345       T* ret = reinterpret_cast<T*>
346         (mm->alloc_page(rg, &dummy,
347                         Ndbd_mem_manager::NDB_ZONE_ANY));
348 
349       if (ret == 0)
350       {
351         return 0;
352       }
353       else
354       {
355         ret->m_next = 0;
356         * head = * tail = ret;
357         return 1;
358       }
359     }
360     else
361     {
362       if (m_cnt < requested )
363         requested = m_cnt;
364 
365       T* first = m_free_list;
366       T* last = first;
367       for (Uint32 i = 1; i < requested; i++)
368       {
369         last = last->m_next;
370       }
371       m_cnt -= requested;
372       m_free_list = last->m_next;
373       unlock(&m_lock);
374       last->m_next = 0;
375       * head = first;
376       * tail = last;
377       return requested;
378     }
379   }
380 
381   void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
382     lock(&m_lock);
383     t->m_next = m_free_list;
384     m_free_list = t;
385     m_cnt++;
386     unlock(&m_lock);
387   }
388 
389   void release_list(Ndbd_mem_manager *mm, Uint32 rg,
390                     T* head, T* tail, Uint32 cnt) {
391     lock(&m_lock);
392     tail->m_next = m_free_list;
393     m_free_list = head;
394     m_cnt += cnt;
395     unlock(&m_lock);
396   }
397 };
398 
399 /**
400  * thread_local_pool
401  */
402 template<typename T>
403 class thread_local_pool
404 {
405 public:
406   thread_local_pool(thr_safe_pool<T> *global_pool,
407                     unsigned max_free, unsigned alloc_size = 1) :
408     m_max_free(max_free),
409     m_alloc_size(alloc_size),
410     m_free(0),
411     m_freelist(0),
412     m_global_pool(global_pool)
413   {
414   }
415 
416   T *seize(Ndbd_mem_manager *mm, Uint32 rg) {
417     T *tmp = m_freelist;
418     if (tmp == 0)
419     {
420       T * tail;
421       m_free = m_global_pool->seize_list(mm, rg, m_alloc_size, &tmp, &tail);
422     }
423     if (tmp)
424     {
425       m_freelist = tmp->m_next;
426       assert(m_free > 0);
427       m_free--;
428     }
429 
430     validate();
431     return tmp;
432   }
433 
434   void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
435     unsigned free = m_free;
436     if (free < m_max_free)
437     {
438       m_free = free + 1;
439       t->m_next = m_freelist;
440       m_freelist = t;
441     }
442     else
443       m_global_pool->release(mm, rg, t);
444 
445     validate();
446   }
447 
448   /**
449    * Release to local pool even if it gets "too" full
450    *   (wrt m_max_free)
451    */
452   void release_local(T *t) {
453     m_free++;
454     t->m_next = m_freelist;
455     m_freelist = t;
456 
457     validate();
458   }
459 
460   void validate() const {
461 #ifdef VM_TRACE
462     Uint32 cnt = 0;
463     T* t = m_freelist;
464     while (t)
465     {
466       cnt++;
467       t = t->m_next;
468     }
469     assert(cnt == m_free);
470 #endif
471   }
472 
473   /**
474    * Release entries so that m_max_free is honored
475    *   (likely used together with release_local)
476    */
477   void release_global(Ndbd_mem_manager *mm, Uint32 rg) {
478     validate();
479     unsigned cnt = 0;
480     unsigned free = m_free;
481     Uint32 maxfree = m_max_free;
482     assert(maxfree > 0);
483 
484     T* head = m_freelist;
485     T* tail = m_freelist;
486     if (free > maxfree)
487     {
488       cnt++;
489       free--;
490 
491       while (free > maxfree)
492       {
493         cnt++;
494         free--;
495         tail = tail->m_next;
496       }
497 
498       assert(free == maxfree);
499 
500       m_free = free;
501       m_freelist = tail->m_next;
502       m_global_pool->release_list(mm, rg, head, tail, cnt);
503     }
504     validate();
505   }
506 
507   void release_all(Ndbd_mem_manager *mm, Uint32 rg) {
508     validate();
509     T* head = m_freelist;
510     T* tail = m_freelist;
511     if (tail)
512     {
513       unsigned cnt = 1;
514       while (tail->m_next != 0)
515       {
516         cnt++;
517         tail = tail->m_next;
518       }
519       m_global_pool->release_list(mm, rg, head, tail, cnt);
520       m_free = 0;
521       m_freelist = 0;
522     }
523     validate();
524   }
525 
526   /**
527    * release everything if more than m_max_free
528    *   else do nothing
529    */
530   void release_chunk(Ndbd_mem_manager *mm, Uint32 rg) {
531     if (m_free > m_max_free)
532       release_all(mm, rg);
533   }
534 
535   /**
536    * prealloc up to <em>cnt</em> pages into this pool
537    */
538   bool fill(Ndbd_mem_manager *mm, Uint32 rg, Uint32 cnt)
539   {
540     if (m_free >= cnt)
541     {
542       return true;
543     }
544 
545     T *head, *tail;
546     Uint32 allocated = m_global_pool->seize_list(mm, rg, m_alloc_size,
547                                                  &head, &tail);
548     if (allocated)
549     {
550       tail->m_next = m_freelist;
551       m_freelist = head;
552       m_free += allocated;
553       return m_free >= cnt;
554     }
555 
556     return false;
557   }
558 
559   void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }
560 
561 private:
562   const unsigned m_max_free;
563   const unsigned m_alloc_size;
564   unsigned m_free;
565   T *m_freelist;
566   thr_safe_pool<T> *m_global_pool;
567 };
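/**
 * Illustrative sketch (not part of the original file) of the intended
 * thread_local_pool life cycle. 'ExamplePage', 'RG_EXAMPLE' and
 * 'local_pool_example' are hypothetical placeholders, not names used
 * elsewhere in this file.
 */
#if 0
struct ExamplePage
{
  ExamplePage *m_next;   // link member required by the pool templates above
  Uint32 m_data[8191];
};

static void local_pool_example(Ndbd_mem_manager *mm,
                               thr_safe_pool<ExamplePage> *global_pool)
{
  const Uint32 RG_EXAMPLE = 0;  // hypothetical resource group id
  thread_local_pool<ExamplePage> pool(global_pool, /* max_free */ 16);

  ExamplePage *p = pool.seize(mm, RG_EXAMPLE); // refills from global_pool when empty
  /* ... use the page ... */
  pool.release_local(p);                // may temporarily exceed max_free
  pool.release_global(mm, RG_EXAMPLE);  // trim the local freelist back to max_free
}
#endif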
568 
569 /**
570  * Signal buffers.
571  *
572  * Each thread job queue contains a list of these buffers with signals.
573  *
574  * There is an underlying assumption that the size of this structure is the
575  * same as the global memory manager page size.
576  */
577 struct thr_job_buffer // 32k
578 {
579   static const unsigned SIZE = 8190;
580 
581   /*
582    * Amount of signal data currently in m_data buffer.
583    * Read/written by producer, read by consumer.
584    */
585   Uint32 m_len;
586   /*
587    * Whether this buffer contained prio A or prio B signals, used when dumping
588    * signals from released buffers.
589    */
590   Uint32 m_prioa;
591   union {
592     Uint32 m_data[SIZE];
593 
594     thr_job_buffer * m_next; // For free-list
595   };
596 };
597 
598 static
599 inline
600 Uint32
601 calc_fifo_used(Uint32 ri, Uint32 wi, Uint32 sz)
602 {
603   return (wi >= ri) ? wi - ri : (sz - ri) + wi;
604 }
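/*
 * Worked example (illustration only): with sz == thr_job_queue::SIZE == 32,
 * a wrapped FIFO with ri == 30 and wi == 2 gives used = (32 - 30) + 2 = 4,
 * while the unwrapped case ri == 3, wi == 7 gives simply wi - ri = 4.
 */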
605 
606 /**
607  * thr_job_queue is shared between consumer / producer.
608  *
609  * The hot spots of the thr_job_queue are the read/write indexes.
610  * As they are updated and read frequently they have been placed
611  * in their own thr_job_queue_head[] in order to make them fit inside a
612  * single/few cache lines and thereby avoid complete L1-cache replacement
613  * every time the job_queue is scanned.
614  */
615 struct thr_job_queue_head
616 {
617   unsigned m_read_index;  // Read/written by consumer, read by producer
618   unsigned m_write_index; // Read/written by producer, read by consumer
619 
620   /**
621    * Waiter object: In case the job queue is full, the producer thread
622    * will 'yield' on this waiter object until the consumer thread
623    * has consumed (at least) a job buffer.
624    */
625   thr_wait m_waiter;
626 
627   Uint32 used() const;
628 };
629 
630 struct thr_job_queue
631 {
632   static const unsigned SIZE = 32;
633 
634   /**
635    * There is a SAFETY limit on free buffers we never allocate,
636    * but may allow these to be implicitly used as a last resort
637    * when job scheduler is really stuck. ('sleeploop 10')
638    */
639   static const unsigned SAFETY = 2;
640 
641   /**
642    * Some more free buffers are RESERVED to be used to avoid
643    * or resolve circular wait-locks between threads waiting
644    * for buffers to become available.
645    */
646   static const unsigned RESERVED = 4;
647 
648   /**
649    * When free buffer count drops below ALMOST_FULL, we
650    * are allowed to start using RESERVED buffers to prevent
651    * circular wait-locks.
652    */
653   static const unsigned ALMOST_FULL = RESERVED + 2;
654 
655   struct thr_job_buffer* m_buffers[SIZE];
656 };
657 
658 inline
659 Uint32
660 thr_job_queue_head::used() const
661 {
662   return calc_fifo_used(m_read_index, m_write_index, thr_job_queue::SIZE);
663 }
664 
665 /*
666  * Two structures tightly associated with thr_job_queue.
667  *
668  * There will generally be exactly one thr_jb_read_state and one
669  * thr_jb_write_state associated with each thr_job_queue.
670  *
671  * The reason they are kept separate is to avoid unnecessary inter-CPU
672  * cache line pollution. All fields shared among producer and consumer
673  * threads are in thr_job_queue, thr_jb_write_state fields are only
674  * accessed by the producer thread(s), and thr_jb_read_state fields are
675  * only accessed by the consumer thread.
676  *
677  * For example, on Intel core 2 quad processors, there is a ~33%
678  * penalty for two cores accessing the same 64-byte cacheline.
679  */
680 struct thr_jb_write_state
681 {
682   /*
683    * The position to insert the next signal into the queue.
684    *
685    * m_write_index is the index into thr_job_queue::m_buffers[] of the buffer
686    * to insert into, and m_write_pos is the index into thr_job_buffer::m_data[]
687    * at which to store the next signal.
688    */
689   Uint32 m_write_index;
690   Uint32 m_write_pos;
691 
692   /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
693   thr_job_buffer *m_write_buffer;
694 
695   /**
696     Number of signals inserted since last flush to thr_job_queue.
697     This variable stores the number of pending signals not yet flushed
698     in the lower 16 bits, and the number of pending signals before a
699     wakeup of the other side is called in the upper 16 bits. To
700     simplify the code we implement the bit manipulations in the
701     methods below.
702 
703     The reason for this optimisation is to minimise use of memory for
704     these variables as they are likely to consume CPU cache memory.
705     It also speeds up some pending signal checks.
706   */
707   Uint32 m_pending_signals;
708 
709   bool has_any_pending_signals() const
710   {
711     return m_pending_signals;
712   }
713   Uint32 get_pending_signals() const
714   {
715     return (m_pending_signals & 0xFFFF);
716   }
717   Uint32 get_pending_signals_wakeup() const
718   {
719     return (m_pending_signals >> 16);
720   }
721   void clear_pending_signals_and_set_wakeup(Uint32 wakeups)
722   {
723     m_pending_signals = (wakeups << 16);
724   }
725   void increment_pending_signals()
726   {
727     m_pending_signals++;
728   }
729   void init_pending_signals()
730   {
731     m_pending_signals = 0;
732   }
733 
734   /*
735    * Is this job buffer open for communication at all?
736    * Several threads are not expected to communicate, and thus do
737    * not allocate thr_job_buffers for exchange of signals.
738    * Don't access any job_buffers without ensuring 'is_open()==true'.
739    */
740   bool is_open() const
741   {
742     return (m_write_buffer != NULL);
743   }
744 };
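/*
 * Worked example of the m_pending_signals packing above (illustration only):
 * after clear_pending_signals_and_set_wakeup(3) followed by two calls to
 * increment_pending_signals(), m_pending_signals == 0x00030002, so
 * get_pending_signals() returns 2 (signals not yet flushed) and
 * get_pending_signals_wakeup() returns 3 (signals pending a wakeup of the
 * other side).
 */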
745 
746 /*
747  * This structure is also used when dumping signal traces, to dump executed
748  * signals from the buffer(s) currently being processed.
749  */
750 struct thr_jb_read_state
751 {
752   /*
753    * Index into thr_job_queue::m_buffers[] of the buffer that we are currently
754    * executing signals from.
755    */
756   Uint32 m_read_index;
757   /*
758    * Index into m_read_buffer->m_data[] of the next signal to execute from the
759    * current buffer.
760    */
761   Uint32 m_read_pos;
762   /*
763    * Thread local copy of thr_job_queue::m_buffers[m_read_index].
764    */
765   thr_job_buffer *m_read_buffer;
766   /*
767    * These are thread-local copies of thr_job_queue::m_write_index and
768    * thr_job_buffer::m_len. They are read once at the start of the signal
769    * execution loop and used to determine when the end of available signals is
770    * reached.
771    */
772   Uint32 m_read_end;    // End within current thr_job_buffer. (*m_read_buffer)
773 
774   Uint32 m_write_index; // Last available thr_job_buffer.
775 
776   /*
777    * Is this job buffer open for communication at all?
778    * Several threads are not expected to communicate, and thus do
779    * not allocate thr_job_buffers for exchange of signals.
780    * Don't access any job_buffers without ensuring 'is_open()==true'.
781    */
782   bool is_open() const
783   {
784     return (m_read_buffer != NULL);
785   }
786 
787   bool is_empty() const
788   {
789     assert(m_read_index != m_write_index  ||  m_read_pos <= m_read_end);
790     return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
791   }
792 };
793 
794 /**
795  * time-queue
796  */
797 struct thr_tq
798 {
799   static const unsigned ZQ_SIZE = 256;
800   static const unsigned SQ_SIZE = 512;
801   static const unsigned LQ_SIZE = 512;
802   static const unsigned PAGES = (MAX_SIGNAL_SIZE *
803                                 (ZQ_SIZE + SQ_SIZE + LQ_SIZE)) / 8192;
804 
805   Uint32 * m_delayed_signals[PAGES];
806   Uint32 m_next_free;
807   Uint32 m_next_timer;
808   Uint32 m_current_time;
809   Uint32 m_cnt[3];
810   Uint32 m_zero_queue[ZQ_SIZE];
811   Uint32 m_short_queue[SQ_SIZE];
812   Uint32 m_long_queue[LQ_SIZE];
813 };
814 
815 /**
816  * THR_SEND_BUFFER_ALLOC_SIZE is the number of 32k pages allocated
817  * when we allocate pages from the global pool of send buffers to
818  * the thread_local_pool (which is local to a thread).
819  *
820  * We allocate a batch at a time to decrease contention on the send-buffer-pool mutex.
821  */
822 #define THR_SEND_BUFFER_ALLOC_SIZE 32
823 
824 /**
825  * THR_SEND_BUFFER_PRE_ALLOC is the number of 32k pages that are
826  *   allocated before we start to run signals
827  */
828 #define THR_SEND_BUFFER_PRE_ALLOC 32
829 
830 /**
831  * Number of pages that are allowed to linger in a
832  * thread-local send-buffer pool
833  */
834 #define THR_SEND_BUFFER_MAX_FREE \
835   (THR_SEND_BUFFER_ALLOC_SIZE + THR_SEND_BUFFER_PRE_ALLOC - 1)
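/*
 * With the defaults above this works out to 32 + 32 - 1 = 63 pages of 32k
 * each, i.e. roughly 2 MByte that may linger per thread-local send pool.
 */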
836 
837 /*
838  * Max number of thread-local job buffers to keep before releasing to
839  * global pool.
840  */
841 #define THR_FREE_BUF_MAX 32
842 /* Minimum number of buffers (to ensure useful trace dumps). */
843 #define THR_FREE_BUF_MIN 12
844 /*
845  * 1/THR_FREE_BUF_BATCH is the fraction of job buffers to allocate/free
846  * at a time from/to global pool.
847  */
848 #define THR_FREE_BUF_BATCH 6
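/*
 * Worked example (illustration only): taking the fraction above relative to
 * THR_FREE_BUF_MAX, a batch is 32 / 6 = 5 job buffers allocated/freed at a
 * time from/to the global pool.
 */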
849 
850 /**
851  * a page with send data
852  */
853 struct thr_send_page
854 {
855   static const Uint32 PGSIZE = 32768;
856 #if SIZEOF_CHARP == 4
857   static const Uint32 HEADER_SIZE = 8;
858 #else
859   static const Uint32 HEADER_SIZE = 12;
860 #endif
861 
862   static Uint32 max_bytes() {
863     return PGSIZE - offsetof(thr_send_page, m_data);
864   }
865 
866   /* Next page */
867   thr_send_page* m_next;
868 
869   /* Bytes of send data available in this page. */
870   Uint16 m_bytes;
871 
872   /* Start of unsent data */
873   Uint16 m_start;
874 
875   /* Data; real size is to the end of one page. */
876   char m_data[2];
877 };
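/*
 * Worked example (illustration only): on a 64-bit build the header is
 * m_next (8) + m_bytes (2) + m_start (2) = 12 bytes (HEADER_SIZE above),
 * so max_bytes() == 32768 - 12 = 32756 bytes of payload per page.
 */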
878 
879 /**
880  * a linked list with thr_send_page
881  */
882 struct thr_send_buffer
883 {
884   thr_send_page* m_first_page;
885   thr_send_page* m_last_page;
886 };
887 
888 /**
889  * a ring buffer with linked list of thr_send_page
890  */
891 struct thr_send_queue
892 {
893   unsigned m_write_index;
894 #if SIZEOF_CHARP == 8
895   unsigned m_unused;
896   thr_send_page* m_buffers[7];
897   static const unsigned SIZE = 7;
898 #else
899   thr_send_page* m_buffers[15];
900   static const unsigned SIZE = 15;
901 #endif
902 };
903 
904 struct MY_ALIGNED(NDB_CL) thr_data
905 {
906   thr_data() : m_jba_write_lock("jbalock"),
907                m_signal_id_counter(0),
908                m_send_buffer_pool(0,
909                                   THR_SEND_BUFFER_MAX_FREE,
910                                   THR_SEND_BUFFER_ALLOC_SIZE) {
911 
912     // Check cacheline alignment
913     assert((((UintPtr)this) % NDB_CL) == 0);
914     assert((((UintPtr)&m_waiter) % NDB_CL) == 0);
915     assert((((UintPtr)&m_jba_write_lock) % NDB_CL) == 0);
916     assert((((UintPtr)&m_jba) % NDB_CL) == 0);
917     assert((((UintPtr)m_in_queue_head) % NDB_CL) == 0);
918     assert((((UintPtr)m_in_queue) % NDB_CL) == 0);
919   }
920 
921   /**
922    * We start with the data structures that are shared globally to
923    * ensure that they get the proper cache line alignment
924    */
925   thr_wait m_waiter; /* Cacheline aligned*/
926 
927   /*
928    * Prio A signal incoming queue. This area is used from many threads
929    * protected by the spin lock. Thus it is also important to protect
930    * surrounding thread-local variables from CPU cache line sharing
931    * with this part.
932    */
933   MY_ALIGNED(NDB_CL) struct thr_spin_lock m_jba_write_lock;
934   MY_ALIGNED(NDB_CL) struct thr_job_queue m_jba;
935   struct thr_job_queue_head m_jba_head;
936 
937   /*
938    * These are the thread input queues, where other threads deliver signals
939    * into.
940    * These cache lines are going to be updated by many different CPUs
941    * all the time whereas the neighbouring variables are thread-local.
942    * Avoid false cacheline sharing by requiring an alignment.
943    */
944   MY_ALIGNED(NDB_CL) struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
945   MY_ALIGNED(NDB_CL) struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
946 
947   /**
948    * The remainder of the variables in thr_data are thread-local,
949    * meaning that they are always updated by the thread that owns those
950    * data structures and thus those variables aren't shared with other
951    * CPUs.
952    */
953 
954   unsigned m_thr_no;
955 
956   /**
957    * Spin time of thread after completing all its work (in microseconds).
958    * We won't go to sleep until we have spun for sufficient time; the aim
959    * is to increase readiness in systems with ample CPU resources.
960    */
961   unsigned m_spintime;
962 
963   /**
964    * Realtime scheduler activated for this thread. This means this
965    * thread will run at a very high priority even beyond the priority
966    * of the OS.
967    */
968   unsigned m_realtime;
969 
970   /**
971    * Index of thread locally in Configuration.cpp
972    */
973   unsigned m_thr_index;
974 
975   /**
976    * max signals to execute per JBB buffer
977    */
978   unsigned m_max_signals_per_jb;
979 
980   /**
981    * Extra JBB signal execute quota allowed to be used to
982    * drain (almost) full in-buffers. Reserved for usage where
983    * we are about to end up in a circular wait-lock between
984    * threads where none of them will be able to proceed.
985    */
986   unsigned m_max_extra_signals;
987 
988   /**
989    * max signals to execute before recomputing m_max_signals_per_jb
990    */
991   unsigned m_max_exec_signals;
992 
993   /**
994    * Flag indicating that we have sent a local Prio A signal. Used to decide
995    * whether to scan for more prio A signals after executing those signals.
996    * This is used to ensure that if we execute at prio A level and send a
997    * prio A signal it will be immediately executed (or at least before any
998    * prio B signal).
999    */
1000   bool m_sent_local_prioa_signal;
1001 
1002   NDB_TICKS m_ticks;
1003   struct thr_tq m_tq;
1004 
1005   /*
1006    * In m_next_buffer we keep a free buffer at all times, so that when
1007    * we hold the lock and find we need a new buffer, we can use this and this
1008    * way defer allocation to after releasing the lock.
1009    */
1010   struct thr_job_buffer* m_next_buffer;
1011 
1012   /*
1013    * We keep a small number of buffers in a thread-local cyclic FIFO, so that
1014    * we can avoid going to the global pool in most cases, and so that we have
1015    * recent buffers available for dumping in trace files.
1016    */
1017   struct thr_job_buffer *m_free_fifo[THR_FREE_BUF_MAX];
1018   /* m_first_free is the index of the entry to return next from seize(). */
1019   Uint32 m_first_free;
1020   /* m_first_unused is the first unused entry in m_free_fifo. */
1021   Uint32 m_first_unused;
1022 
1023 
1024   /* Thread-local read state of prio A buffer. */
1025   struct thr_jb_read_state m_jba_read_state;
1026 
1027   /*
1028    * There is no m_jba_write_state, as we have multiple writers to the prio A
1029    * queue, so local state becomes invalid as soon as we release the lock.
1030    */
1031 
1032   /* These are the write states of m_in_queue[self] in each thread. */
1033   struct thr_jb_write_state m_write_states[MAX_BLOCK_THREADS];
1034   /* These are the read states of all of our own m_in_queue[]. */
1035   struct thr_jb_read_state m_read_states[MAX_BLOCK_THREADS];
1036 
1037   /* Jam buffers for making trace files at crashes. */
1038   EmulatedJamBuffer m_jam;
1039   /* Watchdog counter for this thread. */
1040   Uint32 m_watchdog_counter;
1041   /* Latest executed signal id assigned in this thread */
1042   Uint32 m_signal_id_counter;
1043 
1044   /* Signal delivery statistics. */
1045   struct
1046   {
1047     Uint64 m_loop_cnt;
1048     Uint64 m_exec_cnt;
1049     Uint64 m_wait_cnt;
1050     Uint64 m_prioa_count;
1051     Uint64 m_prioa_size;
1052     Uint64 m_priob_count;
1053     Uint64 m_priob_size;
1054   } m_stat;
1055 
1056   /* Array of node ids with pending remote send data. */
1057   Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
1058   /* Number of node ids in m_pending_send_nodes. */
1059   Uint32 m_pending_send_count;
1060 
1061   /**
1062    * Bitmap of pending node ids with send data.
1063    * Used to quickly check if a node id is already in m_pending_send_nodes.
1064    */
1065   Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
1066 
1067   /* pool for send buffers */
1068   class thread_local_pool<thr_send_page> m_send_buffer_pool;
1069 
1070   /* Send buffer for this thread, these are not touched by any other thread */
1071   struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];
1072 
1073   /* Block instances (main and worker) handled by this thread. */
1074   /* Used for sendpacked (send-at-job-buffer-end). */
1075   Uint32 m_instance_count;
1076   BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];
1077 
1078   SectionSegmentPool::Cache m_sectionPoolCache;
1079 
1080   Uint32 m_cpu;
1081   my_thread_t m_thr_id;
1082   NdbThread* m_thread;
1083 };
1084 
1085 struct mt_send_handle  : public TransporterSendBufferHandle
1086 {
1087   struct thr_data * m_selfptr;
1088   mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
1089   virtual ~mt_send_handle() {}
1090 
1091   virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
1092   virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
1093   virtual void getSendBufferLevel(NodeId node, SB_LevelType &level);
1094   virtual bool forceSend(NodeId node);
1095 };
1096 
1097 struct trp_callback : public TransporterCallback
1098 {
1099   trp_callback() {}
1100 
1101   /* Callback interface. */
1102   void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
1103   void lock_transporter(NodeId node);
1104   void unlock_transporter(NodeId node);
1105   Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
1106   Uint32 bytes_sent(NodeId node, Uint32 bytes);
1107   bool has_data_to_send(NodeId node);
1108   void reset_send_buffer(NodeId node, bool should_be_empty);
1109 };
1110 
1111 static char *g_thr_repository_mem = NULL;
1112 static struct thr_repository *g_thr_repository = NULL;
1113 
1114 struct thr_repository
1115 {
1116   thr_repository()
1117     : m_section_lock("sectionlock"),
1118       m_mem_manager_lock("memmanagerlock"),
1119       m_jb_pool("jobbufferpool"),
1120       m_sb_pool("sendbufferpool")
1121   {
1122     // Verify assumed cacheline alignment
1123     assert((((UintPtr)this) % NDB_CL) == 0);
1124     assert((((UintPtr)&m_receive_lock) % NDB_CL) == 0);
1125     assert((((UintPtr)&m_section_lock) % NDB_CL) == 0);
1126     assert((((UintPtr)&m_mem_manager_lock) % NDB_CL) == 0);
1127     assert((((UintPtr)&m_jb_pool) % NDB_CL) == 0);
1128     assert((((UintPtr)&m_sb_pool) % NDB_CL) == 0);
1129     assert((((UintPtr)m_thread) % NDB_CL) == 0);
1130     assert((sizeof(m_receive_lock[0]) % NDB_CL) == 0);
1131   }
1132 
1133   /**
1134    * m_receive_lock, m_section_lock, m_mem_manager_lock, m_jb_pool
1135    * and m_sb_pool are all variables globally shared among the threads
1136    * and also heavily updated.
1137    * Requiring alignment avoids false cache line sharing.
1138    */
1139   MY_ALIGNED(NDB_CL)
1140   struct MY_ALIGNED(NDB_CL) aligned_locks : public thr_spin_lock
1141   {
1142   } m_receive_lock[MAX_NDBMT_RECEIVE_THREADS];
1143 
1144   MY_ALIGNED(NDB_CL) struct thr_spin_lock m_section_lock;
1145   MY_ALIGNED(NDB_CL) struct thr_spin_lock m_mem_manager_lock;
1146   MY_ALIGNED(NDB_CL) struct thr_safe_pool<thr_job_buffer> m_jb_pool;
1147   MY_ALIGNED(NDB_CL) struct thr_safe_pool<thr_send_page> m_sb_pool;
1148 
1149   /* m_mm and m_thread_count are globally shared and read only variables */
1150   Ndbd_mem_manager * m_mm;
1151   unsigned m_thread_count;
1152   /**
1153    * Protect m_mm and m_thread_count from CPU cache misses; the first
1154    * part of m_thread (struct thr_data) consists of globally shared
1155    * variables, so sharing a cache line with them isn't a good idea
1156    * for these read-only variables.
1157    */
1158   MY_ALIGNED(NDB_CL) struct thr_data m_thread[MAX_BLOCK_THREADS];
1159 
1160   /* The buffers that are to be sent */
1161   struct send_buffer
1162   {
1163     /**
1164      * In order to reduce lock contention while
1165      * adding job buffer pages to the send buffers,
1166      * and sending these with the help of the send
1167      * transporters, there are two different
1168      * thr_send_buffer's. Each protected by its own lock:
1169      *
1170      * - m_buffer / m_buffer_lock:
1171      *   Send buffer pages from all threads are linked into
1172      *   the m_buffer when collected by link_thread_send_buffers().
1173      *
1174      * - m_sending / m_send_lock:
1175      *   Before send buffers are given to the send-transporter,
1176      *   they are moved from m_buffer -> m_sending by
1177      *   get_bytes_to_send_iovec(). (Req. both locks.)
1178      *   When transporter has consumed some/all of m_sending
1179      *   buffers, ::bytes_sent() will update m_sending accordingly.
1180      *
1181      * If both locks are required, grab the m_send_lock first.
1182      * Release m_buffer_lock before releasing m_send_lock.
1183      */
1184     struct thr_spin_lock m_buffer_lock; //Protect m_buffer
1185     struct thr_send_buffer m_buffer;
1186 
1187     struct thr_spin_lock m_send_lock;   //Protect m_sending + transporter
1188     struct thr_send_buffer m_sending;
1189 
1190     Uint64 m_node_total_send_buffer_size; //Protected by m_buffer_lock
1191     /**
1192      * Flag used to coordinate sending to the same remote node from different
1193      * threads when there is contention on m_send_lock.
1194      *
1195      * If two threads need to send to the same node at the same time, the
1196      * second thread, rather than wait for the first to finish, will just
1197      * set this flag. The first thread will then take responsibility
1198      * for sending to this node when done with its own sending.
1199      */
1200     Uint32 m_force_send;   //Check after release of m_send_lock
1201 
1202     /**
1203      * Which thread is currently holding the m_send_lock
1204      */
1205     Uint32 m_send_thread;  //Protected by m_send_lock
1206 
1207     /**
1208      * Bytes sent in last performSend().
1209      */
1210     Uint32 m_bytes_sent;
1211 
1212     /* read index(es) in thr_send_queue */
1213     Uint32 m_read_index[MAX_BLOCK_THREADS];
1214   } m_send_buffers[MAX_NTRANSPORTERS];
1215 
1216   /* The buffers published by threads */
1217   thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_BLOCK_THREADS];
1218 
1219   /*
1220    * These are used to synchronize during crash / trace dumps.
1221    *
1222    */
1223   NdbMutex stop_for_crash_mutex;
1224   NdbCondition stop_for_crash_cond;
1225   Uint32 stopped_threads;
1226 };
1227 
1228 /**
1229  *  Class to handle send threads
1230  *  ----------------------------
1231  *  We can have up to 8 send threads.
1232  *
1233  *  This class handles the case where a block thread needs to send; it
1234  *  manages the running of the send threads and will also start the
1235  *  send threads.
1236  */
1237 #define is_send_thread(thr_no) (thr_no >= num_threads)
1238 
1239 struct thr_send_thread_instance
1240 {
1241   thr_send_thread_instance() :
1242                m_instance_no(0),
1243                m_watchdog_counter(0),
1244                m_awake(FALSE),
1245                m_thread(NULL),
1246                m_waiter_struct(),
1247                m_send_buffer_pool(0,
1248                                   THR_SEND_BUFFER_MAX_FREE,
1249                                   THR_SEND_BUFFER_ALLOC_SIZE)
1250   {}
1251   Uint32 m_instance_no;
1252   Uint32 m_watchdog_counter;
1253   Uint32 m_awake;
1254   Uint32 m_thr_index;
1255   NdbThread *m_thread;
1256   thr_wait m_waiter_struct;
1257   class thread_local_pool<thr_send_page> m_send_buffer_pool;
1258 };
1259 
1260 struct thr_send_nodes
1261 {
1262   /**
1263    * 'm_next' implements a list of 'send_nodes' with PENDING
1264    * data, not yet assigned to a send thread. 0 means NULL.
1265    */
1266   Uint16 m_next;
1267 
1268   /**
1269    * m_data_available is incremented/decremented by each
1270    * party having data to be sent to this specific node.
1271    * It works in conjunction with a queue of get'able nodes
1272    * (insert_node(), get_node()) waiting to be served by
1273    * the send threads, such that:
1274    *
1275    * 1) IDLE-state (m_data_available==0, not in list)
1276    *    There are no data available for sending, and
1277    *    no send threads are assigned to this node.
1278    *
1279    * 2) PENDING-state (m_data_available>0, in list)
1280    *    There are data available for sending, possibly
1281    *    supplied by multiple parties. No send threads
1282    *    are currently serving this request.
1283    *
1284    * 3) ACTIVE-state (m_data_available==1, not in list)
1285    *    There are data available for sending, possibly
1286    *    supplied by multiple parties, which are currently
1287    *    being served by a send thread. All known
1288    *    data available at the time when we became 'ACTIVE'
1289    *    will be served now ( -> '==1')
1290    *
1291    * 3b ACTIVE-WITH-PENDING-state (m_data_available>1, not in list)
1292    *    Variant of above state, send thread is serving requests,
1293    *    and even more data became available since we started.
1294    *
1295    * Allowed state transitions are:
1296    *
1297    * IDLE     -> PENDING  (alert_send_thread w/ insert_node)
1298    * PENDING  -> ACTIVE   (get_node)
1299    * ACTIVE   -> IDLE     (run_send_thread if check_done_node)
1300    * ACTIVE   -> PENDING  (run_send_thread if 'more')
1301    * ACTIVE   -> ACTIVE-P (alert_send_thread while ACTIVE)
1302    * ACTIVE-P -> PENDING  (run_send_thread while not check_done_node)
1303    * ACTIVE-P -> ACTIVE-P (alert_send_thread while ACTIVE-P)
1304    *
1305    * A consequence of this is that only a (single) ACTIVE
1306    * send thread will serve send requests to a specific node.
1307    * Thus, there will be no contention on the m_send_lock
1308    * caused by the send threads.
1309    */
1310   Uint16 m_data_available;
1311 
1312   /**
1313    * m_send_thread is the current/last send thread instance
1314    * serving this send_node. Whenever possible we try to
1315    * reuse the same thread next time around to avoid
1316    * switching between CPUs.
1317    */
1318   Uint16 m_send_thread;
1319 
1320   /* Send to this node has caused a Transporter overload */
1321   Uint16 m_send_overload;
1322 
1323   /**
1324    * Further sending to this node should be delayed until
1325    * 'm_micros_delayed' has passed since 'm_inserted_time'.
1326    */
1327   Uint32 m_micros_delayed;
1328   NDB_TICKS m_inserted_time;
1329 };
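/*
 * Illustrative walk-through of the state machine documented above (not from
 * the original file), for a hypothetical node id 3:
 *
 *   alert_send_thread(3, now)  IDLE     -> PENDING   (m_data_available = 1,
 *                                                     node inserted in list)
 *   get_node(...)              PENDING  -> ACTIVE    (removed from list,
 *                                                     m_data_available = 1)
 *   alert_send_thread(3, now)  ACTIVE   -> ACTIVE-P  (m_data_available = 2)
 *   check_done_node(3)         returns false         (m_data_available = 1),
 *                              so the send thread re-inserts the node:
 *                              ACTIVE-P -> PENDING
 */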
1330 
1331 class thr_send_threads
1332 {
1333 public:
1334   /* Create send thread environment */
1335   thr_send_threads();
1336 
1337   /* Destroy send thread environment and ensure threads are stopped */
1338   ~thr_send_threads();
1339 
1340   /* A block thread has flushed data for a node and wants it sent */
1341   void alert_send_thread(NodeId node, NDB_TICKS now);
1342 
1343   /* Method used to run the send thread */
1344   void run_send_thread(Uint32 instance_no);
1345 
1346   /* Method to start the send threads */
1347   void start_send_threads();
1348 
1349   /**
1350    * Check if a node possibly has data ready to be sent.
1351    * Upon 'true', the caller should grab send_thread_mutex and
1352    * try get_node() while holding the lock.
1353    */
1354   bool data_available() const
1355   {
1356     rmb();
1357     return (m_more_nodes == TRUE);
1358   }
1359 
1360   /* Get send buffer pool for send thread */
1361   thread_local_pool<thr_send_page>* get_send_buffer_pool(Uint32 thr_no)
1362   {
1363     return &m_send_threads[thr_no - num_threads].m_send_buffer_pool;
1364   }
1365 
1366 private:
1367   /* Insert a node in list of nodes that has data available to send */
1368   void insert_node(NodeId node);
1369 
1370   /* Get a node from the list in order to send to it */
1371   NodeId get_node(Uint32 instance_no, NDB_TICKS now);
1372 
1373   /**
1374    * Set of utility methods to aid in scheduling of send work:
1375    *
1376    * Further sending to node can be delayed
1377    * until 'now+delay'. Used either to wait for more packets
1378    * to be available for bigger chunks, or to wait for an overload
1379    * situation to clear.
1380    */
1381   void set_max_delay(NodeId node, NDB_TICKS now, Uint32 delay_usec);
1382   void set_overload_delay(NodeId node, NDB_TICKS now, Uint32 delay_usec);
1383   Uint32 check_delay_expired(NodeId node, NDB_TICKS now);
1384 
1385   /* Completed sending data to this node, check if more work pending. */
1386   bool check_done_node(NodeId node);
1387 
1388   /* Get a send thread which isn't awake currently */
1389   struct thr_send_thread_instance* get_not_awake_send_thread(NodeId node);
1390   Uint32 count_awake_send_threads(void) const;
1391 
1392   /* Try to lock send_buffer for this node. */
1393   static
1394   int trylock_send_node(NodeId node);
1395 
1396   /* Perform the actual send to the node, release send_buffer lock.
1397    * Return 'true' if there are still more to be sent to this node.
1398    */
1399   static
1400   bool perform_send(NodeId node, Uint32 instance_no, Uint32& bytes_sent);
1401 
1402   /* Have threads been started */
1403   Uint32 m_started_threads;
1404 
1405   /* First node that has data to be sent */
1406   Uint32 m_first_node;
1407 
1408   /* Last node in list of nodes with data available for sending */
1409   Uint32 m_last_node;
1410 
1411   /* 'true': More nodes became available -> Need recheck ::get_node() */
1412   bool m_more_nodes;
1413 
1414   /* Is data available and next reference for each node in cluster */
1415   struct thr_send_nodes m_node_state[MAX_NODES];
1416 
1417   /**
1418    * Very few compilers (gcc among them) allow zero-length arrays.
1419    */
1420 #if MAX_NDBMT_SEND_THREADS == 0
1421 #define _MAX_SEND_THREADS 1
1422 #else
1423 #define _MAX_SEND_THREADS MAX_NDBMT_SEND_THREADS
1424 #endif
1425 
1426   /* Data and state for the send threads */
1427   struct thr_send_thread_instance m_send_threads[_MAX_SEND_THREADS];
1428 
1429   /**
1430    * Mutex protecting the linked list of nodes awaiting sending
1431    * and also the not_awake variable of the send thread.
1432    */
1433   NdbMutex *send_thread_mutex;
1434 };
1435 
1436 
1437 /*
1438  * The single instance of the thr_send_threads class. If this variable
1439  * is non-NULL, then we're using send threads; if it is NULL, there
1440  * are no send threads.
1441  */
1442 static thr_send_threads *g_send_threads = NULL;
1443 
1444 extern "C"
1445 void *
1446 mt_send_thread_main(void *thr_arg)
1447 {
1448   struct thr_send_thread_instance *this_send_thread =
1449     (thr_send_thread_instance*)thr_arg;
1450 
1451   Uint32 instance_no = this_send_thread->m_instance_no;
1452   g_send_threads->run_send_thread(instance_no);
1453   return NULL;
1454 }
1455 
1456 thr_send_threads::thr_send_threads()
1457   : m_started_threads(FALSE),
1458     m_first_node(0),
1459     m_last_node(0),
1460     m_more_nodes(false),
1461     send_thread_mutex(NULL)
1462 {
1463   struct thr_repository *rep = g_thr_repository;
1464 
1465   for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_node_state); i++)
1466   {
1467     m_node_state[i].m_next = 0;
1468     m_node_state[i].m_data_available = 0;
1469     m_node_state[i].m_send_thread = 0;
1470     m_node_state[i].m_send_overload = FALSE;
1471     m_node_state[i].m_micros_delayed = 0;
1472     NdbTick_Invalidate(&m_node_state[i].m_inserted_time);
1473   }
1474   for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_send_threads); i++)
1475   {
1476     m_send_threads[i].m_waiter_struct.init();
1477     m_send_threads[i].m_instance_no = i;
1478     m_send_threads[i].m_send_buffer_pool.set_pool(&rep->m_sb_pool);
1479   }
1480   send_thread_mutex = NdbMutex_Create();
1481 }
1482 
1483 thr_send_threads::~thr_send_threads()
1484 {
1485   if (!m_started_threads)
1486     return;
1487 
1488   for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
1489   {
1490     void *dummy_return_status;
1491 
1492     /* Ensure thread is woken up to die */
1493     wakeup(&(m_send_threads[i].m_waiter_struct));
1494     NdbThread_WaitFor(m_send_threads[i].m_thread, &dummy_return_status);
1495     globalEmulatorData.theConfiguration->removeThread(
1496       m_send_threads[i].m_thread);
1497     NdbThread_Destroy(&(m_send_threads[i].m_thread));
1498   }
1499 }
1500 
1501 void
1502 thr_send_threads::start_send_threads()
1503 {
1504   for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
1505   {
1506     m_send_threads[i].m_thread =
1507       NdbThread_Create(mt_send_thread_main,
1508                        (void **)&m_send_threads[i],
1509                        1024*1024,
1510                        "send thread", //ToDo add number
1511                        NDB_THREAD_PRIO_MEAN);
1512     m_send_threads[i].m_thr_index =
1513       globalEmulatorData.theConfiguration->addThread(
1514         m_send_threads[i].m_thread,
1515         SendThread);
1516   }
1517   m_started_threads = TRUE;
1518 }
1519 
1520 /* Called under mutex protection of send_thread_mutex */
1521 void
1522 thr_send_threads::insert_node(NodeId node)
1523 {
1524   Uint8 last_node = m_last_node;
1525   Uint8 first_node = m_first_node;
1526   struct thr_send_nodes &last_node_state = m_node_state[last_node];
1527   struct thr_send_nodes &node_state = m_node_state[node];
1528 
1529   assert(node_state.m_data_available > 0);
1530   node_state.m_next = 0;
1531 
1532   m_more_nodes = true;
1533   /* Ensure the lock free ::data_available sees 'm_more_nodes == TRUE' */
1534   wmb();
1535 
1536   m_last_node = node;
1537   if (first_node == 0)
1538     m_first_node = node;
1539   else
1540     last_node_state.m_next = node;
1541 }
1542 
1543 /* Called under mutex protection of send_thread_mutex */
1544 void
1545 thr_send_threads::set_max_delay(NodeId node, NDB_TICKS now, Uint32 delay_usec)
1546 {
1547   struct thr_send_nodes &node_state = m_node_state[node];
1548   assert(node_state.m_data_available > 0);
1549   assert(!node_state.m_send_overload);
1550 
1551   node_state.m_micros_delayed = delay_usec;
1552   node_state.m_inserted_time = now;
1553 }
1554 
1555 /* Called under mutex protection of send_thread_mutex */
1556 void
1557 thr_send_threads::set_overload_delay(NodeId node, NDB_TICKS now, Uint32 delay_usec)
1558 {
1559   struct thr_send_nodes &node_state = m_node_state[node];
1560   assert(node_state.m_data_available > 0);
1561   node_state.m_send_overload = TRUE;
1562   node_state.m_micros_delayed = delay_usec;
1563   node_state.m_inserted_time = now;
1564 }
1565 
1566 /* Called under mutex protection of send_thread_mutex */
1567 Uint32
1568 thr_send_threads::check_delay_expired(NodeId node, NDB_TICKS now)
1569 {
1570   struct thr_send_nodes &node_state = m_node_state[node];
1571   assert(node_state.m_data_available > 0);
1572 
1573   if (node_state.m_micros_delayed == 0)
1574     return 0;
1575 
1576   const Uint64 micros_passed = NdbTick_Elapsed(node_state.m_inserted_time, now).microSec();
1577   if (micros_passed >= Uint64(node_state.m_micros_delayed)) //Expired
1578   {
1579     node_state.m_inserted_time = now;
1580     node_state.m_micros_delayed = 0;
1581     node_state.m_send_overload = FALSE;
1582     return 0;
1583   }
1584 
1585   // Update and return remaining wait time
1586   node_state.m_inserted_time = now;
1587   node_state.m_micros_delayed -= micros_passed;
1588   return node_state.m_micros_delayed;
1589 }
1590 
1591 /**
1592  * TODO RONM:
1593  * Add some more NDBINFO tables to make it easier to follow the workings
1594  * of the MaxSendDelay parameter.
1595  */
1596 
1597 static Uint64 mt_get_send_buffer_bytes(NodeId node);
1598 
1599 /**
1600  * MAX_SEND_BUFFER_SIZE_TO_DELAY is a heuristic constant that specifies
1601  * a send buffer size that will always be sent. The size of this is based
1602  * on experience that maximum performance of the send part is achieved at
1603  * around 64 kBytes of send buffer size and that the difference between
1604  * 20 kB and 64 kB is small. We thus avoid unnecessary delays that bring
1605  * no significant performance gain.
1606  */
1607 static const Uint64 MAX_SEND_BUFFER_SIZE_TO_DELAY = (20 * 1024);
1608 
1609 
1610 /**
1611  * Get a node (returned) that has data to be sent to it.
1612  *
1613  * Sending could have been delayed; in such cases the node whose
1614  * delay expires first will be returned. It is then up to
1615  * the caller to either accept this node, or reinsert it
1616  * such that it can be returned and retried later.
1617  *
1618  * Called under mutex protection of send_thread_mutex
1619  */
1620 NodeId
1621 thr_send_threads::get_node(Uint32 send_thread, NDB_TICKS now)
1622 {
1623   Uint32 next;
1624   Uint32 prev = 0;
1625   Uint32 node = m_first_node;
1626   Uint32 delayed_node = 0;
1627 
1628   if (!node)
1629   {
1630     m_more_nodes = false;
1631     return 0;
1632   }
1633 
1634   /**
1635    * Search for a node ready to be sent to.
1636    * If none found, remember the one with the smallest delay.
1637    */
1638   Uint32 min_wait_usec = Uint32(~0);
1639   while (node)
1640   {
1641     next = m_node_state[node].m_next;
1642 
1643     const Uint32 send_delay = check_delay_expired(node, now);
1644     if (likely(send_delay == 0))
1645       goto found;
1646 
1647     /* Find remaining minimum wait: */
1648     if (min_wait_usec > send_delay)
1649     {
1650       min_wait_usec = send_delay;
1651       delayed_node = node;
1652     }
1653 
1654     prev = node;
1655     node = next;
1656   }
1657 
1658   // As 'm_first_node != 0', there has to be a 'delayed_node'
1659   assert(delayed_node != 0);
1660   m_more_nodes = false;  // No more to execute without delays
1661 
1662   // Relocate the delayed send node and return it
1663   node = m_first_node;
1664   prev = 0;
1665   do
1666   {
1667     next = m_node_state[node].m_next;
1668     if (node == delayed_node)
1669       goto found;
1670 
1671     prev = node;
1672     node = next;
1673   } while (node);
1674 
1675   require(false);  // Should never get here
1676 
1677 found:
1678   struct thr_send_nodes &node_state = m_node_state[node];
1679   assert(node_state.m_data_available > 0);
1680   node_state.m_next = 0;
1681 
1682   if (likely(node == m_first_node))
1683     m_first_node = next;
1684   else
1685     m_node_state[prev].m_next = next;
1686 
1687   if (node == m_last_node)
1688     m_last_node = prev;
1689 
1690   node_state.m_data_available = 1;
1691   node_state.m_send_thread = send_thread;
1692   return (NodeId)node;
1693 }
1694 
1695 /* Called under mutex protection of send_thread_mutex */
1696 bool
1697 thr_send_threads::check_done_node(NodeId node)
1698 {
1699   struct thr_send_nodes &node_state = m_node_state[node];
1700   assert(node_state.m_data_available > 0);
1701   node_state.m_data_available--;
1702   return (node_state.m_data_available == 0);
1703 }
1704 
1705 /* Called under mutex protection of send_thread_mutex */
1706 struct thr_send_thread_instance*
1707 thr_send_threads::get_not_awake_send_thread(NodeId node)
1708 {
1709   struct thr_send_thread_instance *used_send_thread;
1710 
1711   /* Reuse previous send_thread if available */
1712   if (!m_send_threads[m_node_state[node].m_send_thread].m_awake)
1713   {
1714     used_send_thread= &m_send_threads[m_node_state[node].m_send_thread];
1715     return used_send_thread;
1716   }
1717 
1718   /* Search for another available send thread */
1719   for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
1720   {
1721     if (!m_send_threads[i].m_awake)
1722     {
1723       used_send_thread= &m_send_threads[i];
1724       return used_send_thread;
1725     }
1726   }
1727   return NULL;
1728 }
1729 
1730 Uint32
1731 thr_send_threads::count_awake_send_threads() const
1732 {
1733   Uint32 count = 0;
1734   for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
1735   {
1736     if (m_send_threads[i].m_awake)
1737     {
1738       count++;
1739     }
1740   }
1741   return count;
1742 }
1743 
1744 void
1745 thr_send_threads::alert_send_thread(NodeId node, NDB_TICKS now)
1746 {
1747   struct thr_send_nodes& node_state = m_node_state[node];
1748 
1749   NdbMutex_Lock(send_thread_mutex);
1750   node_state.m_data_available++;  // There is more to send
1751   if (node_state.m_data_available > 1)
1752   {
1753     /**
1754      * ACTIVE(_P) -> ACTIVE_P
1755      *
1756      * The node is already flagged that it has data needing to be sent.
1757      * There is no need to wake even more threads up in this case
1758      * since we piggyback on someone else's request.
1759      *
1760      * Waking another thread to send to this node would only
1761      * result in contention and blocking on the send_lock.
1762      *
1763      * We can be sure that the buffers we have flushed will be read by a send
1764      * thread: they will either be piggybacked when the send thread calls
1765      * 'get_node()' for sending, or the data will be noticed when the
1766      * send thread calls 'check_done_node()' and finds that more data has
1767      * become available. In the latter case, the send thread will schedule
1768      * the node for another round with insert_node().
1769      */
1770     NdbMutex_Unlock(send_thread_mutex);
1771     return;
1772   }
1773   assert(!node_state.m_send_overload);      // Caught above as ACTIVE
1774   insert_node(node);                        // IDLE -> PENDING
1775 
1776   /**
1777    * If a maximum send delay is configured, delay sending the data
1778    * to collect more payload. This is the first send to this node,
1779    * so we start the delay timer now.
1780    */
1781   if (max_send_delay > 0)                   // Wait for more payload?
1782   {
1783     set_max_delay(node, now, max_send_delay);
1784   }
1785 
1786   /*
1787    * Search for a send thread which is asleep; if there is one, wake it.
1788    *
1789    * If everyone is already awake we don't need to wake anyone up, since
1790    * the threads will check whether there are nodes available to send to
1791    * before they go to sleep.
1792    *
1793    * The reason to look for anyone asleep is to ensure proper use of CPU
1794    * resources and to ensure that we use all the send thread CPUs at
1795    * our disposal when necessary.
1796    */
1797   struct thr_send_thread_instance *avail_send_thread
1798     = get_not_awake_send_thread(node);
1799 
1800   NdbMutex_Unlock(send_thread_mutex);
1801 
1802   if (avail_send_thread)
1803   {
1804     /*
1805      * Wake the assigned sleeping send thread. This may be a spurious wakeup,
1806      * but that is not a problem; the important thing is that at least one
1807      * send thread is woken up to handle our request. If someone already
1808      * awake takes care of our request before we get to wake someone up,
1809      * that is not a problem either.
1810      */
1811     wakeup(&(avail_send_thread->m_waiter_struct));
1812   }
1813 }
1814 
1815 static bool
1816 check_available_send_data(struct thr_data *not_used)
1817 {
1818   (void)not_used;
1819   return !g_send_threads->data_available();
1820 }
1821 
1822 //static
1823 int
1824 thr_send_threads::trylock_send_node(NodeId node)
1825 {
1826   thr_repository::send_buffer *sb = g_thr_repository->m_send_buffers+node;
1827   return trylock(&sb->m_send_lock);
1828 }
1829 
1830 //static
1831 bool
1832 thr_send_threads::perform_send(NodeId node, Uint32 instance_no, Uint32& bytes_sent)
1833 {
1834   thr_repository::send_buffer * sb = g_thr_repository->m_send_buffers+node;
1835 
1836   /**
1837    * Set m_send_thr so that our transporter callback can know which thread
1838    * holds the send lock for this remote node.
1839    */
1840   sb->m_send_thread = num_threads + instance_no;
1841   const bool more = globalTransporterRegistry.performSend(node);
1842   bytes_sent = sb->m_bytes_sent;
1843   sb->m_send_thread = NO_SEND_THREAD;
1844   unlock(&sb->m_send_lock);
1845   return more;
1846 }
1847 
1848 static void
1849 update_send_sched_config(THRConfigApplier & conf,
1850                          unsigned instance_no,
1851                          bool & real_time,
1852                          Uint64 & spin_time)
1853 {
1854   real_time = conf.do_get_realtime_send(instance_no);
1855   spin_time = (Uint64)conf.do_get_spintime_send(instance_no);
1856 }
1857 
1858 static void
1859 yield_rt_break(NdbThread *thread,
1860                enum ThreadTypes type,
1861                bool real_time)
1862 {
1863   Configuration * conf = globalEmulatorData.theConfiguration;
1864   conf->setRealtimeScheduler(thread,
1865                              type,
1866                              FALSE,
1867                              FALSE);
1868   conf->setRealtimeScheduler(thread,
1869                              type,
1870                              real_time,
1871                              FALSE);
1872 }
1873 
1874 static void
1875 check_real_time_break(NDB_TICKS now,
1876                       NDB_TICKS *yield_time,
1877                       NdbThread *thread,
1878                       enum ThreadTypes type)
1879 {
1880   if (unlikely(NdbTick_Compare(now, *yield_time) < 0))
1881   {
1882     /**
1883      * Timer was adjusted backwards, or the monotonic timer implementation
1884      * on this platform is unstable. Best we can do is to restart
1885      * RT-yield timers from new current time.
1886      */
1887     *yield_time = now;
1888   }
1889 
1890   const Uint64 micros_passed =
1891     NdbTick_Elapsed(*yield_time, now).microSec();
1892 
1893   if (micros_passed > 50000)
1894   {
1895     /**
1896      * Lower scheduling prio to time-sharing mode to ensure that
1897      * other threads and processes gets a chance to be scheduled
1898      * if we run for an extended time.
1899      */
1900     yield_rt_break(thread, type, TRUE);
1901     *yield_time = now;
1902   }
1903 }
1904 
1905 static bool
1906 check_yield(NDB_TICKS now,
1907             NDB_TICKS *start_spin_ticks,
1908             Uint64 min_spin_timer) //microseconds
1909 {
1910   assert(min_spin_timer > 0);
1911 
1912   if (!NdbTick_IsValid(*start_spin_ticks))
1913   {
1914     /**
1915      * We haven't started spinning yet, start spin timer.
1916      */
1917     *start_spin_ticks = now;
1918     return false;
1919   }
1920   if (unlikely(NdbTick_Compare(now, *start_spin_ticks) < 0))
1921   {
1922     /**
1923      * Timer was adjusted backwards, or the monotonic timer implementation
1924      * on this platform is unstable. Best we can do is to restart
1925      * spin timers from new current time.
1926      */
1927     *start_spin_ticks = now;
1928     return false;
1929   }
1930 
1931   /**
1932    * We have a minimum spin timer before we go to sleep.
1933    * We will go to sleep only if we have spun for longer
1934    * time than required by the minimum spin time.
1935    */
1936   const Uint64 micros_passed =
1937     NdbTick_Elapsed(*start_spin_ticks, now).microSec();
1938   return (micros_passed >= min_spin_timer);
1939 }
1940 
1941 /**
1942  * There are some send scheduling algorithms built into the send thread.
1943  * They are mainly implemented as part of ::run_send_thread, thus commented here:
1944  *
1945  * We have the possibility to set a 'send delay' for each node. This
1946  * is used both for handling send overload, where we should wait
1947  * before retrying, and as an aid for collecting smaller packets into
1948  * larger, and thus fewer, packets. This decreases the send overhead
1949  * on a highly loaded system.
1950  *
1951  * A delay due to overload is always waited for. As there is already
1952  * send work queued up in the buffers, sending will be possible
1953  * without the send thread actively busy-retrying. However, delays
1954  * intended to increase the packet size can be ignored.
1955  *
1956  * The basic idea of the latter is the following:
1957  * By introducing a delay we ensure that all block threads have
1958  * gotten a chance to execute messages that will generate data
1959  * to be sent to nodes. This is particularly helpful in e.g.
1960  * queries that are scanning a table. Here a SCAN_TABREQ is
1961  * received in a TC and this generates a number of SCAN_FRAGREQ
1962  * signals to each LDM, each of those LDMs will in turn generate
1963  * a number of new signals that are all destined to the same
1964  * node. So this delay here increases the chance that those
1965  * signals can be sent in the same TCP/IP packet over the wire.
1966  *
1967  * Another use case is applications using the asynchronous API
1968  * and thus sending many PK lookups that traverse a node in
1969  * parallel from the same destination node. These can benefit
1970  * greatly from this extra delay increasing the packet sizes.
1971  *
1972  * There is also a case when sending many updates that need to
1973  * be sent to the other node in the same node group. By delaying
1974  * the send of this data we ensure that the receiver thread on
1975  * the other end is getting larger packet sizes and thus we
1976  * improve the throughput of the system in all sorts of ways.
1977  *
1978  * However, we also try to ensure that we don't delay signals in
1979  * an idle system, where response time is more important than
1980  * throughput. This is achieved by the fact that we will
1981  * send after looping through the nodes ready to send to. In
1982  * an idle system this will be a quick operation. In a loaded
1983  * system, on the other hand, this delay can be fairly
1984  * substantial.
1985  *
1986  * Finally we attempt to limit the use of more than one send
1987  * thread to cases of very high load. So if there are only
1988  * delayed node sends remaining, we deduce that the
1989  * system is lightly loaded and we will go to sleep if there
1990  * are other send threads also awake.
1991  */
1992 void
1993 thr_send_threads::run_send_thread(Uint32 instance_no)
1994 {
1995   struct thr_send_thread_instance *this_send_thread =
1996     &m_send_threads[instance_no];
1997   const Uint32 thr_no = num_threads + instance_no;
1998 
1999   {
2000     /**
2001      * Wait for thread object to be visible
2002      */
2003     while(this_send_thread->m_thread == 0)
2004       NdbSleep_MilliSleep(30);
2005   }
2006 
2007   {
2008     /**
2009      * Print out information about starting thread
2010      *   (number, tid, name, the CPU it's locked into (if locked at all))
2011      * Also perform the locking to CPU.
2012      */
2013     BaseString tmp;
2014     THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
2015     tmp.appfmt("thr: %u ", thr_no);
2016     int tid = NdbThread_GetTid(this_send_thread->m_thread);
2017     if (tid != -1)
2018     {
2019       tmp.appfmt("tid: %u ", tid);
2020     }
2021     conf.appendInfoSendThread(tmp, instance_no);
2022     int res = conf.do_bind_send(this_send_thread->m_thread, instance_no);
2023     if (res < 0)
2024     {
2025       tmp.appfmt("err: %d ", -res);
2026     }
2027     else if (res > 0)
2028     {
2029       tmp.appfmt("OK ");
2030     }
2031     printf("%s\n", tmp.c_str());
2032     fflush(stdout);
2033   }
2034 
2035   /**
2036    * register watchdog
2037    */
2038   globalEmulatorData.theWatchDog->
2039     registerWatchedThread(&this_send_thread->m_watchdog_counter, thr_no);
2040 
2041   NdbMutex_Lock(send_thread_mutex);
2042   this_send_thread->m_awake = FALSE;
2043   NdbMutex_Unlock(send_thread_mutex);
2044 
2045   NDB_TICKS start_spin_ticks;
2046   NDB_TICKS yield_ticks;
2047   bool real_time = false;
2048   Uint64 min_spin_timer = 0;
2049 
2050   NdbTick_Invalidate(&start_spin_ticks);
2051   yield_ticks = NdbTick_getCurrentTicks();
2052   THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
2053   update_send_sched_config(conf, instance_no, real_time, min_spin_timer);
2054 
2055   NodeId node = 0;
2056   while (globalData.theRestartFlag != perform_stop)
2057   {
2058     this_send_thread->m_watchdog_counter = 1;
2059 
2060     NDB_TICKS now = NdbTick_getCurrentTicks();
2061     NdbMutex_Lock(send_thread_mutex);
2062     this_send_thread->m_awake = TRUE;
2063 
2064     /**
2065      * If we waited for a specific node, reinsert it such that
2066      * it can be re-evaluated for sending by get_node().
2067      */
2068     if (node != 0)
2069     {
2070       insert_node(node);
2071       node = 0;
2072     }
2073     while (globalData.theRestartFlag != perform_stop &&
2074            (node = get_node(instance_no, now)) != 0)   // PENDING -> ACTIVE
2075     {
2076       if (m_node_state[node].m_micros_delayed > 0)     // Node send is delayed
2077       {
2078         if (m_node_state[node].m_send_overload)        // Pause overloaded node
2079           break;
2080 
2081         /**
2082          * non-overload is a 'soft delay' which we might ignore depending on
2083          * current load. On a lightly loaded system we send immediately
2084          * to reduce latency. On a loaded system we increase throughput
2085          * by collecting into larger packets.
2086          * If multiple send threads are awake, excess threads are put to sleep.
2087          */
2088         if (count_awake_send_threads() == 1)           // Lightly loaded system:
2089           set_max_delay(node, now, 0);                 //   Send now to improve latency
2090         else if (mt_get_send_buffer_bytes(node) >= MAX_SEND_BUFFER_SIZE_TO_DELAY)
2091           set_max_delay(node, now, 0);                 // Large packet -> Send now
2092         else                                           // Sleep, let last awake send
2093         {
2094           insert_node(node);
2095           m_more_nodes = false;
2096           node = 0;
2097           break;
2098         }
2099       }
2100 
2101       /**
2102        * Multiple send threads cannot 'get' the same
2103        * node simultaneously. Thus, we do not need
2104        * to keep the global send thread mutex any longer.
2105        * This also avoids worker threads blocking on us in
2106        * ::alert_send_thread().
2107        */
2108       NdbMutex_Unlock(send_thread_mutex);
2109       this_send_thread->m_watchdog_counter = 6;
2110 
2111       /**
2112        * Need a lock on the send buffers to protect against
2113        * worker thread doing ::forceSend, possibly
2114        * reset_send_buffers() and/or lock_/unlock_transporter().
2115        * To avoid a livelock with ::forceSend() on an overloaded
2116        * system, we 'try-lock', and reinsert the node for
2117        * a later retry if the lock attempt fails.
2118        */
2119       bool more = true;
2120       Uint32 bytes_sent = 0;
2121       if (likely(trylock_send_node(node) == 0))
2122       {
2123         more = perform_send(node, instance_no, bytes_sent);
2124         /* We return with no locks or mutexes held */
2125 
2126         /* Release chunk-wise to decrease pressure on lock */
2127         this_send_thread->m_watchdog_counter = 3;
2128         this_send_thread->m_send_buffer_pool.
2129           release_chunk(g_thr_repository->m_mm, RG_TRANSPORTER_BUFFERS);
2130       }
2131 
2132       /**
2133        * Either our own perform_send() processing, or an external 'alert'
2134        * could have signaled that there are more sends pending.
2135        * If we made no progress in perform_send, we conclude that the
2136        * node is overloaded, and take a break from further send
2137        * attempts to that node. A failure of trylock_send_node
2138        * will also result in 'overload' being concluded.
2139        * (Quite reasonable, as the worker thread is likely forceSend'ing.)
2140        */
2141       now = NdbTick_getCurrentTicks();
2142       NdbMutex_Lock(send_thread_mutex);
2143 
2144       if (more ||                  // ACTIVE   -> PENDING
2145           !check_done_node(node))  // ACTIVE-P -> PENDING
2146       {
2147         insert_node(node);
2148 
2149         if (unlikely(more && bytes_sent == 0)) //Node is overloaded
2150         {
2151           set_overload_delay(node, now, 1000); //Delay send-retry by 1000us
2152         }
2153       } // else:                   // ACTIVE   -> IDLE
2154     } // while (get_node()...)
2155 
2156     /* No more nodes having data to send right now, prepare to sleep */
2157     this_send_thread->m_awake = FALSE;
2158     const Uint32 node_wait = (node != 0) ? m_node_state[node].m_micros_delayed : 0;
2159     NdbMutex_Unlock(send_thread_mutex);
2160 
2161     if (real_time)
2162     {
2163       check_real_time_break(now,
2164                             &yield_ticks,
2165                             this_send_thread->m_thread,
2166                             SendThread);
2167     }
2168 
2169 
2170     if (min_spin_timer == 0 ||
2171         check_yield(now,
2172                     &start_spin_ticks,
2173                     min_spin_timer))
2174     {
2175       Uint32 max_wait_usec;
2176       /**
2177        * We sleep a max time, possibly waiting for a specific node
2178        * with delayed send (overloaded, or waiting for more payload).
2179        * Send thread instance#0 should only be allowed to take short naps.
2180        * Other send threads may sleep longer if not needed right now.
2181        * (Will be alerted to start working when more send work arrives)
2182        */
2183       if (node_wait != 0)
2184         max_wait_usec = node_wait;
2185       else if (instance_no == 0)
2186         max_wait_usec = 10*1000;  //10ms, default sleep if not set by ::get_node()
2187       else
2188         max_wait_usec = 50*1000;  //50ms, has to wakeup before 100ms watchdog alert.
2189 
2190       yield(&this_send_thread->m_waiter_struct, max_wait_usec*1000,
2191             check_available_send_data, (struct thr_data*)NULL);
2192     }
2193   }
2194 
2195   globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
2196 }
2197 
2198 #if 0
2199 static
2200 Uint32
2201 fifo_used_pages(struct thr_data* selfptr)
2202 {
2203   return calc_fifo_used(selfptr->m_first_unused,
2204                         selfptr->m_first_free,
2205                         THR_FREE_BUF_MAX);
2206 }
2207 #endif
2208 
2209 ATTRIBUTE_NOINLINE
2210 static
2211 void
2212 job_buffer_full(struct thr_data* selfptr)
2213 {
2214   ndbout_c("job buffer full");
2215   dumpJobQueues();
2216   abort();
2217 }
2218 
2219 ATTRIBUTE_NOINLINE
2220 static
2221 void
2222 out_of_job_buffer(struct thr_data* selfptr)
2223 {
2224   ndbout_c("out of job buffer");
2225   dumpJobQueues();
2226   abort();
2227 }
2228 
2229 static
2230 thr_job_buffer*
2231 seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
2232 {
2233   thr_job_buffer* jb;
2234   struct thr_data* selfptr = &rep->m_thread[thr_no];
2235   Uint32 first_free = selfptr->m_first_free;
2236   Uint32 first_unused = selfptr->m_first_unused;
2237 
2238   /*
2239    * An empty FIFO is denoted by m_first_free == m_first_unused.
2240    * As a consequence we can never fill the FIFO array completely; at least
2241    * one entry will always be unused. In return, the code is simpler.
2242    */
2243 
2244   /*
2245    * We never allow the fifo to become completely empty, as we want to have
2246    * a good number of signals available for trace files in case of a forced
2247    * shutdown.
2248    */
2249   Uint32 buffers = (first_free > first_unused ?
2250                     first_unused + THR_FREE_BUF_MAX - first_free :
2251                     first_unused - first_free);
2252   if (unlikely(buffers <= THR_FREE_BUF_MIN))
2253   {
2254     /*
2255      * All used, allocate another batch from global pool.
2256      *
2257      * Put the new buffers at the head of the fifo, so as not to needlessly
2258      * push out any existing buffers from the fifo (that would lose useful
2259      * data for signal dumps in trace files).
2260      */
2261     Uint32 cnt = 0;
2262     Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
2263     assert(batch > 0);
2264     assert(batch + THR_FREE_BUF_MIN < THR_FREE_BUF_MAX);
2265     do {
2266       jb = rep->m_jb_pool.seize(rep->m_mm, RG_JOBBUFFER);
2267       if (unlikely(jb == 0))
2268       {
2269         if (unlikely(cnt == 0))
2270         {
2271           out_of_job_buffer(selfptr);
2272         }
2273         break;
2274       }
2275       jb->m_len = 0;
2276       jb->m_prioa = false;
2277       first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
2278       selfptr->m_free_fifo[first_free] = jb;
2279       batch--;
2280     } while (cnt < batch);
2281     selfptr->m_first_free = first_free;
2282   }
2283 
2284   jb= selfptr->m_free_fifo[first_free];
2285   selfptr->m_first_free = (first_free + 1) % THR_FREE_BUF_MAX;
2286   /* Init here rather than in release_buffer() so signal dump will work. */
2287   jb->m_len = 0;
2288   jb->m_prioa = prioa;
2289   return jb;
2290 }
2291 
2292 static
2293 void
2294 release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
2295 {
2296   struct thr_data* selfptr = &rep->m_thread[thr_no];
2297   Uint32 first_free = selfptr->m_first_free;
2298   Uint32 first_unused = selfptr->m_first_unused;
2299 
2300   /*
2301    * Pack near-empty signals, to get more info in the signal traces.
2302    *
2303    * This is not currently used, as we only release full job buffers, hence
2304    * the #if 0.
2305    */
2306 #if 0
2307   Uint32 last_free = (first_unused ? first_unused : THR_FREE_BUF_MAX) - 1;
2308   thr_job_buffer *last_jb = selfptr->m_free_fifo[last_free];
2309   Uint32 len1, len2;
2310 
2311   if (!jb->m_prioa &&
2312       first_free != first_unused &&
2313       !last_jb->m_prioa &&
2314       (len2 = jb->m_len) <= (thr_job_buffer::SIZE / 4) &&
2315       (len1 = last_jb->m_len) + len2 <= thr_job_buffer::SIZE)
2316   {
2317     /*
2318      * The buffer being released is fairly empty, and what data it contains fits
2319      * in the previously released buffer.
2320      *
2321      * We want to avoid too many almost-empty buffers in the free fifo, as that
2322      * makes signal traces less useful due to too little data available. So in
2323      * this case we move the data from the buffer to be released into the
2324      * previous buffer, and place the to-be-released buffer at the head of the
2325      * fifo (to be immediately reused).
2326      *
2327      * This is only done for prio B buffers, as we must not merge prio A and B
2328      * data (or dumps would be incorrect), and prio A buffers are in any case
2329      * full when released.
2330      */
2331     memcpy(last_jb->m_data + len1, jb->m_data, len2*sizeof(jb->m_data[0]));
2332     last_jb->m_len = len1 + len2;
2333     jb->m_len = 0;
2334     first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
2335     selfptr->m_free_fifo[first_free] = jb;
2336     selfptr->m_first_free = first_free;
2337   }
2338   else
2339 #endif
2340   {
2341     /* Just insert at the end of the fifo. */
2342     selfptr->m_free_fifo[first_unused] = jb;
2343     first_unused = (first_unused + 1) % THR_FREE_BUF_MAX;
2344     selfptr->m_first_unused = first_unused;
2345   }
2346 
2347   if (unlikely(first_unused == first_free))
2348   {
2349     /* FIFO full, need to release to global pool. */
2350     Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
2351     assert(batch > 0);
2352     assert(batch < THR_FREE_BUF_MAX);
2353     do {
2354       rep->m_jb_pool.release(rep->m_mm, RG_JOBBUFFER,
2355                              selfptr->m_free_fifo[first_free]);
2356       first_free = (first_free + 1) % THR_FREE_BUF_MAX;
2357       batch--;
2358     } while (batch > 0);
2359     selfptr->m_first_free = first_free;
2360   }
2361 }
2362 
2363 static
2364 inline
2365 Uint32
2366 scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
2367 {
2368   Uint32 thr_no = selfptr->m_thr_no;
2369   Uint32 **pages = selfptr->m_tq.m_delayed_signals;
2370   Uint32 free = selfptr->m_tq.m_next_free;
2371   Uint32* save = ptr;
2372   for (Uint32 i = 0; i < cnt; i++, ptr++)
2373   {
2374     Uint32 val = * ptr;
2375     if ((val & 0xFFFF) <= end)
2376     {
2377       Uint32 idx = val >> 16;
2378       Uint32 buf = idx >> 8;
2379       Uint32 pos = MAX_SIGNAL_SIZE * (idx & 0xFF);
2380 
2381       Uint32* page = * (pages + buf);
2382 
2383       const SignalHeader *s = reinterpret_cast<SignalHeader*>(page + pos);
2384       const Uint32 *data = page + pos + (sizeof(*s)>>2);
2385       if (0)
2386 	ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
2387       /*
2388        * ToDo: Do measurements of the frequency of these prio A timed signals.
2389        *
2390        * If they are frequent, we may want to optimize, as sending one prio A
2391        * signal is somewhat expensive compared to sending one prio B.
2392        */
2393       sendprioa(thr_no, s, data,
2394                 data + s->theLength);
2395       * (page + pos) = free;
2396       free = idx;
2397     }
2398     else if (i > 0)
2399     {
2400       selfptr->m_tq.m_next_free = free;
2401       memmove(save, ptr, 4 * (cnt - i));
2402       return i;
2403     }
2404     else
2405     {
2406       return 0;
2407     }
2408   }
2409   selfptr->m_tq.m_next_free = free;
2410   return cnt;
2411 }
2412 
2413 static
2414 void
2415 handle_time_wrap(struct thr_data* selfptr)
2416 {
2417   Uint32 i;
2418   struct thr_tq * tq = &selfptr->m_tq;
2419   Uint32 cnt0 = tq->m_cnt[0];
2420   Uint32 cnt1 = tq->m_cnt[1];
2421   Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
2422   Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
2423   cnt0 -= tmp0;
2424   cnt1 -= tmp1;
2425   tq->m_cnt[0] = cnt0;
2426   tq->m_cnt[1] = cnt1;
2427   for (i = 0; i<cnt0; i++)
2428   {
2429     assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
2430     tq->m_short_queue[i] -= 32767;
2431   }
2432   for (i = 0; i<cnt1; i++)
2433   {
2434     assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
2435     tq->m_long_queue[i] -= 32767;
2436   }
2437 }
2438 
2439 /**
2440  * FUNCTION: scan_time_queues(), scan_time_queues_impl(),
2441  *           scan_time_queues_backtick()
2442  *
2443  * scan_time_queues() implements the part we want to be inlined
2444  * into the scheduler loops, while *_impl() & *_backtick() are
2445  * the more unlikely parts we don't call unless the timer has
2446  * ticked backward, or forward more than 1ms, since the last scan_time_queues().
2447  *
2448  * Check if any delayed signals have expired and should be sent now.
2449  * The time_queues will be checked every time we detect a change
2450  * in current time of >= 1ms. If idle we will sleep for at most 10ms
2451  * before rechecking the time_queue.
2452  *
2453  * However, some situations need special attention:
2454  * - Even if we prefer monotonic timers, they are not available, or
2455  *   implemented in our abstraction layer, for all platforms.
2456  *   A non-monotonic timer may leap when adjusted by the user, both
2457  *   forward or backwards.
2458  * - Early implementations of monotonic timers had bugs where time
2459  *   could jump. Similar problems have been reported for several VMs.
2460  * - There might be CPU contention or system swapping where we might
2461  *   sleep for significantly longer than 10ms, causing long forward
2462  *   leaps in perceived time.
2463  *
2464  * In order to adapt to this non-perfect clock behaviour, the
2465  * scheduler has its own 'm_ticks' which is the current time
2466  * as perceived by the scheduler. On entering this function, 'now'
2467  * is the 'real' current time fetched from NdbTick_getCurrentTime().
2468  * 'selfptr->m_ticks' is the previous tick seen by the scheduler,
2469  * and as such is the timestamp which reflects the current time
2470  * as seen by the timer queues.
2471  *
2472  * Normally only a few milliseconds will elapse between ticks,
2473  * as seen by the diff between 'now' and 'selfptr->m_ticks'.
2474  * However, if there are larger leaps in the current time,
2475  * we break this up into several small (20ms) steps
2476  * by gradually increasing the scheduler's 'm_ticks' time. This ensures
2477  * that delayed signals will arrive in correct relative order,
2478  * and repeated signals (pace signals) are received with
2479  * the expected frequency. However, each individual signal may
2480  * be delayed or arrive too fast. Where exact timing is critical,
2481  * these signals should do their own time calculation by reading
2482  * the clock, instead of trusting that the signal is delivered as
2483  * specified by the 'delay' argument.
2484  *
2485  * If there are leaps larger than 1500ms, we try a hybrid
2486  * solution by moving the 'm_ticks' forward, close to the
2487  * actual current time, then continue as above from that
2488  * point in time. A 'time leap' warning will also be printed
2489  * in the logs.
2490  */
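
/**
 * Illustrative sketch (not compiled): how a large gap between the
 * scheduler's 'm_ticks' and the real time is consumed in small steps,
 * as described above. The 20ms step size and the 1500ms/1000ms thresholds
 * mirror scan_time_queues_impl() below; the loop structure here is a
 * simplification and not the scheduler's actual control flow.
 */
#if 0
static void
example_consume_time_leap(Uint32 diff /* ms behind real time */)
{
  if (diff > 1500)
  {
    /* Huge leap: jump forward, restart only 1000ms behind current time */
    diff = 1000;
  }
  while (diff > 0)
  {
    const Uint32 step = (diff > 20) ? 20 : diff;  // max 20ms handled per scan
    /* ...expire timers in (current_time, current_time + step]... */
    diff -= step;
  }
}
#endif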
2491 static
2492 Uint32
2493 scan_time_queues_impl(struct thr_data* selfptr, Uint32 diff)
2494 {
2495   NDB_TICKS last = selfptr->m_ticks;
2496   Uint32 step = diff;
2497 
2498   if (unlikely(diff > 20))     // Break up into max 20ms steps
2499   {
2500     if (unlikely(diff > 1500)) // Time leaped more than 1500ms
2501     {
2502       /**
2503        * There was a long leap in the time since last checking
2504        * of the time_queues. The clock could have been adjusted, or we
2505        * are CPU starved. Anyway, we can never make up for the lost
2506        * CPU cycles, so we forget about them and start fresh from
2507        * a point in time 1000ms behind our current time.
2508        */
2509       g_eventLogger->warning("thr: %u: Overslept %u ms, expected ~10ms",
2510                              selfptr->m_thr_no, diff);
2511 
2512       last = NdbTick_AddMilliseconds(last, diff-1000);
2513     }
2514     step = 20;  // Max expire interval handled is 20ms
2515   }
2516 
2517   struct thr_tq * tq = &selfptr->m_tq;
2518   Uint32 curr = tq->m_current_time;
2519   Uint32 cnt0 = tq->m_cnt[0];
2520   Uint32 cnt1 = tq->m_cnt[1];
2521   Uint32 end = (curr + step);
2522   if (end >= 32767)
2523   {
2524     handle_time_wrap(selfptr);
2525     cnt0 = tq->m_cnt[0];
2526     cnt1 = tq->m_cnt[1];
2527     end -= 32767;
2528   }
2529 
2530   Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
2531   Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
2532 
2533   tq->m_current_time = end;
2534   tq->m_cnt[0] = cnt0 - tmp0;
2535   tq->m_cnt[1] = cnt1 - tmp1;
2536   selfptr->m_ticks = NdbTick_AddMilliseconds(last, step);
2537   return (diff - step);
2538 }
2539 
2540 /**
2541  * Clock has ticked backwards. We try to handle this
2542  * as best we can.
2543  */
2544 static
2545 void
2546 scan_time_queues_backtick(struct thr_data* selfptr, NDB_TICKS now)
2547 {
2548   const NDB_TICKS last = selfptr->m_ticks;
2549   assert(NdbTick_Compare(now, last) < 0);
2550 
2551   const Uint64 backward = NdbTick_Elapsed(now, last).milliSec();
2552 
2553   /**
2554    * Silently ignore sub millisecond backticks.
2555    * Such 'noise' is unfortunately common, even for monotonic timers.
2556    */
2557   if (backward > 0)
2558   {
2559     g_eventLogger->warning("thr: %u Time ticked backwards %llu ms.",
2560 		           selfptr->m_thr_no, backward);
2561 
2562     /* Long backticks should never happen for monotonic timers */
2563     assert(backward < 100 || !NdbTick_IsMonotonic());
2564 
2565     /* Accept new time as current */
2566     selfptr->m_ticks = now;
2567   }
2568 }
2569 
2570 /**
2571  * If someone sends a signal with bounded delay, it means that the signal
2572  * should be executed as soon as we come to scan_time_queues,
2573  * independent of the amount of time passed since it was sent. We
2574  * use a special time queue for bounded delay signals to avoid having
2575  * to scan through all short time queue signals in every loop of
2576  * the run job buffers.
2577  */
2578 static inline
2579 void
2580 scan_zero_queue(struct thr_data* selfptr)
2581 {
2582   struct thr_tq * tq = &selfptr->m_tq;
2583   Uint32 cnt = tq->m_cnt[2];
2584   if (cnt)
2585   {
2586     Uint32 num_found = scan_queue(selfptr,
2587                                   cnt,
2588                                   tq->m_current_time,
2589                                   tq->m_zero_queue);
2590     require(num_found == cnt);
2591   }
2592   tq->m_cnt[2] = 0;
2593 }
2594 
2595 static inline
2596 Uint32
2597 scan_time_queues(struct thr_data* selfptr, NDB_TICKS now)
2598 {
2599   scan_zero_queue(selfptr);
2600   const NDB_TICKS last = selfptr->m_ticks;
2601   if (unlikely(NdbTick_Compare(now, last) < 0))
2602   {
2603     scan_time_queues_backtick(selfptr, now);
2604     return 0;
2605   }
2606 
2607   const Uint32 diff = (Uint32)NdbTick_Elapsed(last, now).milliSec();
2608   if (unlikely(diff > 0))
2609   {
2610     return scan_time_queues_impl(selfptr, diff);
2611   }
2612   return 0;
2613 }
2614 
2615 static
2616 inline
2617 Uint32*
2618 get_free_slot(struct thr_repository* rep,
2619 	      struct thr_data* selfptr,
2620 	      Uint32* idxptr)
2621 {
2622   struct thr_tq * tq = &selfptr->m_tq;
2623   Uint32 idx = tq->m_next_free;
2624 retry:
2625 
2626   if (idx != RNIL)
2627   {
2628     Uint32 buf = idx >> 8;
2629     Uint32 pos = idx & 0xFF;
2630     Uint32* page = * (tq->m_delayed_signals + buf);
2631     Uint32* ptr = page + (MAX_SIGNAL_SIZE * pos);
2632     tq->m_next_free = * ptr;
2633     * idxptr = idx;
2634     return ptr;
2635   }
2636 
2637   Uint32 thr_no = selfptr->m_thr_no;
2638   for (Uint32 i = 0; i<thr_tq::PAGES; i++)
2639   {
2640     if (tq->m_delayed_signals[i] == 0)
2641     {
2642       struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
2643       Uint32 * page = reinterpret_cast<Uint32*>(jb);
2644       tq->m_delayed_signals[i] = page;
2645       /**
2646        * Init page
2647        */
2648       for (Uint32 j = 0; j < MIN_SIGNALS_PER_PAGE; j ++)
2649       {
2650 	page[j * MAX_SIGNAL_SIZE] = (i << 8) + (j + 1);
2651       }
2652       page[MIN_SIGNALS_PER_PAGE*MAX_SIGNAL_SIZE] = RNIL;
2653       idx = (i << 8);
2654       goto retry;
2655     }
2656   }
2657   abort();
2658   return NULL;
2659 }
2660 
2661 void
2662 senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
2663 {
2664   struct thr_repository* rep = g_thr_repository;
2665   struct thr_data* selfptr = &rep->m_thread[thr_no];
2666   assert(my_thread_equal(selfptr->m_thr_id, my_thread_self()));
2667   unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
2668 
2669   Uint32 max;
2670   Uint32 * cntptr;
2671   Uint32 * queueptr;
2672 
2673   Uint32 alarm;
2674   Uint32 nexttimer = selfptr->m_tq.m_next_timer;
2675   if (delay == SimulatedBlock::BOUNDED_DELAY)
2676   {
2677     alarm = selfptr->m_tq.m_current_time;
2678     cntptr = selfptr->m_tq.m_cnt + 2;
2679     queueptr = selfptr->m_tq.m_zero_queue;
2680     max = thr_tq::ZQ_SIZE;
2681   }
2682   else
2683   {
2684     alarm = selfptr->m_tq.m_current_time + delay;
2685     if (delay < 100)
2686     {
2687       cntptr = selfptr->m_tq.m_cnt + 0;
2688       queueptr = selfptr->m_tq.m_short_queue;
2689       max = thr_tq::SQ_SIZE;
2690     }
2691     else
2692     {
2693       cntptr = selfptr->m_tq.m_cnt + 1;
2694       queueptr = selfptr->m_tq.m_long_queue;
2695       max = thr_tq::LQ_SIZE;
2696     }
2697   }
2698 
2699   Uint32 idx;
2700   Uint32* ptr = get_free_slot(rep, selfptr, &idx);
2701   memcpy(ptr, s, 4*siglen);
2702 
2703   if (0)
2704     ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
2705 	     selfptr->m_tq.m_current_time,
2706 	     alarm,
2707 	     getSignalName(s->theVerId_signalNumber),
2708 	     getBlockName(refToBlock(s->theSendersBlockRef)),
2709 	     getBlockName(s->theReceiversBlockNumber),
2710 	     delay,
2711 	     idx, ptr);
2712 
2713   Uint32 i;
2714   Uint32 cnt = *cntptr;
2715   Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
2716 
2717   * cntptr = cnt + 1;
2718   selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
2719 
2720   if (cnt == 0 || delay == SimulatedBlock::BOUNDED_DELAY)
2721   {
2722     /* First delayed signal needs no order and bounded delay is FIFO */
2723     queueptr[cnt] = newentry;
2724     return;
2725   }
2726   else if (cnt < max)
2727   {
2728     for (i = 0; i<cnt; i++)
2729     {
2730       Uint32 save = queueptr[i];
2731       if ((save & 0xFFFF) > alarm)
2732       {
2733 	memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
2734 	queueptr[i] = newentry;
2735 	return;
2736       }
2737     }
2738     assert(i == cnt);
2739     queueptr[i] = newentry;
2740     return;
2741   }
2742   else
2743   {
2744     /* Out of entries in time queue, issue proper error */
2745     if (cntptr == (selfptr->m_tq.m_cnt + 0))
2746     {
2747       /* Error in short time queue */
2748       ERROR_SET(ecError, NDBD_EXIT_TIME_QUEUE_SHORT,
2749                 "Too many in Short Time Queue", "mt.cpp" );
2750     }
2751     else if (cntptr == (selfptr->m_tq.m_cnt + 1))
2752     {
2753       /* Error in long time queue */
2754       ERROR_SET(ecError, NDBD_EXIT_TIME_QUEUE_LONG,
2755                 "Too many in Long Time Queue", "mt.cpp" );
2756     }
2757     else
2758     {
2759       /* Error in zero time queue */
2760       ERROR_SET(ecError, NDBD_EXIT_TIME_QUEUE_ZERO,
2761                 "Too many in Zero Time Queue", "mt.cpp" );
2762     }
2763   }
2764 }
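
/**
 * Illustrative sketch (not compiled): the bit layout of a time queue
 * entry as used by senddelay() and scan_queue() above. A queue entry
 * packs the free-list index of the stored signal (page and slot, as set
 * up by get_free_slot()) together with its alarm time into a single
 * 32-bit word. The helpers below only demonstrate the encoding and
 * decoding; they are not part of the implementation.
 */
#if 0
static Uint32
example_make_tq_entry(Uint32 page, Uint32 slot, Uint32 alarm)
{
  const Uint32 idx = (page << 8) | (slot & 0xFF); // free-list index layout
  return (idx << 16) | (alarm & 0xFFFF);          // entry as stored by senddelay()
}

static void
example_decode_tq_entry(Uint32 entry,
                        Uint32 *page, Uint32 *slot, Uint32 *alarm)
{
  *alarm = entry & 0xFFFF;        // expiry time, compared against 'end' in scan_queue()
  const Uint32 idx = entry >> 16;
  *page = idx >> 8;               // index into m_delayed_signals[]
  *slot = idx & 0xFF;             // signal slot; page offset = slot * MAX_SIGNAL_SIZE
}
#endif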
2765 
2766 /*
2767  * Flush the write state to the job queue, making any new signals available to
2768  * receiving threads.
2769  *
2770  * Two versions:
2771  *    - The general version flush_write_state_other() which may flush to
2772  *      any thread, and possibly signal any waiters.
2773  *    - The special version flush_write_state_self() which should only be used
2774  *      to flush messages to itself.
2775  *
2776  * Calls to these functions are encapsulated through flush_write_state,
2777  * which decides which of these functions to call.
2778  */
2779 static inline
2780 void
2781 flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
2782 {
2783   /*
2784    * Can simplify the flush_write_state when writing to myself:
2785    * Simply update write references wo/ mutex, memory barrier and signaling
2786    */
2787   w->m_write_buffer->m_len = w->m_write_pos;
2788   q_head->m_write_index = w->m_write_index;
2789   w->init_pending_signals();
2790 }
2791 
2792 static inline
2793 void
2794 flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head,
2795                         thr_jb_write_state *w)
2796 {
2797   Uint32 pending_signals_saved;
2798   /*
2799    * Two write memory barriers here, as assigning m_len may make signal data
2800    * available to other threads, and assigning m_write_index may make new
2801    * buffers available.
2802    *
2803    * We could optimize this by only doing it as needed, and only doing it
2804    * once before setting all m_len, and once before setting all m_write_index.
2805    *
2806    * But wmb() is a no-op anyway in x86 ...
2807    */
2808   wmb();
2809   w->m_write_buffer->m_len = w->m_write_pos;
2810   wmb();
2811   q_head->m_write_index = w->m_write_index;
2812 
2813   pending_signals_saved = w->get_pending_signals_wakeup();
2814   pending_signals_saved += w->get_pending_signals();
2815 
2816   if (pending_signals_saved >= MAX_SIGNALS_BEFORE_WAKEUP)
2817   {
2818     w->init_pending_signals();
2819     wakeup(&(dstptr->m_waiter));
2820   }
2821   else
2822   {
2823     w->clear_pending_signals_and_set_wakeup(pending_signals_saved);
2824   }
2825 }
2826 
2827 /**
2828   This function is used when we need to send a signal immediately
2829   due to the flush limit being reached. We don't know whether the
2830   signal is to ourselves in this case, and we act depending on who
2831   the receiver of the signal is.
2832 */
2833 static inline
2834 void
2835 flush_write_state(const thr_data *selfptr, thr_data *dstptr,
2836                   thr_job_queue_head *q_head, thr_jb_write_state *w)
2837 {
2838   if (dstptr == selfptr)
2839   {
2840     flush_write_state_self(q_head, w);
2841   }
2842   else
2843   {
2844     flush_write_state_other(dstptr, q_head, w);
2845   }
2846 }
2847 
2848 /**
2849   This function is used when we are called from flush_jbb_write_state
2850   where we know that the receiver should wake up to receive the signals
2851   we're sending.
2852 */
2853 static inline
2854 void
2855 flush_write_state_other_wakeup(thr_data *dstptr,
2856                                thr_job_queue_head *q_head,
2857                                thr_jb_write_state *w)
2858 {
2859   /*
2860    * We already did a memory barrier before the loop calling this
2861    * function to ensure the buffer is properly seen by receiving
2862    * thread.
2863    */
2864   w->m_write_buffer->m_len = w->m_write_pos;
2865   wmb();
2866   q_head->m_write_index = w->m_write_index;
2867 
2868   w->init_pending_signals();
2869   wakeup(&(dstptr->m_waiter));
2870 }
2871 
2872 static
2873 void
2874 flush_jbb_write_state(thr_data *selfptr)
2875 {
2876   Uint32 thr_count = g_thr_repository->m_thread_count;
2877   Uint32 self = selfptr->m_thr_no;
2878 
2879   thr_jb_write_state *w = selfptr->m_write_states + self;
2880   thr_data *thrptr = g_thr_repository->m_thread;
2881 
2882   /**
2883     We start by flushing to ourselves; this requires no extra memory
2884     barriers and ensures that we can proceed in the loop knowing that
2885     we will only send to remote threads.
2886 
2887     After this we will insert a memory barrier before we start updating
2888     the m_len variable that makes other threads see our signals that
2889     we're sending to them. We need the memory barrier to ensure that the
2890     buffers are seen properly updated by the remote thread when they see
2891     the pointer to them.
2892   */
2893   if (w->has_any_pending_signals())
2894   {
2895     flush_write_state_self(selfptr->m_in_queue_head + self, w);
2896   }
2897   wmb();
2898   w = selfptr->m_write_states;
2899   thr_jb_write_state *w_end = selfptr->m_write_states + thr_count;
2900   for (; w < w_end; thrptr++, w++)
2901   {
2902     if (w->has_any_pending_signals())
2903     {
2904       thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
2905       flush_write_state_other_wakeup(thrptr, q_head, w);
2906     }
2907   }
2908 }
2909 
2910 /**
2911  * Receive thread will unpack 1024 signals (MAX_RECEIVED_SIGNALS)
2912  * from Transporters before running another check_recv_queue
2913  *
2914  * This function returns true if there is not enough space to unpack
2915  * this number of signals, else false.
2916  *
2917  * Also used as callback function from yield() to recheck
2918  * 'full' condition before going to sleep.
2919  */
2920 static bool
2921 check_recv_queue(thr_job_queue_head *q_head)
2922 {
2923   const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
2924   /**
2925    * NOTE: m_read_index is read wo/ lock (and updated by a different thread),
2926    *       but since the other thread can only consume
2927    *       signals, the value returned from this
2928    *       function is always conservative (i.e. the real state can be better
2929    *       than the returned value if the read index has moved but we didn't see it).
2930    */
2931   const unsigned ri = q_head->m_read_index;
2932   const unsigned wi = q_head->m_write_index;
2933   const unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
2934   return (1 + minfree + busy >= thr_job_queue::SIZE);
2935 }
2936 
2937 /**
2938  * Check if any of the receive queues for the threads being served
2939  * by this receive thread, are full.
2940  * If full: Return 'Thr_data*' for (one of) the thread(s)
2941  *          which we have to wait for. (to consume from queue)
2942  */
2943 static struct thr_data*
2944 get_congested_recv_queue(struct thr_repository* rep, Uint32 recv_thread_id)
2945 {
2946   const unsigned thr_no = first_receiver_thread_no + recv_thread_id;
2947   thr_data *thrptr = rep->m_thread;
2948 
2949   for (unsigned i = 0; i<num_threads; i++, thrptr++)
2950   {
2951     thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
2952     if (check_recv_queue(q_head))
2953     {
2954       return thrptr;
2955     }
2956   }
2957   return NULL;
2958 }
2959 
2960 /**
2961  * Compute the number of free buffers in the specified queue.
2962  * The SAFETY margin is subtracted from the available
2963  * 'free', which is then returned. (An illustrative sketch follows the function below.)
2964  */
2965 static
2966 Uint32
2967 compute_free_buffers_in_queue(const thr_job_queue_head *q_head)
2968 {
2969   /**
2970    * NOTE: m_read_index is read wo/ lock (and updated by a different thread),
2971    *       but since the other thread can only consume
2972    *       signals, the value returned from this
2973    *       function is always conservative (i.e. the real state can be better
2974    *       than the returned value if the read index has moved but we didn't see it).
2975    */
2976   unsigned ri = q_head->m_read_index;
2977   unsigned wi = q_head->m_write_index;
2978   unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
2979 
2980   assert(free <= thr_job_queue::SIZE);
2981 
2982   if (free <= (1 + thr_job_queue::SAFETY))
2983     return 0;
2984   else
2985     return free - (1 + thr_job_queue::SAFETY);
2986 }
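
/**
 * Illustrative sketch (not compiled): the circular buffer arithmetic
 * used above, shown with a hypothetical queue size of 8 and a hypothetical
 * safety margin. With read_index == 6 and write_index == 2 the writer has
 * wrapped around, so free == ri - wi == 4; with read_index == 2 and
 * write_index == 6, free == (SIZE + ri) - wi == 4 as well. The usable
 * amount is then reduced by the one always-unused slot plus the SAFETY
 * margin, exactly as in compute_free_buffers_in_queue().
 */
#if 0
static unsigned
example_free_slots(unsigned ri, unsigned wi, unsigned size, unsigned safety)
{
  const unsigned free = (wi < ri) ? ri - wi : (size + ri) - wi;
  return (free <= 1 + safety) ? 0 : free - (1 + safety);
}
#endif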
2987 
2988 static
2989 Uint32
2990 compute_min_free_out_buffers(Uint32 thr_no)
2991 {
2992   Uint32 minfree = thr_job_queue::SIZE;
2993   const struct thr_repository* rep = g_thr_repository;
2994   const struct thr_data *thrptr = rep->m_thread;
2995 
2996   for (unsigned i = 0; i<num_threads; i++, thrptr++)
2997   {
2998     const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
2999     unsigned free = compute_free_buffers_in_queue(q_head);
3000 
3001     if (free < minfree)
3002       minfree = free;
3003   }
3004   return minfree;
3005 }
3006 
3007 /**
3008  * Compute max signals that thr_no can execute wo/ risking
3009  *   job-buffer-full
3010  *
3011  *  see-also update_sched_config
3012  *
3013  *
3014  * 1) compute free-slots in ring-buffer from self to each thread in system
3015  * 2) pick smallest value
3016  * 3) compute how many signals this corresponds to
3017  * 4) compute how many signals self can execute if all were to be to
3018  *    the thread with the fullest ring-buffer (i.e the worst case)
3019  *
3020  *   Assumption: each signal may send *at most* 4 signals
3021  *     - this assumption is the same in ndbd and ndbmtd and is
3022  *       mostly followed by block-code, although not in all places :-(
3023  */
3024 static
3025 Uint32
3026 compute_max_signals_to_execute(Uint32 min_free_buffers)
3027 {
3028   return ((min_free_buffers * MIN_SIGNALS_PER_PAGE) + 3) / 4;
3029 }
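
/**
 * Illustrative arithmetic (an assumption-labelled example, not from the
 * source): with the worst-case fan-out of 4 signals per executed signal
 * assumed above, 'min_free_buffers' free job buffer pages of
 * MIN_SIGNALS_PER_PAGE slots each can absorb
 * min_free_buffers * MIN_SIGNALS_PER_PAGE produced signals. Using a purely
 * hypothetical MIN_SIGNALS_PER_PAGE of 100, 3 free pages would allow
 * (3 * 100 + 3) / 4 = 75 signals to be executed.
 */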
3030 
3031 static
3032 void
3033 dumpJobQueues(void)
3034 {
3035   BaseString tmp;
3036   const struct thr_repository* rep = g_thr_repository;
3037   for (unsigned from = 0; from<num_threads; from++)
3038   {
3039     for (unsigned to = 0; to<num_threads; to++)
3040     {
3041       const thr_data *thrptr = rep->m_thread + to;
3042       const thr_job_queue_head *q_head = thrptr->m_in_queue_head + from;
3043 
3044       const unsigned used = q_head->used();
3045       if (used > 0)
3046       {
3047         tmp.appfmt(" job buffer %d --> %d, used %d",
3048                    from, to, used);
3049         unsigned free = compute_free_buffers_in_queue(q_head);
3050         if (free == 0)
3051         {
3052           tmp.appfmt(" FULL!");
3053         }
3054         else if (free <= thr_job_queue::RESERVED)
3055         {
3056           tmp.appfmt(" HIGH LOAD (free:%d)", free);
3057         }
3058         tmp.appfmt("\n");
3059       }
3060     }
3061   }
3062   if (!tmp.empty())
3063   {
3064     ndbout_c("Dumping non-empty job queues:\n%s", tmp.c_str());
3065   }
3066 }
3067 
3068 void
3069 trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
3070 {
3071   SignalT<3> signalT;
3072   Signal &signal = * new (&signalT) Signal(0);
3073   memset(&signal.header, 0, sizeof(signal.header));
3074 
3075   if (g_send_threads)
3076   {
3077     /**
3078      * TODO: Implement this also when using send threads!!
3079      */
3080     return;
3081   }
3082 
3083   signal.header.theLength = 3;
3084   signal.header.theSendersSignalId = 0;
3085   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
3086   signal.theData[0] = NDB_LE_SendBytesStatistic;
3087   signal.theData[1] = nodeId;
3088   signal.theData[2] = (Uint32)(bytes/count);
3089   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
3090   signal.header.theReceiversBlockNumber = CMVMI;
3091   sendlocal(g_thr_repository->m_send_buffers[nodeId].m_send_thread,
3092             &signalT.header, signalT.theData, NULL);
3093 }
3094 
3095 /**
3096  * To lock during connect/disconnect, we take both the send lock for the node
3097  * (to protect performSend()), and the global receive lock (to protect
3098  * performReceive()). By having two locks, we avoid contention between the
3099  * common send and receive operations.
3100  *
3101  * We can have contention between connect/disconnect of one transporter and
3102  * receive for the others. But the transporter code should try to keep this
3103  * lock only briefly, i.e. only to set state to DISCONNECTING / socket fd to
3104  * NDB_INVALID_SOCKET, not for the actual close() syscall.
3105  */
3106 void
3107 trp_callback::lock_transporter(NodeId node)
3108 {
3109   Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
3110   struct thr_repository* rep = g_thr_repository;
3111   /**
3112    * Note: take the send lock _first_, so that we will not hold the receive
3113    * lock while blocking on the send lock.
3114    *
3115    * The reverse case, blocking send lock for one transporter while waiting
3116    * for receive lock, is not a problem, as the transporter being blocked is
3117    * in any case disconnecting/connecting at this point in time, and sends are
3118    * non-waiting (so we will not block sending on other transporters).
3119    */
3120   lock(&rep->m_send_buffers[node].m_send_lock);
3121   lock(&rep->m_receive_lock[recv_thread_idx]);
3122 }
3123 
3124 void
3125 trp_callback::unlock_transporter(NodeId node)
3126 {
3127   Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
3128   struct thr_repository* rep = g_thr_repository;
3129   unlock(&rep->m_receive_lock[recv_thread_idx]);
3130   unlock(&rep->m_send_buffers[node].m_send_lock);
3131 }
3132 
3133 int
3134 mt_checkDoJob(Uint32 recv_thread_idx)
3135 {
3136   struct thr_repository* rep = g_thr_repository;
3137 
3138   /**
3139    * Return '1' if we are not allowed to receive more signals
3140    * into the job buffers from this 'recv_thread_idx'.
3141    *
3142    * NOTE:
3143    *   We should not loop-wait for buffers to become available
3144    *   here as we currently hold the receiver-lock. Furthermore
3145    *   waiting too long here could cause the receiver thread to be
3146    *   less responsive wrt. moving incoming (TCP) data from the
3147    *   TCPTransporters into the (local) receiveBuffers.
3148    *   The thread could also oversleep on its other tasks, such as
3149    *   handling open/close of connections, and catching
3150    *   its own shutdown events.
3151    */
3152   return (get_congested_recv_queue(rep, recv_thread_idx) != NULL);
3153 }
3154 
3155 /**
3156  * Collect all send-buffer-pages to be delivered to 'node'
3157  * from each thread. Link them together and append them to
3158  * the single send_buffer list 'sb->m_buffer'.
3159  *
3160  * The 'sb->m_buffer_lock' has to be held prior to calling
3161  * this function.
3162  *
3163  * Return: Number of bytes in the collected send-buffers.
3164  *
3165  * TODO: This is not completely fair,
3166  *       it would be better to get one entry from each thr_send_queue
3167  *       per thread instead (until empty)
3168  */
3169 static
3170 Uint32
3171 link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
3172 {
3173   Uint32 ri[MAX_BLOCK_THREADS];
3174   Uint32 wi[MAX_BLOCK_THREADS];
3175   thr_send_queue *src = g_thr_repository->m_thread_send_buffers[node];
3176   for (unsigned thr = 0; thr < num_threads; thr++)
3177   {
3178     ri[thr] = sb->m_read_index[thr];
3179     wi[thr] = src[thr].m_write_index;
3180   }
3181 
3182   Uint64 sentinel[thr_send_page::HEADER_SIZE >> 1];
3183   thr_send_page* sentinel_page = new (&sentinel[0]) thr_send_page;
3184   sentinel_page->m_next = 0;
3185 
3186   struct thr_send_buffer tmp;
3187   tmp.m_first_page = sentinel_page;
3188   tmp.m_last_page = sentinel_page;
3189 
3190   Uint32 bytes = 0;
3191 
3192 #ifdef ERROR_INSERT
3193 
3194 #define MIXOLOGY_MIX_MT_SEND 2
3195 
3196   if (unlikely(globalEmulatorData.theConfiguration->getMixologyLevel() &
3197                MIXOLOGY_MIX_MT_SEND))
3198   {
3199     /**
3200      * DEBUGGING only
3201      * Interleave at the page level from all threads with
3202      * pages to send - intended to help expose signal
3203      * order dependency bugs
3204      * TODO : Avoid having a whole separate implementation
3205      * like this.
3206      */
3207     bool more_pages;
3208 
3209     do
3210     {
3211       src = g_thr_repository->m_thread_send_buffers[node];
3212       more_pages = false;
3213       for (unsigned thr = 0; thr < num_threads; thr++, src++)
3214       {
3215         Uint32 r = ri[thr];
3216         Uint32 w = wi[thr];
3217         if (r != w)
3218         {
3219           rmb();
3220           /* Take one page from this thread's send buffer for this node */
3221           thr_send_page * p = src->m_buffers[r];
3222           assert(p->m_start == 0);
3223           bytes += p->m_bytes;
3224           tmp.m_last_page->m_next = p;
3225           tmp.m_last_page = p;
3226 
3227           /* Take page out of read_index slot list */
3228           thr_send_page * next = p->m_next;
3229           p->m_next = NULL;
3230           src->m_buffers[r] = next;
3231 
3232           if (next == NULL)
3233           {
3234             /**
3235              * Used up read slot, any more slots available to read
3236              * from this thread?
3237              */
3238             r = (r+1) % thr_send_queue::SIZE;
3239             more_pages |= (r != w);
3240 
3241             /* Update global and local per thread read indices */
3242             sb->m_read_index[thr] = r;
3243             ri[thr] = r;
3244           }
3245           else
3246           {
3247             more_pages |= true;
3248           }
3249         }
3250       }
3251     } while (more_pages);
3252   }
3253   else
3254 
3255 #endif
3256 
3257   {
3258     for (unsigned thr = 0; thr < num_threads; thr++, src++)
3259     {
3260       Uint32 r = ri[thr];
3261       Uint32 w = wi[thr];
3262       if (r != w)
3263       {
3264         rmb();
3265         while (r != w)
3266         {
3267           thr_send_page * p = src->m_buffers[r];
3268           assert(p->m_start == 0);
3269           bytes += p->m_bytes;
3270           tmp.m_last_page->m_next = p;
3271           while (p->m_next != 0)
3272           {
3273             p = p->m_next;
3274             assert(p->m_start == 0);
3275             bytes += p->m_bytes;
3276           }
3277           tmp.m_last_page = p;
3278           assert(tmp.m_last_page != 0); /* Impossible */
3279           r = (r + 1) % thr_send_queue::SIZE;
3280         }
3281         sb->m_read_index[thr] = r;
3282       }
3283     }
3284   }
3285   Uint64 node_total_send_buffer_size = sb->m_node_total_send_buffer_size;
3286   if (bytes)
3287   {
3288     /**
3289      * Append send buffers collected from threads
3290      * to end of existing m_buffers.
3291      */
3292     if (sb->m_buffer.m_first_page)
3293     {
3294       assert(sb->m_buffer.m_first_page != NULL);
3295       assert(sb->m_buffer.m_last_page != NULL);
3296       sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
3297       sb->m_buffer.m_last_page = tmp.m_last_page;
3298     }
3299     else
3300     {
3301       assert(sb->m_buffer.m_first_page == NULL);
3302       assert(sb->m_buffer.m_last_page == NULL);
3303       sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
3304       sb->m_buffer.m_last_page = tmp.m_last_page;
3305     }
3306   }
3307   sb->m_node_total_send_buffer_size =
3308     node_total_send_buffer_size + bytes;
3309   return bytes;
3310 }
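
/**
 * Worked example (illustrative, assuming thr_send_queue::SIZE == 8 for the
 * sake of the numbers): if a producing thread has published pages in slots
 * 6 and 7, so ri[thr] == 6 and wi[thr] == 0, the loop above consumes slot 6,
 * then slot 7, wrapping as (7 + 1) % 8 == 0, and stops once the read index
 * catches up with the write index. Only the producer advances m_write_index
 * and only this consumer advances m_read_index, so the ring itself needs no
 * lock, just the wmb()/rmb() pairing with the producing thread.
 */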
3311 
3312 /**
3313  * Pack thr_send_pages for a particular send 'buffer' and
3314  * release emptied pages (locally) to 'pool'.
3315  *
3316  * We use a very simple algorithm that packs two neighbouring
3317  * pages into one page if possible; if not possible we simply
3318  * move on. This guarantees that pages will be at least 50%
3319  * full, which should be sufficient for our needs here.
3320  *
3321  * We call pack_sb_pages() when we fail to send all data to one
3322  * specific node immediately. This ensures that we won't keep
3323  * pages allocated with lots of free space.
3324  *
3325  * We may also call pack_sb_pages() from get_bytes_to_send_iovec()
3326  * if not all send buffers fit into the iovec[], thus possibly
3327  * saving extra send roundtrips.
3328  *
3329  * The send threads will use the pack_sb_pages()
3330  * from the bytes_sent function which is a callback from
3331  * the transporter.
3332  *
3333  * Can only be called with relevant lock held on 'buffer'.
3334  * Return remaining unsent bytes in 'buffer'.
3335  */
3336 static
3337 Uint32
3338 pack_sb_pages(thread_local_pool<thr_send_page>* pool,
3339               struct thr_send_buffer* buffer)
3340 {
3341   assert(buffer->m_first_page != NULL);
3342   assert(buffer->m_last_page != NULL);
3343   assert(buffer->m_last_page->m_next == NULL);
3344 
3345   thr_send_page* curr = buffer->m_first_page;
3346   Uint32 curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
3347   Uint32 bytes = curr->m_bytes;
3348   while (curr->m_next != 0)
3349   {
3350     thr_send_page* next = curr->m_next;
3351     bytes += next->m_bytes;
3352     assert(next->m_start == 0); // only first page should have half sent bytes
3353     if (next->m_bytes <= curr_free)
3354     {
3355       /**
3356        * There is free space in the current page and it is sufficient to
3357        * store the entire next-page. Copy from next page to current page
3358        * and update current page and release next page to local pool.
3359        */
3360       thr_send_page * save = next;
3361       memcpy(curr->m_data + (curr->m_bytes + curr->m_start),
3362              next->m_data,
3363              next->m_bytes);
3364 
3365       curr_free -= next->m_bytes;
3366 
3367       curr->m_bytes += next->m_bytes;
3368       curr->m_next = next->m_next;
3369 
3370       pool->release_local(save);
3371     }
3372     else
3373     {
3374       /* Not enough free space in current, move to next page */
3375       curr = next;
3376       curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
3377     }
3378   }
3379 
3380   buffer->m_last_page = curr;
3381   assert(bytes > 0);
3382   return bytes;
3383 }
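
/**
 * Worked example (illustrative, assuming max_bytes() == 32768): given pages
 * holding 20000, 10000, 2000 and 1000 bytes, the loop merges 10000 and then
 * 2000 into the first page (32000 bytes used), fails to merge the final
 * 1000-byte page into the remaining 768 free bytes, and ends up with two
 * pages: one almost full plus a small tail page, releasing the two merged
 * pages to the local pool.
 */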
3384 
3385 /**
3386  * Get buffered pages ready to be sent by the transporter.
3387  * All pages returned from this function will refer to
3388  * pages in the m_sending buffers
3389  *
3390  * The 'sb->m_send_lock' has to be held prior to calling
3391  * this function.
3392  *
3393  * If more send_buffer pages are required from the
3394  * 'm_buffer', we will also grab the m_buffer_lock as
3395  * required. Any grabbed m_buffer pages will be moved to
3396  * the m_sending buffers.
3397  */
3398 Uint32
3399 trp_callback::get_bytes_to_send_iovec(NodeId node,
3400                                       struct iovec *dst,
3401                                       Uint32 max)
3402 {
3403   thr_repository::send_buffer *sb = g_thr_repository->m_send_buffers + node;
3404 
3405   if (max == 0)
3406     return 0;
3407 
3408   /**
3409    * Collect any available send pages from the thread queues
3410    * and 'm_buffers'. Append them to the end of m_sending buffers
3411    */
3412   {
3413     lock(&sb->m_buffer_lock);
3414     link_thread_send_buffers(sb, node);
3415 
3416     if (sb->m_buffer.m_first_page != NULL)
3417     {
3418       if (sb->m_sending.m_first_page == NULL)
3419       {
3420         sb->m_sending = sb->m_buffer;
3421       }
3422       else
3423       {
3424         assert(sb->m_sending.m_last_page != NULL);
3425         sb->m_sending.m_last_page->m_next = sb->m_buffer.m_first_page;
3426         sb->m_sending.m_last_page = sb->m_buffer.m_last_page;
3427       }
3428       sb->m_buffer.m_first_page = NULL;
3429       sb->m_buffer.m_last_page  = NULL;
3430     }
3431     unlock(&sb->m_buffer_lock);
3432 
3433     if (sb->m_sending.m_first_page == NULL)
3434       return 0;
3435   }
3436 
3437   /**
3438    * Process linked-list and put into iovecs
3439    */
3440 fill_iovec:
3441   Uint32 tot = 0;
3442   Uint32 pos = 0;
3443   thr_send_page * p = sb->m_sending.m_first_page;
3444   sb->m_bytes_sent = 0;
3445 
3446   do {
3447     dst[pos].iov_len = p->m_bytes;
3448     dst[pos].iov_base = p->m_data + p->m_start;
3449     assert(p->m_start + p->m_bytes <= p->max_bytes());
3450     tot += p->m_bytes;
3451     pos++;
3452     p = p->m_next;
3453     if (p == NULL)
3454       return pos;
3455   } while (pos < max);
3456 
3457   /**
3458    * Possibly pack send-buffers to get better utilization:
3459    * If we were unable to fit all send buffers into the iovec[],
3460    * we pack the send buffers now if they have a low fill degree.
3461    * This could save us another OS send for the remaining data.
3462    */
3463   if (pos == max && max > 1 &&                    // Exhausted iovec[]
3464       tot < (pos * thr_send_page::max_bytes())/4) // < 25% filled
3465   {
3466     const Uint32 thr_no = sb->m_send_thread;
3467     assert(thr_no != NO_SEND_THREAD);
3468 
3469     if (!is_send_thread(thr_no))
3470     {
3471       thr_data * thrptr = &g_thr_repository->m_thread[thr_no];
3472       pack_sb_pages(&thrptr->m_send_buffer_pool, &sb->m_sending);
3473     }
3474     else
3475     {
3476       pack_sb_pages(g_send_threads->get_send_buffer_pool(thr_no), &sb->m_sending);
3477     }
3478 
3479     /**
3480      * Retry filling iovec[]. As 'pack' will ensure at least 50% fill degree,
3481      * we will not do another 'pack' after the retry.
3482      */
3483     goto fill_iovec;
3484   }
3485   return pos;
3486 }
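
/**
 * Sketch of the expected calling protocol (for orientation only; the real
 * caller is the transporter send path, and its details may differ):
 *
 *   lock(&sb->m_send_lock);
 *   struct iovec iov[8];                          // size chosen by the caller
 *   Uint32 cnt = get_bytes_to_send_iovec(node, iov, 8);
 *   // ... hand iov[0..cnt-1] to the OS (writev()/send()) ...
 *   bytes_sent(node, bytes_accepted_by_the_os);   // trims m_sending
 *   unlock(&sb->m_send_lock);
 */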
3487 
3488 static
3489 void
3490 release_list(thread_local_pool<thr_send_page>* pool,
3491              thr_send_page* head, thr_send_page * tail)
3492 {
3493   while (head != tail)
3494   {
3495     thr_send_page * tmp = head;
3496     head = head->m_next;
3497     pool->release_local(tmp);
3498   }
3499   pool->release_local(tail);
3500 }
3501 
3502 static
3503 Uint32
3504 bytes_sent(thread_local_pool<thr_send_page>* pool,
3505            thr_repository::send_buffer* sb, Uint32 bytes)
3506 {
3507   Uint64 node_total_send_buffer_size = sb->m_node_total_send_buffer_size;
3508   assert(bytes);
3509 
3510   sb->m_bytes_sent = bytes;
3511 
3512   Uint32 remain = bytes;
3513   thr_send_page * prev = NULL;
3514   thr_send_page * curr = sb->m_sending.m_first_page;
3515   sb->m_node_total_send_buffer_size = node_total_send_buffer_size - bytes;
3516 
3517   /* Some, or all, of 'm_sending' was sent; find the end point. */
3518   while (remain && remain >= curr->m_bytes)
3519   {
3520     /**
3521      * Calculate new current page such that we can release the
3522      * pages that have been completed and update the state of
3523      * the new current page
3524      */
3525     remain -= curr->m_bytes;
3526     prev = curr;
3527     curr = curr->m_next;
3528   }
3529 
3530   if (remain)
3531   {
3532     /**
3533      * Not all pages were fully sent and we stopped in the middle
3534      * of a page.
3535      *
3536      * Update state of new current page and release any pages
3537      * that have already been sent
3538      */
3539     curr->m_start += remain;
3540     assert(curr->m_bytes > remain);
3541     curr->m_bytes -= remain;
3542     if (prev)
3543     {
3544       release_list(pool, sb->m_sending.m_first_page, prev);
3545     }
3546   }
3547   else
3548   {
3549     /**
3550      * We sent a couple of full pages and the sending stopped at a
3551      * page boundary, so we only need to release the sent pages
3552      * and update the new current page.
3553      */
3554     if (prev)
3555     {
3556       release_list(pool, sb->m_sending.m_first_page, prev);
3557 
3558       if (prev == sb->m_sending.m_last_page)
3559       {
3560         /**
3561          * Everything was released; the m_sending list is now empty.
3562          */
3563         sb->m_sending.m_first_page = NULL;
3564         sb->m_sending.m_last_page = NULL;
3565         return 0;
3566       }
3567     }
3568     else
3569     {
3570       assert(sb->m_sending.m_first_page != NULL);
3571       pool->release_local(sb->m_sending.m_first_page);
3572     }
3573   }
3574 
3575   sb->m_sending.m_first_page = curr;
3576 
3577   /**
3578    * Since not all bytes were sent,
3579    * spend some time trying to pack the m_sending pages,
3580    * possibly releasing send-buffer memory.
3581    */
3582   return pack_sb_pages(pool, &sb->m_sending);
3583 }
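
/**
 * Worked example (illustrative): if m_sending holds pages of 7000, 7000 and
 * 3000 bytes and the OS accepted 10000 bytes, the loop above releases the
 * first page, stops inside the second with remain == 3000, advances its
 * m_start by 3000 and shrinks its m_bytes to 4000; pack_sb_pages() then
 * reports the 4000 + 3000 bytes that remain unsent.
 */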
3584 
3585 /**
3586  * Register the specified amount of 'bytes' as sent, starting
3587  * from the first available byte in the m_sending buffer.
3588  *
3589  * The 'm_send_lock' has to be held prior to calling
3590  * this function.
3591  */
3592 Uint32
3593 trp_callback::bytes_sent(NodeId node, Uint32 bytes)
3594 {
3595   thr_repository::send_buffer *sb = g_thr_repository->m_send_buffers+node;
3596   Uint32 thr_no = sb->m_send_thread;
3597   assert(thr_no != NO_SEND_THREAD);
3598   if (!is_send_thread(thr_no))
3599   {
3600     thr_data * thrptr = &g_thr_repository->m_thread[thr_no];
3601     return ::bytes_sent(&thrptr->m_send_buffer_pool,
3602                         sb,
3603                         bytes);
3604   }
3605   else
3606   {
3607     return ::bytes_sent(g_send_threads->get_send_buffer_pool(thr_no),
3608                         sb,
3609                         bytes);
3610   }
3611 }
3612 
3613 /**
3614  * NOTE:
3615  *    ::has_data_to_send() is only called
3616  *    from TransporterRegistry::performSend().
3617  *    ::performSend() in turn, is only called from either
3618  *    the single threaded scheduler, or the API, which
3619  *    will end up in the single threaded ::has_data_to_send()
3620  *    implemented in class TransporterCallbackKernelNonMT
3621  *    Thus, this ::has_data_to_send is actually never used!
3622  *
3623  *    However, a simple implementation based on probing
3624  *    get_bytes_to_send_iovec() is provided for completeness.
3625  *    As this is unused code, it is completely untested.
3626  */
3627 bool
3628 trp_callback::has_data_to_send(NodeId node)
3629 {
3630   assert(false); //Trap untested code, see comment above
3631   struct iovec v[1];
3632   return (get_bytes_to_send_iovec(node, v, 1) > 0);
3633 }
3634 
3635 /**
3636  * Reset send buffers by releasing all buffered send pages,
3637  * in both the m_buffer and m_sending buffers, *and*
3638  * available thread send buffers.
3639  *
3640  * Neither m_send_lock nor m_buffer_lock should be held prior
3641  * to calling this function; they will be acquired here
3642  * as required.
3643  */
3644 void
3645 trp_callback::reset_send_buffer(NodeId node, bool should_be_empty)
3646 {
3647   struct thr_repository *rep = g_thr_repository;
3648   thr_repository::send_buffer * sb = rep->m_send_buffers+node;
3649 
3650   thread_local_pool<thr_send_page> pool(&rep->m_sb_pool, 0);
3651 
3652   lock(&sb->m_send_lock);
3653   lock(&sb->m_buffer_lock);
3654 
3655   /* Collect thread send buffers into m_buffer. */
3656   link_thread_send_buffers(sb, node);
3657 
3658   /* Drop all pending data in m_buffer. */
3659   if (sb->m_buffer.m_first_page)
3660   {
3661     release_list(&pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
3662     sb->m_buffer.m_first_page = NULL;
3663     sb->m_buffer.m_last_page  = NULL;
3664     assert(!should_be_empty); // Got data when it should be empty
3665   }
3666 
3667   /* Drop all pending data in m_sending buffers. */
3668   if (sb->m_sending.m_first_page)
3669   {
3670     release_list(&pool, sb->m_sending.m_first_page, sb->m_sending.m_last_page);
3671     sb->m_sending.m_first_page = NULL;
3672     sb->m_sending.m_last_page = NULL;
3673 
3674     assert(!should_be_empty); // Got data when it should be empty
3675   }
3676   sb->m_node_total_send_buffer_size = 0;
3677   unlock(&sb->m_buffer_lock);
3678   unlock(&sb->m_send_lock);
3679 
3680   pool.release_all(rep->m_mm, RG_TRANSPORTER_BUFFERS);
3681 }
3682 
3683 static inline
3684 void
3685 register_pending_send(thr_data *selfptr, Uint32 nodeId)
3686 {
3687   /* Mark that this node has pending send data. */
3688   if (!selfptr->m_pending_send_mask.get(nodeId))
3689   {
3690     selfptr->m_pending_send_mask.set(nodeId, 1);
3691     Uint32 i = selfptr->m_pending_send_count;
3692     selfptr->m_pending_send_nodes[i] = nodeId;
3693     selfptr->m_pending_send_count = i + 1;
3694   }
3695 }
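
/**
 * The nodes registered here are drained by do_flush() and do_send() below.
 * do_send() captures the node list and clears m_pending_send_mask and
 * m_pending_send_count before iterating, so a node can safely be
 * re-registered (e.g. via the m_force_send handling) while the captured
 * list is still being processed.
 */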
3696 
3697 /**
3698   Pack send buffers to make memory available to other threads. Each signal
3699   sent often uses a page of its own, which means that most pages are very
3700   sparsely filled. In some situations this means that we can run out of send
3701   buffers and still have massive amounts of free space.
3702 
3703   We call this from the main loop in the block threads when we fail to
3704   allocate enough send buffers. In addition we call the node local
3705   pack_sb_pages() in several places - see the header comment of that function.
3706 */
3707 static
3708 void
3709 try_pack_send_buffers(thr_data* selfptr)
3710 {
3711   thr_repository* rep = g_thr_repository;
3712   thread_local_pool<thr_send_page>* pool = &selfptr->m_send_buffer_pool;
3713 
3714   for (Uint32 i = 1; i < NDB_ARRAY_SIZE(selfptr->m_send_buffers); i++)
3715   {
3716     if (globalTransporterRegistry.get_transporter(i))
3717     {
3718       thr_repository::send_buffer* sb = rep->m_send_buffers+i;
3719       if (trylock(&sb->m_buffer_lock) != 0)
3720       {
3721         continue; // Continue with next if busy
3722       }
3723 
3724       link_thread_send_buffers(sb, i);
3725       if (sb->m_buffer.m_first_page != NULL)
3726       {
3727         pack_sb_pages(pool, &sb->m_buffer);
3728       }
3729       unlock(&sb->m_buffer_lock);
3730     }
3731   }
3732   /* Release surplus buffers from local pool to global pool */
3733   pool->release_global(g_thr_repository->m_mm, RG_TRANSPORTER_BUFFERS);
3734 }
3735 
3736 
3737 /**
3738  * publish thread-locally prepared send-buffer
3739  */
3740 static
3741 void
3742 flush_send_buffer(thr_data* selfptr, Uint32 node)
3743 {
3744   Uint32 thr_no = selfptr->m_thr_no;
3745   thr_send_buffer * src = selfptr->m_send_buffers + node;
3746   thr_repository* rep = g_thr_repository;
3747 
3748   if (src->m_first_page == 0)
3749   {
3750     return;
3751   }
3752   assert(src->m_last_page != 0);
3753 
3754   thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
3755   thr_repository::send_buffer* sb = rep->m_send_buffers+node;
3756 
3757   Uint32 wi = dst->m_write_index;
3758   Uint32 next = (wi + 1) % thr_send_queue::SIZE;
3759   Uint32 ri = sb->m_read_index[thr_no];
3760 
3761   /**
3762    * If the thread-local ring buffer of send-buffers is full:
3763    * Empty it by transferring the buffers to the global send_buffer list.
3764    */
3765   if (unlikely(next == ri))
3766   {
3767     lock(&sb->m_buffer_lock);
3768     link_thread_send_buffers(sb, node);
3769     unlock(&sb->m_buffer_lock);
3770   }
3771 
3772   dst->m_buffers[wi] = src->m_first_page;
3773   wmb();
3774   dst->m_write_index = next;
3775 
3776   src->m_first_page = 0;
3777   src->m_last_page = 0;
3778 }
3779 
3780 /**
3781  * This is used in case the send buffer gets full, to force an emergency send,
3782  * hopefully freeing up some buffer space for the next signal.
3783  */
3784 bool
3785 mt_send_handle::forceSend(NodeId nodeId)
3786 {
3787   struct thr_repository *rep = g_thr_repository;
3788   struct thr_data *selfptr = m_selfptr;
3789   struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
3790 
3791   {
3792     /**
3793      * NOTE: we don't need a memory barrier after clearing
3794      *       m_force_send here as we unconditionally lock m_send_lock
3795      *       hence there is no way that our data can be "unsent"
3796      */
3797     sb->m_force_send = 0;
3798 
3799     lock(&sb->m_send_lock);
3800     sb->m_send_thread = selfptr->m_thr_no;
3801     globalTransporterRegistry.performSend(nodeId);
3802     sb->m_send_thread = NO_SEND_THREAD;
3803     unlock(&sb->m_send_lock);
3804 
3805     /**
3806      * release buffers prior to maybe looping on sb->m_force_send
3807      */
3808     selfptr->m_send_buffer_pool.release_global(rep->m_mm,
3809                                                RG_TRANSPORTER_BUFFERS);
3810     /**
3811      * We need a memory barrier here to prevent race between clearing lock
3812      *   and reading of m_force_send.
3813      *   CPU can reorder the load to before the clear of the lock
3814      */
3815     mb();
3816     if (unlikely(sb->m_force_send))
3817     {
3818       register_pending_send(selfptr, nodeId);
3819     }
3820   }
3821 
3822   return true;
3823 }
3824 
3825 /**
3826  * try sending data
3827  */
3828 static
3829 void
3830 try_send(thr_data * selfptr, Uint32 node)
3831 {
3832   struct thr_repository *rep = g_thr_repository;
3833   struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
3834 
3835   if (trylock(&sb->m_send_lock) == 0)
3836   {
3837     /**
3838      * Now clear the flag, and start sending all data available to this node.
3839      *
3840      * Put a memory barrier here, so that if another thread tries to grab
3841      * the send lock but fails due to us holding it here, we either
3842      * 1) will see m_force_send[nodeId] set to 1 at the end of the loop, or
3843      * 2) will clear the flag just set by the other thread here, but then
3844      * (thanks to mb()) be able to see and send all of the data already
3845      * in the first send iteration.
3846      */
3847     sb->m_force_send = 0;
3848     mb();
3849 
3850     sb->m_send_thread = selfptr->m_thr_no;
3851     globalTransporterRegistry.performSend(node);
3852     sb->m_send_thread = NO_SEND_THREAD;
3853     unlock(&sb->m_send_lock);
3854 
3855     /**
3856      * release buffers prior to maybe looping on sb->m_force_send
3857      */
3858     selfptr->m_send_buffer_pool.release_global(rep->m_mm,
3859                                                RG_TRANSPORTER_BUFFERS);
3860 
3861     /**
3862      * We need a memory barrier here to prevent race between clearing lock
3863      *   and reading of m_force_send.
3864      *   CPU can reorder the load to before the clear of the lock
3865      */
3866     mb();
3867     if (unlikely(sb->m_force_send))
3868     {
3869       register_pending_send(selfptr, node);
3870     }
3871   }
3872 }
3873 
3874 /**
3875  * Flush send buffers and append them to the destination nodes' send queues.
3876  *
3877  * Flushed buffer contents are piggybacked when another thread
3878  * calls do_send() for the same destination node. This makes it possible to
3879  * have more data included in each message, and thereby reduces the total
3880  * number of messages handled by the OS, which really matters for performance!
3881  */
3882 static
3883 void
3884 do_flush(struct thr_data* selfptr)
3885 {
3886   Uint32 i;
3887   Uint32 count = selfptr->m_pending_send_count;
3888   Uint8 *nodes = selfptr->m_pending_send_nodes;
3889 
3890   for (i = 0; i < count; i++)
3891   {
3892     flush_send_buffer(selfptr, nodes[i]);
3893   }
3894 }
3895 
3896 /**
3897  * Send any pending data to remote nodes.
3898  *
3899  * If MUST_SEND is false, will only try to lock the send lock, but if it would
3900  * block, that node is skipped, to be tried again next time round.
3901  *
3902  * If MUST_SEND is true, we still only try to lock, but if it would block,
3903  * we will force the thread holding the lock to do the sending on our behalf.
3904  *
3905  * The list of pending nodes to send to is thread-local, but the per-node send
3906  * buffer is shared by all threads. Thus we might skip a node for which
3907  * another thread has pending send data, and we might send pending data also
3908  * for another thread without clearing the node from the pending list of that
3909  * other thread (but we will never lose signals due to this).
3910  *
3911  * Return the number of nodes that still have pending data to be sent.
3912  * These will be retried in the next round. 'Pending' is
3913  * returned as a negative number if nothing was sent in this round.
3914  *
3915  * (Likely due to receivers consuming too slowly, and receive and send buffers
3916  *  already being filled up)
3917  */
3918 static
3919 Int32
3920 do_send(struct thr_data* selfptr, bool must_send)
3921 {
3922   Uint32 count = selfptr->m_pending_send_count;
3923   Uint8 *nodes = selfptr->m_pending_send_nodes;
3924 
3925   if (count == 0)
3926   {
3927     return 0; // send-buffers empty
3928   }
3929 
3930   /* Clear the pending list. */
3931   selfptr->m_pending_send_mask.clear();
3932   selfptr->m_pending_send_count = 0;
3933 
3934   if (g_send_threads)
3935   {
3936     const NDB_TICKS now = NdbTick_getCurrentTicks();
3937 
3938     /**
3939      * We're using send threads; in this case we simply alert a send
3940      * thread to take over the actual sending of the signals. In this case
3941      * we will never have any failures, so we simply need to flush the
3942      * buffers and leave the rest to the send thread.
3943      */
3944     for (Uint32 i = 0; i < count; i++)
3945     {
3946       Uint32 node = nodes[i];
3947       selfptr->m_watchdog_counter = 6;
3948 
3949       flush_send_buffer(selfptr, node);
3950       g_send_threads->alert_send_thread(node, now);
3951     }
3952     return 0;
3953   }
3954 
3955   /**
3956    * We're not using send threads.
3957    */
3958   Uint32 made_progress = 0;
3959   struct thr_repository* rep = g_thr_repository;
3960 
3961   for (Uint32 i = 0; i < count; i++)
3962   {
3963     Uint32 node = nodes[i];
3964     thr_repository::send_buffer * sb = rep->m_send_buffers + node;
3965 
3966     selfptr->m_watchdog_counter = 6;
3967     flush_send_buffer(selfptr, node);
3968 
3969     /**
3970      * If we must send now, set the force_send flag.
3971      *
3972      * This will ensure that if we do not get the send lock, the thread
3973      * holding the lock will try sending again for us when it has released
3974      * the lock.
3975      *
3976      * The lock/unlock pair works as a memory barrier to ensure that the
3977      * flag update is flushed to the other thread.
3978      */
3979     if (must_send)
3980     {
3981       sb->m_force_send = 1;
3982     }
3983 
3984     if (trylock(&sb->m_send_lock) != 0)
3985     {
3986       if (!must_send)
3987       {
3988         /**
3989          * Not doing this node now, re-add to pending list.
3990          *
3991          * As we only add from the start of an empty list, we are safe from
3992          * overwriting the list while we are iterating over it.
3993          */
3994         register_pending_send(selfptr, node);
3995       }
3996       else
3997       {
3998         /* Other thread will send for us as we set m_force_send. */
3999       }
4000     }
4001     else  //Got send_lock
4002     {
4003       /**
4004        * Now clear the flag, and start sending all data available to this node.
4005        *
4006        * Put a memory barrier here, so that if another thread tries to grab
4007        * the send lock but fails due to us holding it here, we either
4008        * 1) will see m_force_send[nodeId] set to 1 at the end of the loop, or
4009        * 2) will clear the flag just set by the other thread here, but then
4010        * (thanks to mb()) be able to see and send all of the data already
4011        * in the first send iteration.
4012        */
4013       sb->m_force_send = 0;
4014       mb();
4015 
4016       /**
4017        * Set m_send_thr so that our transporter callback can know which thread
4018        * holds the send lock for this remote node.
4019        */
4020       sb->m_send_thread = selfptr->m_thr_no;
4021       const bool more = globalTransporterRegistry.performSend(node);
4022       made_progress += sb->m_bytes_sent;
4023       sb->m_send_thread = NO_SEND_THREAD;
4024       unlock(&sb->m_send_lock);
4025 
4026       if (more)   //Didn't complete all my send work
4027       {
4028         register_pending_send(selfptr, node);
4029       }
4030       else
4031       {
4032         /**
4033          * We need a memory barrier here to prevent race between clearing lock
4034          *   and reading of m_force_send.
4035          *   CPU can reorder the load to before the clear of the lock
4036          */
4037         mb();
4038         if (sb->m_force_send) //Other thread forced us to do more send
4039         {
4040           made_progress++;    //Avoid false 'no progress' handling
4041           register_pending_send(selfptr, node);
4042         }
4043       }
4044     }
4045   } //for all nodes
4046 
4047   selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
4048 
4049   return (made_progress)               // Had some progress?
4050     ?  selfptr->m_pending_send_count   // More do_send is required
4051     : -selfptr->m_pending_send_count;  // All busy, or didn't find any work (-> -0)
4052 }
4053 
4054 /**
4055  * These are the implementations of the TransporterSendBufferHandle methods
4056  * in ndbmtd.
4057  */
4058 Uint32 *
4059 mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
4060 {
4061   struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
4062   thr_send_page * p = b->m_last_page;
4063   if ((p != 0) && (p->m_bytes + p->m_start + len <= thr_send_page::max_bytes()))
4064   {
4065     return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
4066   }
4067   else if (p != 0)
4068   {
4069     // TODO: maybe don't always flush on page-boundary ???
4070     flush_send_buffer(m_selfptr, node);
4071     if (!g_send_threads)
4072       try_send(m_selfptr, node);
4073   }
4074 
4075   if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository->m_mm,
4076                                                RG_TRANSPORTER_BUFFERS)) != 0)
4077   {
4078     p->m_bytes = 0;
4079     p->m_start = 0;
4080     p->m_next = 0;
4081     b->m_first_page = b->m_last_page = p;
4082     return (Uint32*)p->m_data;
4083   }
4084   return 0;
4085 }
4086 
4087 /**
4088  * Acquire send buffer size without locking and without gathering
4089  *
4090  * OJA: The usability of this function is rather questionable.
4091  *      m_node_total_send_buffer_size is only updated by
4092  *      link_thread_send_buffers() and bytes_sent(), both
4093  *      part of performSend(). Thus, it is valid after a send.
4094  *      However, checking it *before* a send in order to
4095  *      determine if the payload is yet too small doesn't
4096  *      really provide correct information of the current state.
4097  *      Most likely '0' will be returned if the previous send succeeded.
4098  *
4099  *      A better alternative could be to add a 'min_send' argument
4100  *      to perform_send(), and skip sending if not '>='.
4101  *      (After real size is recalculated)
4102  */
4103 static Uint64
4104 mt_get_send_buffer_bytes(NodeId node)
4105 {
4106   thr_repository *rep = g_thr_repository;
4107   thr_repository::send_buffer *sb = &rep->m_send_buffers[node];
4108   const Uint64 send_buffer_size = sb->m_node_total_send_buffer_size;
4109   return send_buffer_size;
4110 }
4111 
4112 void
4113 mt_getSendBufferLevel(Uint32 self, NodeId node, SB_LevelType &level)
4114 {
4115   Resource_limit rl, rl_shared;
4116   const Uint32 page_size = thr_send_page::PGSIZE;
4117   thr_repository *rep = g_thr_repository;
4118   thr_repository::send_buffer *b = &rep->m_send_buffers[node];
4119   Uint64 current_node_send_buffer_size = b->m_node_total_send_buffer_size;
4120 
4121   rep->m_mm->get_resource_limit_nolock(RG_TRANSPORTER_BUFFERS, rl);
4122   Uint64 current_send_buffer_size = rl.m_min * page_size;
4123   Uint64 current_used_send_buffer_size = rl.m_curr * page_size * 100;
4124   Uint64 current_percentage =
4125     current_used_send_buffer_size / current_send_buffer_size;
4126 
4127   if (current_percentage >= 90)
4128   {
4129     rep->m_mm->get_resource_limit(0, rl_shared);
4130     Uint32 avail_shared = rl_shared.m_max - rl_shared.m_curr;
4131     if (rl.m_min + avail_shared > rl.m_max)
4132     {
4133       current_send_buffer_size = rl.m_max * page_size;
4134     }
4135     else
4136     {
4137       current_send_buffer_size = (rl.m_min + avail_shared) * page_size;
4138     }
4139   }
4140   calculate_send_buffer_level(current_node_send_buffer_size,
4141                               current_send_buffer_size,
4142                               current_used_send_buffer_size,
4143                               num_threads,
4144                               level);
4145   return;
4146 }
4147 
4148 void
4149 mt_send_handle::getSendBufferLevel(NodeId node, SB_LevelType &level)
4150 {
4151   (void)node;
4152   (void)level;
4153   return;
4154 }
4155 
4156 Uint32
4157 mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
4158 {
4159   struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
4160   thr_send_page * p = b->m_last_page;
4161   p->m_bytes += lenBytes;
4162   return p->m_bytes;
4163 }
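
/**
 * Sketch of the intended usage of getWritePtr()/updateWritePtr() above
 * (for orientation only; the real caller is the signal send path in the
 * transporter layer):
 *
 *   Uint32 *ptr = handle->getWritePtr(node, len_bytes, prio, max);
 *   if (ptr != NULL)
 *   {
 *     // marshal the signal words into 'ptr' here
 *     handle->updateWritePtr(node, len_bytes, prio);
 *   }
 *   // a NULL return means no send buffer page could be seized
 */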
4164 
4165 /*
4166  * Insert a signal in a job queue.
4167  *
4168  * The signal is not visible to consumers yet after return from this function,
4169  * only recorded in the thr_jb_write_state. It is necessary to first call
4170  * flush_write_state() for this.
4171  *
4172  * The new_buffer is a job buffer to use if the current one gets full. If used,
4173  * we return true, indicating that the caller should allocate a new one for
4174  * the next call. (This is done to allow inserting under the lock, but doing
4175  * the allocation outside the lock.)
4176  */
4177 static inline
4178 bool
4179 insert_signal(thr_job_queue *q, thr_job_queue_head *h,
4180               thr_jb_write_state *w, Uint32 prioa,
4181               const SignalHeader* sh, const Uint32 *data,
4182               const Uint32 secPtr[3], thr_job_buffer *new_buffer)
4183 {
4184   Uint32 write_pos = w->m_write_pos;
4185   Uint32 datalen = sh->theLength;
4186   assert(w->is_open());
4187   assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
4188   memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
4189   write_pos += (sizeof(*sh) >> 2);
4190   memcpy(w->m_write_buffer->m_data + write_pos, data, 4*datalen);
4191   write_pos += datalen;
4192   const Uint32 *p= secPtr;
4193   for (Uint32 i = 0; i < sh->m_noOfSections; i++)
4194     w->m_write_buffer->m_data[write_pos++] = *p++;
4195   w->increment_pending_signals();
4196 
4197 #if SIZEOF_CHARP == 8
4198   /* Align to 8-byte boundary, to ensure aligned copies. */
4199   write_pos= (write_pos+1) & ~((Uint32)1);
4200 #endif
4201 
4202   /*
4203    * We make sure that there is always room for at least one signal in the
4204    * current buffer in the queue, so one insert is always possible without
4205    * adding a new buffer.
4206    */
4207   if (likely(write_pos + MAX_SIGNAL_SIZE <= thr_job_buffer::SIZE))
4208   {
4209     w->m_write_pos = write_pos;
4210     return false;
4211   }
4212   else
4213   {
4214     /*
4215      * Need a write memory barrier here, as this might make signal data visible
4216      * to other threads.
4217      *
4218      * ToDo: We actually only need the wmb() here if we already make this
4219      * buffer visible to the other thread. So we might optimize it a bit. But
4220      * wmb() is a no-op on x86 anyway...
4221      */
4222     wmb();
4223     w->m_write_buffer->m_len = write_pos;
4224     Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
4225 
4226     /**
4227      * Full job buffer is fatal.
4228      *
4229      * ToDo: should we wait for it to become non-full? There is no guarantee
4230      * that this will actually happen...
4231      *
4232      * Or alternatively, ndbrequire() ?
4233      */
4234     if (unlikely(write_index == h->m_read_index))
4235     {
4236       job_buffer_full(0);
4237     }
4238     new_buffer->m_len = 0;
4239     new_buffer->m_prioa = prioa;
4240     q->m_buffers[write_index] = new_buffer;
4241     w->m_write_index = write_index;
4242     w->m_write_pos = 0;
4243     w->m_write_buffer = new_buffer;
4244     return true;                // Buffer new_buffer used
4245   }
4246 
4247   return false;                 // Buffer new_buffer not used
4248 }
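
/**
 * Layout example (illustrative): a signal with theLength == 3 and one
 * section pointer occupies (sizeof(SignalHeader) >> 2) + 3 + 1 words in the
 * job buffer; on 64-bit builds write_pos is then rounded up to the next
 * even word so that the following signal's header copy stays 8-byte
 * aligned.
 */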
4249 
4250 static
4251 void
4252 read_jbb_state(thr_data *selfptr, Uint32 count)
4253 {
4254   thr_jb_read_state *r = selfptr->m_read_states;
4255   const thr_job_queue *q = selfptr->m_in_queue;
4256   const thr_job_queue_head *h = selfptr->m_in_queue_head;
4257   for (Uint32 i = 0; i < count; i++,r++)
4258   {
4259     if (r->is_open())
4260     {
4261       Uint32 read_index = r->m_read_index;
4262 
4263       /**
4264        * Optimization: Only reload when possibly empty.
4265        * Avoid cache reload of the shared thr_job_queue_head.
4266        * Read the head directly to avoid an unnecessary cache
4267        * load of the first cache line of the m_in_queue entry.
4268        */
4269       if (r->m_write_index == read_index)
4270       {
4271         r->m_write_index = h[i].m_write_index;
4272         read_barrier_depends();
4273         r->m_read_end = q[i].m_buffers[read_index]->m_len;
4274       }
4275     }
4276   }
4277 }
4278 
4279 static
4280 bool
4281 read_jba_state(thr_data *selfptr)
4282 {
4283   thr_jb_read_state *r = &(selfptr->m_jba_read_state);
4284   r->m_write_index = selfptr->m_jba_head.m_write_index;
4285   read_barrier_depends();
4286   r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
4287   return r->is_empty();
4288 }
4289 
4290 /* Check all job queues, return true only if all are empty. */
4291 static bool
4292 check_queues_empty(thr_data *selfptr)
4293 {
4294   Uint32 thr_count = g_thr_repository->m_thread_count;
4295   bool empty = read_jba_state(selfptr);
4296   if (!empty)
4297     return false;
4298 
4299   read_jbb_state(selfptr, thr_count);
4300   const thr_jb_read_state *r = selfptr->m_read_states;
4301   for (Uint32 i = 0; i < thr_count; i++,r++)
4302   {
4303     if (!r->is_empty())
4304       return false;
4305   }
4306   return true;
4307 }
4308 
4309 /*
4310  * Execute at most MAX_SIGNALS signals from one job queue, updating local read
4311  * state as appropriate.
4312  *
4313  * Returns number of signals actually executed.
4314  */
4315 static
4316 Uint32
4317 execute_signals(thr_data *selfptr,
4318                 thr_job_queue *q, thr_job_queue_head *h,
4319                 thr_jb_read_state *r,
4320                 Signal *sig, Uint32 max_signals)
4321 {
4322   Uint32 num_signals;
4323   Uint32 read_index = r->m_read_index;
4324   Uint32 write_index = r->m_write_index;
4325   Uint32 read_pos = r->m_read_pos;
4326   Uint32 read_end = r->m_read_end;
4327   Uint32 *watchDogCounter = &selfptr->m_watchdog_counter;
4328 
4329   if (read_index == write_index && read_pos >= read_end)
4330     return 0;          // empty read_state
4331 
4332   thr_job_buffer *read_buffer = r->m_read_buffer;
4333 
4334   for (num_signals = 0; num_signals < max_signals; num_signals++)
4335   {
4336     while (read_pos >= read_end)
4337     {
4338       if (read_index == write_index)
4339       {
4340         /* No more available now. */
4341         return num_signals;
4342       }
4343       else
4344       {
4345         /* Move to next buffer. */
4346         read_index = (read_index + 1) % thr_job_queue::SIZE;
4347         release_buffer(g_thr_repository, selfptr->m_thr_no, read_buffer);
4348         read_buffer = q->m_buffers[read_index];
4349         read_pos = 0;
4350         read_end = read_buffer->m_len;
4351         /* Update thread-local read state. */
4352         r->m_read_index = h->m_read_index = read_index;
4353         r->m_read_buffer = read_buffer;
4354         r->m_read_pos = read_pos;
4355         r->m_read_end = read_end;
4356         /* Wakeup threads waiting for job buffers to become free */
4357         wakeup(&h->m_waiter);
4358       }
4359     }
4360 
4361     /*
4362      * These prefetches were found using OProfile to reduce cache misses.
4363      * (Though on Intel Core 2, they do not give much speedup, as apparently
4364      * the hardware prefetcher is already doing a fairly good job).
4365      */
4366     NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 16);
4367     NDB_PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
4368 
4369 #ifdef VM_TRACE
4370     /* Find reading / propagation of junk */
4371     sig->garbage_register();
4372 #endif
4373     /* Now execute the signal. */
4374     SignalHeader* s =
4375       reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
4376     Uint32 seccnt = s->m_noOfSections;
4377     Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
4378     if(siglen>16)
4379     {
4380       NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
4381     }
4382     Uint32 bno = blockToMain(s->theReceiversBlockNumber);
4383     Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
4384     SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
4385     assert(block != 0);
4386 
4387     Uint32 gsn = s->theVerId_signalNumber;
4388     *watchDogCounter = 1;
4389     /* Must update original buffer so signal dump will see it. */
4390     s->theSignalId = selfptr->m_signal_id_counter++;
4391     memcpy(&sig->header, s, 4*siglen);
4392     sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
4393     sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
4394     sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];
4395 
4396     read_pos += siglen + seccnt;
4397 #if SIZEOF_CHARP == 8
4398     /* Handle 8-byte alignment. */
4399     read_pos = (read_pos + 1) & ~((Uint32)1);
4400 #endif
4401 
4402     /* Update just before execute so signal dump can know how far we are. */
4403     r->m_read_pos = read_pos;
4404 
4405 #ifdef VM_TRACE
4406     if (globalData.testOn)
4407     { //wl4391_todo segments
4408       SegmentedSectionPtr ptr[3];
4409       ptr[0].i = sig->m_sectionPtrI[0];
4410       ptr[1].i = sig->m_sectionPtrI[1];
4411       ptr[2].i = sig->m_sectionPtrI[2];
4412       ::getSections(seccnt, ptr);
4413       globalSignalLoggers.executeSignal(*s,
4414                                         0,
4415                                         &sig->theData[0],
4416                                         globalData.ownId,
4417                                         ptr, seccnt);
4418     }
4419 #endif
4420 
4421     block->jamBuffer()->markEndOfSigExec();
4422     block->executeFunction_async(gsn, sig);
4423   }
4424 
4425   return num_signals;
4426 }
4427 
4428 static
4429 Uint32
4430 run_job_buffers(thr_data *selfptr, Signal *sig)
4431 {
4432   Uint32 thr_count = g_thr_repository->m_thread_count;
4433   Uint32 signal_count = 0;
4434   Uint32 signal_count_since_last_zero_time_queue = 0;
4435   Uint32 perjb = selfptr->m_max_signals_per_jb;
4436 
4437   read_jbb_state(selfptr, thr_count);
4438   /*
4439    * A load memory barrier to ensure that we see any prio A signal sent later
4440    * than loaded prio B signals.
4441    */
4442   rmb();
4443 
4444   thr_job_queue *queue = selfptr->m_in_queue;
4445   thr_job_queue_head *head = selfptr->m_in_queue_head;
4446   thr_jb_read_state *read_state = selfptr->m_read_states;
4447   for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
4448        send_thr_no++,queue++,read_state++,head++)
4449   {
4450     /* Read the prio A state often, to avoid starvation of prio A. */
4451     while (!read_jba_state(selfptr))
4452     {
4453       selfptr->m_sent_local_prioa_signal = false;
4454       static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
4455       signal_count += execute_signals(selfptr,
4456                                       &(selfptr->m_jba),
4457                                       &(selfptr->m_jba_head),
4458                                       &(selfptr->m_jba_read_state), sig,
4459                                       max_prioA);
4460       if (!selfptr->m_sent_local_prioa_signal)
4461       {
4462         /**
4463          * Break out of loop if there was no prio A signals generated
4464          * from the local execution.
4465          */
4466         break;
4467       }
4468     }
4469 
4470     /**
4471      * Contended queues get an extra execute quota:
4472      *
4473      * If we didn't get a max 'perjb' quota, our out buffers
4474      * are about to fill up. This thread is thus effectively
4475      * slowed down in order to let other threads consume from
4476      * our out buffers. Eventually, when 'perjb==0', we will
4477      * have to wait/sleep for buffers to become available.
4478      *
4479      * This can bring us into a circular wait-lock, where
4480      * threads are stalled due to full out buffers. The same
4481      * thread may also have full in buffers, thus blocking other
4482      * threads from progressing. We could then end up in a
4483      * circular wait-lock where no thread is able to progress
4484      * and the entire scheduler is stuck.
4485      *
4486      * We try to avoid this situation by reserving some
4487      * 'm_max_extra_signals' which are only used to consume
4488      * from 'almost full' in-buffers. We will then reduce the
4489      * risk of ending up in the above wait-lock.
4490      *
4491      * Exclude receiver threads, as there can't be a
4492      * circular wait between recv-thread and workers.
4493      */
4494     Uint32 extra = 0;
4495 
4496     if (perjb < MAX_SIGNALS_PER_JB)  //Job buffer contention
4497     {
4498       const Uint32 free = compute_free_buffers_in_queue(head);
4499       if (free <= thr_job_queue::ALMOST_FULL)
4500       {
4501         if (selfptr->m_max_extra_signals > MAX_SIGNALS_PER_JB - perjb)
4502 	{
4503           extra = MAX_SIGNALS_PER_JB - perjb;
4504         }
4505         else
4506 	{
4507           extra = selfptr->m_max_extra_signals;
4508           selfptr->m_max_exec_signals = 0; //Force recalc
4509         }
4510         selfptr->m_max_extra_signals -= extra;
4511       }
4512     }
4513 
4514 #ifdef ERROR_INSERT
4515 
4516 #define MIXOLOGY_MIX_MT_JBB 1
4517 
4518     if (unlikely(globalEmulatorData.theConfiguration->getMixologyLevel() &
4519                  MIXOLOGY_MIX_MT_JBB))
4520     {
4521       /**
4522        * Let's maximise interleaving to find inter-thread
4523        * signal order dependency bugs
4524        */
4525       perjb = 1;
4526       extra = 0;
4527     }
4528 #endif
4529 
4530     /* Now execute prio B signals from one thread. */
4531     signal_count += execute_signals(selfptr, queue, head, read_state,
4532                                     sig, perjb+extra);
4533 
4534     if (signal_count - signal_count_since_last_zero_time_queue >
4535         (MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN -
4536          MAX_SIGNALS_PER_JB))
4537     {
4538       /**
4539        * Each execution of execute_signals can at most execute 75 signals
4540        * from one job buffer. We want to ensure that we execute no more than
4541        * 100 signals before we arrive here to get the signals from the
4542        * zero time queue. This implements the bounded delay signal
4543        * concept which is required for rate controlled activities.
4544        *
4545        * We scan the zero time queue if more than 25 signals were executed.
4546        * This means that at most 100 signals will be executed before we arrive
4547        * here again to check the bounded delay signals.
4548        */
4549       signal_count_since_last_zero_time_queue = signal_count;
4550       scan_zero_queue(selfptr);
4551     }
4552   }
4553 
4554   return signal_count;
4555 }
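
/**
 * Quota example (illustrative): with MAX_SIGNALS_PER_JB == 75, a thread
 * whose out-buffers are filling up may have 'perjb' reduced to, say, 20;
 * if the in-queue from one sender has no more than ALMOST_FULL free
 * buffers left and m_max_extra_signals allows it, up to 55 'extra'
 * signals are granted for that queue so a congested in-queue can still
 * be drained.
 */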
4556 
4557 struct thr_map_entry {
4558   enum { NULL_THR_NO = 0xFF };
4559   Uint8 thr_no;
4560   thr_map_entry() : thr_no(NULL_THR_NO) {}
4561 };
4562 
4563 static struct thr_map_entry thr_map[NO_OF_BLOCKS][NDBMT_MAX_BLOCK_INSTANCES];
4564 
4565 static inline Uint32
4566 block2ThreadId(Uint32 block, Uint32 instance)
4567 {
4568   assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
4569   Uint32 index = block - MIN_BLOCK_NO;
4570   assert(instance < NDB_ARRAY_SIZE(thr_map[index]));
4571   const thr_map_entry& entry = thr_map[index][instance];
4572   assert(entry.thr_no < num_threads);
4573   return entry.thr_no;
4574 }
4575 
4576 void
4577 add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
4578 {
4579   assert(main == blockToMain(main));
4580   Uint32 index = main - MIN_BLOCK_NO;
4581   assert(index < NO_OF_BLOCKS);
4582   assert(instance < NDB_ARRAY_SIZE(thr_map[index]));
4583 
4584   SimulatedBlock* b = globalData.getBlock(main, instance);
4585   require(b != 0);
4586 
4587   /* Block number including instance. */
4588   Uint32 block = numberToBlock(main, instance);
4589 
4590   require(thr_no < num_threads);
4591   struct thr_repository* rep = g_thr_repository;
4592   struct thr_data* thr_ptr = &rep->m_thread[thr_no];
4593 
4594   /* Add to list. */
4595   {
4596     Uint32 i;
4597     for (i = 0; i < thr_ptr->m_instance_count; i++)
4598       require(thr_ptr->m_instance_list[i] != block);
4599   }
4600   require(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
4601   thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;
4602 
4603   SimulatedBlock::ThreadContext ctx;
4604   ctx.threadId = thr_no;
4605   ctx.jamBuffer = &thr_ptr->m_jam;
4606   ctx.watchDogCounter = &thr_ptr->m_watchdog_counter;
4607   ctx.sectionPoolCache = &thr_ptr->m_sectionPoolCache;
4608   b->assignToThread(ctx);
4609 
4610   /* Create entry mapping block to thread. */
4611   thr_map_entry& entry = thr_map[index][instance];
4612   require(entry.thr_no == thr_map_entry::NULL_THR_NO);
4613   entry.thr_no = thr_no;
4614 }
4615 
4616 /* Static assignment of main instances (before first signal). */
4617 void
4618 mt_init_thr_map()
4619 {
4620   /* Keep mt-classic assignments in MT LQH. */
4621   const Uint32 thr_GLOBAL = 0;
4622   const Uint32 thr_LOCAL = 1;
4623 
4624   add_thr_map(BACKUP, 0, thr_LOCAL);
4625   add_thr_map(DBTC, 0, thr_GLOBAL);
4626   add_thr_map(DBDIH, 0, thr_GLOBAL);
4627   add_thr_map(DBLQH, 0, thr_LOCAL);
4628   add_thr_map(DBACC, 0, thr_LOCAL);
4629   add_thr_map(DBTUP, 0, thr_LOCAL);
4630   add_thr_map(DBDICT, 0, thr_GLOBAL);
4631   add_thr_map(NDBCNTR, 0, thr_GLOBAL);
4632   add_thr_map(QMGR, 0, thr_GLOBAL);
4633   add_thr_map(NDBFS, 0, thr_GLOBAL);
4634   add_thr_map(CMVMI, 0, thr_GLOBAL);
4635   add_thr_map(TRIX, 0, thr_GLOBAL);
4636   add_thr_map(DBUTIL, 0, thr_GLOBAL);
4637   add_thr_map(SUMA, 0, thr_LOCAL);
4638   add_thr_map(DBTUX, 0, thr_LOCAL);
4639   add_thr_map(TSMAN, 0, thr_LOCAL);
4640   add_thr_map(LGMAN, 0, thr_LOCAL);
4641   add_thr_map(PGMAN, 0, thr_LOCAL);
4642   add_thr_map(RESTORE, 0, thr_LOCAL);
4643   add_thr_map(DBINFO, 0, thr_LOCAL);
4644   add_thr_map(DBSPJ, 0, thr_GLOBAL);
4645   add_thr_map(THRMAN, 0, thr_GLOBAL);
4646   add_thr_map(TRPMAN, 0, thr_GLOBAL);
4647 }
4648 
4649 Uint32
4650 mt_get_instance_count(Uint32 block)
4651 {
4652   switch(block){
4653   case DBLQH:
4654   case DBACC:
4655   case DBTUP:
4656   case DBTUX:
4657   case BACKUP:
4658   case RESTORE:
4659     return globalData.ndbMtLqhWorkers;
4660     break;
4661   case PGMAN:
4662     return globalData.ndbMtLqhWorkers + 1;
4663     break;
4664   case DBTC:
4665   case DBSPJ:
4666     return globalData.ndbMtTcThreads;
4667     break;
4668   case TRPMAN:
4669     return globalData.ndbMtReceiveThreads;
4670   case THRMAN:
4671     return num_threads;
4672   default:
4673     require(false);
4674   }
4675   return 0;
4676 }
4677 
4678 void
4679 mt_add_thr_map(Uint32 block, Uint32 instance)
4680 {
4681   Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
4682   Uint32 num_tc_threads = globalData.ndbMtTcThreads;
4683 
4684   require(instance != 0);
4685   Uint32 thr_no = NUM_MAIN_THREADS;
4686   switch(block){
4687   case DBLQH:
4688   case DBACC:
4689   case DBTUP:
4690   case DBTUX:
4691   case BACKUP:
4692   case RESTORE:
4693     thr_no += (instance - 1) % num_lqh_threads;
4694     break;
4695   case PGMAN:
4696     if (instance == num_lqh_threads + 1)
4697     {
4698       // Put the extra PGMAN together with its proxy
4699       thr_no = block2ThreadId(block, 0);
4700     }
4701     else
4702     {
4703       thr_no += (instance - 1) % num_lqh_threads;
4704     }
4705     break;
4706   case DBTC:
4707   case DBSPJ:
4708     thr_no += num_lqh_threads + (instance - 1);
4709     break;
4710   case THRMAN:
4711     thr_no = instance - 1;
4712     break;
4713   case TRPMAN:
4714     thr_no += num_lqh_threads + num_tc_threads + (instance - 1);
4715     break;
4716   default:
4717     require(false);
4718   }
4719 
4720   add_thr_map(block, instance, thr_no);
4721 }
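
/**
 * Mapping example (illustrative, assuming 4 LDM threads and 2 TC threads,
 * with NUM_MAIN_THREADS == 2): DBLQH instance 3 lands on thread
 * 2 + (3 - 1) % 4 == 4, DBTC instance 1 on thread 2 + 4 + 0 == 6 and
 * TRPMAN instance 1 on thread 2 + 4 + 2 + 0 == 8, the first receive
 * thread. THRMAN instance i always runs on thread i - 1.
 */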
4722 
4723 /**
4724  * Create the duplicate entries needed so that the
4725  *   sender doesn't need to know how many instances there
4726  *   actually are in this node.
4727  *
4728  * If there is only 1 instance, duplicate it for all slots;
4729  * else assume instance 0 is the proxy and duplicate the workers (modulo).
4730  *
4731  * NOTE: the extra PGMAN worker is instance 5.
4732  */
4733 void
4734 mt_finalize_thr_map()
4735 {
4736   for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
4737   {
4738     Uint32 bno = b + MIN_BLOCK_NO;
4739     Uint32 cnt = 0;
4740     while (cnt < NDB_ARRAY_SIZE(thr_map[b]) &&
4741            thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
4742       cnt++;
4743 
4744     if (cnt != NDB_ARRAY_SIZE(thr_map[b]))
4745     {
4746       SimulatedBlock * main = globalData.getBlock(bno, 0);
4747       for (Uint32 i = cnt; i < NDB_ARRAY_SIZE(thr_map[b]); i++)
4748       {
4749         Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1));
4750         if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
4751         {
4752           thr_map[b][i] = thr_map[b][dup];
4753           main->addInstance(globalData.getBlock(bno, dup), i);
4754         }
4755         else
4756         {
4757           /**
4758            * extra pgman instance
4759            */
4760           require(bno == PGMAN);
4761         }
4762       }
4763     }
4764   }
4765 }
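
/**
 * Duplication example (illustrative): for a block with a proxy in slot 0
 * and cnt == 5 filled entries (proxy + 4 workers), the remaining slots
 * i >= 5 are filled with worker 1 + ((i - 1) % 4), i.e. slot 5 duplicates
 * worker 1, slot 6 worker 2, and so on. A single-instance block (cnt == 1)
 * simply duplicates instance 0 into every slot.
 */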
4766 
4767 static void
4768 init_thread(thr_data *selfptr)
4769 {
4770   selfptr->m_waiter.init();
4771   selfptr->m_jam.theEmulatedJamIndex = 0;
4772   NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
4773   NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
4774 
4775   unsigned thr_no = selfptr->m_thr_no;
4776   globalEmulatorData.theWatchDog->
4777     registerWatchedThread(&selfptr->m_watchdog_counter, thr_no);
4778   {
4779     while(selfptr->m_thread == 0)
4780       NdbSleep_MilliSleep(30);
4781   }
4782 
4783   THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
4784   BaseString tmp;
4785   tmp.appfmt("thr: %u ", thr_no);
4786 
4787   int tid = NdbThread_GetTid(selfptr->m_thread);
4788   if (tid != -1)
4789   {
4790     tmp.appfmt("tid: %u ", tid);
4791   }
4792 
4793   conf.appendInfo(tmp,
4794                   selfptr->m_instance_list,
4795                   selfptr->m_instance_count);
4796   int res = conf.do_bind(selfptr->m_thread,
4797                          selfptr->m_instance_list,
4798                          selfptr->m_instance_count);
4799   if (res < 0)
4800   {
4801     tmp.appfmt("err: %d ", -res);
4802   }
4803   else if (res > 0)
4804   {
4805     tmp.appfmt("OK ");
4806   }
4807   selfptr->m_realtime = conf.do_get_realtime(selfptr->m_instance_list,
4808                                              selfptr->m_instance_count);
4809   selfptr->m_spintime = conf.do_get_spintime(selfptr->m_instance_list,
4810                                              selfptr->m_instance_count);
4811 
4812   selfptr->m_thr_id = my_thread_self();
4813 
4814   for (Uint32 i = 0; i < selfptr->m_instance_count; i++)
4815   {
4816     BlockReference block = selfptr->m_instance_list[i];
4817     Uint32 main = blockToMain(block);
4818     Uint32 instance = blockToInstance(block);
4819     tmp.appfmt("%s(%u) ", getBlockName(main), instance);
4820   }
4821   printf("%s\n", tmp.c_str());
4822   fflush(stdout);
4823 }
4824 
4825 /**
4826  * Align the signal buffer for better cache performance.
4827  * Also skew it a little for each thread to avoid cache pollution.
4828  */
4829 #define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_BLOCK_THREADS)
4830 static Signal *
4831 aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
4832 {
4833   UintPtr sigtmp= (UintPtr)signal_buf;
4834   sigtmp= (sigtmp+63) & (~(UintPtr)63);
4835   sigtmp+= thr_no*256;
4836   return (Signal *)sigtmp;
4837 }
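
/**
 * Illustrative arithmetic (sketch only): rounding up to the next
 * 64-byte boundary and then skewing by 256 bytes per thread makes
 * different threads start their Signal at different cache sets:
 *
 *   UintPtr p = 0x1007;
 *   p = (p + 63) & ~(UintPtr)63;   // -> 0x1040, 64-byte aligned
 *   p += 1 * 256;                  // thread 1 -> 0x1140
 *
 * SIGBUF_SIZE reserves 63 bytes for the round-up plus 256 bytes per
 * possible thread, so the skewed pointer always stays inside the buffer.
 */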
4838 
4839 /*
4840  * We only do receive in the receiver thread(s); no other threads do receive.
4841  *
4842  * As part of the receive loop, we also periodically call update_connections()
4843  * (this way we are similar to single-threaded ndbd).
4844  *
4845  * The TRPMAN block (and no other block) runs in the same thread as this
4846  * receive loop; this way we avoid races between update_connections() and
4847  * TRPMAN calls into the transporters.
4848  */
4849 
4850 /**
4851  * Array of pointers to TransporterReceiveHandleKernel;
4852  *   these are not used "in traffic".
4853  */
4854 static TransporterReceiveHandleKernel *
4855   g_trp_receive_handle_ptr[MAX_NDBMT_RECEIVE_THREADS];
4856 
4857 /**
4858  * Array for mapping nodes to receiver threads and function to access it.
4859  */
4860 static NodeId g_node_to_recv_thr_map[MAX_NODES];
4861 
4862 /**
4863  * We use this method both to initialise the realtime variable
4864  * and to update it. Currently nothing changes the configured
4865  * value at runtime, but such a mechanism is likely to be added,
4866  * so the code is prepared for that case.
4867  */
4868 static void
4869 update_rt_config(struct thr_data *selfptr,
4870                  bool & real_time,
4871                  enum ThreadTypes type)
4872 {
4873   bool old_real_time = real_time;
4874   real_time = selfptr->m_realtime;
4875   if (old_real_time == true && real_time == false)
4876   {
4877     yield_rt_break(selfptr->m_thread,
4878                    type,
4879                    false);
4880   }
4881 }
4882 
4883 /**
4884  * We use this method both to initialise the spintime variable
4885  * and to update it. Currently nothing changes the configured
4886  * value at runtime, but such a mechanism is likely to be added,
4887  * so the code is prepared for that case.
4888  */
4889 static void
4890 update_spin_config(struct thr_data *selfptr,
4891                    Uint64 & min_spin_timer)
4892 {
4893   min_spin_timer = selfptr->m_spintime;
4894 }
4895 
4896 extern "C"
4897 void *
4898 mt_receiver_thread_main(void *thr_arg)
4899 {
4900   unsigned char signal_buf[SIGBUF_SIZE];
4901   Signal *signal;
4902   struct thr_repository* rep = g_thr_repository;
4903   struct thr_data* selfptr = (struct thr_data *)thr_arg;
4904   unsigned thr_no = selfptr->m_thr_no;
4905   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
4906   const Uint32 recv_thread_idx = thr_no - first_receiver_thread_no;
4907   bool has_received = false;
4908   int cnt = 0;
4909   bool real_time = false;
4910   Uint64 min_spin_timer;
4911   NDB_TICKS start_spin_ticks;
4912   NDB_TICKS yield_ticks;
4913 
4914   init_thread(selfptr);
4915   signal = aligned_signal(signal_buf, thr_no);
4916   update_rt_config(selfptr, real_time, ReceiveThread);
4917   update_spin_config(selfptr, min_spin_timer);
4918 
4919   /**
4920    * Object that keeps track of our pollReceive-state
4921    */
4922   TransporterReceiveHandleKernel recvdata(thr_no, recv_thread_idx);
4923   recvdata.assign_nodes(g_node_to_recv_thr_map);
4924   globalTransporterRegistry.init(recvdata);
4925 
4926   /**
4927    * Save pointer to this for management/error-insert
4928    */
4929   g_trp_receive_handle_ptr[recv_thread_idx] = &recvdata;
4930 
4931   NdbTick_Invalidate(&start_spin_ticks);
4932   NDB_TICKS now = NdbTick_getCurrentTicks();
4933   selfptr->m_ticks = yield_ticks = now;
4934 
4935   while (globalData.theRestartFlag != perform_stop)
4936   {
4937     if (cnt == 0)
4938     {
4939       watchDogCounter = 5;
4940       globalTransporterRegistry.update_connections(recvdata);
4941     }
4942     cnt = (cnt + 1) & 15;
4943 
4944     watchDogCounter = 2;
4945 
4946     now = NdbTick_getCurrentTicks();
4947     const Uint32 lagging_timers = scan_time_queues(selfptr, now);
4948 
4949     Uint32 sum = run_job_buffers(selfptr, signal);
4950 
4951     if (sum || has_received)
4952     {
4953       watchDogCounter = 6;
4954       flush_jbb_write_state(selfptr);
4955     }
4956 
4957     const Int32 pending_send = do_send(selfptr, TRUE);
4958 
4959     watchDogCounter = 7;
4960 
4961     if (real_time)
4962     {
4963       check_real_time_break(now,
4964                             &yield_ticks,
4965                             selfptr->m_thread,
4966                             ReceiveThread);
4967     }
4968 
4969     /**
4970      * Only allow sleeping in pollReceive when:
4971      * 1) We are not lagging behind in handling timer events.
4972      * 2) There are no more pending sends, or no send progress was made.
4973      * 3) No 'min_spin' is configured, or min_spin has elapsed.
4974      */
4975     Uint32 delay = 0;
4976 
4977     if (lagging_timers == 0 &&       // 1)
4978         pending_send   <= 0 &&       // 2)
4979         (min_spin_timer == 0 ||      // 3)
4980          check_yield(now,
4981                      &start_spin_ticks,
4982                      min_spin_timer)))
4983     {
4984       delay = 1; // 1ms
4985     }
4986 
4987     has_received = false;
4988     if (globalTransporterRegistry.pollReceive(delay, recvdata))
4989     {
4990       watchDogCounter = 8;
4991       lock(&rep->m_receive_lock[recv_thread_idx]);
4992       const bool buffersFull = (globalTransporterRegistry.performReceive(recvdata) != 0);
4993       unlock(&rep->m_receive_lock[recv_thread_idx]);
4994       has_received = true;
4995 
4996       if (buffersFull)       /* Receive queue(s) are full */
4997       {
4998         thr_data* waitthr = get_congested_recv_queue(rep, recv_thread_idx);
4999         if (waitthr != NULL) /* Will wait for buffers to be freed */
5000         {
5001           /**
5002            * Wait for thread 'waitthr' to consume some of the
5003            * pending signals in m_in_queue previously received
5004            * from this receive thread, 'thr_no'.
5005            * Will recheck queue status with 'check_recv_queue' after latch
5006            * has been set, and *before* going to sleep.
5007            */
5008           const Uint32 nano_wait = 1000*1000;    /* -> 1 ms */
5009           thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
5010 
5011           const bool waited = yield(&wait_queue->m_waiter,
5012                                     nano_wait,
5013                                     check_recv_queue,
5014                                     wait_queue);
5015           (void)waited;
5016         }
5017       }
5018     }
5019     selfptr->m_stat.m_loop_cnt++;
5020     selfptr->m_stat.m_exec_cnt += sum;
5021   }
5022 
5023   globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
5024   return NULL;                  // Return value not currently used
5025 }
5026 
5027 static
5028 inline
5029 void
5030 sendpacked(struct thr_data* thr_ptr, Signal* signal)
5031 {
5032   Uint32 i;
5033   for (i = 0; i < thr_ptr->m_instance_count; i++)
5034   {
5035     BlockReference block = thr_ptr->m_instance_list[i];
5036     Uint32 main = blockToMain(block);
5037     Uint32 instance = blockToInstance(block);
5038     SimulatedBlock* b = globalData.getBlock(main, instance);
5039     // wl4391_todo remove useless assert
5040     assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
5041     /* b->send_at_job_buffer_end(); */
5042     b->executeFunction_async(GSN_SEND_PACKED, signal);
5043   }
5044 }
5045 
5046 /**
5047  * Callback function used by yield() to recheck
5048  * 'job queue full' condition before going to sleep.
5049  *
5050  * Check if the specified 'thr_job_queue_head' (arg)
5051  * is still full, return true if so.
5052  */
5053 static bool
5054 check_congested_job_queue(thr_job_queue_head *waitfor)
5055 {
5056   return (compute_free_buffers_in_queue(waitfor) <= thr_job_queue::RESERVED);
5057 }
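
/**
 * Usage sketch (assumptions about yield() are taken from the call
 * sites in this file, not a definitive contract): the caller passes
 * the check function plus its argument, and yield() re-evaluates the
 * condition after arming the waiter but before sleeping, so a wakeup
 * that races with the check is not lost:
 *
 *   thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
 *   const bool waited = yield(&wait_queue->m_waiter,
 *                             1000 * 1000,               // 1 ms in ns
 *                             check_congested_job_queue, // recheck
 *                             wait_queue);               // its argument
 *   // waited == false presumably means the queue drained before we
 *   // actually went to sleep.
 */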
5058 
5059 /**
5060  * Check if any out-queue of selfptr is full.
5061  * If so: return the 'thr_data*' for (one of) the thread(s)
5062  *        we have to wait for (to consume from the queue).
5063  */
5064 static struct thr_data*
5065 get_congested_job_queue(const thr_data *selfptr)
5066 {
5067   const Uint32 thr_no = selfptr->m_thr_no;
5068   struct thr_repository* rep = g_thr_repository;
5069   struct thr_data *thrptr = rep->m_thread;
5070   struct thr_data *waitfor = NULL;
5071 
5072   for (unsigned i = 0; i<num_threads; i++, thrptr++)
5073   {
5074     thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
5075 
5076     if (compute_free_buffers_in_queue(q_head) <= thr_job_queue::RESERVED)
5077     {
5078       if (thrptr != selfptr)  // Don't wait on myself (yet)
5079         return thrptr;
5080       else
5081         waitfor = thrptr;
5082     }
5083   }
5084   return waitfor;             // Possibly 'thrptr == selfptr'
5085 }
5086 
5087 /**
5088  * has_full_in_queues()
5089  *
5090  * Avoid circular waits between block-threads:
5091  * A thread is not allowed to sleep due to full
5092  * 'out' job-buffers if other threads already have
5093  * full 'in' job buffers sent to this thread.
5094  *
5095  * run_job_buffers() has reserved a 'm_max_extra_signals'
5096  * quota which will be used to drain these full in-queues,
5097  * so we should allow it to do that instead of going to
5098  * sleep here.
5099  *
5100  * Returns 'true' if any in-queue to this thread is full.
5101  */
5102 static
5103 bool
5104 has_full_in_queues(struct thr_data* selfptr)
5105 {
5106   thr_job_queue_head *head = selfptr->m_in_queue_head;
5107 
5108   for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++, head++)
5109   {
5110     if (compute_free_buffers_in_queue(head) <= thr_job_queue::RESERVED)
5111     {
5112       return true;
5113     }
5114   }
5115   return false;
5116 }
5117 
5118 /**
5119  * update_sched_config
5120  *
5121  *   To prevent "job-buffer-full", i.e. that one thread (T1)
5122  *     produces so many signals to another thread (T2) that the
5123  *     ring-buffer from T1 to T2 gets full, the main loop has
5124  *     two "config" variables:
5125  *   - m_max_exec_signals
5126  *     The *total* number of signals T1 may execute before calling
5127  *     this method again.
5128  *   - m_max_signals_per_jb
5129  *     The max number of signals T1 may execute from each other
5130  *     thread in the system.
5131  *
5132  *   Assumption: each executed signal may send *at most* 4 signals
5133  *     - this assumption is the same in ndbd and ndbmtd and is
5134  *       mostly followed by block-code, although not in all places :-(
5135  *
5136  *   This function returns true if it slept
5137  *     (i.e. it concluded that it could not execute *any* signals
5138  *      without risking job-buffer-full).
5139  */
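
/**
 * Worked example (numbers are illustrative only): assume
 * compute_max_signals_to_execute() permits one executed signal per
 * 4 free signal slots in the out-buffers, mirroring the fan-out
 * assumption above.  If the most congested out-queue leaves room for
 * 1200 produced signals, then
 *
 *   avail = 1200 / 4 = 300 signals in total this round, and with
 *   8 block threads: perjb = (300 + 8 - 1) / 8 = 38 per source thread,
 *
 * which is below MAX_SIGNALS_PER_JB, so the cap does not kick in.
 */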
5140 static
5141 bool
5142 update_sched_config(struct thr_data* selfptr, Int32 pending_send)
5143 {
5144   Uint32 sleeploop = 0;
5145   Uint32 thr_no = selfptr->m_thr_no;
5146 loop:
5147   Uint32 minfree = compute_min_free_out_buffers(thr_no);
5148   Uint32 reserved = (minfree > thr_job_queue::RESERVED)
5149                    ? thr_job_queue::RESERVED
5150                    : minfree;
5151 
5152   Uint32 avail = compute_max_signals_to_execute(minfree - reserved);
5153   Uint32 perjb = (avail + g_thr_repository->m_thread_count - 1) /
5154                   g_thr_repository->m_thread_count;
5155 
5156   if (perjb > MAX_SIGNALS_PER_JB)
5157     perjb = MAX_SIGNALS_PER_JB;
5158 
5159   selfptr->m_max_exec_signals = avail;
5160   selfptr->m_max_signals_per_jb = perjb;
5161   selfptr->m_max_extra_signals = compute_max_signals_to_execute(reserved);
5162 
5163   if (unlikely(perjb == 0))
5164   {
5165     if (sleeploop == 10)
5166     {
5167       /**
5168        * we've slept for 10ms...try running anyway
5169        */
5170       selfptr->m_max_signals_per_jb = 1;
5171       ndbout_c("thr_no:%u - sleeploop 10!! "
5172                "(Worker thread blocked (>= 10ms) by slow consumer threads)",
5173                selfptr->m_thr_no);
5174       return true;
5175     }
5176 
5177     struct thr_data* waitthr = get_congested_job_queue(selfptr);
5178     if (waitthr == NULL)                 // Waiters resolved
5179     {
5180       goto loop;
5181     }
5182     else if (has_full_in_queues(selfptr) &&
5183              selfptr->m_max_extra_signals > 0)
5184     {
5185       /* 'extra_signals' used to drain 'full_in_queues'. */
5186       return sleeploop > 0;
5187     }
5188 
5189     if (pending_send)
5190     {
5191       /* About to sleep, _must_ send now. */
5192       pending_send = do_send(selfptr, TRUE);
5193     }
5194 
5195     /**
5196      * Wait for thread 'waitthr' to consume some of the
5197      * pending signals in m_in_queue[].
5198      * Will recheck queue status with 'check_congested_job_queue'
5199      * after latch has been set, and *before* going to sleep.
5200      */
5201     const Uint32 nano_wait = 1000*1000;    /* -> 1 ms */
5202     thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
5203 
5204     const bool waited = yield(&wait_queue->m_waiter,
5205                               nano_wait,
5206                               check_congested_job_queue,
5207                               wait_queue);
5208     if (waited)
5209     {
5210       sleeploop++;
5211     }
5212     goto loop;
5213   }
5214 
5215   return sleeploop > 0;
5216 }
5217 
5218 extern "C"
5219 void *
5220 mt_job_thread_main(void *thr_arg)
5221 {
5222   unsigned char signal_buf[SIGBUF_SIZE];
5223   Signal *signal;
5224 
5225   struct thr_data* selfptr = (struct thr_data *)thr_arg;
5226   init_thread(selfptr);
5227   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
5228 
5229   unsigned thr_no = selfptr->m_thr_no;
5230   signal = aligned_signal(signal_buf, thr_no);
5231 
5232   /* Avoid false watchdog alarms caused by race condition. */
5233   watchDogCounter = 1;
5234 
5235   Int32 pending_send = 0;
5236   Uint32 send_sum = 0;
5237   Uint32 loops = 0;
5238   Uint32 maxloops = 10; /* Loops before reading clock, fuzzily adapted to 1ms freq. */
5239   Uint32 waits = 0;
5240 
5241   NDB_TICKS start_spin_ticks;
5242   NDB_TICKS yield_ticks;
5243 
5244   Uint64 min_spin_timer;
5245   bool real_time = false;
5246 
5247   update_rt_config(selfptr, real_time, BlockThread);
5248   update_spin_config(selfptr, min_spin_timer);
5249 
5250   NdbTick_Invalidate(&start_spin_ticks);
5251   NDB_TICKS now = NdbTick_getCurrentTicks();
5252   selfptr->m_ticks = start_spin_ticks = yield_ticks = now;
5253 
5254   while (globalData.theRestartFlag != perform_stop)
5255   {
5256     loops++;
5257 
5258     /**
5259      * Prefill our thread-local send buffers
5260      *   up to THR_SEND_BUFFER_PRE_ALLOC (1Mb).
5261      *
5262      * If this doesn't work, pack buffers before starting to execute signals.
5263      */
5264     watchDogCounter = 11;
5265     if (!selfptr->m_send_buffer_pool.fill(g_thr_repository->m_mm,
5266                                           RG_TRANSPORTER_BUFFERS,
5267                                           THR_SEND_BUFFER_PRE_ALLOC))
5268     {
5269       try_pack_send_buffers(selfptr);
5270     }
5271 
5272     watchDogCounter = 2;
5273     const Uint32 lagging_timers = scan_time_queues(selfptr, now);
5274 
5275     Uint32 sum = run_job_buffers(selfptr, signal);
5276 
5277     watchDogCounter = 1;
5278     signal->header.m_noOfSections = 0; /* valgrind */
5279     sendpacked(selfptr, signal);
5280 
5281     if (sum)
5282     {
5283       watchDogCounter = 6;
5284       flush_jbb_write_state(selfptr);
5285       send_sum += sum;
5286       NdbTick_Invalidate(&start_spin_ticks);
5287 
5288       if (send_sum > MAX_SIGNALS_BEFORE_SEND)
5289       {
5290         /* Try to send, but skip for now in case of lock contention. */
5291         pending_send = do_send(selfptr, FALSE);
5292         send_sum = 0;
5293       }
5294       else
5295       {
5296         /* Append send buffers to the send queues of the destination nodes. */
5297         do_flush(selfptr);
5298       }
5299     }
5300     /**
5301      * The scheduler is not allowed to yield until its internal
5302      * time has caught up with real time.
5303      */
5304     else if (lagging_timers == 0)
5305     {
5306       /* No signals processed, prepare to sleep to wait for more */
5307       if (send_sum > 0 || pending_send != 0)
5308       {
5309         /* About to sleep, _must_ send now. */
5310         pending_send = do_send(selfptr, TRUE);
5311         send_sum = 0;
5312       }
5313 
5314       /**
5315        * No more incoming signals to process yet, and we have
5316        * either completed all pending sends, or had no progress
5317        * due to full transporters in last do_send(). Wait for
5318        * more signals, use a shorter timeout if pending_send.
5319        */
5320       if (pending_send <= 0) /* Nothing pending, or no progress made */
5321       {
5322         if (min_spin_timer == 0 ||
5323             check_yield(now,
5324                         &start_spin_ticks,
5325                         min_spin_timer))
5326         {
5327           /**
5328            * Sleep, either a short nap if send failed due to send overload,
5329            * or a longer sleep if there is no more work waiting.
5330            */
5331           const Uint32 maxwait = (pending_send)
5332                                     ?  1 * 1000000   // Retry busy send after 1ms
5333                                     : 10 * 1000000;  // No more work -> 10ms
5334 
5335           bool waited = yield(&selfptr->m_waiter,
5336                               maxwait,
5337                               check_queues_empty,
5338                               selfptr);
5339           if (waited)
5340           {
5341             waits++;
5342             /* Update current time after sleeping */
5343             now = NdbTick_getCurrentTicks();
5344             yield_ticks = now;
5345             NdbTick_Invalidate(&start_spin_ticks);
5346             selfptr->m_stat.m_wait_cnt += waits;
5347             selfptr->m_stat.m_loop_cnt += loops;
5348             waits = loops = 0;
5349           }
5350         }
5351       }
5352     }
5353 
5354     /**
5355      * Check if we executed enough signals,
5356      *   and if so recompute how many signals to execute
5357      */
5358     if (sum >= selfptr->m_max_exec_signals)
5359     {
5360       if (update_sched_config(selfptr, send_sum + abs(pending_send)))
5361       {
5362         /* Update current time after sleeping */
5363         now = NdbTick_getCurrentTicks();
5364         selfptr->m_stat.m_wait_cnt += waits;
5365         selfptr->m_stat.m_loop_cnt += loops;
5366         waits = loops = 0;
5367         NdbTick_Invalidate(&start_spin_ticks);
5368         update_rt_config(selfptr, real_time, BlockThread);
5369         update_spin_config(selfptr, min_spin_timer);
5370       }
5371     }
5372     else
5373     {
5374       selfptr->m_max_exec_signals -= sum;
5375     }
5376 
5377     /**
5378      * Adaptively read the system time whenever 1ms is likely
5379      * to have passed.
5380      */
5381     if (loops > maxloops)
5382     {
5383       now = NdbTick_getCurrentTicks();
5384       if (real_time)
5385       {
5386         check_real_time_break(now,
5387                               &yield_ticks,
5388                               selfptr->m_thread,
5389                               BlockThread);
5390       }
5391       const Uint64 diff = NdbTick_Elapsed(selfptr->m_ticks, now).milliSec();
5392 
5393       /* Adjust 'maxloops' to achieve a clock reading frequency of 1ms */
5394       if (diff < 1)
5395         maxloops += ((maxloops/10) + 1); /* No change: less frequent reading */
5396       else if (diff > 1 && maxloops > 1)
5397         maxloops -= ((maxloops/10) + 1); /* Overslept: Need more frequent read*/
5398 
5399       selfptr->m_stat.m_wait_cnt += waits;
5400       selfptr->m_stat.m_loop_cnt += loops;
5401       waits = loops = 0;
5402     }
5403     selfptr->m_stat.m_exec_cnt += sum;
5404   }
5405 
5406   globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
5407   return NULL;                  // Return value not currently used
5408 }
5409 
5410 /**
5411  * Get the number of pending signals at B-level in our own thread. Used
5412  * to make some decisions in rate-critical parts of the data node.
5413  */
5414 Uint32
5415 mt_getSignalsInJBB(Uint32 self)
5416 {
5417   Uint32 pending_signals = 0;
5418   struct thr_repository* rep = g_thr_repository;
5419   struct thr_data *selfptr = &rep->m_thread[self];
5420   for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
5421   {
5422     thr_jb_write_state *w = selfptr->m_write_states + thr_no;
5423     pending_signals += w->get_pending_signals();
5424   }
5425   return pending_signals;
5426 }
5427 
5428 void
5429 sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
5430           const Uint32 secPtr[3])
5431 {
5432   Uint32 block = blockToMain(s->theReceiversBlockNumber);
5433   Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
5434 
5435   /*
5436    * Max number of signals to put into job buffer before flushing the buffer
5437    * to the other thread.
5438    * This parameter was found to be reasonable by benchmarking.
5439    */
5440   Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self >= first_receiver_thread_no) ?
5441     MAX_SIGNALS_BEFORE_FLUSH_RECEIVER :
5442     MAX_SIGNALS_BEFORE_FLUSH_OTHER;
5443 
5444   Uint32 dst = block2ThreadId(block, instance);
5445   struct thr_repository* rep = g_thr_repository;
5446   struct thr_data *selfptr = &rep->m_thread[self];
5447   assert(my_thread_equal(selfptr->m_thr_id, my_thread_self()));
5448   struct thr_data *dstptr = &rep->m_thread[dst];
5449 
5450   selfptr->m_stat.m_priob_count++;
5451   Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
5452   selfptr->m_stat.m_priob_size += siglen;
5453 
5454   thr_job_queue *q = dstptr->m_in_queue + self;
5455   thr_job_queue_head *h = dstptr->m_in_queue_head + self;
5456   thr_jb_write_state *w = selfptr->m_write_states + dst;
5457   if (insert_signal(q, h, w, false, s, data, secPtr, selfptr->m_next_buffer))
5458   {
5459     selfptr->m_next_buffer = seize_buffer(rep, self, false);
5460   }
5461   if (w->get_pending_signals() >= MAX_SIGNALS_BEFORE_FLUSH)
5462   {
5463     flush_write_state(selfptr, dstptr, h, w);
5464   }
5465 }
5466 
5467 void
5468 sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data,
5469           const Uint32 secPtr[3])
5470 {
5471   Uint32 block = blockToMain(s->theReceiversBlockNumber);
5472   Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
5473 
5474   Uint32 dst = block2ThreadId(block, instance);
5475   struct thr_repository* rep = g_thr_repository;
5476   struct thr_data *selfptr = &rep->m_thread[self];
5477   assert(s->theVerId_signalNumber == GSN_START_ORD ||
5478          my_thread_equal(selfptr->m_thr_id, my_thread_self()));
5479   struct thr_data *dstptr = &rep->m_thread[dst];
5480 
5481   selfptr->m_stat.m_prioa_count++;
5482   Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
5483   selfptr->m_stat.m_prioa_size += siglen;
5484 
5485   thr_job_queue *q = &(dstptr->m_jba);
5486   thr_job_queue_head *h = &(dstptr->m_jba_head);
5487   thr_jb_write_state w;
5488 
5489   if (selfptr == dstptr)
5490   {
5491     /**
5492      * Indicate that we sent Prio A signal to ourself.
5493      */
5494     selfptr->m_sent_local_prioa_signal = true;
5495   }
5496 
5497   lock(&dstptr->m_jba_write_lock);
5498 
5499   Uint32 index = h->m_write_index;
5500   w.m_write_index = index;
5501   thr_job_buffer *buffer = q->m_buffers[index];
5502   w.m_write_buffer = buffer;
5503   w.m_write_pos = buffer->m_len;
5504   bool buf_used = insert_signal(q, h, &w, true, s, data, secPtr,
5505                                 selfptr->m_next_buffer);
5506   flush_write_state(selfptr, dstptr, h, &w);
5507 
5508   unlock(&dstptr->m_jba_write_lock);
5509   if (w.has_any_pending_signals())
5510   {
5511     wakeup(&(dstptr->m_waiter));
5512   }
5513   if (buf_used)
5514     selfptr->m_next_buffer = seize_buffer(rep, self, true);
5515 }
5516 
5517 /**
5518  * Send a signal to a remote node.
5519  *
5520  * (The signal is only queued here, and actually sent later in do_send()).
5521  */
5522 SendStatus
5523 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
5524                const Uint32 * data, NodeId nodeId,
5525                const LinearSectionPtr ptr[3])
5526 {
5527   thr_repository *rep = g_thr_repository;
5528   struct thr_data *selfptr = &rep->m_thread[self];
5529   SendStatus ss;
5530 
5531   mt_send_handle handle(selfptr);
5532   register_pending_send(selfptr, nodeId);
5533   /* prepareSend() is lock-free, as we have per-thread send buffers. */
5534   ss = globalTransporterRegistry.prepareSend(&handle,
5535                                              sh, prio, data, nodeId, ptr);
5536   return ss;
5537 }
5538 
5539 SendStatus
5540 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
5541                const Uint32 *data, NodeId nodeId,
5542                class SectionSegmentPool *thePool,
5543                const SegmentedSectionPtr ptr[3])
5544 {
5545   thr_repository *rep = g_thr_repository;
5546   struct thr_data *selfptr = &rep->m_thread[self];
5547   SendStatus ss;
5548 
5549   mt_send_handle handle(selfptr);
5550   register_pending_send(selfptr, nodeId);
5551   ss = globalTransporterRegistry.prepareSend(&handle,
5552                                              sh, prio, data, nodeId,
5553                                              *thePool, ptr);
5554   return ss;
5555 }
5556 
5557 /*
5558  * This function sends a prio A STOP_FOR_CRASH signal to a thread.
5559  *
5560  * It works when called from any other thread, not just from job processing
5561  * threads. But note that this signal will be the last signal to be executed by
5562  * the other thread, as it will exit immediately.
5563  */
5564 static
5565 void
5566 sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
5567 {
5568   SignalT<StopForCrash::SignalLength> signalT;
5569   struct thr_repository* rep = g_thr_repository;
5570   /* As this signal will be the last one executed by the other thread, it does
5571      not matter which buffer we use in case the current buffer is filled up by
5572      the STOP_FOR_CRASH signal; the data in it will never be read.
5573   */
5574   static thr_job_buffer dummy_buffer;
5575 
5576   /**
5577    * Pick any instance running in this thread
5578    */
5579   struct thr_data *dstptr = &rep->m_thread[dst];
5580   Uint32 bno = dstptr->m_instance_list[0];
5581 
5582   memset(&signalT.header, 0, sizeof(SignalHeader));
5583   signalT.header.theVerId_signalNumber   = GSN_STOP_FOR_CRASH;
5584   signalT.header.theReceiversBlockNumber = bno;
5585   signalT.header.theSendersBlockRef      = 0;
5586   signalT.header.theTrace                = 0;
5587   signalT.header.theSendersSignalId      = 0;
5588   signalT.header.theSignalId             = 0;
5589   signalT.header.theLength               = StopForCrash::SignalLength;
5590   StopForCrash * stopForCrash = CAST_PTR(StopForCrash, &signalT.theData[0]);
5591   stopForCrash->flags = 0;
5592 
5593   thr_job_queue *q = &(dstptr->m_jba);
5594   thr_job_queue_head *h = &(dstptr->m_jba_head);
5595   thr_jb_write_state w;
5596 
5597   lock(&dstptr->m_jba_write_lock);
5598 
5599   Uint32 index = h->m_write_index;
5600   w.m_write_index = index;
5601   thr_job_buffer *buffer = q->m_buffers[index];
5602   w.m_write_buffer = buffer;
5603   w.m_write_pos = buffer->m_len;
5604   insert_signal(q, h, &w, true, &signalT.header, signalT.theData, NULL,
5605                 &dummy_buffer);
5606   flush_write_state(selfptr, dstptr, h, &w);
5607 
5608   unlock(&dstptr->m_jba_write_lock);
5609   if (w.has_any_pending_signals())
5610   {
5611     wakeup(&(dstptr->m_waiter));
5612   }
5613 }
5614 
5615 /**
5616  * Identify the type of a thread.
5617  * Based on the assumption that threads are allocated in the order:
5618  *  main, ldm, tc, recv, send (see the worked example below).
5619  */
5620 static bool
5621 is_main_thread(unsigned thr_no)
5622 {
5623   return thr_no < NUM_MAIN_THREADS;
5624 }
5625 
5626 static bool
5627 is_ldm_thread(unsigned thr_no)
5628 {
5629   return thr_no >= NUM_MAIN_THREADS &&
5630          thr_no <  NUM_MAIN_THREADS+globalData.ndbMtLqhThreads;
5631 }
5632 
5633 /**
5634  * Not all LDM threads are created equal:
5635  * The BACKUP block in the first LDM acts as a client during BACKUP
5636  * (see usage of Backup::UserBackupInstanceKey).
5637  */
5638 static bool
5639 is_first_ldm_thread(unsigned thr_no)
5640 {
5641   return thr_no == NUM_MAIN_THREADS;
5642 }
5643 
5644 static bool
5645 is_tc_thread(unsigned thr_no)
5646 {
5647   unsigned tc_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads;
5648   return thr_no >= tc_base &&
5649          thr_no <  tc_base+globalData.ndbMtTcThreads;
5650 }
5651 
5652 static bool
5653 is_recv_thread(unsigned thr_no)
5654 {
5655   unsigned recv_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads+globalData.ndbMtTcThreads;
5656   return thr_no >= recv_base &&
5657          thr_no <  recv_base+globalData.ndbMtReceiveThreads;
5658 }
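
/**
 * Worked example of the thread numbering these predicates rely on
 * (illustrative configuration: NUM_MAIN_THREADS == 2, 4 LDM, 2 TC,
 * 1 receive thread):
 *
 *   thr_no 0..1   main threads
 *   thr_no 2..5   LDM threads      (ndbMtLqhThreads     == 4)
 *   thr_no 6..7   TC threads       (ndbMtTcThreads      == 2)
 *   thr_no 8      receive thread   (ndbMtReceiveThreads == 1)
 *
 * so is_first_ldm_thread(2), is_tc_thread(6) and is_recv_thread(8)
 * all hold for this configuration, and first_receiver_thread_no == 8.
 */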
5659 
5660 /**
5661  * Implements the rules for which threads are allowed to have
5662  * communication with each other.
5663  * Also see compute_jb_pages() which has similar logic.
5664  */
5665 static bool
5666 may_communicate(unsigned from, unsigned to)
5667 {
5668   if (is_main_thread(from))
5669   {
5670     // Main threads communicate with all other threads
5671     return true;
5672   }
5673   else if (is_ldm_thread(from))
5674   {
5675     // First LDM is special as it may act as internal client
5676     // during backup, and thus communicate with other LDMs:
5677     if (is_first_ldm_thread(from) && is_ldm_thread(to))
5678       return true;
5679 
5680     // All LDM threads can communicate with TC threads, main threads,
5681     // themselves, and the BACKUP client (above)
5682     return is_main_thread(to) ||
5683            is_tc_thread(to)   ||
5684            is_first_ldm_thread(to) ||
5685            (to == from);
5686   }
5687   else if (is_tc_thread(from))
5688   {
5689     // TC threads can communicate with SPJ, LQH, main threads and themselves
5690     return is_main_thread(to) ||
5691            is_ldm_thread(to)  ||
5692            is_tc_thread(to);      // Cover both SPJs and itself
5693   }
5694   else
5695   {
5696     assert(is_recv_thread(from));
5697     // Receive threads communicate with all, except other receivers
5698     return !is_recv_thread(to);
5699   }
5700 }
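
/**
 * Debug sketch (hypothetical helper, not part of the build, needs
 * <cstdio>): the rule matrix can be dumped by enumerating all pairs:
 *
 *   static void dump_comm_matrix()
 *   {
 *     for (unsigned from = 0; from < num_threads; from++)
 *       for (unsigned to = 0; to < num_threads; to++)
 *         if (may_communicate(from, to))
 *           printf("thr %u -> thr %u\n", from, to);
 *   }
 *
 * For the example layout above this prints, among others, every pair
 * with a main thread as sender, but no receive-to-receive pairs.
 */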
5701 
5702 /**
5703  * init functions
5704  */
5705 static
5706 void
5707 queue_init(struct thr_tq* tq)
5708 {
5709   tq->m_next_timer = 0;
5710   tq->m_current_time = 0;
5711   tq->m_next_free = RNIL;
5712   tq->m_cnt[0] = tq->m_cnt[1] = tq->m_cnt[2] = 0;
5713   bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
5714 }
5715 
5716 static
5717 void
5718 thr_init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
5719          unsigned thr_no)
5720 {
5721   Uint32 i;
5722 
5723   selfptr->m_thr_no = thr_no;
5724   selfptr->m_max_signals_per_jb = MAX_SIGNALS_PER_JB;
5725   selfptr->m_max_exec_signals = 0;
5726   selfptr->m_max_extra_signals = 0;
5727   selfptr->m_first_free = 0;
5728   selfptr->m_first_unused = 0;
5729 
5730   {
5731     char buf[100];
5732     BaseString::snprintf(buf, sizeof(buf), "jbalock thr: %u", thr_no);
5733     register_lock(&selfptr->m_jba_write_lock, buf);
5734   }
5735   selfptr->m_jba_head.m_read_index = 0;
5736   selfptr->m_jba_head.m_write_index = 0;
5737   thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
5738   selfptr->m_jba.m_buffers[0] = buffer;
5739   selfptr->m_jba_read_state.m_read_index = 0;
5740   selfptr->m_jba_read_state.m_read_buffer = buffer;
5741   selfptr->m_jba_read_state.m_read_pos = 0;
5742   selfptr->m_jba_read_state.m_read_end = 0;
5743   selfptr->m_jba_read_state.m_write_index = 0;
5744   selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
5745   selfptr->m_send_buffer_pool.set_pool(&rep->m_sb_pool);
5746 
5747   for (i = 0; i<cnt; i++)
5748   {
5749     selfptr->m_in_queue_head[i].m_waiter.init();
5750     selfptr->m_in_queue_head[i].m_read_index = 0;
5751     selfptr->m_in_queue_head[i].m_write_index = 0;
5752     buffer = may_communicate(i,thr_no)
5753               ? seize_buffer(rep, thr_no, false) : NULL;
5754     selfptr->m_in_queue[i].m_buffers[0] = buffer;
5755     selfptr->m_read_states[i].m_read_index = 0;
5756     selfptr->m_read_states[i].m_read_buffer = buffer;
5757     selfptr->m_read_states[i].m_read_pos = 0;
5758     selfptr->m_read_states[i].m_read_end = 0;
5759     selfptr->m_read_states[i].m_write_index = 0;
5760   }
5761   queue_init(&selfptr->m_tq);
5762 
5763   bzero(&selfptr->m_stat, sizeof(selfptr->m_stat));
5764 
5765   selfptr->m_pending_send_count = 0;
5766   selfptr->m_pending_send_mask.clear();
5767 
5768   selfptr->m_instance_count = 0;
5769   for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
5770     selfptr->m_instance_list[i] = 0;
5771 
5772   bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
5773 
5774   selfptr->m_thread = 0;
5775   selfptr->m_cpu = NO_LOCK_CPU;
5776 }
5777 
5778 /* Have to do this after init of all m_in_queues is done. */
5779 static
5780 void
5781 thr_init2(struct thr_repository* rep, struct thr_data *selfptr,
5782           unsigned int cnt, unsigned thr_no)
5783 {
5784   for (Uint32 i = 0; i<cnt; i++)
5785   {
5786     selfptr->m_write_states[i].m_write_index = 0;
5787     selfptr->m_write_states[i].m_write_pos = 0;
5788     selfptr->m_write_states[i].m_write_buffer =
5789       rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
5790     selfptr->m_write_states[i].init_pending_signals();
5791   }
5792 }
5793 
5794 static
5795 void
5796 receive_lock_init(Uint32 recv_thread_id, thr_repository *rep)
5797 {
5798   char buf[100];
5799   BaseString::snprintf(buf, sizeof(buf), "receive lock thread id %d",
5800                        recv_thread_id);
5801   register_lock(&rep->m_receive_lock[recv_thread_id], buf);
5802 }
5803 
5804 static
5805 void
5806 send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
5807 {
5808   char buf[100];
5809   BaseString::snprintf(buf, sizeof(buf), "send lock node %d", node);
5810   register_lock(&sb->m_send_lock, buf);
5811   BaseString::snprintf(buf, sizeof(buf), "send_buffer lock node %d", node);
5812   register_lock(&sb->m_buffer_lock, buf);
5813   sb->m_force_send = 0;
5814   sb->m_bytes_sent = 0;
5815   sb->m_send_thread = NO_SEND_THREAD;
5816   bzero(&sb->m_buffer, sizeof(sb->m_buffer));
5817   bzero(&sb->m_sending, sizeof(sb->m_sending));
5818   bzero(sb->m_read_index, sizeof(sb->m_read_index));
5819 }
5820 
5821 static
5822 void
5823 rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
5824 {
5825   rep->m_mm = mm;
5826 
5827   rep->m_thread_count = cnt;
5828   for (unsigned int i = 0; i<cnt; i++)
5829   {
5830     thr_init(rep, &rep->m_thread[i], cnt, i);
5831   }
5832   for (unsigned int i = 0; i<cnt; i++)
5833   {
5834     thr_init2(rep, &rep->m_thread[i], cnt, i);
5835   }
5836 
5837   rep->stopped_threads = 0;
5838   NdbMutex_Init(&rep->stop_for_crash_mutex);
5839   NdbCondition_Init(&rep->stop_for_crash_cond);
5840 
5841   for (Uint32 i = 0; i < NDB_ARRAY_SIZE(rep->m_receive_lock); i++)
5842   {
5843     receive_lock_init(i, rep);
5844   }
5845   for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
5846   {
5847     send_buffer_init(i, rep->m_send_buffers+i);
5848   }
5849 
5850   bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
5851 }
5852 
5853 
5854 /**
5855  * Thread Config
5856  */
5857 
5858 static Uint32
5859 get_total_number_of_block_threads(void)
5860 {
5861   return (NUM_MAIN_THREADS +
5862           globalData.ndbMtLqhThreads +
5863           globalData.ndbMtTcThreads +
5864           globalData.ndbMtReceiveThreads);
5865 }
5866 
5867 static Uint32
5868 get_num_nodes()
5869 {
5870   Uint32 count = 0;
5871   for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
5872   {
5873     if (globalTransporterRegistry.get_transporter(nodeId))
5874     {
5875       count++;
5876     }
5877   }
5878   return count;
5879 }
5880 
5881 /**
5882  * This function returns the number of extra send buffer pages
5883  * that we should allocate in addition to the amount allocated
5884  * for each node's send buffer.
5885  */
5886 #define MIN_SEND_BUFFER_GENERAL (512) //16M
5887 #define MIN_SEND_BUFFER_PER_NODE (8) //256k
5888 #define MIN_SEND_BUFFER_PER_THREAD (64) //2M
5889 
5890 Uint32
5891 mt_get_extra_send_buffer_pages(Uint32 curr_num_pages,
5892                                Uint32 extra_mem_pages)
5893 {
5894   Uint32 loc_num_threads = get_total_number_of_block_threads();
5895   Uint32 num_nodes = get_num_nodes();
5896 
5897   Uint32 extra_pages = extra_mem_pages;
5898 
5899   /**
5900    * Add 2M for each thread: we allocate 1M at a time and also
5901    * keep a minimum of 1M of send buffer per thread, so a thread
5902    * can easily hold 2M of send buffer just to keep contention on
5903    * the send buffer page spinlock small.
5904    *
5905    * We add this memory independently of the configuration settings
5906    * since the user cannot be expected to account for it, and since
5907    * we could change this behaviour at any time.
5908    */
5909   extra_pages += loc_num_threads * THR_SEND_BUFFER_MAX_FREE;
5910 
5911   if (extra_mem_pages == 0)
5912   {
5913     /**
5914      * The user has set extra send buffer memory to 0, leaving it to
5915      * us to decide how much extra memory is needed.
5916      *
5917      * We make sure that there is at least a minimum of 16M +
5918      * 2M per thread + 256k per node. If curr_num_pages plus our
5919      * local additions already reach this level we add nothing
5920      * more; otherwise we add enough extra pages to reach this
5921      * minimum level.
5922      */
5923     Uint32 min_pages = MIN_SEND_BUFFER_GENERAL +
5924       (MIN_SEND_BUFFER_PER_NODE * num_nodes) +
5925       (MIN_SEND_BUFFER_PER_THREAD * loc_num_threads);
5926 
5927     if ((curr_num_pages + extra_pages) < min_pages)
5928     {
5929       extra_pages = min_pages - curr_num_pages;
5930     }
5931   }
5932   return extra_pages;
5933 }
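
/**
 * Worked example (32KB pages, as implied by the defines above; the
 * THR_SEND_BUFFER_MAX_FREE term is left symbolic): with 9 block
 * threads, 4 connected nodes, and ExtraSendBufferMemory == 0,
 *
 *   min_pages = 512 + 8 * 4 + 64 * 9 = 1120 pages  (35 MB)
 *
 * plus 9 * THR_SEND_BUFFER_MAX_FREE pages added unconditionally for
 * the per-thread caches.  Extra pages beyond that are only added when
 * the configured send buffer plus these additions fall short of
 * min_pages.
 */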
5934 
5935 Uint32
5936 compute_jb_pages(struct EmulatorData * ed)
5937 {
5938   Uint32 cnt = get_total_number_of_block_threads();
5939   Uint32 num_receive_threads = globalData.ndbMtReceiveThreads;
5940   Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
5941   Uint32 num_tc_threads = globalData.ndbMtTcThreads;
5942   Uint32 num_main_threads = NUM_MAIN_THREADS;
5943 
5944   /**
5945    * Number of pages each thread needs to communicate with another
5946    * thread.
5947    */
5948   Uint32 job_queue_pages_per_thread = thr_job_queue::SIZE;
5949 
5950   /**
5951    * In 'perthread' we calculate the number of pages required by
5952    * all 'block threads' (excluding 'send-threads'). 'perthread'
5953    * usage is independent of whether this thread will communicate
5954    * with other 'block threads' or not.
5955    */
5956   Uint32 perthread = 0;
5957 
5958   /**
5959    * Each thread has its own job_queue for 'prio A' signals
5960    */
5961   perthread += job_queue_pages_per_thread;
5962 
5963   /**
5964    * Each thread keeps an available free page in 'm_next_buffer'
5965    * in case it is required by insert_signal() into JBA or JBB.
5966    */
5967   perthread += 1;
5968 
5969   /**
5970    * Each thread keeps time-queued signals in 'struct thr_tq';
5971    * thr_tq::PAGES pages are used to store these.
5972    */
5973   perthread += thr_tq::PAGES;
5974 
5975   /**
5976    * Each thread has its own 'm_free_fifo[THR_FREE_BUF_MAX]' cache.
5977    * As it is filled to MAX *before* a page is allocated (which consumes a page),
5978    * it will never cache more than MAX-1 pages. Pages are also returned to the
5979    * global allocator as soon as MAX is reached.
5980    */
5981   perthread += THR_FREE_BUF_MAX-1;
5982 
5983   /**
5984    * Start by calculating the basic number of pages required for
5985    * our 'cnt' block threads.
5986    * (no inter-thread communication assumed so far)
5987    */
5988   Uint32 tot = cnt * perthread;
5989 
5990   /**
5991    * We then start adding pages required for inter-thread communications:
5992    *
5993    * Receiver threads will be able to communicate with all other
5994    * threads except other receive threads.
5995    */
5996   tot += num_receive_threads *
5997          (cnt - num_receive_threads) *
5998          job_queue_pages_per_thread;
5999 
6000   /**
6001    * LQH threads can communicate with TC threads and main threads.
6002    * They cannot communicate with receive threads or other LQH threads,
6003    * but each can communicate with itself.
6004    */
6005   tot += num_lqh_threads *
6006          (num_tc_threads + num_main_threads + 1) *
6007          job_queue_pages_per_thread;
6008 
6009   /**
6010    * The first LDM thread is special as it acts as a client
6011    * during backup. It will send to, and receive from (2x),
6012    * the 'num_lqh_threads - 1' other LQH threads.
6013    */
6014   tot += 2 * (num_lqh_threads-1) *
6015          job_queue_pages_per_thread;
6016 
6017   /**
6018    * TC threads can communicate with SPJ-, LQH- and main threads.
6019    * They cannot communicate with receive threads or other TC threads,
6020    * but as SPJ is located together with TC, it is counted as if it
6021    * communicates with all TC threads.
6022    */
6023   tot += num_tc_threads *
6024          (num_lqh_threads + num_main_threads + num_tc_threads) *
6025          job_queue_pages_per_thread;
6026 
6027   /**
6028    * Main threads can communicate with all other threads
6029    */
6030   tot += num_main_threads *
6031          cnt *
6032          job_queue_pages_per_thread;
6033 
6034   return tot;
6035 }
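
/**
 * Worked example (same illustrative configuration as above: 2 main,
 * 4 LDM, 2 TC, 1 receive thread, so cnt == 9; S stands for
 * thr_job_queue::SIZE):
 *
 *   receive threads : 1 * (9 - 1)     * S =  8 S
 *   LDM threads     : 4 * (2 + 2 + 1) * S = 20 S
 *   first LDM extra : 2 * (4 - 1)     * S =  6 S
 *   TC threads      : 2 * (4 + 2 + 2) * S = 16 S
 *   main threads    : 2 * 9           * S = 18 S
 *
 * i.e. 68 * S pages of inter-thread job buffers on top of the
 * 9 * perthread pages counted first.
 */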
6036 
6037 ThreadConfig::ThreadConfig()
6038 {
6039   /**
6040    * We take great care within struct thr_repository to optimize
6041    * cache line placement of the different members. This all
6042    * depends on the base address of thr_repository itself
6043    * being cache line aligned.
6044    *
6045    * So we allocate a char[] sufficiently large to hold the
6046    * thr_repository object, with added bytes for placing
6047    * g_thr_repository on a cache-line-aligned offset within it.
6048    */
6049   g_thr_repository_mem = new char[sizeof(thr_repository)+NDB_CL];
6050   const int alligned_offs = NDB_CL_PADSZ((UintPtr)g_thr_repository_mem);
6051   char* cache_alligned_mem = &g_thr_repository_mem[alligned_offs];
6052   require((((UintPtr)cache_alligned_mem) % NDB_CL) == 0);
6053   g_thr_repository = new(cache_alligned_mem) thr_repository();
6054 }
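
/**
 * The same over-allocate-and-align pattern in plain C++ (sketch only,
 * using a hard-coded 64-byte line instead of NDB_CL; needs <new>):
 *
 *   char *raw = new char[sizeof(T) + 64];
 *   UintPtr p = (UintPtr)raw;
 *   UintPtr aligned = (p + 63) & ~(UintPtr)63;   // round up to 64
 *   T *obj = new ((void*)aligned) T();           // placement new
 *
 * NDB_CL_PADSZ(p) plays the role of the rounding step here, so
 * g_thr_repository always starts on a cache line boundary.
 */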
6055 
6056 ThreadConfig::~ThreadConfig()
6057 {
6058   g_thr_repository->~thr_repository();
6059   g_thr_repository = NULL;
6060   delete[] g_thr_repository_mem;
6061   g_thr_repository_mem = NULL;
6062 }
6063 
6064 /*
6065  * We must do the init here rather than in the constructor, since at
6066  * constructor time the global memory manager is not available.
6067  */
6068 void
6069 ThreadConfig::init()
6070 {
6071   Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
6072   Uint32 num_tc_threads = globalData.ndbMtTcThreads;
6073   Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
6074   first_receiver_thread_no =
6075     NUM_MAIN_THREADS + num_tc_threads + num_lqh_threads;
6076   num_threads = first_receiver_thread_no + num_recv_threads;
6077   require(num_threads <= MAX_BLOCK_THREADS);
6078 
6079   ndbout << "NDBMT: number of block threads=" << num_threads << endl;
6080 
6081   ::rep_init(g_thr_repository, num_threads,
6082              globalEmulatorData.m_mem_manager);
6083 }
6084 
6085 /**
6086  * Return the receiver thread handling a particular node.
6087  *   The returned number is indexed from 0 up to #receiver threads
6088  *   (or MAX_NODES if there is none).
6089  */
6090 Uint32
6091 mt_get_recv_thread_idx(NodeId nodeId)
6092 {
6093   assert(nodeId < NDB_ARRAY_SIZE(g_node_to_recv_thr_map));
6094   return g_node_to_recv_thr_map[nodeId];
6095 }
6096 
6097 static
6098 void
6099 assign_receiver_threads(void)
6100 {
6101   Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
6102   Uint32 recv_thread_idx = 0;
6103   for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
6104   {
6105     Transporter *node_trp =
6106       globalTransporterRegistry.get_transporter(nodeId);
6107 
6108     if (node_trp)
6109     {
6110       g_node_to_recv_thr_map[nodeId] = recv_thread_idx;
6111       recv_thread_idx++;
6112       if (recv_thread_idx == num_recv_threads)
6113         recv_thread_idx = 0;
6114     }
6115     else
6116     {
6117       /* Flag for no transporter */
6118       g_node_to_recv_thr_map[nodeId] = MAX_NODES;
6119     }
6120   }
6121   return;
6122 }
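
/**
 * Example of the resulting mapping (illustrative): with transporters
 * to nodes 1, 2, 3, 49, 50 and ndbMtReceiveThreads == 2, the nodes
 * are dealt out round-robin in nodeId order:
 *
 *   g_node_to_recv_thr_map[1]  == 0
 *   g_node_to_recv_thr_map[2]  == 1
 *   g_node_to_recv_thr_map[3]  == 0
 *   g_node_to_recv_thr_map[49] == 1
 *   g_node_to_recv_thr_map[50] == 0
 *
 * Nodes without a transporter keep MAX_NODES as a "no receive thread"
 * marker, which is what mt_get_recv_thread_idx() then returns.
 */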
6123 
6124 void
6125 ThreadConfig::ipControlLoop(NdbThread* pThis)
6126 {
6127   unsigned int thr_no;
6128   struct thr_repository* rep = g_thr_repository;
6129 
6130   rep->m_thread[first_receiver_thread_no].m_thr_index =
6131     globalEmulatorData.theConfiguration->addThread(pThis, ReceiveThread);
6132 
6133   max_send_delay = globalEmulatorData.theConfiguration->maxSendDelay();
6134 
6135   if (globalData.ndbMtSendThreads)
6136   {
6137     g_send_threads = new thr_send_threads();
6138   }
6139 
6140   /**
6141    * assign nodes to receiver threads
6142    */
6143   assign_receiver_threads();
6144 
6145   /* Start the send thread(s) */
6146   if (g_send_threads)
6147   {
6148     g_send_threads->start_send_threads();
6149   }
6150 
6151   /*
6152    * Start threads for all execution threads, except for the receiver
6153    * thread, which runs in the main thread.
6154    */
6155   for (thr_no = 0; thr_no < num_threads; thr_no++)
6156   {
6157     rep->m_thread[thr_no].m_ticks = NdbTick_getCurrentTicks();
6158 
6159     if (thr_no == first_receiver_thread_no)
6160       continue;                 // Will run in the main thread.
6161 
6162     /*
6163      * NdbThread_Create() takes a void **, but that is cast to void * when
6164      * passed to the thread function, which is kind of strange ...
6165      */
6166     if (thr_no < first_receiver_thread_no)
6167     {
6168       /* Start block threads */
6169       struct NdbThread *thread_ptr =
6170         NdbThread_Create(mt_job_thread_main,
6171                          (void **)(rep->m_thread + thr_no),
6172                          1024*1024,
6173                          "execute thread", //ToDo add number
6174                          NDB_THREAD_PRIO_MEAN);
6175       require(thread_ptr != NULL);
6176       rep->m_thread[thr_no].m_thr_index =
6177         globalEmulatorData.theConfiguration->addThread(thread_ptr,
6178                                                        BlockThread);
6179       rep->m_thread[thr_no].m_thread = thread_ptr;
6180     }
6181     else
6182     {
6183       /* Start a receiver thread, also block thread for TRPMAN */
6184       struct NdbThread *thread_ptr =
6185         NdbThread_Create(mt_receiver_thread_main,
6186                          (void **)(&rep->m_thread[thr_no]),
6187                          1024*1024,
6188                          "receive thread", //ToDo add number
6189                          NDB_THREAD_PRIO_MEAN);
6190       require(thread_ptr != NULL);
6191       globalEmulatorData.theConfiguration->addThread(thread_ptr,
6192                                                      ReceiveThread);
6193       rep->m_thread[thr_no].m_thread = thread_ptr;
6194     }
6195   }
6196 
6197   /* Now run the main loop for first receiver thread directly. */
6198   rep->m_thread[first_receiver_thread_no].m_thread = pThis;
6199   mt_receiver_thread_main(&(rep->m_thread[first_receiver_thread_no]));
6200 
6201   /* Wait for all threads to shutdown. */
6202   for (thr_no = 0; thr_no < num_threads; thr_no++)
6203   {
6204     if (thr_no == first_receiver_thread_no)
6205       continue;
6206     void *dummy_return_status;
6207     NdbThread_WaitFor(rep->m_thread[thr_no].m_thread,
6208                       &dummy_return_status);
6209     globalEmulatorData.theConfiguration->removeThread(
6210       rep->m_thread[thr_no].m_thread);
6211     NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
6212   }
6213 
6214   /* Delete send threads, includes waiting for threads to shutdown */
6215   if (g_send_threads)
6216   {
6217     delete g_send_threads;
6218     g_send_threads = NULL;
6219   }
6220   globalEmulatorData.theConfiguration->removeThread(pThis);
6221 }
6222 
6223 int
6224 ThreadConfig::doStart(NodeState::StartLevel startLevel)
6225 {
6226   SignalT<3> signalT;
6227   memset(&signalT.header, 0, sizeof(SignalHeader));
6228 
6229   signalT.header.theVerId_signalNumber   = GSN_START_ORD;
6230   signalT.header.theReceiversBlockNumber = CMVMI;
6231   signalT.header.theSendersBlockRef      = 0;
6232   signalT.header.theTrace                = 0;
6233   signalT.header.theSignalId             = 0;
6234   signalT.header.theLength               = StartOrd::SignalLength;
6235 
6236   StartOrd * startOrd = CAST_PTR(StartOrd, &signalT.theData[0]);
6237   startOrd->restartInfo = 0;
6238 
6239   sendprioa(block2ThreadId(CMVMI, 0), &signalT.header, signalT.theData, 0);
6240   return 0;
6241 }
6242 
6243 Uint32
6244 FastScheduler::traceDumpGetNumThreads()
6245 {
6246   /* The last thread is only for receiver -> no trace file. */
6247   return num_threads;
6248 }
6249 
6250 bool
6251 FastScheduler::traceDumpGetJam(Uint32 thr_no,
6252                                const JamEvent * & thrdTheEmulatedJam,
6253                                Uint32 & thrdTheEmulatedJamIndex)
6254 {
6255   if (thr_no >= num_threads)
6256     return false;
6257 
6258 #ifdef NO_EMULATED_JAM
6259   thrdTheEmulatedJam = NULL;
6260   thrdTheEmulatedJamIndex = 0;
6261 #else
6262   const EmulatedJamBuffer *jamBuffer =
6263     &g_thr_repository->m_thread[thr_no].m_jam;
6264   thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
6265   thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
6266 #endif
6267   return true;
6268 }
6269 
6270 void
6271 FastScheduler::traceDumpPrepare(NdbShutdownType& nst)
6272 {
6273   /*
6274    * We are about to generate trace files for all threads.
6275    *
6276    * We want to stop all threads processing before we dump, as otherwise the
6277    * signal buffers could change while dumping, leading to inconsistent
6278    * results.
6279    *
6280    * To stop threads, we send the GSN_STOP_FOR_CRASH signal as prio A to each
6281    * thread. We then wait for threads to signal they are done (but not forever,
6282    * so as to not have one hanging thread prevent the generation of trace
6283    * dumps). We also must be careful not to send to ourself if the crash is
6284    * being processed by one of the threads processing signals.
6285    *
6286    * We do not stop the transporter thread, as it cannot receive signals (but
6287    * because it does not receive signals it does not really influence dumps in
6288    * any case).
6289    */
6290   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
6291   const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
6292   /* The selfptr might be NULL, or pointer to thread that crashed. */
6293 
6294   Uint32 waitFor_count = 0;
6295   NdbMutex_Lock(&g_thr_repository->stop_for_crash_mutex);
6296   g_thr_repository->stopped_threads = 0;
6297 
6298   for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
6299   {
6300     if (selfptr != NULL && selfptr->m_thr_no == thr_no)
6301     {
6302       /* This is own thread; we have already stopped processing. */
6303       continue;
6304     }
6305 
6306     sendprioa_STOP_FOR_CRASH(selfptr, thr_no);
6307 
6308     waitFor_count++;
6309   }
6310 
6311   static const Uint32 max_wait_seconds = 2;
6312   const NDB_TICKS start = NdbTick_getCurrentTicks();
6313   while (g_thr_repository->stopped_threads < waitFor_count)
6314   {
6315     NdbCondition_WaitTimeout(&g_thr_repository->stop_for_crash_cond,
6316                              &g_thr_repository->stop_for_crash_mutex,
6317                              10);
6318     const NDB_TICKS now = NdbTick_getCurrentTicks();
6319     if (NdbTick_Elapsed(start,now).seconds() > max_wait_seconds)
6320       break;                    // Give up
6321   }
6322   if (g_thr_repository->stopped_threads < waitFor_count)
6323   {
6324     if (nst != NST_ErrorInsert)
6325     {
6326       nst = NST_Watchdog; // Make this abort fast
6327     }
6328     ndbout_c("Warning: %d thread(s) did not stop before starting crash dump.",
6329              waitFor_count - g_thr_repository->stopped_threads);
6330   }
6331   NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6332 
6333   /* Now we are ready (or as ready as can be) for doing crash dump. */
6334 }
6335 
6336 /**
6337  * In ndbmtd we could have a case where we actually have multiple threads
6338  * crashing at the same time. This causes several threads to start processing
6339  * the crash handling in parallel, which eventually leads to a deadlock since
6340  * the crash handling thread waits for other threads to stop before completing
6341  * the crash handling.
6342  *
6343  * To avoid this we use this function (only useful in ndbmtd) to check
6344  * whether the crash handling has already started. We protect this
6345  * check using the stop_for_crash-mutex. This function is called twice,
6346  * first to write an entry in the error log and second to specify that the
6347  * error log write is completed.
6348  *
6349  * We proceed only from the first call if the crash handling hasn't started
6350  * or if the crash is not caused by an error insert. If it is caused by an
6351  * error insert it is a normal situation with multiple crashes, so we won't
6352  * clutter the error log with multiple entries in this case. If it is a real
6353  * crash and more than one thread is crashing, then this is vital
6354  * information to write in the error log; however, we do not want more
6355  * than one set of trace files.
6356  *
6357  * To ensure that error log writes happen for one thread at a time, we
6358  * protect them with the stop_for_crash mutex. We hold this mutex between
6359  * the first and second call of this function from the error reporter thread.
6360  *
6361  * We proceed from the first call only if we are the first thread that
6362  * reported an error. To handle this properly we start by acquiring the
6363  * mutex, then we write the error log; when we come back we set the
6364  * crash_started flag and release the mutex, which enables other threads to
6365  * write into the error log while still stopping them from proceeding to
6366  * write another set of trace files.
6367  *
6368  * We will not come back from this function the second time unless we are
6369  * the first crashing thread.
6370  */
6371 
6372 static bool crash_started = false;
6373 
6374 void
6375 ErrorReporter::prepare_to_crash(bool first_phase, bool error_insert_crash)
6376 {
6377   if (first_phase)
6378   {
6379     NdbMutex_Lock(&g_thr_repository->stop_for_crash_mutex);
6380     if (crash_started && error_insert_crash)
6381     {
6382       /**
6383        * Some other thread has already started the crash handling.
6384        * We call the below method which we will never return from.
6385        * We need not write multiple entries in error log for
6386        * error insert crashes since it is a normal event.
6387        */
6388       NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6389       mt_execSTOP_FOR_CRASH();
6390     }
6391     /**
6392      * Proceed to write the error log before returning to this method
6393      * again with first_phase set to false.
6394      */
6395   }
6396   else if (crash_started)
6397   {
6398     (void)error_insert_crash;
6399     /**
6400      * No need to proceed since somebody already started handling the crash.
6401      * We proceed by calling mt_execSTOP_FOR_CRASH to stop this thread
6402      * in a manner that is similar to if we received the signal
6403      * STOP_FOR_CRASH.
6404      */
6405     NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6406     mt_execSTOP_FOR_CRASH();
6407   }
6408   else
6409   {
6410     /**
6411      * No crash had started previously, we will take care of it. Before
6412      * handling it we will mark the crash handling as started.
6413      */
6414     crash_started = true;
6415     NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6416   }
6417 }
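/*
 * Illustrative sketch only; the actual caller lives in ErrorReporter and is
 * not part of this file. The expected two-phase pattern is roughly:
 *
 *   prepare_to_crash(true, error_insert_crash);   // may never return
 *   // ... write the error log entry ...
 *   prepare_to_crash(false, error_insert_crash);  // returns only in the
 *                                                 // first crashing thread
 *   // ... proceed to generate one set of trace files ...
 *
 * Note that the first call returns with stop_for_crash_mutex still held and
 * the second call releases it, which serializes the error log write.
 */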
6418 
6419 void mt_execSTOP_FOR_CRASH()
6420 {
6421   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
6422   const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
6423   require(selfptr != NULL);
6424 
6425   NdbMutex_Lock(&g_thr_repository->stop_for_crash_mutex);
6426   g_thr_repository->stopped_threads++;
6427   NdbCondition_Signal(&g_thr_repository->stop_for_crash_cond);
6428   NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6429 
6430   /* ToDo: is this correct? */
6431   globalEmulatorData.theWatchDog->unregisterWatchedThread(selfptr->m_thr_no);
6432 
6433   my_thread_exit(NULL);
6434 }
6435 
6436 void
6437 FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
6438 {
6439   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
6440   thr_data *selfptr = reinterpret_cast<thr_data *>(value);
6441   const thr_repository *rep = g_thr_repository;
6442   /*
6443    * The selfptr might be NULL, or pointer to thread that is doing the crash
6444    * jump.
6445    * If non-null, we should update the watchdog counter while dumping.
6446    */
6447   Uint32 *watchDogCounter;
6448   if (selfptr)
6449     watchDogCounter = &selfptr->m_watchdog_counter;
6450   else
6451     watchDogCounter = NULL;
6452 
6453   /*
6454    * We want to dump the signal buffers from last executed to first executed.
6455    * So we first need to find the correct sequence to output signals in, stored
6456    * in this array.
6457    *
6458    * We will check any buffers in the cyclic m_free_fifo. In addition,
6459    * we also need to scan the already executed part of the current
6460    * buffer in m_jba.
6461    *
6462    * Due to partial execution of prio A buffers, we will use signal ids to know
6463    * where to interleave prio A signals into the stream of prio B signals
6464    * read. So we will keep a pointer to a prio A buffer around; and while
6465    * scanning prio B buffers we will interleave prio A buffers from that buffer
6466    * when the signal id fits the sequence.
6467    *
6468    * This also means that we may have to discard the earliest part of available
6469    * prio A signal data due to too little prio B data present, or vice versa.
6470    */
6471   static const Uint32 MAX_SIGNALS_TO_DUMP = 4096;
6472   struct {
6473     const SignalHeader *ptr;
6474     bool prioa;
6475   } signalSequence[MAX_SIGNALS_TO_DUMP];
6476   Uint32 seq_start = 0;
6477   Uint32 seq_end = 0;
6478 
6479   const struct thr_data *thr_ptr = &rep->m_thread[thr_no];
6480   if (watchDogCounter)
6481     *watchDogCounter = 4;
6482 
6483   /*
6484    * ToDo: Might do some sanity check to avoid crashing on not yet initialised
6485    * thread.
6486    */
6487 
6488   /* Scan all available buffers with already executed signals. */
6489 
6490   /*
6491    * Keep track of all available buffers, so that we can pick out signals in
6492    * the same order they were executed (order obtained from signal id).
6493    *
6494    * We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
6495    * (and freed) buffers, plus MAX_BLOCK_THREADS buffers for currently active
6496    * prio B buffers, plus one active prio A buffer.
6497    */
6498   struct {
6499     const thr_job_buffer *m_jb;
6500     Uint32 m_pos;
6501     Uint32 m_max;
6502   } jbs[THR_FREE_BUF_MAX + MAX_BLOCK_THREADS + 1];
6503 
6504   Uint32 num_jbs = 0;
6505 
6506   /* Load released buffers. */
6507   Uint32 idx = thr_ptr->m_first_free;
6508   while (idx != thr_ptr->m_first_unused)
6509   {
6510     const thr_job_buffer *q = thr_ptr->m_free_fifo[idx];
6511     if (q->m_len > 0)
6512     {
6513       jbs[num_jbs].m_jb = q;
6514       jbs[num_jbs].m_pos = 0;
6515       jbs[num_jbs].m_max = q->m_len;
6516       num_jbs++;
6517     }
6518     idx = (idx + 1) % THR_FREE_BUF_MAX;
6519   }
6520   /* Load any active prio B buffers. */
6521   for (Uint32 thr_no = 0; thr_no < rep->m_thread_count; thr_no++)
6522   {
6523     const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
6524     const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
6525     Uint32 read_pos = r->m_read_pos;
6526     if (r->is_open() && read_pos > 0)
6527     {
6528       jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
6529       jbs[num_jbs].m_pos = 0;
6530       jbs[num_jbs].m_max = read_pos;
6531       num_jbs++;
6532     }
6533   }
6534   /* Load any active prio A buffer. */
6535   const thr_jb_read_state *r = &thr_ptr->m_jba_read_state;
6536   Uint32 read_pos = r->m_read_pos;
6537   if (read_pos > 0)
6538   {
6539     jbs[num_jbs].m_jb = thr_ptr->m_jba.m_buffers[r->m_read_index];
6540     jbs[num_jbs].m_pos = 0;
6541     jbs[num_jbs].m_max = read_pos;
6542     num_jbs++;
6543   }
6544 
6545   /* Use the next signal id as the smallest (oldest).
6546    *
6547    * Subtracting the smallest signal id from each signal id makes
6548    * them comparable using standard comparison of Uint32,
6549    * where the biggest value is the newest.
6550    * For example,
6551    *   (m_signal_id_counter - smallest_signal_id) == UINT32_MAX
6552    */
6553   const Uint32 smallest_signal_id = thr_ptr->m_signal_id_counter + 1;
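  /*
   * A short worked example of the wrap-safe ordering above (values are
   * illustrative only): if m_signal_id_counter has wrapped around to 2,
   * then smallest_signal_id == 3, and a recently assigned id 1 adjusts to
   *   (Uint32)(1 - 3) == 0xFFFFFFFE
   * while an older, pre-wrap id 0xFFFFFFF0 adjusts to
   *   (Uint32)(0xFFFFFFF0 - 3) == 0xFFFFFFED
   * so the older signal still sorts as smaller (older) despite the wrap.
   */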
6554 
6555   /* Now pick out one signal at a time, in signal id order. */
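  /*
   * This loop is a simple k-way merge: each iteration linearly scans the
   * remaining job buffers for the entry with the smallest adjusted signal
   * id (the oldest), appends it to the cyclic signalSequence[] history and
   * advances that buffer's read position. The cost is O(num_jbs) per signal,
   * which is acceptable for a one-off crash dump.
   */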
6556   while (num_jbs > 0)
6557   {
6558     if (watchDogCounter)
6559       *watchDogCounter = 4;
6560 
6561     /* Search out the smallest signal id remaining. */
6562     Uint32 idx_min = 0;
6563     const Uint32 *p = jbs[idx_min].m_jb->m_data + jbs[idx_min].m_pos;
6564     const SignalHeader *s_min = reinterpret_cast<const SignalHeader*>(p);
6565     Uint32 sid_min_adjusted = s_min->theSignalId - smallest_signal_id;
6566 
6567     for (Uint32 i = 1; i < num_jbs; i++)
6568     {
6569       p = jbs[i].m_jb->m_data + jbs[i].m_pos;
6570       const SignalHeader *s = reinterpret_cast<const SignalHeader*>(p);
6571       const Uint32 sid_adjusted = s->theSignalId - smallest_signal_id;
6572       if (sid_adjusted < sid_min_adjusted)
6573       {
6574         idx_min = i;
6575         s_min = s;
6576         sid_min_adjusted = sid_adjusted;
6577       }
6578     }
6579 
6580     /* We found the next signal, now put it in the ordered cyclic buffer. */
6581     signalSequence[seq_end].ptr = s_min;
6582     signalSequence[seq_end].prioa = jbs[idx_min].m_jb->m_prioa;
6583     Uint32 siglen =
6584       (sizeof(SignalHeader)>>2) + s_min->m_noOfSections + s_min->theLength;
6585 #if SIZEOF_CHARP == 8
6586     /* Align to 8-byte boundary, to ensure aligned copies. */
6587     siglen= (siglen+1) & ~((Uint32)1);
6588 #endif
6589     jbs[idx_min].m_pos += siglen;
6590     if (jbs[idx_min].m_pos >= jbs[idx_min].m_max)
6591     {
6592       /* We are done with this job buffer. */
6593       num_jbs--;
6594       jbs[idx_min] = jbs[num_jbs];
6595     }
6596     seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
6597     /* Drop old signals if too many available in history. */
6598     if (seq_end == seq_start)
6599       seq_start = (seq_start + 1) % MAX_SIGNALS_TO_DUMP;
6600   }
6601 
6602   /* Now, having built the correct signal sequence, we can dump them all. */
6603   fprintf(out, "\n");
6604   bool first_one = true;
6605   bool out_of_signals = false;
6606   Uint32 lastSignalId = 0;
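  /*
   * Walk the collected history backwards, from seq_end (the most recently
   * executed signal) towards seq_start (the oldest retained one), so the
   * newest signal appears first in the trace output.
   */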
6607   while (seq_end != seq_start)
6608   {
6609     if (watchDogCounter)
6610       *watchDogCounter = 4;
6611 
6612     if (seq_end == 0)
6613       seq_end = MAX_SIGNALS_TO_DUMP;
6614     seq_end--;
6615     SignalT<25> signal;
6616     const SignalHeader *s = signalSequence[seq_end].ptr;
6617     unsigned siglen = (sizeof(*s)>>2) + s->theLength;
6618     if (siglen > MAX_SIGNAL_SIZE)
6619       siglen = MAX_SIGNAL_SIZE;              // Sanity check
6620     memcpy(&signal.header, s, 4*siglen);
6621     // instance number in trace file is confusing if not MT LQH
6622     if (globalData.ndbMtLqhWorkers == 0)
6623       signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
6624 
6625     const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
6626     signal.m_sectionPtrI[0] = posptr[siglen + 0];
6627     signal.m_sectionPtrI[1] = posptr[siglen + 1];
6628     signal.m_sectionPtrI[2] = posptr[siglen + 2];
6629     bool prioa = signalSequence[seq_end].prioa;
6630 
6631     /* Make sure to display clearly when there is a gap in the dump. */
6632     if (!first_one && !out_of_signals && (s->theSignalId + 1) != lastSignalId)
6633     {
6634       out_of_signals = true;
6635       fprintf(out, "\n\n\nNo more prio %s signals, rest of dump will be "
6636               "incomplete.\n\n\n\n", prioa ? "B" : "A");
6637     }
6638     first_one = false;
6639     lastSignalId = s->theSignalId;
6640 
6641     fprintf(out, "--------------- Signal ----------------\n");
6642     Uint32 prio = (prioa ? JBA : JBB);
6643     SignalLoggerManager::printSignalHeader(out,
6644                                            signal.header,
6645                                            prio,
6646                                            globalData.ownId,
6647                                            true);
6648     SignalLoggerManager::printSignalData  (out,
6649                                            signal.header,
6650                                            &signal.theData[0]);
6651   }
6652   fflush(out);
6653 }
6654 
6655 int
6656 FastScheduler::traceDumpGetCurrentThread()
6657 {
6658   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
6659   const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
6660 
6661   /* The selfptr might be NULL, or pointer to thread that crashed. */
6662   if (selfptr == 0)
6663   {
6664     return -1;
6665   }
6666   else
6667   {
6668     return (int)selfptr->m_thr_no;
6669   }
6670 }
6671 
6672 void
6673 mt_section_lock()
6674 {
6675   lock(&(g_thr_repository->m_section_lock));
6676 }
6677 
6678 void
6679 mt_section_unlock()
6680 {
6681   unlock(&(g_thr_repository->m_section_lock));
6682 }
6683 
6684 void
6685 mt_mem_manager_init()
6686 {
6687 }
6688 
6689 void
6690 mt_mem_manager_lock()
6691 {
6692   lock(&(g_thr_repository->m_mem_manager_lock));
6693 }
6694 
6695 void
6696 mt_mem_manager_unlock()
6697 {
6698   unlock(&(g_thr_repository->m_mem_manager_lock));
6699 }
6700 
6701 Vector<mt_lock_stat> g_locks;
6702 template class Vector<mt_lock_stat>;
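/*
 * g_locks is a process-global registry mapping a lock's address to a name
 * and contention/spin counters. register_lock() below adds or renames an
 * entry, and lookup_lock() (only compiled in when spinlocks are used)
 * retrieves it so that contention statistics can be attributed to a named
 * lock.
 */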
6703 
6704 static
6705 void
6706 register_lock(const void * ptr, const char * name)
6707 {
6708   if (name == 0)
6709     return;
6710 
6711   mt_lock_stat* arr = g_locks.getBase();
6712   for (size_t i = 0; i<g_locks.size(); i++)
6713   {
6714     if (arr[i].m_ptr == ptr)
6715     {
6716       if (arr[i].m_name)
6717       {
6718         free(arr[i].m_name);
6719       }
6720       arr[i].m_name = strdup(name);
6721       return;
6722     }
6723   }
6724 
6725   mt_lock_stat ln;
6726   ln.m_ptr = ptr;
6727   ln.m_name = strdup(name);
6728   ln.m_contended_count = 0;
6729   ln.m_spin_count = 0;
6730   g_locks.push_back(ln);
6731 }
6732 
6733 #if defined(NDB_HAVE_XCNG) && defined(NDB_USE_SPINLOCK)
6734 static
6735 mt_lock_stat *
6736 lookup_lock(const void * ptr)
6737 {
6738   mt_lock_stat* arr = g_locks.getBase();
6739   for (size_t i = 0; i<g_locks.size(); i++)
6740   {
6741     if (arr[i].m_ptr == ptr)
6742       return arr + i;
6743   }
6744 
6745   return 0;
6746 }
6747 #endif
6748 
6749 Uint32
6750 mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
6751                                     Uint32 dst[], Uint32 len)
6752 {
6753   Uint32 cnt = 0;
6754   Bitmask<(MAX_BLOCK_THREADS+31)/32> mask;
6755   mask.set(threadId);
6756   for (Uint32 i = 0; blocks[i] != 0; i++)
6757   {
6758     Uint32 block = blocks[i];
6759     /**
6760      * Find each thread that has instance of block
6761      */
6762     assert(block == blockToMain(block));
6763     Uint32 index = block - MIN_BLOCK_NO;
6764     for (Uint32 instance = 0; instance < NDB_ARRAY_SIZE(thr_map[index]); instance++)
6765     {
6766       Uint32 thr_no = thr_map[index][instance].thr_no;
6767       if (thr_no == thr_map_entry::NULL_THR_NO)
6768         break;
6769 
6770       if (mask.get(thr_no))
6771         continue;
6772 
6773       mask.set(thr_no);
6774       require(cnt < len);
6775       dst[cnt++] = numberToRef(block, instance, 0);
6776     }
6777   }
6778   return cnt;
6779 }
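/*
 * Illustrative sketch only; the block number, thread id variable and buffer
 * size below are assumptions, not taken from this file. blocks[] is a
 * 0-terminated list of main block numbers, and dst[] receives one block
 * reference per additional thread hosting an instance of any of those blocks:
 *
 *   const Uint32 blocks[] = { DBLQH, 0 };
 *   Uint32 refs[MAX_BLOCK_THREADS];
 *   const Uint32 cnt =
 *     mt_get_thread_references_for_blocks(blocks, ownThreadId, refs,
 *                                         NDB_ARRAY_SIZE(refs));
 *   // refs[0..cnt-1] can now be used as signal destinations.
 */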
6780 
6781 void
6782 mt_wakeup(class SimulatedBlock* block)
6783 {
6784   Uint32 thr_no = block->getThreadId();
6785   struct thr_data *thrptr = &g_thr_repository->m_thread[thr_no];
6786   wakeup(&thrptr->m_waiter);
6787 }
6788 
6789 #ifdef VM_TRACE
6790 void
6791 mt_assert_own_thread(SimulatedBlock* block)
6792 {
6793   Uint32 thr_no = block->getThreadId();
6794   struct thr_data *thrptr = &g_thr_repository->m_thread[thr_no];
6795 
6796   if (unlikely(my_thread_equal(thrptr->m_thr_id, my_thread_self()) == 0))
6797   {
6798     fprintf(stderr, "mt_assert_own_thread() - assertion-failure\n");
6799     fflush(stderr);
6800     abort();
6801   }
6802 }
6803 #endif
6804 
6805 
6806 Uint32
6807 mt_get_blocklist(SimulatedBlock * block, Uint32 arr[], Uint32 len)
6808 {
6809   Uint32 thr_no = block->getThreadId();
6810   struct thr_data *thr_ptr = &g_thr_repository->m_thread[thr_no];
6811 
6812   for (Uint32 i = 0; i < thr_ptr->m_instance_count; i++)
6813   {
6814     arr[i] = thr_ptr->m_instance_list[i];
6815   }
6816 
6817   return thr_ptr->m_instance_count;
6818 }
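/*
 * Illustrative sketch only; the buffer size is an assumption. Note that the
 * len argument is currently unused, so the caller must supply a buffer large
 * enough for all instances executed by the block's thread:
 *
 *   Uint32 list[256];   // assumed large enough for this sketch
 *   const Uint32 cnt = mt_get_blocklist(this, list, NDB_ARRAY_SIZE(list));
 *   // list[0..cnt-1] now holds the thread's instance list.
 */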
6819 
6820 void
6821 mt_get_thr_stat(class SimulatedBlock * block, ndb_thr_stat* dst)
6822 {
6823   bzero(dst, sizeof(* dst));
6824   Uint32 thr_no = block->getThreadId();
6825   struct thr_data *selfptr = &g_thr_repository->m_thread[thr_no];
6826 
6827   THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
6828   dst->thr_no = thr_no;
6829   dst->name = conf.getName(selfptr->m_instance_list, selfptr->m_instance_count);
6830   dst->os_tid = NdbThread_GetTid(selfptr->m_thread);
6831   dst->loop_cnt = selfptr->m_stat.m_loop_cnt;
6832   dst->exec_cnt = selfptr->m_stat.m_exec_cnt;
6833   dst->wait_cnt = selfptr->m_stat.m_wait_cnt;
6834   dst->local_sent_prioa = selfptr->m_stat.m_prioa_count;
6835   dst->local_sent_priob = selfptr->m_stat.m_priob_count;
6836 }
6837 
6838 TransporterReceiveHandle *
6839 mt_get_trp_receive_handle(unsigned instance)
6840 {
6841   assert(instance > 0 && instance <= MAX_NDBMT_RECEIVE_THREADS);
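  /*
   * Receive thread instances are numbered from 1 (instance 0 denotes the
   * proxy), which is why the handle array is indexed with instance - 1.
   */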
6842   if (instance > 0 && instance <= MAX_NDBMT_RECEIVE_THREADS)
6843   {
6844     return g_trp_receive_handle_ptr[instance - 1 /* proxy */];
6845   }
6846   return 0;
6847 }
6848 
6849 /**
6850  * Global data
6851  */
6852 static struct trp_callback g_trp_callback;
6853 
6854 TransporterRegistry globalTransporterRegistry(&g_trp_callback, NULL,
6855                                               false);
6856