1 /* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
22
23 #include <ndb_global.h>
24
25 #include <VMSignal.hpp>
26 #include <kernel_types.h>
27 #include <Prio.hpp>
28 #include <SignalLoggerManager.hpp>
29 #include <SimulatedBlock.hpp>
30 #include <ErrorHandlingMacros.hpp>
31 #include <GlobalData.hpp>
32 #include <WatchDog.hpp>
33 #include <TransporterDefinitions.hpp>
34 #include "FastScheduler.hpp"
35 #include "mt.hpp"
36 #include <DebuggerNames.hpp>
37 #include <signaldata/StopForCrash.hpp>
38 #include "TransporterCallbackKernel.hpp"
39 #include <NdbSleep.h>
40 #include <portlib/ndb_prefetch.h>
41
42 #include "mt-asm.h"
43
44 inline
45 SimulatedBlock*
46 GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
47 {
48 SimulatedBlock* b = getBlock(blockNo);
49 if (b != 0 && instanceNo != 0)
50 b = b->getInstance(instanceNo);
51 return b;
52 }
53
54 #ifdef __GNUC__
55 /* Provides a small (but noticeable) speedup in benchmarks. */
56 #define memcpy __builtin_memcpy
57 #endif
58
59 /* size of a cacheline */
60 #define NDB_CL 64
61
62 /* Constants found by benchmarks to be reasonable values. */
63
64 /* Maximum number of signals to execute before sending to remote nodes. */
65 static const Uint32 MAX_SIGNALS_BEFORE_SEND = 200;
66
67 /*
68 * Max. signals to execute from one job buffer before considering other
69 * possible stuff to do.
70 */
71 static const Uint32 MAX_SIGNALS_PER_JB = 100;
72
73 /**
74 * Max signals written to other thread before calling flush_jbb_write_state
75 */
76 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER = 2;
77 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_OTHER = 20;
78 static const Uint32 MAX_SIGNALS_BEFORE_WAKEUP = 128;
79
80 //#define NDB_MT_LOCK_TO_CPU
81
82 #define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS + 1) //main+lqh+extra
83 #define NUM_MAIN_THREADS 2 // except receiver
84 #define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)
85
86 /* If this is too small it crashes before first signal. */
87 #define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
88
89 static Uint32 num_lqh_workers = 0;
90 static Uint32 num_lqh_threads = 0;
91 static Uint32 num_threads = 0;
92 static Uint32 receiver_thread_no = 0;
93
94 #define NO_SEND_THREAD (MAX_THREADS + 1)
95
96 /* max signal is 32 words, 7 for signal header and 25 datawords */
97 #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
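/*
 * Worked example: with thr_job_buffer::SIZE = 8190 words and a maximum
 * signal size of 32 words, MIN_SIGNALS_PER_PAGE evaluates to
 * 8190 / 32 = 255 signals per job buffer page.
 */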
98
99 struct mt_lock_stat
100 {
101 const void * m_ptr;
102 char * m_name;
103 Uint32 m_contended_count;
104 Uint32 m_spin_count;
105 };
106 static void register_lock(const void * ptr, const char * name);
107 static mt_lock_stat * lookup_lock(const void * ptr);
108
109 #if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
110 #define USE_FUTEX
111 #endif
112
113 #ifdef USE_FUTEX
114 #ifndef _GNU_SOURCE
115 #define _GNU_SOURCE
116 #endif
117 #include <unistd.h>
118 #include <sys/syscall.h>
119 #include <sys/types.h>
120
121 #define FUTEX_WAIT 0
122 #define FUTEX_WAKE 1
123 #define FUTEX_FD 2
124 #define FUTEX_REQUEUE 3
125 #define FUTEX_CMP_REQUEUE 4
126 #define FUTEX_WAKE_OP 5
127
128 static inline
129 int
130 futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
131 {
132 return syscall(SYS_futex,
133 addr, FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
134 }
135
136 static inline
137 int
138 futex_wake(volatile unsigned * addr)
139 {
140 return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
141 }
142
143 struct thr_wait
144 {
145 volatile unsigned m_futex_state;
146 enum {
147 FS_RUNNING = 0,
148 FS_SLEEPING = 1
149 };
150 thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
151 void init () {}
152 };
153
154 /**
155 * Sleep until woken up or timeout occurs.
156 *
157 * Will call check_callback(check_arg) after proper synchronisation, and only
158 * if that returns true will it actually sleep, else it will return
159 * immediately. This is needed to avoid races with wakeup.
160 *
161 * Returns 'true' if it actually did sleep.
162 */
163 static inline
164 bool
165 yield(struct thr_wait* wait, const Uint32 nsec,
166 bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
167 {
168 volatile unsigned * val = &wait->m_futex_state;
169 #ifndef NDEBUG
170 int old =
171 #endif
172 xcng(val, thr_wait::FS_SLEEPING);
173 assert(old == thr_wait::FS_RUNNING);
174
175 /**
176 * At this point, we need to re-check the condition that made us decide to
177 * sleep, and skip sleeping if it has changed.
178 *
179 * Otherwise, the condition may have changed, and the thread that made the
180 * change may already have decided not to wake us, as our state was still
181 * FS_RUNNING at the time.
182 *
183 * We also need a memory barrier to ensure this extra check is race-free,
184 * but that is already provided by xcng.
185 */
186 bool waited = (*check_callback)(check_arg);
187 if (waited)
188 {
189 struct timespec timeout;
190 timeout.tv_sec = 0;
191 timeout.tv_nsec = nsec;
192 futex_wait(val, thr_wait::FS_SLEEPING, &timeout);
193 }
194 xcng(val, thr_wait::FS_RUNNING);
195 return waited;
196 }
197
198 static inline
199 int
200 wakeup(struct thr_wait* wait)
201 {
202 volatile unsigned * val = &wait->m_futex_state;
203 /**
204 * We must ensure that any state updates (new data in buffers, ...) are visible
205 * to the other thread before we can look at the sleep state of that other
206 * thread.
207 */
208 if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
209 {
210 return futex_wake(val);
211 }
212 return 0;
213 }
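/*
 * Summary of the futex protocol above: the sleeper publishes FS_SLEEPING
 * with xcng(), re-checks its wait condition, and only then calls
 * futex_wait() expecting FS_SLEEPING; the waker swaps the state back to
 * FS_RUNNING and calls futex_wake() only if the previous value was
 * FS_SLEEPING. The atomic exchange on both sides doubles as the memory
 * barrier that makes the re-check race-free.
 */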
214 #else
215 #include <NdbMutex.h>
216 #include <NdbCondition.h>
217
218 struct thr_wait
219 {
220 bool m_need_wakeup;
221 NdbMutex *m_mutex;
222 NdbCondition *m_cond;
223 thr_wait() : m_need_wakeup(false), m_mutex(0), m_cond(0) {}
224
225 void init() {
226 m_mutex = NdbMutex_Create();
227 m_cond = NdbCondition_Create();
228 }
229 };
230
231 static inline
232 bool
233 yield(struct thr_wait* wait, const Uint32 nsec,
234 bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
235 {
236 struct timespec end;
237 NdbCondition_ComputeAbsTime(&end, nsec/1000000);
238 NdbMutex_Lock(wait->m_mutex);
239
240 Uint32 waits = 0;
241 /* May have spurious wakeups: Always recheck condition predicate */
242 while ((*check_callback)(check_arg))
243 {
244 wait->m_need_wakeup = true;
245 waits++;
246 if (NdbCondition_WaitTimeoutAbs(wait->m_cond,
247 wait->m_mutex, &end) == ETIMEDOUT)
248 {
249 wait->m_need_wakeup = false;
250 break;
251 }
252 }
253 NdbMutex_Unlock(wait->m_mutex);
254 return (waits > 0);
255 }
256
257
258 static inline
259 int
260 wakeup(struct thr_wait* wait)
261 {
262 NdbMutex_Lock(wait->m_mutex);
263 // We should avoid signaling when not waiting for wakeup
264 if (wait->m_need_wakeup)
265 {
266 wait->m_need_wakeup = false;
267 NdbCondition_Signal(wait->m_cond);
268 }
269 NdbMutex_Unlock(wait->m_mutex);
270 return 0;
271 }
272
273 #endif
274
275 #ifdef NDB_HAVE_XCNG
276 template <unsigned SZ>
277 struct thr_spin_lock
278 {
279 thr_spin_lock(const char * name = 0)
280 {
281 m_lock = 0;
282 register_lock(this, name);
283 }
284
285 union {
286 volatile Uint32 m_lock;
287 char pad[SZ];
288 };
289 };
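/*
 * Note: the union with pad[SZ] pads each lock out to SZ bytes, presumably
 * so that a lock can be given its own cache line (e.g. thr_spin_lock<64>
 * below) and unrelated locks do not share one.
 */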
290
291 static
292 ATTRIBUTE_NOINLINE
293 void
294 lock_slow(void * sl, volatile unsigned * val)
295 {
296 mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
297
298 loop:
299 Uint32 spins = 0;
300 do {
301 spins++;
302 cpu_pause();
303 } while (* val == 1);
304
305 if (unlikely(xcng(val, 1) != 0))
306 goto loop;
307
308 if (s)
309 {
310 s->m_spin_count += spins;
311 Uint32 count = ++s->m_contended_count;
312 Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
313
314 if ((count % freq) == 0)
315 printf("%s waiting for lock, contentions: %u spins: %u\n",
316 s->m_name, count, s->m_spin_count);
317 }
318 }
319
320 template <unsigned SZ>
321 static
322 inline
323 void
324 lock(struct thr_spin_lock<SZ>* sl)
325 {
326 volatile unsigned* val = &sl->m_lock;
327 if (likely(xcng(val, 1) == 0))
328 return;
329
330 lock_slow(sl, val);
331 }
332
333 template <unsigned SZ>
334 static
335 inline
336 void
337 unlock(struct thr_spin_lock<SZ>* sl)
338 {
339 /**
340 * Memory barrier here, to make sure all of our stores are visible before
341 * the lock release is.
342 */
343 mb();
344 sl->m_lock = 0;
345 }
346
347 template <unsigned SZ>
348 static
349 inline
350 int
351 trylock(struct thr_spin_lock<SZ>* sl)
352 {
353 volatile unsigned* val = &sl->m_lock;
354 return xcng(val, 1);
355 }
356 #else
357 #define thr_spin_lock thr_mutex
358 #endif
359
360 template <unsigned SZ>
361 struct thr_mutex
362 {
363 thr_mutex(const char * name = 0) {
364 NdbMutex_Init(&m_mutex);
365 register_lock(this, name);
366 }
367
368 union {
369 NdbMutex m_mutex;
370 char pad[SZ];
371 };
372 };
373
374 template <unsigned SZ>
375 static
376 inline
377 void
378 lock(struct thr_mutex<SZ>* sl)
379 {
380 NdbMutex_Lock(&sl->m_mutex);
381 }
382
383 template <unsigned SZ>
384 static
385 inline
386 void
387 unlock(struct thr_mutex<SZ>* sl)
388 {
389 NdbMutex_Unlock(&sl->m_mutex);
390 }
391
392 template <unsigned SZ>
393 static
394 inline
395 int
396 trylock(struct thr_mutex<SZ> * sl)
397 {
398 return NdbMutex_Trylock(&sl->m_mutex);
399 }
400
401 /**
402 * thr_safe_pool
403 */
404 template<typename T>
405 struct thr_safe_pool
406 {
407 thr_safe_pool(const char * name) : m_free_list(0), m_cnt(0), m_lock(name) {}
408
409 T* m_free_list;
410 Uint32 m_cnt;
411 thr_spin_lock<NDB_CL - (sizeof(void*) + sizeof(Uint32))> m_lock;
412
413 T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
414 T* ret = 0;
415 lock(&m_lock);
416 if (m_free_list)
417 {
418 assert(m_cnt);
419 m_cnt--;
420 ret = m_free_list;
421 m_free_list = ret->m_next;
422 unlock(&m_lock);
423 }
424 else
425 {
426 Uint32 dummy;
427 unlock(&m_lock);
428 ret = reinterpret_cast<T*>
429 (mm->alloc_page(rg, &dummy,
430 Ndbd_mem_manager::NDB_ZONE_ANY));
431 // ToDo: How to deal with failed allocation?!?
432 // I think in this case we need to start grabbing buffers kept for signal
433 // trace.
434 }
435 return ret;
436 }
437
438 void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
439 lock(&m_lock);
440 t->m_next = m_free_list;
441 m_free_list = t;
442 m_cnt++;
443 unlock(&m_lock);
444 }
445
446 void release_list(Ndbd_mem_manager *mm, Uint32 rg,
447 T* head, T* tail, Uint32 cnt) {
448 lock(&m_lock);
449 tail->m_next = m_free_list;
450 m_free_list = head;
451 m_cnt += cnt;
452 unlock(&m_lock);
453 }
454 };
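/*
 * Usage note: seize() returns a previously released T if the free list is
 * non-empty, otherwise it falls back to allocating a fresh page from the
 * Ndbd_mem_manager for the given resource group (rg). release() and
 * release_list() simply push buffers back onto the lock-protected free
 * list; their mm/rg arguments are not used on that path.
 */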
455
456 /**
457 * thread_local_pool
458 */
459 template<typename T>
460 class thread_local_pool
461 {
462 public:
463 thread_local_pool(thr_safe_pool<T> *global_pool, unsigned max_free) :
464 m_max_free(max_free),
465 m_free(0),
466 m_freelist(0),
467 m_global_pool(global_pool)
468 {
469 }
470
471 T *seize(Ndbd_mem_manager *mm, Uint32 rg) {
472 T *tmp = m_freelist;
473 if (tmp)
474 {
475 m_freelist = tmp->m_next;
476 assert(m_free > 0);
477 m_free--;
478 }
479 else
480 tmp = m_global_pool->seize(mm, rg);
481
482 validate();
483 return tmp;
484 }
485
486 void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
487 unsigned free = m_free;
488 if (free < m_max_free)
489 {
490 m_free = free + 1;
491 t->m_next = m_freelist;
492 m_freelist = t;
493 }
494 else
495 m_global_pool->release(mm, rg, t);
496
497 validate();
498 }
499
500 /**
501 * Release to the local pool even if it gets "too" full
502 * (with respect to m_max_free)
503 */
504 void release_local(T *t) {
505 m_free++;
506 t->m_next = m_freelist;
507 m_freelist = t;
508
509 validate();
510 }
511
512 void validate() const {
513 #ifdef VM_TRACE
514 Uint32 cnt = 0;
515 T* t = m_freelist;
516 while (t)
517 {
518 cnt++;
519 t = t->m_next;
520 }
521 assert(cnt == m_free);
522 #endif
523 }
524
525 /**
526 * Release entries so that m_max_free is honored
527 * (likely used together with release_local)
528 */
529 void release_global(Ndbd_mem_manager *mm, Uint32 rg) {
530 validate();
531 unsigned cnt = 0;
532 unsigned free = m_free;
533 Uint32 maxfree = m_max_free;
534 assert(maxfree > 0);
535
536 T* head = m_freelist;
537 T* tail = m_freelist;
538 if (free > maxfree)
539 {
540 cnt++;
541 free--;
542
543 while (free > maxfree)
544 {
545 cnt++;
546 free--;
547 tail = tail->m_next;
548 }
549
550 assert(free == maxfree);
551
552 m_free = free;
553 m_freelist = tail->m_next;
554 m_global_pool->release_list(mm, rg, head, tail, cnt);
555 }
556 validate();
557 }
558
559 void release_all(Ndbd_mem_manager *mm, Uint32 rg) {
560 validate();
561 T* head = m_freelist;
562 T* tail = m_freelist;
563 if (tail)
564 {
565 unsigned cnt = 1;
566 while (tail->m_next != 0)
567 {
568 cnt++;
569 tail = tail->m_next;
570 }
571 m_global_pool->release_list(mm, rg, head, tail, cnt);
572 m_free = 0;
573 m_freelist = 0;
574 }
575 validate();
576 }
577
578 void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }
579
580 private:
581 unsigned m_max_free;
582 unsigned m_free;
583 T *m_freelist;
584 thr_safe_pool<T> *m_global_pool;
585 };
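/*
 * Intended flow (as used by the send-buffer code below): a thread seize()s
 * pages, hands them back with release_local() without taking any lock, and
 * periodically calls release_global() to trim the local free list down to
 * m_max_free, returning the surplus to the shared thr_safe_pool in a single
 * locked release_list() call.
 */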
586
587 /**
588 * Signal buffers.
589 *
590 * Each thread job queue contains a list of these buffers with signals.
591 *
592 * There is an underlying assumption that the size of this structure is the
593 * same as the global memory manager page size.
594 */
595 struct thr_job_buffer // 32k
596 {
597 static const unsigned SIZE = 8190;
598
599 /*
600 * Amount of signal data currently in m_data buffer.
601 * Read/written by producer, read by consumer.
602 */
603 Uint32 m_len;
604 /*
605 * Whether this buffer contained prio A or prio B signals, used when dumping
606 * signals from released buffers.
607 */
608 Uint32 m_prioa;
609 union {
610 Uint32 m_data[SIZE];
611
612 thr_job_buffer * m_next; // For free-list
613 };
614 };
615
616 static
617 inline
618 Uint32
619 calc_fifo_used(Uint32 ri, Uint32 wi, Uint32 sz)
620 {
621 return (wi >= ri) ? wi - ri : (sz - ri) + wi;
622 }
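/*
 * Worked example: with sz = thr_job_queue::SIZE = 31, ri = 29 and wi = 2
 * the FIFO has wrapped, and used = (31 - 29) + 2 = 4 entries.
 */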
623
624 /**
625 * thr_job_queue is shared between consumer / producer.
626 *
627 * The hot-spots of the thr_job_queue are the read/write indexes.
628 * As they are updated and read frequently they have been placed
629 * in their own thr_job_queue_head[] in order to make them fit inside a
630 * single/few cache lines and thereby avoid complete L1-cache replacement
631 * every time the job_queue is scanned.
632 */
633 struct thr_job_queue_head
634 {
635 unsigned m_read_index; // Read/written by consumer, read by producer
636 unsigned m_write_index; // Read/written by producer, read by consumer
637
638 Uint32 used() const;
639 };
640
641 struct thr_job_queue
642 {
643 static const unsigned SIZE = 31;
644
645 struct thr_job_queue_head* m_head;
646 struct thr_job_buffer* m_buffers[SIZE];
647 };
648
649 inline
650 Uint32
651 thr_job_queue_head::used() const
652 {
653 return calc_fifo_used(m_read_index, m_write_index, thr_job_queue::SIZE);
654 }
655
656 /*
657 * Two structures tightly associated with thr_job_queue.
658 *
659 * There will generally be exactly one thr_jb_read_state and one
660 * thr_jb_write_state associated with each thr_job_queue.
661 *
662 * The reason they are kept separate is to avoid unnecessary inter-CPU
663 * cache line pollution. All fields shared among producer and consumer
664 * threads are in thr_job_queue, thr_jb_write_state fields are only
665 * accessed by the producer thread(s), and thr_jb_read_state fields are
666 * only accessed by the consumer thread.
667 *
668 * For example, on Intel core 2 quad processors, there is a ~33%
669 * penalty for two cores accessing the same 64-byte cacheline.
670 */
671 struct thr_jb_write_state
672 {
673 /*
674 * The position to insert the next signal into the queue.
675 *
676 * m_write_index is the index into thr_job_queue::m_buffers[] of the buffer
677 * to insert into, and m_write_pos is the index into thr_job_buffer::m_data[]
678 * at which to store the next signal.
679 */
680 Uint32 m_write_index;
681 Uint32 m_write_pos;
682
683 /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
684 thr_job_buffer *m_write_buffer;
685
686 /* Number of signals inserted since last flush to thr_job_queue. */
687 Uint32 m_pending_signals;
688
689 /* Number of signals inserted since last wakeup */
690 Uint32 m_pending_signals_wakeup;
691 };
692
693 /*
694 * This structure is also used when dumping signal traces, to dump executed
695 * signals from the buffer(s) currently being processed.
696 */
697 struct thr_jb_read_state
698 {
699 /*
700 * Index into thr_job_queue::m_buffers[] of the buffer that we are currently
701 * executing signals from.
702 */
703 Uint32 m_read_index;
704 /*
705 * Index into m_read_buffer->m_data[] of the next signal to execute from the
706 * current buffer.
707 */
708 Uint32 m_read_pos;
709 /*
710 * Thread local copy of thr_job_queue::m_buffers[m_read_index].
711 */
712 thr_job_buffer *m_read_buffer;
713 /*
714 * These are thread-local copies of thr_job_queue::m_write_index and
715 * thr_job_buffer::m_len. They are read once at the start of the signal
716 * execution loop and used to determine when the end of available signals is
717 * reached.
718 */
719 Uint32 m_read_end; // End within current thr_job_buffer. (*m_read_buffer)
720
721 Uint32 m_write_index; // Last available thr_job_buffer.
722
723 bool is_empty() const
724 {
725 assert(m_read_index != m_write_index || m_read_pos <= m_read_end);
726 return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
727 }
728 };
729
730 /**
731 * time-queue
732 */
733 struct thr_tq
734 {
735 static const unsigned SQ_SIZE = 512;
736 static const unsigned LQ_SIZE = 512;
737 static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;
738
739 Uint32 * m_delayed_signals[PAGES];
740 Uint32 m_next_free;
741 Uint32 m_next_timer;
742 Uint32 m_current_time;
743 Uint32 m_cnt[2];
744 Uint32 m_short_queue[SQ_SIZE];
745 Uint32 m_long_queue[LQ_SIZE];
746 };
747
748 /*
749 * Max number of thread-local job buffers to keep before releasing to
750 * global pool.
751 */
752 #define THR_FREE_BUF_MAX 32
753 /* Minimum number of buffers (to ensure useful trace dumps). */
754 #define THR_FREE_BUF_MIN 12
755 /*
756 * 1/THR_FREE_BUF_BATCH is the fraction of job buffers to allocate/free
757 * at a time from/to global pool.
758 */
759 #define THR_FREE_BUF_BATCH 6
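/*
 * With THR_FREE_BUF_MAX = 32 and THR_FREE_BUF_BATCH = 6 this gives a batch
 * of 32 / 6 = 5 buffers moved to/from the global pool at a time (see
 * seize_buffer() and release_buffer() below).
 */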
760
761 /**
762 * a page with send data
763 */
764 struct thr_send_page
765 {
766 static const Uint32 PGSIZE = 32768;
767 #if SIZEOF_CHARP == 4
768 static const Uint32 HEADER_SIZE = 8;
769 #else
770 static const Uint32 HEADER_SIZE = 12;
771 #endif
772
773 static Uint32 max_bytes() {
774 return PGSIZE - offsetof(thr_send_page, m_data);
775 }
776
777 /* Next page */
778 thr_send_page* m_next;
779
780 /* Bytes of send data available in this page. */
781 Uint16 m_bytes;
782
783 /* Start of unsent data */
784 Uint16 m_start;
785
786 /* Data; real size is to the end of one page. */
787 char m_data[2];
788 };
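/*
 * Note: HEADER_SIZE above matches offsetof(thr_send_page, m_data)
 * (8 bytes with 4-byte pointers, 12 bytes with 8-byte pointers), so
 * max_bytes() comes out to 32768 - 12 = 32756 bytes of payload per page
 * on a 64-bit build.
 */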
789
790 /**
791 * a linked list with thr_send_page
792 */
793 struct thr_send_buffer
794 {
795 thr_send_page* m_first_page;
796 thr_send_page* m_last_page;
797 };
798
799 /**
800 * a ring buffer with linked list of thr_send_page
801 */
802 struct thr_send_queue
803 {
804 unsigned m_write_index;
805 #if SIZEOF_CHARP == 8
806 unsigned m_unused;
807 thr_send_page* m_buffers[7];
808 static const unsigned SIZE = 7;
809 #else
810 thr_send_page* m_buffers[15];
811 static const unsigned SIZE = 15;
812 #endif
813 };
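/*
 * Note: the layout appears chosen so that sizeof(thr_send_queue) is one
 * 64-byte cache line (NDB_CL): 4 + 4 + 7 * 8 = 64 bytes with 8-byte
 * pointers, and 4 + 15 * 4 = 64 bytes with 4-byte pointers.
 */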
814
815 struct thr_data
816 {
817 thr_data() : m_jba_write_lock("jbalock"),
818 m_send_buffer_pool(0, THR_FREE_BUF_MAX) {}
819
820 thr_wait m_waiter;
821 unsigned m_thr_no;
822
823 /**
824 * max signals to execute per JBB buffer
825 */
826 unsigned m_max_signals_per_jb;
827
828 /**
829 * max signals to execute before recomputing m_max_signals_per_jb
830 */
831 unsigned m_max_exec_signals;
832
833 Uint64 m_time;
834 struct thr_tq m_tq;
835
836 /* Prio A signal incoming queue. */
837 struct thr_spin_lock<64> m_jba_write_lock;
838 struct thr_job_queue m_jba;
839
840 struct thr_job_queue_head m_jba_head;
841
842 /* Thread-local read state of prio A buffer. */
843 struct thr_jb_read_state m_jba_read_state;
844 /*
845 * There is no m_jba_write_state, as we have multiple writers to the prio A
846 * queue, so local state becomes invalid as soon as we release the lock.
847 */
848
849 /*
850 * In m_next_buffer we keep a free buffer at all times, so that when
851 * we hold the lock and find we need a new buffer, we can use this one and
852 * defer allocation until after the lock has been released.
853 */
854 struct thr_job_buffer* m_next_buffer;
855
856 /*
857 * We keep a small number of buffers in a thread-local cyclic FIFO, so that
858 * we can avoid going to the global pool in most cases, and so that we have
859 * recent buffers available for dumping in trace files.
860 */
861 struct thr_job_buffer *m_free_fifo[THR_FREE_BUF_MAX];
862 /* m_first_free is the index of the entry to return next from seize(). */
863 Uint32 m_first_free;
864 /* m_first_unused is the first unused entry in m_free_fifo. */
865 Uint32 m_first_unused;
866
867 /*
868 * These are the thread input queues, into which other threads deliver
869 * signals.
870 */
871 struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
872 struct thr_job_queue m_in_queue[MAX_THREADS];
873 /* These are the write states of m_in_queue[self] in each thread. */
874 struct thr_jb_write_state m_write_states[MAX_THREADS];
875 /* These are the read states of all of our own m_in_queue[]. */
876 struct thr_jb_read_state m_read_states[MAX_THREADS];
877
878 /* Jam buffers for making trace files at crashes. */
879 EmulatedJamBuffer m_jam;
880 /* Watchdog counter for this thread. */
881 Uint32 m_watchdog_counter;
882 /* Signal delivery statistics. */
883 Uint32 m_prioa_count;
884 Uint32 m_prioa_size;
885 Uint32 m_priob_count;
886 Uint32 m_priob_size;
887
888 /* Array of node ids with pending remote send data. */
889 Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
890 /* Number of node ids in m_pending_send_nodes. */
891 Uint32 m_pending_send_count;
892
893 /**
894 * Bitmap of pending node ids with send data.
895 * Used to quickly check if a node id is already in m_pending_send_nodes.
896 */
897 Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
898
899 /* pool for send buffers */
900 class thread_local_pool<thr_send_page> m_send_buffer_pool;
901
902 /* Send buffers for this thread; these are not touched by any other thread. */
903 struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];
904
905 /* Block instances (main and worker) handled by this thread. */
906 /* Used for sendpacked (send-at-job-buffer-end). */
907 Uint32 m_instance_count;
908 BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];
909
910 SectionSegmentPool::Cache m_sectionPoolCache;
911
912 Uint32 m_cpu;
913 pthread_t m_thr_id;
914 NdbThread* m_thread;
915 };
916
917 struct mt_send_handle : public TransporterSendBufferHandle
918 {
919 struct thr_data * m_selfptr;
920 mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
921 virtual ~mt_send_handle() {}
922
923 virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
924 virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
925 virtual bool forceSend(NodeId node);
926 };
927
928 struct trp_callback : public TransporterCallbackKernel
929 {
930 trp_callback() {}
931
932 /* Callback interface. */
933 int checkJobBuffer();
934 void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
935 void lock_transporter(NodeId node);
936 void unlock_transporter(NodeId node);
937 Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
938 Uint32 bytes_sent(NodeId node, Uint32 bytes);
939 bool has_data_to_send(NodeId node);
940 void reset_send_buffer(NodeId node, bool should_be_empty);
941 };
942
943 extern trp_callback g_trp_callback; // Forward declaration
944 extern struct thr_repository g_thr_repository;
945
946 #include <NdbMutex.h>
947 #include <NdbCondition.h>
948
949 struct thr_repository
950 {
951 thr_repository()
952 : m_receive_lock("recvlock"),
953 m_section_lock("sectionlock"),
954 m_mem_manager_lock("memmanagerlock"),
955 m_jb_pool("jobbufferpool"),
956 m_sb_pool("sendbufferpool")
957 {}
958
959 struct thr_spin_lock<64> m_receive_lock;
960 struct thr_spin_lock<64> m_section_lock;
961 struct thr_spin_lock<64> m_mem_manager_lock;
962 struct thr_safe_pool<thr_job_buffer> m_jb_pool;
963 struct thr_safe_pool<thr_send_page> m_sb_pool;
964 Ndbd_mem_manager * m_mm;
965 unsigned m_thread_count;
966 struct thr_data m_thread[MAX_THREADS];
967
968 /**
969 * send buffer handling
970 */
971
972 /* The buffers that are to be sent */
973 struct send_buffer
974 {
975 /**
976 * lock
977 */
978 struct thr_spin_lock<8> m_send_lock;
979
980 /**
981 * pending data
982 */
983 struct thr_send_buffer m_buffer;
984
985 /**
986 * Flag used to coordinate sending to same remote node from different
987 * threads.
988 *
989 * If two threads need to send to the same node at the same time, the
990 * second thread, rather than wait for the first to finish, will just
991 * set this flag, and the first thread will do an extra send when done
992 * with the first.
993 */
994 Uint32 m_force_send;
995
996 /**
997 * Which thread is currently holding the m_send_lock
998 */
999 Uint32 m_send_thread;
1000
1001 /**
1002 * bytes pending for this node
1003 */
1004 Uint32 m_bytes;
1005
1006 /* read index(es) in thr_send_queue */
1007 Uint32 m_read_index[MAX_THREADS];
1008 } m_send_buffers[MAX_NTRANSPORTERS];
1009
1010 /* The buffers published by threads */
1011 thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];
1012
1013 /*
1014 * These are used to synchronize during crash / trace dumps.
1015 *
1016 */
1017 NdbMutex stop_for_crash_mutex;
1018 NdbCondition stop_for_crash_cond;
1019 Uint32 stopped_threads;
1020 };
1021
1022 #if 0
1023 static
1024 Uint32
1025 fifo_used_pages(struct thr_data* selfptr)
1026 {
1027 return calc_fifo_used(selfptr->m_first_unused,
1028 selfptr->m_first_free,
1029 THR_FREE_BUF_MAX);
1030 }
1031 #endif
1032
1033 static
1034 void
1035 job_buffer_full(struct thr_data* selfptr)
1036 {
1037 ndbout_c("job buffer full");
1038 abort();
1039 }
1040
1041 static
1042 void
1043 out_of_job_buffer(struct thr_data* selfptr)
1044 {
1045 ndbout_c("out of job buffer");
1046 abort();
1047 }
1048
1049 static
1050 thr_job_buffer*
1051 seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
1052 {
1053 thr_job_buffer* jb;
1054 thr_data* selfptr = rep->m_thread + thr_no;
1055 Uint32 first_free = selfptr->m_first_free;
1056 Uint32 first_unused = selfptr->m_first_unused;
1057
1058 /*
1059 * An empty FIFO is denoted by m_first_free == m_first_unused.
1060 * So we will never have a completely full FIFO array, at least one entry will
1061 * always be unused. But the code is simpler as a result.
1062 */
1063
1064 /*
1065 * We never allow the fifo to become completely empty, as we want to have
1066 * a good number of signals available for trace files in case of a forced
1067 * shutdown.
1068 */
1069 Uint32 buffers = (first_free > first_unused ?
1070 first_unused + THR_FREE_BUF_MAX - first_free :
1071 first_unused - first_free);
1072 if (unlikely(buffers <= THR_FREE_BUF_MIN))
1073 {
1074 /*
1075 * All used, allocate another batch from global pool.
1076 *
1077 * Put the new buffers at the head of the fifo, so as not to needlessly
1078 * push out any existing buffers from the fifo (that would lose useful
1079 * data for signal dumps in trace files).
1080 */
1081 Uint32 cnt = 0;
1082 Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
1083 assert(batch > 0);
1084 assert(batch + THR_FREE_BUF_MIN < THR_FREE_BUF_MAX);
1085 do {
1086 jb = rep->m_jb_pool.seize(rep->m_mm, RG_JOBBUFFER);
1087 if (unlikely(jb == 0))
1088 {
1089 if (unlikely(cnt == 0))
1090 {
1091 out_of_job_buffer(selfptr);
1092 }
1093 break;
1094 }
1095 jb->m_len = 0;
1096 jb->m_prioa = false;
1097 first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
1098 selfptr->m_free_fifo[first_free] = jb;
1099 batch--;
1100 } while (cnt < batch);
1101 selfptr->m_first_free = first_free;
1102 }
1103
1104 jb= selfptr->m_free_fifo[first_free];
1105 selfptr->m_first_free = (first_free + 1) % THR_FREE_BUF_MAX;
1106 /* Init here rather than in release_buffer() so signal dump will work. */
1107 jb->m_len = 0;
1108 jb->m_prioa = prioa;
1109 return jb;
1110 }
1111
1112 static
1113 void
1114 release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
1115 {
1116 struct thr_data* selfptr = rep->m_thread + thr_no;
1117 Uint32 first_free = selfptr->m_first_free;
1118 Uint32 first_unused = selfptr->m_first_unused;
1119
1120 /*
1121 * Pack near-empty signals, to get more info in the signal traces.
1122 *
1123 * This is not currently used, as we only release full job buffers, hence
1124 * the #if 0.
1125 */
1126 #if 0
1127 Uint32 last_free = (first_unused ? first_unused : THR_FREE_BUF_MAX) - 1;
1128 thr_job_buffer *last_jb = selfptr->m_free_fifo[last_free];
1129 Uint32 len1, len2;
1130
1131 if (!jb->m_prioa &&
1132 first_free != first_unused &&
1133 !last_jb->m_prioa &&
1134 (len2 = jb->m_len) <= (thr_job_buffer::SIZE / 4) &&
1135 (len1 = last_jb->m_len) + len2 <= thr_job_buffer::SIZE)
1136 {
1137 /*
1138 * The buffer being released is fairly empty, and the data it contains fits
1139 * in the previously released buffer.
1140 *
1141 * We want to avoid too many almost-empty buffers in the free fifo, as that
1142 * makes signal traces less useful due to too little data available. So in
1143 * this case we move the data from the buffer to be released into the
1144 * previous buffer, and place the to-be-released buffer at the head of the
1145 * fifo (to be immediately reused).
1146 *
1147 * This is only done for prio B buffers, as we must not merge prio A and B
1148 * data (or dumps would be incorrect), and prio A buffers are in any case
1149 * full when released.
1150 */
1151 memcpy(last_jb->m_data + len1, jb->m_data, len2*sizeof(jb->m_data[0]));
1152 last_jb->m_len = len1 + len2;
1153 jb->m_len = 0;
1154 first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
1155 selfptr->m_free_fifo[first_free] = jb;
1156 selfptr->m_first_free = first_free;
1157 }
1158 else
1159 #endif
1160 {
1161 /* Just insert at the end of the fifo. */
1162 selfptr->m_free_fifo[first_unused] = jb;
1163 first_unused = (first_unused + 1) % THR_FREE_BUF_MAX;
1164 selfptr->m_first_unused = first_unused;
1165 }
1166
1167 if (unlikely(first_unused == first_free))
1168 {
1169 /* FIFO full, need to release to global pool. */
1170 Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
1171 assert(batch > 0);
1172 assert(batch < THR_FREE_BUF_MAX);
1173 do {
1174 rep->m_jb_pool.release(rep->m_mm, RG_JOBBUFFER,
1175 selfptr->m_free_fifo[first_free]);
1176 first_free = (first_free + 1) % THR_FREE_BUF_MAX;
1177 batch--;
1178 } while (batch > 0);
1179 selfptr->m_first_free = first_free;
1180 }
1181 }
1182
1183 static
1184 inline
1185 Uint32
1186 scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
1187 {
1188 Uint32 thr_no = selfptr->m_thr_no;
1189 Uint32 **pages = selfptr->m_tq.m_delayed_signals;
1190 Uint32 free = selfptr->m_tq.m_next_free;
1191 Uint32* save = ptr;
1192 for (Uint32 i = 0; i < cnt; i++, ptr++)
1193 {
1194 Uint32 val = * ptr;
1195 if ((val & 0xFFFF) <= end)
1196 {
1197 Uint32 idx = val >> 16;
1198 Uint32 buf = idx >> 8;
1199 Uint32 pos = 32 * (idx & 0xFF);
1200
1201 Uint32* page = * (pages + buf);
1202
1203 const SignalHeader *s = reinterpret_cast<SignalHeader*>(page + pos);
1204 const Uint32 *data = page + pos + (sizeof(*s)>>2);
1205 if (0)
1206 ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
1207 /*
1208 * ToDo: Do measurements of the frequency of these prio A timed signals.
1209 *
1210 * If they are frequent, we may want to optimize, as sending one prio A
1211 * signal is somewhat expensive compared to sending one prio B.
1212 */
1213 sendprioa(thr_no, s, data,
1214 data + s->theLength);
1215 * (page + pos) = free;
1216 free = idx;
1217 }
1218 else if (i > 0)
1219 {
1220 selfptr->m_tq.m_next_free = free;
1221 memmove(save, ptr, 4 * (cnt - i));
1222 return i;
1223 }
1224 else
1225 {
1226 return 0;
1227 }
1228 }
1229 selfptr->m_tq.m_next_free = free;
1230 return cnt;
1231 }
1232
1233 static
1234 void
1235 handle_time_wrap(struct thr_data* selfptr)
1236 {
1237 Uint32 i;
1238 struct thr_tq * tq = &selfptr->m_tq;
1239 Uint32 cnt0 = tq->m_cnt[0];
1240 Uint32 cnt1 = tq->m_cnt[1];
1241 Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
1242 Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
1243 cnt0 -= tmp0;
1244 cnt1 -= tmp1;
1245 tq->m_cnt[0] = cnt0;
1246 tq->m_cnt[1] = cnt1;
1247 for (i = 0; i<cnt0; i++)
1248 {
1249 assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
1250 tq->m_short_queue[i] -= 32767;
1251 }
1252 for (i = 0; i<cnt1; i++)
1253 {
1254 assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
1255 tq->m_long_queue[i] -= 32767;
1256 }
1257 }
1258
1259 static
1260 void
1261 scan_time_queues_impl(struct thr_data* selfptr, NDB_TICKS now)
1262 {
1263 struct thr_tq * tq = &selfptr->m_tq;
1264 NDB_TICKS last = selfptr->m_time;
1265
1266 Uint32 curr = tq->m_current_time;
1267 Uint32 cnt0 = tq->m_cnt[0];
1268 Uint32 cnt1 = tq->m_cnt[1];
1269
1270 assert(now > last);
1271 Uint64 diff = now - last;
1272 Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
1273 Uint32 end = (curr + step);
1274 if (end >= 32767)
1275 {
1276 handle_time_wrap(selfptr);
1277 cnt0 = tq->m_cnt[0];
1278 cnt1 = tq->m_cnt[1];
1279 end -= 32767;
1280 }
1281
1282 Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
1283 Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
1284
1285 tq->m_current_time = end;
1286 tq->m_cnt[0] = cnt0 - tmp0;
1287 tq->m_cnt[1] = cnt1 - tmp1;
1288 selfptr->m_time = last + step;
1289 }
1290
1291 static inline
1292 void
1293 scan_time_queues(struct thr_data* selfptr, NDB_TICKS now)
1294 {
1295 if (selfptr->m_time != now)
1296 scan_time_queues_impl(selfptr, now);
1297 }
1298
1299 static
1300 inline
1301 Uint32*
1302 get_free_slot(struct thr_repository* rep,
1303 struct thr_data* selfptr,
1304 Uint32* idxptr)
1305 {
1306 struct thr_tq * tq = &selfptr->m_tq;
1307 Uint32 idx = tq->m_next_free;
1308 retry:
1309 Uint32 buf = idx >> 8;
1310 Uint32 pos = idx & 0xFF;
1311
1312 if (idx != RNIL)
1313 {
1314 Uint32* page = * (tq->m_delayed_signals + buf);
1315 Uint32* ptr = page + (32 * pos);
1316 tq->m_next_free = * ptr;
1317 * idxptr = idx;
1318 return ptr;
1319 }
1320
1321 Uint32 thr_no = selfptr->m_thr_no;
1322 for (Uint32 i = 0; i<thr_tq::PAGES; i++)
1323 {
1324 if (tq->m_delayed_signals[i] == 0)
1325 {
1326 struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
1327 Uint32 * page = reinterpret_cast<Uint32*>(jb);
1328 tq->m_delayed_signals[i] = page;
1329
1330 ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
1331
1332 /**
1333 * Init page
1334 */
1335 for (Uint32 j = 0; j<255; j ++)
1336 {
1337 page[j * 32] = (i << 8) + (j + 1);
1338 }
1339 page[255*32] = RNIL;
1340 idx = (i << 8);
1341 goto retry;
1342 }
1343 }
1344 abort();
1345 return NULL;
1346 }
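/*
 * Note: free time-queue slots are encoded as idx = (page << 8) | pos, where
 * each page holds 256 slots of 32 words and the first word of a free slot
 * stores the index of the next free slot (RNIL terminates the list).
 */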
1347
1348 void
1349 senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
1350 {
1351 struct thr_repository* rep = &g_thr_repository;
1352 struct thr_data * selfptr = rep->m_thread + thr_no;
1353 assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
1354 unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
1355
1356 Uint32 max;
1357 Uint32 * cntptr;
1358 Uint32 * queueptr;
1359
1360 Uint32 alarm = selfptr->m_tq.m_current_time + delay;
1361 Uint32 nexttimer = selfptr->m_tq.m_next_timer;
1362 if (delay < 100)
1363 {
1364 cntptr = selfptr->m_tq.m_cnt + 0;
1365 queueptr = selfptr->m_tq.m_short_queue;
1366 max = thr_tq::SQ_SIZE;
1367 }
1368 else
1369 {
1370 cntptr = selfptr->m_tq.m_cnt + 1;
1371 queueptr = selfptr->m_tq.m_long_queue;
1372 max = thr_tq::LQ_SIZE;
1373 }
1374
1375 Uint32 idx;
1376 Uint32* ptr = get_free_slot(rep, selfptr, &idx);
1377 memcpy(ptr, s, 4*siglen);
1378
1379 if (0)
1380 ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
1381 selfptr->m_tq.m_current_time,
1382 alarm,
1383 getSignalName(s->theVerId_signalNumber),
1384 getBlockName(refToBlock(s->theSendersBlockRef)),
1385 getBlockName(s->theReceiversBlockNumber),
1386 delay,
1387 idx, ptr);
1388
1389 Uint32 i;
1390 Uint32 cnt = *cntptr;
1391 Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
1392
1393 * cntptr = cnt + 1;
1394 selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
1395
1396 if (cnt == 0)
1397 {
1398 queueptr[0] = newentry;
1399 return;
1400 }
1401 else if (cnt < max)
1402 {
1403 for (i = 0; i<cnt; i++)
1404 {
1405 Uint32 save = queueptr[i];
1406 if ((save & 0xFFFF) > alarm)
1407 {
1408 memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
1409 queueptr[i] = newentry;
1410 return;
1411 }
1412 }
1413 assert(i == cnt);
1414 queueptr[i] = newentry;
1415 return;
1416 }
1417 else
1418 {
1419 abort();
1420 }
1421 }
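/*
 * Note on senddelay(): entries are kept sorted on wakeup time by a simple
 * insertion into the short (delay < 100) or long queue, and both queues are
 * of fixed size (SQ_SIZE / LQ_SIZE = 512); overflowing either queue is
 * treated as a fatal error (abort()).
 */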
1422
1423 /*
1424 * Flush the write state to the job queue, making any new signals available to
1425 * receiving threads.
1426 *
1427 * Two versions:
1428 * - The general version flush_write_state_other() which may flush to
1429 * any thread, and possibly signal any waiters.
1430 * - The special version flush_write_state_self() which should only be used
1431 * to flush messages to itself.
1432 *
1433 * Calls to these functions are encapsulated through flush_write_state,
1434 * which decides which of these functions to call.
1435 */
1436 static inline
1437 void
1438 flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
1439 {
1440 /*
1441 * Can simplify the flush_write_state when writing to myself:
1442 * Simply update write references w/o mutex, memory barrier or signaling.
1443 */
1444 w->m_write_buffer->m_len = w->m_write_pos;
1445 q_head->m_write_index = w->m_write_index;
1446 w->m_pending_signals_wakeup = 0;
1447 w->m_pending_signals = 0;
1448 }
1449
1450 static inline
1451 void
1452 flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head,
1453 thr_jb_write_state *w)
1454 {
1455 /*
1456 * Two write memory barriers here, as assigning m_len may make signal data
1457 * available to other threads, and assigning m_write_index may make new
1458 * buffers available.
1459 *
1460 * We could optimize this by only doing it as needed, and only doing it
1461 * once before setting all m_len, and once before setting all m_write_index.
1462 *
1463 * But wmb() is a no-op anyway on x86 ...
1464 */
1465 wmb();
1466 w->m_write_buffer->m_len = w->m_write_pos;
1467 wmb();
1468 q_head->m_write_index = w->m_write_index;
1469
1470 w->m_pending_signals_wakeup += w->m_pending_signals;
1471 w->m_pending_signals = 0;
1472
1473 if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
1474 {
1475 w->m_pending_signals_wakeup = 0;
1476 wakeup(&(dstptr->m_waiter));
1477 }
1478 }
1479
1480 static inline
1481 void
1482 flush_write_state(const thr_data *selfptr, thr_data *dstptr,
1483 thr_job_queue_head *q_head, thr_jb_write_state *w)
1484 {
1485 if (dstptr == selfptr)
1486 {
1487 flush_write_state_self(q_head, w);
1488 }
1489 else
1490 {
1491 flush_write_state_other(dstptr, q_head, w);
1492 }
1493 }
1494
1495
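/*
 * Note: flush_jbb_write_state() below sets m_pending_signals_wakeup to
 * MAX_SIGNALS_BEFORE_WAKEUP before flushing, which forces
 * flush_write_state_other() to wake every destination thread that had
 * pending signals (the self case needs no wakeup).
 */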
1496 static
1497 void
1498 flush_jbb_write_state(thr_data *selfptr)
1499 {
1500 Uint32 thr_count = g_thr_repository.m_thread_count;
1501 Uint32 self = selfptr->m_thr_no;
1502
1503 thr_jb_write_state *w = selfptr->m_write_states;
1504 thr_data *thrptr = g_thr_repository.m_thread;
1505 for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
1506 {
1507 if (w->m_pending_signals || w->m_pending_signals_wakeup)
1508 {
1509 w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
1510 thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
1511 flush_write_state(selfptr, thrptr, q_head, w);
1512 }
1513 }
1514 }
1515
1516 /**
1517 * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS)
1518 * before running check_job_buffers
1519 *
1520 * This function returns 0 if there is space to receive this amount of
1521 * signals,
1522 * else 1.
1523 */
1524 static int
1525 check_job_buffers(struct thr_repository* rep)
1526 {
1527 const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
1528 unsigned thr_no = receiver_thread_no;
1529 const thr_data *thrptr = rep->m_thread;
1530 for (unsigned i = 0; i<num_threads; i++, thrptr++)
1531 {
1532 /**
1533 * NOTE: m_read_index is read w/o a lock (and updated by a different thread),
1534 * but since that other thread can only consume
1535 * signals, the value computed here is always
1536 * conservative (i.e. the real state can only be better than
1537 * what we see, if the read-index has moved but we didn't see it yet)
1538 */
1539 const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
1540 unsigned ri = q_head->m_read_index;
1541 unsigned wi = q_head->m_write_index;
1542 unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
1543 if (1 + minfree + busy >= thr_job_queue::SIZE)
1544 {
1545 return 1;
1546 }
1547 }
1548
1549 return 0;
1550 }
1551
1552 /**
1553 * Compute max signals that thr_no can execute w/o risking
1554 * job-buffer-full
1555 *
1556 * see-also update_sched_config
1557 *
1558 *
1559 * 1) compute free-slots in ring-buffer from self to each thread in system
1560 * 2) pick smallest value
1561 * 3) compute how many signals this corresponds to
1562 * 4) compute how many signals self can execute if all were to be to
1563 * the thread with the fullest ring-buffer (i.e the worst case)
1564 *
1565 * Assumption: each signal may send *at most* 4 signals
1566 * - this assumption is made the same in ndbd and ndbmtd and is
1567 * mostly followed by block-code, although not in all places :-(
1568 */
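/**
 * Worked example (assuming MIN_SIGNALS_PER_PAGE evaluates to 255): if the
 * most congested ring-buffer from this thread has minfree = 5 free slots,
 * the formula below gives (3 + (5 - (1 + SAFETY)) * 255) / 4 = 128 signals
 * that may be executed before the limit must be recomputed.
 */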
1569 static
1570 Uint32
1571 compute_max_signals_to_execute(Uint32 thr_no)
1572 {
1573 Uint32 minfree = thr_job_queue::SIZE;
1574 const struct thr_repository* rep = &g_thr_repository;
1575 const thr_data *thrptr = rep->m_thread;
1576
1577 for (unsigned i = 0; i<num_threads; i++, thrptr++)
1578 {
1579 /**
1580 * NOTE: m_read_index is read w/o a lock (and updated by a different thread),
1581 * but since that other thread can only consume
1582 * signals, the value computed here is always
1583 * conservative (i.e. the real state can only be better than
1584 * what we see, if the read-index has moved but we didn't see it yet)
1585 */
1586 const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
1587 unsigned ri = q_head->m_read_index;
1588 unsigned wi = q_head->m_write_index;
1589 unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
1590
1591 assert(free <= thr_job_queue::SIZE);
1592
1593 if (free < minfree)
1594 minfree = free;
1595 }
1596
1597 #define SAFETY 2
1598
1599 if (minfree >= (1 + SAFETY))
1600 {
1601 return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
1602 }
1603 else
1604 {
1605 return 0;
1606 }
1607 }
1608
1609 //#define NDBMT_RAND_YIELD
1610 #ifdef NDBMT_RAND_YIELD
1611 static Uint32 g_rand_yield = 0;
1612 static
1613 void
1614 rand_yield(Uint32 limit, void* ptr0, void * ptr1)
1615 {
1616 return;
1617 UintPtr tmp = UintPtr(ptr0) + UintPtr(ptr1);
1618 Uint8* tmpptr = (Uint8*)&tmp;
1619 Uint32 sum = g_rand_yield;
1620 for (Uint32 i = 0; i<sizeof(tmp); i++)
1621 sum = 33 * sum + tmpptr[i];
1622
1623 if ((sum % 100) < limit)
1624 {
1625 g_rand_yield++;
1626 sched_yield();
1627 }
1628 }
1629 #else
1630 static inline void rand_yield(Uint32 limit, void* ptr0, void * ptr1) {}
1631 #endif
1632
1633
1634
1635 void
1636 trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
1637 {
1638 SignalT<3> signalT;
1639 Signal &signal = * new (&signalT) Signal(0);
1640 memset(&signal.header, 0, sizeof(signal.header));
1641
1642 signal.header.theLength = 3;
1643 signal.header.theSendersSignalId = 0;
1644 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
1645 signal.theData[0] = NDB_LE_SendBytesStatistic;
1646 signal.theData[1] = nodeId;
1647 signal.theData[2] = (Uint32)(bytes/count);
1648 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
1649 signal.header.theReceiversBlockNumber = CMVMI;
1650 sendlocal(g_thr_repository.m_send_buffers[nodeId].m_send_thread,
1651 &signalT.header, signalT.theData, NULL);
1652 }
1653
1654 /**
1655 * To lock during connect/disconnect, we take both the send lock for the node
1656 * (to protect performSend()), and the global receive lock (to protect
1657 * performReceive()). By having two locks, we avoid contention between the
1658 * common send and receive operations.
1659 *
1660 * We can have contention between connect/disconnect of one transporter and
1661 * receive for the others. But the transporter code should try to keep this
1662 * lock only briefly, ie. only to set state to DISCONNECTING / socket fd to
1663 * NDB_INVALID_SOCKET, not for the actual close() syscall.
1664 */
1665 void
1666 trp_callback::lock_transporter(NodeId node)
1667 {
1668 struct thr_repository* rep = &g_thr_repository;
1669 /**
1670 * Note: take the send lock _first_, so that we will not hold the receive
1671 * lock while blocking on the send lock.
1672 *
1673 * The reverse case, blocking send lock for one transporter while waiting
1674 * for receive lock, is not a problem, as the transporter being blocked is
1675 * in any case disconnecting/connecting at this point in time, and sends are
1676 * non-waiting (so we will not block sending on other transporters).
1677 */
1678 lock(&rep->m_send_buffers[node].m_send_lock);
1679 lock(&rep->m_receive_lock);
1680 }
1681
1682 void
1683 trp_callback::unlock_transporter(NodeId node)
1684 {
1685 struct thr_repository* rep = &g_thr_repository;
1686 unlock(&rep->m_receive_lock);
1687 unlock(&rep->m_send_buffers[node].m_send_lock);
1688 }
1689
1690 int
1691 trp_callback::checkJobBuffer()
1692 {
1693 struct thr_repository* rep = &g_thr_repository;
1694 if (unlikely(check_job_buffers(rep)))
1695 {
1696 do
1697 {
1698 /**
1699 * Theoretically (or when we run single-threaded by using ndbmtd with
1700 * everything in the same thread) we should execute signals here... to
1701 * prevent deadlock. But with current ndbmtd only CMVMI runs in
1702 * this thread, and no other thread is waiting for CMVMI,
1703 * except for QMGR open/close connection, but that is not
1704 * (I think) sufficient to create a deadlock.
1705 */
1706
1707 /** FIXME:
1708 * On a CMT chip where #CPU >= #NDB-threads sched_yield() is
1709 * effectively a NOOP as there will normally be an idle CPU available
1710 * to immediately resume thread execution.
1711 * On a Niagara chip this may severely impact performance as the CPUs
1712 * are virtualized by time-multiplexing the physical core.
1713 * The thread should really be 'parked' on
1714 * a condition to free its execution resources.
1715 */
1716 // usleep(a-few-usec); /* A micro-sleep would likely have been better... */
1717 #if defined HAVE_SCHED_YIELD
1718 sched_yield();
1719 #elif defined _WIN32
1720 SwitchToThread();
1721 #else
1722 NdbSleep_MilliSleep(0);
1723 #endif
1724
1725 } while (check_job_buffers(rep));
1726 }
1727
1728 return 0;
1729 }
1730
1731 /**
1732 * Link all send-buffer-pages into *one*
1733 * singly-linked list of buffers.
1734 *
1735 * TODO: This is not completely fair,
1736 * it would be better to get one entry from each thr_send_queue
1737 * per thread instead (until empty)
1738 */
1739 static
1740 Uint32
1741 link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
1742 {
1743 Uint32 ri[MAX_THREADS];
1744 Uint32 wi[MAX_THREADS];
1745 thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
1746 for (unsigned thr = 0; thr < num_threads; thr++)
1747 {
1748 ri[thr] = sb->m_read_index[thr];
1749 wi[thr] = src[thr].m_write_index;
1750 }
1751
1752 Uint64 sentinel[thr_send_page::HEADER_SIZE >> 1];
1753 thr_send_page* sentinel_page = new (&sentinel[0]) thr_send_page;
1754 sentinel_page->m_next = 0;
1755
1756 struct thr_send_buffer tmp;
1757 tmp.m_first_page = sentinel_page;
1758 tmp.m_last_page = sentinel_page;
1759
1760 Uint32 bytes = 0;
1761 for (unsigned thr = 0; thr < num_threads; thr++, src++)
1762 {
1763 Uint32 r = ri[thr];
1764 Uint32 w = wi[thr];
1765 if (r != w)
1766 {
1767 rmb();
1768 while (r != w)
1769 {
1770 thr_send_page * p = src->m_buffers[r];
1771 assert(p->m_start == 0);
1772 bytes += p->m_bytes;
1773 tmp.m_last_page->m_next = p;
1774 while (p->m_next != 0)
1775 {
1776 p = p->m_next;
1777 assert(p->m_start == 0);
1778 bytes += p->m_bytes;
1779 }
1780 tmp.m_last_page = p;
1781 assert(tmp.m_last_page != 0);
1782 r = (r + 1) % thr_send_queue::SIZE;
1783 }
1784 sb->m_read_index[thr] = r;
1785 }
1786 }
1787
1788 if (bytes)
1789 {
1790 if (sb->m_bytes)
1791 {
1792 assert(sb->m_buffer.m_first_page != 0);
1793 assert(sb->m_buffer.m_last_page != 0);
1794 sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
1795 sb->m_buffer.m_last_page = tmp.m_last_page;
1796 }
1797 else
1798 {
1799 assert(sb->m_buffer.m_first_page == 0);
1800 assert(sb->m_buffer.m_last_page == 0);
1801 sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
1802 sb->m_buffer.m_last_page = tmp.m_last_page;
1803 }
1804 sb->m_bytes += bytes;
1805 }
1806
1807 return sb->m_bytes;
1808 }
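/*
 * Note: the on-stack sentinel page lets the loop above append via
 * tmp.m_last_page->m_next unconditionally; only tmp.m_first_page->m_next
 * (the first real page) is then linked into sb->m_buffer.
 */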
1809
1810 Uint32
1811 trp_callback::get_bytes_to_send_iovec(NodeId node,
1812 struct iovec *dst, Uint32 max)
1813 {
1814 thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
1815
1816 Uint32 bytes = link_thread_send_buffers(sb, node);
1817 if (max == 0 || bytes == 0)
1818 return 0;
1819
1820 /**
1821 * Process linked-list and put into iovecs
1822 * TODO: Here we would also pack stuff to get better utilization
1823 */
1824 Uint32 tot = 0;
1825 Uint32 pos = 0;
1826 thr_send_page * p = sb->m_buffer.m_first_page;
1827 do {
1828 dst[pos].iov_len = p->m_bytes;
1829 dst[pos].iov_base = p->m_data + p->m_start;
1830 assert(p->m_start + p->m_bytes <= p->max_bytes());
1831 tot += p->m_bytes;
1832 pos++;
1833 max--;
1834 p = p->m_next;
1835 } while (max && p != 0);
1836
1837 return pos;
1838 }
1839
1840 static
1841 void
1842 release_list(thread_local_pool<thr_send_page>* pool,
1843 thr_send_page* head, thr_send_page * tail)
1844 {
1845 while (head != tail)
1846 {
1847 thr_send_page * tmp = head;
1848 head = head->m_next;
1849 pool->release_local(tmp);
1850 }
1851 pool->release_local(tail);
1852 }
1853
1854
1855 static
1856 Uint32
1857 bytes_sent(thread_local_pool<thr_send_page>* pool,
1858 thr_repository::send_buffer* sb, Uint32 bytes)
1859 {
1860 assert(bytes);
1861
1862 Uint32 remain = bytes;
1863 thr_send_page * prev = 0;
1864 thr_send_page * curr = sb->m_buffer.m_first_page;
1865
1866 assert(sb->m_bytes >= bytes);
1867 while (remain && remain >= curr->m_bytes)
1868 {
1869 remain -= curr->m_bytes;
1870 prev = curr;
1871 curr = curr->m_next;
1872 }
1873
1874 Uint32 total_bytes = sb->m_bytes;
1875 if (total_bytes == bytes)
1876 {
1877 /**
1878 * Everything was released
1879 */
1880 release_list(pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
1881 sb->m_buffer.m_first_page = 0;
1882 sb->m_buffer.m_last_page = 0;
1883 sb->m_bytes = 0;
1884 return 0;
1885 }
1886 else if (remain)
1887 {
1888 /**
1889 * Part of a page was released
1890 */
1891 curr->m_start += remain;
1892 assert(curr->m_bytes > remain);
1893 curr->m_bytes -= remain;
1894 if (prev)
1895 {
1896 release_list(pool, sb->m_buffer.m_first_page, prev);
1897 }
1898 }
1899 else
1900 {
1901 /**
1902 * One or more full pages were released
1903 */
1904 if (prev)
1905 {
1906 release_list(pool, sb->m_buffer.m_first_page, prev);
1907 }
1908 else
1909 {
1910 pool->release_local(sb->m_buffer.m_first_page);
1911 }
1912 }
1913
1914 sb->m_buffer.m_first_page = curr;
1915 assert(sb->m_bytes > bytes);
1916 sb->m_bytes -= bytes;
1917 return sb->m_bytes;
1918 }
1919
1920 Uint32
1921 trp_callback::bytes_sent(NodeId node, Uint32 bytes)
1922 {
1923 thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
1924 Uint32 thr_no = sb->m_send_thread;
1925 assert(thr_no != NO_SEND_THREAD);
1926 return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
1927 sb, bytes);
1928 }
1929
1930 bool
1931 trp_callback::has_data_to_send(NodeId node)
1932 {
1933 return true;
1934
1935 thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
1936 Uint32 thr_no = sb->m_send_thread;
1937 assert(thr_no != NO_SEND_THREAD);
1938 assert((sb->m_bytes > 0) == (sb->m_buffer.m_first_page != 0));
1939 if (sb->m_bytes > 0 || sb->m_force_send)
1940 return true;
1941
1942 thr_send_queue * dst = g_thr_repository.m_thread_send_buffers[node]+thr_no;
1943
1944 return sb->m_read_index[thr_no] != dst->m_write_index;
1945 }
1946
1947 void
1948 trp_callback::reset_send_buffer(NodeId node, bool should_be_empty)
1949 {
1950 struct thr_repository *rep = &g_thr_repository;
1951 thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
1952 struct iovec v[32];
1953
1954 thread_local_pool<thr_send_page> pool(&rep->m_sb_pool, 0);
1955
1956 lock(&sb->m_send_lock);
1957
1958 for (;;)
1959 {
1960 Uint32 count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
1961 if (count == 0)
1962 break;
1963 assert(!should_be_empty); // Got data when it should be empty
1964 int bytes = 0;
1965 for (Uint32 i = 0; i < count; i++)
1966 bytes += v[i].iov_len;
1967
1968 ::bytes_sent(&pool, sb, bytes);
1969 }
1970
1971 unlock(&sb->m_send_lock);
1972
1973 pool.release_all(rep->m_mm, RG_TRANSPORTER_BUFFERS);
1974 }
1975
1976 static inline
1977 void
1978 register_pending_send(thr_data *selfptr, Uint32 nodeId)
1979 {
1980 /* Mark that this node has pending send data. */
1981 if (!selfptr->m_pending_send_mask.get(nodeId))
1982 {
1983 selfptr->m_pending_send_mask.set(nodeId, 1);
1984 Uint32 i = selfptr->m_pending_send_count;
1985 selfptr->m_pending_send_nodes[i] = nodeId;
1986 selfptr->m_pending_send_count = i + 1;
1987 }
1988 }
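/**
 * The pending-send bookkeeping above uses both a bitmask and an array:
 * the mask gives O(1) duplicate detection, while the array preserves
 * insertion order for the iteration in do_flush() / do_send() below.
 */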
1989
1990 /**
1991 * publish thread-locally prepared send-buffer
1992 */
1993 static
1994 void
1995 flush_send_buffer(thr_data* selfptr, Uint32 node)
1996 {
1997 Uint32 thr_no = selfptr->m_thr_no;
1998 thr_send_buffer * src = selfptr->m_send_buffers + node;
1999 thr_repository* rep = &g_thr_repository;
2000
2001 if (src->m_first_page == 0)
2002 {
2003 return;
2004 }
2005 assert(src->m_last_page != 0);
2006
2007 thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
2008 thr_repository::send_buffer* sb = rep->m_send_buffers+node;
2009
2010 Uint32 wi = dst->m_write_index;
2011 Uint32 next = (wi + 1) % thr_send_queue::SIZE;
2012 Uint32 ri = sb->m_read_index[thr_no];
2013
2014 if (unlikely(next == ri))
2015 {
2016 lock(&sb->m_send_lock);
2017 link_thread_send_buffers(sb, node);
2018 unlock(&sb->m_send_lock);
2019 }
2020
2021 dst->m_buffers[wi] = src->m_first_page;
2022 wmb();
2023 dst->m_write_index = next;
2024
2025 src->m_first_page = 0;
2026 src->m_last_page = 0;
2027 }
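/**
 * Publication protocol sketch for the function above: each worker thread owns
 * one thr_send_queue slot per node (indexed by [node][thr_no]), so only this
 * thread ever advances m_write_index, and only after the wmb() has made the
 * page list visible. Consumers holding m_send_lock read up to that index via
 * link_thread_send_buffers(). If the ring would overflow (next == ri), we
 * briefly take the lock ourselves and drain it before publishing.
 */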
2028
2029 /**
2030  * This is used in case the send buffer gets full, to force an emergency send,
2031 * hopefully freeing up some buffer space for the next signal.
2032 */
2033 bool
2034 mt_send_handle::forceSend(NodeId nodeId)
2035 {
2036 struct thr_repository *rep = &g_thr_repository;
2037 struct thr_data *selfptr = m_selfptr;
2038 struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
2039
2040 do
2041 {
2042 sb->m_force_send = 0;
2043 lock(&sb->m_send_lock);
2044 sb->m_send_thread = selfptr->m_thr_no;
2045 globalTransporterRegistry.performSend(nodeId);
2046 sb->m_send_thread = NO_SEND_THREAD;
2047 unlock(&sb->m_send_lock);
2048 } while (sb->m_force_send);
2049
2050 selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
2051
2052 return true;
2053 }
2054
2055 /**
2056 * try sending data
2057 */
2058 static
2059 void
2060 try_send(thr_data * selfptr, Uint32 node)
2061 {
2062 struct thr_repository *rep = &g_thr_repository;
2063 struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
2064
2065 do
2066 {
2067 if (trylock(&sb->m_send_lock) != 0)
2068 {
2069 return;
2070 }
2071
2072 sb->m_force_send = 0;
2073 mb();
2074
2075 sb->m_send_thread = selfptr->m_thr_no;
2076 globalTransporterRegistry.performSend(node);
2077 sb->m_send_thread = NO_SEND_THREAD;
2078 unlock(&sb->m_send_lock);
2079 } while (sb->m_force_send);
2080
2081 selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
2082 }
2083
2084 /**
2085  * Flush send buffers and append them to the destination node's send queue.
2086  *
2087  * Flushed buffer contents are piggybacked when another thread calls
2088  * do_send() to the same destination node. This allows more data to be
2089  * included in each message, reducing the total number of messages handled
2090  * by the OS, which has a significant impact on performance.
2091 */
2092 static
2093 void
2094 do_flush(struct thr_data* selfptr)
2095 {
2096 Uint32 i;
2097 Uint32 count = selfptr->m_pending_send_count;
2098 Uint8 *nodes = selfptr->m_pending_send_nodes;
2099
2100 for (i = 0; i < count; i++)
2101 {
2102 flush_send_buffer(selfptr, nodes[i]);
2103 }
2104 }
2105
2106 /**
2107 * Send any pending data to remote nodes.
2108 *
2109 * If MUST_SEND is false, will only try to lock the send lock, but if it would
2110 * block, that node is skipped, to be tried again next time round.
2111 *
2112 * If MUST_SEND is true, will always take the lock, waiting on it if needed.
2113 *
2114 * The list of pending nodes to send to is thread-local, but the per-node send
2115 * buffer is shared by all threads. Thus we might skip a node for which
2116 * another thread has pending send data, and we might send pending data also
2117 * for another thread without clearing the node from the pending list of that
2118  * other thread (but we will never lose signals due to this).
2119 */
2120 static
2121 Uint32
2122 do_send(struct thr_data* selfptr, bool must_send)
2123 {
2124 Uint32 i;
2125 Uint32 count = selfptr->m_pending_send_count;
2126 Uint8 *nodes = selfptr->m_pending_send_nodes;
2127 struct thr_repository* rep = &g_thr_repository;
2128
2129 if (count == 0)
2130 {
2131 return 0; // send-buffers empty
2132 }
2133
2134 /* Clear the pending list. */
2135 selfptr->m_pending_send_mask.clear();
2136 selfptr->m_pending_send_count = 0;
2137
2138 for (i = 0; i < count; i++)
2139 {
2140 Uint32 node = nodes[i];
2141 selfptr->m_watchdog_counter = 6;
2142
2143 flush_send_buffer(selfptr, node);
2144
2145 thr_repository::send_buffer * sb = rep->m_send_buffers + node;
2146
2147 /**
2148 * If we must send now, set the force_send flag.
2149 *
2150 * This will ensure that if we do not get the send lock, the thread
2151 * holding the lock will try sending again for us when it has released
2152 * the lock.
2153 *
2154 * The lock/unlock pair works as a memory barrier to ensure that the
2155 * flag update is flushed to the other thread.
2156 */
2157 if (must_send)
2158 {
2159 sb->m_force_send = 1;
2160 }
2161
2162 do
2163 {
2164 if (trylock(&sb->m_send_lock) != 0)
2165 {
2166 if (!must_send)
2167 {
2168 /**
2169 * Not doing this node now, re-add to pending list.
2170 *
2171 * As we only add from the start of an empty list, we are safe from
2172 * overwriting the list while we are iterating over it.
2173 */
2174 register_pending_send(selfptr, node);
2175 }
2176 else
2177 {
2178 /* Other thread will send for us as we set m_force_send. */
2179 }
2180 break;
2181 }
2182
2183 /**
2184 * Now clear the flag, and start sending all data available to this node.
2185 *
2186 * Put a memory barrier here, so that if another thread tries to grab
2187 * the send lock but fails due to us holding it here, we either
2188      * 1) Will see m_force_send set to 1 at the end of the loop, or
2189 * 2) We clear here the flag just set by the other thread, but then we
2190 * will (thanks to mb()) be able to see and send all of the data already
2191 * in the first send iteration.
2192 */
2193 sb->m_force_send = 0;
2194 mb();
2195
2196 /**
2197      * Set m_send_thread so that our transporter callback can know which thread
2198 * holds the send lock for this remote node.
2199 */
2200 sb->m_send_thread = selfptr->m_thr_no;
2201 int res = globalTransporterRegistry.performSend(node);
2202 sb->m_send_thread = NO_SEND_THREAD;
2203 unlock(&sb->m_send_lock);
2204 if (res)
2205 {
2206 register_pending_send(selfptr, node);
2207 }
2208 } while (sb->m_force_send);
2209 }
2210
2211 selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
2212
2213 return selfptr->m_pending_send_count;
2214 }
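/**
 * The m_force_send handshake in do_send() closes a race between two senders:
 * either the thread that loses the trylock gets its data sent by the lock
 * holder (which re-checks m_force_send after unlock and loops), or the lock
 * holder clears the flag but then, thanks to the mb(), also sees the loser's
 * freshly flushed data and sends it in the same iteration. Either way no
 * data is left unsent when must_send is set.
 */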
2215
2216 Uint32 *
2217 mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
2218 {
2219 struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
2220 thr_send_page * p = b->m_last_page;
2221 if ((p != 0) && (p->m_bytes + p->m_start + len <= thr_send_page::max_bytes()))
2222 {
2223 return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
2224 }
2225 else if (p != 0)
2226 {
2227     // TODO: maybe don't always flush on page boundary ???
2228 flush_send_buffer(m_selfptr, node);
2229 try_send(m_selfptr, node);
2230 }
2231
2232 if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository.m_mm,
2233 RG_TRANSPORTER_BUFFERS)) != 0)
2234 {
2235 p->m_bytes = 0;
2236 p->m_start = 0;
2237 p->m_next = 0;
2238 b->m_first_page = b->m_last_page = p;
2239 return (Uint32*)p->m_data;
2240 }
2241 return 0;
2242 }
2243
2244 Uint32
2245 mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
2246 {
2247 struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
2248 thr_send_page * p = b->m_last_page;
2249 p->m_bytes += lenBytes;
2250 return p->m_bytes;
2251 }
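/**
 * Hedged usage sketch of the two callbacks above (the transporter layer is
 * the real caller, via prepareSend(); the names below are only illustrative):
 *
 *   Uint32 * ptr = handle->getWritePtr(node, lenBytes, prio, maxBytes);
 *   if (ptr != 0)
 *   {
 *     memcpy(ptr, signal_words, lenBytes);          // fill the send page
 *     handle->updateWritePtr(node, lenBytes, prio); // commit the bytes
 *   }
 *
 * getWritePtr() returns space in the current last page when the data fits;
 * otherwise it flushes the full page to the shared queue, attempts a send,
 * and seizes a fresh page (or simply seizes one if no page exists yet).
 */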
2252
2253 /*
2254 * Insert a signal in a job queue.
2255 *
2256 * The signal is not visible to consumers yet after return from this function,
2257 * only recorded in the thr_jb_write_state. It is necessary to first call
2258 * flush_write_state() for this.
2259 *
2260 * The new_buffer is a job buffer to use if the current one gets full. If used,
2261 * we return true, indicating that the caller should allocate a new one for
2262 * the next call. (This is done to allow to insert under lock, but do the
2263 * allocation outside the lock).
2264 */
2265 static inline
2266 bool
2267 insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
2268 const SignalHeader* sh, const Uint32 *data,
2269 const Uint32 secPtr[3], thr_job_buffer *new_buffer)
2270 {
2271 Uint32 write_pos = w->m_write_pos;
2272 Uint32 datalen = sh->theLength;
2273 assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
2274 memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
2275 write_pos += (sizeof(*sh) >> 2);
2276 memcpy(w->m_write_buffer->m_data + write_pos, data, 4*datalen);
2277 write_pos += datalen;
2278 const Uint32 *p= secPtr;
2279 for (Uint32 i = 0; i < sh->m_noOfSections; i++)
2280 w->m_write_buffer->m_data[write_pos++] = *p++;
2281 w->m_pending_signals++;
2282
2283 #if SIZEOF_CHARP == 8
2284 /* Align to 8-byte boundary, to ensure aligned copies. */
2285 write_pos= (write_pos+1) & ~((Uint32)1);
2286 #endif
2287
2288 /*
2289 * We make sure that there is always room for at least one signal in the
2290 * current buffer in the queue, so one insert is always possible without
2291 * adding a new buffer.
2292 */
2293 if (likely(write_pos + 32 <= thr_job_buffer::SIZE))
2294 {
2295 w->m_write_pos = write_pos;
2296 return false;
2297 }
2298 else
2299 {
2300 /*
2301 * Need a write memory barrier here, as this might make signal data visible
2302 * to other threads.
2303 *
2304 * ToDo: We actually only need the wmb() here if we already make this
2305 * buffer visible to the other thread. So we might optimize it a bit. But
2306 * wmb() is a no-op on x86 anyway...
2307 */
2308 wmb();
2309 w->m_write_buffer->m_len = write_pos;
2310 Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
2311
2312 /**
2313 * Full job buffer is fatal.
2314 *
2315 * ToDo: should we wait for it to become non-full? There is no guarantee
2316 * that this will actually happen...
2317 *
2318 * Or alternatively, ndbrequire() ?
2319 */
2320 if (unlikely(write_index == q->m_head->m_read_index))
2321 {
2322 job_buffer_full(0);
2323 }
2324 new_buffer->m_len = 0;
2325 new_buffer->m_prioa = prioa;
2326 q->m_buffers[write_index] = new_buffer;
2327 w->m_write_index = write_index;
2328 w->m_write_pos = 0;
2329 w->m_write_buffer = new_buffer;
2330 return true; // Buffer new_buffer used
2331 }
2332
2333 return false; // Buffer new_buffer not used
2334 }
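/**
 * Job buffer layout of one inserted signal, in 32-bit words:
 *
 *   [ SignalHeader, sizeof(SignalHeader)/4 words ]
 *   [ theLength data words                       ]
 *   [ m_noOfSections section i-values            ]
 *   [ padding to an even word count on 64-bit    ]
 *
 * The check write_pos + 32 <= thr_job_buffer::SIZE uses 32 words (the
 * maximum signal size) so that the "need a new buffer" decision can safely
 * be made after the copy rather than before it.
 */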
2335
2336 static
2337 void
2338 read_jbb_state(thr_data *selfptr, Uint32 count)
2339 {
2340
2341 thr_jb_read_state *r = selfptr->m_read_states;
2342 const thr_job_queue *q = selfptr->m_in_queue;
2343 for (Uint32 i = 0; i < count; i++,r++,q++)
2344 {
2345 Uint32 read_index = r->m_read_index;
2346
2347 /**
2348 * Optimization: Only reload when possibly empty.
2349 * Avoid cache reload of shared thr_job_queue_head
2350 */
2351 if (r->m_write_index == read_index)
2352 {
2353 r->m_write_index = q->m_head->m_write_index;
2354 read_barrier_depends();
2355 r->m_read_end = q->m_buffers[read_index]->m_len;
2356 }
2357 }
2358 }
2359
2360 static
2361 bool
2362 read_jba_state(thr_data *selfptr)
2363 {
2364 thr_jb_read_state *r = &(selfptr->m_jba_read_state);
2365 r->m_write_index = selfptr->m_jba_head.m_write_index;
2366 read_barrier_depends();
2367 r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
2368 return r->is_empty();
2369 }
2370
2371 /* Check all job queues, return true only if all are empty. */
2372 static bool
2373 check_queues_empty(thr_data *selfptr)
2374 {
2375 Uint32 thr_count = g_thr_repository.m_thread_count;
2376 bool empty = read_jba_state(selfptr);
2377 if (!empty)
2378 return false;
2379
2380 read_jbb_state(selfptr, thr_count);
2381 const thr_jb_read_state *r = selfptr->m_read_states;
2382 for (Uint32 i = 0; i < thr_count; i++,r++)
2383 {
2384 if (!r->is_empty())
2385 return false;
2386 }
2387 return true;
2388 }
2389
2390 /*
2391 * Execute at most MAX_SIGNALS signals from one job queue, updating local read
2392 * state as appropriate.
2393 *
2394 * Returns number of signals actually executed.
2395 */
2396 static
2397 Uint32
2398 execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
2399 Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter)
2400 {
2401 Uint32 num_signals;
2402 Uint32 read_index = r->m_read_index;
2403 Uint32 write_index = r->m_write_index;
2404 Uint32 read_pos = r->m_read_pos;
2405 Uint32 read_end = r->m_read_end;
2406 Uint32 *watchDogCounter = &selfptr->m_watchdog_counter;
2407
2408 if (read_index == write_index && read_pos >= read_end)
2409 return 0; // empty read_state
2410
2411 thr_job_buffer *read_buffer = r->m_read_buffer;
2412
2413 for (num_signals = 0; num_signals < max_signals; num_signals++)
2414 {
2415 while (read_pos >= read_end)
2416 {
2417 if (read_index == write_index)
2418 {
2419 /* No more available now. */
2420 return num_signals;
2421 }
2422 else
2423 {
2424 /* Move to next buffer. */
2425 read_index = (read_index + 1) % thr_job_queue::SIZE;
2426 release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
2427 read_buffer = q->m_buffers[read_index];
2428 read_pos = 0;
2429 read_end = read_buffer->m_len;
2430 /* Update thread-local read state. */
2431 r->m_read_index = q->m_head->m_read_index = read_index;
2432 r->m_read_buffer = read_buffer;
2433 r->m_read_pos = read_pos;
2434 r->m_read_end = read_end;
2435 }
2436 }
2437
2438 /*
2439    * These prefetches were found using OProfile to reduce cache misses.
2440 * (Though on Intel Core 2, they do not give much speedup, as apparently
2441 * the hardware prefetcher is already doing a fairly good job).
2442 */
2443 NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 16);
2444 NDB_PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
2445
2446 /* Now execute the signal. */
2447 SignalHeader* s =
2448 reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
2449 Uint32 seccnt = s->m_noOfSections;
2450 Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
2451 if(siglen>16)
2452 {
2453 NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
2454 }
2455 Uint32 bno = blockToMain(s->theReceiversBlockNumber);
2456 Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
2457 SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
2458 assert(block != 0);
2459
2460 Uint32 gsn = s->theVerId_signalNumber;
2461 *watchDogCounter = 1;
2462 /* Must update original buffer so signal dump will see it. */
2463 s->theSignalId = (*signalIdCounter)++;
2464 memcpy(&sig->header, s, 4*siglen);
2465 sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
2466 sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
2467 sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];
2468
2469 read_pos += siglen + seccnt;
2470 #if SIZEOF_CHARP == 8
2471 /* Handle 8-byte alignment. */
2472 read_pos = (read_pos + 1) & ~((Uint32)1);
2473 #endif
2474
2475 /* Update just before execute so signal dump can know how far we are. */
2476 r->m_read_pos = read_pos;
2477
2478 #ifdef VM_TRACE
2479 if (globalData.testOn)
2480 { //wl4391_todo segments
2481 SegmentedSectionPtr ptr[3];
2482 ptr[0].i = sig->m_sectionPtrI[0];
2483 ptr[1].i = sig->m_sectionPtrI[1];
2484 ptr[2].i = sig->m_sectionPtrI[2];
2485 ::getSections(seccnt, ptr);
2486 globalSignalLoggers.executeSignal(*s,
2487 0,
2488 &sig->theData[0],
2489 globalData.ownId,
2490 ptr, seccnt);
2491 }
2492 #endif
2493
2494 block->executeFunction(gsn, sig);
2495 }
2496
2497 return num_signals;
2498 }
2499
2500 static
2501 Uint32
2502 run_job_buffers(thr_data *selfptr, Signal *sig, Uint32 *signalIdCounter)
2503 {
2504 Uint32 thr_count = g_thr_repository.m_thread_count;
2505 Uint32 signal_count = 0;
2506 Uint32 perjb = selfptr->m_max_signals_per_jb;
2507
2508 read_jbb_state(selfptr, thr_count);
2509 /*
2510 * A load memory barrier to ensure that we see any prio A signal sent later
2511 * than loaded prio B signals.
2512 */
2513 rmb();
2514
2515 thr_job_queue *queue = selfptr->m_in_queue;
2516 thr_jb_read_state *read_state = selfptr->m_read_states;
2517 for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
2518 send_thr_no++,queue++,read_state++)
2519 {
2520 /* Read the prio A state often, to avoid starvation of prio A. */
2521 bool jba_empty = read_jba_state(selfptr);
2522 if (!jba_empty)
2523 {
2524 static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
2525 signal_count += execute_signals(selfptr, &(selfptr->m_jba),
2526 &(selfptr->m_jba_read_state), sig,
2527 max_prioA, signalIdCounter);
2528 }
2529
2530 /* Now execute prio B signals from one thread. */
2531 signal_count += execute_signals(selfptr, queue, read_state,
2532 sig, perjb, signalIdCounter);
2533 }
2534
2535 return signal_count;
2536 }
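/**
 * Note on run_job_buffers(): the prio A queue is re-checked once per
 * producer thread (not just once per call) and executed with an effectively
 * unbounded budget, so prio A signals cannot be starved by long streams of
 * prio B signals arriving from many threads.
 */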
2537
2538 struct thr_map_entry {
2539 enum { NULL_THR_NO = 0xFF };
2540 Uint8 thr_no;
2541   thr_map_entry() : thr_no(NULL_THR_NO) {}
2542 };
2543
2544 static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
2545
2546 static inline Uint32
2547 block2ThreadId(Uint32 block, Uint32 instance)
2548 {
2549 assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
2550 Uint32 index = block - MIN_BLOCK_NO;
2551 assert(instance < MAX_BLOCK_INSTANCES);
2552 const thr_map_entry& entry = thr_map[index][instance];
2553 assert(entry.thr_no < num_threads);
2554 return entry.thr_no;
2555 }
2556
2557 void
2558 add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
2559 {
2560 assert(main == blockToMain(main));
2561 Uint32 index = main - MIN_BLOCK_NO;
2562 assert(index < NO_OF_BLOCKS);
2563 assert(instance < MAX_BLOCK_INSTANCES);
2564
2565 SimulatedBlock* b = globalData.getBlock(main, instance);
2566 require(b != 0);
2567
2568 /* Block number including instance. */
2569 Uint32 block = numberToBlock(main, instance);
2570
2571 require(thr_no < num_threads);
2572 struct thr_repository* rep = &g_thr_repository;
2573 thr_data* thr_ptr = rep->m_thread + thr_no;
2574
2575 /* Add to list. */
2576 {
2577 Uint32 i;
2578 for (i = 0; i < thr_ptr->m_instance_count; i++)
2579 require(thr_ptr->m_instance_list[i] != block);
2580 }
2581 require(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
2582 thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;
2583
2584 SimulatedBlock::ThreadContext ctx;
2585 ctx.threadId = thr_no;
2586 ctx.jamBuffer = &thr_ptr->m_jam;
2587 ctx.watchDogCounter = &thr_ptr->m_watchdog_counter;
2588 ctx.sectionPoolCache = &thr_ptr->m_sectionPoolCache;
2589 b->assignToThread(ctx);
2590
2591 /* Create entry mapping block to thread. */
2592 thr_map_entry& entry = thr_map[index][instance];
2593 require(entry.thr_no == thr_map_entry::NULL_THR_NO);
2594 entry.thr_no = thr_no;
2595 }
2596
2597 /* Static assignment of main instances (before first signal). */
2598 void
2599 add_main_thr_map()
2600 {
2601 /* Keep mt-classic assignments in MT LQH. */
2602 const Uint32 thr_GLOBAL = 0;
2603 const Uint32 thr_LOCAL = 1;
2604 const Uint32 thr_RECEIVER = receiver_thread_no;
2605
2606 add_thr_map(BACKUP, 0, thr_LOCAL);
2607 add_thr_map(DBTC, 0, thr_GLOBAL);
2608 add_thr_map(DBDIH, 0, thr_GLOBAL);
2609 add_thr_map(DBLQH, 0, thr_LOCAL);
2610 add_thr_map(DBACC, 0, thr_LOCAL);
2611 add_thr_map(DBTUP, 0, thr_LOCAL);
2612 add_thr_map(DBDICT, 0, thr_GLOBAL);
2613 add_thr_map(NDBCNTR, 0, thr_GLOBAL);
2614 add_thr_map(QMGR, 0, thr_GLOBAL);
2615 add_thr_map(NDBFS, 0, thr_GLOBAL);
2616 add_thr_map(CMVMI, 0, thr_RECEIVER);
2617 add_thr_map(TRIX, 0, thr_GLOBAL);
2618 add_thr_map(DBUTIL, 0, thr_GLOBAL);
2619 add_thr_map(SUMA, 0, thr_LOCAL);
2620 add_thr_map(DBTUX, 0, thr_LOCAL);
2621 add_thr_map(TSMAN, 0, thr_LOCAL);
2622 add_thr_map(LGMAN, 0, thr_LOCAL);
2623 add_thr_map(PGMAN, 0, thr_LOCAL);
2624 add_thr_map(RESTORE, 0, thr_LOCAL);
2625 add_thr_map(DBINFO, 0, thr_LOCAL);
2626 add_thr_map(DBSPJ, 0, thr_GLOBAL);
2627 }
2628
2629 /* Workers added by LocalProxy (before first signal). */
2630 void
2631 add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
2632 {
2633 require(instance != 0);
2634 Uint32 i = instance - 1;
2635 Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
2636 add_thr_map(block, instance, thr_no);
2637 }
2638
2639 /* Extra workers run in the proxy thread. */
2640 void
2641 add_extra_worker_thr_map(Uint32 block, Uint32 instance)
2642 {
2643 require(instance != 0);
2644 Uint32 thr_no = block2ThreadId(block, 0);
2645 add_thr_map(block, instance, thr_no);
2646 }
2647
2648 /**
2649  * Create the duplicate entries needed so that a sender does not need to
2650  * know how many instances actually exist in this node.
2651  *
2652  * If there is only 1 instance, duplicate it for all slots;
2653  * otherwise assume instance 0 is the proxy and duplicate the workers
2654  * (modulo the worker count).
2655 *
2656 * NOTE: extra pgman worker is instance 5
2657 */
2658 void
2659 finalize_thr_map()
2660 {
2661 for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
2662 {
2663 Uint32 bno = b + MIN_BLOCK_NO;
2664 Uint32 cnt = 0;
2665 while (cnt < MAX_BLOCK_INSTANCES &&
2666 thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
2667 cnt++;
2668
2669 if (cnt != MAX_BLOCK_INSTANCES)
2670 {
2671 SimulatedBlock * main = globalData.getBlock(bno, 0);
2672 for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++)
2673 {
2674 Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1));
2675 if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
2676 {
2677 thr_map[b][i] = thr_map[b][dup];
2678 main->addInstance(globalData.getBlock(bno, dup), i);
2679 }
2680 else
2681 {
2682 /**
2683 * extra pgman instance
2684 */
2685 require(bno == PGMAN);
2686 }
2687 }
2688 }
2689 }
2690 }
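/**
 * Worked example of the duplication above (numbers purely illustrative):
 * assume a block with cnt = 5 populated entries, instance 0 being the proxy
 * and 1..4 the workers. For a remaining slot i, dup = 1 + ((i - 1) % (cnt - 1))
 * maps i = 5, 6, 7, 8, 9, ... onto workers 1, 2, 3, 4, 1, ... so a sender can
 * address any instance number without knowing how many workers this node
 * actually runs.
 */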
2691
2692 static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
2693 Uint32 b_count, Uint32 b_size)
2694 {
2695 SignalT<6> sT;
2696 Signal *s= new (&sT) Signal(0);
2697
2698 memset(&s->header, 0, sizeof(s->header));
2699 s->header.theLength = 6;
2700 s->header.theSendersSignalId = 0;
2701 s->header.theSendersBlockRef = numberToRef(0, 0);
2702 s->header.theVerId_signalNumber = GSN_EVENT_REP;
2703 s->header.theReceiversBlockNumber = CMVMI;
2704 s->theData[0] = NDB_LE_MTSignalStatistics;
2705 s->theData[1] = self;
2706 s->theData[2] = a_count;
2707 s->theData[3] = a_size;
2708 s->theData[4] = b_count;
2709 s->theData[5] = b_size;
2710   /* ToDo: does this really need to be prio A like in the old code? */
2711 sendlocal(self, &s->header, s->theData,
2712 NULL);
2713 }
2714
2715 static inline void
2716 update_sched_stats(thr_data *selfptr)
2717 {
2718 if(selfptr->m_prioa_count + selfptr->m_priob_count >= 2000000)
2719 {
2720 reportSignalStats(selfptr->m_thr_no,
2721 selfptr->m_prioa_count,
2722 selfptr->m_prioa_size,
2723 selfptr->m_priob_count,
2724 selfptr->m_priob_size);
2725 selfptr->m_prioa_count = 0;
2726 selfptr->m_prioa_size = 0;
2727 selfptr->m_priob_count = 0;
2728 selfptr->m_priob_size = 0;
2729
2730 #if 0
2731 Uint32 thr_no = selfptr->m_thr_no;
2732 ndbout_c("--- %u fifo: %u jba: %u global: %u",
2733 thr_no,
2734 fifo_used_pages(selfptr),
2735 selfptr->m_jba_head.used(),
2736 g_thr_repository.m_free_list.m_cnt);
2737 for (Uint32 i = 0; i<num_threads; i++)
2738 {
2739 ndbout_c(" %u-%u : %u",
2740 thr_no, i, selfptr->m_in_queue_head[i].used());
2741 }
2742 #endif
2743 }
2744 }
2745
2746 static void
2747 init_thread(thr_data *selfptr)
2748 {
2749 selfptr->m_waiter.init();
2750 selfptr->m_jam.theEmulatedJamIndex = 0;
2751 selfptr->m_jam.theEmulatedJamBlockNumber = 0;
2752 bzero(selfptr->m_jam.theEmulatedJam, sizeof(selfptr->m_jam.theEmulatedJam));
2753 NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
2754 NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
2755
2756 unsigned thr_no = selfptr->m_thr_no;
2757 globalEmulatorData.theWatchDog->
2758 registerWatchedThread(&selfptr->m_watchdog_counter, thr_no);
2759 {
2760 while(selfptr->m_thread == 0)
2761 NdbSleep_MilliSleep(30);
2762 }
2763
2764 THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
2765 BaseString tmp;
2766 tmp.appfmt("thr: %u ", thr_no);
2767
2768 int tid = NdbThread_GetTid(selfptr->m_thread);
2769 if (tid != -1)
2770 {
2771 tmp.appfmt("tid: %u ", tid);
2772 }
2773
2774 conf.appendInfo(tmp,
2775 selfptr->m_instance_list, selfptr->m_instance_count);
2776 int res = conf.do_bind(selfptr->m_thread,
2777 selfptr->m_instance_list, selfptr->m_instance_count);
2778 if (res < 0)
2779 {
2780 tmp.appfmt("err: %d ", -res);
2781 }
2782 else if (res > 0)
2783 {
2784 tmp.appfmt("OK ");
2785 }
2786
2787 selfptr->m_thr_id = pthread_self();
2788
2789 for (Uint32 i = 0; i < selfptr->m_instance_count; i++)
2790 {
2791 BlockReference block = selfptr->m_instance_list[i];
2792 Uint32 main = blockToMain(block);
2793 Uint32 instance = blockToInstance(block);
2794 tmp.appfmt("%s(%u) ", getBlockName(main), instance);
2795 }
2796 printf("%s\n", tmp.c_str());
2797 fflush(stdout);
2798 }
2799
2800 /**
2801 * Align signal buffer for better cache performance.
2802  * Also skew it a little for each thread to avoid cache pollution.
2803 */
2804 #define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_THREADS)
2805 static Signal *
2806 aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
2807 {
2808 UintPtr sigtmp= (UintPtr)signal_buf;
2809 sigtmp= (sigtmp+63) & (~(UintPtr)63);
2810 sigtmp+= thr_no*256;
2811 return (Signal *)sigtmp;
2812 }
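/**
 * Example of the arithmetic above: (addr + 63) & ~63 rounds the buffer start
 * up to the next 64-byte cache-line boundary, and the extra thr_no * 256
 * offset then places each thread's Signal four cache lines away from its
 * neighbour's, which is exactly the slack that SIGBUF_SIZE reserves.
 */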
2813
2814 Uint32 receiverThreadId;
2815
2816 /*
2817  * We only do receive in the receiver thread; no other threads do receive.
2818 *
2819 * As part of the receive loop, we also periodically call update_connections()
2820 * (this way we are similar to single-threaded ndbd).
2821 *
2822 * The CMVMI block (and no other blocks) run in the same thread as this
2823 * receive loop; this way we avoid races between update_connections() and
2824 * CMVMI calls into the transporters.
2825 *
2826 * Note that with this setup, local signals to CMVMI cannot wake up the thread
2827 * if it is sleeping on the receive sockets. Thus CMVMI local signal processing
2828 * can be (slightly) delayed, however CMVMI is not really performance critical
2829 * (hopefully).
2830 */
2831 extern "C"
2832 void *
2833 mt_receiver_thread_main(void *thr_arg)
2834 {
2835 unsigned char signal_buf[SIGBUF_SIZE];
2836 Signal *signal;
2837 struct thr_repository* rep = &g_thr_repository;
2838 struct thr_data* selfptr = (struct thr_data *)thr_arg;
2839 unsigned thr_no = selfptr->m_thr_no;
2840 Uint32& watchDogCounter = selfptr->m_watchdog_counter;
2841 Uint32 thrSignalId = 0;
2842 bool has_received = false;
2843
2844 init_thread(selfptr);
2845 receiverThreadId = thr_no;
2846 signal = aligned_signal(signal_buf, thr_no);
2847
2848 while (globalData.theRestartFlag != perform_stop)
2849 {
2850 static int cnt = 0;
2851
2852 update_sched_stats(selfptr);
2853
2854 if (cnt == 0)
2855 {
2856 watchDogCounter = 5;
2857 globalTransporterRegistry.update_connections();
2858 }
2859 cnt = (cnt + 1) & 15;
2860
2861 watchDogCounter = 2;
2862
2863 NDB_TICKS now = NdbTick_CurrentMillisecond();
2864 scan_time_queues(selfptr, now);
2865
2866 Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
2867
2868 if (sum || has_received)
2869 {
2870 watchDogCounter = 6;
2871 flush_jbb_write_state(selfptr);
2872 }
2873
2874 do_send(selfptr, TRUE);
2875
2876 watchDogCounter = 7;
2877
2878 has_received = false;
2879 if (globalTransporterRegistry.pollReceive(1))
2880 {
2881 if (check_job_buffers(rep) == 0)
2882 {
2883 watchDogCounter = 8;
2884 lock(&rep->m_receive_lock);
2885 globalTransporterRegistry.performReceive();
2886 unlock(&rep->m_receive_lock);
2887 has_received = true;
2888 }
2889 }
2890 }
2891
2892 globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
2893 return NULL; // Return value not currently used
2894 }
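/**
 * Note on the receive loop above: performReceive() is only entered when
 * check_job_buffers() returns 0 (presumably meaning no in-queue is close to
 * full), so a slow execution thread throttles the receiver instead of
 * overflowing a job buffer. update_connections() is amortized to every 16th
 * loop iteration via the small modulo counter.
 */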
2895
2896 static
2897 inline
2898 void
2899 sendpacked(struct thr_data* thr_ptr, Signal* signal)
2900 {
2901 Uint32 i;
2902 for (i = 0; i < thr_ptr->m_instance_count; i++)
2903 {
2904 BlockReference block = thr_ptr->m_instance_list[i];
2905 Uint32 main = blockToMain(block);
2906 Uint32 instance = blockToInstance(block);
2907 SimulatedBlock* b = globalData.getBlock(main, instance);
2908 // wl4391_todo remove useless assert
2909 assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
2910 /* b->send_at_job_buffer_end(); */
2911 b->executeFunction(GSN_SEND_PACKED, signal);
2912 }
2913 }
2914
2915 /**
2916  * Check if the out-queues of selfptr are full.
2917  * Return true if so.
2918 */
2919 static bool
2920 check_job_buffer_full(thr_data *selfptr)
2921 {
2922 Uint32 thr_no = selfptr->m_thr_no;
2923 Uint32 tmp = compute_max_signals_to_execute(thr_no);
2924 #if 0
2925 Uint32 perjb = tmp / g_thr_repository.m_thread_count;
2926
2927 if (perjb == 0)
2928 {
2929 return true;
2930 }
2931
2932 return false;
2933 #else
2934 if (tmp < g_thr_repository.m_thread_count)
2935 return true;
2936 return false;
2937 #endif
2938 }
2939
2940 /**
2941 * update_sched_config
2942 *
2943  * To prevent "job-buffer-full", i.e. the situation where one thread (T1)
2944  * produces so many signals to another thread (T2) that the ring-buffer
2945  * from T1 to T2 gets full, the main loop has 2 "config" variables:
2946  *
2947  * - m_max_exec_signals
2948  *   This is the *total* number of signals T1 can execute before calling
2949  *   this method again.
2950  * - m_max_signals_per_jb
2951  *   This is the max number of signals T1 can execute from each other
2952  *   thread in the system.
2953  *
2954  * Assumption: each signal may send *at most* 4 signals
2955  * - this assumption is the same in ndbd and ndbmtd and is mostly
2956  *   followed by block-code, although not in all places :-(
2957  *
2958  * This function returns true if it slept, i.e. if it concluded that
2959  * it could not execute *any* signals without risking
2960  * job-buffer-full.
2961 */
2962 static
2963 bool
2964 update_sched_config(struct thr_data* selfptr, Uint32 pending_send)
2965 {
2966 Uint32 sleeploop = 0;
2967 Uint32 thr_no = selfptr->m_thr_no;
2968 loop:
2969 Uint32 tmp = compute_max_signals_to_execute(thr_no);
2970 Uint32 perjb = tmp / g_thr_repository.m_thread_count;
2971
2972 if (perjb > MAX_SIGNALS_PER_JB)
2973 perjb = MAX_SIGNALS_PER_JB;
2974
2975 selfptr->m_max_exec_signals = tmp;
2976 selfptr->m_max_signals_per_jb = perjb;
2977
2978 if (unlikely(perjb == 0))
2979 {
2980 sleeploop++;
2981 if (sleeploop == 10)
2982 {
2983 /**
2984 * we've slept for 10ms...try running anyway
2985 */
2986 selfptr->m_max_signals_per_jb = 1;
2987 ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
2988 return true;
2989 }
2990
2991 if (pending_send)
2992 {
2993 /* About to sleep, _must_ send now. */
2994 pending_send = do_send(selfptr, TRUE);
2995 }
2996
2997 const Uint32 wait = 1000000; /* 1 ms */
2998 yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
2999 goto loop;
3000 }
3001
3002 return sleeploop > 0;
3003 }
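/**
 * Hedged numeric illustration of the quota computation above: if
 * compute_max_signals_to_execute() returned tmp = 800 with 4 threads, then
 * perjb = 800 / 4 = 200, which is clamped to MAX_SIGNALS_PER_JB (100). If
 * tmp is so small that perjb becomes 0, the thread instead sleeps in ~1 ms
 * steps, and after 10 such sleeps it forces m_max_signals_per_jb = 1 so
 * that it still makes some progress.
 */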
3004
3005 extern "C"
3006 void *
3007 mt_job_thread_main(void *thr_arg)
3008 {
3009 unsigned char signal_buf[SIGBUF_SIZE];
3010 Signal *signal;
3011 const Uint32 nowait = 10 * 1000000; /* 10 ms */
3012 Uint32 thrSignalId = 0;
3013
3014 struct thr_data* selfptr = (struct thr_data *)thr_arg;
3015 init_thread(selfptr);
3016 Uint32& watchDogCounter = selfptr->m_watchdog_counter;
3017
3018 unsigned thr_no = selfptr->m_thr_no;
3019 signal = aligned_signal(signal_buf, thr_no);
3020
3021 /* Avoid false watchdog alarms caused by race condition. */
3022 watchDogCounter = 1;
3023
3024 Uint32 pending_send = 0;
3025 Uint32 send_sum = 0;
3026 int loops = 0;
3027 int maxloops = 10;/* Loops before reading clock, fuzzy adapted to 1ms freq. */
3028 NDB_TICKS now = selfptr->m_time;
3029
3030 while (globalData.theRestartFlag != perform_stop)
3031 {
3032 loops++;
3033 update_sched_stats(selfptr);
3034
3035 watchDogCounter = 2;
3036 scan_time_queues(selfptr, now);
3037
3038 Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
3039
3040 watchDogCounter = 1;
3041 signal->header.m_noOfSections = 0; /* valgrind */
3042 sendpacked(selfptr, signal);
3043
3044 if (sum)
3045 {
3046 watchDogCounter = 6;
3047 flush_jbb_write_state(selfptr);
3048 send_sum += sum;
3049
3050 if (send_sum > MAX_SIGNALS_BEFORE_SEND)
3051 {
3052 /* Try to send, but skip for now in case of lock contention. */
3053 pending_send = do_send(selfptr, FALSE);
3054 send_sum = 0;
3055 }
3056 else
3057 {
3058         /* Append send buffers to the destination nodes' send queues. */
3059 do_flush(selfptr);
3060 }
3061 }
3062 else
3063 {
3064 /* No signals processed, prepare to sleep to wait for more */
3065 if (pending_send || send_sum > 0)
3066 {
3067 /* About to sleep, _must_ send now. */
3068 pending_send = do_send(selfptr, TRUE);
3069 send_sum = 0;
3070 }
3071
3072 if (pending_send == 0)
3073 {
3074 bool waited = yield(&selfptr->m_waiter, nowait, check_queues_empty,
3075 selfptr);
3076 if (waited)
3077 {
3078 /* Update current time after sleeping */
3079 now = NdbTick_CurrentMillisecond();
3080 loops = 0;
3081 }
3082 }
3083 }
3084
3085 /**
3086 * Check if we executed enough signals,
3087 * and if so recompute how many signals to execute
3088 */
3089 if (sum >= selfptr->m_max_exec_signals)
3090 {
3091 if (update_sched_config(selfptr, pending_send))
3092 {
3093 /* Update current time after sleeping */
3094 now = NdbTick_CurrentMillisecond();
3095 loops = 0;
3096 }
3097 }
3098 else
3099 {
3100 selfptr->m_max_exec_signals -= sum;
3101 }
3102
3103 /**
3104      * Adaptively read the system time whenever 1ms is likely
3105      * to have passed.
3106 */
3107 if (loops > maxloops)
3108 {
3109 now = NdbTick_CurrentMillisecond();
3110 Uint64 diff = now - selfptr->m_time;
3111
3112 /* Adjust 'maxloop' to achieve clock reading frequency of 1ms */
3113 if (diff < 1)
3114 maxloops += ((maxloops/10) + 1); /* No change: less frequent reading */
3115 else if (diff > 1 && maxloops > 1)
3116 maxloops -= ((maxloops/10) + 1); /* Overslept: Need more frequent read*/
3117
3118 loops = 0;
3119 }
3120 }
3121
3122 globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
3123 return NULL; // Return value not currently used
3124 }
3125
3126 void
3127 sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
3128 const Uint32 secPtr[3])
3129 {
3130 Uint32 block = blockToMain(s->theReceiversBlockNumber);
3131 Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
3132
3133 /*
3134 * Max number of signals to put into job buffer before flushing the buffer
3135 * to the other thread.
3136 * This parameter found to be reasonable by benchmarking.
3137 */
3138 Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no) ?
3139 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER :
3140 MAX_SIGNALS_BEFORE_FLUSH_OTHER;
3141
3142 Uint32 dst = block2ThreadId(block, instance);
3143 struct thr_repository* rep = &g_thr_repository;
3144 struct thr_data * selfptr = rep->m_thread + self;
3145 assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
3146 struct thr_data * dstptr = rep->m_thread + dst;
3147
3148 selfptr->m_priob_count++;
3149 Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
3150 selfptr->m_priob_size += siglen;
3151
3152 thr_job_queue *q = dstptr->m_in_queue + self;
3153 thr_jb_write_state *w = selfptr->m_write_states + dst;
3154 if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
3155 {
3156 selfptr->m_next_buffer = seize_buffer(rep, self, false);
3157 }
3158 if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
3159 flush_write_state(selfptr, dstptr, q->m_head, w);
3160 }
3161
3162 void
3163 sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data,
3164 const Uint32 secPtr[3])
3165 {
3166 Uint32 block = blockToMain(s->theReceiversBlockNumber);
3167 Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
3168
3169 Uint32 dst = block2ThreadId(block, instance);
3170 struct thr_repository* rep = &g_thr_repository;
3171 struct thr_data *selfptr = rep->m_thread + self;
3172 assert(s->theVerId_signalNumber == GSN_START_ORD ||
3173 pthread_equal(selfptr->m_thr_id, pthread_self()));
3174 struct thr_data *dstptr = rep->m_thread + dst;
3175
3176 selfptr->m_prioa_count++;
3177 Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
3178 selfptr->m_prioa_size += siglen;
3179
3180 thr_job_queue *q = &(dstptr->m_jba);
3181 thr_jb_write_state w;
3182
3183 lock(&dstptr->m_jba_write_lock);
3184
3185 Uint32 index = q->m_head->m_write_index;
3186 w.m_write_index = index;
3187 thr_job_buffer *buffer = q->m_buffers[index];
3188 w.m_write_buffer = buffer;
3189 w.m_write_pos = buffer->m_len;
3190 w.m_pending_signals = 0;
3191 w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
3192 bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
3193 selfptr->m_next_buffer);
3194 flush_write_state(selfptr, dstptr, q->m_head, &w);
3195
3196 unlock(&dstptr->m_jba_write_lock);
3197
3198 if (buf_used)
3199 selfptr->m_next_buffer = seize_buffer(rep, self, true);
3200 }
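/**
 * Contrast between the two send paths: sendlocal() writes prio B signals
 * into a per-(sender, receiver) queue that only the sending thread writes,
 * so no lock is needed and flushing is batched via the
 * MAX_SIGNALS_BEFORE_FLUSH_* limits. sendprioa() writes into the single JBA
 * queue of the receiving thread, so insertion takes m_jba_write_lock and the
 * write state is flushed immediately.
 */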
3201
3202 /**
3203 * Send a signal to a remote node.
3204 *
3205 * (The signal is only queued here, and actually sent later in do_send()).
3206 */
3207 SendStatus
3208 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
3209 const Uint32 * data, NodeId nodeId,
3210 const LinearSectionPtr ptr[3])
3211 {
3212 thr_repository *rep = &g_thr_repository;
3213 thr_data *selfptr = rep->m_thread + self;
3214 SendStatus ss;
3215
3216 mt_send_handle handle(selfptr);
3217 register_pending_send(selfptr, nodeId);
3218 /* prepareSend() is lock-free, as we have per-thread send buffers. */
3219 ss = globalTransporterRegistry.prepareSend(&handle,
3220 sh, prio, data, nodeId, ptr);
3221 return ss;
3222 }
3223
3224 SendStatus
3225 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
3226 const Uint32 *data, NodeId nodeId,
3227 class SectionSegmentPool *thePool,
3228 const SegmentedSectionPtr ptr[3])
3229 {
3230 thr_repository *rep = &g_thr_repository;
3231 thr_data *selfptr = rep->m_thread + self;
3232 SendStatus ss;
3233
3234 mt_send_handle handle(selfptr);
3235 register_pending_send(selfptr, nodeId);
3236 ss = globalTransporterRegistry.prepareSend(&handle,
3237 sh, prio, data, nodeId,
3238 *thePool, ptr);
3239 return ss;
3240 }
3241
3242 /*
3243  * This function sends a prio A STOP_FOR_CRASH signal to a thread.
3244 *
3245 * It works when called from any other thread, not just from job processing
3246 * threads. But note that this signal will be the last signal to be executed by
3247 * the other thread, as it will exit immediately.
3248 */
3249 static
3250 void
3251 sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
3252 {
3253 SignalT<StopForCrash::SignalLength> signalT;
3254 struct thr_repository* rep = &g_thr_repository;
3255 /* As this signal will be the last one executed by the other thread, it does
3256 not matter which buffer we use in case the current buffer is filled up by
3257 the STOP_FOR_CRASH signal; the data in it will never be read.
3258 */
3259 static thr_job_buffer dummy_buffer;
3260
3261 /**
3262 * Pick any instance running in this thread
3263 */
3264 struct thr_data * dstptr = rep->m_thread + dst;
3265 Uint32 bno = dstptr->m_instance_list[0];
3266
3267 memset(&signalT.header, 0, sizeof(SignalHeader));
3268 signalT.header.theVerId_signalNumber = GSN_STOP_FOR_CRASH;
3269 signalT.header.theReceiversBlockNumber = bno;
3270 signalT.header.theSendersBlockRef = 0;
3271 signalT.header.theTrace = 0;
3272 signalT.header.theSendersSignalId = 0;
3273 signalT.header.theSignalId = 0;
3274 signalT.header.theLength = StopForCrash::SignalLength;
3275 StopForCrash * stopForCrash = CAST_PTR(StopForCrash, &signalT.theData[0]);
3276 stopForCrash->flags = 0;
3277
3278 thr_job_queue *q = &(dstptr->m_jba);
3279 thr_jb_write_state w;
3280
3281 lock(&dstptr->m_jba_write_lock);
3282
3283 Uint32 index = q->m_head->m_write_index;
3284 w.m_write_index = index;
3285 thr_job_buffer *buffer = q->m_buffers[index];
3286 w.m_write_buffer = buffer;
3287 w.m_write_pos = buffer->m_len;
3288 w.m_pending_signals = 0;
3289 w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
3290 insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
3291 &dummy_buffer);
3292 flush_write_state(selfptr, dstptr, q->m_head, &w);
3293
3294 unlock(&dstptr->m_jba_write_lock);
3295 }
3296
3297 /**
3298 * init functions
3299 */
3300 static
3301 void
3302 queue_init(struct thr_tq* tq)
3303 {
3304 tq->m_next_timer = 0;
3305 tq->m_current_time = 0;
3306 tq->m_next_free = RNIL;
3307 tq->m_cnt[0] = tq->m_cnt[1] = 0;
3308 bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
3309 }
3310
3311 static
3312 void
3313 thr_init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
3314 unsigned thr_no)
3315 {
3316 Uint32 i;
3317
3318 selfptr->m_thr_no = thr_no;
3319 selfptr->m_max_signals_per_jb = MAX_SIGNALS_PER_JB;
3320 selfptr->m_max_exec_signals = 0;
3321 selfptr->m_first_free = 0;
3322 selfptr->m_first_unused = 0;
3323
3324 {
3325 char buf[100];
3326 BaseString::snprintf(buf, sizeof(buf), "jbalock thr: %u", thr_no);
3327 register_lock(&selfptr->m_jba_write_lock, buf);
3328 }
3329 selfptr->m_jba_head.m_read_index = 0;
3330 selfptr->m_jba_head.m_write_index = 0;
3331 selfptr->m_jba.m_head = &selfptr->m_jba_head;
3332 thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
3333 selfptr->m_jba.m_buffers[0] = buffer;
3334 selfptr->m_jba_read_state.m_read_index = 0;
3335 selfptr->m_jba_read_state.m_read_buffer = buffer;
3336 selfptr->m_jba_read_state.m_read_pos = 0;
3337 selfptr->m_jba_read_state.m_read_end = 0;
3338 selfptr->m_jba_read_state.m_write_index = 0;
3339 selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
3340 selfptr->m_send_buffer_pool.set_pool(&rep->m_sb_pool);
3341
3342 for (i = 0; i<cnt; i++)
3343 {
3344 selfptr->m_in_queue_head[i].m_read_index = 0;
3345 selfptr->m_in_queue_head[i].m_write_index = 0;
3346 selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
3347 buffer = seize_buffer(rep, thr_no, false);
3348 selfptr->m_in_queue[i].m_buffers[0] = buffer;
3349 selfptr->m_read_states[i].m_read_index = 0;
3350 selfptr->m_read_states[i].m_read_buffer = buffer;
3351 selfptr->m_read_states[i].m_read_pos = 0;
3352 selfptr->m_read_states[i].m_read_end = 0;
3353 selfptr->m_read_states[i].m_write_index = 0;
3354 }
3355 queue_init(&selfptr->m_tq);
3356
3357 selfptr->m_prioa_count = 0;
3358 selfptr->m_prioa_size = 0;
3359 selfptr->m_priob_count = 0;
3360 selfptr->m_priob_size = 0;
3361
3362 selfptr->m_pending_send_count = 0;
3363 selfptr->m_pending_send_mask.clear();
3364
3365 selfptr->m_instance_count = 0;
3366 for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
3367 selfptr->m_instance_list[i] = 0;
3368
3369 bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
3370
3371 selfptr->m_thread = 0;
3372 selfptr->m_cpu = NO_LOCK_CPU;
3373 }
3374
3375 /* Have to do this after init of all m_in_queues is done. */
3376 static
3377 void
3378 thr_init2(struct thr_repository* rep, struct thr_data *selfptr,
3379 unsigned int cnt, unsigned thr_no)
3380 {
3381 for (Uint32 i = 0; i<cnt; i++)
3382 {
3383 selfptr->m_write_states[i].m_write_index = 0;
3384 selfptr->m_write_states[i].m_write_pos = 0;
3385 selfptr->m_write_states[i].m_write_buffer =
3386 rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
3387 selfptr->m_write_states[i].m_pending_signals = 0;
3388 selfptr->m_write_states[i].m_pending_signals_wakeup = 0;
3389 }
3390 }
3391
3392 static
3393 void
3394 send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
3395 {
3396 char buf[100];
3397 BaseString::snprintf(buf, sizeof(buf), "send lock node %d", node);
3398 register_lock(&sb->m_send_lock, buf);
3399 sb->m_force_send = 0;
3400 sb->m_send_thread = NO_SEND_THREAD;
3401 bzero(&sb->m_buffer, sizeof(sb->m_buffer));
3402 sb->m_bytes = 0;
3403 bzero(sb->m_read_index, sizeof(sb->m_read_index));
3404 }
3405
3406 static
3407 void
3408 rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
3409 {
3410 rep->m_mm = mm;
3411
3412 rep->m_thread_count = cnt;
3413 for (unsigned int i = 0; i<cnt; i++)
3414 {
3415 thr_init(rep, rep->m_thread + i, cnt, i);
3416 }
3417 for (unsigned int i = 0; i<cnt; i++)
3418 {
3419 thr_init2(rep, rep->m_thread + i, cnt, i);
3420 }
3421
3422 rep->stopped_threads = 0;
3423 NdbMutex_Init(&rep->stop_for_crash_mutex);
3424 NdbCondition_Init(&rep->stop_for_crash_cond);
3425
3426 for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
3427 {
3428 send_buffer_init(i, rep->m_send_buffers+i);
3429 }
3430
3431 bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
3432 }
3433
3434
3435 /**
3436 * Thread Config
3437 */
3438
3439 #include "ThreadConfig.hpp"
3440 #include <signaldata/StartOrd.hpp>
3441
3442 Uint32
3443 compute_jb_pages(struct EmulatorData * ed)
3444 {
3445 Uint32 cnt = NUM_MAIN_THREADS + globalData.ndbMtLqhThreads + 1;
3446
3447 Uint32 perthread = 0;
3448
3449 /**
3450 * Each thread can have thr_job_queue::SIZE pages in out-queues
3451 * to each other thread
3452 */
3453 perthread += cnt * (1 + thr_job_queue::SIZE);
3454
3455 /**
3456 * And thr_job_queue::SIZE prio A signals
3457 */
3458 perthread += (1 + thr_job_queue::SIZE);
3459
3460 /**
3461 * And XXX time-queue signals
3462 */
3463 perthread += 32; // Say 1M for now
3464
3465 /**
3466  * Each thread also keeps its own cache of at most THR_FREE_BUF_MAX pages.
3467 */
3468 perthread += THR_FREE_BUF_MAX;
3469
3470 /**
3471 * Multiply by no of threads
3472 */
3473 Uint32 tot = cnt * perthread;
3474
3475 return tot;
3476 }
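/**
 * Sizing sketch for compute_jb_pages(), in symbols: with cnt threads, each
 * thread may pin up to cnt * (1 + thr_job_queue::SIZE) pages for its
 * out-queues, (1 + thr_job_queue::SIZE) for prio A, 32 for delayed
 * (time-queue) signals and THR_FREE_BUF_MAX for its local free-list cache;
 * the pool is cnt times that per-thread figure.
 */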
3477
3478 ThreadConfig::ThreadConfig()
3479 {
3480 }
3481
3482 ThreadConfig::~ThreadConfig()
3483 {
3484 }
3485
3486 /*
3487 * We must do the init here rather than in the constructor, since at
3488 * constructor time the global memory manager is not available.
3489 */
3490 void
3491 ThreadConfig::init()
3492 {
3493 num_lqh_workers = globalData.ndbMtLqhWorkers;
3494 num_lqh_threads = globalData.ndbMtLqhThreads;
3495 num_threads = NUM_MAIN_THREADS + num_lqh_threads + 1;
3496 require(num_threads <= MAX_THREADS);
3497 receiver_thread_no = num_threads - 1;
3498
3499 ndbout << "NDBMT: num_threads=" << num_threads << endl;
3500
3501 ::rep_init(&g_thr_repository, num_threads,
3502 globalEmulatorData.m_mem_manager);
3503 }
3504
3505 static
3506 void
3507 setcpuaffinity(struct thr_repository* rep)
3508 {
3509 THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
3510 conf.create_cpusets();
3511 if (conf.getInfoMessage())
3512 {
3513 printf("%s", conf.getInfoMessage());
3514 fflush(stdout);
3515 }
3516 }
3517
3518 void
3519 ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
3520 {
3521 unsigned int thr_no;
3522 struct thr_repository* rep = &g_thr_repository;
3523
3524 /**
3525  * assign threads to CPUs
3526 */
3527 setcpuaffinity(rep);
3528
3529 /*
3530 * Start threads for all execution threads, except for the receiver
3531 * thread, which runs in the main thread.
3532 */
3533 for (thr_no = 0; thr_no < num_threads; thr_no++)
3534 {
3535 rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
3536
3537 if (thr_no == receiver_thread_no)
3538 continue; // Will run in the main thread.
3539
3540 /*
3541  * NdbThread_Create() takes void **, but that is cast to void * when
3542  * passed to the thread function, which is kind of strange ...
3543 */
3544 rep->m_thread[thr_no].m_thread =
3545 NdbThread_Create(mt_job_thread_main,
3546 (void **)(rep->m_thread + thr_no),
3547 1024*1024,
3548 "execute thread", //ToDo add number
3549 NDB_THREAD_PRIO_MEAN);
3550 require(rep->m_thread[thr_no].m_thread != NULL);
3551 }
3552
3553   /* Now run the main loop for the receiver thread directly in this thread. */
3554 rep->m_thread[receiver_thread_no].m_thread = pThis;
3555 mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
3556
3557 /* Wait for all threads to shutdown. */
3558 for (thr_no = 0; thr_no < num_threads; thr_no++)
3559 {
3560 if (thr_no == receiver_thread_no)
3561 continue;
3562 void *dummy_return_status;
3563 NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
3564 NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
3565 }
3566 }
3567
3568 int
3569 ThreadConfig::doStart(NodeState::StartLevel startLevel)
3570 {
3571 SignalT<3> signalT;
3572 memset(&signalT.header, 0, sizeof(SignalHeader));
3573
3574 signalT.header.theVerId_signalNumber = GSN_START_ORD;
3575 signalT.header.theReceiversBlockNumber = CMVMI;
3576 signalT.header.theSendersBlockRef = 0;
3577 signalT.header.theTrace = 0;
3578 signalT.header.theSignalId = 0;
3579 signalT.header.theLength = StartOrd::SignalLength;
3580
3581 StartOrd * startOrd = CAST_PTR(StartOrd, &signalT.theData[0]);
3582 startOrd->restartInfo = 0;
3583
3584 sendprioa(block2ThreadId(CMVMI, 0), &signalT.header, signalT.theData, 0);
3585 return 0;
3586 }
3587
3588 /*
3589  * Compare signal ids, taking into account overflow/wraparound.
3590 * Return same as strcmp().
3591 * Eg.
3592 * wrap_compare(0x10,0x20) -> -1
3593 * wrap_compare(0x10,0xffffff20) -> 1
3594 * wrap_compare(0xffffff80,0xffffff20) -> 1
3595 * wrap_compare(0x7fffffff, 0x80000001) -> -1
3596 */
3597 static
3598 inline
3599 int
3600 wrap_compare(Uint32 a, Uint32 b)
3601 {
3602   /* Avoid dependencies on undefined C/C++ integer overflow semantics. */
3603 if (a >= 0x80000000)
3604 if (b >= 0x80000000)
3605 return (int)(a & 0x7fffffff) - (int)(b & 0x7fffffff);
3606 else
3607 return (a - b) >= 0x80000000 ? -1 : 1;
3608 else
3609 if (b >= 0x80000000)
3610 return (b - a) >= 0x80000000 ? 1 : -1;
3611 else
3612 return (int)a - (int)b;
3613 }
3614
3615 Uint32
3616 FastScheduler::traceDumpGetNumThreads()
3617 {
3618 /* The last thread is only for receiver -> no trace file. */
3619 return num_threads;
3620 }
3621
3622 bool
3623 FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
3624 const Uint32 * & thrdTheEmulatedJam,
3625 Uint32 & thrdTheEmulatedJamIndex)
3626 {
3627 if (thr_no >= num_threads)
3628 return false;
3629
3630 #ifdef NO_EMULATED_JAM
3631 jamBlockNumber = 0;
3632 thrdTheEmulatedJam = NULL;
3633 thrdTheEmulatedJamIndex = 0;
3634 #else
3635 const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
3636 thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
3637 thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
3638 jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
3639 #endif
3640 return true;
3641 }
3642
3643 void
3644 FastScheduler::traceDumpPrepare(NdbShutdownType& nst)
3645 {
3646 /*
3647 * We are about to generate trace files for all threads.
3648 *
3649 * We want to stop all threads processing before we dump, as otherwise the
3650 * signal buffers could change while dumping, leading to inconsistent
3651 * results.
3652 *
3653 * To stop threads, we send the GSN_STOP_FOR_CRASH signal as prio A to each
3654 * thread. We then wait for threads to signal they are done (but not forever,
3655 * so as to not have one hanging thread prevent the generation of trace
3656 * dumps). We also must be careful not to send to ourself if the crash is
3657 * being processed by one of the threads processing signals.
3658 *
3659 * We do not stop the transporter thread, as it cannot receive signals (but
3660 * because it does not receive signals it does not really influence dumps in
3661 * any case).
3662 */
3663 void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3664 const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
3665 /* The selfptr might be NULL, or pointer to thread that crashed. */
3666
3667 Uint32 waitFor_count = 0;
3668 NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
3669 g_thr_repository.stopped_threads = 0;
3670
3671 for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
3672 {
3673 if (selfptr != NULL && selfptr->m_thr_no == thr_no)
3674 {
3675 /* This is own thread; we have already stopped processing. */
3676 continue;
3677 }
3678
3679 sendprioa_STOP_FOR_CRASH(selfptr, thr_no);
3680
3681 waitFor_count++;
3682 }
3683
3684 static const Uint32 max_wait_seconds = 2;
3685 NDB_TICKS start = NdbTick_CurrentMillisecond();
3686 while (g_thr_repository.stopped_threads < waitFor_count)
3687 {
3688 NdbCondition_WaitTimeout(&g_thr_repository.stop_for_crash_cond,
3689 &g_thr_repository.stop_for_crash_mutex,
3690 10);
3691 NDB_TICKS now = NdbTick_CurrentMillisecond();
3692 if (now > start + max_wait_seconds * 1000)
3693 break; // Give up
3694 }
3695 if (g_thr_repository.stopped_threads < waitFor_count)
3696 {
3697 if (nst != NST_ErrorInsert)
3698 {
3699 nst = NST_Watchdog; // Make this abort fast
3700 }
3701 ndbout_c("Warning: %d thread(s) did not stop before starting crash dump.",
3702 waitFor_count - g_thr_repository.stopped_threads);
3703 }
3704 NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
3705
3706 /* Now we are ready (or as ready as can be) for doing crash dump. */
3707 }
3708
3709 void mt_execSTOP_FOR_CRASH()
3710 {
3711 void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3712 const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
3713 require(selfptr != NULL);
3714
3715 NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
3716 g_thr_repository.stopped_threads++;
3717 NdbCondition_Signal(&g_thr_repository.stop_for_crash_cond);
3718 NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
3719
3720 /* ToDo: is this correct? */
3721 globalEmulatorData.theWatchDog->unregisterWatchedThread(selfptr->m_thr_no);
3722
3723 pthread_exit(NULL);
3724 }

void
FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
{
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
  thr_data *selfptr = reinterpret_cast<thr_data *>(value);
  const thr_repository *rep = &g_thr_repository;
  /*
   * The selfptr might be NULL, or a pointer to the thread that is doing the
   * crash jump.
   * If non-NULL, we should update the watchdog counter while dumping.
   */
  Uint32 *watchDogCounter;
  if (selfptr)
    watchDogCounter = &selfptr->m_watchdog_counter;
  else
    watchDogCounter = NULL;

  /*
   * We want to dump the signal buffers from last executed to first executed.
   * So we first need to find the correct sequence to output the signals in,
   * stored in this array.
   *
   * We will check any buffers in the cyclic m_free_fifo. In addition, we also
   * need to scan the already executed part of the current buffer in m_jba.
   *
   * Due to partial execution of prio A buffers, we use signal ids to know
   * where to interleave prio A signals into the stream of prio B signals
   * read. So we keep a pointer to a prio A buffer around, and while scanning
   * prio B buffers we interleave prio A signals from that buffer whenever the
   * signal id fits the sequence.
   *
   * This also means that we may have to discard the earliest part of the
   * available prio A signal data if too little prio B data is present, or
   * vice versa.
   */
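  /*
   * Illustration (hypothetical signal ids): if the prio B buffers contain
   * signals with ids 100, 102 and 104, and the prio A buffer contains ids
   * 101 and 103, the merge below reconstructs the execution order
   * 100, 101, 102, 103, 104, and the dump is then written from 104 back
   * towards 100 (newest signal first).
   */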
  static const Uint32 MAX_SIGNALS_TO_DUMP = 4096;
  struct {
    const SignalHeader *ptr;
    bool prioa;
  } signalSequence[MAX_SIGNALS_TO_DUMP];
  Uint32 seq_start = 0;
  Uint32 seq_end = 0;

  const thr_data *thr_ptr = &rep->m_thread[thr_no];
  if (watchDogCounter)
    *watchDogCounter = 4;

  /*
   * ToDo: Might do some sanity check to avoid crashing on a not yet
   * initialised thread.
   */

  /* Scan all available buffers with already executed signals. */

  /*
   * Keep track of all available buffers, so that we can pick out signals in
   * the same order they were executed (the order is obtained from the signal
   * id).
   *
   * We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
   * (and freed) buffers, plus MAX_THREADS buffers for currently active
   * prio B buffers, plus one active prio A buffer.
   */
  struct {
    const thr_job_buffer *m_jb;
    Uint32 m_pos;
    Uint32 m_max;
  } jbs[THR_FREE_BUF_MAX + MAX_THREADS + 1];

  Uint32 num_jbs = 0;

  /* Load released buffers. */
  Uint32 idx = thr_ptr->m_first_free;
  while (idx != thr_ptr->m_first_unused)
  {
    const thr_job_buffer *q = thr_ptr->m_free_fifo[idx];
    if (q->m_len > 0)
    {
      jbs[num_jbs].m_jb = q;
      jbs[num_jbs].m_pos = 0;
      jbs[num_jbs].m_max = q->m_len;
      num_jbs++;
    }
    idx = (idx + 1) % THR_FREE_BUF_MAX;
  }
  /* Load any active prio B buffers. */
  for (Uint32 thr_no = 0; thr_no < rep->m_thread_count; thr_no++)
  {
    const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
    const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
    Uint32 read_pos = r->m_read_pos;
    if (read_pos > 0)
    {
      jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
      jbs[num_jbs].m_pos = 0;
      jbs[num_jbs].m_max = read_pos;
      num_jbs++;
    }
  }
  /* Load any active prio A buffer. */
  const thr_jb_read_state *r = &thr_ptr->m_jba_read_state;
  Uint32 read_pos = r->m_read_pos;
  if (read_pos > 0)
  {
    jbs[num_jbs].m_jb = thr_ptr->m_jba.m_buffers[r->m_read_index];
    jbs[num_jbs].m_pos = 0;
    jbs[num_jbs].m_max = read_pos;
    num_jbs++;
  }

  /* Now pick out one signal at a time, in signal id order. */
  while (num_jbs > 0)
  {
    if (watchDogCounter)
      *watchDogCounter = 4;

    /* Search out the smallest signal id remaining. */
    Uint32 idx_min = 0;
    const Uint32 *p = jbs[idx_min].m_jb->m_data + jbs[idx_min].m_pos;
    const SignalHeader *s_min = reinterpret_cast<const SignalHeader*>(p);
    Uint32 sid_min = s_min->theSignalId;

    for (Uint32 i = 1; i < num_jbs; i++)
    {
      p = jbs[i].m_jb->m_data + jbs[i].m_pos;
      const SignalHeader *s = reinterpret_cast<const SignalHeader*>(p);
      Uint32 sid = s->theSignalId;
      if (wrap_compare(sid, sid_min) < 0)
      {
        idx_min = i;
        s_min = s;
        sid_min = sid;
      }
    }

    /* We found the next signal, now put it in the ordered cyclic buffer. */
    signalSequence[seq_end].ptr = s_min;
    signalSequence[seq_end].prioa = jbs[idx_min].m_jb->m_prioa;
    Uint32 siglen =
      (sizeof(SignalHeader)>>2) + s_min->m_noOfSections + s_min->theLength;
#if SIZEOF_CHARP == 8
    /* Align to 8-byte boundary, to ensure aligned copies. */
    siglen= (siglen+1) & ~((Uint32)1);
#endif
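    /*
     * Note: siglen counts the words the signal occupies in the job buffer
     * (header + section pointer words + data, rounded up to an even number
     * of words on 64-bit platforms), so advancing m_pos by it should land
     * on the next SignalHeader in the buffer.
     */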
    jbs[idx_min].m_pos += siglen;
    if (jbs[idx_min].m_pos >= jbs[idx_min].m_max)
    {
      /* We are done with this job buffer. */
      num_jbs--;
      jbs[idx_min] = jbs[num_jbs];
    }
    seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
    /* Drop old signals if too many available in history. */
    if (seq_end == seq_start)
      seq_start = (seq_start + 1) % MAX_SIGNALS_TO_DUMP;
  }

  /* Now, having built the correct signal sequence, we can dump them all. */
  fprintf(out, "\n");
  bool first_one = true;
  bool out_of_signals = false;
  Uint32 lastSignalId = 0;
  while (seq_end != seq_start)
  {
    if (watchDogCounter)
      *watchDogCounter = 4;

    if (seq_end == 0)
      seq_end = MAX_SIGNALS_TO_DUMP;
    seq_end--;
    SignalT<25> signal;
    const SignalHeader *s = signalSequence[seq_end].ptr;
    unsigned siglen = (sizeof(*s)>>2) + s->theLength;
    if (siglen > 25)
      siglen = 25; // Sanity check
    memcpy(&signal.header, s, 4*siglen);
    // instance number in trace file is confusing if not MT LQH
    if (num_lqh_workers == 0)
      signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;

    const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
    signal.m_sectionPtrI[0] = posptr[siglen + 0];
    signal.m_sectionPtrI[1] = posptr[siglen + 1];
    signal.m_sectionPtrI[2] = posptr[siglen + 2];
    bool prioa = signalSequence[seq_end].prioa;

    /* Make sure to display clearly when there is a gap in the dump. */
    if (!first_one && !out_of_signals && (s->theSignalId + 1) != lastSignalId)
    {
      out_of_signals = true;
      fprintf(out, "\n\n\nNo more prio %s signals, rest of dump will be "
              "incomplete.\n\n\n\n", prioa ? "B" : "A");
    }
    first_one = false;
    lastSignalId = s->theSignalId;

    fprintf(out, "--------------- Signal ----------------\n");
    Uint32 prio = (prioa ? JBA : JBB);
    SignalLoggerManager::printSignalHeader(out,
                                           signal.header,
                                           prio,
                                           globalData.ownId,
                                           true);
    SignalLoggerManager::printSignalData (out,
                                          signal.header,
                                          &signal.theData[0]);
  }
  fflush(out);
}

int
FastScheduler::traceDumpGetCurrentThread()
{
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);

  /* The selfptr might be NULL, or pointer to thread that crashed. */
  if (selfptr == 0)
  {
    return -1;
  }
  else
  {
    return (int)selfptr->m_thr_no;
  }
}

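/*
 * Wrappers around the global locks in g_thr_repository that guard the shared
 * signal-section pool and the memory manager, so that blocks running in
 * different threads can use them safely.
 */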
void
mt_section_lock()
{
  lock(&(g_thr_repository.m_section_lock));
}

void
mt_section_unlock()
{
  unlock(&(g_thr_repository.m_section_lock));
}

void
mt_mem_manager_init()
{
}

void
mt_mem_manager_lock()
{
  lock(&(g_thr_repository.m_mem_manager_lock));
}

void
mt_mem_manager_unlock()
{
  unlock(&(g_thr_repository.m_mem_manager_lock));
}

Vector<mt_lock_stat> g_locks;
template class Vector<mt_lock_stat>;

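/*
 * g_locks maps a lock (by its address) to a human-readable name and the
 * contention/spin counters collected for it.  register_lock() records or
 * updates the name for a lock; lookup_lock() finds its statistics entry, or
 * returns 0 if the lock was never registered.
 */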
static
void
register_lock(const void * ptr, const char * name)
{
  if (name == 0)
    return;

  mt_lock_stat* arr = g_locks.getBase();
  for (size_t i = 0; i<g_locks.size(); i++)
  {
    if (arr[i].m_ptr == ptr)
    {
      if (arr[i].m_name)
      {
        free(arr[i].m_name);
      }
      arr[i].m_name = strdup(name);
      return;
    }
  }

  mt_lock_stat ln;
  ln.m_ptr = ptr;
  ln.m_name = strdup(name);
  ln.m_contended_count = 0;
  ln.m_spin_count = 0;
  g_locks.push_back(ln);
}

static
mt_lock_stat *
lookup_lock(const void * ptr)
{
  mt_lock_stat* arr = g_locks.getBase();
  for (size_t i = 0; i<g_locks.size(); i++)
  {
    if (arr[i].m_ptr == ptr)
      return arr + i;
  }

  return 0;
}

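/**
 * For every other thread that hosts an instance of one of the blocks in the
 * zero-terminated blocks[] list, collect a block reference to one such
 * instance.  Each thread is counted at most once, and threadId itself is
 * skipped.  The references are written to dst[] (the caller must ensure len
 * is large enough), and the number of references written is returned.
 */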
Uint32
mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
                                    Uint32 dst[], Uint32 len)
{
  Uint32 cnt = 0;
  Bitmask<(MAX_THREADS+31)/32> mask;
  mask.set(threadId);
  for (Uint32 i = 0; blocks[i] != 0; i++)
  {
    Uint32 block = blocks[i];
    /**
     * Find each thread that has an instance of this block
     */
    assert(block == blockToMain(block));
    Uint32 index = block - MIN_BLOCK_NO;
    for (Uint32 instance = 0; instance < MAX_BLOCK_INSTANCES; instance++)
    {
      Uint32 thr_no = thr_map[index][instance].thr_no;
      if (thr_no == thr_map_entry::NULL_THR_NO)
        break;

      if (mask.get(thr_no))
        continue;

      mask.set(thr_no);
      require(cnt < len);
      dst[cnt++] = numberToRef(block, instance, 0);
    }
  }
  return cnt;
}

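/* Wake up the thread in which the given block instance runs. */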
void
mt_wakeup(class SimulatedBlock* block)
{
  Uint32 thr_no = block->getThreadId();
  thr_data *thrptr = g_thr_repository.m_thread + thr_no;
  wakeup(&thrptr->m_waiter);
}

#ifdef VM_TRACE
void
mt_assert_own_thread(SimulatedBlock* block)
{
  Uint32 thr_no = block->getThreadId();
  thr_data *thrptr = g_thr_repository.m_thread + thr_no;

  if (unlikely(pthread_equal(thrptr->m_thr_id, pthread_self()) == 0))
  {
    fprintf(stderr, "mt_assert_own_thread() - assertion-failure\n");
    fflush(stderr);
    abort();
  }
}
#endif

/**
 * Global data
 */
struct thr_repository g_thr_repository;

struct trp_callback g_trp_callback;

TransporterRegistry globalTransporterRegistry(&g_trp_callback, false);
