1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
22
23 #include <ndb_global.h>
24
25 #define NDBD_MULTITHREADED
26
27 #include <VMSignal.hpp>
28 #include <kernel_types.h>
29 #include <Prio.hpp>
30 #include <SignalLoggerManager.hpp>
31 #include <SimulatedBlock.hpp>
32 #include <ErrorHandlingMacros.hpp>
33 #include <GlobalData.hpp>
34 #include <WatchDog.hpp>
35 #include <TransporterDefinitions.hpp>
36 #include <TransporterRegistry.hpp>
37 #include "FastScheduler.hpp"
38 #include "mt.hpp"
39 #include <DebuggerNames.hpp>
40 #include <signaldata/StopForCrash.hpp>
41 #include "TransporterCallbackKernel.hpp"
42 #include <NdbSleep.h>
43 #include <portlib/ndb_prefetch.h>
44
45 #include "mt-asm.h"
46 #include "mt-lock.hpp"
47
48 #include "ThreadConfig.hpp"
49 #include <signaldata/StartOrd.hpp>
50
51 #include <NdbTick.h>
52 #include <NdbMutex.h>
53 #include <NdbCondition.h>
54 #include <ErrorReporter.hpp>
55 #include <EventLogger.hpp>
56
57 extern EventLogger * g_eventLogger;
58
59
60 static void dumpJobQueues(void);
61
62 inline
63 SimulatedBlock*
GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
65 {
66 SimulatedBlock* b = getBlock(blockNo);
67 if (b != 0 && instanceNo != 0)
68 b = b->getInstance(instanceNo);
69 return b;
70 }
71
72 #ifdef __GNUC__
73 /* Provides a small (but noticeable) speedup in benchmarks. */
74 #define memcpy __builtin_memcpy
75 #endif
76
77 /* Constants found by benchmarks to be reasonable values. */
78
79 /* Maximum number of signals to execute before sending to remote nodes. */
80 static const Uint32 MAX_SIGNALS_BEFORE_SEND = 200;
81
82 /*
83 * Max. signals to execute from one job buffer before considering other
84 * possible stuff to do.
85 */
86 static const Uint32 MAX_SIGNALS_PER_JB = 75;
87
88 /**
89 * Max signals written to other thread before calling flush_jbb_write_state
90 */
91 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER = 2;
92 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_OTHER = 20;
93 static const Uint32 MAX_SIGNALS_BEFORE_WAKEUP = 128;
94
95 //#define NDB_MT_LOCK_TO_CPU
96
97 #define NUM_MAIN_THREADS 2 // except receiver
/*
  MAX_BLOCK_THREADS need not include the send threads, since it is
  used to size the arrays used by all threads that contain a job
  buffer and execute signals. The send threads only send messages
  directed to other nodes; they contain no blocks and thus execute
  no signals.
*/
105 #define MAX_BLOCK_THREADS (NUM_MAIN_THREADS + \
106 MAX_NDBMT_LQH_THREADS + \
107 MAX_NDBMT_TC_THREADS + \
108 MAX_NDBMT_RECEIVE_THREADS)
109
110 /* If this is too small it crashes before first signal. */
111 #define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
112
113 static Uint32 num_threads = 0;
114 static Uint32 first_receiver_thread_no = 0;
115 static Uint32 max_send_delay = 0;
116
117 #define NO_SEND_THREAD (MAX_BLOCK_THREADS + MAX_NDBMT_SEND_THREADS + 1)
118
/* max signal is 32 words: 7 for the signal header and 25 data words */
120 #define MAX_SIGNAL_SIZE 32
121 #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / MAX_SIGNAL_SIZE) //255
122
123 #if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
124 #define USE_FUTEX
125 #endif
126
127 #ifdef USE_FUTEX
128 #ifndef _GNU_SOURCE
129 #define _GNU_SOURCE
130 #endif
131 #include <unistd.h>
132 #include <sys/syscall.h>
133 #include <sys/types.h>
134
135 #define FUTEX_WAIT 0
136 #define FUTEX_WAKE 1
137 #define FUTEX_FD 2
138 #define FUTEX_REQUEUE 3
139 #define FUTEX_CMP_REQUEUE 4
140 #define FUTEX_WAKE_OP 5
141
142 static inline
143 int
futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
145 {
146 return syscall(SYS_futex,
147 addr, FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
148 }
149
150 static inline
151 int
futex_wake(volatile unsigned * addr)
153 {
154 return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
155 }
156
struct MY_ALIGNED(NDB_CL) thr_wait
158 {
159 volatile unsigned m_futex_state;
160 enum {
161 FS_RUNNING = 0,
162 FS_SLEEPING = 1
163 };
164 thr_wait() {
    assert((sizeof(*this) % NDB_CL) == 0); //Maintain any CL-alignment
166 xcng(&m_futex_state, FS_RUNNING);
167 }
168 void init () {}
169 };
170
171 /**
172 * Sleep until woken up or timeout occurs.
173 *
174 * Will call check_callback(check_arg) after proper synchronisation, and only
175 * if that returns true will it actually sleep, else it will return
176 * immediately. This is needed to avoid races with wakeup.
177 *
178 * Returns 'true' if it actually did sleep.
179 */
180 template<typename T>
181 static inline
182 bool
yield(struct thr_wait* wait, const Uint32 nsec,
      bool (*check_callback)(T*), T* check_arg)
185 {
186 volatile unsigned * val = &wait->m_futex_state;
187 #ifndef NDEBUG
188 int old =
189 #endif
190 xcng(val, thr_wait::FS_SLEEPING);
191 assert(old == thr_wait::FS_RUNNING);
192
  /**
   * At this point, we need to re-check the condition that made us decide to
   * sleep, and skip sleeping if it changed.
   *
   * Otherwise the condition may already have changed, and the thread making
   * the change may already have decided not to wake us, as our state was
   * still FS_RUNNING at the time.
   *
   * A memory barrier is also needed to make this extra check race-free,
   * but that is already provided by xcng.
   */
204 const bool waited = (*check_callback)(check_arg);
205 if (waited)
206 {
207 struct timespec timeout;
208 timeout.tv_sec = 0;
209 timeout.tv_nsec = nsec;
210 futex_wait(val, thr_wait::FS_SLEEPING, &timeout);
211 }
212 xcng(val, thr_wait::FS_RUNNING);
213 return waited;
214 }
215
216 static inline
217 int
wakeup(struct thr_wait* wait)
219 {
220 volatile unsigned * val = &wait->m_futex_state;
221 /**
   * We must ensure that any state updates (new data in buffers, ...) are visible
223 * to the other thread before we can look at the sleep state of that other
224 * thread.
225 */
226 if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
227 {
228 return futex_wake(val);
229 }
230 return 0;
231 }
232 #else
233
struct MY_ALIGNED(NDB_CL) thr_wait
235 {
236 NdbMutex *m_mutex;
237 NdbCondition *m_cond;
238 bool m_need_wakeup;
239 thr_wait() : m_mutex(0), m_cond(0), m_need_wakeup(false) {
    assert((sizeof(*this) % NDB_CL) == 0); //Maintain any CL-alignment
241 }
242
243 void init() {
244 m_mutex = NdbMutex_Create();
245 m_cond = NdbCondition_Create();
246 }
247 };
248
249 template<typename T>
250 static inline
251 bool
yield(struct thr_wait* wait, const Uint32 nsec,
      bool (*check_callback)(T*), T* check_arg)
254 {
255 struct timespec end;
256 NdbCondition_ComputeAbsTime(&end, (nsec >= 1000000) ? nsec/1000000 : 1);
257 NdbMutex_Lock(wait->m_mutex);
258
259 Uint32 waits = 0;
260 /* May have spurious wakeups: Always recheck condition predicate */
261 while ((*check_callback)(check_arg))
262 {
263 wait->m_need_wakeup = true;
264 waits++;
265 if (NdbCondition_WaitTimeoutAbs(wait->m_cond,
266 wait->m_mutex, &end) == ETIMEDOUT)
267 {
268 wait->m_need_wakeup = false;
269 break;
270 }
271 }
272 NdbMutex_Unlock(wait->m_mutex);
273 return (waits > 0);
274 }
275
276
277 static inline
278 int
wakeup(struct thr_wait* wait)
280 {
281 NdbMutex_Lock(wait->m_mutex);
282 // We should avoid signaling when not waiting for wakeup
283 if (wait->m_need_wakeup)
284 {
285 wait->m_need_wakeup = false;
286 NdbCondition_Signal(wait->m_cond);
287 }
288 NdbMutex_Unlock(wait->m_mutex);
289 return 0;
290 }
291
292 #endif
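
/**
 * Illustrative sketch (hypothetical call site, not code from this file) of
 * the intended pairing of yield() and wakeup() around a shared predicate:
 *
 *   static bool nothing_to_do(thr_data* thr);   // 'true' => safe to sleep
 *
 *   // Consumer side: yield() re-checks the predicate after flagging itself
 *   // as sleeping (futex variant) or under the mutex (condvar variant), so
 *   // a wakeup() issued in between is never lost.
 *   yield(&thr->m_waiter, 1000 * 1000, nothing_to_do, thr);
 *
 *   // Producer side, after publishing new data for the consumer:
 *   wakeup(&thr->m_waiter);
 */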
293
294 #define JAM_FILE_ID 236
295
296
297 /**
298 * thr_safe_pool
299 */
300 template<typename T>
struct MY_ALIGNED(NDB_CL) thr_safe_pool
302 {
303 thr_safe_pool(const char * name) : m_lock(name), m_free_list(0), m_cnt(0) {
    assert((sizeof(*this) % NDB_CL) == 0); //Maintain any CL-alignment
305 }
306
307 struct thr_spin_lock m_lock;
308
309 T* m_free_list;
310 Uint32 m_cnt;
311
312 T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
313 T* ret = 0;
314 lock(&m_lock);
315 if (m_free_list)
316 {
317 assert(m_cnt);
318 m_cnt--;
319 ret = m_free_list;
320 m_free_list = ret->m_next;
321 unlock(&m_lock);
322 }
323 else
324 {
325 unlock(&m_lock);
326 Uint32 dummy;
327 ret = reinterpret_cast<T*>
328 (mm->alloc_page(rg, &dummy,
329 Ndbd_mem_manager::NDB_ZONE_ANY));
330 // ToDo: How to deal with failed allocation?!?
331 // I think in this case we need to start grabbing buffers kept for signal
332 // trace.
333 }
334 return ret;
335 }
336
337 Uint32 seize_list(Ndbd_mem_manager *mm, Uint32 rg,
338 Uint32 requested, T** head, T** tail)
339 {
340 lock(&m_lock);
341 if (m_cnt == 0)
342 {
343 unlock(&m_lock);
344 Uint32 dummy;
345 T* ret = reinterpret_cast<T*>
346 (mm->alloc_page(rg, &dummy,
347 Ndbd_mem_manager::NDB_ZONE_ANY));
348
349 if (ret == 0)
350 {
351 return 0;
352 }
353 else
354 {
355 ret->m_next = 0;
356 * head = * tail = ret;
357 return 1;
358 }
359 }
360 else
361 {
362 if (m_cnt < requested )
363 requested = m_cnt;
364
365 T* first = m_free_list;
366 T* last = first;
367 for (Uint32 i = 1; i < requested; i++)
368 {
369 last = last->m_next;
370 }
371 m_cnt -= requested;
372 m_free_list = last->m_next;
373 unlock(&m_lock);
374 last->m_next = 0;
375 * head = first;
376 * tail = last;
377 return requested;
378 }
379 }
380
381 void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
382 lock(&m_lock);
383 t->m_next = m_free_list;
384 m_free_list = t;
385 m_cnt++;
386 unlock(&m_lock);
387 }
388
389 void release_list(Ndbd_mem_manager *mm, Uint32 rg,
390 T* head, T* tail, Uint32 cnt) {
391 lock(&m_lock);
392 tail->m_next = m_free_list;
393 m_free_list = head;
394 m_cnt += cnt;
395 unlock(&m_lock);
396 }
397 };
398
399 /**
400 * thread_local_pool
401 */
402 template<typename T>
403 class thread_local_pool
404 {
405 public:
  thread_local_pool(thr_safe_pool<T> *global_pool,
                    unsigned max_free, unsigned alloc_size = 1) :
408 m_max_free(max_free),
409 m_alloc_size(alloc_size),
410 m_free(0),
411 m_freelist(0),
412 m_global_pool(global_pool)
413 {
414 }
415
  T *seize(Ndbd_mem_manager *mm, Uint32 rg) {
417 T *tmp = m_freelist;
418 if (tmp == 0)
419 {
420 T * tail;
421 m_free = m_global_pool->seize_list(mm, rg, m_alloc_size, &tmp, &tail);
422 }
423 if (tmp)
424 {
425 m_freelist = tmp->m_next;
426 assert(m_free > 0);
427 m_free--;
428 }
429
430 validate();
431 return tmp;
432 }
433
  void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
435 unsigned free = m_free;
436 if (free < m_max_free)
437 {
438 m_free = free + 1;
439 t->m_next = m_freelist;
440 m_freelist = t;
441 }
442 else
443 m_global_pool->release(mm, rg, t);
444
445 validate();
446 }
447
  /**
   * Release to the local pool even if it gets "too" full
   * (with respect to m_max_free)
   */
  void release_local(T *t) {
453 m_free++;
454 t->m_next = m_freelist;
455 m_freelist = t;
456
457 validate();
458 }
459
  void validate() const {
461 #ifdef VM_TRACE
462 Uint32 cnt = 0;
463 T* t = m_freelist;
464 while (t)
465 {
466 cnt++;
467 t = t->m_next;
468 }
469 assert(cnt == m_free);
470 #endif
471 }
472
473 /**
474 * Release entries so that m_max_free is honored
475 * (likely used together with release_local)
476 */
  void release_global(Ndbd_mem_manager *mm, Uint32 rg) {
478 validate();
479 unsigned cnt = 0;
480 unsigned free = m_free;
481 Uint32 maxfree = m_max_free;
482 assert(maxfree > 0);
483
484 T* head = m_freelist;
485 T* tail = m_freelist;
486 if (free > maxfree)
487 {
488 cnt++;
489 free--;
490
491 while (free > maxfree)
492 {
493 cnt++;
494 free--;
495 tail = tail->m_next;
496 }
497
498 assert(free == maxfree);
499
500 m_free = free;
501 m_freelist = tail->m_next;
502 m_global_pool->release_list(mm, rg, head, tail, cnt);
503 }
504 validate();
505 }
506
  void release_all(Ndbd_mem_manager *mm, Uint32 rg) {
508 validate();
509 T* head = m_freelist;
510 T* tail = m_freelist;
511 if (tail)
512 {
513 unsigned cnt = 1;
514 while (tail->m_next != 0)
515 {
516 cnt++;
517 tail = tail->m_next;
518 }
519 m_global_pool->release_list(mm, rg, head, tail, cnt);
520 m_free = 0;
521 m_freelist = 0;
522 }
523 validate();
524 }
525
526 /**
527 * release everything if more than m_max_free
528 * else do nothing
529 */
  void release_chunk(Ndbd_mem_manager *mm, Uint32 rg) {
531 if (m_free > m_max_free)
532 release_all(mm, rg);
533 }
534
535 /**
536 * prealloc up to <em>cnt</em> pages into this pool
537 */
  bool fill(Ndbd_mem_manager *mm, Uint32 rg, Uint32 cnt)
539 {
540 if (m_free >= cnt)
541 {
542 return true;
543 }
544
545 T *head, *tail;
546 Uint32 allocated = m_global_pool->seize_list(mm, rg, m_alloc_size,
547 &head, &tail);
548 if (allocated)
549 {
550 tail->m_next = m_freelist;
551 m_freelist = head;
552 m_free += allocated;
553 return m_free >= cnt;
554 }
555
556 return false;
557 }
558
  void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }
560
561 private:
562 const unsigned m_max_free;
563 const unsigned m_alloc_size;
564 unsigned m_free;
565 T *m_freelist;
566 thr_safe_pool<T> *m_global_pool;
567 };
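
/**
 * Illustrative sketch (hypothetical usage, not code from this file) of the
 * intended interplay between a thread-local pool and the global pool for
 * some page type T:
 *
 *   thread_local_pool<T> pool(&global_pool, max_free, alloc_size);
 *
 *   T* page = pool.seize(mm, rg);   // local free-list first, else a batch of
 *                                   // 'alloc_size' pages from the global pool
 *   ...
 *   pool.release_local(page);       // may temporarily exceed max_free
 *   pool.release_global(mm, rg);    // trim the local free-list back to max_free
 */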
568
569 /**
570 * Signal buffers.
571 *
572 * Each thread job queue contains a list of these buffers with signals.
573 *
574 * There is an underlying assumption that the size of this structure is the
575 * same as the global memory manager page size.
576 */
577 struct thr_job_buffer // 32k
578 {
579 static const unsigned SIZE = 8190;
580
581 /*
582 * Amount of signal data currently in m_data buffer.
583 * Read/written by producer, read by consumer.
584 */
585 Uint32 m_len;
586 /*
587 * Whether this buffer contained prio A or prio B signals, used when dumping
588 * signals from released buffers.
589 */
590 Uint32 m_prioa;
591 union {
592 Uint32 m_data[SIZE];
593
594 thr_job_buffer * m_next; // For free-list
595 };
596 };
597
598 static
599 inline
600 Uint32
calc_fifo_used(Uint32 ri, Uint32 wi, Uint32 sz)
602 {
603 return (wi >= ri) ? wi - ri : (sz - ri) + wi;
604 }
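
/**
 * Worked example (illustrative only): with sz == 32, a non-wrapped FIFO with
 * ri == 2 and wi == 30 has 30 - 2 == 28 entries in use, while a wrapped FIFO
 * with ri == 30 and wi == 2 has (32 - 30) + 2 == 4 entries in use.
 */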
605
/**
 * thr_job_queue is shared between consumer / producer.
 *
 * The hot-spots of the thr_job_queue are the read/write indexes.
 * As they are updated and read frequently, they have been placed in
 * their own thr_job_queue_head[] so that they fit inside a single (or
 * a few) cache lines, thereby avoiding complete L1-cache replacement
 * every time the job_queue is scanned.
 */
615 struct thr_job_queue_head
616 {
617 unsigned m_read_index; // Read/written by consumer, read by producer
618 unsigned m_write_index; // Read/written by producer, read by consumer
619
  /**
   * Waiter object: In case the job queue is full, the producer thread
   * will 'yield' on this waiter object until the consumer thread
   * has consumed (at least) a job buffer.
   */
625 thr_wait m_waiter;
626
627 Uint32 used() const;
628 };
629
630 struct thr_job_queue
631 {
632 static const unsigned SIZE = 32;
633
634 /**
635 * There is a SAFETY limit on free buffers we never allocate,
636 * but may allow these to be implicitly used as a last resort
637 * when job scheduler is really stuck. ('sleeploop 10')
638 */
639 static const unsigned SAFETY = 2;
640
641 /**
642 * Some more free buffers are RESERVED to be used to avoid
643 * or resolve circular wait-locks between threads waiting
644 * for buffers to become available.
645 */
646 static const unsigned RESERVED = 4;
647
648 /**
649 * When free buffer count drops below ALMOST_FULL, we
650 * are allowed to start using RESERVED buffers to prevent
651 * circular wait-locks.
652 */
653 static const unsigned ALMOST_FULL = RESERVED + 2;
654
655 struct thr_job_buffer* m_buffers[SIZE];
656 };
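
/**
 * Worked example of the constants above (illustrative only): with SIZE == 32
 * and ALMOST_FULL == RESERVED + 2 == 6, use of the RESERVED == 4 buffers is
 * permitted once fewer than 6 free buffers remain, while the SAFETY == 2
 * buffers are never allocated explicitly and act only as a last resort when
 * the job scheduler is really stuck.
 */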
657
658 inline
659 Uint32
thr_job_queue_head::used() const
661 {
662 return calc_fifo_used(m_read_index, m_write_index, thr_job_queue::SIZE);
663 }
664
665 /*
666 * Two structures tightly associated with thr_job_queue.
667 *
668 * There will generally be exactly one thr_jb_read_state and one
669 * thr_jb_write_state associated with each thr_job_queue.
670 *
671 * The reason they are kept separate is to avoid unnecessary inter-CPU
672 * cache line pollution. All fields shared among producer and consumer
673 * threads are in thr_job_queue, thr_jb_write_state fields are only
674 * accessed by the producer thread(s), and thr_jb_read_state fields are
675 * only accessed by the consumer thread.
676 *
677 * For example, on Intel core 2 quad processors, there is a ~33%
678 * penalty for two cores accessing the same 64-byte cacheline.
679 */
680 struct thr_jb_write_state
681 {
682 /*
683 * The position to insert the next signal into the queue.
684 *
685 * m_write_index is the index into thr_job_queue::m_buffers[] of the buffer
686 * to insert into, and m_write_pos is the index into thr_job_buffer::m_data[]
687 * at which to store the next signal.
688 */
689 Uint32 m_write_index;
690 Uint32 m_write_pos;
691
692 /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
693 thr_job_buffer *m_write_buffer;
694
  /**
    Number of signals inserted since the last flush to thr_job_queue.
    The lower 16 bits hold the number of pending signals not yet
    flushed, and the upper 16 bits hold the number of signals still
    pending a wakeup of the other side. To simplify the code, the bit
    manipulations are implemented in the methods below.

    The reason for this optimisation is to minimise the memory used by
    these variables, as they are likely to consume CPU cache memory.
    It also speeds up some pending-signal checks.
  */
707 Uint32 m_pending_signals;
708
  bool has_any_pending_signals() const
  {
    return m_pending_signals;
  }
  Uint32 get_pending_signals() const
  {
    return (m_pending_signals & 0xFFFF);
  }
  Uint32 get_pending_signals_wakeup() const
  {
    return (m_pending_signals >> 16);
  }
  void clear_pending_signals_and_set_wakeup(Uint32 wakeups)
  {
    m_pending_signals = (wakeups << 16);
  }
  void increment_pending_signals()
  {
    m_pending_signals++;
  }
  void init_pending_signals()
  {
    m_pending_signals = 0;
  }
733
  /*
   * Is this job buffer open for communication at all?
   * Some pairs of threads are not expected to communicate, and thus do
   * not allocate a thr_job_buffer for the exchange of signals.
   * Don't access any job_buffers without ensuring 'is_open()==true'.
   */
  bool is_open() const
741 {
742 return (m_write_buffer != NULL);
743 }
744 };
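
/**
 * Illustrative sketch (hypothetical producer logic, not code from this file)
 * of how the packed m_pending_signals counter is meant to be driven:
 *
 *   w->increment_pending_signals();          // low 16 bits: unflushed signals
 *   if (w->get_pending_signals() >= MAX_SIGNALS_BEFORE_FLUSH_OTHER)
 *   {
 *     Uint32 wakeups = w->get_pending_signals_wakeup()   // high 16 bits
 *                      + w->get_pending_signals();
 *     // ... publish the buffered signals to the consumer's thr_job_queue ...
 *     if (wakeups >= MAX_SIGNALS_BEFORE_WAKEUP)
 *     {
 *       // ... wakeup() the consumer ...
 *       w->init_pending_signals();                        // both counters to 0
 *     }
 *     else
 *     {
 *       w->clear_pending_signals_and_set_wakeup(wakeups); // carry wakeup count
 *     }
 *   }
 */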
745
746 /*
747 * This structure is also used when dumping signal traces, to dump executed
748 * signals from the buffer(s) currently being processed.
749 */
750 struct thr_jb_read_state
751 {
752 /*
753 * Index into thr_job_queue::m_buffers[] of the buffer that we are currently
754 * executing signals from.
755 */
756 Uint32 m_read_index;
757 /*
758 * Index into m_read_buffer->m_data[] of the next signal to execute from the
759 * current buffer.
760 */
761 Uint32 m_read_pos;
762 /*
763 * Thread local copy of thr_job_queue::m_buffers[m_read_index].
764 */
765 thr_job_buffer *m_read_buffer;
766 /*
767 * These are thread-local copies of thr_job_queue::m_write_index and
768 * thr_job_buffer::m_len. They are read once at the start of the signal
769 * execution loop and used to determine when the end of available signals is
770 * reached.
771 */
772 Uint32 m_read_end; // End within current thr_job_buffer. (*m_read_buffer)
773
774 Uint32 m_write_index; // Last available thr_job_buffer.
775
  /*
   * Is this job buffer open for communication at all?
   * Some pairs of threads are not expected to communicate, and thus do
   * not allocate a thr_job_buffer for the exchange of signals.
   * Don't access any job_buffers without ensuring 'is_open()==true'.
   */
  bool is_open() const
783 {
784 return (m_read_buffer != NULL);
785 }
786
  bool is_empty() const
788 {
789 assert(m_read_index != m_write_index || m_read_pos <= m_read_end);
790 return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
791 }
792 };
793
794 /**
795 * time-queue
796 */
797 struct thr_tq
798 {
799 static const unsigned ZQ_SIZE = 256;
800 static const unsigned SQ_SIZE = 512;
801 static const unsigned LQ_SIZE = 512;
802 static const unsigned PAGES = (MAX_SIGNAL_SIZE *
803 (ZQ_SIZE + SQ_SIZE + LQ_SIZE)) / 8192;
804
805 Uint32 * m_delayed_signals[PAGES];
806 Uint32 m_next_free;
807 Uint32 m_next_timer;
808 Uint32 m_current_time;
809 Uint32 m_cnt[3];
810 Uint32 m_zero_queue[ZQ_SIZE];
811 Uint32 m_short_queue[SQ_SIZE];
812 Uint32 m_long_queue[LQ_SIZE];
813 };
814
815 /**
816 * THR_SEND_BUFFER_ALLOC_SIZE is the amount of 32k pages allocated
817 * when we allocate pages from the global pool of send buffers to
818 * the thread_local_pool (which is local to a thread).
819 *
820 * We allocate a bunch to decrease contention on send-buffer-pool-mutex
821 */
822 #define THR_SEND_BUFFER_ALLOC_SIZE 32
823
/**
 * THR_SEND_BUFFER_PRE_ALLOC is the amount of 32k pages that are
 * allocated before we start to run signals
 */
828 #define THR_SEND_BUFFER_PRE_ALLOC 32
829
/**
 * Number of pages that are allowed to linger in a
 * thread-local send-buffer pool
 */
834 #define THR_SEND_BUFFER_MAX_FREE \
835 (THR_SEND_BUFFER_ALLOC_SIZE + THR_SEND_BUFFER_PRE_ALLOC - 1)
836
837 /*
838 * Max number of thread-local job buffers to keep before releasing to
839 * global pool.
840 */
841 #define THR_FREE_BUF_MAX 32
842 /* Minimum number of buffers (to ensure useful trace dumps). */
843 #define THR_FREE_BUF_MIN 12
844 /*
845 * 1/THR_FREE_BUF_BATCH is the fraction of job buffers to allocate/free
846 * at a time from/to global pool.
847 */
848 #define THR_FREE_BUF_BATCH 6
849
850 /**
851 * a page with send data
852 */
853 struct thr_send_page
854 {
855 static const Uint32 PGSIZE = 32768;
856 #if SIZEOF_CHARP == 4
857 static const Uint32 HEADER_SIZE = 8;
858 #else
859 static const Uint32 HEADER_SIZE = 12;
860 #endif
861
  static Uint32 max_bytes() {
863 return PGSIZE - offsetof(thr_send_page, m_data);
864 }
865
866 /* Next page */
867 thr_send_page* m_next;
868
869 /* Bytes of send data available in this page. */
870 Uint16 m_bytes;
871
872 /* Start of unsent data */
873 Uint16 m_start;
874
875 /* Data; real size is to the end of one page. */
876 char m_data[2];
877 };
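
/**
 * Worked example of the layout above (illustrative only): on a 64-bit build
 * the header is m_next (8 bytes) + m_bytes (2) + m_start (2) == HEADER_SIZE
 * of 12 bytes, so max_bytes() == PGSIZE - offsetof(thr_send_page, m_data)
 * == 32768 - 12 == 32756 bytes of payload per page.
 */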
878
879 /**
880 * a linked list with thr_send_page
881 */
882 struct thr_send_buffer
883 {
884 thr_send_page* m_first_page;
885 thr_send_page* m_last_page;
886 };
887
888 /**
889 * a ring buffer with linked list of thr_send_page
890 */
891 struct thr_send_queue
892 {
893 unsigned m_write_index;
894 #if SIZEOF_CHARP == 8
895 unsigned m_unused;
896 thr_send_page* m_buffers[7];
897 static const unsigned SIZE = 7;
898 #else
899 thr_send_page* m_buffers[15];
900 static const unsigned SIZE = 15;
901 #endif
902 };
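
/**
 * The sizing above keeps each thr_send_queue within a single 64-byte cache
 * line, which is presumably why SIZE differs between word sizes (illustrative
 * arithmetic): on 64-bit builds 4 + 4 + 7 * 8 == 64 bytes, and on 32-bit
 * builds 4 + 15 * 4 == 64 bytes.
 */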
903
struct MY_ALIGNED(NDB_CL) thr_data
905 {
906 thr_data() : m_jba_write_lock("jbalock"),
907 m_signal_id_counter(0),
908 m_send_buffer_pool(0,
909 THR_SEND_BUFFER_MAX_FREE,
910 THR_SEND_BUFFER_ALLOC_SIZE) {
911
    // Check cacheline alignment
913 assert((((UintPtr)this) % NDB_CL) == 0);
914 assert((((UintPtr)&m_waiter) % NDB_CL) == 0);
915 assert((((UintPtr)&m_jba_write_lock) % NDB_CL) == 0);
916 assert((((UintPtr)&m_jba) % NDB_CL) == 0);
917 assert((((UintPtr)m_in_queue_head) % NDB_CL) == 0);
918 assert((((UintPtr)m_in_queue) % NDB_CL) == 0);
919 }
920
921 /**
922 * We start with the data structures that are shared globally to
923 * ensure that they get the proper cache line alignment
924 */
925 thr_wait m_waiter; /* Cacheline aligned*/
926
927 /*
928 * Prio A signal incoming queue. This area is used from many threads
929 * protected by the spin lock. Thus it is also important to protect
930 * surrounding thread-local variables from CPU cache line sharing
931 * with this part.
932 */
933 MY_ALIGNED(NDB_CL) struct thr_spin_lock m_jba_write_lock;
934 MY_ALIGNED(NDB_CL) struct thr_job_queue m_jba;
935 struct thr_job_queue_head m_jba_head;
936
  /*
   * These are the thread input queues, where other threads deliver signals
   * into.
   * These cache lines are going to be updated by many different CPUs
   * all the time, whereas the neighbouring variables are thread-local.
   * Avoid false cacheline sharing by requiring an alignment.
   */
944 MY_ALIGNED(NDB_CL) struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
945 MY_ALIGNED(NDB_CL) struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
946
947 /**
948 * The remainder of the variables in thr_data are thread-local,
949 * meaning that they are always updated by the thread that owns those
950 * data structures and thus those variables aren't shared with other
951 * CPUs.
952 */
953
954 unsigned m_thr_no;
955
  /**
   * Spin time of thread after completing all its work (in microseconds).
   * We won't go to sleep until we have spun for a sufficient time; the aim
   * is to increase readiness in systems with ample CPU resources.
   */
961 unsigned m_spintime;
962
963 /**
964 * Realtime scheduler activated for this thread. This means this
965 * thread will run at a very high priority even beyond the priority
966 * of the OS.
967 */
968 unsigned m_realtime;
969
970 /**
971 * Index of thread locally in Configuration.cpp
972 */
973 unsigned m_thr_index;
974
975 /**
976 * max signals to execute per JBB buffer
977 */
978 unsigned m_max_signals_per_jb;
979
  /**
   * Extra JBB signal execute quota allowed to be used to
   * drain (almost) full in-buffers. Reserved for usage where
   * we are about to end up in a circular wait-lock between
   * threads where none of them would be able to proceed.
   */
986 unsigned m_max_extra_signals;
987
988 /**
989 * max signals to execute before recomputing m_max_signals_per_jb
990 */
991 unsigned m_max_exec_signals;
992
  /**
   * Flag indicating that we have sent a local Prio A signal. Used to know
   * whether to scan for more prio A signals after executing those signals.
   * This is used to ensure that if we execute at prio A level and send a
   * prio A signal, it will be executed immediately (or at least before any
   * prio B signal).
   */
1000 bool m_sent_local_prioa_signal;
1001
1002 NDB_TICKS m_ticks;
1003 struct thr_tq m_tq;
1004
  /*
   * In m_next_buffer we keep a free buffer at all times, so that when
   * we hold the lock and find we need a new buffer, we can use this one and
   * thereby defer allocation until after releasing the lock.
   */
1010 struct thr_job_buffer* m_next_buffer;
1011
1012 /*
1013 * We keep a small number of buffers in a thread-local cyclic FIFO, so that
1014 * we can avoid going to the global pool in most cases, and so that we have
1015 * recent buffers available for dumping in trace files.
1016 */
1017 struct thr_job_buffer *m_free_fifo[THR_FREE_BUF_MAX];
1018 /* m_first_free is the index of the entry to return next from seize(). */
1019 Uint32 m_first_free;
1020 /* m_first_unused is the first unused entry in m_free_fifo. */
1021 Uint32 m_first_unused;
1022
1023
1024 /* Thread-local read state of prio A buffer. */
1025 struct thr_jb_read_state m_jba_read_state;
1026
1027 /*
1028 * There is no m_jba_write_state, as we have multiple writers to the prio A
1029 * queue, so local state becomes invalid as soon as we release the lock.
1030 */
1031
1032 /* These are the write states of m_in_queue[self] in each thread. */
1033 struct thr_jb_write_state m_write_states[MAX_BLOCK_THREADS];
1034 /* These are the read states of all of our own m_in_queue[]. */
1035 struct thr_jb_read_state m_read_states[MAX_BLOCK_THREADS];
1036
1037 /* Jam buffers for making trace files at crashes. */
1038 EmulatedJamBuffer m_jam;
1039 /* Watchdog counter for this thread. */
1040 Uint32 m_watchdog_counter;
1041 /* Latest executed signal id assigned in this thread */
1042 Uint32 m_signal_id_counter;
1043
1044 /* Signal delivery statistics. */
1045 struct
1046 {
1047 Uint64 m_loop_cnt;
1048 Uint64 m_exec_cnt;
1049 Uint64 m_wait_cnt;
1050 Uint64 m_prioa_count;
1051 Uint64 m_prioa_size;
1052 Uint64 m_priob_count;
1053 Uint64 m_priob_size;
1054 } m_stat;
1055
1056 /* Array of node ids with pending remote send data. */
1057 Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
1058 /* Number of node ids in m_pending_send_nodes. */
1059 Uint32 m_pending_send_count;
1060
1061 /**
1062 * Bitmap of pending node ids with send data.
1063 * Used to quickly check if a node id is already in m_pending_send_nodes.
1064 */
1065 Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
1066
1067 /* pool for send buffers */
1068 class thread_local_pool<thr_send_page> m_send_buffer_pool;
1069
  /* Send buffers for this thread; these are not touched by any other thread */
1071 struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];
1072
1073 /* Block instances (main and worker) handled by this thread. */
1074 /* Used for sendpacked (send-at-job-buffer-end). */
1075 Uint32 m_instance_count;
1076 BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];
1077
1078 SectionSegmentPool::Cache m_sectionPoolCache;
1079
1080 Uint32 m_cpu;
1081 my_thread_t m_thr_id;
1082 NdbThread* m_thread;
1083 };
1084
1085 struct mt_send_handle : public TransporterSendBufferHandle
1086 {
1087 struct thr_data * m_selfptr;
  mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
  virtual ~mt_send_handle() {}
1090
1091 virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
1092 virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
1093 virtual void getSendBufferLevel(NodeId node, SB_LevelType &level);
1094 virtual bool forceSend(NodeId node);
1095 };
1096
1097 struct trp_callback : public TransporterCallback
1098 {
  trp_callback() {}
1100
1101 /* Callback interface. */
1102 void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
1103 void lock_transporter(NodeId node);
1104 void unlock_transporter(NodeId node);
1105 Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
1106 Uint32 bytes_sent(NodeId node, Uint32 bytes);
1107 bool has_data_to_send(NodeId node);
1108 void reset_send_buffer(NodeId node, bool should_be_empty);
1109 };
1110
1111 static char *g_thr_repository_mem = NULL;
1112 static struct thr_repository *g_thr_repository = NULL;
1113
1114 struct thr_repository
1115 {
  thr_repository()
1117 : m_section_lock("sectionlock"),
1118 m_mem_manager_lock("memmanagerlock"),
1119 m_jb_pool("jobbufferpool"),
1120 m_sb_pool("sendbufferpool")
1121 {
    // Verify assumed cacheline alignment
1123 assert((((UintPtr)this) % NDB_CL) == 0);
1124 assert((((UintPtr)&m_receive_lock) % NDB_CL) == 0);
1125 assert((((UintPtr)&m_section_lock) % NDB_CL) == 0);
1126 assert((((UintPtr)&m_mem_manager_lock) % NDB_CL) == 0);
1127 assert((((UintPtr)&m_jb_pool) % NDB_CL) == 0);
1128 assert((((UintPtr)&m_sb_pool) % NDB_CL) == 0);
1129 assert((((UintPtr)m_thread) % NDB_CL) == 0);
1130 assert((sizeof(m_receive_lock[0]) % NDB_CL) == 0);
1131 }
1132
  /**
   * m_receive_lock, m_section_lock, m_mem_manager_lock, m_jb_pool
   * and m_sb_pool are all variables globally shared among the threads
   * and also heavily updated.
   * Requiring alignment avoids false cache line sharing.
   */
1139 MY_ALIGNED(NDB_CL)
1140 struct MY_ALIGNED(NDB_CL) aligned_locks : public thr_spin_lock
1141 {
1142 } m_receive_lock[MAX_NDBMT_RECEIVE_THREADS];
1143
1144 MY_ALIGNED(NDB_CL) struct thr_spin_lock m_section_lock;
1145 MY_ALIGNED(NDB_CL) struct thr_spin_lock m_mem_manager_lock;
1146 MY_ALIGNED(NDB_CL) struct thr_safe_pool<thr_job_buffer> m_jb_pool;
1147 MY_ALIGNED(NDB_CL) struct thr_safe_pool<thr_send_page> m_sb_pool;
1148
1149 /* m_mm and m_thread_count are globally shared and read only variables */
1150 Ndbd_mem_manager * m_mm;
1151 unsigned m_thread_count;
  /**
   * Protect m_mm and m_thread_count from CPU cache misses: the first
   * part of m_thread (struct thr_data) consists of globally shared
   * variables, so sharing a cache line with those for these read-only
   * variables isn't a good idea.
   */
1158 MY_ALIGNED(NDB_CL) struct thr_data m_thread[MAX_BLOCK_THREADS];
1159
1160 /* The buffers that are to be sent */
1161 struct send_buffer
1162 {
1163 /**
1164 * In order to reduce lock contention while
1165 * adding job buffer pages to the send buffers,
1166 * and sending these with the help of the send
1167 * transporters, there are two different
1168 * thr_send_buffer's. Each protected by its own lock:
1169 *
1170 * - m_buffer / m_buffer_lock:
1171 * Send buffer pages from all threads are linked into
1172 * the m_buffer when collected by link_thread_send_buffers().
1173 *
1174 * - m_sending / m_send_lock:
1175 * Before send buffers are given to the send-transporter,
1176 * they are moved from m_buffer -> m_sending by
1177 * get_bytes_to_send_iovec(). (Req. both locks.)
1178 * When transporter has consumed some/all of m_sending
1179 * buffers, ::bytes_sent() will update m_sending accordingly.
1180 *
1181 * If both locks are required, grab the m_send_lock first.
1182 * Release m_buffer_lock before releasing m_send_lock.
1183 */
1184 struct thr_spin_lock m_buffer_lock; //Protect m_buffer
1185 struct thr_send_buffer m_buffer;
1186
1187 struct thr_spin_lock m_send_lock; //Protect m_sending + transporter
1188 struct thr_send_buffer m_sending;
1189
1190 Uint64 m_node_total_send_buffer_size; //Protected by m_buffer_lock
    /**
     * Flag used to coordinate sending to the same remote node from different
     * threads when there is contention on m_send_lock.
     *
     * If two threads need to send to the same node at the same time, the
     * second thread, rather than waiting for the first to finish, will just
     * set this flag. The first thread will then take responsibility
     * for sending to this node when done with its own sending.
     */
1200 Uint32 m_force_send; //Check after release of m_send_lock
1201
1202 /**
1203 * Which thread is currently holding the m_send_lock
1204 */
1205 Uint32 m_send_thread; //Protected by m_send_lock
1206
1207 /**
1208 * Bytes sent in last performSend().
1209 */
1210 Uint32 m_bytes_sent;
1211
1212 /* read index(es) in thr_send_queue */
1213 Uint32 m_read_index[MAX_BLOCK_THREADS];
1214 } m_send_buffers[MAX_NTRANSPORTERS];
1215
1216 /* The buffers published by threads */
1217 thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_BLOCK_THREADS];
1218
1219 /*
1220 * These are used to synchronize during crash / trace dumps.
1221 *
1222 */
1223 NdbMutex stop_for_crash_mutex;
1224 NdbCondition stop_for_crash_cond;
1225 Uint32 stopped_threads;
1226 };
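
/**
 * Illustrative sketch (hypothetical helper, not code from this file) of the
 * documented lock order for thr_repository::send_buffer when both locks are
 * needed, e.g. when moving pages from m_buffer to m_sending:
 *
 *   thr_repository::send_buffer *sb = g_thr_repository->m_send_buffers + node;
 *   lock(&sb->m_send_lock);        // 1) take m_send_lock first
 *   lock(&sb->m_buffer_lock);      // 2) then m_buffer_lock
 *   // ... move sb->m_buffer pages onto sb->m_sending ...
 *   unlock(&sb->m_buffer_lock);    // 3) release m_buffer_lock first
 *   unlock(&sb->m_send_lock);      // 4) m_send_lock last
 */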
1227
/**
 * Class to handle send threads
 * ----------------------------
 * We can have up to 8 send threads.
 *
 * This class handles the case where a block thread needs to send: it
 * manages the running of the send threads and is also responsible for
 * starting them.
 */
1237 #define is_send_thread(thr_no) (thr_no >= num_threads)
1238
1239 struct thr_send_thread_instance
1240 {
  thr_send_thread_instance() :
1242 m_instance_no(0),
1243 m_watchdog_counter(0),
1244 m_awake(FALSE),
1245 m_thread(NULL),
1246 m_waiter_struct(),
1247 m_send_buffer_pool(0,
1248 THR_SEND_BUFFER_MAX_FREE,
1249 THR_SEND_BUFFER_ALLOC_SIZE)
1250 {}
1251 Uint32 m_instance_no;
1252 Uint32 m_watchdog_counter;
1253 Uint32 m_awake;
1254 Uint32 m_thr_index;
1255 NdbThread *m_thread;
1256 thr_wait m_waiter_struct;
1257 class thread_local_pool<thr_send_page> m_send_buffer_pool;
1258 };
1259
1260 struct thr_send_nodes
1261 {
  /**
   * 'm_next' implements a list of 'send_nodes' with PENDING
   * data, not yet assigned to a send thread. 0 means NULL.
   */
1266 Uint16 m_next;
1267
  /**
   * m_data_available is incremented/decremented by each
   * party having data to be sent to this specific node.
   * It works in conjunction with a queue of get'able nodes
   * (insert_node(), get_node()) waiting to be served by
   * the send threads, such that:
   *
1275 * 1) IDLE-state (m_data_available==0, not in list)
1276 * There are no data available for sending, and
1277 * no send threads are assigned to this node.
1278 *
1279 * 2) PENDING-state (m_data_available>0, in list)
1280 * There are data available for sending, possibly
1281 * supplied by multiple parties. No send threads
1282 * are currently serving this request.
1283 *
1284 * 3) ACTIVE-state (m_data_available==1, not in list)
1285 * There are data available for sending, possibly
1286 * supplied by multiple parties, which are currently
1287 * being served by a send thread. All known
1288 * data available at the time when we became 'ACTIVE'
1289 * will be served now ( -> '==1')
1290 *
1291 * 3b ACTIVE-WITH-PENDING-state (m_data_available>1, not in list)
1292 * Variant of above state, send thread is serving requests,
1293 * and even more data became available since we started.
1294 *
1295 * Allowed state transitions are:
1296 *
1297 * IDLE -> PENDING (alert_send_thread w/ insert_node)
1298 * PENDING -> ACTIVE (get_node)
1299 * ACTIVE -> IDLE (run_send_thread if check_done_node)
   * ACTIVE -> PENDING (run_send_thread if 'more')
1301 * ACTIVE -> ACTIVE-P (alert_send_thread while ACTIVE)
1302 * ACTIVE-P -> PENDING (run_send_thread while not check_done_node)
1303 * ACTIVE-P -> ACTIVE-P (alert_send_thread while ACTIVE-P)
1304 *
   * A consequence of this is that only a (single) ACTIVE
   * send thread will serve send requests to a specific node.
   * Thus, there will be no contention on the m_send_lock
   * caused by the send threads.
1309 */
1310 Uint16 m_data_available;
1311
1312 /**
1313 * m_send_thread is the current/last send thread instance
1314 * serving this send_node. Whenever possible we try to
1315 * reuse the same thread next time around to avoid
1316 * switching between CPUs.
1317 */
1318 Uint16 m_send_thread;
1319
1320 /* Send to this node has caused a Transporter overload */
1321 Uint16 m_send_overload;
1322
1323 /**
1324 * Further sending to this node should be delayed until
1325 * 'm_micros_delayed' has passed since 'm_inserted_time'.
1326 */
1327 Uint32 m_micros_delayed;
1328 NDB_TICKS m_inserted_time;
1329 };
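
/**
 * Illustrative walk-through (hypothetical sequence, not code from this file)
 * of the thr_send_nodes state machine for one node:
 *
 *   alert_send_thread(node)   // m_data_available 0 -> 1, insert_node(): PENDING
 *   get_node()                // removed from list, stays at 1:          ACTIVE
 *   alert_send_thread(node)   // m_data_available 1 -> 2:                ACTIVE-P
 *   check_done_node(node)     // m_data_available 2 -> 1, returns false,
 *                             // so the send thread re-inserts the node:  PENDING
 *   get_node()                // removed again, reset to 1:              ACTIVE
 *   check_done_node(node)     // m_data_available 1 -> 0, returns true:  IDLE
 */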
1330
1331 class thr_send_threads
1332 {
1333 public:
1334 /* Create send thread environment */
1335 thr_send_threads();
1336
1337 /* Destroy send thread environment and ensure threads are stopped */
1338 ~thr_send_threads();
1339
1340 /* A block thread has flushed data for a node and wants it sent */
1341 void alert_send_thread(NodeId node, NDB_TICKS now);
1342
1343 /* Method used to run the send thread */
1344 void run_send_thread(Uint32 instance_no);
1345
1346 /* Method to start the send threads */
1347 void start_send_threads();
1348
  /**
   * Check if a node possibly has data ready to be sent.
   * Upon 'true', the caller should grab send_thread_mutex and
   * try get_node() while holding the lock.
   */
  bool data_available() const
1355 {
1356 rmb();
1357 return (m_more_nodes == TRUE);
1358 }
1359
1360 /* Get send buffer pool for send thread */
  thread_local_pool<thr_send_page>* get_send_buffer_pool(Uint32 thr_no)
1362 {
1363 return &m_send_threads[thr_no - num_threads].m_send_buffer_pool;
1364 }
1365
1366 private:
  /* Insert a node in the list of nodes that have data available to send */
1368 void insert_node(NodeId node);
1369
1370 /* Get a node from the list in order to send to it */
1371 NodeId get_node(Uint32 instance_no, NDB_TICKS now);
1372
1373 /**
1374 * Set of utility methods to aid in scheduling of send work:
1375 *
1376 * Further sending to node can be delayed
1377 * until 'now+delay'. Used either to wait for more packets
1378 * to be available for bigger chunks, or to wait for an overload
1379 * situation to clear.
1380 */
1381 void set_max_delay(NodeId node, NDB_TICKS now, Uint32 delay_usec);
1382 void set_overload_delay(NodeId node, NDB_TICKS now, Uint32 delay_usec);
1383 Uint32 check_delay_expired(NodeId node, NDB_TICKS now);
1384
1385 /* Completed sending data to this node, check if more work pending. */
1386 bool check_done_node(NodeId node);
1387
1388 /* Get a send thread which isn't awake currently */
1389 struct thr_send_thread_instance* get_not_awake_send_thread(NodeId node);
1390 Uint32 count_awake_send_threads(void) const;
1391
1392 /* Try to lock send_buffer for this node. */
1393 static
1394 int trylock_send_node(NodeId node);
1395
1396 /* Perform the actual send to the node, release send_buffer lock.
1397 * Return 'true' if there are still more to be sent to this node.
1398 */
1399 static
1400 bool perform_send(NodeId node, Uint32 instance_no, Uint32& bytes_sent);
1401
1402 /* Have threads been started */
1403 Uint32 m_started_threads;
1404
1405 /* First node that has data to be sent */
1406 Uint32 m_first_node;
1407
1408 /* Last node in list of nodes with data available for sending */
1409 Uint32 m_last_node;
1410
1411 /* 'true': More nodes became available -> Need recheck ::get_node() */
1412 bool m_more_nodes;
1413
1414 /* Is data available and next reference for each node in cluster */
1415 struct thr_send_nodes m_node_state[MAX_NODES];
1416
  /**
   * Very few compilers (gcc among them) allow zero-length arrays
   */
1420 #if MAX_NDBMT_SEND_THREADS == 0
1421 #define _MAX_SEND_THREADS 1
1422 #else
1423 #define _MAX_SEND_THREADS MAX_NDBMT_SEND_THREADS
1424 #endif
1425
1426 /* Data and state for the send threads */
1427 struct thr_send_thread_instance m_send_threads[_MAX_SEND_THREADS];
1428
1429 /**
1430 * Mutex protecting the linked list of nodes awaiting sending
1431 * and also the not_awake variable of the send thread.
1432 */
1433 NdbMutex *send_thread_mutex;
1434 };
1435
1436
/*
 * The single instance of the thr_send_threads class. If this variable
 * is non-NULL we are using send threads; if it is NULL, there
 * are no send threads.
 */
1442 static thr_send_threads *g_send_threads = NULL;
1443
1444 extern "C"
1445 void *
mt_send_thread_main(void *thr_arg)
1447 {
1448 struct thr_send_thread_instance *this_send_thread =
1449 (thr_send_thread_instance*)thr_arg;
1450
1451 Uint32 instance_no = this_send_thread->m_instance_no;
1452 g_send_threads->run_send_thread(instance_no);
1453 return NULL;
1454 }
1455
thr_send_threads::thr_send_threads()
1457 : m_started_threads(FALSE),
1458 m_first_node(0),
1459 m_last_node(0),
1460 m_more_nodes(false),
1461 send_thread_mutex(NULL)
1462 {
1463 struct thr_repository *rep = g_thr_repository;
1464
1465 for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_node_state); i++)
1466 {
1467 m_node_state[i].m_next = 0;
1468 m_node_state[i].m_data_available = 0;
1469 m_node_state[i].m_send_thread = 0;
1470 m_node_state[i].m_send_overload = FALSE;
1471 m_node_state[i].m_micros_delayed = 0;
1472 NdbTick_Invalidate(&m_node_state[i].m_inserted_time);
1473 }
1474 for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_send_threads); i++)
1475 {
1476 m_send_threads[i].m_waiter_struct.init();
1477 m_send_threads[i].m_instance_no = i;
1478 m_send_threads[i].m_send_buffer_pool.set_pool(&rep->m_sb_pool);
1479 }
1480 send_thread_mutex = NdbMutex_Create();
1481 }
1482
thr_send_threads::~thr_send_threads()
1484 {
1485 if (!m_started_threads)
1486 return;
1487
1488 for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
1489 {
1490 void *dummy_return_status;
1491
1492 /* Ensure thread is woken up to die */
1493 wakeup(&(m_send_threads[i].m_waiter_struct));
1494 NdbThread_WaitFor(m_send_threads[i].m_thread, &dummy_return_status);
1495 globalEmulatorData.theConfiguration->removeThread(
1496 m_send_threads[i].m_thread);
1497 NdbThread_Destroy(&(m_send_threads[i].m_thread));
1498 }
1499 }
1500
1501 void
thr_send_threads::start_send_threads()
1503 {
1504 for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
1505 {
1506 m_send_threads[i].m_thread =
1507 NdbThread_Create(mt_send_thread_main,
1508 (void **)&m_send_threads[i],
1509 1024*1024,
1510 "send thread", //ToDo add number
1511 NDB_THREAD_PRIO_MEAN);
1512 m_send_threads[i].m_thr_index =
1513 globalEmulatorData.theConfiguration->addThread(
1514 m_send_threads[i].m_thread,
1515 SendThread);
1516 }
1517 m_started_threads = TRUE;
1518 }
1519
1520 /* Called under mutex protection of send_thread_mutex */
1521 void
thr_send_threads::insert_node(NodeId node)
1523 {
1524 Uint8 last_node = m_last_node;
1525 Uint8 first_node = m_first_node;
1526 struct thr_send_nodes &last_node_state = m_node_state[last_node];
1527 struct thr_send_nodes &node_state = m_node_state[node];
1528
1529 assert(node_state.m_data_available > 0);
1530 node_state.m_next = 0;
1531
1532 m_more_nodes = true;
  /* Ensure the lock-free ::data_available() sees 'm_more_nodes == TRUE' */
1534 wmb();
1535
1536 m_last_node = node;
1537 if (first_node == 0)
1538 m_first_node = node;
1539 else
1540 last_node_state.m_next = node;
1541 }
1542
1543 /* Called under mutex protection of send_thread_mutex */
1544 void
thr_send_threads::set_max_delay(NodeId node, NDB_TICKS now, Uint32 delay_usec)
1546 {
1547 struct thr_send_nodes &node_state = m_node_state[node];
1548 assert(node_state.m_data_available > 0);
1549 assert(!node_state.m_send_overload);
1550
1551 node_state.m_micros_delayed = delay_usec;
1552 node_state.m_inserted_time = now;
1553 }
1554
1555 /* Called under mutex protection of send_thread_mutex */
1556 void
thr_send_threads::set_overload_delay(NodeId node, NDB_TICKS now, Uint32 delay_usec)
1558 {
1559 struct thr_send_nodes &node_state = m_node_state[node];
1560 assert(node_state.m_data_available > 0);
1561 node_state.m_send_overload = TRUE;
1562 node_state.m_micros_delayed = delay_usec;
1563 node_state.m_inserted_time = now;
1564 }
1565
1566 /* Called under mutex protection of send_thread_mutex */
1567 Uint32
thr_send_threads::check_delay_expired(NodeId node, NDB_TICKS now)
1569 {
1570 struct thr_send_nodes &node_state = m_node_state[node];
1571 assert(node_state.m_data_available > 0);
1572
1573 if (node_state.m_micros_delayed == 0)
1574 return 0;
1575
1576 const Uint64 micros_passed = NdbTick_Elapsed(node_state.m_inserted_time, now).microSec();
1577 if (micros_passed >= Uint64(node_state.m_micros_delayed)) //Expired
1578 {
1579 node_state.m_inserted_time = now;
1580 node_state.m_micros_delayed = 0;
1581 node_state.m_send_overload = FALSE;
1582 return 0;
1583 }
1584
1585 // Update and return remaining wait time
1586 node_state.m_inserted_time = now;
1587 node_state.m_micros_delayed -= micros_passed;
1588 return node_state.m_micros_delayed;
1589 }
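
/**
 * Worked example (illustrative only): if a node was given
 * m_micros_delayed == 400 and check_delay_expired() runs 150 microseconds
 * later, it returns the remaining 250 and restarts the measurement from
 * 'now'; once the accumulated wait reaches 400 it returns 0 and clears the
 * overload flag.
 */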
1590
/**
 * TODO RONM:
 * Add some more NDBINFO tables to make it easier to follow the workings
 * of the MaxSendDelay parameter.
 */
1596
1597 static Uint64 mt_get_send_buffer_bytes(NodeId node);
1598
/**
 * MAX_SEND_BUFFER_SIZE_TO_DELAY is a heuristic constant that specifies
 * a send buffer size that will always be sent. The size of this is based
 * on experience that maximum performance of the send part is achieved at
 * around 64 kBytes of send buffer size, and that the difference between
 * 20 kBytes and 64 kBytes is small. We thus avoid unnecessary delays that
 * bring no significant performance gain.
 */
1607 static const Uint64 MAX_SEND_BUFFER_SIZE_TO_DELAY = (20 * 1024);
1608
1609
/**
 * Get a node (returned) that has data to be sent to it.
 *
 * Sending could have been delayed; in such cases the node whose
 * delay expires first will be returned. It is then up to
 * the caller to either accept this node, or reinsert it
 * such that it can be returned and retried later.
 *
 * Called under mutex protection of send_thread_mutex
 */
1620 NodeId
thr_send_threads::get_node(Uint32 send_thread, NDB_TICKS now)
1622 {
1623 Uint32 next;
1624 Uint32 prev = 0;
1625 Uint32 node = m_first_node;
1626 Uint32 delayed_node = 0;
1627
1628 if (!node)
1629 {
1630 m_more_nodes = false;
1631 return 0;
1632 }
1633
1634 /**
1635 * Search for a node ready to be sent to.
1636 * If none found, remember the one with the smallest delay.
1637 */
1638 Uint32 min_wait_usec = Uint32(~0);
1639 while (node)
1640 {
1641 next = m_node_state[node].m_next;
1642
1643 const Uint32 send_delay = check_delay_expired(node, now);
1644 if (likely(send_delay == 0))
1645 goto found;
1646
1647 /* Find remaining minimum wait: */
1648 if (min_wait_usec > send_delay)
1649 {
1650 min_wait_usec = send_delay;
1651 delayed_node = node;
1652 }
1653
1654 prev = node;
1655 node = next;
1656 }
1657
1658 // As 'm_first_node != 0', there has to be a 'delayed_node'
1659 assert(delayed_node != 0);
1660 m_more_nodes = false; // No more to execute without delays
1661
  // Relocate the delayed send node and return it
1663 node = m_first_node;
1664 prev = 0;
1665 do
1666 {
1667 next = m_node_state[node].m_next;
1668 if (node == delayed_node)
1669 goto found;
1670
1671 prev = node;
1672 node = next;
1673 } while (node);
1674
1675 require(false); // Should never get here
1676
1677 found:
1678 struct thr_send_nodes &node_state = m_node_state[node];
1679 assert(node_state.m_data_available > 0);
1680 node_state.m_next = 0;
1681
1682 if (likely(node == m_first_node))
1683 m_first_node = next;
1684 else
1685 m_node_state[prev].m_next = next;
1686
1687 if (node == m_last_node)
1688 m_last_node = prev;
1689
1690 node_state.m_data_available = 1;
1691 node_state.m_send_thread = send_thread;
1692 return (NodeId)node;
1693 }
1694
1695 /* Called under mutex protection of send_thread_mutex */
1696 bool
thr_send_threads::check_done_node(NodeId node)
1698 {
1699 struct thr_send_nodes &node_state = m_node_state[node];
1700 assert(node_state.m_data_available > 0);
1701 node_state.m_data_available--;
1702 return (node_state.m_data_available == 0);
1703 }
1704
1705 /* Called under mutex protection of send_thread_mutex */
1706 struct thr_send_thread_instance*
thr_send_threads::get_not_awake_send_thread(NodeId node)
1708 {
1709 struct thr_send_thread_instance *used_send_thread;
1710
1711 /* Reuse previous send_thread if available */
1712 if (!m_send_threads[m_node_state[node].m_send_thread].m_awake)
1713 {
1714 used_send_thread= &m_send_threads[m_node_state[node].m_send_thread];
1715 return used_send_thread;
1716 }
1717
1718 /* Search for another available send thread */
1719 for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
1720 {
1721 if (!m_send_threads[i].m_awake)
1722 {
1723 used_send_thread= &m_send_threads[i];
1724 return used_send_thread;
1725 }
1726 }
1727 return NULL;
1728 }
1729
1730 Uint32
thr_send_threads::count_awake_send_threads() const
1732 {
1733 Uint32 count = 0;
1734 for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
1735 {
1736 if (m_send_threads[i].m_awake)
1737 {
1738 count++;
1739 }
1740 }
1741 return count;
1742 }
1743
1744 void
thr_send_threads::alert_send_thread(NodeId node, NDB_TICKS now)
1746 {
1747 struct thr_send_nodes& node_state = m_node_state[node];
1748
1749 NdbMutex_Lock(send_thread_mutex);
1750 node_state.m_data_available++; // There is more to send
1751 if (node_state.m_data_available > 1)
1752 {
    /**
     * ACTIVE(_P) -> ACTIVE_P
     *
     * The node is already flagged as having data that needs to be sent.
     * There is no need to wake up even more threads in this case,
     * since we piggyback on someone else's request.
     *
     * Waking another thread for sending to this node would only have
     * resulted in contention and blocking on the send_lock.
     *
     * We are safe that the buffers we have flushed will be read by a send
     * thread: they will either be piggybacked when the send thread calls
     * 'get_node()' for sending, or data will be available when the send
     * thread calls 'check_done_node()' and finds that more data has
     * become available. In the latter case, the send thread will schedule
     * the node for another round with insert_node().
     */
1770 NdbMutex_Unlock(send_thread_mutex);
1771 return;
1772 }
1773 assert(!node_state.m_send_overload); // Caught above as ACTIVE
1774 insert_node(node); // IDLE -> PENDING
1775
1776 /**
1777 * We need to delay sending the data, as set in config.
1778 * This is the first send to this node, so we start the
1779 * delay timer now.
1780 */
1781 if (max_send_delay > 0) // Wait for more payload?
1782 {
1783 set_max_delay(node, now, max_send_delay);
1784 }
1785
  /*
   * Search for a send thread which is asleep, and if there is one, wake it.
   *
   * If everyone is already awake we don't need to wake anyone up, since
   * the threads will check whether there are nodes available to send to
   * before they go to sleep.
   *
   * The reason to look for anyone asleep is to ensure proper use of CPU
   * resources and to ensure that we use all the send thread CPUs at our
   * disposal when necessary.
   */
1797 struct thr_send_thread_instance *avail_send_thread
1798 = get_not_awake_send_thread(node);
1799
1800 NdbMutex_Unlock(send_thread_mutex);
1801
1802 if (avail_send_thread)
1803 {
    /*
     * Wake the assigned sleeping send thread. This may be a spurious wakeup,
     * but that is not a problem; what matters is that at least one send
     * thread is woken up to handle our request. If someone is already awake
     * and takes care of our request before we get to wake someone up, that
     * is not a problem either.
     */
1811 wakeup(&(avail_send_thread->m_waiter_struct));
1812 }
1813 }
1814
1815 static bool
check_available_send_data(struct thr_data *not_used)
1817 {
1818 (void)not_used;
1819 return !g_send_threads->data_available();
1820 }
1821
1822 //static
1823 int
thr_send_threads::trylock_send_node(NodeId node)
1825 {
1826 thr_repository::send_buffer *sb = g_thr_repository->m_send_buffers+node;
1827 return trylock(&sb->m_send_lock);
1828 }
1829
1830 //static
1831 bool
thr_send_threads::perform_send(NodeId node, Uint32 instance_no, Uint32& bytes_sent)
1833 {
1834 thr_repository::send_buffer * sb = g_thr_repository->m_send_buffers+node;
1835
1836 /**
1837  * Set m_send_thread so that our transporter callback can know which thread
1838 * holds the send lock for this remote node.
1839 */
1840 sb->m_send_thread = num_threads + instance_no;
1841 const bool more = globalTransporterRegistry.performSend(node);
1842 bytes_sent = sb->m_bytes_sent;
1843 sb->m_send_thread = NO_SEND_THREAD;
1844 unlock(&sb->m_send_lock);
1845 return more;
1846 }
1847
1848 static void
1849 update_send_sched_config(THRConfigApplier & conf,
1850 unsigned instance_no,
1851 bool & real_time,
1852 Uint64 & spin_time)
1853 {
1854 real_time = conf.do_get_realtime_send(instance_no);
1855 spin_time = (Uint64)conf.do_get_spintime_send(instance_no);
1856 }
1857
1858 static void
1859 yield_rt_break(NdbThread *thread,
1860 enum ThreadTypes type,
1861 bool real_time)
1862 {
1863 Configuration * conf = globalEmulatorData.theConfiguration;
1864 conf->setRealtimeScheduler(thread,
1865 type,
1866 FALSE,
1867 FALSE);
1868 conf->setRealtimeScheduler(thread,
1869 type,
1870 real_time,
1871 FALSE);
1872 }
1873
1874 static void
1875 check_real_time_break(NDB_TICKS now,
1876 NDB_TICKS *yield_time,
1877 NdbThread *thread,
1878 enum ThreadTypes type)
1879 {
1880 if (unlikely(NdbTick_Compare(now, *yield_time) < 0))
1881 {
1882 /**
1883 * Timer was adjusted backwards, or the monotonic timer implementation
1884  * on this platform is unstable. The best we can do is to restart
1885  * the RT-yield timers from the new current time.
1886 */
1887 *yield_time = now;
1888 }
1889
1890 const Uint64 micros_passed =
1891 NdbTick_Elapsed(*yield_time, now).microSec();
1892
1893 if (micros_passed > 50000)
1894 {
1895 /**
1896 * Lower scheduling prio to time-sharing mode to ensure that
1897  * other threads and processes get a chance to be scheduled
1898 * if we run for an extended time.
1899 */
1900 yield_rt_break(thread, type, TRUE);
1901 *yield_time = now;
1902 }
1903 }
1904
1905 static bool
1906 check_yield(NDB_TICKS now,
1907 NDB_TICKS *start_spin_ticks,
1908 Uint64 min_spin_timer) //microseconds
1909 {
1910 assert(min_spin_timer > 0);
1911
1912 if (!NdbTick_IsValid(*start_spin_ticks))
1913 {
1914 /**
1915 * We haven't started spinning yet, start spin timer.
1916 */
1917 *start_spin_ticks = now;
1918 return false;
1919 }
1920 if (unlikely(NdbTick_Compare(now, *start_spin_ticks) < 0))
1921 {
1922 /**
1923 * Timer was adjusted backwards, or the monotonic timer implementation
1924  * on this platform is unstable. The best we can do is to restart
1925  * the spin timers from the new current time.
1926 */
1927 *start_spin_ticks = now;
1928 return false;
1929 }
1930
1931 /**
1932 * We have a minimum spin timer before we go to sleep.
1933 * We will go to sleep only if we have spun for longer
1934  * than the minimum spin time requires.
1935 */
1936 const Uint64 micros_passed =
1937 NdbTick_Elapsed(*start_spin_ticks, now).microSec();
1938 return (micros_passed >= min_spin_timer);
1939 }
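/**
 * Illustrative sketch (not part of the build, hence #if 0): the same
 * spin-before-sleep decision as check_yield(), expressed with std::chrono.
 * should_yield() and spin_start are hypothetical names; an unset start time
 * means "start the spin timer now", a backwards-ticking clock restarts it,
 * and we only agree to sleep once the minimum spin time has elapsed.
 */
#if 0
#include <chrono>
#include <optional>

static bool
should_yield(std::optional<std::chrono::steady_clock::time_point>& spin_start,
             std::chrono::microseconds min_spin_time)
{
  const auto now = std::chrono::steady_clock::now();
  if (!spin_start.has_value() || now < *spin_start)
  {
    spin_start = now;   // Start (or restart) the spin timer
    return false;       // Keep spinning
  }
  return (now - *spin_start) >= min_spin_time;  // Sleep only after min spin time
}
#endif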
1940
1941 /**
1942  * There are some send scheduling algorithms built into the send thread.
1943  * They are mainly implemented as part of ::run_send_thread, and thus commented here:
1944  *
1945  * We have the possibility to set a 'send delay' for each node. This
1946  * is used both for handling send overload, where we should wait
1947  * before retrying, and as an aid for collecting smaller packets into
1948  * larger, and thus fewer, packets, thereby decreasing the send overhead
1949  * on a highly loaded system.
1950  *
1951  * A delay due to overload is always waited for. As there is already
1952  * queued-up send work in the buffers, sending will be possible
1953  * without the send thread actively busy-retrying. However, delays
1954  * intended to increase the packet size can be ignored.
1955  *
1956  * The basic idea of the latter is the following:
1957 * By introducing a delay we ensure that all block threads have
1958 * gotten a chance to execute messages that will generate data
1959 * to be sent to nodes. This is particularly helpful in e.g.
1960 * queries that are scanning a table. Here a SCAN_TABREQ is
1961 * received in a TC and this generates a number of SCAN_FRAGREQ
1962 * signals to each LDM, each of those LDMs will in turn generate
1963 * a number of new signals that are all destined to the same
1964 * node. So this delay here increases the chance that those
1965 * signals can be sent in the same TCP/IP packet over the wire.
1966 *
1967 * Another use case is applications using the asynchronous API
1968 * and thus sending many PK lookups that traverse a node in
1969 * parallel from the same destination node. These can benefit
1970 * greatly from this extra delay increasing the packet sizes.
1971 *
1972 * There is also a case when sending many updates that need to
1973 * be sent to the other node in the same node group. By delaying
1974 * the send of this data we ensure that the receiver thread on
1975 * the other end is getting larger packet sizes and thus we
1976 * improve the throughput of the system in all sorts of ways.
1977 *
1978 * However we also try to ensure that we don't delay signals in
1979 * an idle system where response time is more important than
1980 * the throughput. This is achieved by the fact that we will
1981 * send after looping through the nodes ready to send to. In
1982 * an idle system this will be a quick operation. In a loaded
1983 * system this delay can be fairly substantial on the other
1984 * hand.
1985 *
1986 * Finally we attempt to limit the use of more than one send
1987 * thread to cases of very high load. So if there are only
1988 * delayed node sends remaining, we deduce that the
1989 * system is lightly loaded and we will go to sleep if there
1990 * are other send threads also awake.
1991 */
1992 void
1993 thr_send_threads::run_send_thread(Uint32 instance_no)
1994 {
1995 struct thr_send_thread_instance *this_send_thread =
1996 &m_send_threads[instance_no];
1997 const Uint32 thr_no = num_threads + instance_no;
1998
1999 {
2000 /**
2001 * Wait for thread object to be visible
2002 */
2003 while(this_send_thread->m_thread == 0)
2004 NdbSleep_MilliSleep(30);
2005 }
2006
2007 {
2008 /**
2009  * Print out information about the starting thread
2010  * (number, tid, name, the CPU it's locked to (if locked at all)).
2011  * Also perform the locking to the CPU.
2012 */
2013 BaseString tmp;
2014 THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
2015 tmp.appfmt("thr: %u ", thr_no);
2016 int tid = NdbThread_GetTid(this_send_thread->m_thread);
2017 if (tid != -1)
2018 {
2019 tmp.appfmt("tid: %u ", tid);
2020 }
2021 conf.appendInfoSendThread(tmp, instance_no);
2022 int res = conf.do_bind_send(this_send_thread->m_thread, instance_no);
2023 if (res < 0)
2024 {
2025 tmp.appfmt("err: %d ", -res);
2026 }
2027 else if (res > 0)
2028 {
2029 tmp.appfmt("OK ");
2030 }
2031 printf("%s\n", tmp.c_str());
2032 fflush(stdout);
2033 }
2034
2035 /**
2036 * register watchdog
2037 */
2038 globalEmulatorData.theWatchDog->
2039 registerWatchedThread(&this_send_thread->m_watchdog_counter, thr_no);
2040
2041 NdbMutex_Lock(send_thread_mutex);
2042 this_send_thread->m_awake = FALSE;
2043 NdbMutex_Unlock(send_thread_mutex);
2044
2045 NDB_TICKS start_spin_ticks;
2046 NDB_TICKS yield_ticks;
2047 bool real_time = false;
2048 Uint64 min_spin_timer = 0;
2049
2050 NdbTick_Invalidate(&start_spin_ticks);
2051 yield_ticks = NdbTick_getCurrentTicks();
2052 THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
2053 update_send_sched_config(conf, instance_no, real_time, min_spin_timer);
2054
2055 NodeId node = 0;
2056 while (globalData.theRestartFlag != perform_stop)
2057 {
2058 this_send_thread->m_watchdog_counter = 1;
2059
2060 NDB_TICKS now = NdbTick_getCurrentTicks();
2061 NdbMutex_Lock(send_thread_mutex);
2062 this_send_thread->m_awake = TRUE;
2063
2064 /**
2065  * If we waited for a specific node, reinsert it so that
2066 * it can be re-evaluated for send by get_node().
2067 */
2068 if (node != 0)
2069 {
2070 insert_node(node);
2071 node = 0;
2072 }
2073 while (globalData.theRestartFlag != perform_stop &&
2074 (node = get_node(instance_no, now)) != 0) // PENDING -> ACTIVE
2075 {
2076 if (m_node_state[node].m_micros_delayed > 0) // Node send is delayed
2077 {
2078 if (m_node_state[node].m_send_overload) // Pause overloaded node
2079 break;
2080
2081 /**
2082  * A non-overload delay is a 'soft delay' which we might ignore depending on
2083 * current load. On a lightly loaded system we send immediately
2084 * to reduce latency. On a loaded system we increase throughput
2085 * by collecting into larger packets.
2086 * If multiple send threads are awake, excess threads are put to sleep.
2087 */
2088 if (count_awake_send_threads() == 1) // Lightly loaded system:
2089 set_max_delay(node, now, 0); // Send now to improve latency
2090 else if (mt_get_send_buffer_bytes(node) >= MAX_SEND_BUFFER_SIZE_TO_DELAY)
2091 set_max_delay(node, now, 0); // Large packet -> Send now
2092 else // Sleep, let last awake send
2093 {
2094 insert_node(node);
2095 m_more_nodes = false;
2096 node = 0;
2097 break;
2098 }
2099 }
2100
2101 /**
2102  * Multiple send threads cannot 'get' the same
2103  * node simultaneously. Thus, we do not need
2104  * to hold the global send thread mutex any longer.
2105  * This also avoids worker threads blocking on us in
2106  * ::alert_send_thread.
2107 */
2108 NdbMutex_Unlock(send_thread_mutex);
2109 this_send_thread->m_watchdog_counter = 6;
2110
2111 /**
2112  * We need a lock on the send buffers to protect against
2113  * a worker thread doing ::forceSend, possibly
2114  * reset_send_buffers() and/or lock_/unlock_transporter().
2115  * To avoid a livelock with ::forceSend() on an overloaded
2116  * system, we 'try-lock', and reinsert the node for
2117  * a later retry if the lock attempt fails.
2118 */
2119 bool more = true;
2120 Uint32 bytes_sent = 0;
2121 if (likely(trylock_send_node(node) == 0))
2122 {
2123 more = perform_send(node, instance_no, bytes_sent);
2124 /* We return with no locks or mutexes held */
2125
2126 /* Release chunk-wise to decrease pressure on lock */
2127 this_send_thread->m_watchdog_counter = 3;
2128 this_send_thread->m_send_buffer_pool.
2129 release_chunk(g_thr_repository->m_mm, RG_TRANSPORTER_BUFFERS);
2130 }
2131
2132 /**
2133  * Either our own perform_send() processing, or an external 'alert',
2134  * could have signaled that there are more sends pending.
2135  * If we made no progress in perform_send, we conclude that the
2136  * node is overloaded, and we take a break from further send
2137  * attempts to that node. A failure of trylock_send_node also
2138  * results in 'overload' being concluded.
2139  * (Quite reasonable, as the worker thread is likely forceSend'ing.)
2140 */
2141 now = NdbTick_getCurrentTicks();
2142 NdbMutex_Lock(send_thread_mutex);
2143
2144 if (more || // ACTIVE -> PENDING
2145 !check_done_node(node)) // ACTIVE-P -> PENDING
2146 {
2147 insert_node(node);
2148
2149 if (unlikely(more && bytes_sent == 0)) //Node is overloaded
2150 {
2151 set_overload_delay(node, now, 1000); //Delay send-retry by 1000us
2152 }
2153 } // else: // ACTIVE -> IDLE
2154 } // while (get_node()...)
2155
2156 /* No more nodes having data to send right now, prepare to sleep */
2157 this_send_thread->m_awake = FALSE;
2158 const Uint32 node_wait = (node != 0) ? m_node_state[node].m_micros_delayed : 0;
2159 NdbMutex_Unlock(send_thread_mutex);
2160
2161 if (real_time)
2162 {
2163 check_real_time_break(now,
2164 &yield_ticks,
2165 this_send_thread->m_thread,
2166 SendThread);
2167 }
2168
2169
2170 if (min_spin_timer == 0 ||
2171 check_yield(now,
2172 &start_spin_ticks,
2173 min_spin_timer))
2174 {
2175 Uint32 max_wait_usec;
2176 /**
2177 * We sleep a max time, possibly waiting for a specific node
2178 * with delayed send (overloaded, or waiting for more payload).
2179 * Send thread instance#0 should only be allowed to take short naps.
2180 * Other send threads may sleep longer if not needed right now.
2181 * (Will be alerted to start working when more send work arrives)
2182 */
2183 if (node_wait != 0)
2184 max_wait_usec = node_wait;
2185 else if (instance_no == 0)
2186 max_wait_usec = 10*1000; //10ms, default sleep if not set by ::get_node()
2187 else
2188 max_wait_usec = 50*1000; //50ms, has to wakeup before 100ms watchdog alert.
2189
2190 yield(&this_send_thread->m_waiter_struct, max_wait_usec*1000,
2191 check_available_send_data, (struct thr_data*)NULL);
2192 }
2193 }
2194
2195 globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
2196 }
2197
2198 #if 0
2199 static
2200 Uint32
2201 fifo_used_pages(struct thr_data* selfptr)
2202 {
2203 return calc_fifo_used(selfptr->m_first_unused,
2204 selfptr->m_first_free,
2205 THR_FREE_BUF_MAX);
2206 }
2207 #endif
2208
2209 ATTRIBUTE_NOINLINE
2210 static
2211 void
2212 job_buffer_full(struct thr_data* selfptr)
2213 {
2214 ndbout_c("job buffer full");
2215 dumpJobQueues();
2216 abort();
2217 }
2218
2219 ATTRIBUTE_NOINLINE
2220 static
2221 void
2222 out_of_job_buffer(struct thr_data* selfptr)
2223 {
2224 ndbout_c("out of job buffer");
2225 dumpJobQueues();
2226 abort();
2227 }
2228
2229 static
2230 thr_job_buffer*
2231 seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
2232 {
2233 thr_job_buffer* jb;
2234 struct thr_data* selfptr = &rep->m_thread[thr_no];
2235 Uint32 first_free = selfptr->m_first_free;
2236 Uint32 first_unused = selfptr->m_first_unused;
2237
2238 /*
2239 * An empty FIFO is denoted by m_first_free == m_first_unused.
2240 * So we will never have a completely full FIFO array, at least one entry will
2241 * always be unused. But the code is simpler as a result.
2242 */
2243
2244 /*
2245 * We never allow the fifo to become completely empty, as we want to have
2246 * a good number of signals available for trace files in case of a forced
2247 * shutdown.
2248 */
2249 Uint32 buffers = (first_free > first_unused ?
2250 first_unused + THR_FREE_BUF_MAX - first_free :
2251 first_unused - first_free);
2252 if (unlikely(buffers <= THR_FREE_BUF_MIN))
2253 {
2254 /*
2255 * All used, allocate another batch from global pool.
2256 *
2257 * Put the new buffers at the head of the fifo, so as not to needlessly
2258  * push out any existing buffers from the fifo (that would lose useful
2259 * data for signal dumps in trace files).
2260 */
2261 Uint32 cnt = 0;
2262 Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
2263 assert(batch > 0);
2264 assert(batch + THR_FREE_BUF_MIN < THR_FREE_BUF_MAX);
2265 do {
2266 jb = rep->m_jb_pool.seize(rep->m_mm, RG_JOBBUFFER);
2267 if (unlikely(jb == 0))
2268 {
2269 if (unlikely(cnt == 0))
2270 {
2271 out_of_job_buffer(selfptr);
2272 }
2273 break;
2274 }
2275 jb->m_len = 0;
2276 jb->m_prioa = false;
2277 first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
2278 selfptr->m_free_fifo[first_free] = jb;
2279 batch--;
2280 } while (cnt < batch);
2281 selfptr->m_first_free = first_free;
2282 }
2283
2284 jb= selfptr->m_free_fifo[first_free];
2285 selfptr->m_first_free = (first_free + 1) % THR_FREE_BUF_MAX;
2286 /* Init here rather than in release_buffer() so signal dump will work. */
2287 jb->m_len = 0;
2288 jb->m_prioa = prioa;
2289 return jb;
2290 }
2291
2292 static
2293 void
2294 release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
2295 {
2296 struct thr_data* selfptr = &rep->m_thread[thr_no];
2297 Uint32 first_free = selfptr->m_first_free;
2298 Uint32 first_unused = selfptr->m_first_unused;
2299
2300 /*
2301 * Pack near-empty signals, to get more info in the signal traces.
2302 *
2303 * This is not currently used, as we only release full job buffers, hence
2304 * the #if 0.
2305 */
2306 #if 0
2307 Uint32 last_free = (first_unused ? first_unused : THR_FREE_BUF_MAX) - 1;
2308 thr_job_buffer *last_jb = selfptr->m_free_fifo[last_free];
2309 Uint32 len1, len2;
2310
2311 if (!jb->m_prioa &&
2312 first_free != first_unused &&
2313 !last_jb->m_prioa &&
2314 (len2 = jb->m_len) <= (thr_job_buffer::SIZE / 4) &&
2315 (len1 = last_jb->m_len) + len2 <= thr_job_buffer::SIZE)
2316 {
2317 /*
2318  * The buffer being released is fairly empty, and what data it contains fits
2319  * in the previously released buffer.
2320 *
2321 * We want to avoid too many almost-empty buffers in the free fifo, as that
2322 * makes signal traces less useful due to too little data available. So in
2323 * this case we move the data from the buffer to be released into the
2324 * previous buffer, and place the to-be-released buffer at the head of the
2325 * fifo (to be immediately reused).
2326 *
2327 * This is only done for prio B buffers, as we must not merge prio A and B
2328 * data (or dumps would be incorrect), and prio A buffers are in any case
2329 * full when released.
2330 */
2331 memcpy(last_jb->m_data + len1, jb->m_data, len2*sizeof(jb->m_data[0]));
2332 last_jb->m_len = len1 + len2;
2333 jb->m_len = 0;
2334 first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
2335 selfptr->m_free_fifo[first_free] = jb;
2336 selfptr->m_first_free = first_free;
2337 }
2338 else
2339 #endif
2340 {
2341 /* Just insert at the end of the fifo. */
2342 selfptr->m_free_fifo[first_unused] = jb;
2343 first_unused = (first_unused + 1) % THR_FREE_BUF_MAX;
2344 selfptr->m_first_unused = first_unused;
2345 }
2346
2347 if (unlikely(first_unused == first_free))
2348 {
2349 /* FIFO full, need to release to global pool. */
2350 Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
2351 assert(batch > 0);
2352 assert(batch < THR_FREE_BUF_MAX);
2353 do {
2354 rep->m_jb_pool.release(rep->m_mm, RG_JOBBUFFER,
2355 selfptr->m_free_fifo[first_free]);
2356 first_free = (first_free + 1) % THR_FREE_BUF_MAX;
2357 batch--;
2358 } while (batch > 0);
2359 selfptr->m_first_free = first_free;
2360 }
2361 }
2362
2363 static
2364 inline
2365 Uint32
2366 scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
2367 {
2368 Uint32 thr_no = selfptr->m_thr_no;
2369 Uint32 **pages = selfptr->m_tq.m_delayed_signals;
2370 Uint32 free = selfptr->m_tq.m_next_free;
2371 Uint32* save = ptr;
2372 for (Uint32 i = 0; i < cnt; i++, ptr++)
2373 {
2374 Uint32 val = * ptr;
2375 if ((val & 0xFFFF) <= end)
2376 {
2377 Uint32 idx = val >> 16;
2378 Uint32 buf = idx >> 8;
2379 Uint32 pos = MAX_SIGNAL_SIZE * (idx & 0xFF);
2380
2381 Uint32* page = * (pages + buf);
2382
2383 const SignalHeader *s = reinterpret_cast<SignalHeader*>(page + pos);
2384 const Uint32 *data = page + pos + (sizeof(*s)>>2);
2385 if (0)
2386 ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
2387 /*
2388 * ToDo: Do measurements of the frequency of these prio A timed signals.
2389 *
2390 * If they are frequent, we may want to optimize, as sending one prio A
2391 * signal is somewhat expensive compared to sending one prio B.
2392 */
2393 sendprioa(thr_no, s, data,
2394 data + s->theLength);
2395 * (page + pos) = free;
2396 free = idx;
2397 }
2398 else if (i > 0)
2399 {
2400 selfptr->m_tq.m_next_free = free;
2401 memmove(save, ptr, 4 * (cnt - i));
2402 return i;
2403 }
2404 else
2405 {
2406 return 0;
2407 }
2408 }
2409 selfptr->m_tq.m_next_free = free;
2410 return cnt;
2411 }
2412
2413 static
2414 void
2415 handle_time_wrap(struct thr_data* selfptr)
2416 {
2417 Uint32 i;
2418 struct thr_tq * tq = &selfptr->m_tq;
2419 Uint32 cnt0 = tq->m_cnt[0];
2420 Uint32 cnt1 = tq->m_cnt[1];
2421 Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
2422 Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
2423 cnt0 -= tmp0;
2424 cnt1 -= tmp1;
2425 tq->m_cnt[0] = cnt0;
2426 tq->m_cnt[1] = cnt1;
2427 for (i = 0; i<cnt0; i++)
2428 {
2429 assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
2430 tq->m_short_queue[i] -= 32767;
2431 }
2432 for (i = 0; i<cnt1; i++)
2433 {
2434 assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
2435 tq->m_long_queue[i] -= 32767;
2436 }
2437 }
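/**
 * Illustrative sketch (not part of the build, hence #if 0): the rebasing done
 * by handle_time_wrap() above. Each queue entry packs the slot index in the
 * high 16 bits and the alarm time in the low 16 bits; entries with an alarm
 * <= 32767 were just flushed by scan_queue(), so the remaining ones can be
 * shifted down by 32767 without any borrow into the index bits.
 * rebase_entry() is a hypothetical helper equivalent to 'entry -= 32767'.
 */
#if 0
static Uint32 rebase_entry(Uint32 entry)
{
  const Uint32 idx   = entry >> 16;              // Slot index, unchanged
  const Uint32 alarm = (entry & 0xFFFF) - 32767; // Alarm rebased to the new epoch
  return (idx << 16) | alarm;
}
#endif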
2438
2439 /**
2440 * FUNCTION: scan_time_queues(), scan_time_queues_impl(),
2441 * scan_time_queues_backtick()
2442 *
2443  * scan_time_queues() implements the part we want to be inlined
2444  * into the scheduler loops, while *_impl() and *_backtick() are
2445  * the more unlikely parts we don't call unless the timer has
2446  * ticked backward, or forward more than 1ms, since the last 'scan_time'.
2447  *
2448  * Check if any delayed signals have expired and should be sent now.
2449  * The time_queues will be checked every time we detect a change
2450  * in current time of >= 1ms. If idle we will sleep for max 10ms
2451  * before rechecking the time_queue.
2452  *
2453  * However, some situations need special attention:
2454  * - Even if we prefer monotonic timers, they are not available, or
2455  *   not implemented in our abstraction layer, for all platforms.
2456  *   A non-monotonic timer may leap when adjusted by the user, both
2457  *   forwards and backwards.
2458  * - Early implementations of monotonic timers had bugs where time
2459  *   could jump. Similar problems have been reported for several VMs.
2460  * - There might be CPU contention or system swapping where we might
2461  *   sleep for significantly longer than 10ms, causing long forward
2462  *   leaps in perceived time.
2463 *
2464 * In order to adapt to this non-perfect clock behaviour, the
2465 * scheduler has its own 'm_ticks' which is the current time
2466 * as perceived by the scheduler. On entering this function, 'now'
2467 * is the 'real' current time fetched from NdbTick_getCurrentTime().
2468 * 'selfptr->m_ticks' is the previous tick seen by the scheduler,
2469 * and as such is the timestamp which reflects the current time
2470 * as seen by the timer queues.
2471 *
2472  * Normally only a few milliseconds will elapse between each tick,
2473  * as seen by the diff between 'now' and 'selfptr->m_ticks'.
2474  * However, if there are larger leaps in the current time,
2475  * we break this up into several small (20ms) steps
2476  * by gradually increasing the scheduler's 'm_ticks' time. This ensures
2477  * that delayed signals will arrive in correct relative order,
2478  * and repeated signals (pace signals) are received with
2479  * the expected frequency. However, each individual signal may
2480  * be delayed or arrive too fast. Where exact timing is critical,
2481  * these signals should do their own time calculation by reading
2482  * the clock, instead of trusting that the signal is delivered as
2483  * specified by the 'delay' argument.
2484  *
2485  * If there are leaps larger than 1500ms, we try a hybrid
2486  * solution by moving the 'm_ticks' forward, close to the
2487  * actual current time, then continue as above from that
2488  * point in time. A 'time leap Warning' will also be printed
2489  * in the logs.
2490 */
2491 static
2492 Uint32
2493 scan_time_queues_impl(struct thr_data* selfptr, Uint32 diff)
2494 {
2495 NDB_TICKS last = selfptr->m_ticks;
2496 Uint32 step = diff;
2497
2498 if (unlikely(diff > 20)) // Break up into max 20ms steps
2499 {
2500 if (unlikely(diff > 1500)) // Time leaped more than 1500ms
2501 {
2502 /**
2503 * There was a long leap in the time since last checking
2504 * of the time_queues. The clock could have been adjusted, or we
2505 * are CPU starved. Anyway, we can never make up for the lost
2506 * CPU cycles, so we forget about them and start fresh from
2507 * a point in time 1000ms behind our current time.
2508 */
2509 g_eventLogger->warning("thr: %u: Overslept %u ms, expected ~10ms",
2510 selfptr->m_thr_no, diff);
2511
2512 last = NdbTick_AddMilliseconds(last, diff-1000);
2513 }
2514     step = 20;  // Max expire interval handled is 20ms
2515 }
2516
2517 struct thr_tq * tq = &selfptr->m_tq;
2518 Uint32 curr = tq->m_current_time;
2519 Uint32 cnt0 = tq->m_cnt[0];
2520 Uint32 cnt1 = tq->m_cnt[1];
2521 Uint32 end = (curr + step);
2522 if (end >= 32767)
2523 {
2524 handle_time_wrap(selfptr);
2525 cnt0 = tq->m_cnt[0];
2526 cnt1 = tq->m_cnt[1];
2527 end -= 32767;
2528 }
2529
2530 Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
2531 Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
2532
2533 tq->m_current_time = end;
2534 tq->m_cnt[0] = cnt0 - tmp0;
2535 tq->m_cnt[1] = cnt1 - tmp1;
2536 selfptr->m_ticks = NdbTick_AddMilliseconds(last, step);
2537 return (diff - step);
2538 }
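/**
 * Illustrative sketch (not part of the build, hence #if 0): how a large
 * clock leap is consumed in at most 20ms steps by repeated scans, mirroring
 * the 'diff'/'step' logic above. consume_leap() is a hypothetical helper;
 * e.g. a 75ms leap is handled as 20+20+20+15 ms over four scans, so delayed
 * signals keep their relative order.
 */
#if 0
static Uint32 consume_leap(Uint32 diff_ms)
{
  const Uint32 step = (diff_ms > 20) ? 20 : diff_ms; // Max 20ms handled per scan
  return diff_ms - step;                             // Remainder for the next scan
}

static void example_time_leap()
{
  Uint32 remaining = 75;
  Uint32 scans = 0;
  while (remaining > 0)
  {
    remaining = consume_leap(remaining);
    scans++;
  }
  // scans == 4 here: the 75ms leap was consumed as 20+20+20+15
}
#endif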
2539
2540 /**
2541 * Clock has ticked backwards. We try to handle this
2542 * as best we can.
2543 */
2544 static
2545 void
2546 scan_time_queues_backtick(struct thr_data* selfptr, NDB_TICKS now)
2547 {
2548 const NDB_TICKS last = selfptr->m_ticks;
2549 assert(NdbTick_Compare(now, last) < 0);
2550
2551 const Uint64 backward = NdbTick_Elapsed(now, last).milliSec();
2552
2553 /**
2554 * Silently ignore sub millisecond backticks.
2555 * Such 'noise' is unfortunately common, even for monotonic timers.
2556 */
2557 if (backward > 0)
2558 {
2559 g_eventLogger->warning("thr: %u Time ticked backwards %llu ms.",
2560 selfptr->m_thr_no, backward);
2561
2562 /* Long backticks should never happen for monotonic timers */
2563 assert(backward < 100 || !NdbTick_IsMonotonic());
2564
2565 /* Accept new time as current */
2566 selfptr->m_ticks = now;
2567 }
2568 }
2569
2570 /**
2571  * If someone sends a signal with bounded delay, it means that the signal
2572  * should be executed as soon as we come to scan_time_queues,
2573  * independent of the amount of time passed since it was sent. We
2574 * use a special time queue for bounded delay signals to avoid having
2575 * to scan through all short time queue signals in every loop of
2576 * the run job buffers.
2577 */
2578 static inline
2579 void
2580 scan_zero_queue(struct thr_data* selfptr)
2581 {
2582 struct thr_tq * tq = &selfptr->m_tq;
2583 Uint32 cnt = tq->m_cnt[2];
2584 if (cnt)
2585 {
2586 Uint32 num_found = scan_queue(selfptr,
2587 cnt,
2588 tq->m_current_time,
2589 tq->m_zero_queue);
2590 require(num_found == cnt);
2591 }
2592 tq->m_cnt[2] = 0;
2593 }
2594
2595 static inline
2596 Uint32
2597 scan_time_queues(struct thr_data* selfptr, NDB_TICKS now)
2598 {
2599 scan_zero_queue(selfptr);
2600 const NDB_TICKS last = selfptr->m_ticks;
2601 if (unlikely(NdbTick_Compare(now, last) < 0))
2602 {
2603 scan_time_queues_backtick(selfptr, now);
2604 return 0;
2605 }
2606
2607 const Uint32 diff = (Uint32)NdbTick_Elapsed(last, now).milliSec();
2608 if (unlikely(diff > 0))
2609 {
2610 return scan_time_queues_impl(selfptr, diff);
2611 }
2612 return 0;
2613 }
2614
2615 static
2616 inline
2617 Uint32*
2618 get_free_slot(struct thr_repository* rep,
2619 struct thr_data* selfptr,
2620 Uint32* idxptr)
2621 {
2622 struct thr_tq * tq = &selfptr->m_tq;
2623 Uint32 idx = tq->m_next_free;
2624 retry:
2625
2626 if (idx != RNIL)
2627 {
2628 Uint32 buf = idx >> 8;
2629 Uint32 pos = idx & 0xFF;
2630 Uint32* page = * (tq->m_delayed_signals + buf);
2631 Uint32* ptr = page + (MAX_SIGNAL_SIZE * pos);
2632 tq->m_next_free = * ptr;
2633 * idxptr = idx;
2634 return ptr;
2635 }
2636
2637 Uint32 thr_no = selfptr->m_thr_no;
2638 for (Uint32 i = 0; i<thr_tq::PAGES; i++)
2639 {
2640 if (tq->m_delayed_signals[i] == 0)
2641 {
2642 struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
2643 Uint32 * page = reinterpret_cast<Uint32*>(jb);
2644 tq->m_delayed_signals[i] = page;
2645 /**
2646 * Init page
2647 */
2648 for (Uint32 j = 0; j < MIN_SIGNALS_PER_PAGE; j ++)
2649 {
2650 page[j * MAX_SIGNAL_SIZE] = (i << 8) + (j + 1);
2651 }
2652 page[MIN_SIGNALS_PER_PAGE*MAX_SIGNAL_SIZE] = RNIL;
2653 idx = (i << 8);
2654 goto retry;
2655 }
2656 }
2657 abort();
2658 return NULL;
2659 }
2660
2661 void
2662 senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
2663 {
2664 struct thr_repository* rep = g_thr_repository;
2665 struct thr_data* selfptr = &rep->m_thread[thr_no];
2666 assert(my_thread_equal(selfptr->m_thr_id, my_thread_self()));
2667 unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
2668
2669 Uint32 max;
2670 Uint32 * cntptr;
2671 Uint32 * queueptr;
2672
2673 Uint32 alarm;
2674 Uint32 nexttimer = selfptr->m_tq.m_next_timer;
2675 if (delay == SimulatedBlock::BOUNDED_DELAY)
2676 {
2677 alarm = selfptr->m_tq.m_current_time;
2678 cntptr = selfptr->m_tq.m_cnt + 2;
2679 queueptr = selfptr->m_tq.m_zero_queue;
2680 max = thr_tq::ZQ_SIZE;
2681 }
2682 else
2683 {
2684 alarm = selfptr->m_tq.m_current_time + delay;
2685 if (delay < 100)
2686 {
2687 cntptr = selfptr->m_tq.m_cnt + 0;
2688 queueptr = selfptr->m_tq.m_short_queue;
2689 max = thr_tq::SQ_SIZE;
2690 }
2691 else
2692 {
2693 cntptr = selfptr->m_tq.m_cnt + 1;
2694 queueptr = selfptr->m_tq.m_long_queue;
2695 max = thr_tq::LQ_SIZE;
2696 }
2697 }
2698
2699 Uint32 idx;
2700 Uint32* ptr = get_free_slot(rep, selfptr, &idx);
2701 memcpy(ptr, s, 4*siglen);
2702
2703 if (0)
2704 ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
2705 selfptr->m_tq.m_current_time,
2706 alarm,
2707 getSignalName(s->theVerId_signalNumber),
2708 getBlockName(refToBlock(s->theSendersBlockRef)),
2709 getBlockName(s->theReceiversBlockNumber),
2710 delay,
2711 idx, ptr);
2712
2713 Uint32 i;
2714 Uint32 cnt = *cntptr;
2715 Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
2716
2717 * cntptr = cnt + 1;
2718 selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
2719
2720 if (cnt == 0 || delay == SimulatedBlock::BOUNDED_DELAY)
2721 {
2722 /* First delayed signal needs no order and bounded delay is FIFO */
2723 queueptr[cnt] = newentry;
2724 return;
2725 }
2726 else if (cnt < max)
2727 {
2728 for (i = 0; i<cnt; i++)
2729 {
2730 Uint32 save = queueptr[i];
2731 if ((save & 0xFFFF) > alarm)
2732 {
2733 memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
2734 queueptr[i] = newentry;
2735 return;
2736 }
2737 }
2738 assert(i == cnt);
2739 queueptr[i] = newentry;
2740 return;
2741 }
2742 else
2743 {
2744 /* Out of entries in time queue, issue proper error */
2745 if (cntptr == (selfptr->m_tq.m_cnt + 0))
2746 {
2747 /* Error in short time queue */
2748 ERROR_SET(ecError, NDBD_EXIT_TIME_QUEUE_SHORT,
2749 "Too many in Short Time Queue", "mt.cpp" );
2750 }
2751 else if (cntptr == (selfptr->m_tq.m_cnt + 1))
2752 {
2753 /* Error in long time queue */
2754 ERROR_SET(ecError, NDBD_EXIT_TIME_QUEUE_LONG,
2755 "Too many in Long Time Queue", "mt.cpp" );
2756 }
2757 else
2758 {
2759 /* Error in zero time queue */
2760 ERROR_SET(ecError, NDBD_EXIT_TIME_QUEUE_ZERO,
2761 "Too many in Zero Time Queue", "mt.cpp" );
2762 }
2763 }
2764 }
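/**
 * Illustrative sketch (not part of the build, hence #if 0): the packed queue
 * entry format and ordered insert used by senddelay(). An entry is
 * (slot index << 16) | (alarm & 0xFFFF), and the short/long queues are kept
 * sorted on the low 16 bits so scan_queue() can stop at the first entry
 * that has not yet expired. insert_sorted() and the small array are
 * hypothetical simplifications of the real time queue.
 */
#if 0
#include <cassert>
#include <cstring>

static void insert_sorted(Uint32* queue, Uint32& cnt, Uint32 idx, Uint32 alarm)
{
  const Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
  Uint32 i = 0;
  while (i < cnt && (queue[i] & 0xFFFF) <= alarm)
    i++;
  memmove(queue + i + 1, queue + i, 4 * (cnt - i)); // Shift later alarms up
  queue[i] = newentry;
  cnt++;
}

static void example_senddelay_order()
{
  Uint32 queue[8];
  Uint32 cnt = 0;
  insert_sorted(queue, cnt, 1, 50);
  insert_sorted(queue, cnt, 2, 10);
  insert_sorted(queue, cnt, 3, 30);
  assert((queue[0] & 0xFFFF) == 10);  // Earliest alarm first
  assert((queue[2] & 0xFFFF) == 50);  // Latest alarm last
}
#endif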
2765
2766 /*
2767 * Flush the write state to the job queue, making any new signals available to
2768 * receiving threads.
2769 *
2770 * Two versions:
2771 * - The general version flush_write_state_other() which may flush to
2772 * any thread, and possibly signal any waiters.
2773 * - The special version flush_write_state_self() which should only be used
2774 * to flush messages to itself.
2775 *
2776 * Call to these functions are encapsulated through flush_write_state
2777 * which decides which of these functions to call.
2778 */
2779 static inline
2780 void
2781 flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
2782 {
2783 /*
2784  * We can simplify flush_write_state when writing to ourselves:
2785  * simply update the write references wo/ mutex, memory barrier or signaling.
2786 */
2787 w->m_write_buffer->m_len = w->m_write_pos;
2788 q_head->m_write_index = w->m_write_index;
2789 w->init_pending_signals();
2790 }
2791
2792 static inline
2793 void
2794 flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head,
2795 thr_jb_write_state *w)
2796 {
2797 Uint32 pending_signals_saved;
2798 /*
2799 * Two write memory barriers here, as assigning m_len may make signal data
2800 * available to other threads, and assigning m_write_index may make new
2801 * buffers available.
2802 *
2803 * We could optimize this by only doing it as needed, and only doing it
2804 * once before setting all m_len, and once before setting all m_write_index.
2805 *
2806  * But wmb() is a no-op anyway on x86 ...
2807 */
2808 wmb();
2809 w->m_write_buffer->m_len = w->m_write_pos;
2810 wmb();
2811 q_head->m_write_index = w->m_write_index;
2812
2813 pending_signals_saved = w->get_pending_signals_wakeup();
2814 pending_signals_saved += w->get_pending_signals();
2815
2816 if (pending_signals_saved >= MAX_SIGNALS_BEFORE_WAKEUP)
2817 {
2818 w->init_pending_signals();
2819 wakeup(&(dstptr->m_waiter));
2820 }
2821 else
2822 {
2823 w->clear_pending_signals_and_set_wakeup(pending_signals_saved);
2824 }
2825 }
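/**
 * Illustrative sketch (not part of the build, hence #if 0): the publish
 * pattern used by flush_write_state_other(), shown with C++11 atomics
 * instead of explicit wmb(). The release store on the length plays the role
 * of the write barrier: a consumer reading the new length with acquire
 * semantics is guaranteed to also see the signal words written before it.
 * Producer/publish() are hypothetical names.
 */
#if 0
#include <atomic>

struct Producer
{
  Uint32 m_data[1024];
  std::atomic<Uint32> m_len{0};

  void publish(const Uint32* sig, Uint32 words)
  {
    const Uint32 pos = m_len.load(std::memory_order_relaxed);
    for (Uint32 i = 0; i < words; i++)
      m_data[pos + i] = sig[i];                           // Write signal data first
    m_len.store(pos + words, std::memory_order_release);  // Then publish the new length
  }
};
#endif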
2826
2827 /**
2828   This function is used when we need to send signals immediately
2829   because the flush limit has been reached. We don't know whether the
2830   signal is destined for ourselves in this case, so we act depending on
2831   who the receiver of the signal is.
2832 */
2833 static inline
2834 void
2835 flush_write_state(const thr_data *selfptr, thr_data *dstptr,
2836 thr_job_queue_head *q_head, thr_jb_write_state *w)
2837 {
2838 if (dstptr == selfptr)
2839 {
2840 flush_write_state_self(q_head, w);
2841 }
2842 else
2843 {
2844 flush_write_state_other(dstptr, q_head, w);
2845 }
2846 }
2847
2848 /**
2849 This function is used when we are called from flush_jbb_write_state
2850   where we know that the receiver should wake up to receive the signals
2851 we're sending.
2852 */
2853 static inline
2854 void
2855 flush_write_state_other_wakeup(thr_data *dstptr,
2856 thr_job_queue_head *q_head,
2857 thr_jb_write_state *w)
2858 {
2859 /*
2860 * We already did a memory barrier before the loop calling this
2861 * function to ensure the buffer is properly seen by receiving
2862 * thread.
2863 */
2864 w->m_write_buffer->m_len = w->m_write_pos;
2865 wmb();
2866 q_head->m_write_index = w->m_write_index;
2867
2868 w->init_pending_signals();
2869 wakeup(&(dstptr->m_waiter));
2870 }
2871
2872 static
2873 void
2874 flush_jbb_write_state(thr_data *selfptr)
2875 {
2876 Uint32 thr_count = g_thr_repository->m_thread_count;
2877 Uint32 self = selfptr->m_thr_no;
2878
2879 thr_jb_write_state *w = selfptr->m_write_states + self;
2880 thr_data *thrptr = g_thr_repository->m_thread;
2881
2882 /**
2883   We start by flushing to ourselves; this requires no extra memory
2884 barriers and ensures that we can proceed in the loop knowing that
2885 we will only send to remote threads.
2886
2887 After this we will insert a memory barrier before we start updating
2888 the m_len variable that makes other threads see our signals that
2889 we're sending to them. We need the memory barrier to ensure that the
2890 buffers are seen properly updated by the remote thread when they see
2891 the pointer to them.
2892 */
2893 if (w->has_any_pending_signals())
2894 {
2895 flush_write_state_self(selfptr->m_in_queue_head + self, w);
2896 }
2897 wmb();
2898 w = selfptr->m_write_states;
2899 thr_jb_write_state *w_end = selfptr->m_write_states + thr_count;
2900 for (; w < w_end; thrptr++, w++)
2901 {
2902 if (w->has_any_pending_signals())
2903 {
2904 thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
2905 flush_write_state_other_wakeup(thrptr, q_head, w);
2906 }
2907 }
2908 }
2909
2910 /**
2911  * The receive thread will unpack 1024 signals (MAX_RECEIVED_SIGNALS)
2912  * from Transporters before running another check_recv_queue.
2913  *
2914  * This function returns true if there is not enough space to unpack
2915  * this amount of signals, else false.
2916 *
2917 * Also used as callback function from yield() to recheck
2918 * 'full' condition before going to sleep.
2919 */
2920 static bool
2921 check_recv_queue(thr_job_queue_head *q_head)
2922 {
2923 const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
2924 /**
2925  * NOTE: m_read_index is read wo/ lock (and updated by a different thread),
2926  *       but since that thread can only consume
2927  *       signals, the value returned from this
2928  *       function is always conservative (i.e. it can be better than the
2929  *       returned value, if the read-index has moved but we didn't see it).
2930 */
2931 const unsigned ri = q_head->m_read_index;
2932 const unsigned wi = q_head->m_write_index;
2933 const unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
2934 return (1 + minfree + busy >= thr_job_queue::SIZE);
2935 }
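/**
 * Illustrative sketch (not part of the build, hence #if 0): the circular
 * buffer occupancy calculation used by check_recv_queue() and
 * compute_free_buffers_in_queue(). With SIZE slots, the number of busy
 * slots is wi - ri when the write index has not wrapped, and wraps around
 * SIZE otherwise; one slot is always kept unused to distinguish full from
 * empty. ring_used() is a hypothetical helper.
 */
#if 0
static unsigned ring_used(unsigned ri, unsigned wi, unsigned size)
{
  return (wi >= ri) ? (wi - ri) : (size - ri) + wi;
}

static void example_ring_used()
{
  // size = 8, read index 6, write index wrapped around to 2:
  // 4 slots are busy, leaving 8 - 4 - 1 = 3 usable slots.
  const unsigned busy = ring_used(6, 2, 8);
  (void)busy; // busy == 4
}
#endif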
2936
2937 /**
2938  * Check if any of the receive queues for the threads being served
2939  * by this receive thread are full.
2940  * If full: Return the 'thr_data*' for (one of) the thread(s)
2941  *          which we have to wait for (to consume from its queue).
2942 */
2943 static struct thr_data*
2944 get_congested_recv_queue(struct thr_repository* rep, Uint32 recv_thread_id)
2945 {
2946 const unsigned thr_no = first_receiver_thread_no + recv_thread_id;
2947 thr_data *thrptr = rep->m_thread;
2948
2949 for (unsigned i = 0; i<num_threads; i++, thrptr++)
2950 {
2951 thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
2952 if (check_recv_queue(q_head))
2953 {
2954 return thrptr;
2955 }
2956 }
2957 return NULL;
2958 }
2959
2960 /**
2961  * Compute the number of free buffers in the specified queue.
2962  * The SAFETY margin is subtracted from the available
2963  * 'free', which is returned.
2964 */
2965 static
2966 Uint32
2967 compute_free_buffers_in_queue(const thr_job_queue_head *q_head)
2968 {
2969 /**
2970  * NOTE: m_read_index is read wo/ lock (and updated by a different thread),
2971  *       but since that thread can only consume
2972  *       signals, the value returned from this
2973  *       function is always conservative (i.e. it can be better than the
2974  *       returned value, if the read-index has moved but we didn't see it).
2975 */
2976 unsigned ri = q_head->m_read_index;
2977 unsigned wi = q_head->m_write_index;
2978 unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
2979
2980 assert(free <= thr_job_queue::SIZE);
2981
2982 if (free <= (1 + thr_job_queue::SAFETY))
2983 return 0;
2984 else
2985 return free - (1 + thr_job_queue::SAFETY);
2986 }
2987
2988 static
2989 Uint32
2990 compute_min_free_out_buffers(Uint32 thr_no)
2991 {
2992 Uint32 minfree = thr_job_queue::SIZE;
2993 const struct thr_repository* rep = g_thr_repository;
2994 const struct thr_data *thrptr = rep->m_thread;
2995
2996 for (unsigned i = 0; i<num_threads; i++, thrptr++)
2997 {
2998 const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
2999 unsigned free = compute_free_buffers_in_queue(q_head);
3000
3001 if (free < minfree)
3002 minfree = free;
3003 }
3004 return minfree;
3005 }
3006
3007 /**
3008 * Compute max signals that thr_no can execute wo/ risking
3009 * job-buffer-full
3010 *
3011 * see-also update_sched_config
3012 *
3013 *
3014 * 1) compute free-slots in ring-buffer from self to each thread in system
3015 * 2) pick smallest value
3016 * 3) compute how many signals this corresponds to
3017 * 4) compute how many signals self can execute if all were to be to
3018 * the thread with the fullest ring-buffer (i.e the worst case)
3019 *
3020 * Assumption: each signal may send *at most* 4 signals
3021  *   - this assumption is the same in ndbd and ndbmtd and is
3022  *     mostly followed by block-code, although not in all places :-(
3023 */
3024 static
3025 Uint32
3026 compute_max_signals_to_execute(Uint32 min_free_buffers)
3027 {
3028 return ((min_free_buffers * MIN_SIGNALS_PER_PAGE) + 3) / 4;
3029 }
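/**
 * Illustrative sketch (not part of the build, hence #if 0): a worked example
 * of the formula above. Under the assumption that each executed signal may
 * generate at most 4 new signals, 'min_free_buffers' free job-buffer pages
 * of MIN_SIGNALS_PER_PAGE signals each allow
 * ceil(min_free_buffers * MIN_SIGNALS_PER_PAGE / 4) signals to be executed
 * without risking job-buffer-full.
 */
#if 0
static void example_max_signals()
{
  const Uint32 min_free_buffers = 2;
  const Uint32 max_exec = ((min_free_buffers * MIN_SIGNALS_PER_PAGE) + 3) / 4;
  // If MIN_SIGNALS_PER_PAGE were e.g. 100, max_exec would be (200 + 3) / 4 = 50.
  (void)max_exec;
}
#endif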
3030
3031 static
3032 void
3033 dumpJobQueues(void)
3034 {
3035 BaseString tmp;
3036 const struct thr_repository* rep = g_thr_repository;
3037 for (unsigned from = 0; from<num_threads; from++)
3038 {
3039 for (unsigned to = 0; to<num_threads; to++)
3040 {
3041 const thr_data *thrptr = rep->m_thread + to;
3042 const thr_job_queue_head *q_head = thrptr->m_in_queue_head + from;
3043
3044 const unsigned used = q_head->used();
3045 if (used > 0)
3046 {
3047 tmp.appfmt(" job buffer %d --> %d, used %d",
3048 from, to, used);
3049 unsigned free = compute_free_buffers_in_queue(q_head);
3050 if (free <= 0)
3051 {
3052 tmp.appfmt(" FULL!");
3053 }
3054 else if (free <= thr_job_queue::RESERVED)
3055 {
3056 tmp.appfmt(" HIGH LOAD (free:%d)", free);
3057 }
3058 tmp.appfmt("\n");
3059 }
3060 }
3061 }
3062 if (!tmp.empty())
3063 {
3064 ndbout_c("Dumping non-empty job queues:\n%s", tmp.c_str());
3065 }
3066 }
3067
3068 void
3069 trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
3070 {
3071 SignalT<3> signalT;
3072 Signal &signal = * new (&signalT) Signal(0);
3073 memset(&signal.header, 0, sizeof(signal.header));
3074
3075 if (g_send_threads)
3076 {
3077 /**
3078 * TODO: Implement this also when using send threads!!
3079 */
3080 return;
3081 }
3082
3083 signal.header.theLength = 3;
3084 signal.header.theSendersSignalId = 0;
3085 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
3086 signal.theData[0] = NDB_LE_SendBytesStatistic;
3087 signal.theData[1] = nodeId;
3088 signal.theData[2] = (Uint32)(bytes/count);
3089 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
3090 signal.header.theReceiversBlockNumber = CMVMI;
3091 sendlocal(g_thr_repository->m_send_buffers[nodeId].m_send_thread,
3092 &signalT.header, signalT.theData, NULL);
3093 }
3094
3095 /**
3096 * To lock during connect/disconnect, we take both the send lock for the node
3097  * (to protect performSend()), and the global receive lock (to protect
3098 * performReceive()). By having two locks, we avoid contention between the
3099 * common send and receive operations.
3100 *
3101 * We can have contention between connect/disconnect of one transporter and
3102 * receive for the others. But the transporter code should try to keep this
3103  * lock only briefly, i.e. only to set the state to DISCONNECTING / socket fd to
3104 * NDB_INVALID_SOCKET, not for the actual close() syscall.
3105 */
3106 void
3107 trp_callback::lock_transporter(NodeId node)
3108 {
3109 Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
3110 struct thr_repository* rep = g_thr_repository;
3111 /**
3112 * Note: take the send lock _first_, so that we will not hold the receive
3113 * lock while blocking on the send lock.
3114 *
3115 * The reverse case, blocking send lock for one transporter while waiting
3116 * for receive lock, is not a problem, as the transporter being blocked is
3117 * in any case disconnecting/connecting at this point in time, and sends are
3118 * non-waiting (so we will not block sending on other transporters).
3119 */
3120 lock(&rep->m_send_buffers[node].m_send_lock);
3121 lock(&rep->m_receive_lock[recv_thread_idx]);
3122 }
3123
3124 void
3125 trp_callback::unlock_transporter(NodeId node)
3126 {
3127 Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
3128 struct thr_repository* rep = g_thr_repository;
3129 unlock(&rep->m_receive_lock[recv_thread_idx]);
3130 unlock(&rep->m_send_buffers[node].m_send_lock);
3131 }
3132
3133 int
3134 mt_checkDoJob(Uint32 recv_thread_idx)
3135 {
3136 struct thr_repository* rep = g_thr_repository;
3137
3138 /**
3139 * Return '1' if we are not allowed to receive more signals
3140 * into the job buffers from this 'recv_thread_idx'.
3141 *
3142 * NOTE:
3143 * We should not loop-wait for buffers to become available
3144 * here as we currently hold the receiver-lock. Furthermore
3145 * waiting too long here could cause the receiver thread to be
3146 * less responsive wrt. moving incoming (TCP) data from the
3147 * TCPTransporters into the (local) receiveBuffers.
3148  * The thread could also oversleep on its other tasks, such as
3149  * handling open/close of connections, and catching
3150  * its own shutdown events.
3151 */
3152 return (get_congested_recv_queue(rep, recv_thread_idx) != NULL);
3153 }
3154
3155 /**
3156 * Collect all send-buffer-pages to be delivered to 'node'
3157 * from each thread. Link them together and append them to
3158 * the single send_buffer list 'sb->m_buffer'.
3159 *
3160 * The 'sb->m_buffer_lock' has to be held prior to calling
3161 * this function.
3162 *
3163 * Return: Number of bytes in the collected send-buffers.
3164 *
3165 * TODO: This is not completely fair,
3166 * it would be better to get one entry from each thr_send_queue
3167 * per thread instead (until empty)
3168 */
3169 static
3170 Uint32
3171 link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
3172 {
3173 Uint32 ri[MAX_BLOCK_THREADS];
3174 Uint32 wi[MAX_BLOCK_THREADS];
3175 thr_send_queue *src = g_thr_repository->m_thread_send_buffers[node];
3176 for (unsigned thr = 0; thr < num_threads; thr++)
3177 {
3178 ri[thr] = sb->m_read_index[thr];
3179 wi[thr] = src[thr].m_write_index;
3180 }
3181
3182 Uint64 sentinel[thr_send_page::HEADER_SIZE >> 1];
3183 thr_send_page* sentinel_page = new (&sentinel[0]) thr_send_page;
3184 sentinel_page->m_next = 0;
3185
3186 struct thr_send_buffer tmp;
3187 tmp.m_first_page = sentinel_page;
3188 tmp.m_last_page = sentinel_page;
3189
3190 Uint32 bytes = 0;
3191
3192 #ifdef ERROR_INSERT
3193
3194 #define MIXOLOGY_MIX_MT_SEND 2
3195
3196 if (unlikely(globalEmulatorData.theConfiguration->getMixologyLevel() &
3197 MIXOLOGY_MIX_MT_SEND))
3198 {
3199 /**
3200 * DEBUGGING only
3201 * Interleave at the page level from all threads with
3202 * pages to send - intended to help expose signal
3203 * order dependency bugs
3204 * TODO : Avoid having a whole separate implementation
3205 * like this.
3206 */
3207 bool more_pages;
3208
3209 do
3210 {
3211 src = g_thr_repository->m_thread_send_buffers[node];
3212 more_pages = false;
3213 for (unsigned thr = 0; thr < num_threads; thr++, src++)
3214 {
3215 Uint32 r = ri[thr];
3216 Uint32 w = wi[thr];
3217 if (r != w)
3218 {
3219 rmb();
3220 /* Take one page from this thread's send buffer for this node */
3221 thr_send_page * p = src->m_buffers[r];
3222 assert(p->m_start == 0);
3223 bytes += p->m_bytes;
3224 tmp.m_last_page->m_next = p;
3225 tmp.m_last_page = p;
3226
3227 /* Take page out of read_index slot list */
3228 thr_send_page * next = p->m_next;
3229 p->m_next = NULL;
3230 src->m_buffers[r] = next;
3231
3232 if (next == NULL)
3233 {
3234 /**
3235 * Used up read slot, any more slots available to read
3236 * from this thread?
3237 */
3238 r = (r+1) % thr_send_queue::SIZE;
3239 more_pages |= (r != w);
3240
3241 /* Update global and local per thread read indices */
3242 sb->m_read_index[thr] = r;
3243 ri[thr] = r;
3244 }
3245 else
3246 {
3247 more_pages |= true;
3248 }
3249 }
3250 }
3251 } while (more_pages);
3252 }
3253 else
3254
3255 #endif
3256
3257 {
3258 for (unsigned thr = 0; thr < num_threads; thr++, src++)
3259 {
3260 Uint32 r = ri[thr];
3261 Uint32 w = wi[thr];
3262 if (r != w)
3263 {
3264 rmb();
3265 while (r != w)
3266 {
3267 thr_send_page * p = src->m_buffers[r];
3268 assert(p->m_start == 0);
3269 bytes += p->m_bytes;
3270 tmp.m_last_page->m_next = p;
3271 while (p->m_next != 0)
3272 {
3273 p = p->m_next;
3274 assert(p->m_start == 0);
3275 bytes += p->m_bytes;
3276 }
3277 tmp.m_last_page = p;
3278 assert(tmp.m_last_page != 0); /* Impossible */
3279 r = (r + 1) % thr_send_queue::SIZE;
3280 }
3281 sb->m_read_index[thr] = r;
3282 }
3283 }
3284 }
3285 Uint64 node_total_send_buffer_size = sb->m_node_total_send_buffer_size;
3286 if (bytes)
3287 {
3288 /**
3289 * Append send buffers collected from threads
3290 * to end of existing m_buffers.
3291 */
3292 if (sb->m_buffer.m_first_page)
3293 {
3294 assert(sb->m_buffer.m_first_page != NULL);
3295 assert(sb->m_buffer.m_last_page != NULL);
3296 sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
3297 sb->m_buffer.m_last_page = tmp.m_last_page;
3298 }
3299 else
3300 {
3301 assert(sb->m_buffer.m_first_page == NULL);
3302 assert(sb->m_buffer.m_last_page == NULL);
3303 sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
3304 sb->m_buffer.m_last_page = tmp.m_last_page;
3305 }
3306 }
3307 sb->m_node_total_send_buffer_size =
3308 node_total_send_buffer_size + bytes;
3309 return bytes;
3310 }
3311
3312 /**
3313 * pack thr_send_pages for a particular send-buffer <em>db</em>
3314 * release pages (local) to <em>pool</em>
3315 *
3316  * We're using a very simple algorithm that packs two neighbouring
3317  * pages into one page if possible; if not possible, we simply
3318  * move on. This guarantees that pages will be filled to at least
3319  * a 50% fill level, which should be sufficient for our needs here.
3320 *
3321 * We call pack_sb_pages() when we fail to send all data to one
3322 * specific node immediately. This ensures that we won't keep
3323 * pages allocated with lots of free spaces.
3324 *
3325 * We may also pack_sb_pages() from get_bytes_to_send_iovec()
3326  * if not all send buffers can be filled into the iovec[], thus
3327  * possibly saving extra send roundtrips.
3328 *
3329 * The send threads will use the pack_sb_pages()
3330 * from the bytes_sent function which is a callback from
3331 * the transporter.
3332 *
3333 * Can only be called with relevant lock held on 'buffer'.
3334 * Return remaining unsent bytes in 'buffer'.
3335 */
3336 static
3337 Uint32
3338 pack_sb_pages(thread_local_pool<thr_send_page>* pool,
3339 struct thr_send_buffer* buffer)
3340 {
3341 assert(buffer->m_first_page != NULL);
3342 assert(buffer->m_last_page != NULL);
3343 assert(buffer->m_last_page->m_next == NULL);
3344
3345 thr_send_page* curr = buffer->m_first_page;
3346 Uint32 curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
3347 Uint32 bytes = curr->m_bytes;
3348 while (curr->m_next != 0)
3349 {
3350 thr_send_page* next = curr->m_next;
3351 bytes += next->m_bytes;
3352 assert(next->m_start == 0); // only first page should have half sent bytes
3353 if (next->m_bytes <= curr_free)
3354 {
3355 /**
3356 * There is free space in the current page and it is sufficient to
3357 * store the entire next-page. Copy from next page to current page
3358 * and update current page and release next page to local pool.
3359 */
3360 thr_send_page * save = next;
3361 memcpy(curr->m_data + (curr->m_bytes + curr->m_start),
3362 next->m_data,
3363 next->m_bytes);
3364
3365 curr_free -= next->m_bytes;
3366
3367 curr->m_bytes += next->m_bytes;
3368 curr->m_next = next->m_next;
3369
3370 pool->release_local(save);
3371 }
3372 else
3373 {
3374 /* Not enough free space in current, move to next page */
3375 curr = next;
3376 curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
3377 }
3378 }
3379
3380 buffer->m_last_page = curr;
3381 assert(bytes > 0);
3382 return bytes;
3383 }
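/**
 * Illustrative sketch (not part of the build, hence #if 0): the neighbour
 * packing idea from pack_sb_pages() on a simplified page struct. If the next
 * page's payload fits in the free tail of the current page it is copied in
 * and the next page is unlinked; otherwise we advance. Page and pack_pages()
 * are hypothetical stand-ins for thr_send_page and the real pool handling
 * (which also releases the merged page).
 */
#if 0
#include <cstring>

struct Page
{
  static const Uint32 MAX = 4096;
  Page*  m_next;
  Uint32 m_start;   // Only the first page may have partially sent bytes
  Uint32 m_bytes;
  char   m_data[MAX];
};

static void pack_pages(Page* first)
{
  Page* curr = first;
  Uint32 curr_free = Page::MAX - (curr->m_bytes + curr->m_start);
  while (curr->m_next != NULL)
  {
    Page* next = curr->m_next;
    if (next->m_bytes <= curr_free)
    {
      memcpy(curr->m_data + curr->m_start + curr->m_bytes,
             next->m_data, next->m_bytes);   // Merge next page into current
      curr->m_bytes += next->m_bytes;
      curr_free -= next->m_bytes;
      curr->m_next = next->m_next;           // Real code releases 'next' to the pool
    }
    else
    {
      curr = next;                           // Not enough room, move to next page
      curr_free = Page::MAX - (curr->m_bytes + curr->m_start);
    }
  }
}
#endif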
3384
3385 /**
3386 * Get buffered pages ready to be sent by the transporter.
3387 * All pages returned from this function will refer to
3388 * pages in the m_sending buffers
3389 *
3390 * The 'sb->m_send_lock' has to be held prior to calling
3391 * this function.
3392 *
3393 * If more send_buffer pages are required from the
3394 * 'm_buffer', we will also grab the m_buffer_lock as
3395  * required. Any grabbed m_buffer pages will be moved to the
3396  * m_sending buffers.
3397 */
3398 Uint32
3399 trp_callback::get_bytes_to_send_iovec(NodeId node,
3400 struct iovec *dst,
3401 Uint32 max)
3402 {
3403 thr_repository::send_buffer *sb = g_thr_repository->m_send_buffers + node;
3404
3405 if (max == 0)
3406 return 0;
3407
3408 /**
3409 * Collect any available send pages from the thread queues
3410 * and 'm_buffers'. Append them to the end of m_sending buffers
3411 */
3412 {
3413 lock(&sb->m_buffer_lock);
3414 link_thread_send_buffers(sb, node);
3415
3416 if (sb->m_buffer.m_first_page != NULL)
3417 {
3418 if (sb->m_sending.m_first_page == NULL)
3419 {
3420 sb->m_sending = sb->m_buffer;
3421 }
3422 else
3423 {
3424 assert(sb->m_sending.m_last_page != NULL);
3425 sb->m_sending.m_last_page->m_next = sb->m_buffer.m_first_page;
3426 sb->m_sending.m_last_page = sb->m_buffer.m_last_page;
3427 }
3428 sb->m_buffer.m_first_page = NULL;
3429 sb->m_buffer.m_last_page = NULL;
3430 }
3431 unlock(&sb->m_buffer_lock);
3432
3433 if (sb->m_sending.m_first_page == NULL)
3434 return 0;
3435 }
3436
3437 /**
3438 * Process linked-list and put into iovecs
3439 */
3440 fill_iovec:
3441 Uint32 tot = 0;
3442 Uint32 pos = 0;
3443 thr_send_page * p = sb->m_sending.m_first_page;
3444 sb->m_bytes_sent = 0;
3445
3446 do {
3447 dst[pos].iov_len = p->m_bytes;
3448 dst[pos].iov_base = p->m_data + p->m_start;
3449 assert(p->m_start + p->m_bytes <= p->max_bytes());
3450 tot += p->m_bytes;
3451 pos++;
3452 p = p->m_next;
3453 if (p == NULL)
3454 return pos;
3455 } while (pos < max);
3456
3457 /**
3458 * Possibly pack send-buffers to get better utilization:
3459 * If we were unable to fill all sendbuffers into iovec[],
3460 * we pack the sendbuffers now if they have a low fill degree.
3461  * This could save us another OS send for sending the remaining data.
3462 */
3463 if (pos == max && max > 1 && // Exhausted iovec[]
3464 tot < (pos * thr_send_page::max_bytes())/4) // < 25% filled
3465 {
3466 const Uint32 thr_no = sb->m_send_thread;
3467 assert(thr_no != NO_SEND_THREAD);
3468
3469 if (!is_send_thread(thr_no))
3470 {
3471 thr_data * thrptr = &g_thr_repository->m_thread[thr_no];
3472 pack_sb_pages(&thrptr->m_send_buffer_pool, &sb->m_sending);
3473 }
3474 else
3475 {
3476 pack_sb_pages(g_send_threads->get_send_buffer_pool(thr_no), &sb->m_sending);
3477 }
3478
3479 /**
3480 * Retry filling iovec[]. As 'pack' will ensure at least 50% fill degree,
3481 * we will not do another 'pack' after the retry.
3482 */
3483 goto fill_iovec;
3484 }
3485 return pos;
3486 }
3487
3488 static
3489 void
3490 release_list(thread_local_pool<thr_send_page>* pool,
3491 thr_send_page* head, thr_send_page * tail)
3492 {
3493 while (head != tail)
3494 {
3495 thr_send_page * tmp = head;
3496 head = head->m_next;
3497 pool->release_local(tmp);
3498 }
3499 pool->release_local(tail);
3500 }
3501
3502 static
3503 Uint32
3504 bytes_sent(thread_local_pool<thr_send_page>* pool,
3505 thr_repository::send_buffer* sb, Uint32 bytes)
3506 {
3507 Uint64 node_total_send_buffer_size = sb->m_node_total_send_buffer_size;
3508 assert(bytes);
3509
3510 sb->m_bytes_sent = bytes;
3511
3512 Uint32 remain = bytes;
3513 thr_send_page * prev = NULL;
3514 thr_send_page * curr = sb->m_sending.m_first_page;
3515 sb->m_node_total_send_buffer_size = node_total_send_buffer_size - bytes;
3516
3517 /* Some, or all, in 'm_sending' was sent, find endpoint. */
3518 while (remain && remain >= curr->m_bytes)
3519 {
3520 /**
3521 * Calculate new current page such that we can release the
3522 * pages that have been completed and update the state of
3523 * the new current page
3524 */
3525 remain -= curr->m_bytes;
3526 prev = curr;
3527 curr = curr->m_next;
3528 }
3529
3530 if (remain)
3531 {
3532 /**
3533  * Not all pages were fully sent and we stopped in the middle of
3534  * a page.
3535 *
3536 * Update state of new current page and release any pages
3537 * that have already been sent
3538 */
3539 curr->m_start += remain;
3540 assert(curr->m_bytes > remain);
3541 curr->m_bytes -= remain;
3542 if (prev)
3543 {
3544 release_list(pool, sb->m_sending.m_first_page, prev);
3545 }
3546 }
3547 else
3548 {
3549 /**
3550 * We sent a couple of full pages and the sending stopped at a
3551 * page boundary, so we only need to release the sent pages
3552 * and update the new current page.
3553 */
3554 if (prev)
3555 {
3556 release_list(pool, sb->m_sending.m_first_page, prev);
3557
3558 if (prev == sb->m_sending.m_last_page)
3559 {
3560 /**
3561           * Everything was released; release the pages in the local pool
3562 */
3563 sb->m_sending.m_first_page = NULL;
3564 sb->m_sending.m_last_page = NULL;
3565 return 0;
3566 }
3567 }
3568 else
3569 {
3570 assert(sb->m_sending.m_first_page != NULL);
3571 pool->release_local(sb->m_sending.m_first_page);
3572 }
3573 }
3574
3575 sb->m_sending.m_first_page = curr;
3576
3577 /**
3578    * Since not all bytes were sent, spend the time to try to pack
3579    * the m_sending pages, possibly releasing send-buffer memory.
3581 */
3582 return pack_sb_pages(pool, &sb->m_sending);
3583 }
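/**
 * Illustrative worked example (hypothetical figures, not taken from a
 * real trace): assume m_sending holds three pages with m_bytes of
 * 1000, 1000 and 500, and the OS reported bytes == 1600 as sent.
 *
 *   remain = 1600
 *   page 1: remain (1600) >= 1000  ->  remain = 600, page 1 complete
 *   page 2: remain (600)  <  1000  ->  stop inside page 2
 *
 * Page 1 is released via release_list(), page 2 gets m_start += 600
 * and m_bytes -= 600 (400 bytes left), becomes the new m_first_page,
 * and the remaining pages are packed by pack_sb_pages().
 */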
3584
3585 /**
3586  * Register the specified number of 'bytes' as sent, starting
3587  * from the first available byte in the m_sending buffer.
3588 *
3589 * The 'm_send_lock' has to be held prior to calling
3590 * this function.
3591 */
3592 Uint32
bytes_sent(NodeId node,Uint32 bytes)3593 trp_callback::bytes_sent(NodeId node, Uint32 bytes)
3594 {
3595 thr_repository::send_buffer *sb = g_thr_repository->m_send_buffers+node;
3596 Uint32 thr_no = sb->m_send_thread;
3597 assert(thr_no != NO_SEND_THREAD);
3598 if (!is_send_thread(thr_no))
3599 {
3600 thr_data * thrptr = &g_thr_repository->m_thread[thr_no];
3601 return ::bytes_sent(&thrptr->m_send_buffer_pool,
3602 sb,
3603 bytes);
3604 }
3605 else
3606 {
3607 return ::bytes_sent(g_send_threads->get_send_buffer_pool(thr_no),
3608 sb,
3609 bytes);
3610 }
3611 }
3612
3613 /**
3614 * NOTE:
3615 * ::has_data_to_send() is only called
3616 * from TransporterRegistry::performSend().
3617  * ::performSend(), in turn, is only called from either
3618  * the single-threaded scheduler or the API, both of which
3619  * end up in the single-threaded ::has_data_to_send()
3620  * implemented in class TransporterCallbackKernelNonMT.
3621  * Thus, this ::has_data_to_send() is actually never used!
3622  *
3623  * However, a simple implementation based on probing
3624  * get_bytes_to_send_iovec() is provided for completeness.
3625 * As this is unused code, it is completely untested.
3626 */
3627 bool
has_data_to_send(NodeId node)3628 trp_callback::has_data_to_send(NodeId node)
3629 {
3630 assert(false); //Trap untested code, see comment above
3631 struct iovec v[1];
3632 return (get_bytes_to_send_iovec(node, v, 1) > 0);
3633 }
3634
3635 /**
3636 * Reset send buffers by releasing all buffered send pages,
3637 * in both the m_buffer and m_sending buffers, *and*
3638 * available thread send buffers.
3639 *
3640  * Neither m_send_lock nor m_buffer_lock should be held prior
3641  * to calling this function; they will be acquired here
3642  * as required.
3643 */
3644 void
reset_send_buffer(NodeId node,bool should_be_empty)3645 trp_callback::reset_send_buffer(NodeId node, bool should_be_empty)
3646 {
3647 struct thr_repository *rep = g_thr_repository;
3648 thr_repository::send_buffer * sb = rep->m_send_buffers+node;
3649
3650 thread_local_pool<thr_send_page> pool(&rep->m_sb_pool, 0);
3651
3652 lock(&sb->m_send_lock);
3653 lock(&sb->m_buffer_lock);
3654
3655 /* Collect thread send buffers into m_buffer. */
3656 link_thread_send_buffers(sb, node);
3657
3658 /* Drop all pending data in m_buffer. */
3659 if (sb->m_buffer.m_first_page)
3660 {
3661 release_list(&pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
3662 sb->m_buffer.m_first_page = NULL;
3663 sb->m_buffer.m_last_page = NULL;
3664 assert(!should_be_empty); // Got data when it should be empty
3665 }
3666
3667 /* Drop all pending data in m_sending buffers. */
3668 if (sb->m_sending.m_first_page)
3669 {
3670 release_list(&pool, sb->m_sending.m_first_page, sb->m_sending.m_last_page);
3671 sb->m_sending.m_first_page = NULL;
3672 sb->m_sending.m_last_page = NULL;
3673
3674 assert(!should_be_empty); // Got data when it should be empty
3675 }
3676 sb->m_node_total_send_buffer_size = 0;
3677 unlock(&sb->m_buffer_lock);
3678 unlock(&sb->m_send_lock);
3679
3680 pool.release_all(rep->m_mm, RG_TRANSPORTER_BUFFERS);
3681 }
3682
3683 static inline
3684 void
register_pending_send(thr_data * selfptr,Uint32 nodeId)3685 register_pending_send(thr_data *selfptr, Uint32 nodeId)
3686 {
3687 /* Mark that this node has pending send data. */
3688 if (!selfptr->m_pending_send_mask.get(nodeId))
3689 {
3690 selfptr->m_pending_send_mask.set(nodeId, 1);
3691 Uint32 i = selfptr->m_pending_send_count;
3692 selfptr->m_pending_send_nodes[i] = nodeId;
3693 selfptr->m_pending_send_count = i + 1;
3694 }
3695 }
3696
3697 /**
3698   Pack send buffers to make memory available to other threads. The signals
3699   sent often use one page per signal, which means that most pages are only
3700   sparsely filled. In some situations this means that we can run out of send
3701   buffers and still have massive amounts of free space.
3702
3703   We call this from the main loop in the block threads when we fail to
3704   allocate enough send buffers. In addition we call the node-local
3705   pack_sb_pages() from several places - see the header comment for that function.
3706 */
3707 static
3708 void
try_pack_send_buffers(thr_data * selfptr)3709 try_pack_send_buffers(thr_data* selfptr)
3710 {
3711 thr_repository* rep = g_thr_repository;
3712 thread_local_pool<thr_send_page>* pool = &selfptr->m_send_buffer_pool;
3713
3714 for (Uint32 i = 1; i < NDB_ARRAY_SIZE(selfptr->m_send_buffers); i++)
3715 {
3716 if (globalTransporterRegistry.get_transporter(i))
3717 {
3718 thr_repository::send_buffer* sb = rep->m_send_buffers+i;
3719 if (trylock(&sb->m_buffer_lock) != 0)
3720 {
3721 continue; // Continue with next if busy
3722 }
3723
3724 link_thread_send_buffers(sb, i);
3725 if (sb->m_buffer.m_first_page != NULL)
3726 {
3727 pack_sb_pages(pool, &sb->m_buffer);
3728 }
3729 unlock(&sb->m_buffer_lock);
3730 }
3731 }
3732 /* Release surplus buffers from local pool to global pool */
3733 pool->release_global(g_thr_repository->m_mm, RG_TRANSPORTER_BUFFERS);
3734 }
3735
3736
3737 /**
3738 * publish thread-locally prepared send-buffer
3739 */
3740 static
3741 void
flush_send_buffer(thr_data * selfptr,Uint32 node)3742 flush_send_buffer(thr_data* selfptr, Uint32 node)
3743 {
3744 Uint32 thr_no = selfptr->m_thr_no;
3745 thr_send_buffer * src = selfptr->m_send_buffers + node;
3746 thr_repository* rep = g_thr_repository;
3747
3748 if (src->m_first_page == 0)
3749 {
3750 return;
3751 }
3752 assert(src->m_last_page != 0);
3753
3754 thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
3755 thr_repository::send_buffer* sb = rep->m_send_buffers+node;
3756
3757 Uint32 wi = dst->m_write_index;
3758 Uint32 next = (wi + 1) % thr_send_queue::SIZE;
3759 Uint32 ri = sb->m_read_index[thr_no];
3760
3761 /**
3762    * If the thread-local ring buffer of send-buffers is full:
3763    * Empty it by transferring the buffers to the global send_buffer list.
3764 */
3765 if (unlikely(next == ri))
3766 {
3767 lock(&sb->m_buffer_lock);
3768 link_thread_send_buffers(sb, node);
3769 unlock(&sb->m_buffer_lock);
3770 }
3771
3772 dst->m_buffers[wi] = src->m_first_page;
3773 wmb();
3774 dst->m_write_index = next;
3775
3776 src->m_first_page = 0;
3777 src->m_last_page = 0;
3778 }
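/**
 * The publish step above is the classic single-producer/single-consumer
 * ring-buffer pattern: write the payload slot, issue a write barrier,
 * then advance the write index, so the consumer can never observe the
 * new index before the data.  A condensed generic sketch of the same
 * idea (illustrative only, assuming the kernel's Uint32 and wmb();
 * 'spsc_ring' is a made-up type, not the scheduler's thr_send_queue):
 *
 *   struct spsc_ring {
 *     void * slot[16];
 *     volatile Uint32 write_index;   // written by producer only
 *     volatile Uint32 read_index;    // written by consumer only
 *   };
 *
 *   static bool spsc_publish(spsc_ring * r, void * item)
 *   {
 *     Uint32 wi = r->write_index;
 *     Uint32 next = (wi + 1) % 16;
 *     if (next == r->read_index)
 *       return false;                // full: caller must drain first
 *     r->slot[wi] = item;            // 1) write payload
 *     wmb();                         // 2) order payload before index
 *     r->write_index = next;         // 3) make it visible
 *     return true;
 *   }
 */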
3779
3780 /**
3781  * This is used in case the send buffer gets full, to force an emergency send,
3782  * hopefully freeing up some buffer space for the next signal.
3783 */
3784 bool
forceSend(NodeId nodeId)3785 mt_send_handle::forceSend(NodeId nodeId)
3786 {
3787 struct thr_repository *rep = g_thr_repository;
3788 struct thr_data *selfptr = m_selfptr;
3789 struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
3790
3791 {
3792 /**
3793 * NOTE: we don't need a memory barrier after clearing
3794 * m_force_send here as we unconditionally lock m_send_lock
3795 * hence there is no way that our data can be "unsent"
3796 */
3797 sb->m_force_send = 0;
3798
3799 lock(&sb->m_send_lock);
3800 sb->m_send_thread = selfptr->m_thr_no;
3801 globalTransporterRegistry.performSend(nodeId);
3802 sb->m_send_thread = NO_SEND_THREAD;
3803 unlock(&sb->m_send_lock);
3804
3805 /**
3806 * release buffers prior to maybe looping on sb->m_force_send
3807 */
3808 selfptr->m_send_buffer_pool.release_global(rep->m_mm,
3809 RG_TRANSPORTER_BUFFERS);
3810 /**
3811 * We need a memory barrier here to prevent race between clearing lock
3812 * and reading of m_force_send.
3813 * CPU can reorder the load to before the clear of the lock
3814 */
3815 mb();
3816 if (unlikely(sb->m_force_send))
3817 {
3818 register_pending_send(selfptr, nodeId);
3819 }
3820 }
3821
3822 return true;
3823 }
3824
3825 /**
3826 * try sending data
3827 */
3828 static
3829 void
try_send(thr_data * selfptr,Uint32 node)3830 try_send(thr_data * selfptr, Uint32 node)
3831 {
3832 struct thr_repository *rep = g_thr_repository;
3833 struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
3834
3835 if (trylock(&sb->m_send_lock) == 0)
3836 {
3837 /**
3838 * Now clear the flag, and start sending all data available to this node.
3839 *
3840 * Put a memory barrier here, so that if another thread tries to grab
3841 * the send lock but fails due to us holding it here, we either
3842 * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
3843 * 2) We clear here the flag just set by the other thread, but then we
3844 * will (thanks to mb()) be able to see and send all of the data already
3845 * in the first send iteration.
3846 */
3847 sb->m_force_send = 0;
3848 mb();
3849
3850 sb->m_send_thread = selfptr->m_thr_no;
3851 globalTransporterRegistry.performSend(node);
3852 sb->m_send_thread = NO_SEND_THREAD;
3853 unlock(&sb->m_send_lock);
3854
3855 /**
3856 * release buffers prior to maybe looping on sb->m_force_send
3857 */
3858 selfptr->m_send_buffer_pool.release_global(rep->m_mm,
3859 RG_TRANSPORTER_BUFFERS);
3860
3861 /**
3862 * We need a memory barrier here to prevent race between clearing lock
3863 * and reading of m_force_send.
3864 * CPU can reorder the load to before the clear of the lock
3865 */
3866 mb();
3867 if (unlikely(sb->m_force_send))
3868 {
3869 register_pending_send(selfptr, node);
3870 }
3871 }
3872 }
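/**
 * Condensed view of the m_force_send handshake used by try_send(),
 * forceSend() and do_send() (illustrative pseudocode only, not the
 * actual code paths):
 *
 *   thread that lost the trylock:      thread holding m_send_lock:
 *     sb->m_force_send = 1;              performSend(node);
 *     // rely on the lock holder         unlock(&sb->m_send_lock);
 *     // to act on the flag              mb();
 *                                        if (sb->m_force_send)
 *                                          register_pending_send(...);
 *
 * Either the holder observes m_force_send == 1 after its mb() and
 * re-registers the node so the data is sent in a later round, or the
 * requester gets the lock itself, clears the flag and sends directly.
 */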
3873
3874 /**
3875  * Flush send buffers and append them to the destination nodes' send queues.
3876  *
3877  * Flushed buffer contents are piggybacked when another thread
3878  * calls do_send() to the same destination node. This makes it possible to
3879  * include more data in each message, and thereby reduces the total number
3880  * of messages handled by the OS, which really impacts performance!
3881 */
3882 static
3883 void
do_flush(struct thr_data * selfptr)3884 do_flush(struct thr_data* selfptr)
3885 {
3886 Uint32 i;
3887 Uint32 count = selfptr->m_pending_send_count;
3888 Uint8 *nodes = selfptr->m_pending_send_nodes;
3889
3890 for (i = 0; i < count; i++)
3891 {
3892 flush_send_buffer(selfptr, nodes[i]);
3893 }
3894 }
3895
3896 /**
3897 * Send any pending data to remote nodes.
3898 *
3899 * If MUST_SEND is false, will only try to lock the send lock, but if it would
3900 * block, that node is skipped, to be tried again next time round.
3901 *
3902 * If MUST_SEND is true, we still only try to lock, but if it would block,
3903  * we will force the thread holding the lock to do the sending on our behalf.
3904  *
3905  * The list of pending nodes to send to is thread-local, but the per-node send
3906  * buffer is shared by all threads. Thus we might skip a node for which
3907  * another thread has pending send data, and we might send pending data also
3908  * for another thread without clearing the node from the pending list of that
3909  * other thread (but we will never lose signals due to this).
3910  *
3911  * Return the number of nodes which still have pending data to be sent.
3912 * These will be retried again in the next round. 'Pending' is
3913 * returned as a negative number if nothing was sent in this round.
3914 *
3915 * (Likely due to receivers consuming too slow, and receive and send buffers
3916 * already being filled up)
3917 */
3918 static
3919 Int32
do_send(struct thr_data * selfptr,bool must_send)3920 do_send(struct thr_data* selfptr, bool must_send)
3921 {
3922 Uint32 count = selfptr->m_pending_send_count;
3923 Uint8 *nodes = selfptr->m_pending_send_nodes;
3924
3925 if (count == 0)
3926 {
3927 return 0; // send-buffers empty
3928 }
3929
3930 /* Clear the pending list. */
3931 selfptr->m_pending_send_mask.clear();
3932 selfptr->m_pending_send_count = 0;
3933
3934 if (g_send_threads)
3935 {
3936 const NDB_TICKS now = NdbTick_getCurrentTicks();
3937
3938 /**
3939      * We're using send threads; in this case we simply alert a send
3940      * thread to take over the actual sending of the signals. In this case
3941      * we will never have any failures, so we simply need to flush the
3942      * buffers and leave the rest to the send thread.
3943 */
3944 for (Uint32 i = 0; i < count; i++)
3945 {
3946 Uint32 node = nodes[i];
3947 selfptr->m_watchdog_counter = 6;
3948
3949 flush_send_buffer(selfptr, node);
3950 g_send_threads->alert_send_thread(node, now);
3951 }
3952 return 0;
3953 }
3954
3955 /**
3956 * We're not using send threads.
3957 */
3958 Uint32 made_progress = 0;
3959 struct thr_repository* rep = g_thr_repository;
3960
3961 for (Uint32 i = 0; i < count; i++)
3962 {
3963 Uint32 node = nodes[i];
3964 thr_repository::send_buffer * sb = rep->m_send_buffers + node;
3965
3966 selfptr->m_watchdog_counter = 6;
3967 flush_send_buffer(selfptr, node);
3968
3969 /**
3970 * If we must send now, set the force_send flag.
3971 *
3972 * This will ensure that if we do not get the send lock, the thread
3973 * holding the lock will try sending again for us when it has released
3974 * the lock.
3975 *
3976 * The lock/unlock pair works as a memory barrier to ensure that the
3977 * flag update is flushed to the other thread.
3978 */
3979 if (must_send)
3980 {
3981 sb->m_force_send = 1;
3982 }
3983
3984 if (trylock(&sb->m_send_lock) != 0)
3985 {
3986 if (!must_send)
3987 {
3988 /**
3989 * Not doing this node now, re-add to pending list.
3990 *
3991 * As we only add from the start of an empty list, we are safe from
3992 * overwriting the list while we are iterating over it.
3993 */
3994 register_pending_send(selfptr, node);
3995 }
3996 else
3997 {
3998 /* Other thread will send for us as we set m_force_send. */
3999 }
4000 }
4001 else //Got send_lock
4002 {
4003 /**
4004 * Now clear the flag, and start sending all data available to this node.
4005 *
4006 * Put a memory barrier here, so that if another thread tries to grab
4007 * the send lock but fails due to us holding it here, we either
4008 * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
4009 * 2) We clear here the flag just set by the other thread, but then we
4010 * will (thanks to mb()) be able to see and send all of the data already
4011 * in the first send iteration.
4012 */
4013 sb->m_force_send = 0;
4014 mb();
4015
4016 /**
4017 * Set m_send_thr so that our transporter callback can know which thread
4018 * holds the send lock for this remote node.
4019 */
4020 sb->m_send_thread = selfptr->m_thr_no;
4021 const bool more = globalTransporterRegistry.performSend(node);
4022 made_progress += sb->m_bytes_sent;
4023 sb->m_send_thread = NO_SEND_THREAD;
4024 unlock(&sb->m_send_lock);
4025
4026 if (more) //Didn't complete all my send work
4027 {
4028 register_pending_send(selfptr, node);
4029 }
4030 else
4031 {
4032 /**
4033 * We need a memory barrier here to prevent race between clearing lock
4034 * and reading of m_force_send.
4035 * CPU can reorder the load to before the clear of the lock
4036 */
4037 mb();
4038 if (sb->m_force_send) //Other thread forced us to do more send
4039 {
4040           made_progress++;        //Avoid false 'no progress' handling
4041 register_pending_send(selfptr, node);
4042 }
4043 }
4044 }
4045 } //for all nodes
4046
4047 selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
4048
4049 return (made_progress) // Had some progress?
4050 ? selfptr->m_pending_send_count // More do_send is required
4051 : -selfptr->m_pending_send_count; // All busy, or didn't find any work (-> -0)
4052 }
4053
4054 /**
4055 * These are the implementations of the TransporterSendBufferHandle methods
4056 * in ndbmtd.
4057 */
4058 Uint32 *
getWritePtr(NodeId node,Uint32 len,Uint32 prio,Uint32 max)4059 mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
4060 {
4061 struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
4062 thr_send_page * p = b->m_last_page;
4063 if ((p != 0) && (p->m_bytes + p->m_start + len <= thr_send_page::max_bytes()))
4064 {
4065 return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
4066 }
4067 else if (p != 0)
4068 {
4069     // TODO: maybe don't always flush on page-boundary ???
4070 flush_send_buffer(m_selfptr, node);
4071 if (!g_send_threads)
4072 try_send(m_selfptr, node);
4073 }
4074
4075 if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository->m_mm,
4076 RG_TRANSPORTER_BUFFERS)) != 0)
4077 {
4078 p->m_bytes = 0;
4079 p->m_start = 0;
4080 p->m_next = 0;
4081 b->m_first_page = b->m_last_page = p;
4082 return (Uint32*)p->m_data;
4083 }
4084 return 0;
4085 }
4086
4087 /**
4088 * Acquire send buffer size without locking and without gathering
4089 *
4090 * OJA: The usability of this function is rather questionable.
4091 * m_node_total_send_buffer_size is only updated by
4092 * link_thread_send_buffers() and bytes_sent(), both
4093 * part of performSend(). Thus, it is valid after a send.
4094 * However, checking it *before* a send in order to
4095  *      determine whether the payload is still too small doesn't
4096  *      really provide correct information about the current state.
4097  *      Most likely 0 will be returned if the previous send succeeded.
4098 *
4099 * A better alternative could be to add a 'min_send' argument
4100 * to perform_send(), and skip sending if not '>='.
4101 * (After real size is recalculated)
4102 */
4103 static Uint64
mt_get_send_buffer_bytes(NodeId node)4104 mt_get_send_buffer_bytes(NodeId node)
4105 {
4106 thr_repository *rep = g_thr_repository;
4107 thr_repository::send_buffer *sb = &rep->m_send_buffers[node];
4108 const Uint64 send_buffer_size = sb->m_node_total_send_buffer_size;
4109 return send_buffer_size;
4110 }
4111
4112 void
mt_getSendBufferLevel(Uint32 self,NodeId node,SB_LevelType & level)4113 mt_getSendBufferLevel(Uint32 self, NodeId node, SB_LevelType &level)
4114 {
4115 Resource_limit rl, rl_shared;
4116 const Uint32 page_size = thr_send_page::PGSIZE;
4117 thr_repository *rep = g_thr_repository;
4118 thr_repository::send_buffer *b = &rep->m_send_buffers[node];
4119 Uint64 current_node_send_buffer_size = b->m_node_total_send_buffer_size;
4120
4121 rep->m_mm->get_resource_limit_nolock(RG_TRANSPORTER_BUFFERS, rl);
4122 Uint64 current_send_buffer_size = rl.m_min * page_size;
4123 Uint64 current_used_send_buffer_size = rl.m_curr * page_size * 100;
4124 Uint64 current_percentage =
4125 current_used_send_buffer_size / current_send_buffer_size;
4126
4127 if (current_percentage >= 90)
4128 {
4129 rep->m_mm->get_resource_limit(0, rl_shared);
4130 Uint32 avail_shared = rl_shared.m_max - rl_shared.m_curr;
4131 if (rl.m_min + avail_shared > rl.m_max)
4132 {
4133 current_send_buffer_size = rl.m_max * page_size;
4134 }
4135 else
4136 {
4137 current_send_buffer_size = (rl.m_min + avail_shared) * page_size;
4138 }
4139 }
4140 calculate_send_buffer_level(current_node_send_buffer_size,
4141 current_send_buffer_size,
4142 current_used_send_buffer_size,
4143 num_threads,
4144 level);
4145 return;
4146 }
4147
4148 void
getSendBufferLevel(NodeId node,SB_LevelType & level)4149 mt_send_handle::getSendBufferLevel(NodeId node, SB_LevelType &level)
4150 {
4151 (void)node;
4152 (void)level;
4153 return;
4154 }
4155
4156 Uint32
updateWritePtr(NodeId node,Uint32 lenBytes,Uint32 prio)4157 mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
4158 {
4159 struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
4160 thr_send_page * p = b->m_last_page;
4161 p->m_bytes += lenBytes;
4162 return p->m_bytes;
4163 }
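/**
 * getWritePtr()/updateWritePtr() form a two-phase append API: reserve
 * space in the last send page first, then commit the number of bytes
 * actually written.  A hypothetical caller (illustrative only; 'handle',
 * 'node', 'prio', 'max' and 'words' are assumed example values):
 *
 *   Uint32 len = 6;                                   // words to append
 *   Uint32 * dst = handle->getWritePtr(node, 4 * len, prio, max);
 *   if (dst != NULL)
 *   {
 *     for (Uint32 i = 0; i < len; i++)
 *       dst[i] = words[i];                            // fill payload
 *     handle->updateWritePtr(node, 4 * len, prio);    // commit bytes
 *   }
 *
 * getWritePtr() flushes the current page and seizes a new one when the
 * requested length no longer fits.
 */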
4164
4165 /*
4166 * Insert a signal in a job queue.
4167 *
4168 * The signal is not visible to consumers yet after return from this function,
4169 * only recorded in the thr_jb_write_state. It is necessary to first call
4170 * flush_write_state() for this.
4171 *
4172 * The new_buffer is a job buffer to use if the current one gets full. If used,
4173 * we return true, indicating that the caller should allocate a new one for
4174  * the next call. (This is done to allow inserting under the lock, while
4175  * doing the allocation outside the lock.)
4176 */
4177 static inline
4178 bool
insert_signal(thr_job_queue * q,thr_job_queue_head * h,thr_jb_write_state * w,Uint32 prioa,const SignalHeader * sh,const Uint32 * data,const Uint32 secPtr[3],thr_job_buffer * new_buffer)4179 insert_signal(thr_job_queue *q, thr_job_queue_head *h,
4180 thr_jb_write_state *w, Uint32 prioa,
4181 const SignalHeader* sh, const Uint32 *data,
4182 const Uint32 secPtr[3], thr_job_buffer *new_buffer)
4183 {
4184 Uint32 write_pos = w->m_write_pos;
4185 Uint32 datalen = sh->theLength;
4186 assert(w->is_open());
4187 assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
4188 memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
4189 write_pos += (sizeof(*sh) >> 2);
4190 memcpy(w->m_write_buffer->m_data + write_pos, data, 4*datalen);
4191 write_pos += datalen;
4192 const Uint32 *p= secPtr;
4193 for (Uint32 i = 0; i < sh->m_noOfSections; i++)
4194 w->m_write_buffer->m_data[write_pos++] = *p++;
4195 w->increment_pending_signals();
4196
4197 #if SIZEOF_CHARP == 8
4198 /* Align to 8-byte boundary, to ensure aligned copies. */
4199 write_pos= (write_pos+1) & ~((Uint32)1);
4200 #endif
4201
4202 /*
4203 * We make sure that there is always room for at least one signal in the
4204 * current buffer in the queue, so one insert is always possible without
4205 * adding a new buffer.
4206 */
4207 if (likely(write_pos + MAX_SIGNAL_SIZE <= thr_job_buffer::SIZE))
4208 {
4209 w->m_write_pos = write_pos;
4210 return false;
4211 }
4212 else
4213 {
4214 /*
4215 * Need a write memory barrier here, as this might make signal data visible
4216 * to other threads.
4217 *
4218 * ToDo: We actually only need the wmb() here if we already make this
4219 * buffer visible to the other thread. So we might optimize it a bit. But
4220 * wmb() is a no-op on x86 anyway...
4221 */
4222 wmb();
4223 w->m_write_buffer->m_len = write_pos;
4224 Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
4225
4226 /**
4227 * Full job buffer is fatal.
4228 *
4229 * ToDo: should we wait for it to become non-full? There is no guarantee
4230 * that this will actually happen...
4231 *
4232 * Or alternatively, ndbrequire() ?
4233 */
4234 if (unlikely(write_index == h->m_read_index))
4235 {
4236 job_buffer_full(0);
4237 }
4238 new_buffer->m_len = 0;
4239 new_buffer->m_prioa = prioa;
4240 q->m_buffers[write_index] = new_buffer;
4241 w->m_write_index = write_index;
4242 w->m_write_pos = 0;
4243 w->m_write_buffer = new_buffer;
4244 return true; // Buffer new_buffer used
4245 }
4246
4247 return false; // Buffer new_buffer not used
4248 }
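/**
 * Illustrative sketch of the 8-byte alignment step used above on
 * 64-bit builds (hypothetical helper, not part of the implementation):
 * the word position is rounded up to an even count so that the next
 * SignalHeader starts on an 8-byte boundary.
 *
 *   static inline Uint32 round_to_even_words(Uint32 word_pos)
 *   {
 *     return (word_pos + 1) & ~((Uint32)1);
 *   }
 *
 *   round_to_even_words(7) == 8,  round_to_even_words(8) == 8
 */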
4249
4250 static
4251 void
read_jbb_state(thr_data * selfptr,Uint32 count)4252 read_jbb_state(thr_data *selfptr, Uint32 count)
4253 {
4254 thr_jb_read_state *r = selfptr->m_read_states;
4255 const thr_job_queue *q = selfptr->m_in_queue;
4256 const thr_job_queue_head *h = selfptr->m_in_queue_head;
4257 for (Uint32 i = 0; i < count; i++,r++)
4258 {
4259 if (r->is_open())
4260 {
4261 Uint32 read_index = r->m_read_index;
4262
4263 /**
4264 * Optimization: Only reload when possibly empty.
4265        * Avoid cache reload of the shared thr_job_queue_head.
4266        * Read the head directly to avoid an unnecessary cache
4267        * load of the first cache line of the m_in_queue entry.
4268 */
4269 if (r->m_write_index == read_index)
4270 {
4271 r->m_write_index = h[i].m_write_index;
4272 read_barrier_depends();
4273 r->m_read_end = q[i].m_buffers[read_index]->m_len;
4274 }
4275 }
4276 }
4277 }
4278
4279 static
4280 bool
read_jba_state(thr_data * selfptr)4281 read_jba_state(thr_data *selfptr)
4282 {
4283 thr_jb_read_state *r = &(selfptr->m_jba_read_state);
4284 r->m_write_index = selfptr->m_jba_head.m_write_index;
4285 read_barrier_depends();
4286 r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
4287 return r->is_empty();
4288 }
4289
4290 /* Check all job queues, return true only if all are empty. */
4291 static bool
check_queues_empty(thr_data * selfptr)4292 check_queues_empty(thr_data *selfptr)
4293 {
4294 Uint32 thr_count = g_thr_repository->m_thread_count;
4295 bool empty = read_jba_state(selfptr);
4296 if (!empty)
4297 return false;
4298
4299 read_jbb_state(selfptr, thr_count);
4300 const thr_jb_read_state *r = selfptr->m_read_states;
4301 for (Uint32 i = 0; i < thr_count; i++,r++)
4302 {
4303 if (!r->is_empty())
4304 return false;
4305 }
4306 return true;
4307 }
4308
4309 /*
4310 * Execute at most MAX_SIGNALS signals from one job queue, updating local read
4311 * state as appropriate.
4312 *
4313 * Returns number of signals actually executed.
4314 */
4315 static
4316 Uint32
execute_signals(thr_data * selfptr,thr_job_queue * q,thr_job_queue_head * h,thr_jb_read_state * r,Signal * sig,Uint32 max_signals)4317 execute_signals(thr_data *selfptr,
4318 thr_job_queue *q, thr_job_queue_head *h,
4319 thr_jb_read_state *r,
4320 Signal *sig, Uint32 max_signals)
4321 {
4322 Uint32 num_signals;
4323 Uint32 read_index = r->m_read_index;
4324 Uint32 write_index = r->m_write_index;
4325 Uint32 read_pos = r->m_read_pos;
4326 Uint32 read_end = r->m_read_end;
4327 Uint32 *watchDogCounter = &selfptr->m_watchdog_counter;
4328
4329 if (read_index == write_index && read_pos >= read_end)
4330 return 0; // empty read_state
4331
4332 thr_job_buffer *read_buffer = r->m_read_buffer;
4333
4334 for (num_signals = 0; num_signals < max_signals; num_signals++)
4335 {
4336 while (read_pos >= read_end)
4337 {
4338 if (read_index == write_index)
4339 {
4340 /* No more available now. */
4341 return num_signals;
4342 }
4343 else
4344 {
4345 /* Move to next buffer. */
4346 read_index = (read_index + 1) % thr_job_queue::SIZE;
4347 release_buffer(g_thr_repository, selfptr->m_thr_no, read_buffer);
4348 read_buffer = q->m_buffers[read_index];
4349 read_pos = 0;
4350 read_end = read_buffer->m_len;
4351 /* Update thread-local read state. */
4352 r->m_read_index = h->m_read_index = read_index;
4353 r->m_read_buffer = read_buffer;
4354 r->m_read_pos = read_pos;
4355 r->m_read_end = read_end;
4356 /* Wakeup threads waiting for job buffers to become free */
4357 wakeup(&h->m_waiter);
4358 }
4359 }
4360
4361 /*
4362      * These prefetches were found using OProfile to reduce cache misses.
4363 * (Though on Intel Core 2, they do not give much speedup, as apparently
4364 * the hardware prefetcher is already doing a fairly good job).
4365 */
4366 NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 16);
4367 NDB_PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
4368
4369 #ifdef VM_TRACE
4370 /* Find reading / propagation of junk */
4371 sig->garbage_register();
4372 #endif
4373 /* Now execute the signal. */
4374 SignalHeader* s =
4375 reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
4376 Uint32 seccnt = s->m_noOfSections;
4377 Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
4378 if(siglen>16)
4379 {
4380 NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
4381 }
4382 Uint32 bno = blockToMain(s->theReceiversBlockNumber);
4383 Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
4384 SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
4385 assert(block != 0);
4386
4387 Uint32 gsn = s->theVerId_signalNumber;
4388 *watchDogCounter = 1;
4389 /* Must update original buffer so signal dump will see it. */
4390 s->theSignalId = selfptr->m_signal_id_counter++;
4391 memcpy(&sig->header, s, 4*siglen);
4392 sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
4393 sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
4394 sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];
4395
4396 read_pos += siglen + seccnt;
4397 #if SIZEOF_CHARP == 8
4398 /* Handle 8-byte alignment. */
4399 read_pos = (read_pos + 1) & ~((Uint32)1);
4400 #endif
4401
4402 /* Update just before execute so signal dump can know how far we are. */
4403 r->m_read_pos = read_pos;
4404
4405 #ifdef VM_TRACE
4406 if (globalData.testOn)
4407 { //wl4391_todo segments
4408 SegmentedSectionPtr ptr[3];
4409 ptr[0].i = sig->m_sectionPtrI[0];
4410 ptr[1].i = sig->m_sectionPtrI[1];
4411 ptr[2].i = sig->m_sectionPtrI[2];
4412 ::getSections(seccnt, ptr);
4413 globalSignalLoggers.executeSignal(*s,
4414 0,
4415 &sig->theData[0],
4416 globalData.ownId,
4417 ptr, seccnt);
4418 }
4419 #endif
4420
4421 block->jamBuffer()->markEndOfSigExec();
4422 block->executeFunction_async(gsn, sig);
4423 }
4424
4425 return num_signals;
4426 }
4427
4428 static
4429 Uint32
run_job_buffers(thr_data * selfptr,Signal * sig)4430 run_job_buffers(thr_data *selfptr, Signal *sig)
4431 {
4432 Uint32 thr_count = g_thr_repository->m_thread_count;
4433 Uint32 signal_count = 0;
4434 Uint32 signal_count_since_last_zero_time_queue = 0;
4435 Uint32 perjb = selfptr->m_max_signals_per_jb;
4436
4437 read_jbb_state(selfptr, thr_count);
4438 /*
4439 * A load memory barrier to ensure that we see any prio A signal sent later
4440 * than loaded prio B signals.
4441 */
4442 rmb();
4443
4444 thr_job_queue *queue = selfptr->m_in_queue;
4445 thr_job_queue_head *head = selfptr->m_in_queue_head;
4446 thr_jb_read_state *read_state = selfptr->m_read_states;
4447 for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
4448 send_thr_no++,queue++,read_state++,head++)
4449 {
4450 /* Read the prio A state often, to avoid starvation of prio A. */
4451 while (!read_jba_state(selfptr))
4452 {
4453 selfptr->m_sent_local_prioa_signal = false;
4454 static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
4455 signal_count += execute_signals(selfptr,
4456 &(selfptr->m_jba),
4457 &(selfptr->m_jba_head),
4458 &(selfptr->m_jba_read_state), sig,
4459 max_prioA);
4460 if (!selfptr->m_sent_local_prioa_signal)
4461 {
4462 /**
4463          * Break out of the loop if there were no prio A signals generated
4464          * from the local execution.
4465 */
4466 break;
4467 }
4468 }
4469
4470 /**
4471 * Contended queues get an extra execute quota:
4472 *
4473 * If we didn't get a max 'perjb' quota, our out buffers
4474 * are about to fill up. This thread is thus effectively
4475 * slowed down in order to let other threads consume from
4476 * our out buffers. Eventually, when 'perjb==0', we will
4477 * have to wait/sleep for buffers to become available.
4478 *
4479      * This can bring us into a circular wait-lock, where
4480      * threads are stalled due to full out buffers. The same
4481      * thread may also have full in buffers, thus blocking other
4482      * threads from progressing. We could then end up in a
4483      * circular wait-lock where no threads are able to progress.
4484 * The entire scheduler will then be stuck.
4485 *
4486 * We try to avoid this situation by reserving some
4487 * 'm_max_extra_signals' which are only used to consume
4488 * from 'almost full' in-buffers. We will then reduce the
4489 * risk of ending up in the above wait-lock.
4490 *
4491 * Exclude receiver threads, as there can't be a
4492 * circular wait between recv-thread and workers.
4493 */
4494 Uint32 extra = 0;
4495
4496 if (perjb < MAX_SIGNALS_PER_JB) //Job buffer contention
4497 {
4498 const Uint32 free = compute_free_buffers_in_queue(head);
4499 if (free <= thr_job_queue::ALMOST_FULL)
4500 {
4501 if (selfptr->m_max_extra_signals > MAX_SIGNALS_PER_JB - perjb)
4502 {
4503 extra = MAX_SIGNALS_PER_JB - perjb;
4504 }
4505 else
4506 {
4507 extra = selfptr->m_max_extra_signals;
4508 selfptr->m_max_exec_signals = 0; //Force recalc
4509 }
4510 selfptr->m_max_extra_signals -= extra;
4511 }
4512 }
4513
4514 #ifdef ERROR_INSERT
4515
4516 #define MIXOLOGY_MIX_MT_JBB 1
4517
4518 if (unlikely(globalEmulatorData.theConfiguration->getMixologyLevel() &
4519 MIXOLOGY_MIX_MT_JBB))
4520 {
4521 /**
4522 * Let's maximise interleaving to find inter-thread
4523 * signal order dependency bugs
4524 */
4525 perjb = 1;
4526 extra = 0;
4527 }
4528 #endif
4529
4530 /* Now execute prio B signals from one thread. */
4531 signal_count += execute_signals(selfptr, queue, head, read_state,
4532 sig, perjb+extra);
4533
4534 if (signal_count - signal_count_since_last_zero_time_queue >
4535 (MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN -
4536 MAX_SIGNALS_PER_JB))
4537 {
4538 /**
4539 * Each execution of execute_signals can at most execute 75 signals
4540 * from one node. We want to ensure that we execute no more than
4541 * 100 signals before we arrive here to get the signals from the
4542 * zero time queue. This implements the bounded delay signal
4543 * concept which is required for rate controlled activities.
4544 *
4545 * We scan the zero time queue if more than 25 signals were executed.
4546 * This means that at most 100 signals will be executed before we arrive
4547 * here again to check the bounded delay signals.
4548 */
4549 signal_count_since_last_zero_time_queue = signal_count;
4550 scan_zero_queue(selfptr);
4551 }
4552 }
4553
4554 return signal_count;
4555 }
4556
4557 struct thr_map_entry {
4558 enum { NULL_THR_NO = 0xFF };
4559 Uint8 thr_no;
thr_map_entrythr_map_entry4560 thr_map_entry() : thr_no(NULL_THR_NO) {}
4561 };
4562
4563 static struct thr_map_entry thr_map[NO_OF_BLOCKS][NDBMT_MAX_BLOCK_INSTANCES];
4564
4565 static inline Uint32
block2ThreadId(Uint32 block,Uint32 instance)4566 block2ThreadId(Uint32 block, Uint32 instance)
4567 {
4568 assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
4569 Uint32 index = block - MIN_BLOCK_NO;
4570 assert(instance < NDB_ARRAY_SIZE(thr_map[index]));
4571 const thr_map_entry& entry = thr_map[index][instance];
4572 assert(entry.thr_no < num_threads);
4573 return entry.thr_no;
4574 }
4575
4576 void
add_thr_map(Uint32 main,Uint32 instance,Uint32 thr_no)4577 add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
4578 {
4579 assert(main == blockToMain(main));
4580 Uint32 index = main - MIN_BLOCK_NO;
4581 assert(index < NO_OF_BLOCKS);
4582 assert(instance < NDB_ARRAY_SIZE(thr_map[index]));
4583
4584 SimulatedBlock* b = globalData.getBlock(main, instance);
4585 require(b != 0);
4586
4587 /* Block number including instance. */
4588 Uint32 block = numberToBlock(main, instance);
4589
4590 require(thr_no < num_threads);
4591 struct thr_repository* rep = g_thr_repository;
4592 struct thr_data* thr_ptr = &rep->m_thread[thr_no];
4593
4594 /* Add to list. */
4595 {
4596 Uint32 i;
4597 for (i = 0; i < thr_ptr->m_instance_count; i++)
4598 require(thr_ptr->m_instance_list[i] != block);
4599 }
4600 require(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
4601 thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;
4602
4603 SimulatedBlock::ThreadContext ctx;
4604 ctx.threadId = thr_no;
4605 ctx.jamBuffer = &thr_ptr->m_jam;
4606 ctx.watchDogCounter = &thr_ptr->m_watchdog_counter;
4607 ctx.sectionPoolCache = &thr_ptr->m_sectionPoolCache;
4608 b->assignToThread(ctx);
4609
4610 /* Create entry mapping block to thread. */
4611 thr_map_entry& entry = thr_map[index][instance];
4612 require(entry.thr_no == thr_map_entry::NULL_THR_NO);
4613 entry.thr_no = thr_no;
4614 }
4615
4616 /* Static assignment of main instances (before first signal). */
4617 void
mt_init_thr_map()4618 mt_init_thr_map()
4619 {
4620 /* Keep mt-classic assignments in MT LQH. */
4621 const Uint32 thr_GLOBAL = 0;
4622 const Uint32 thr_LOCAL = 1;
4623
4624 add_thr_map(BACKUP, 0, thr_LOCAL);
4625 add_thr_map(DBTC, 0, thr_GLOBAL);
4626 add_thr_map(DBDIH, 0, thr_GLOBAL);
4627 add_thr_map(DBLQH, 0, thr_LOCAL);
4628 add_thr_map(DBACC, 0, thr_LOCAL);
4629 add_thr_map(DBTUP, 0, thr_LOCAL);
4630 add_thr_map(DBDICT, 0, thr_GLOBAL);
4631 add_thr_map(NDBCNTR, 0, thr_GLOBAL);
4632 add_thr_map(QMGR, 0, thr_GLOBAL);
4633 add_thr_map(NDBFS, 0, thr_GLOBAL);
4634 add_thr_map(CMVMI, 0, thr_GLOBAL);
4635 add_thr_map(TRIX, 0, thr_GLOBAL);
4636 add_thr_map(DBUTIL, 0, thr_GLOBAL);
4637 add_thr_map(SUMA, 0, thr_LOCAL);
4638 add_thr_map(DBTUX, 0, thr_LOCAL);
4639 add_thr_map(TSMAN, 0, thr_LOCAL);
4640 add_thr_map(LGMAN, 0, thr_LOCAL);
4641 add_thr_map(PGMAN, 0, thr_LOCAL);
4642 add_thr_map(RESTORE, 0, thr_LOCAL);
4643 add_thr_map(DBINFO, 0, thr_LOCAL);
4644 add_thr_map(DBSPJ, 0, thr_GLOBAL);
4645 add_thr_map(THRMAN, 0, thr_GLOBAL);
4646 add_thr_map(TRPMAN, 0, thr_GLOBAL);
4647 }
4648
4649 Uint32
mt_get_instance_count(Uint32 block)4650 mt_get_instance_count(Uint32 block)
4651 {
4652 switch(block){
4653 case DBLQH:
4654 case DBACC:
4655 case DBTUP:
4656 case DBTUX:
4657 case BACKUP:
4658 case RESTORE:
4659 return globalData.ndbMtLqhWorkers;
4660 break;
4661 case PGMAN:
4662 return globalData.ndbMtLqhWorkers + 1;
4663 break;
4664 case DBTC:
4665 case DBSPJ:
4666 return globalData.ndbMtTcThreads;
4667 break;
4668 case TRPMAN:
4669 return globalData.ndbMtReceiveThreads;
4670 case THRMAN:
4671 return num_threads;
4672 default:
4673 require(false);
4674 }
4675 return 0;
4676 }
4677
4678 void
mt_add_thr_map(Uint32 block,Uint32 instance)4679 mt_add_thr_map(Uint32 block, Uint32 instance)
4680 {
4681 Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
4682 Uint32 num_tc_threads = globalData.ndbMtTcThreads;
4683
4684 require(instance != 0);
4685 Uint32 thr_no = NUM_MAIN_THREADS;
4686 switch(block){
4687 case DBLQH:
4688 case DBACC:
4689 case DBTUP:
4690 case DBTUX:
4691 case BACKUP:
4692 case RESTORE:
4693 thr_no += (instance - 1) % num_lqh_threads;
4694 break;
4695 case PGMAN:
4696 if (instance == num_lqh_threads + 1)
4697 {
4698       // Put the extra PGMAN together with its proxy
4699 thr_no = block2ThreadId(block, 0);
4700 }
4701 else
4702 {
4703 thr_no += (instance - 1) % num_lqh_threads;
4704 }
4705 break;
4706 case DBTC:
4707 case DBSPJ:
4708 thr_no += num_lqh_threads + (instance - 1);
4709 break;
4710 case THRMAN:
4711 thr_no = instance - 1;
4712 break;
4713 case TRPMAN:
4714 thr_no += num_lqh_threads + num_tc_threads + (instance - 1);
4715 break;
4716 default:
4717 require(false);
4718 }
4719
4720 add_thr_map(block, instance, thr_no);
4721 }
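/**
 * Illustrative worked example of the mapping above (hypothetical
 * configuration, not derived from any particular config): with
 * NUM_MAIN_THREADS == 2, four LQH threads and two TC threads,
 *
 *   DBLQH instance 1..4  -> thr_no 2, 3, 4, 5   (2 + (instance-1) % 4)
 *   DBLQH instance 5     -> thr_no 2            (wraps modulo 4)
 *   DBTC  instance 1..2  -> thr_no 6, 7         (2 + 4 + (instance-1))
 *   TRPMAN instance 1    -> thr_no 8            (2 + 4 + 2 + 0)
 *   THRMAN instance n    -> thr_no n - 1        (one per thread)
 */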
4722
4723 /**
4724  * Create the duplicate entries needed so that
4725  * the sender doesn't need to know how many instances there
4726  * actually are in this node...
4727 *
4728 * if only 1 instance...then duplicate that for all slots
4729 * else assume instance 0 is proxy...and duplicate workers (modulo)
4730 *
4731 * NOTE: extra pgman worker is instance 5
4732 */
4733 void
mt_finalize_thr_map()4734 mt_finalize_thr_map()
4735 {
4736 for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
4737 {
4738 Uint32 bno = b + MIN_BLOCK_NO;
4739 Uint32 cnt = 0;
4740 while (cnt < NDB_ARRAY_SIZE(thr_map[b]) &&
4741 thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
4742 cnt++;
4743
4744 if (cnt != NDB_ARRAY_SIZE(thr_map[b]))
4745 {
4746 SimulatedBlock * main = globalData.getBlock(bno, 0);
4747 for (Uint32 i = cnt; i < NDB_ARRAY_SIZE(thr_map[b]); i++)
4748 {
4749 Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1));
4750 if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
4751 {
4752 thr_map[b][i] = thr_map[b][dup];
4753 main->addInstance(globalData.getBlock(bno, dup), i);
4754 }
4755 else
4756 {
4757 /**
4758 * extra pgman instance
4759 */
4760 require(bno == PGMAN);
4761 }
4762 }
4763 }
4764 }
4765 }
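/**
 * Illustrative example of the duplication rule above (hypothetical
 * counts): for a block with cnt == 5 populated entries (proxy in slot 0
 * plus 4 workers), the remaining slots are filled using
 *
 *   dup = 1 + ((i - 1) % (cnt - 1)) = 1 + ((i - 1) % 4)
 *
 *   i = 5, 6, 7, 8, 9, ...  ->  dup = 1, 2, 3, 4, 1, ...
 *
 * so a sender addressing a higher instance number is transparently
 * routed to one of the real workers, round-robin.  With cnt == 1 every
 * slot simply duplicates instance 0.
 */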
4766
4767 static void
init_thread(thr_data * selfptr)4768 init_thread(thr_data *selfptr)
4769 {
4770 selfptr->m_waiter.init();
4771 selfptr->m_jam.theEmulatedJamIndex = 0;
4772 NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
4773 NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
4774
4775 unsigned thr_no = selfptr->m_thr_no;
4776 globalEmulatorData.theWatchDog->
4777 registerWatchedThread(&selfptr->m_watchdog_counter, thr_no);
4778 {
4779 while(selfptr->m_thread == 0)
4780 NdbSleep_MilliSleep(30);
4781 }
4782
4783 THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
4784 BaseString tmp;
4785 tmp.appfmt("thr: %u ", thr_no);
4786
4787 int tid = NdbThread_GetTid(selfptr->m_thread);
4788 if (tid != -1)
4789 {
4790 tmp.appfmt("tid: %u ", tid);
4791 }
4792
4793 conf.appendInfo(tmp,
4794 selfptr->m_instance_list,
4795 selfptr->m_instance_count);
4796 int res = conf.do_bind(selfptr->m_thread,
4797 selfptr->m_instance_list,
4798 selfptr->m_instance_count);
4799 if (res < 0)
4800 {
4801 tmp.appfmt("err: %d ", -res);
4802 }
4803 else if (res > 0)
4804 {
4805 tmp.appfmt("OK ");
4806 }
4807 selfptr->m_realtime = conf.do_get_realtime(selfptr->m_instance_list,
4808 selfptr->m_instance_count);
4809 selfptr->m_spintime = conf.do_get_spintime(selfptr->m_instance_list,
4810 selfptr->m_instance_count);
4811
4812 selfptr->m_thr_id = my_thread_self();
4813
4814 for (Uint32 i = 0; i < selfptr->m_instance_count; i++)
4815 {
4816 BlockReference block = selfptr->m_instance_list[i];
4817 Uint32 main = blockToMain(block);
4818 Uint32 instance = blockToInstance(block);
4819 tmp.appfmt("%s(%u) ", getBlockName(main), instance);
4820 }
4821 printf("%s\n", tmp.c_str());
4822 fflush(stdout);
4823 }
4824
4825 /**
4826 * Align signal buffer for better cache performance.
4827  * Also skew it a little for each thread to avoid cache pollution.
4828 */
4829 #define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_BLOCK_THREADS)
4830 static Signal *
aligned_signal(unsigned char signal_buf[SIGBUF_SIZE],unsigned thr_no)4831 aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
4832 {
4833 UintPtr sigtmp= (UintPtr)signal_buf;
4834 sigtmp= (sigtmp+63) & (~(UintPtr)63);
4835 sigtmp+= thr_no*256;
4836 return (Signal *)sigtmp;
4837 }
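/**
 * Illustrative example of the arithmetic above (hypothetical buffer
 * address): if signal_buf happens to start at address 0x1005, then
 *
 *   (0x1005 + 63) & ~63   ->  0x1040      // next 64-byte boundary
 *   + thr_no * 256                        // per-thread skew
 *
 * gives 0x1040 for thread 0, 0x1140 for thread 1, and so on.  The skew
 * reduces the chance that the Signal objects of different threads map
 * to the same cache sets.
 */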
4838
4839 /*
4840 * We only do receive in receiver thread(s), no other threads do receive.
4841 *
4842 * As part of the receive loop, we also periodically call update_connections()
4843 * (this way we are similar to single-threaded ndbd).
4844 *
4845 * The TRPMAN block (and no other blocks) run in the same thread as this
4846 * receive loop; this way we avoid races between update_connections() and
4847 * TRPMAN calls into the transporters.
4848 */
4849
4850 /**
4851  * Array of pointers to TransporterReceiveHandleKernel.
4852  * These are not used "in traffic".
4853 */
4854 static TransporterReceiveHandleKernel *
4855 g_trp_receive_handle_ptr[MAX_NDBMT_RECEIVE_THREADS];
4856
4857 /**
4858 * Array for mapping nodes to receiver threads and function to access it.
4859 */
4860 static NodeId g_node_to_recv_thr_map[MAX_NODES];
4861
4862 /**
4863 * We use this method both to initialise the realtime variable
4864 * and also for updating it. Currently there is no method to
4865 * update it, but it's likely that we will soon invent one and
4866 * thus the code is prepared for this case.
4867 */
4868 static void
update_rt_config(struct thr_data * selfptr,bool & real_time,enum ThreadTypes type)4869 update_rt_config(struct thr_data *selfptr,
4870 bool & real_time,
4871 enum ThreadTypes type)
4872 {
4873 bool old_real_time = real_time;
4874 real_time = selfptr->m_realtime;
4875 if (old_real_time == true && real_time == false)
4876 {
4877 yield_rt_break(selfptr->m_thread,
4878 type,
4879 false);
4880 }
4881 }
4882
4883 /**
4884 * We use this method both to initialise the spintime variable
4885 * and also for updating it. Currently there is no method to
4886 * update it, but it's likely that we will soon invent one and
4887 * thus the code is prepared for this case.
4888 */
4889 static void
update_spin_config(struct thr_data * selfptr,Uint64 & min_spin_timer)4890 update_spin_config(struct thr_data *selfptr,
4891 Uint64 & min_spin_timer)
4892 {
4893 min_spin_timer = selfptr->m_spintime;
4894 }
4895
4896 extern "C"
4897 void *
mt_receiver_thread_main(void * thr_arg)4898 mt_receiver_thread_main(void *thr_arg)
4899 {
4900 unsigned char signal_buf[SIGBUF_SIZE];
4901 Signal *signal;
4902 struct thr_repository* rep = g_thr_repository;
4903 struct thr_data* selfptr = (struct thr_data *)thr_arg;
4904 unsigned thr_no = selfptr->m_thr_no;
4905 Uint32& watchDogCounter = selfptr->m_watchdog_counter;
4906 const Uint32 recv_thread_idx = thr_no - first_receiver_thread_no;
4907 bool has_received = false;
4908 int cnt = 0;
4909 bool real_time = false;
4910 Uint64 min_spin_timer;
4911 NDB_TICKS start_spin_ticks;
4912 NDB_TICKS yield_ticks;
4913
4914 init_thread(selfptr);
4915 signal = aligned_signal(signal_buf, thr_no);
4916 update_rt_config(selfptr, real_time, ReceiveThread);
4917 update_spin_config(selfptr, min_spin_timer);
4918
4919 /**
4920 * Object that keeps track of our pollReceive-state
4921 */
4922 TransporterReceiveHandleKernel recvdata(thr_no, recv_thread_idx);
4923 recvdata.assign_nodes(g_node_to_recv_thr_map);
4924 globalTransporterRegistry.init(recvdata);
4925
4926 /**
4927 * Save pointer to this for management/error-insert
4928 */
4929 g_trp_receive_handle_ptr[recv_thread_idx] = &recvdata;
4930
4931 NdbTick_Invalidate(&start_spin_ticks);
4932 NDB_TICKS now = NdbTick_getCurrentTicks();
4933 selfptr->m_ticks = yield_ticks = now;
4934
4935 while (globalData.theRestartFlag != perform_stop)
4936 {
4937 if (cnt == 0)
4938 {
4939 watchDogCounter = 5;
4940 globalTransporterRegistry.update_connections(recvdata);
4941 }
4942 cnt = (cnt + 1) & 15;
4943
4944 watchDogCounter = 2;
4945
4946 now = NdbTick_getCurrentTicks();
4947 const Uint32 lagging_timers = scan_time_queues(selfptr, now);
4948
4949 Uint32 sum = run_job_buffers(selfptr, signal);
4950
4951 if (sum || has_received)
4952 {
4953 watchDogCounter = 6;
4954 flush_jbb_write_state(selfptr);
4955 }
4956
4957 const Int32 pending_send = do_send(selfptr, TRUE);
4958
4959 watchDogCounter = 7;
4960
4961 if (real_time)
4962 {
4963 check_real_time_break(now,
4964 &yield_ticks,
4965 selfptr->m_thread,
4966 ReceiveThread);
4967 }
4968
4969 /**
4970      * Only allow sleeping in pollReceive when:
4971      * 1) We are not lagging behind in handling timer events.
4972      * 2) No more pending sends, or no send progress.
4973      * 3) There is no 'min_spin' configured, or min_spin has elapsed.
4974 */
4975 Uint32 delay = 0;
4976
4977 if (lagging_timers == 0 && // 1)
4978 pending_send <= 0 && // 2)
4979 (min_spin_timer == 0 || // 3)
4980 check_yield(now,
4981 &start_spin_ticks,
4982 min_spin_timer)))
4983 {
4984 delay = 1; // 1ms
4985 }
4986
4987 has_received = false;
4988 if (globalTransporterRegistry.pollReceive(delay, recvdata))
4989 {
4990 watchDogCounter = 8;
4991 lock(&rep->m_receive_lock[recv_thread_idx]);
4992 const bool buffersFull = (globalTransporterRegistry.performReceive(recvdata) != 0);
4993 unlock(&rep->m_receive_lock[recv_thread_idx]);
4994 has_received = true;
4995
4996 if (buffersFull) /* Receive queues(s) are full */
4997 {
4998 thr_data* waitthr = get_congested_recv_queue(rep, recv_thread_idx);
4999 if (waitthr != NULL) /* Will wait for buffers to be freed */
5000 {
5001 /**
5002 * Wait for thread 'waitthr' to consume some of the
5003 * pending signals in m_in_queue previously received
5004 * from this receive thread, 'thr_no'.
5005 * Will recheck queue status with 'check_recv_queue' after latch
5006 * has been set, and *before* going to sleep.
5007 */
5008 const Uint32 nano_wait = 1000*1000; /* -> 1 ms */
5009 thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
5010
5011 const bool waited = yield(&wait_queue->m_waiter,
5012 nano_wait,
5013 check_recv_queue,
5014 wait_queue);
5015 (void)waited;
5016 }
5017 }
5018 }
5019 selfptr->m_stat.m_loop_cnt++;
5020 selfptr->m_stat.m_exec_cnt += sum;
5021 }
5022
5023 globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
5024 return NULL; // Return value not currently used
5025 }
5026
5027 static
5028 inline
5029 void
sendpacked(struct thr_data * thr_ptr,Signal * signal)5030 sendpacked(struct thr_data* thr_ptr, Signal* signal)
5031 {
5032 Uint32 i;
5033 for (i = 0; i < thr_ptr->m_instance_count; i++)
5034 {
5035 BlockReference block = thr_ptr->m_instance_list[i];
5036 Uint32 main = blockToMain(block);
5037 Uint32 instance = blockToInstance(block);
5038 SimulatedBlock* b = globalData.getBlock(main, instance);
5039 // wl4391_todo remove useless assert
5040 assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
5041 /* b->send_at_job_buffer_end(); */
5042 b->executeFunction_async(GSN_SEND_PACKED, signal);
5043 }
5044 }
5045
5046 /**
5047 * Callback function used by yield() to recheck
5048 * 'job queue full' condition before going to sleep.
5049 *
5050 * Check if the specified 'thr_job_queue_head' (arg)
5051 * is still full, return true if so.
5052 */
5053 static bool
check_congested_job_queue(thr_job_queue_head * waitfor)5054 check_congested_job_queue(thr_job_queue_head *waitfor)
5055 {
5056 return (compute_free_buffers_in_queue(waitfor) <= thr_job_queue::RESERVED);
5057 }
5058
5059 /**
5060 * Check if any out-queues of selfptr is full.
5061 * If full: Return 'Thr_data*' for (one of) the thread(s)
5062 * which we have to wait for. (to consume from queue)
5063 */
5064 static struct thr_data*
get_congested_job_queue(const thr_data * selfptr)5065 get_congested_job_queue(const thr_data *selfptr)
5066 {
5067 const Uint32 thr_no = selfptr->m_thr_no;
5068 struct thr_repository* rep = g_thr_repository;
5069 struct thr_data *thrptr = rep->m_thread;
5070 struct thr_data *waitfor = NULL;
5071
5072 for (unsigned i = 0; i<num_threads; i++, thrptr++)
5073 {
5074 thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
5075
5076 if (compute_free_buffers_in_queue(q_head) <= thr_job_queue::RESERVED)
5077 {
5078 if (thrptr != selfptr) // Don't wait on myself (yet)
5079 return thrptr;
5080 else
5081 waitfor = thrptr;
5082 }
5083 }
5084 return waitfor; // Possibly 'thrptr == selfptr'
5085 }
5086
5087 /**
5088 * has_full_in_queues()
5089 *
5090 * Avoid circular waits between block-threads:
5091 * A thread is not allowed to sleep due to full
5092 * 'out' job-buffers if there are other threads
5093 * already having full 'in' job buffers sent to
5094 * this thread.
5095 *
5096  * run_job_buffers() has reserved an 'm_max_extra_signals'
5097  * quota which will be used to drain these 'full_in_queues',
5098  * so we should allow it to do so.
5099 *
5100 * Returns 'true' if any in-queues to this thread are full
5101 */
5102 static
5103 bool
has_full_in_queues(struct thr_data * selfptr)5104 has_full_in_queues(struct thr_data* selfptr)
5105 {
5106 thr_job_queue_head *head = selfptr->m_in_queue_head;
5107
5108 for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++, head++)
5109 {
5110 if (compute_free_buffers_in_queue(head) <= thr_job_queue::RESERVED)
5111 {
5112 return true;
5113 }
5114 }
5115 return false;
5116 }
5117
5118 /**
5119 * update_sched_config
5120 *
5121  * In order to prevent "job-buffer-full", i.e.
5122  * that one thread (T1) produces so many signals to another thread (T2)
5123  * that the ring-buffer from T1 to T2 gets full,
5124  * the main loop has 2 "config" variables:
5125 * - m_max_exec_signals
5126 * This is the *total* no of signals T1 can execute before calling
5127 * this method again
5128 * - m_max_signals_per_jb
5129  *   This is the max no of signals T1 can execute from each other thread
5130  *   in the system
5131 *
5132 * Assumption: each signal may send *at most* 4 signals
5133 * - this assumption is made the same in ndbd and ndbmtd and is
5134 * mostly followed by block-code, although not in all places :-(
5135 *
5136  * This function returns true if it slept
5137  *  (i.e. it concluded that it could not execute *any* signals without
5138  *   risking job-buffer-full)
5139 */
5140 static
5141 bool
update_sched_config(struct thr_data * selfptr,Int32 pending_send)5142 update_sched_config(struct thr_data* selfptr, Int32 pending_send)
5143 {
5144 Uint32 sleeploop = 0;
5145 Uint32 thr_no = selfptr->m_thr_no;
5146 loop:
5147 Uint32 minfree = compute_min_free_out_buffers(thr_no);
5148 Uint32 reserved = (minfree > thr_job_queue::RESERVED)
5149 ? thr_job_queue::RESERVED
5150 : minfree;
5151
5152 Uint32 avail = compute_max_signals_to_execute(minfree - reserved);
5153 Uint32 perjb = (avail + g_thr_repository->m_thread_count - 1) /
5154 g_thr_repository->m_thread_count;
5155
5156 if (perjb > MAX_SIGNALS_PER_JB)
5157 perjb = MAX_SIGNALS_PER_JB;
5158
5159 selfptr->m_max_exec_signals = avail;
5160 selfptr->m_max_signals_per_jb = perjb;
5161 selfptr->m_max_extra_signals = compute_max_signals_to_execute(reserved);
5162
5163 if (unlikely(perjb == 0))
5164 {
5165 if (sleeploop == 10)
5166 {
5167 /**
5168 * we've slept for 10ms...try running anyway
5169 */
5170 selfptr->m_max_signals_per_jb = 1;
5171 ndbout_c("thr_no:%u - sleeploop 10!! "
5172 "(Worker thread blocked (>= 10ms) by slow consumer threads)",
5173 selfptr->m_thr_no);
5174 return true;
5175 }
5176
5177 struct thr_data* waitthr = get_congested_job_queue(selfptr);
5178 if (waitthr == NULL) // Waiters resolved
5179 {
5180 goto loop;
5181 }
5182 else if (has_full_in_queues(selfptr) &&
5183 selfptr->m_max_extra_signals > 0)
5184 {
5185 /* 'extra_signals' used to drain 'full_in_queues'. */
5186 return sleeploop > 0;
5187 }
5188
5189 if (pending_send)
5190 {
5191 /* About to sleep, _must_ send now. */
5192 pending_send = do_send(selfptr, TRUE);
5193 }
5194
5195 /**
5196 * Wait for thread 'waitthr' to consume some of the
5197 * pending signals in m_in_queue[].
5198 * Will recheck queue status with 'check_recv_queue'
5199 * after latch has been set, and *before* going to sleep.
5200 */
5201 const Uint32 nano_wait = 1000*1000; /* -> 1 ms */
5202 thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
5203
5204 const bool waited = yield(&wait_queue->m_waiter,
5205 nano_wait,
5206 check_congested_job_queue,
5207 wait_queue);
5208 if (waited)
5209 {
5210 sleeploop++;
5211 }
5212 goto loop;
5213 }
5214
5215 return sleeploop > 0;
5216 }
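/**
 * Illustrative numeric example of the quota split above (hypothetical
 * figures; compute_max_signals_to_execute() is defined elsewhere in
 * this file): assume it yields avail == 300 with a thread count of 9.
 *
 *   perjb = (300 + 9 - 1) / 9 = 34     // below the MAX_SIGNALS_PER_JB cap
 *
 * i.e. each of the 9 in-queues may contribute at most 34 signals before
 * the total budget m_max_exec_signals (300) is spent and this function
 * is called again.  When the out-buffers are so congested that perjb
 * becomes 0, the thread instead yields on the most congested queue, as
 * implemented above.
 */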
5217
5218 extern "C"
5219 void *
mt_job_thread_main(void * thr_arg)5220 mt_job_thread_main(void *thr_arg)
5221 {
5222 unsigned char signal_buf[SIGBUF_SIZE];
5223 Signal *signal;
5224
5225 struct thr_data* selfptr = (struct thr_data *)thr_arg;
5226 init_thread(selfptr);
5227 Uint32& watchDogCounter = selfptr->m_watchdog_counter;
5228
5229 unsigned thr_no = selfptr->m_thr_no;
5230 signal = aligned_signal(signal_buf, thr_no);
5231
5232 /* Avoid false watchdog alarms caused by race condition. */
5233 watchDogCounter = 1;
5234
5235 Int32 pending_send = 0;
5236 Uint32 send_sum = 0;
5237 Uint32 loops = 0;
5238 Uint32 maxloops = 10;/* Loops before reading clock, fuzzy adapted to 1ms freq. */
5239 Uint32 waits = 0;
5240
5241 NDB_TICKS start_spin_ticks;
5242 NDB_TICKS yield_ticks;
5243
5244 Uint64 min_spin_timer;
5245 bool real_time = false;
5246
5247 update_rt_config(selfptr, real_time, BlockThread);
5248 update_spin_config(selfptr, min_spin_timer);
5249
5250 NdbTick_Invalidate(&start_spin_ticks);
5251 NDB_TICKS now = NdbTick_getCurrentTicks();
5252 selfptr->m_ticks = start_spin_ticks = yield_ticks = now;
5253
5254 while (globalData.theRestartFlag != perform_stop)
5255 {
5256 loops++;
5257
5258 /**
5259      * Prefill our thread-local send buffers
5260      * up to THR_SEND_BUFFER_PRE_ALLOC (1Mb).
5261      *
5262      * If this doesn't work, pack buffers before starting to execute signals.
5263 */
5264 watchDogCounter = 11;
5265 if (!selfptr->m_send_buffer_pool.fill(g_thr_repository->m_mm,
5266 RG_TRANSPORTER_BUFFERS,
5267 THR_SEND_BUFFER_PRE_ALLOC))
5268 {
5269 try_pack_send_buffers(selfptr);
5270 }
5271
5272 watchDogCounter = 2;
5273 const Uint32 lagging_timers = scan_time_queues(selfptr, now);
5274
5275 Uint32 sum = run_job_buffers(selfptr, signal);
5276
5277 watchDogCounter = 1;
5278 signal->header.m_noOfSections = 0; /* valgrind */
5279 sendpacked(selfptr, signal);
5280
5281 if (sum)
5282 {
5283 watchDogCounter = 6;
5284 flush_jbb_write_state(selfptr);
5285 send_sum += sum;
5286 NdbTick_Invalidate(&start_spin_ticks);
5287
5288 if (send_sum > MAX_SIGNALS_BEFORE_SEND)
5289 {
5290 /* Try to send, but skip for now in case of lock contention. */
5291 pending_send = do_send(selfptr, FALSE);
5292 send_sum = 0;
5293 }
5294 else
5295 {
5296         /* Append send buffers to the send queues of the destination nodes. */
5297 do_flush(selfptr);
5298 }
5299 }
5300 /**
5301 * Scheduler is not allowed to yield until its internal
5302 * time has caught up on real time.
5303 */
5304 else if (lagging_timers == 0)
5305 {
5306 /* No signals processed, prepare to sleep to wait for more */
5307 if (send_sum > 0 || pending_send != 0)
5308 {
5309 /* About to sleep, _must_ send now. */
5310 pending_send = do_send(selfptr, TRUE);
5311 send_sum = 0;
5312 }
5313
5314 /**
5315 * No more incoming signals to process yet, and we have
5316 * either completed all pending sends, or had no progress
5317 * due to full transporters in last do_send(). Wait for
5318 * more signals, use a shorter timeout if pending_send.
5319 */
5320 if (pending_send <= 0) /* Nothing pending, or no progress made */
5321 {
5322 if (min_spin_timer == 0 ||
5323 check_yield(now,
5324 &start_spin_ticks,
5325 min_spin_timer))
5326 {
5327 /**
5328 * Sleep, either a short nap if send failed due to send overload,
5329            * or a longer sleep if there is no more work waiting.
5330 */
5331 const Uint32 maxwait = (pending_send)
5332 ? 1 * 1000000 // Retry busy send after 1ms
5333 : 10 * 1000000; // No more work -> 10ms
5334
5335 bool waited = yield(&selfptr->m_waiter,
5336 maxwait,
5337 check_queues_empty,
5338 selfptr);
5339 if (waited)
5340 {
5341 waits++;
5342 /* Update current time after sleeping */
5343 now = NdbTick_getCurrentTicks();
5344 yield_ticks = now;
5345 NdbTick_Invalidate(&start_spin_ticks);
5346 selfptr->m_stat.m_wait_cnt += waits;
5347 selfptr->m_stat.m_loop_cnt += loops;
5348 waits = loops = 0;
5349 }
5350 }
5351 }
5352 }
5353
5354 /**
5355 * Check if we executed enough signals,
5356 * and if so recompute how many signals to execute
5357 */
5358 if (sum >= selfptr->m_max_exec_signals)
5359 {
5360 if (update_sched_config(selfptr, send_sum + abs(pending_send)))
5361 {
5362 /* Update current time after sleeping */
5363 now = NdbTick_getCurrentTicks();
5364 selfptr->m_stat.m_wait_cnt += waits;
5365 selfptr->m_stat.m_loop_cnt += loops;
5366 waits = loops = 0;
5367 NdbTick_Invalidate(&start_spin_ticks);
5368 update_rt_config(selfptr, real_time, BlockThread);
5369 update_spin_config(selfptr, min_spin_timer);
5370 }
5371 }
5372 else
5373 {
5374 selfptr->m_max_exec_signals -= sum;
5375 }
5376
5377 /**
5378      * Adaptively read the system time each time 1ms
5379      * is likely to have passed
5380 */
5381 if (loops > maxloops)
5382 {
5383 now = NdbTick_getCurrentTicks();
5384 if (real_time)
5385 {
5386 check_real_time_break(now,
5387 &yield_ticks,
5388 selfptr->m_thread,
5389 BlockThread);
5390 }
5391 const Uint64 diff = NdbTick_Elapsed(selfptr->m_ticks, now).milliSec();
5392
5393       /* Adjust 'maxloops' to achieve a clock reading frequency of 1ms */
5394 if (diff < 1)
5395 maxloops += ((maxloops/10) + 1); /* No change: less frequent reading */
5396 else if (diff > 1 && maxloops > 1)
5397 maxloops -= ((maxloops/10) + 1); /* Overslept: Need more frequent read*/
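      /*
       * Illustration: with maxloops == 10 the adjustment step is
       * maxloops/10 + 1 == 2, so maxloops becomes 12 when less than 1ms
       * passed and 8 when we overslept. Larger values thus adapt in
       * roughly 10% steps in either direction.
       */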
5398
5399 selfptr->m_stat.m_wait_cnt += waits;
5400 selfptr->m_stat.m_loop_cnt += loops;
5401 waits = loops = 0;
5402 }
5403 selfptr->m_stat.m_exec_cnt += sum;
5404 }
5405
5406 globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
5407 return NULL; // Return value not currently used
5408 }
5409
5410 /**
5411 * Get number of pending signals at B-level in our own thread. Used
5412 * to make some decisions in rate-critical parts of the data node.
5413 */
5414 Uint32
5415 mt_getSignalsInJBB(Uint32 self)
5416 {
5417 Uint32 pending_signals = 0;
5418 struct thr_repository* rep = g_thr_repository;
5419 struct thr_data *selfptr = &rep->m_thread[self];
5420 for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
5421 {
5422 thr_jb_write_state *w = selfptr->m_write_states + thr_no;
5423 pending_signals += w->get_pending_signals();
5424 }
5425 return pending_signals;
5426 }
5427
5428 void
5429 sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
5430 const Uint32 secPtr[3])
5431 {
5432 Uint32 block = blockToMain(s->theReceiversBlockNumber);
5433 Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
5434
5435 /*
5436 * Max number of signals to put into job buffer before flushing the buffer
5437 * to the other thread.
5438    * This parameter was found to be reasonable by benchmarking.
5439 */
5440 Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self >= first_receiver_thread_no) ?
5441 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER :
5442 MAX_SIGNALS_BEFORE_FLUSH_OTHER;
5443
5444 Uint32 dst = block2ThreadId(block, instance);
5445 struct thr_repository* rep = g_thr_repository;
5446 struct thr_data *selfptr = &rep->m_thread[self];
5447 assert(my_thread_equal(selfptr->m_thr_id, my_thread_self()));
5448 struct thr_data *dstptr = &rep->m_thread[dst];
5449
5450 selfptr->m_stat.m_priob_count++;
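  /* Signal length in 32-bit words: header size + payload length + one word
   * per attached section pointer. */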
5451 Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
5452 selfptr->m_stat.m_priob_size += siglen;
5453
5454 thr_job_queue *q = dstptr->m_in_queue + self;
5455 thr_job_queue_head *h = dstptr->m_in_queue_head + self;
5456 thr_jb_write_state *w = selfptr->m_write_states + dst;
5457 if (insert_signal(q, h, w, false, s, data, secPtr, selfptr->m_next_buffer))
5458 {
5459 selfptr->m_next_buffer = seize_buffer(rep, self, false);
5460 }
5461 if (w->get_pending_signals() >= MAX_SIGNALS_BEFORE_FLUSH)
5462 {
5463 flush_write_state(selfptr, dstptr, h, w);
5464 }
5465 }
5466
5467 void
5468 sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data,
5469 const Uint32 secPtr[3])
5470 {
5471 Uint32 block = blockToMain(s->theReceiversBlockNumber);
5472 Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
5473
5474 Uint32 dst = block2ThreadId(block, instance);
5475 struct thr_repository* rep = g_thr_repository;
5476 struct thr_data *selfptr = &rep->m_thread[self];
5477 assert(s->theVerId_signalNumber == GSN_START_ORD ||
5478 my_thread_equal(selfptr->m_thr_id, my_thread_self()));
5479 struct thr_data *dstptr = &rep->m_thread[dst];
5480
5481 selfptr->m_stat.m_prioa_count++;
5482 Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
5483 selfptr->m_stat.m_prioa_size += siglen;
5484
5485 thr_job_queue *q = &(dstptr->m_jba);
5486 thr_job_queue_head *h = &(dstptr->m_jba_head);
5487 thr_jb_write_state w;
5488
5489 if (selfptr == dstptr)
5490 {
5491 /**
5492 * Indicate that we sent Prio A signal to ourself.
5493 */
5494 selfptr->m_sent_local_prioa_signal = true;
5495 }
5496
5497 lock(&dstptr->m_jba_write_lock);
5498
5499 Uint32 index = h->m_write_index;
5500 w.m_write_index = index;
5501 thr_job_buffer *buffer = q->m_buffers[index];
5502 w.m_write_buffer = buffer;
5503 w.m_write_pos = buffer->m_len;
5504 bool buf_used = insert_signal(q, h, &w, true, s, data, secPtr,
5505 selfptr->m_next_buffer);
5506 flush_write_state(selfptr, dstptr, h, &w);
5507
5508 unlock(&dstptr->m_jba_write_lock);
5509 if (w.has_any_pending_signals())
5510 {
5511 wakeup(&(dstptr->m_waiter));
5512 }
5513 if (buf_used)
5514 selfptr->m_next_buffer = seize_buffer(rep, self, true);
5515 }
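
/**
 * Note on the two local send paths above: sendlocal() appends to the
 * per-source-thread JBB in-queue of the destination and therefore needs
 * no lock, while sendprioa() appends to the single JBA queue of the
 * destination, which is shared by all senders and hence protected by
 * m_jba_write_lock.
 */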
5516
5517 /**
5518 * Send a signal to a remote node.
5519 *
5520 * (The signal is only queued here, and actually sent later in do_send()).
5521 */
5522 SendStatus
5523 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
5524 const Uint32 * data, NodeId nodeId,
5525 const LinearSectionPtr ptr[3])
5526 {
5527 thr_repository *rep = g_thr_repository;
5528 struct thr_data *selfptr = &rep->m_thread[self];
5529 SendStatus ss;
5530
5531 mt_send_handle handle(selfptr);
5532 register_pending_send(selfptr, nodeId);
5533 /* prepareSend() is lock-free, as we have per-thread send buffers. */
5534 ss = globalTransporterRegistry.prepareSend(&handle,
5535 sh, prio, data, nodeId, ptr);
5536 return ss;
5537 }
5538
5539 SendStatus
5540 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
5541 const Uint32 *data, NodeId nodeId,
5542 class SectionSegmentPool *thePool,
5543 const SegmentedSectionPtr ptr[3])
5544 {
5545 thr_repository *rep = g_thr_repository;
5546 struct thr_data *selfptr = &rep->m_thread[self];
5547 SendStatus ss;
5548
5549 mt_send_handle handle(selfptr);
5550 register_pending_send(selfptr, nodeId);
5551 ss = globalTransporterRegistry.prepareSend(&handle,
5552 sh, prio, data, nodeId,
5553 *thePool, ptr);
5554 return ss;
5555 }
5556
5557 /*
5558  * This function sends a prio A STOP_FOR_CRASH signal to a thread.
5559 *
5560 * It works when called from any other thread, not just from job processing
5561 * threads. But note that this signal will be the last signal to be executed by
5562 * the other thread, as it will exit immediately.
5563 */
5564 static
5565 void
5566 sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
5567 {
5568 SignalT<StopForCrash::SignalLength> signalT;
5569 struct thr_repository* rep = g_thr_repository;
5570 /* As this signal will be the last one executed by the other thread, it does
5571 not matter which buffer we use in case the current buffer is filled up by
5572 the STOP_FOR_CRASH signal; the data in it will never be read.
5573 */
5574 static thr_job_buffer dummy_buffer;
5575
5576 /**
5577 * Pick any instance running in this thread
5578 */
5579 struct thr_data *dstptr = &rep->m_thread[dst];
5580 Uint32 bno = dstptr->m_instance_list[0];
5581
5582 memset(&signalT.header, 0, sizeof(SignalHeader));
5583 signalT.header.theVerId_signalNumber = GSN_STOP_FOR_CRASH;
5584 signalT.header.theReceiversBlockNumber = bno;
5585 signalT.header.theSendersBlockRef = 0;
5586 signalT.header.theTrace = 0;
5587 signalT.header.theSendersSignalId = 0;
5588 signalT.header.theSignalId = 0;
5589 signalT.header.theLength = StopForCrash::SignalLength;
5590 StopForCrash * stopForCrash = CAST_PTR(StopForCrash, &signalT.theData[0]);
5591 stopForCrash->flags = 0;
5592
5593 thr_job_queue *q = &(dstptr->m_jba);
5594 thr_job_queue_head *h = &(dstptr->m_jba_head);
5595 thr_jb_write_state w;
5596
5597 lock(&dstptr->m_jba_write_lock);
5598
5599 Uint32 index = h->m_write_index;
5600 w.m_write_index = index;
5601 thr_job_buffer *buffer = q->m_buffers[index];
5602 w.m_write_buffer = buffer;
5603 w.m_write_pos = buffer->m_len;
5604 insert_signal(q, h, &w, true, &signalT.header, signalT.theData, NULL,
5605 &dummy_buffer);
5606 flush_write_state(selfptr, dstptr, h, &w);
5607
5608 unlock(&dstptr->m_jba_write_lock);
5609 if (w.has_any_pending_signals())
5610 {
5611 wakeup(&(dstptr->m_waiter));
5612 }
5613 }
5614
5615 /**
5616 * Identify type of thread.
5617  * Based on the assumption that threads are allocated in the order:
5618 * main, ldm, tc, recv, send
5619 */
5620 static bool
5621 is_main_thread(unsigned thr_no)
5622 {
5623 return thr_no < NUM_MAIN_THREADS;
5624 }
5625
5626 static bool
5627 is_ldm_thread(unsigned thr_no)
5628 {
5629 return thr_no >= NUM_MAIN_THREADS &&
5630 thr_no < NUM_MAIN_THREADS+globalData.ndbMtLqhThreads;
5631 }
5632
5633 /**
5634  * Not all LDM threads are created equal:
5635  * the first LDM's BACKUP thread acts as a client during BACKUP
5636  * (see usage of Backup::UserBackupInstanceKey)
5637 */
5638 static bool
5639 is_first_ldm_thread(unsigned thr_no)
5640 {
5641 return thr_no == NUM_MAIN_THREADS;
5642 }
5643
5644 static bool
5645 is_tc_thread(unsigned thr_no)
5646 {
5647 unsigned tc_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads;
5648 return thr_no >= tc_base &&
5649 thr_no < tc_base+globalData.ndbMtTcThreads;
5650 }
5651
5652 static bool
5653 is_recv_thread(unsigned thr_no)
5654 {
5655 unsigned recv_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads+globalData.ndbMtTcThreads;
5656 return thr_no >= recv_base &&
5657 thr_no < recv_base+globalData.ndbMtReceiveThreads;
5658 }
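
/**
 * Example (values are illustrative only): with NUM_MAIN_THREADS == 2,
 * ndbMtLqhThreads == 4, ndbMtTcThreads == 2 and ndbMtReceiveThreads == 1,
 * the thr_no ranges above become: 0-1 main, 2-5 ldm (2 being the first
 * ldm thread), 6-7 tc and 8 recv.
 */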
5659
5660 /**
5661 * Implements the rules for which threads are allowed to have
5662 * communication with each other.
5663 * Also see compute_jb_pages() which has similar logic.
5664 */
5665 static bool
5666 may_communicate(unsigned from, unsigned to)
5667 {
5668 if (is_main_thread(from))
5669 {
5670     // Main threads communicate with all other threads
5671 return true;
5672 }
5673 else if (is_ldm_thread(from))
5674 {
5675 // First LDM is special as it may act as internal client
5676 // during backup, and thus communicate with other LDMs:
5677 if (is_first_ldm_thread(from) && is_ldm_thread(to))
5678 return true;
5679
5680     // All LDM threads can communicate with TC threads, main threads,
5681     // themselves, and the BACKUP client (above)
5682 return is_main_thread(to) ||
5683 is_tc_thread(to) ||
5684 is_first_ldm_thread(to) ||
5685 (to == from);
5686 }
5687 else if (is_tc_thread(from))
5688 {
5689     // TC threads can communicate with SPJ, LQH and main threads, and with themselves
5690 return is_main_thread(to) ||
5691 is_ldm_thread(to) ||
5692 is_tc_thread(to); // Cover both SPJs and itself
5693 }
5694 else
5695 {
5696 assert(is_recv_thread(from));
5697     // Receive threads communicate with all threads, except other receive threads
5698 return !is_recv_thread(to);
5699 }
5700 }
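
/**
 * With the example layout above this means, for instance, that a receive
 * thread may send to any ldm, tc or main thread but not to another receive
 * thread, and that an ldm thread other than the first one may not send
 * directly to other ldm threads, except to the first (BACKUP client) ldm
 * and to itself.
 */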
5701
5702 /**
5703 * init functions
5704 */
5705 static
5706 void
5707 queue_init(struct thr_tq* tq)
5708 {
5709 tq->m_next_timer = 0;
5710 tq->m_current_time = 0;
5711 tq->m_next_free = RNIL;
5712 tq->m_cnt[0] = tq->m_cnt[1] = tq->m_cnt[2] = 0;
5713 bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
5714 }
5715
5716 static
5717 void
5718 thr_init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
5719 unsigned thr_no)
5720 {
5721 Uint32 i;
5722
5723 selfptr->m_thr_no = thr_no;
5724 selfptr->m_max_signals_per_jb = MAX_SIGNALS_PER_JB;
5725 selfptr->m_max_exec_signals = 0;
5726 selfptr->m_max_extra_signals = 0;
5727 selfptr->m_first_free = 0;
5728 selfptr->m_first_unused = 0;
5729
5730 {
5731 char buf[100];
5732 BaseString::snprintf(buf, sizeof(buf), "jbalock thr: %u", thr_no);
5733 register_lock(&selfptr->m_jba_write_lock, buf);
5734 }
5735 selfptr->m_jba_head.m_read_index = 0;
5736 selfptr->m_jba_head.m_write_index = 0;
5737 thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
5738 selfptr->m_jba.m_buffers[0] = buffer;
5739 selfptr->m_jba_read_state.m_read_index = 0;
5740 selfptr->m_jba_read_state.m_read_buffer = buffer;
5741 selfptr->m_jba_read_state.m_read_pos = 0;
5742 selfptr->m_jba_read_state.m_read_end = 0;
5743 selfptr->m_jba_read_state.m_write_index = 0;
5744 selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
5745 selfptr->m_send_buffer_pool.set_pool(&rep->m_sb_pool);
5746
5747 for (i = 0; i<cnt; i++)
5748 {
5749 selfptr->m_in_queue_head[i].m_waiter.init();
5750 selfptr->m_in_queue_head[i].m_read_index = 0;
5751 selfptr->m_in_queue_head[i].m_write_index = 0;
5752 buffer = may_communicate(i,thr_no)
5753 ? seize_buffer(rep, thr_no, false) : NULL;
5754 selfptr->m_in_queue[i].m_buffers[0] = buffer;
5755 selfptr->m_read_states[i].m_read_index = 0;
5756 selfptr->m_read_states[i].m_read_buffer = buffer;
5757 selfptr->m_read_states[i].m_read_pos = 0;
5758 selfptr->m_read_states[i].m_read_end = 0;
5759 selfptr->m_read_states[i].m_write_index = 0;
5760 }
5761 queue_init(&selfptr->m_tq);
5762
5763 bzero(&selfptr->m_stat, sizeof(selfptr->m_stat));
5764
5765 selfptr->m_pending_send_count = 0;
5766 selfptr->m_pending_send_mask.clear();
5767
5768 selfptr->m_instance_count = 0;
5769 for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
5770 selfptr->m_instance_list[i] = 0;
5771
5772 bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
5773
5774 selfptr->m_thread = 0;
5775 selfptr->m_cpu = NO_LOCK_CPU;
5776 }
5777
5778 /* Have to do this after init of all m_in_queues is done. */
5779 static
5780 void
5781 thr_init2(struct thr_repository* rep, struct thr_data *selfptr,
5782 unsigned int cnt, unsigned thr_no)
5783 {
5784 for (Uint32 i = 0; i<cnt; i++)
5785 {
5786 selfptr->m_write_states[i].m_write_index = 0;
5787 selfptr->m_write_states[i].m_write_pos = 0;
5788 selfptr->m_write_states[i].m_write_buffer =
5789 rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
5790 selfptr->m_write_states[i].init_pending_signals();
5791 }
5792 }
5793
5794 static
5795 void
5796 receive_lock_init(Uint32 recv_thread_id, thr_repository *rep)
5797 {
5798 char buf[100];
5799 BaseString::snprintf(buf, sizeof(buf), "receive lock thread id %d",
5800 recv_thread_id);
5801 register_lock(&rep->m_receive_lock[recv_thread_id], buf);
5802 }
5803
5804 static
5805 void
5806 send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
5807 {
5808 char buf[100];
5809 BaseString::snprintf(buf, sizeof(buf), "send lock node %d", node);
5810 register_lock(&sb->m_send_lock, buf);
5811 BaseString::snprintf(buf, sizeof(buf), "send_buffer lock node %d", node);
5812 register_lock(&sb->m_buffer_lock, buf);
5813 sb->m_force_send = 0;
5814 sb->m_bytes_sent = 0;
5815 sb->m_send_thread = NO_SEND_THREAD;
5816 bzero(&sb->m_buffer, sizeof(sb->m_buffer));
5817 bzero(&sb->m_sending, sizeof(sb->m_sending));
5818 bzero(sb->m_read_index, sizeof(sb->m_read_index));
5819 }
5820
5821 static
5822 void
5823 rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
5824 {
5825 rep->m_mm = mm;
5826
5827 rep->m_thread_count = cnt;
5828 for (unsigned int i = 0; i<cnt; i++)
5829 {
5830 thr_init(rep, &rep->m_thread[i], cnt, i);
5831 }
5832 for (unsigned int i = 0; i<cnt; i++)
5833 {
5834 thr_init2(rep, &rep->m_thread[i], cnt, i);
5835 }
5836
5837 rep->stopped_threads = 0;
5838 NdbMutex_Init(&rep->stop_for_crash_mutex);
5839 NdbCondition_Init(&rep->stop_for_crash_cond);
5840
5841 for (Uint32 i = 0; i < NDB_ARRAY_SIZE(rep->m_receive_lock); i++)
5842 {
5843 receive_lock_init(i, rep);
5844 }
5845 for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
5846 {
5847 send_buffer_init(i, rep->m_send_buffers+i);
5848 }
5849
5850 bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
5851 }
5852
5853
5854 /**
5855 * Thread Config
5856 */
5857
5858 static Uint32
5859 get_total_number_of_block_threads(void)
5860 {
5861 return (NUM_MAIN_THREADS +
5862 globalData.ndbMtLqhThreads +
5863 globalData.ndbMtTcThreads +
5864 globalData.ndbMtReceiveThreads);
5865 }
5866
5867 static Uint32
5868 get_num_nodes()
5869 {
5870 Uint32 count = 0;
5871 for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
5872 {
5873 if (globalTransporterRegistry.get_transporter(nodeId))
5874 {
5875 count++;
5876 }
5877 }
5878 return count;
5879 }
5880
5881 /**
5882  * This function returns the number of extra send buffer pages
5883  * that we should allocate in addition to the number allocated
5884 * for each node send buffer.
5885 */
5886 #define MIN_SEND_BUFFER_GENERAL (512) //16M
5887 #define MIN_SEND_BUFFER_PER_NODE (8) //256k
5888 #define MIN_SEND_BUFFER_PER_THREAD (64) //2M
5889
5890 Uint32
5891 mt_get_extra_send_buffer_pages(Uint32 curr_num_pages,
5892 Uint32 extra_mem_pages)
5893 {
5894 Uint32 loc_num_threads = get_total_number_of_block_threads();
5895 Uint32 num_nodes = get_num_nodes();
5896
5897 Uint32 extra_pages = extra_mem_pages;
5898
5899 /**
5900    * Add 2M for each thread since we allocate 1M at a time, and
5901    * we also ensure that each thread keeps a minimum of 1M of
5902    * send buffer. Thus each thread can easily hold 2M of send
5903    * buffer just to keep contention on the send buffer page
5904    * spinlock low. This memory is added independently of the
5905    * configuration settings, since the user cannot be expected
5906    * to account for it and also since we could change this
5907    * behaviour at any time.
5908 */
5909 extra_pages += loc_num_threads * THR_SEND_BUFFER_MAX_FREE;
5910
5911 if (extra_mem_pages == 0)
5912 {
5913 /**
5914      * The user has set extra send buffer memory to 0, leaving it to us
5915      * to decide on our own how much extra memory is needed.
5916      *
5917      * We'll make sure that we have at least a minimum of 16M +
5918      * 2M per thread + 256k per node. If curr_num_pages plus our
5919      * local additions already reach this level we don't add
5920      * anything more; if we don't come up to this level we add
5921      * enough to reach this minimum level.
5922 */
5923 Uint32 min_pages = MIN_SEND_BUFFER_GENERAL +
5924 (MIN_SEND_BUFFER_PER_NODE * num_nodes) +
5925 (MIN_SEND_BUFFER_PER_THREAD * loc_num_threads);
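    /*
     * Worked example (illustrative numbers only): with 9 block threads and
     * 2 nodes connected, min_pages = 512 + (8 * 2) + (64 * 9) = 1104 pages.
     * With the 32K page size implied by the comments on the defines above
     * (512 pages == 16M), that is roughly 34.5M of send buffer.
     */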
5926
5927 if ((curr_num_pages + extra_pages) < min_pages)
5928 {
5929 extra_pages = min_pages - curr_num_pages;
5930 }
5931 }
5932 return extra_pages;
5933 }
5934
5935 Uint32
5936 compute_jb_pages(struct EmulatorData * ed)
5937 {
5938 Uint32 cnt = get_total_number_of_block_threads();
5939 Uint32 num_receive_threads = globalData.ndbMtReceiveThreads;
5940 Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
5941 Uint32 num_tc_threads = globalData.ndbMtTcThreads;
5942 Uint32 num_main_threads = NUM_MAIN_THREADS;
5943
5944 /**
5945 * Number of pages each thread needs to communicate with another
5946 * thread.
5947 */
5948 Uint32 job_queue_pages_per_thread = thr_job_queue::SIZE;
5949
5950 /**
5951    * In 'perthread' we calculate the number of pages required by
5952    * each 'block thread' (excluding 'send-threads'). 'perthread'
5953 * usage is independent of whether this thread will communicate
5954 * with other 'block threads' or not.
5955 */
5956 Uint32 perthread = 0;
5957
5958 /**
5959    * Each thread has its own job_queue for 'prio A' signals
5960 */
5961 perthread += job_queue_pages_per_thread;
5962
5963 /**
5964    * Each thread keeps an available free page in 'm_next_buffer'
5965 * in case it is required by insert_signal() into JBA or JBB.
5966 */
5967 perthread += 1;
5968
5969 /**
5970    * Each thread keeps time-queued signals in 'struct thr_tq';
5971    * thr_tq::PAGES pages are used to store these.
5972 */
5973 perthread += thr_tq::PAGES;
5974
5975 /**
5976 * Each thread has its own 'm_free_fifo[THR_FREE_BUF_MAX]' cache.
5977 * As it is filled to MAX *before* a page is allocated, which consumes a page,
5978    * it will never cache more than MAX-1 pages. Pages are also returned to the global
5979 * allocator as soon as MAX is reached.
5980 */
5981 perthread += THR_FREE_BUF_MAX-1;
5982
5983 /**
5984 * Start by calculating the basic number of pages required for
5985 * our 'cnt' block threads.
5986 * (no inter-thread communication assumed so far)
5987 */
5988 Uint32 tot = cnt * perthread;
5989
5990 /**
5991 * We then start adding pages required for inter-thread communications:
5992 *
5993 * Receiver threads will be able to communicate with all other
5994 * threads except other receive threads.
5995 */
5996 tot += num_receive_threads *
5997 (cnt - num_receive_threads) *
5998 job_queue_pages_per_thread;
5999
6000 /**
6001 * LQH threads can communicate with TC threads and main threads.
6002    * They cannot communicate with receive threads or other LQH threads,
6003    * but each can communicate with itself.
6004 */
6005 tot += num_lqh_threads *
6006 (num_tc_threads + num_main_threads + 1) *
6007 job_queue_pages_per_thread;
6008
6009 /**
6010    * The first LDM thread is special as it will act as a client
6011 * during backup. It will send to, and receive from (2x)
6012 * the 'num_lqh_threads - 1' other LQH threads.
6013 */
6014 tot += 2 * (num_lqh_threads-1) *
6015 job_queue_pages_per_thread;
6016
6017 /**
6018 * TC threads can communicate with SPJ-, LQH- and main threads.
6019    * They cannot communicate with receive threads or other TC threads,
6020    * but as SPJ is located together with TC, it is counted as if they
6021    * communicate with all TC threads.
6022 */
6023 tot += num_tc_threads *
6024 (num_lqh_threads + num_main_threads + num_tc_threads) *
6025 job_queue_pages_per_thread;
6026
6027 /**
6028 * Main threads can communicate with all other threads
6029 */
6030 tot += num_main_threads *
6031 cnt *
6032 job_queue_pages_per_thread;
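  /*
   * Summing up, the total computed above is:
   *   tot = cnt * perthread
   *       + num_receive_threads * (cnt - num_receive_threads) * Q
   *       + num_lqh_threads * (num_tc_threads + num_main_threads + 1) * Q
   *       + 2 * (num_lqh_threads - 1) * Q
   *       + num_tc_threads * (num_lqh_threads + num_main_threads + num_tc_threads) * Q
   *       + num_main_threads * cnt * Q
   * where Q = job_queue_pages_per_thread.
   */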
6033
6034 return tot;
6035 }
6036
6037 ThreadConfig::ThreadConfig()
6038 {
6039 /**
6040 * We take great care within struct thr_repository to optimize
6041 * cache line placement of the different members. This all
6042    * depends on the base address of thr_repository itself
6043    * being cache line aligned.
6044    *
6045    * So we allocate a char[] sufficiently large to hold the
6046    * thr_repository object, with added bytes for placing
6047    * g_thr_repository at a cache-line-aligned offset within it.
6048 */
6049 g_thr_repository_mem = new char[sizeof(thr_repository)+NDB_CL];
6050 const int alligned_offs = NDB_CL_PADSZ((UintPtr)g_thr_repository_mem);
6051 char* cache_alligned_mem = &g_thr_repository_mem[alligned_offs];
6052 require((((UintPtr)cache_alligned_mem) % NDB_CL) == 0);
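  /*
   * For illustration (assuming NDB_CL is 64 bytes): if new[] returned an
   * address ending in ...0x10, NDB_CL_PADSZ() yields 0x30, so
   * cache_alligned_mem starts at the next 64-byte boundary; the require()
   * above asserts exactly this property.
   */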
6053 g_thr_repository = new(cache_alligned_mem) thr_repository();
6054 }
6055
6056 ThreadConfig::~ThreadConfig()
6057 {
6058 g_thr_repository->~thr_repository();
6059 g_thr_repository = NULL;
6060 delete[] g_thr_repository_mem;
6061 g_thr_repository_mem = NULL;
6062 }
6063
6064 /*
6065 * We must do the init here rather than in the constructor, since at
6066 * constructor time the global memory manager is not available.
6067 */
6068 void
6069 ThreadConfig::init()
6070 {
6071 Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
6072 Uint32 num_tc_threads = globalData.ndbMtTcThreads;
6073 Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
6074 first_receiver_thread_no =
6075 NUM_MAIN_THREADS + num_tc_threads + num_lqh_threads;
6076 num_threads = first_receiver_thread_no + num_recv_threads;
6077 require(num_threads <= MAX_BLOCK_THREADS);
6078
6079 ndbout << "NDBMT: number of block threads=" << num_threads << endl;
6080
6081 ::rep_init(g_thr_repository, num_threads,
6082 globalEmulatorData.m_mem_manager);
6083 }
6084
6085 /**
6086  * Return the receiver thread handling a particular node.
6087  * The returned number is indexed from 0 and upwards to #receiver threads
6088  * (or MAX_NODES if none).
6089 */
6090 Uint32
6091 mt_get_recv_thread_idx(NodeId nodeId)
6092 {
6093 assert(nodeId < NDB_ARRAY_SIZE(g_node_to_recv_thr_map));
6094 return g_node_to_recv_thr_map[nodeId];
6095 }
6096
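
/**
 * Build g_node_to_recv_thr_map by assigning nodes with a transporter to
 * receive threads in round-robin order. For example (illustrative only),
 * with 2 receive threads and transporters to nodes 1, 2, 3 and 5, the
 * nodes map to receive threads 0, 1, 0 and 1 respectively; nodes without
 * a transporter are flagged with MAX_NODES.
 */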
6097 static
6098 void
6099 assign_receiver_threads(void)
6100 {
6101 Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
6102 Uint32 recv_thread_idx = 0;
6103 for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
6104 {
6105 Transporter *node_trp =
6106 globalTransporterRegistry.get_transporter(nodeId);
6107
6108 if (node_trp)
6109 {
6110 g_node_to_recv_thr_map[nodeId] = recv_thread_idx;
6111 recv_thread_idx++;
6112 if (recv_thread_idx == num_recv_threads)
6113 recv_thread_idx = 0;
6114 }
6115 else
6116 {
6117 /* Flag for no transporter */
6118 g_node_to_recv_thr_map[nodeId] = MAX_NODES;
6119 }
6120 }
6121 return;
6122 }
6123
6124 void
6125 ThreadConfig::ipControlLoop(NdbThread* pThis)
6126 {
6127 unsigned int thr_no;
6128 struct thr_repository* rep = g_thr_repository;
6129
6130 rep->m_thread[first_receiver_thread_no].m_thr_index =
6131 globalEmulatorData.theConfiguration->addThread(pThis, ReceiveThread);
6132
6133 max_send_delay = globalEmulatorData.theConfiguration->maxSendDelay();
6134
6135 if (globalData.ndbMtSendThreads)
6136 {
6137 g_send_threads = new thr_send_threads();
6138 }
6139
6140 /**
6141 * assign nodes to receiver threads
6142 */
6143 assign_receiver_threads();
6144
6145 /* Start the send thread(s) */
6146 if (g_send_threads)
6147 {
6148 g_send_threads->start_send_threads();
6149 }
6150
6151 /*
6152    * Start all execution threads, except for the first receiver
6153 * thread, which runs in the main thread.
6154 */
6155 for (thr_no = 0; thr_no < num_threads; thr_no++)
6156 {
6157 rep->m_thread[thr_no].m_ticks = NdbTick_getCurrentTicks();
6158
6159 if (thr_no == first_receiver_thread_no)
6160 continue; // Will run in the main thread.
6161
6162 /*
6163 * The NdbThread_Create() takes void **, but that is cast to void * when
6164 * passed to the thread function. Which is kind of strange ...
6165 */
6166 if (thr_no < first_receiver_thread_no)
6167 {
6168 /* Start block threads */
6169 struct NdbThread *thread_ptr =
6170 NdbThread_Create(mt_job_thread_main,
6171 (void **)(rep->m_thread + thr_no),
6172 1024*1024,
6173 "execute thread", //ToDo add number
6174 NDB_THREAD_PRIO_MEAN);
6175 require(thread_ptr != NULL);
6176 rep->m_thread[thr_no].m_thr_index =
6177 globalEmulatorData.theConfiguration->addThread(thread_ptr,
6178 BlockThread);
6179 rep->m_thread[thr_no].m_thread = thread_ptr;
6180 }
6181 else
6182 {
6183 /* Start a receiver thread, also block thread for TRPMAN */
6184 struct NdbThread *thread_ptr =
6185 NdbThread_Create(mt_receiver_thread_main,
6186 (void **)(&rep->m_thread[thr_no]),
6187 1024*1024,
6188 "receive thread", //ToDo add number
6189 NDB_THREAD_PRIO_MEAN);
6190 require(thread_ptr != NULL);
6191 globalEmulatorData.theConfiguration->addThread(thread_ptr,
6192 ReceiveThread);
6193 rep->m_thread[thr_no].m_thread = thread_ptr;
6194 }
6195 }
6196
6197 /* Now run the main loop for first receiver thread directly. */
6198 rep->m_thread[first_receiver_thread_no].m_thread = pThis;
6199 mt_receiver_thread_main(&(rep->m_thread[first_receiver_thread_no]));
6200
6201 /* Wait for all threads to shutdown. */
6202 for (thr_no = 0; thr_no < num_threads; thr_no++)
6203 {
6204 if (thr_no == first_receiver_thread_no)
6205 continue;
6206 void *dummy_return_status;
6207 NdbThread_WaitFor(rep->m_thread[thr_no].m_thread,
6208 &dummy_return_status);
6209 globalEmulatorData.theConfiguration->removeThread(
6210 rep->m_thread[thr_no].m_thread);
6211 NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
6212 }
6213
6214 /* Delete send threads, includes waiting for threads to shutdown */
6215 if (g_send_threads)
6216 {
6217 delete g_send_threads;
6218 g_send_threads = NULL;
6219 }
6220 globalEmulatorData.theConfiguration->removeThread(pThis);
6221 }
6222
6223 int
6224 ThreadConfig::doStart(NodeState::StartLevel startLevel)
6225 {
6226 SignalT<3> signalT;
6227 memset(&signalT.header, 0, sizeof(SignalHeader));
6228
6229 signalT.header.theVerId_signalNumber = GSN_START_ORD;
6230 signalT.header.theReceiversBlockNumber = CMVMI;
6231 signalT.header.theSendersBlockRef = 0;
6232 signalT.header.theTrace = 0;
6233 signalT.header.theSignalId = 0;
6234 signalT.header.theLength = StartOrd::SignalLength;
6235
6236 StartOrd * startOrd = CAST_PTR(StartOrd, &signalT.theData[0]);
6237 startOrd->restartInfo = 0;
6238
6239 sendprioa(block2ThreadId(CMVMI, 0), &signalT.header, signalT.theData, 0);
6240 return 0;
6241 }
6242
6243 Uint32
6244 FastScheduler::traceDumpGetNumThreads()
6245 {
6246 /* The last thread is only for receiver -> no trace file. */
6247 return num_threads;
6248 }
6249
6250 bool
6251 FastScheduler::traceDumpGetJam(Uint32 thr_no,
6252 const JamEvent * & thrdTheEmulatedJam,
6253 Uint32 & thrdTheEmulatedJamIndex)
6254 {
6255 if (thr_no >= num_threads)
6256 return false;
6257
6258 #ifdef NO_EMULATED_JAM
6259 thrdTheEmulatedJam = NULL;
6260 thrdTheEmulatedJamIndex = 0;
6261 #else
6262 const EmulatedJamBuffer *jamBuffer =
6263 &g_thr_repository->m_thread[thr_no].m_jam;
6264 thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
6265 thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
6266 #endif
6267 return true;
6268 }
6269
6270 void
6271 FastScheduler::traceDumpPrepare(NdbShutdownType& nst)
6272 {
6273 /*
6274 * We are about to generate trace files for all threads.
6275 *
6276 * We want to stop all threads processing before we dump, as otherwise the
6277 * signal buffers could change while dumping, leading to inconsistent
6278 * results.
6279 *
6280 * To stop threads, we send the GSN_STOP_FOR_CRASH signal as prio A to each
6281 * thread. We then wait for threads to signal they are done (but not forever,
6282 * so as to not have one hanging thread prevent the generation of trace
6283 * dumps). We also must be careful not to send to ourself if the crash is
6284 * being processed by one of the threads processing signals.
6285 *
6286 * We do not stop the transporter thread, as it cannot receive signals (but
6287 * because it does not receive signals it does not really influence dumps in
6288 * any case).
6289 */
6290 void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
6291 const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
6292 /* The selfptr might be NULL, or pointer to thread that crashed. */
6293
6294 Uint32 waitFor_count = 0;
6295 NdbMutex_Lock(&g_thr_repository->stop_for_crash_mutex);
6296 g_thr_repository->stopped_threads = 0;
6297
6298 for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
6299 {
6300 if (selfptr != NULL && selfptr->m_thr_no == thr_no)
6301 {
6302 /* This is own thread; we have already stopped processing. */
6303 continue;
6304 }
6305
6306 sendprioa_STOP_FOR_CRASH(selfptr, thr_no);
6307
6308 waitFor_count++;
6309 }
6310
6311 static const Uint32 max_wait_seconds = 2;
6312 const NDB_TICKS start = NdbTick_getCurrentTicks();
6313 while (g_thr_repository->stopped_threads < waitFor_count)
6314 {
6315 NdbCondition_WaitTimeout(&g_thr_repository->stop_for_crash_cond,
6316 &g_thr_repository->stop_for_crash_mutex,
6317 10);
6318 const NDB_TICKS now = NdbTick_getCurrentTicks();
6319 if (NdbTick_Elapsed(start,now).seconds() > max_wait_seconds)
6320 break; // Give up
6321 }
6322 if (g_thr_repository->stopped_threads < waitFor_count)
6323 {
6324 if (nst != NST_ErrorInsert)
6325 {
6326 nst = NST_Watchdog; // Make this abort fast
6327 }
6328 ndbout_c("Warning: %d thread(s) did not stop before starting crash dump.",
6329 waitFor_count - g_thr_repository->stopped_threads);
6330 }
6331 NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6332
6333 /* Now we are ready (or as ready as can be) for doing crash dump. */
6334 }
6335
6336 /**
6337 * In ndbmtd we could have a case where we actually have multiple threads
6338 * crashing at the same time. This causes several threads to start processing
6339 * the crash handling in parallel and eventually lead to a deadlock since
6340 * the crash handling thread waits for other threads to stop before completing
6341 * the crash handling.
6342 *
6343 * To avoid this we use this function that only is useful in ndbmtd where
6344 * we check if the crash handling has already started. We protect this
6345 * check using the stop_for_crash-mutex. This function is called twice,
6346 * first to write an entry in the error log and second to specify that the
6347 * error log write is completed.
6348 *
6349 * We proceed only from the first call if the crash handling hasn't started
6350 * or if the crash is not caused by an error insert. If it is caused by an
6351 * error insert it is a normal situation with multiple crashes, so we won't
6352 * clutter the error log with multiple entries in this case. If it is a real
6353 * crash and we have more than one thread crashing, then this is vital
6354 * information to write in the error log, we do however not want more than
6355 * one set of trace files.
6356 *
6357 * To ensure that writes of the error log happens for one thread at a time we
6358 * protect it with the stop_for_crash-mutex. We hold this mutex between the
6359 * first and second call of this function from the error reporter thread.
6360 *
6361 * We proceed from the first call only if we are the first thread that
6362 * reported an error. To handle this properly we start by acquiring the
6363 * mutex, then we write the error log, when we come back we set the
6364 * crash_started flag and release the mutex to enable other threads to
6365 * write into the error log, but still stopping them from proceeding to
6366 * write another set of trace files.
6367 *
6368 * We will not come back from this function the second time unless we are
6369 * the first crashing thread.
6370 */
6371
6372 static bool crash_started = false;
6373
6374 void
6375 ErrorReporter::prepare_to_crash(bool first_phase, bool error_insert_crash)
6376 {
6377 if (first_phase)
6378 {
6379 NdbMutex_Lock(&g_thr_repository->stop_for_crash_mutex);
6380 if (crash_started && error_insert_crash)
6381 {
6382 /**
6383 * Some other thread has already started the crash handling.
6384 * We call the below method which we will never return from.
6385 * We need not write multiple entries in error log for
6386 * error insert crashes since it is a normal event.
6387 */
6388 NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6389 mt_execSTOP_FOR_CRASH();
6390 }
6391 /**
6392      * Proceed to write the error log before returning to this method
6393      * again with first_phase set to false.
6394 */
6395 }
6396 else if (crash_started)
6397 {
6398 (void)error_insert_crash;
6399 /**
6400 * No need to proceed since somebody already started handling the crash.
6401 * We proceed by calling mt_execSTOP_FOR_CRASH to stop this thread
6402 * in a manner that is similar to if we received the signal
6403 * STOP_FOR_CRASH.
6404 */
6405 NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6406 mt_execSTOP_FOR_CRASH();
6407 }
6408 else
6409 {
6410 /**
6411 * No crash had started previously, we will take care of it. Before
6412 * handling it we will mark the crash handling as started.
6413 */
6414 crash_started = true;
6415 NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6416 }
6417 }
6418
6419 void mt_execSTOP_FOR_CRASH()
6420 {
6421 void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
6422 const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
6423 require(selfptr != NULL);
6424
6425 NdbMutex_Lock(&g_thr_repository->stop_for_crash_mutex);
6426 g_thr_repository->stopped_threads++;
6427 NdbCondition_Signal(&g_thr_repository->stop_for_crash_cond);
6428 NdbMutex_Unlock(&g_thr_repository->stop_for_crash_mutex);
6429
6430 /* ToDo: is this correct? */
6431 globalEmulatorData.theWatchDog->unregisterWatchedThread(selfptr->m_thr_no);
6432
6433 my_thread_exit(NULL);
6434 }
6435
6436 void
6437 FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
6438 {
6439 void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
6440 thr_data *selfptr = reinterpret_cast<thr_data *>(value);
6441 const thr_repository *rep = g_thr_repository;
6442 /*
6443 * The selfptr might be NULL, or pointer to thread that is doing the crash
6444 * jump.
6445 * If non-null, we should update the watchdog counter while dumping.
6446 */
6447 Uint32 *watchDogCounter;
6448 if (selfptr)
6449 watchDogCounter = &selfptr->m_watchdog_counter;
6450 else
6451 watchDogCounter = NULL;
6452
6453 /*
6454 * We want to dump the signal buffers from last executed to first executed.
6455 * So we first need to find the correct sequence to output signals in, stored
6456    * in this array.
6457 *
6458 * We will check any buffers in the cyclic m_free_fifo. In addition,
6459 * we also need to scan the already executed part of the current
6460 * buffer in m_jba.
6461 *
6462 * Due to partial execution of prio A buffers, we will use signal ids to know
6463 * where to interleave prio A signals into the stream of prio B signals
6464 * read. So we will keep a pointer to a prio A buffer around; and while
6465 * scanning prio B buffers we will interleave prio A buffers from that buffer
6466 * when the signal id fits the sequence.
6467 *
6468 * This also means that we may have to discard the earliest part of available
6469 * prio A signal data due to too little prio B data present, or vice versa.
6470 */
6471 static const Uint32 MAX_SIGNALS_TO_DUMP = 4096;
6472 struct {
6473 const SignalHeader *ptr;
6474 bool prioa;
6475 } signalSequence[MAX_SIGNALS_TO_DUMP];
6476 Uint32 seq_start = 0;
6477 Uint32 seq_end = 0;
6478
6479 const struct thr_data *thr_ptr = &rep->m_thread[thr_no];
6480 if (watchDogCounter)
6481 *watchDogCounter = 4;
6482
6483 /*
6484 * ToDo: Might do some sanity check to avoid crashing on not yet initialised
6485 * thread.
6486 */
6487
6488 /* Scan all available buffers with already executed signals. */
6489
6490 /*
6491 * Keep track of all available buffers, so that we can pick out signals in
6492 * the same order they were executed (order obtained from signal id).
6493 *
6494 * We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
6495 * (and freed) buffers, plus MAX_BLOCK_THREADS buffers for currently active
6496 * prio B buffers, plus one active prio A buffer.
6497 */
6498 struct {
6499 const thr_job_buffer *m_jb;
6500 Uint32 m_pos;
6501 Uint32 m_max;
6502 } jbs[THR_FREE_BUF_MAX + MAX_BLOCK_THREADS + 1];
6503
6504 Uint32 num_jbs = 0;
6505
6506 /* Load released buffers. */
6507 Uint32 idx = thr_ptr->m_first_free;
6508 while (idx != thr_ptr->m_first_unused)
6509 {
6510 const thr_job_buffer *q = thr_ptr->m_free_fifo[idx];
6511 if (q->m_len > 0)
6512 {
6513 jbs[num_jbs].m_jb = q;
6514 jbs[num_jbs].m_pos = 0;
6515 jbs[num_jbs].m_max = q->m_len;
6516 num_jbs++;
6517 }
6518 idx = (idx + 1) % THR_FREE_BUF_MAX;
6519 }
6520 /* Load any active prio B buffers. */
6521 for (Uint32 thr_no = 0; thr_no < rep->m_thread_count; thr_no++)
6522 {
6523 const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
6524 const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
6525 Uint32 read_pos = r->m_read_pos;
6526 if (r->is_open() && read_pos > 0)
6527 {
6528 jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
6529 jbs[num_jbs].m_pos = 0;
6530 jbs[num_jbs].m_max = read_pos;
6531 num_jbs++;
6532 }
6533 }
6534 /* Load any active prio A buffer. */
6535 const thr_jb_read_state *r = &thr_ptr->m_jba_read_state;
6536 Uint32 read_pos = r->m_read_pos;
6537 if (read_pos > 0)
6538 {
6539 jbs[num_jbs].m_jb = thr_ptr->m_jba.m_buffers[r->m_read_index];
6540 jbs[num_jbs].m_pos = 0;
6541 jbs[num_jbs].m_max = read_pos;
6542 num_jbs++;
6543 }
6544
6545 /* Use the next signal id as the smallest (oldest).
6546 *
6547    * Subtracting the smallest signal id from a signal id makes
6548    * them comparable using standard Uint32 comparison,
6549    * where the biggest value is the newest.
6550 * For example,
6551 * (m_signal_id_counter - smallest_signal_id) == UINT32_MAX
6552 */
6553 const Uint32 smallest_signal_id = thr_ptr->m_signal_id_counter + 1;
6554
6555 /* Now pick out one signal at a time, in signal id order. */
6556 while (num_jbs > 0)
6557 {
6558 if (watchDogCounter)
6559 *watchDogCounter = 4;
6560
6561 /* Search out the smallest signal id remaining. */
6562 Uint32 idx_min = 0;
6563 const Uint32 *p = jbs[idx_min].m_jb->m_data + jbs[idx_min].m_pos;
6564 const SignalHeader *s_min = reinterpret_cast<const SignalHeader*>(p);
6565 Uint32 sid_min_adjusted = s_min->theSignalId - smallest_signal_id;
6566
6567 for (Uint32 i = 1; i < num_jbs; i++)
6568 {
6569 p = jbs[i].m_jb->m_data + jbs[i].m_pos;
6570 const SignalHeader *s = reinterpret_cast<const SignalHeader*>(p);
6571 const Uint32 sid_adjusted = s->theSignalId - smallest_signal_id;
6572 if (sid_adjusted < sid_min_adjusted)
6573 {
6574 idx_min = i;
6575 s_min = s;
6576 sid_min_adjusted = sid_adjusted;
6577 }
6578 }
6579
6580 /* We found the next signal, now put it in the ordered cyclic buffer. */
6581 signalSequence[seq_end].ptr = s_min;
6582 signalSequence[seq_end].prioa = jbs[idx_min].m_jb->m_prioa;
6583 Uint32 siglen =
6584 (sizeof(SignalHeader)>>2) + s_min->m_noOfSections + s_min->theLength;
6585 #if SIZEOF_CHARP == 8
6586 /* Align to 8-byte boundary, to ensure aligned copies. */
6587 siglen= (siglen+1) & ~((Uint32)1);
6588 #endif
6589 jbs[idx_min].m_pos += siglen;
6590 if (jbs[idx_min].m_pos >= jbs[idx_min].m_max)
6591 {
6592 /* We are done with this job buffer. */
6593 num_jbs--;
6594 jbs[idx_min] = jbs[num_jbs];
6595 }
6596 seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
6597 /* Drop old signals if too many available in history. */
6598 if (seq_end == seq_start)
6599 seq_start = (seq_start + 1) % MAX_SIGNALS_TO_DUMP;
6600 }
6601
6602   /* Now, having built the correct signal sequence, we can dump them all. */
6603 fprintf(out, "\n");
6604 bool first_one = true;
6605 bool out_of_signals = false;
6606 Uint32 lastSignalId = 0;
6607 while (seq_end != seq_start)
6608 {
6609 if (watchDogCounter)
6610 *watchDogCounter = 4;
6611
6612 if (seq_end == 0)
6613 seq_end = MAX_SIGNALS_TO_DUMP;
6614 seq_end--;
6615 SignalT<25> signal;
6616 const SignalHeader *s = signalSequence[seq_end].ptr;
6617 unsigned siglen = (sizeof(*s)>>2) + s->theLength;
6618 if (siglen > MAX_SIGNAL_SIZE)
6619 siglen = MAX_SIGNAL_SIZE; // Sanity check
6620 memcpy(&signal.header, s, 4*siglen);
6621 // instance number in trace file is confusing if not MT LQH
6622 if (globalData.ndbMtLqhWorkers == 0)
6623 signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
6624
6625 const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
6626 signal.m_sectionPtrI[0] = posptr[siglen + 0];
6627 signal.m_sectionPtrI[1] = posptr[siglen + 1];
6628 signal.m_sectionPtrI[2] = posptr[siglen + 2];
6629 bool prioa = signalSequence[seq_end].prioa;
6630
6631 /* Make sure to display clearly when there is a gap in the dump. */
6632 if (!first_one && !out_of_signals && (s->theSignalId + 1) != lastSignalId)
6633 {
6634 out_of_signals = true;
6635 fprintf(out, "\n\n\nNo more prio %s signals, rest of dump will be "
6636 "incomplete.\n\n\n\n", prioa ? "B" : "A");
6637 }
6638 first_one = false;
6639 lastSignalId = s->theSignalId;
6640
6641 fprintf(out, "--------------- Signal ----------------\n");
6642 Uint32 prio = (prioa ? JBA : JBB);
6643 SignalLoggerManager::printSignalHeader(out,
6644 signal.header,
6645 prio,
6646 globalData.ownId,
6647 true);
6648 SignalLoggerManager::printSignalData (out,
6649 signal.header,
6650 &signal.theData[0]);
6651 }
6652 fflush(out);
6653 }
6654
6655 int
6656 FastScheduler::traceDumpGetCurrentThread()
6657 {
6658 void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
6659 const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
6660
6661 /* The selfptr might be NULL, or pointer to thread that crashed. */
6662 if (selfptr == 0)
6663 {
6664 return -1;
6665 }
6666 else
6667 {
6668 return (int)selfptr->m_thr_no;
6669 }
6670 }
6671
6672 void
6673 mt_section_lock()
6674 {
6675 lock(&(g_thr_repository->m_section_lock));
6676 }
6677
6678 void
6679 mt_section_unlock()
6680 {
6681 unlock(&(g_thr_repository->m_section_lock));
6682 }
6683
6684 void
6685 mt_mem_manager_init()
6686 {
6687 }
6688
6689 void
6690 mt_mem_manager_lock()
6691 {
6692 lock(&(g_thr_repository->m_mem_manager_lock));
6693 }
6694
6695 void
6696 mt_mem_manager_unlock()
6697 {
6698 unlock(&(g_thr_repository->m_mem_manager_lock));
6699 }
6700
6701 Vector<mt_lock_stat> g_locks;
6702 template class Vector<mt_lock_stat>;
6703
6704 static
6705 void
6706 register_lock(const void * ptr, const char * name)
6707 {
6708 if (name == 0)
6709 return;
6710
6711 mt_lock_stat* arr = g_locks.getBase();
6712 for (size_t i = 0; i<g_locks.size(); i++)
6713 {
6714 if (arr[i].m_ptr == ptr)
6715 {
6716 if (arr[i].m_name)
6717 {
6718 free(arr[i].m_name);
6719 }
6720 arr[i].m_name = strdup(name);
6721 return;
6722 }
6723 }
6724
6725 mt_lock_stat ln;
6726 ln.m_ptr = ptr;
6727 ln.m_name = strdup(name);
6728 ln.m_contended_count = 0;
6729 ln.m_spin_count = 0;
6730 g_locks.push_back(ln);
6731 }
6732
6733 #if defined(NDB_HAVE_XCNG) && defined(NDB_USE_SPINLOCK)
6734 static
6735 mt_lock_stat *
6736 lookup_lock(const void * ptr)
6737 {
6738 mt_lock_stat* arr = g_locks.getBase();
6739 for (size_t i = 0; i<g_locks.size(); i++)
6740 {
6741 if (arr[i].m_ptr == ptr)
6742 return arr + i;
6743 }
6744
6745 return 0;
6746 }
6747 #endif
6748
6749 Uint32
6750 mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
6751 Uint32 dst[], Uint32 len)
6752 {
6753 Uint32 cnt = 0;
6754 Bitmask<(MAX_BLOCK_THREADS+31)/32> mask;
6755 mask.set(threadId);
6756 for (Uint32 i = 0; blocks[i] != 0; i++)
6757 {
6758 Uint32 block = blocks[i];
6759 /**
6760      * Find each thread that has an instance of the block
6761 */
6762 assert(block == blockToMain(block));
6763 Uint32 index = block - MIN_BLOCK_NO;
6764 for (Uint32 instance = 0; instance < NDB_ARRAY_SIZE(thr_map[instance]); instance++)
6765 {
6766 Uint32 thr_no = thr_map[index][instance].thr_no;
6767 if (thr_no == thr_map_entry::NULL_THR_NO)
6768 break;
6769
6770 if (mask.get(thr_no))
6771 continue;
6772
6773 mask.set(thr_no);
6774 require(cnt < len);
6775 dst[cnt++] = numberToRef(block, instance, 0);
6776 }
6777 }
6778 return cnt;
6779 }
6780
6781 void
6782 mt_wakeup(class SimulatedBlock* block)
6783 {
6784 Uint32 thr_no = block->getThreadId();
6785 struct thr_data *thrptr = &g_thr_repository->m_thread[thr_no];
6786 wakeup(&thrptr->m_waiter);
6787 }
6788
6789 #ifdef VM_TRACE
6790 void
6791 mt_assert_own_thread(SimulatedBlock* block)
6792 {
6793 Uint32 thr_no = block->getThreadId();
6794 struct thr_data *thrptr = &g_thr_repository->m_thread[thr_no];
6795
6796 if (unlikely(my_thread_equal(thrptr->m_thr_id, my_thread_self()) == 0))
6797 {
6798 fprintf(stderr, "mt_assert_own_thread() - assertion-failure\n");
6799 fflush(stderr);
6800 abort();
6801 }
6802 }
6803 #endif
6804
6805
6806 Uint32
6807 mt_get_blocklist(SimulatedBlock * block, Uint32 arr[], Uint32 len)
6808 {
6809 Uint32 thr_no = block->getThreadId();
6810 struct thr_data *thr_ptr = &g_thr_repository->m_thread[thr_no];
6811
6812 for (Uint32 i = 0; i < thr_ptr->m_instance_count; i++)
6813 {
6814 arr[i] = thr_ptr->m_instance_list[i];
6815 }
6816
6817 return thr_ptr->m_instance_count;
6818 }
6819
6820 void
6821 mt_get_thr_stat(class SimulatedBlock * block, ndb_thr_stat* dst)
6822 {
6823 bzero(dst, sizeof(* dst));
6824 Uint32 thr_no = block->getThreadId();
6825 struct thr_data *selfptr = &g_thr_repository->m_thread[thr_no];
6826
6827 THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
6828 dst->thr_no = thr_no;
6829 dst->name = conf.getName(selfptr->m_instance_list, selfptr->m_instance_count);
6830 dst->os_tid = NdbThread_GetTid(selfptr->m_thread);
6831 dst->loop_cnt = selfptr->m_stat.m_loop_cnt;
6832 dst->exec_cnt = selfptr->m_stat.m_exec_cnt;
6833 dst->wait_cnt = selfptr->m_stat.m_wait_cnt;
6834 dst->local_sent_prioa = selfptr->m_stat.m_prioa_count;
6835 dst->local_sent_priob = selfptr->m_stat.m_priob_count;
6836 }
6837
6838 TransporterReceiveHandle *
6839 mt_get_trp_receive_handle(unsigned instance)
6840 {
6841 assert(instance > 0 && instance <= MAX_NDBMT_RECEIVE_THREADS);
6842 if (instance > 0 && instance <= MAX_NDBMT_RECEIVE_THREADS)
6843 {
6844 return g_trp_receive_handle_ptr[instance - 1 /* proxy */];
6845 }
6846 return 0;
6847 }
6848
6849 /**
6850 * Global data
6851 */
6852 static struct trp_callback g_trp_callback;
6853
6854 TransporterRegistry globalTransporterRegistry(&g_trp_callback, NULL,
6855 false);
6856