1 /*
2 Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "mt_thr_config.hpp"
26 #include <kernel/ndb_limits.h>
27 #include "../../common/util/parse_mask.hpp"
28
/*
 * The "cpuset" parameter is only recognised by the parser in the
 * self-test build (TEST_MT_THR_CONFIG); otherwise it is skipped when
 * matching parameter names.
 */
#ifdef TEST_MT_THR_CONFIG
#define SUPPORT_CPU_SET 1
#else
#define SUPPORT_CPU_SET 0
#endif
34
35 static const struct THRConfig::Entries m_entries[] =
36 {
37 // name type min max
38 { "main", THRConfig::T_MAIN, 1, 1 },
39 { "ldm", THRConfig::T_LDM, 1, MAX_NDBMT_LQH_THREADS },
40 { "recv", THRConfig::T_RECV, 1, 1 },
41 { "rep", THRConfig::T_REP, 1, 1 },
42 { "io", THRConfig::T_IO, 1, 1 }
43 };
44
45 static const struct THRConfig::Param m_params[] =
46 {
47 { "count", THRConfig::Param::S_UNSIGNED },
48 { "cpubind", THRConfig::Param::S_BITMASK },
49 { "cpuset", THRConfig::Param::S_BITMASK }
50 };
51
52 #define IX_COUNT 0
53 #define IX_CPUBOUND 1
54 #define IX_CPUSET 2
55
56 static
57 unsigned
getMaxEntries(Uint32 type)58 getMaxEntries(Uint32 type)
59 {
60 for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
61 {
62 if (m_entries[i].m_type == type)
63 return m_entries[i].m_max_cnt;
64 }
65 return 0;
66 }
67
68 static
69 const char *
getEntryName(Uint32 type)70 getEntryName(Uint32 type)
71 {
72 for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
73 {
74 if (m_entries[i].m_type == type)
75 return m_entries[i].m_name;
76 }
77 return 0;
78 }
79
80 static
81 Uint32
getEntryType(const char * type)82 getEntryType(const char * type)
83 {
84 for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
85 {
86 if (strcasecmp(type, m_entries[i].m_name) == 0)
87 return i;
88 }
89
90 return THRConfig::T_END;
91 }
92
THRConfig()93 THRConfig::THRConfig()
94 {
95 m_classic = false;
96 }
97
~THRConfig()98 THRConfig::~THRConfig()
99 {
100 }
101
102 int
setLockExecuteThreadToCPU(const char * mask)103 THRConfig::setLockExecuteThreadToCPU(const char * mask)
104 {
105 int res = parse_mask(mask, m_LockExecuteThreadToCPU);
106 if (res < 0)
107 {
108 m_err_msg.assfmt("failed to parse 'LockExecuteThreadToCPU=%s' "
109 "(error: %d)",
110 mask, res);
111 return -1;
112 }
113 return 0;
114 }
115
116 int
setLockIoThreadsToCPU(unsigned val)117 THRConfig::setLockIoThreadsToCPU(unsigned val)
118 {
119 m_LockIoThreadsToCPU.set(val);
120 return 0;
121 }
122
123 void
add(T_Type t)124 THRConfig::add(T_Type t)
125 {
126 T_Thread tmp;
127 tmp.m_type = t;
128 tmp.m_bind_type = T_Thread::B_UNBOUND;
129 tmp.m_no = m_threads[t].size();
130 m_threads[t].push_back(tmp);
131 }
132
133 int
do_parse(unsigned MaxNoOfExecutionThreads,unsigned __ndbmt_lqh_threads,unsigned __ndbmt_classic)134 THRConfig::do_parse(unsigned MaxNoOfExecutionThreads,
135 unsigned __ndbmt_lqh_threads,
136 unsigned __ndbmt_classic)
137 {
138 /**
139 * This is old ndbd.cpp : get_multithreaded_config
140 */
141 if (__ndbmt_classic)
142 {
143 m_classic = true;
144 add(T_LDM);
145 add(T_MAIN);
146 add(T_IO);
147 return do_bindings();
148 }
149
150 Uint32 lqhthreads = 0;
151 switch(MaxNoOfExecutionThreads){
152 case 0:
153 case 1:
154 case 2:
155 case 3:
156 lqhthreads = 1; // TC + receiver + SUMA + LQH
157 break;
158 case 4:
159 case 5:
160 case 6:
161 lqhthreads = 2; // TC + receiver + SUMA + 2 * LQH
162 break;
163 default:
164 lqhthreads = 4; // TC + receiver + SUMA + 4 * LQH
165 }
166
167 if (__ndbmt_lqh_threads)
168 {
169 lqhthreads = __ndbmt_lqh_threads;
170 }
171
172 add(T_MAIN);
173 add(T_REP);
174 add(T_RECV);
175 add(T_IO);
176 for(Uint32 i = 0; i < lqhthreads; i++)
177 {
178 add(T_LDM);
179 }
180
181 return do_bindings() || do_validate();
182 }
183
184 int
do_bindings()185 THRConfig::do_bindings()
186 {
187 if (m_LockIoThreadsToCPU.count() == 1)
188 {
189 m_threads[T_IO][0].m_bind_type = T_Thread::B_CPU_BOUND;
190 m_threads[T_IO][0].m_bind_no = m_LockIoThreadsToCPU.getBitNo(0);
191 }
192 else if (m_LockIoThreadsToCPU.count() > 1)
193 {
194 unsigned no = createCpuSet(m_LockIoThreadsToCPU);
195 m_threads[T_IO][0].m_bind_type = T_Thread::B_CPUSET_BOUND;
196 m_threads[T_IO][0].m_bind_no = no;
197 }
198
199 /**
200 * Check that no cpu_sets overlap
201 */
202 for (unsigned i = 0; i<m_cpu_sets.size(); i++)
203 {
204 for (unsigned j = i + 1; j < m_cpu_sets.size(); j++)
205 {
206 if (m_cpu_sets[i].overlaps(m_cpu_sets[j]))
207 {
208 m_err_msg.assfmt("Overlapping cpuset's [ %s ] and [ %s ]",
209 m_cpu_sets[i].str().c_str(),
210 m_cpu_sets[j].str().c_str());
211 return -1;
212 }
213 }
214 }
215
216 /**
217 * Check that no cpu_sets overlap
218 */
219 for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
220 {
221 for (unsigned j = 0; j < m_threads[i].size(); j++)
222 {
223 if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
224 {
225 unsigned cpu = m_threads[i][j].m_bind_no;
226 for (unsigned k = 0; k<m_cpu_sets.size(); k++)
227 {
228 if (m_cpu_sets[k].get(cpu))
229 {
230 m_err_msg.assfmt("Overlapping cpubind %u with cpuset [ %s ]",
231 cpu,
232 m_cpu_sets[k].str().c_str());
233
234 return -1;
235 }
236 }
237 }
238 }
239 }
240
241 /**
242 * Remove all already bound threads from LockExecuteThreadToCPU-mask
243 */
244 for (unsigned i = 0; i<m_cpu_sets.size(); i++)
245 {
246 for (unsigned j = 0; j < m_cpu_sets[i].count(); j++)
247 {
248 m_LockExecuteThreadToCPU.clear(m_cpu_sets[i].getBitNo(j));
249 }
250 }
251
252 unsigned cnt_unbound = 0;
253 for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
254 {
255 for (unsigned j = 0; j < m_threads[i].size(); j++)
256 {
257 if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
258 {
259 unsigned cpu = m_threads[i][j].m_bind_no;
260 m_LockExecuteThreadToCPU.clear(cpu);
261 }
262 else if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
263 {
264 cnt_unbound ++;
265 }
266 }
267 }
268
269 if (m_threads[T_IO][0].m_bind_type == T_Thread::B_UNBOUND)
270 {
271 /**
272 * don't count this one...
273 */
274 cnt_unbound--;
275 }
276
277 if (m_LockExecuteThreadToCPU.count())
278 {
279 /**
280 * This is old mt.cpp : setcpuaffinity
281 */
282 SparseBitmask& mask = m_LockExecuteThreadToCPU;
283 unsigned cnt = mask.count();
284 unsigned num_threads = cnt_unbound;
285 bool isMtLqh = !m_classic;
286
287 if (cnt < num_threads)
288 {
289 m_info_msg.assfmt("WARNING: Too few CPU's specified with "
290 "LockExecuteThreadToCPU. Only %u specified "
291 " but %u was needed, this may cause contention.\n",
292 cnt, num_threads);
293 }
294
295 if (cnt >= num_threads)
296 {
297 m_info_msg.appfmt("Assigning each thread its own CPU\n");
298 unsigned no = 0;
299 for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
300 {
301 if (i == T_IO)
302 continue;
303 for (unsigned j = 0; j < m_threads[i].size(); j++)
304 {
305 if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
306 {
307 m_threads[i][j].m_bind_type = T_Thread::B_CPU_BOUND;
308 m_threads[i][j].m_bind_no = mask.getBitNo(no);
309 no++;
310 }
311 }
312 }
313 }
314 else if (cnt == 1)
315 {
316 unsigned cpu = mask.getBitNo(0);
317 m_info_msg.appfmt("Assigning all threads to CPU %u\n", cpu);
318 for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
319 {
320 if (i == T_IO)
321 continue;
322 bind_unbound(m_threads[i], cpu);
323 }
324 }
325 else if (isMtLqh)
326 {
327 unsigned unbound_ldm = count_unbound(m_threads[T_LDM]);
328 if (cnt > unbound_ldm)
329 {
330 /**
331 * let each LQH have it's own CPU and rest share...
332 */
333 m_info_msg.append("Assigning LQH threads to dedicated CPU(s) and "
334 "other threads will share remaining\n");
335 unsigned cpu = mask.find(0);
336 for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
337 {
338 if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
339 {
340 m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
341 m_threads[T_LDM][i].m_bind_no = cpu;
342 mask.clear(cpu);
343 cpu = mask.find(cpu + 1);
344 }
345 }
346
347 cpu = mask.find(0);
348 bind_unbound(m_threads[T_MAIN], cpu);
349 bind_unbound(m_threads[T_REP], cpu);
350 if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
351 {
352 cpu = mask.find(0);
353 }
354 bind_unbound(m_threads[T_RECV], cpu);
355 }
356 else
357 {
358 // put receiver, tc, backup/suma in 1 thread,
359 // and round robin LQH for rest
360 unsigned cpu = mask.find(0);
361 m_info_msg.appfmt("Assigning LQH threads round robin to CPU(s) and "
362 "other threads will share CPU %u\n", cpu);
363 bind_unbound(m_threads[T_MAIN], cpu); // TC
364 bind_unbound(m_threads[T_REP], cpu);
365 bind_unbound(m_threads[T_RECV], cpu);
366 mask.clear(cpu);
367
368 cpu = mask.find(0);
369 for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
370 {
371 if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
372 {
373 m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
374 m_threads[T_LDM][i].m_bind_no = cpu;
375 if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
376 {
377 cpu = mask.find(0);
378 }
379 }
380 }
381 }
382 }
383 else
384 {
385 unsigned cpu = mask.find(0);
386 m_info_msg.appfmt("Assigning LQH thread to CPU %u and "
387 "other threads will share\n", cpu);
388 bind_unbound(m_threads[T_LDM], cpu);
389 cpu = mask.find(cpu + 1);
390 bind_unbound(m_threads[T_MAIN], cpu);
391 bind_unbound(m_threads[T_RECV], cpu);
392 }
393 }
394
395 return 0;
396 }
397
398 unsigned
count_unbound(const Vector<T_Thread> & vec) const399 THRConfig::count_unbound(const Vector<T_Thread>& vec) const
400 {
401 unsigned cnt = 0;
402 for (unsigned i = 0; i < vec.size(); i++)
403 {
404 if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
405 cnt ++;
406 }
407 return cnt;
408 }
409
410 void
bind_unbound(Vector<T_Thread> & vec,unsigned cpu)411 THRConfig::bind_unbound(Vector<T_Thread>& vec, unsigned cpu)
412 {
413 for (unsigned i = 0; i < vec.size(); i++)
414 {
415 if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
416 {
417 vec[i].m_bind_type = T_Thread::B_CPU_BOUND;
418 vec[i].m_bind_no = cpu;
419 }
420 }
421 }
422
423 int
do_validate()424 THRConfig::do_validate()
425 {
426 /**
427 * Check that there aren't too many of any thread type
428 */
429 for (unsigned i = 0; i< NDB_ARRAY_SIZE(m_threads); i++)
430 {
431 if (m_threads[i].size() > getMaxEntries(i))
432 {
433 m_err_msg.assfmt("Too many instances(%u) of %s max supported: %u",
434 m_threads[i].size(),
435 getEntryName(i),
436 getMaxEntries(i));
437 return -1;
438 }
439 }
440
441 /**
442 * LDM can be 1 2 4
443 */
444 if (m_threads[T_LDM].size() == 3)
445 {
446 m_err_msg.assfmt("No of LDM-instances can be 1,2,4. Specified: %u",
447 m_threads[T_LDM].size());
448 return -1;
449 }
450
451 return 0;
452 }
453
454 const char *
getConfigString()455 THRConfig::getConfigString()
456 {
457 m_cfg_string.clear();
458 const char * sep = "";
459 for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
460 {
461 if (m_threads[i].size())
462 {
463 const char * name = getEntryName(i);
464 if (i != T_IO)
465 {
466 for (unsigned j = 0; j < m_threads[i].size(); j++)
467 {
468 m_cfg_string.append(sep);
469 sep=",";
470 m_cfg_string.append(name);
471 if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
472 {
473 m_cfg_string.append("={");
474 if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
475 {
476 m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
477 }
478 else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
479 {
480 m_cfg_string.appfmt("cpuset=%s",
481 m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
482 }
483 m_cfg_string.append("}");
484 }
485 }
486 }
487 else
488 {
489 for (unsigned j = 0; j < m_threads[i].size(); j++)
490 {
491 if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
492 {
493 m_cfg_string.append(sep);
494 sep=",";
495 m_cfg_string.append(name);
496 m_cfg_string.append("={");
497 if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
498 {
499 m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
500 }
501 else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
502 {
503 m_cfg_string.appfmt("cpuset=%s",
504 m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
505 }
506 m_cfg_string.append("}");
507 }
508 }
509 }
510 }
511 }
512 return m_cfg_string.c_str();
513 }
514
515 Uint32
getThreadCount() const516 THRConfig::getThreadCount() const
517 {
518 // Note! not counting T_IO
519 Uint32 cnt = 0;
520 for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
521 {
522 if (i != T_IO)
523 {
524 cnt += m_threads[i].size();
525 }
526 }
527 return cnt;
528 }
529
530 Uint32
getThreadCount(T_Type type) const531 THRConfig::getThreadCount(T_Type type) const
532 {
533 for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
534 {
535 if (i == (Uint32)type)
536 {
537 return m_threads[i].size();
538 }
539 }
540 return 0;
541 }
542
543 const char *
getErrorMessage() const544 THRConfig::getErrorMessage() const
545 {
546 if (m_err_msg.empty())
547 return 0;
548 return m_err_msg.c_str();
549 }
550
551 const char *
getInfoMessage() const552 THRConfig::getInfoMessage() const
553 {
554 if (m_info_msg.empty())
555 return 0;
556 return m_info_msg.c_str();
557 }
558
/**
 * Skip leading whitespace.
 *
 * @param str  NUL-terminated string
 * @return pointer to the first character of str for which isspace()
 *         is false (possibly the terminating NUL)
 */
static
char *
skipblank(char * str)
{
  /**
   * isspace() is only defined for values representable as unsigned char
   *   (and EOF), so a plain char that may be negative must be converted
   */
  while (isspace((unsigned char)* str))
    str++;
  return str;
}
567
568 Uint32
find_type(char * & str)569 THRConfig::find_type(char *& str)
570 {
571 str = skipblank(str);
572
573 char * name = str;
574 if (* name == 0)
575 {
576 m_err_msg.assfmt("empty thread specification");
577 return 0;
578 }
579 char * end = name;
580 while(isalpha(* end))
581 end++;
582
583 char save = * end;
584 * end = 0;
585 Uint32 t = getEntryType(name);
586 if (t == T_END)
587 {
588 m_err_msg.assfmt("unknown thread type '%s'", name);
589 }
590 * end = save;
591 str = end;
592 return t;
593 }
594
595 struct ParamValue
596 {
ParamValueParamValue597 ParamValue() { found = false;}
598 bool found;
599 const char * string_val;
600 unsigned unsigned_val;
601 SparseBitmask mask_val;
602 };
603
604 static
605 int
parseUnsigned(char * & str,unsigned * dst)606 parseUnsigned(char *& str, unsigned * dst)
607 {
608 str = skipblank(str);
609 char * endptr = 0;
610 errno = 0;
611 long val = strtol(str, &endptr, 0);
612 if (errno == ERANGE)
613 return -1;
614 if (val < 0 || Int64(val) > 0xFFFFFFFF)
615 return -1;
616 if (endptr == str)
617 return -1;
618 str = endptr;
619 *dst = (unsigned)val;
620 return 0;
621 }
622
623 static
624 int
parseBitmask(char * & str,SparseBitmask * mask)625 parseBitmask(char *& str, SparseBitmask * mask)
626 {
627 str = skipblank(str);
628 size_t len = strspn(str, "0123456789-, ");
629 if (len == 0)
630 return -1;
631
632 while (isspace(str[len-1]))
633 len--;
634 if (str[len-1] == ',')
635 len--;
636 char save = str[len];
637 str[len] = 0;
638 int res = parse_mask(str, *mask);
639 str[len] = save;
640 str = str + len;
641 return res;
642 }
643
644 static
645 int
parseParams(char * str,ParamValue values[],BaseString & err)646 parseParams(char * str, ParamValue values[], BaseString& err)
647 {
648 const char * const save = str;
649 while (* str)
650 {
651 str = skipblank(str);
652
653 unsigned idx = 0;
654 for (; idx < NDB_ARRAY_SIZE(m_params); idx++)
655 {
656
657 #if ! SUPPORT_CPU_SET
658 if (idx == IX_CPUSET)
659 continue;
660 #endif
661
662 if (strncasecmp(str, m_params[idx].name, strlen(m_params[idx].name)) == 0)
663 {
664 str += strlen(m_params[idx].name);
665 break;
666 }
667 }
668
669 if (idx == NDB_ARRAY_SIZE(m_params))
670 {
671 err.assfmt("Unknown param near: '%s'", str);
672 return -1;
673 }
674
675 if (values[idx].found == true)
676 {
677 err.assfmt("Param '%s' found twice", m_params[idx].name);
678 return -1;
679 }
680
681 str = skipblank(str);
682 if (* str != '=')
683 {
684 err.assfmt("Missing '=' after %s in '%s'", m_params[idx].name, save);
685 return -1;
686 }
687 str++;
688 str = skipblank(str);
689
690 int res = 0;
691 switch(m_params[idx].type){
692 case THRConfig::Param::S_UNSIGNED:
693 res = parseUnsigned(str, &values[idx].unsigned_val);
694 break;
695 case THRConfig::Param::S_BITMASK:
696 res = parseBitmask(str, &values[idx].mask_val);
697 break;
698 default:
699 err.assfmt("Internal error, unknown type for param: '%s'",
700 m_params[idx].name);
701 return -1;
702 }
703 if (res == -1)
704 {
705 err.assfmt("Unable to parse %s=%s", m_params[idx].name, str);
706 return -1;
707 }
708 values[idx].found = true;
709 str = skipblank(str);
710
711 if (* str == 0)
712 break;
713
714 if (* str != ',')
715 {
716 err.assfmt("Unable to parse near '%s'", str);
717 return -1;
718 }
719 str++;
720 }
721 return 0;
722 }
723
724 int
find_spec(char * & str,T_Type type)725 THRConfig::find_spec(char *& str, T_Type type)
726 {
727 str = skipblank(str);
728
729 switch(* str){
730 case ',':
731 case 0:
732 add(type);
733 return 0;
734 }
735
736 if (* str != '=')
737 {
738 err:
739 int len = (int)strlen(str);
740 m_err_msg.assfmt("Invalid format near: '%.*s'",
741 (len > 10) ? 10 : len, str);
742 return -1;
743 }
744
745 str++; // skip over =
746 str = skipblank(str);
747
748 if (* str != '{')
749 {
750 goto err;
751 }
752
753 str++;
754 char * start = str;
755
756 /**
757 * Find end
758 */
759 while (* str && (* str) != '}')
760 str++;
761
762 if (* str != '}')
763 {
764 goto err;
765 }
766
767 char * end = str;
768 char save = * end;
769 * end = 0;
770
771 ParamValue values[NDB_ARRAY_SIZE(m_params)];
772 values[IX_COUNT].unsigned_val = 1;
773 int res = parseParams(start, values, m_err_msg);
774 * end = save;
775
776 if (res != 0)
777 {
778 return -1;
779 }
780
781 if (values[IX_CPUBOUND].found && values[IX_CPUSET].found)
782 {
783 m_err_msg.assfmt("Both cpuset and cpubind specified!");
784 return -1;
785 }
786
787 unsigned cnt = values[IX_COUNT].unsigned_val;
788 const int index = m_threads[type].size();
789 for (unsigned i = 0; i < cnt; i++)
790 {
791 add(type);
792 }
793
794 assert(m_threads[type].size() == index + cnt);
795 if (values[IX_CPUSET].found)
796 {
797 SparseBitmask & mask = values[IX_CPUSET].mask_val;
798 unsigned no = createCpuSet(mask);
799 for (unsigned i = 0; i < cnt; i++)
800 {
801 m_threads[type][index+i].m_bind_type = T_Thread::B_CPUSET_BOUND;
802 m_threads[type][index+i].m_bind_no = no;
803 }
804 }
805 else if (values[IX_CPUBOUND].found)
806 {
807 SparseBitmask & mask = values[IX_CPUBOUND].mask_val;
808 if (mask.count() < cnt)
809 {
810 m_err_msg.assfmt("%s: trying to bind %u threads to %u cpus [%s]",
811 getEntryName(type),
812 cnt,
813 mask.count(),
814 mask.str().c_str());
815 return -1;
816 }
817 for (unsigned i = 0; i < cnt; i++)
818 {
819 m_threads[type][index+i].m_bind_type = T_Thread::B_CPU_BOUND;
820 m_threads[type][index+i].m_bind_no = mask.getBitNo(i % mask.count());
821 }
822 }
823
824 str++; // skip over }
825 return 0;
826 }
827
828 int
find_next(char * & str)829 THRConfig::find_next(char *& str)
830 {
831 str = skipblank(str);
832
833 if (* str == 0)
834 {
835 return 0;
836 }
837 else if (* str == ',')
838 {
839 str++;
840 return 1;
841 }
842
843 int len = (int)strlen(str);
844 m_err_msg.assfmt("Invalid format near: '%.*s'",
845 (len > 10) ? 10 : len, str);
846 return -1;
847 }
848
849 int
do_parse(const char * ThreadConfig)850 THRConfig::do_parse(const char * ThreadConfig)
851 {
852 BaseString str(ThreadConfig);
853 char * ptr = (char*)str.c_str();
854 while (* ptr)
855 {
856 Uint32 type = find_type(ptr);
857 if (type == T_END)
858 return -1;
859
860 if (find_spec(ptr, (T_Type)type) < 0)
861 return -1;
862
863 int ret = find_next(ptr);
864 if (ret < 0)
865 return ret;
866
867 if (ret == 0)
868 break;
869 }
870
871 for (Uint32 i = 0; i < T_END; i++)
872 {
873 while (m_threads[i].size() < m_entries[i].m_min_cnt)
874 add((T_Type)i);
875 }
876
877 return do_bindings() || do_validate();
878 }
879
880 unsigned
createCpuSet(const SparseBitmask & mask)881 THRConfig::createCpuSet(const SparseBitmask& mask)
882 {
883 for (unsigned i = 0; i < m_cpu_sets.size(); i++)
884 if (m_cpu_sets[i].equal(mask))
885 return i;
886
887 m_cpu_sets.push_back(mask);
888 return m_cpu_sets.size() - 1;
889 }
890
891 template class Vector<SparseBitmask>;
892 template class Vector<THRConfig::T_Thread>;
893
894 #ifndef TEST_MT_THR_CONFIG
895 #include <BlockNumbers.h>
896 #include <NdbThread.h>
897
898 static
899 int
findBlock(Uint32 blockNo,const unsigned short list[],unsigned cnt)900 findBlock(Uint32 blockNo, const unsigned short list[], unsigned cnt)
901 {
902 for (Uint32 i = 0; i < cnt; i++)
903 {
904 if (blockToMain(list[i]) == blockNo)
905 return blockToInstance(list[i]);
906 }
907 return -1;
908 }
909
910 const THRConfig::T_Thread*
find_thread(const unsigned short instancelist[],unsigned cnt) const911 THRConfigApplier::find_thread(const unsigned short instancelist[], unsigned cnt) const
912 {
913 int instanceNo;
914 if ((instanceNo = findBlock(SUMA, instancelist, cnt)) >= 0)
915 {
916 return &m_threads[T_REP][instanceNo];
917 }
918 else if ((instanceNo = findBlock(CMVMI, instancelist, cnt)) >= 0)
919 {
920 return &m_threads[T_RECV][instanceNo];
921 }
922 else if ((instanceNo = findBlock(DBDIH, instancelist, cnt)) >= 0)
923 {
924 return &m_threads[T_MAIN][instanceNo];
925 }
926 else if ((instanceNo = findBlock(DBLQH, instancelist, cnt)) >= 0)
927 {
928 return &m_threads[T_LDM][instanceNo - 1]; // remove proxy...
929 }
930 return 0;
931 }
932
933 void
appendInfo(BaseString & str,const unsigned short list[],unsigned cnt) const934 THRConfigApplier::appendInfo(BaseString& str,
935 const unsigned short list[], unsigned cnt) const
936 {
937 const T_Thread* thr = find_thread(list, cnt);
938 assert(thr != 0);
939 str.appfmt("(%s) ", getEntryName(thr->m_type));
940 if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
941 {
942 str.appfmt("cpu: %u ", thr->m_bind_no);
943 }
944 else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
945 {
946 str.appfmt("cpuset: [ %s ] ", m_cpu_sets[thr->m_bind_no].str().c_str());
947 }
948 }
949
950 int
create_cpusets()951 THRConfigApplier::create_cpusets()
952 {
953 return 0;
954 }
955
956 int
do_bind(NdbThread * thread,const unsigned short list[],unsigned cnt)957 THRConfigApplier::do_bind(NdbThread* thread,
958 const unsigned short list[], unsigned cnt)
959 {
960 const T_Thread* thr = find_thread(list, cnt);
961 if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
962 {
963 int res = NdbThread_LockCPU(thread, thr->m_bind_no);
964 if (res == 0)
965 return 1;
966 else
967 return -res;
968 }
969 #if TODO
970 else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
971 {
972 }
973 #endif
974
975 return 0;
976 }
977
978 int
do_bind_io(NdbThread * thread)979 THRConfigApplier::do_bind_io(NdbThread* thread)
980 {
981 const T_Thread* thr = &m_threads[T_IO][0];
982 if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
983 {
984 int res = NdbThread_LockCPU(thread, thr->m_bind_no);
985 if (res == 0)
986 return 1;
987 else
988 return -res;
989 }
990 #if TODO
991 else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
992 {
993 }
994 #endif
995
996 return 0;
997 }
998 #endif
999
1000 #ifdef TEST_MT_THR_CONFIG
1001
1002 #include <NdbTap.hpp>
1003
/**
 * Self test: legacy parameter parsing, ThreadConfig strings that must
 * be accepted / rejected (accepted ones must survive a round trip
 * through getConfigString()), and the interaction between ThreadConfig
 * and LockExecuteThreadToCPU.
 */
TAPTEST(mt_thr_config)
{
  /**
   * Legacy parameters
   */
  {
    THRConfig legacy;
    OK(legacy.do_parse(8, 0, 0) == 0);
  }

  /**
   * BASIC test
   */
  {
    const char * valid[] =
    {
      "ldm,ldm",
      "ldm={count=3},ldm",
      "ldm={cpubind=1-2,5,count=3},ldm",
      "ldm={ cpubind = 1- 2, 5 , count = 3 },ldm",
      "ldm={count=3,cpubind=1-2,5 }, ldm",
      "ldm={cpuset=1-3,count=3 },ldm",
      "main,ldm={},ldm",
      0
    };

    const char * invalid[] =
    {
      "ldm,ldm,ldm",
      "ldm={cpubind= 1 , cpuset=2 },ldm",
      "ldm={count=4,cpubind=1-3},ldm",
      "main,main,ldm,ldm",
      "main={ keso=88, count=23},ldm,ldm",
      "main={ cpuset=1-3 }, ldm={cpuset=3-4}",
      "main={ cpuset=1-3 }, ldm={cpubind=2}",
      0
    };

    for (const char ** spec = valid; * spec; spec++)
    {
      THRConfig conf;
      const int res = conf.do_parse(* spec);
      printf("do_parse(%s) => %s - %s\n", * spec,
             res == 0 ? "OK" : "FAIL",
             res == 0 ? "" : conf.getErrorMessage());
      OK(res == 0);
      {
        // the generated string must parse and reproduce itself
        BaseString generated(conf.getConfigString());
        THRConfig reparsed;
        OK(reparsed.do_parse(generated.c_str()) == 0);
        OK(strcmp(generated.c_str(), reparsed.getConfigString()) == 0);
      }
    }

    for (const char ** spec = invalid; * spec; spec++)
    {
      THRConfig conf;
      const int res = conf.do_parse(* spec);
      printf("do_parse(%s) => %s - %s\n", * spec,
             res == 0 ? "OK" : "FAIL",
             res == 0 ? "" : conf.getErrorMessage());
      OK(res != 0);
    }
  }

  {
    /**
     * Test interaction with LockExecuteThreadToCPU
     */
    const char * cases[] =
    {
      /** LockExecuteThreadToCPU, threads, answer */
      "1-8",
      "ldm={count=4}",
      "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}",

      "1-5",
      "ldm={count=4}",
      "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}",

      "1-3",
      "ldm={count=4}",
      "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}",

      "1-4",
      "ldm={count=4}",
      "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}",

      "1-8",
      "ldm={count=4},io={cpubind=8}",
      "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},io={cpubind=8}",

      "1-8",
      "ldm={count=4,cpubind=1,4,5,6}",
      "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}",

      // END
      0
    };

    for (unsigned base = 0; cases[base]; base += 3)
    {
      const char * mask = cases[base + 0];
      const char * threads = cases[base + 1];
      const char * expected = cases[base + 2];

      THRConfig conf;
      conf.setLockExecuteThreadToCPU(mask);
      const int res = conf.do_parse(threads);
      const int match = strcmp(conf.getConfigString(), expected) == 0;
      printf("mask: %s conf: %s => %s(%s) - %s - %s\n",
             mask,
             threads,
             res == 0 ? "OK" : "FAIL",
             res == 0 ? "" : conf.getErrorMessage(),
             conf.getConfigString(),
             match == 1 ? "CORRECT" : "INCORRECT");
      OK(res == 0);
      OK(match == 1);
    }
  }

  return 1;
}
1121
1122 #endif
1123