1 /*
2    Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "mt_thr_config.hpp"
26 #include <kernel/ndb_limits.h>
27 #include "../../common/util/parse_mask.hpp"
28 
29 #ifndef TEST_MT_THR_CONFIG
30 #define SUPPORT_CPU_SET 0
31 #else
32 #define SUPPORT_CPU_SET 1
33 #endif
34 
35 static const struct THRConfig::Entries m_entries[] =
36 {
37   // name    type              min  max
38   { "main",  THRConfig::T_MAIN,  1, 1 },
39   { "ldm",   THRConfig::T_LDM,   1, MAX_NDBMT_LQH_THREADS },
40   { "recv",  THRConfig::T_RECV,  1, 1 },
41   { "rep",   THRConfig::T_REP,   1, 1 },
42   { "io",    THRConfig::T_IO,    1, 1 }
43 };
44 
45 static const struct THRConfig::Param m_params[] =
46 {
47   { "count",   THRConfig::Param::S_UNSIGNED },
48   { "cpubind", THRConfig::Param::S_BITMASK },
49   { "cpuset",  THRConfig::Param::S_BITMASK }
50 };
51 
52 #define IX_COUNT    0
53 #define IX_CPUBOUND 1
54 #define IX_CPUSET   2
55 
56 static
57 unsigned
getMaxEntries(Uint32 type)58 getMaxEntries(Uint32 type)
59 {
60   for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
61   {
62     if (m_entries[i].m_type == type)
63       return m_entries[i].m_max_cnt;
64   }
65   return 0;
66 }
67 
68 static
69 const char *
getEntryName(Uint32 type)70 getEntryName(Uint32 type)
71 {
72   for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
73   {
74     if (m_entries[i].m_type == type)
75       return m_entries[i].m_name;
76   }
77   return 0;
78 }
79 
80 static
81 Uint32
getEntryType(const char * type)82 getEntryType(const char * type)
83 {
84   for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
85   {
86     if (strcasecmp(type, m_entries[i].m_name) == 0)
87       return i;
88   }
89 
90   return THRConfig::T_END;
91 }
92 
THRConfig()93 THRConfig::THRConfig()
94 {
95   m_classic = false;
96 }
97 
~THRConfig()98 THRConfig::~THRConfig()
99 {
100 }
101 
102 int
setLockExecuteThreadToCPU(const char * mask)103 THRConfig::setLockExecuteThreadToCPU(const char * mask)
104 {
105   int res = parse_mask(mask, m_LockExecuteThreadToCPU);
106   if (res < 0)
107   {
108     m_err_msg.assfmt("failed to parse 'LockExecuteThreadToCPU=%s' "
109                      "(error: %d)",
110                      mask, res);
111     return -1;
112   }
113   return 0;
114 }
115 
116 int
setLockIoThreadsToCPU(unsigned val)117 THRConfig::setLockIoThreadsToCPU(unsigned val)
118 {
119   m_LockIoThreadsToCPU.set(val);
120   return 0;
121 }
122 
123 void
add(T_Type t)124 THRConfig::add(T_Type t)
125 {
126   T_Thread tmp;
127   tmp.m_type = t;
128   tmp.m_bind_type = T_Thread::B_UNBOUND;
129   tmp.m_no = m_threads[t].size();
130   m_threads[t].push_back(tmp);
131 }
132 
133 int
do_parse(unsigned MaxNoOfExecutionThreads,unsigned __ndbmt_lqh_threads,unsigned __ndbmt_classic)134 THRConfig::do_parse(unsigned MaxNoOfExecutionThreads,
135                     unsigned __ndbmt_lqh_threads,
136                     unsigned __ndbmt_classic)
137 {
138   /**
139    * This is old ndbd.cpp : get_multithreaded_config
140    */
141   if (__ndbmt_classic)
142   {
143     m_classic = true;
144     add(T_LDM);
145     add(T_MAIN);
146     add(T_IO);
147     return do_bindings();
148   }
149 
150   Uint32 lqhthreads = 0;
151   switch(MaxNoOfExecutionThreads){
152   case 0:
153   case 1:
154   case 2:
155   case 3:
156     lqhthreads = 1; // TC + receiver + SUMA + LQH
157     break;
158   case 4:
159   case 5:
160   case 6:
161     lqhthreads = 2; // TC + receiver + SUMA + 2 * LQH
162     break;
163   default:
164     lqhthreads = 4; // TC + receiver + SUMA + 4 * LQH
165   }
166 
167   if (__ndbmt_lqh_threads)
168   {
169     lqhthreads = __ndbmt_lqh_threads;
170   }
171 
172   add(T_MAIN);
173   add(T_REP);
174   add(T_RECV);
175   add(T_IO);
176   for(Uint32 i = 0; i < lqhthreads; i++)
177   {
178     add(T_LDM);
179   }
180 
181   return do_bindings() || do_validate();
182 }
183 
184 int
do_bindings()185 THRConfig::do_bindings()
186 {
187   if (m_LockIoThreadsToCPU.count() == 1)
188   {
189     m_threads[T_IO][0].m_bind_type = T_Thread::B_CPU_BOUND;
190     m_threads[T_IO][0].m_bind_no = m_LockIoThreadsToCPU.getBitNo(0);
191   }
192   else if (m_LockIoThreadsToCPU.count() > 1)
193   {
194     unsigned no = createCpuSet(m_LockIoThreadsToCPU);
195     m_threads[T_IO][0].m_bind_type = T_Thread::B_CPUSET_BOUND;
196     m_threads[T_IO][0].m_bind_no = no;
197   }
198 
199   /**
200    * Check that no cpu_sets overlap
201    */
202   for (unsigned i = 0; i<m_cpu_sets.size(); i++)
203   {
204     for (unsigned j = i + 1; j < m_cpu_sets.size(); j++)
205     {
206       if (m_cpu_sets[i].overlaps(m_cpu_sets[j]))
207       {
208         m_err_msg.assfmt("Overlapping cpuset's [ %s ] and [ %s ]",
209                          m_cpu_sets[i].str().c_str(),
210                          m_cpu_sets[j].str().c_str());
211         return -1;
212       }
213     }
214   }
215 
216   /**
217    * Check that no cpu_sets overlap
218    */
219   for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
220   {
221     for (unsigned j = 0; j < m_threads[i].size(); j++)
222     {
223       if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
224       {
225         unsigned cpu = m_threads[i][j].m_bind_no;
226         for (unsigned k = 0; k<m_cpu_sets.size(); k++)
227         {
228           if (m_cpu_sets[k].get(cpu))
229           {
230             m_err_msg.assfmt("Overlapping cpubind %u with cpuset [ %s ]",
231                              cpu,
232                              m_cpu_sets[k].str().c_str());
233 
234             return -1;
235           }
236         }
237       }
238     }
239   }
240 
241   /**
242    * Remove all already bound threads from LockExecuteThreadToCPU-mask
243    */
244   for (unsigned i = 0; i<m_cpu_sets.size(); i++)
245   {
246     for (unsigned j = 0; j < m_cpu_sets[i].count(); j++)
247     {
248       m_LockExecuteThreadToCPU.clear(m_cpu_sets[i].getBitNo(j));
249     }
250   }
251 
252   unsigned cnt_unbound = 0;
253   for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
254   {
255     for (unsigned j = 0; j < m_threads[i].size(); j++)
256     {
257       if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
258       {
259         unsigned cpu = m_threads[i][j].m_bind_no;
260         m_LockExecuteThreadToCPU.clear(cpu);
261       }
262       else if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
263       {
264         cnt_unbound ++;
265       }
266     }
267   }
268 
269   if (m_threads[T_IO][0].m_bind_type == T_Thread::B_UNBOUND)
270   {
271     /**
272      * don't count this one...
273      */
274     cnt_unbound--;
275   }
276 
277   if (m_LockExecuteThreadToCPU.count())
278   {
279     /**
280      * This is old mt.cpp : setcpuaffinity
281      */
282     SparseBitmask& mask = m_LockExecuteThreadToCPU;
283     unsigned cnt = mask.count();
284     unsigned num_threads = cnt_unbound;
285     bool isMtLqh = !m_classic;
286 
287     if (cnt < num_threads)
288     {
289       m_info_msg.assfmt("WARNING: Too few CPU's specified with "
290                         "LockExecuteThreadToCPU. Only %u specified "
291                         " but %u was needed, this may cause contention.\n",
292                         cnt, num_threads);
293     }
294 
295     if (cnt >= num_threads)
296     {
297       m_info_msg.appfmt("Assigning each thread its own CPU\n");
298       unsigned no = 0;
299       for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
300       {
301         if (i == T_IO)
302           continue;
303         for (unsigned j = 0; j < m_threads[i].size(); j++)
304         {
305           if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
306           {
307             m_threads[i][j].m_bind_type = T_Thread::B_CPU_BOUND;
308             m_threads[i][j].m_bind_no = mask.getBitNo(no);
309             no++;
310           }
311         }
312       }
313     }
314     else if (cnt == 1)
315     {
316       unsigned cpu = mask.getBitNo(0);
317       m_info_msg.appfmt("Assigning all threads to CPU %u\n", cpu);
318       for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
319       {
320         if (i == T_IO)
321           continue;
322         bind_unbound(m_threads[i], cpu);
323       }
324     }
325     else if (isMtLqh)
326     {
327       unsigned unbound_ldm = count_unbound(m_threads[T_LDM]);
328       if (cnt > unbound_ldm)
329       {
330         /**
331          * let each LQH have it's own CPU and rest share...
332          */
333         m_info_msg.append("Assigning LQH threads to dedicated CPU(s) and "
334                           "other threads will share remaining\n");
335         unsigned cpu = mask.find(0);
336         for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
337         {
338           if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
339           {
340             m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
341             m_threads[T_LDM][i].m_bind_no = cpu;
342             mask.clear(cpu);
343             cpu = mask.find(cpu + 1);
344           }
345         }
346 
347         cpu = mask.find(0);
348         bind_unbound(m_threads[T_MAIN], cpu);
349         bind_unbound(m_threads[T_REP], cpu);
350         if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
351         {
352           cpu = mask.find(0);
353         }
354         bind_unbound(m_threads[T_RECV], cpu);
355       }
356       else
357       {
358         // put receiver, tc, backup/suma in 1 thread,
359         // and round robin LQH for rest
360         unsigned cpu = mask.find(0);
361         m_info_msg.appfmt("Assigning LQH threads round robin to CPU(s) and "
362                           "other threads will share CPU %u\n", cpu);
363         bind_unbound(m_threads[T_MAIN], cpu); // TC
364         bind_unbound(m_threads[T_REP], cpu);
365         bind_unbound(m_threads[T_RECV], cpu);
366         mask.clear(cpu);
367 
368         cpu = mask.find(0);
369         for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
370         {
371           if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
372           {
373             m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
374             m_threads[T_LDM][i].m_bind_no = cpu;
375             if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
376             {
377               cpu = mask.find(0);
378             }
379           }
380         }
381       }
382     }
383     else
384     {
385       unsigned cpu = mask.find(0);
386       m_info_msg.appfmt("Assigning LQH thread to CPU %u and "
387                         "other threads will share\n", cpu);
388       bind_unbound(m_threads[T_LDM], cpu);
389       cpu = mask.find(cpu + 1);
390       bind_unbound(m_threads[T_MAIN], cpu);
391       bind_unbound(m_threads[T_RECV], cpu);
392     }
393   }
394 
395   return 0;
396 }
397 
398 unsigned
count_unbound(const Vector<T_Thread> & vec) const399 THRConfig::count_unbound(const Vector<T_Thread>& vec) const
400 {
401   unsigned cnt = 0;
402   for (unsigned i = 0; i < vec.size(); i++)
403   {
404     if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
405       cnt ++;
406   }
407   return cnt;
408 }
409 
410 void
bind_unbound(Vector<T_Thread> & vec,unsigned cpu)411 THRConfig::bind_unbound(Vector<T_Thread>& vec, unsigned cpu)
412 {
413   for (unsigned i = 0; i < vec.size(); i++)
414   {
415     if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
416     {
417       vec[i].m_bind_type = T_Thread::B_CPU_BOUND;
418       vec[i].m_bind_no = cpu;
419     }
420   }
421 }
422 
423 int
do_validate()424 THRConfig::do_validate()
425 {
426   /**
427    * Check that there aren't too many of any thread type
428    */
429   for (unsigned i = 0; i< NDB_ARRAY_SIZE(m_threads); i++)
430   {
431     if (m_threads[i].size() > getMaxEntries(i))
432     {
433       m_err_msg.assfmt("Too many instances(%u) of %s max supported: %u",
434                        m_threads[i].size(),
435                        getEntryName(i),
436                        getMaxEntries(i));
437       return -1;
438     }
439   }
440 
441   /**
442    * LDM can be 1 2 4
443    */
444   if (m_threads[T_LDM].size() == 3)
445   {
446     m_err_msg.assfmt("No of LDM-instances can be 1,2,4. Specified: %u",
447                      m_threads[T_LDM].size());
448     return -1;
449   }
450 
451   return 0;
452 }
453 
454 const char *
getConfigString()455 THRConfig::getConfigString()
456 {
457   m_cfg_string.clear();
458   const char * sep = "";
459   for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
460   {
461     if (m_threads[i].size())
462     {
463       const char * name = getEntryName(i);
464       if (i != T_IO)
465       {
466         for (unsigned j = 0; j < m_threads[i].size(); j++)
467         {
468           m_cfg_string.append(sep);
469           sep=",";
470           m_cfg_string.append(name);
471           if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
472           {
473             m_cfg_string.append("={");
474             if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
475             {
476               m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
477             }
478             else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
479             {
480               m_cfg_string.appfmt("cpuset=%s",
481                                   m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
482             }
483             m_cfg_string.append("}");
484           }
485         }
486       }
487       else
488       {
489         for (unsigned j = 0; j < m_threads[i].size(); j++)
490         {
491           if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
492           {
493             m_cfg_string.append(sep);
494             sep=",";
495             m_cfg_string.append(name);
496             m_cfg_string.append("={");
497             if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
498             {
499               m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
500             }
501             else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
502             {
503               m_cfg_string.appfmt("cpuset=%s",
504                                   m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
505             }
506             m_cfg_string.append("}");
507           }
508         }
509       }
510     }
511   }
512   return m_cfg_string.c_str();
513 }
514 
515 Uint32
getThreadCount() const516 THRConfig::getThreadCount() const
517 {
518   // Note! not counting T_IO
519   Uint32 cnt = 0;
520   for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
521   {
522     if (i != T_IO)
523     {
524       cnt += m_threads[i].size();
525     }
526   }
527   return cnt;
528 }
529 
530 Uint32
getThreadCount(T_Type type) const531 THRConfig::getThreadCount(T_Type type) const
532 {
533   for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
534   {
535     if (i == (Uint32)type)
536     {
537       return m_threads[i].size();
538     }
539   }
540   return 0;
541 }
542 
543 const char *
getErrorMessage() const544 THRConfig::getErrorMessage() const
545 {
546   if (m_err_msg.empty())
547     return 0;
548   return m_err_msg.c_str();
549 }
550 
551 const char *
getInfoMessage() const552 THRConfig::getInfoMessage() const
553 {
554   if (m_info_msg.empty())
555     return 0;
556   return m_info_msg.c_str();
557 }
558 
559 static
560 char *
skipblank(char * str)561 skipblank(char * str)
562 {
563   while (isspace(* str))
564     str++;
565   return str;
566 }
567 
568 Uint32
find_type(char * & str)569 THRConfig::find_type(char *& str)
570 {
571   str = skipblank(str);
572 
573   char * name = str;
574   if (* name == 0)
575   {
576     m_err_msg.assfmt("empty thread specification");
577     return 0;
578   }
579   char * end = name;
580   while(isalpha(* end))
581     end++;
582 
583   char save = * end;
584   * end = 0;
585   Uint32 t = getEntryType(name);
586   if (t == T_END)
587   {
588     m_err_msg.assfmt("unknown thread type '%s'", name);
589   }
590   * end = save;
591   str = end;
592   return t;
593 }
594 
595 struct ParamValue
596 {
ParamValueParamValue597   ParamValue() { found = false;}
598   bool found;
599   const char * string_val;
600   unsigned unsigned_val;
601   SparseBitmask mask_val;
602 };
603 
604 static
605 int
parseUnsigned(char * & str,unsigned * dst)606 parseUnsigned(char *& str, unsigned * dst)
607 {
608   str = skipblank(str);
609   char * endptr = 0;
610   errno = 0;
611   long val = strtol(str, &endptr, 0);
612   if (errno == ERANGE)
613     return -1;
614   if (val < 0 || Int64(val) > 0xFFFFFFFF)
615     return -1;
616   if (endptr == str)
617     return -1;
618   str = endptr;
619   *dst = (unsigned)val;
620   return 0;
621 }
622 
623 static
624 int
parseBitmask(char * & str,SparseBitmask * mask)625 parseBitmask(char *& str, SparseBitmask * mask)
626 {
627   str = skipblank(str);
628   size_t len = strspn(str, "0123456789-, ");
629   if (len == 0)
630     return -1;
631 
632   while (isspace(str[len-1]))
633     len--;
634   if (str[len-1] == ',')
635     len--;
636   char save = str[len];
637   str[len] = 0;
638   int res = parse_mask(str, *mask);
639   str[len] = save;
640   str = str + len;
641   return res;
642 }
643 
644 static
645 int
parseParams(char * str,ParamValue values[],BaseString & err)646 parseParams(char * str, ParamValue values[], BaseString& err)
647 {
648   const char * const save = str;
649   while (* str)
650   {
651     str = skipblank(str);
652 
653     unsigned idx = 0;
654     for (; idx < NDB_ARRAY_SIZE(m_params); idx++)
655     {
656 
657 #if ! SUPPORT_CPU_SET
658       if (idx == IX_CPUSET)
659         continue;
660 #endif
661 
662       if (strncasecmp(str, m_params[idx].name, strlen(m_params[idx].name)) == 0)
663       {
664         str += strlen(m_params[idx].name);
665         break;
666       }
667     }
668 
669     if (idx == NDB_ARRAY_SIZE(m_params))
670     {
671       err.assfmt("Unknown param near: '%s'", str);
672       return -1;
673     }
674 
675     if (values[idx].found == true)
676     {
677       err.assfmt("Param '%s' found twice", m_params[idx].name);
678       return -1;
679     }
680 
681     str = skipblank(str);
682     if (* str != '=')
683     {
684       err.assfmt("Missing '=' after %s in '%s'", m_params[idx].name, save);
685       return -1;
686     }
687     str++;
688     str = skipblank(str);
689 
690     int res = 0;
691     switch(m_params[idx].type){
692     case THRConfig::Param::S_UNSIGNED:
693       res = parseUnsigned(str, &values[idx].unsigned_val);
694       break;
695     case THRConfig::Param::S_BITMASK:
696       res = parseBitmask(str, &values[idx].mask_val);
697       break;
698     default:
699       err.assfmt("Internal error, unknown type for param: '%s'",
700                  m_params[idx].name);
701       return -1;
702     }
703     if (res == -1)
704     {
705       err.assfmt("Unable to parse %s=%s", m_params[idx].name, str);
706       return -1;
707     }
708     values[idx].found = true;
709     str = skipblank(str);
710 
711     if (* str == 0)
712       break;
713 
714     if (* str != ',')
715     {
716       err.assfmt("Unable to parse near '%s'", str);
717       return -1;
718     }
719     str++;
720   }
721   return 0;
722 }
723 
724 int
find_spec(char * & str,T_Type type)725 THRConfig::find_spec(char *& str, T_Type type)
726 {
727   str = skipblank(str);
728 
729   switch(* str){
730   case ',':
731   case 0:
732     add(type);
733     return 0;
734   }
735 
736   if (* str != '=')
737   {
738 err:
739     int len = (int)strlen(str);
740     m_err_msg.assfmt("Invalid format near: '%.*s'",
741                      (len > 10) ? 10 : len, str);
742     return -1;
743   }
744 
745   str++; // skip over =
746   str = skipblank(str);
747 
748   if (* str != '{')
749   {
750     goto err;
751   }
752 
753   str++;
754   char * start = str;
755 
756   /**
757    * Find end
758    */
759   while (* str && (* str) != '}')
760     str++;
761 
762   if (* str != '}')
763   {
764     goto err;
765   }
766 
767   char * end = str;
768   char save = * end;
769   * end = 0;
770 
771   ParamValue values[NDB_ARRAY_SIZE(m_params)];
772   values[IX_COUNT].unsigned_val = 1;
773   int res = parseParams(start, values, m_err_msg);
774   * end = save;
775 
776   if (res != 0)
777   {
778     return -1;
779   }
780 
781   if (values[IX_CPUBOUND].found && values[IX_CPUSET].found)
782   {
783     m_err_msg.assfmt("Both cpuset and cpubind specified!");
784     return -1;
785   }
786 
787   unsigned cnt = values[IX_COUNT].unsigned_val;
788   const int index = m_threads[type].size();
789   for (unsigned i = 0; i < cnt; i++)
790   {
791     add(type);
792   }
793 
794   assert(m_threads[type].size() == index + cnt);
795   if (values[IX_CPUSET].found)
796   {
797     SparseBitmask & mask = values[IX_CPUSET].mask_val;
798     unsigned no = createCpuSet(mask);
799     for (unsigned i = 0; i < cnt; i++)
800     {
801       m_threads[type][index+i].m_bind_type = T_Thread::B_CPUSET_BOUND;
802       m_threads[type][index+i].m_bind_no = no;
803     }
804   }
805   else if (values[IX_CPUBOUND].found)
806   {
807     SparseBitmask & mask = values[IX_CPUBOUND].mask_val;
808     if (mask.count() < cnt)
809     {
810       m_err_msg.assfmt("%s: trying to bind %u threads to %u cpus [%s]",
811                        getEntryName(type),
812                        cnt,
813                        mask.count(),
814                        mask.str().c_str());
815       return -1;
816     }
817     for (unsigned i = 0; i < cnt; i++)
818     {
819       m_threads[type][index+i].m_bind_type = T_Thread::B_CPU_BOUND;
820       m_threads[type][index+i].m_bind_no = mask.getBitNo(i % mask.count());
821     }
822   }
823 
824   str++; // skip over }
825   return 0;
826 }
827 
828 int
find_next(char * & str)829 THRConfig::find_next(char *& str)
830 {
831   str = skipblank(str);
832 
833   if (* str == 0)
834   {
835     return 0;
836   }
837   else if (* str == ',')
838   {
839     str++;
840     return 1;
841   }
842 
843   int len = (int)strlen(str);
844   m_err_msg.assfmt("Invalid format near: '%.*s'",
845                    (len > 10) ? 10 : len, str);
846   return -1;
847 }
848 
849 int
do_parse(const char * ThreadConfig)850 THRConfig::do_parse(const char * ThreadConfig)
851 {
852   BaseString str(ThreadConfig);
853   char * ptr = (char*)str.c_str();
854   while (* ptr)
855   {
856     Uint32 type = find_type(ptr);
857     if (type == T_END)
858       return -1;
859 
860     if (find_spec(ptr, (T_Type)type) < 0)
861       return -1;
862 
863     int ret = find_next(ptr);
864     if (ret < 0)
865       return ret;
866 
867     if (ret == 0)
868       break;
869   }
870 
871   for (Uint32 i = 0; i < T_END; i++)
872   {
873     while (m_threads[i].size() < m_entries[i].m_min_cnt)
874       add((T_Type)i);
875   }
876 
877   return do_bindings() || do_validate();
878 }
879 
880 unsigned
createCpuSet(const SparseBitmask & mask)881 THRConfig::createCpuSet(const SparseBitmask& mask)
882 {
883   for (unsigned i = 0; i < m_cpu_sets.size(); i++)
884     if (m_cpu_sets[i].equal(mask))
885       return i;
886 
887   m_cpu_sets.push_back(mask);
888   return m_cpu_sets.size() - 1;
889 }
890 
891 template class Vector<SparseBitmask>;
892 template class Vector<THRConfig::T_Thread>;
893 
894 #ifndef TEST_MT_THR_CONFIG
895 #include <BlockNumbers.h>
896 #include <NdbThread.h>
897 
898 static
899 int
findBlock(Uint32 blockNo,const unsigned short list[],unsigned cnt)900 findBlock(Uint32 blockNo, const unsigned short list[], unsigned cnt)
901 {
902   for (Uint32 i = 0; i < cnt; i++)
903   {
904     if (blockToMain(list[i]) == blockNo)
905       return blockToInstance(list[i]);
906   }
907   return -1;
908 }
909 
910 const THRConfig::T_Thread*
find_thread(const unsigned short instancelist[],unsigned cnt) const911 THRConfigApplier::find_thread(const unsigned short instancelist[], unsigned cnt) const
912 {
913   int instanceNo;
914   if ((instanceNo = findBlock(SUMA, instancelist, cnt)) >= 0)
915   {
916     return &m_threads[T_REP][instanceNo];
917   }
918   else if ((instanceNo = findBlock(CMVMI, instancelist, cnt)) >= 0)
919   {
920     return &m_threads[T_RECV][instanceNo];
921   }
922   else if ((instanceNo = findBlock(DBDIH, instancelist, cnt)) >= 0)
923   {
924     return &m_threads[T_MAIN][instanceNo];
925   }
926   else if ((instanceNo = findBlock(DBLQH, instancelist, cnt)) >= 0)
927   {
928     return &m_threads[T_LDM][instanceNo - 1]; // remove proxy...
929   }
930   return 0;
931 }
932 
933 void
appendInfo(BaseString & str,const unsigned short list[],unsigned cnt) const934 THRConfigApplier::appendInfo(BaseString& str,
935                              const unsigned short list[], unsigned cnt) const
936 {
937   const T_Thread* thr = find_thread(list, cnt);
938   assert(thr != 0);
939   str.appfmt("(%s) ", getEntryName(thr->m_type));
940   if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
941   {
942     str.appfmt("cpu: %u ", thr->m_bind_no);
943   }
944   else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
945   {
946     str.appfmt("cpuset: [ %s ] ", m_cpu_sets[thr->m_bind_no].str().c_str());
947   }
948 }
949 
950 int
create_cpusets()951 THRConfigApplier::create_cpusets()
952 {
953   return 0;
954 }
955 
956 int
do_bind(NdbThread * thread,const unsigned short list[],unsigned cnt)957 THRConfigApplier::do_bind(NdbThread* thread,
958                           const unsigned short list[], unsigned cnt)
959 {
960   const T_Thread* thr = find_thread(list, cnt);
961   if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
962   {
963     int res = NdbThread_LockCPU(thread, thr->m_bind_no);
964     if (res == 0)
965       return 1;
966     else
967       return -res;
968   }
969 #if TODO
970   else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
971   {
972   }
973 #endif
974 
975   return 0;
976 }
977 
978 int
do_bind_io(NdbThread * thread)979 THRConfigApplier::do_bind_io(NdbThread* thread)
980 {
981   const T_Thread* thr = &m_threads[T_IO][0];
982   if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
983   {
984     int res = NdbThread_LockCPU(thread, thr->m_bind_no);
985     if (res == 0)
986       return 1;
987     else
988       return -res;
989   }
990 #if TODO
991   else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
992   {
993   }
994 #endif
995 
996   return 0;
997 }
998 #endif
999 
1000 #ifdef TEST_MT_THR_CONFIG
1001 
1002 #include <NdbTap.hpp>
1003 
TAPTEST(mt_thr_config)1004 TAPTEST(mt_thr_config)
1005 {
1006   {
1007     THRConfig tmp;
1008     OK(tmp.do_parse(8, 0, 0) == 0);
1009   }
1010 
1011   /**
1012    * BASIC test
1013    */
1014   {
1015     const char * ok[] =
1016       {
1017         "ldm,ldm",
1018         "ldm={count=3},ldm",
1019         "ldm={cpubind=1-2,5,count=3},ldm",
1020         "ldm={ cpubind = 1- 2, 5 , count = 3 },ldm",
1021         "ldm={count=3,cpubind=1-2,5 },  ldm",
1022         "ldm={cpuset=1-3,count=3 },ldm",
1023         "main,ldm={},ldm",
1024         0
1025       };
1026 
1027     const char * fail [] =
1028       {
1029         "ldm,ldm,ldm",
1030         "ldm={cpubind= 1 , cpuset=2 },ldm",
1031         "ldm={count=4,cpubind=1-3},ldm",
1032         "main,main,ldm,ldm",
1033         "main={ keso=88, count=23},ldm,ldm",
1034         "main={ cpuset=1-3 }, ldm={cpuset=3-4}",
1035         "main={ cpuset=1-3 }, ldm={cpubind=2}",
1036         0
1037       };
1038 
1039     for (Uint32 i = 0; ok[i]; i++)
1040     {
1041       THRConfig tmp;
1042       int res = tmp.do_parse(ok[i]);
1043       printf("do_parse(%s) => %s - %s\n", ok[i],
1044              res == 0 ? "OK" : "FAIL",
1045              res == 0 ? "" : tmp.getErrorMessage());
1046       OK(res == 0);
1047       {
1048         BaseString out(tmp.getConfigString());
1049         THRConfig check;
1050         OK(check.do_parse(out.c_str()) == 0);
1051         OK(strcmp(out.c_str(), check.getConfigString()) == 0);
1052       }
1053     }
1054 
1055     for (Uint32 i = 0; fail[i]; i++)
1056     {
1057       THRConfig tmp;
1058       int res = tmp.do_parse(fail[i]);
1059       printf("do_parse(%s) => %s - %s\n", fail[i],
1060              res == 0 ? "OK" : "FAIL",
1061              res == 0 ? "" : tmp.getErrorMessage());
1062       OK(res != 0);
1063     }
1064   }
1065 
1066   {
1067     /**
1068      * Test interaction with LockExecuteThreadToCPU
1069      */
1070     const char * t[] =
1071     {
1072       /** threads, LockExecuteThreadToCPU, answer */
1073       "1-8",
1074       "ldm={count=4}",
1075       "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}",
1076 
1077       "1-5",
1078       "ldm={count=4}",
1079       "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}",
1080 
1081       "1-3",
1082       "ldm={count=4}",
1083       "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}",
1084 
1085       "1-4",
1086       "ldm={count=4}",
1087       "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}",
1088 
1089       "1-8",
1090       "ldm={count=4},io={cpubind=8}",
1091       "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},io={cpubind=8}",
1092 
1093       "1-8",
1094       "ldm={count=4,cpubind=1,4,5,6}",
1095       "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}",
1096 
1097       // END
1098       0
1099     };
1100 
1101     for (unsigned i = 0; t[i]; i+= 3)
1102     {
1103       THRConfig tmp;
1104       tmp.setLockExecuteThreadToCPU(t[i+0]);
1105       int res = tmp.do_parse(t[i+1]);
1106       int ok = strcmp(tmp.getConfigString(), t[i+2]) == 0;
1107       printf("mask: %s conf: %s => %s(%s) - %s - %s\n",
1108              t[i+0],
1109              t[i+1],
1110              res == 0 ? "OK" : "FAIL",
1111              res == 0 ? "" : tmp.getErrorMessage(),
1112              tmp.getConfigString(),
1113              ok == 1 ? "CORRECT" : "INCORRECT");
1114       OK(res == 0);
1115       OK(ok == 1);
1116     }
1117   }
1118 
1119   return 1;
1120 }
1121 
1122 #endif
1123