1 /* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include <mysql/group_replication_priv.h>
24 #include "pipeline_stats.h"
25 #include "plugin_server_include.h"
26 #include "plugin_log.h"
27 #include "plugin.h"
28 
29 /*
30   The QUOTA based flow control tries to calculate how many
31   transactions the slowest members can handle, at the certifier or
32   at the applier level, by checking which members have a queue
33   larger than the user-specified thresholds and, on those, checking
34   which one has the lowest number of transactions certified/applied
35   on the last step - let's call it MMT, which stands for Minimum
36   Member Throughput. We then divide MMT by the number of writing
37   members in the last step to specify how many transactions a
38   member can safely send to the group (if a new member starts to
39   write then the quota will be larger for one period but will be
40   corrected on the next).
41   About these factors:
42     1. If we used MMT as the assigned quota (and if MMT represented
43        well the capacity of the nodes) then the queue size would
44        stabilize but would not decrease. To allow a delayed node to
45        catch up on the certifier and/or queues we need to reserve
46        some capacity on the slowest node, which this HOLD_FACTOR
47        represents: 10% reserved to catch up.
    2. Once the queue is reduced below the user-specified threshold,
       the nodes would start to issue transactions at full speed
       even if that full speed meant piling up many transactions
       in a single period. To avoid that we introduce the
       RELEASE_FACTOR (50%), which is enough to let the write
       capacity grow quickly but still maintain a relation with
       the last throttled value so that the oscillation in number
       of transactions per second is not very steep, letting the
       throughput oscillate smoothly around the real cluster
       capacity.
58 */
59 const int64 Flow_control_module::MAXTPS= INT_MAX32;
60 const double Flow_control_module::HOLD_FACTOR= 0.9;
61 const double Flow_control_module::RELEASE_FACTOR= 1.5;
62 
63 
Pipeline_stats_member_message(int32 transactions_waiting_certification,int32 transactions_waiting_apply,int64 transactions_certified,int64 transactions_applied,int64 transactions_local)64 Pipeline_stats_member_message::Pipeline_stats_member_message(
65     int32 transactions_waiting_certification,
66     int32 transactions_waiting_apply,
67     int64 transactions_certified,
68     int64 transactions_applied,
69     int64 transactions_local)
70   : Plugin_gcs_message(CT_PIPELINE_STATS_MEMBER_MESSAGE),
71     m_transactions_waiting_certification(transactions_waiting_certification),
72     m_transactions_waiting_apply(transactions_waiting_apply),
73     m_transactions_certified(transactions_certified),
74     m_transactions_applied(transactions_applied),
75     m_transactions_local(transactions_local)
76 {}
77 
78 
Pipeline_stats_member_message(const unsigned char * buf,uint64 len)79 Pipeline_stats_member_message::Pipeline_stats_member_message(const unsigned char *buf, uint64 len)
80   : Plugin_gcs_message(CT_PIPELINE_STATS_MEMBER_MESSAGE),
81     m_transactions_waiting_certification(0),
82     m_transactions_waiting_apply(0),
83     m_transactions_certified(0),
84     m_transactions_applied(0),
85     m_transactions_local(0)
86 {
87   decode(buf, len);
88 }
89 
90 
~Pipeline_stats_member_message()91 Pipeline_stats_member_message::~Pipeline_stats_member_message()
92 {}
93 
94 
95 int32
get_transactions_waiting_certification()96 Pipeline_stats_member_message::get_transactions_waiting_certification()
97 {
98   DBUG_ENTER("Pipeline_stats_member_message::get_transactions_waiting_certification");
99   DBUG_RETURN(m_transactions_waiting_certification);
100 }
101 
102 
103 int64
get_transactions_certified()104 Pipeline_stats_member_message::get_transactions_certified()
105 {
106   DBUG_ENTER("Pipeline_stats_member_message::get_transactions_certified");
107   DBUG_RETURN(m_transactions_certified);
108 }
109 
110 
111 int32
get_transactions_waiting_apply()112 Pipeline_stats_member_message::get_transactions_waiting_apply()
113 {
114   DBUG_ENTER("Pipeline_stats_member_message::get_transactions_waiting_apply");
115   DBUG_RETURN(m_transactions_waiting_apply);
116 }
117 
118 
119 int64
get_transactions_applied()120 Pipeline_stats_member_message::get_transactions_applied()
121 {
122   DBUG_ENTER("Pipeline_stats_member_message::get_transactions_applied");
123   DBUG_RETURN(m_transactions_applied);
124 }
125 
126 
127 int64
get_transactions_local()128 Pipeline_stats_member_message::get_transactions_local()
129 {
130   DBUG_ENTER("Pipeline_stats_member_message::get_transactions_local");
131   DBUG_RETURN(m_transactions_local);
132 }
133 
134 
135 void
encode_payload(std::vector<unsigned char> * buffer) const136 Pipeline_stats_member_message::encode_payload(std::vector<unsigned char> *buffer) const
137 {
138   DBUG_ENTER("Pipeline_stats_member_message::encode_payload");
139 
140   uint32 transactions_waiting_certification_aux=
141       (uint32)m_transactions_waiting_certification;
142   encode_payload_item_int4(buffer, PIT_TRANSACTIONS_WAITING_CERTIFICATION,
143                            transactions_waiting_certification_aux);
144 
145   uint32 transactions_waiting_apply_aux=
146       (uint32)m_transactions_waiting_apply;
147   encode_payload_item_int4(buffer, PIT_TRANSACTIONS_WAITING_APPLY,
148                            transactions_waiting_apply_aux);
149 
150   uint64 transactions_certified_aux=
151       (uint64)m_transactions_certified;
152   encode_payload_item_int8(buffer, PIT_TRANSACTIONS_CERTIFIED,
153                            transactions_certified_aux);
154 
155   uint64 transactions_applied_aux=
156       (uint64)m_transactions_applied;
157   encode_payload_item_int8(buffer, PIT_TRANSACTIONS_APPLIED,
158                            transactions_applied_aux);
159 
160   uint64 transactions_local_aux=
161       (uint64)m_transactions_local;
162   encode_payload_item_int8(buffer, PIT_TRANSACTIONS_LOCAL,
163                            transactions_local_aux);
164 
165   DBUG_VOID_RETURN;
166 }
167 
168 
169 void
decode_payload(const unsigned char * buffer,const unsigned char * end)170 Pipeline_stats_member_message::decode_payload(const unsigned char *buffer,
171                                               const unsigned char *end)
172 {
173   DBUG_ENTER("Pipeline_stats_member_message::decode_payload");
174   const unsigned char *slider= buffer;
175   uint16 payload_item_type= 0;
176 
177   uint32 transactions_waiting_certification_aux= 0;
178   decode_payload_item_int4(&slider,
179                            &payload_item_type,
180                            &transactions_waiting_certification_aux);
181   m_transactions_waiting_certification=
182       (int32)transactions_waiting_certification_aux;
183 
184   uint32 transactions_waiting_apply_aux= 0;
185   decode_payload_item_int4(&slider,
186                            &payload_item_type,
187                            &transactions_waiting_apply_aux);
188   m_transactions_waiting_apply=
189       (int32)transactions_waiting_apply_aux;
190 
191   uint64 transactions_certified_aux= 0;
192   decode_payload_item_int8(&slider,
193                            &payload_item_type,
194                            &transactions_certified_aux);
195   m_transactions_certified=
196       (int64)transactions_certified_aux;
197 
198   uint64 transactions_applied_aux= 0;
199   decode_payload_item_int8(&slider,
200                            &payload_item_type,
201                            &transactions_applied_aux);
202   m_transactions_applied=
203       (int64)transactions_applied_aux;
204 
205   uint64 transactions_local_aux= 0;
206   decode_payload_item_int8(&slider,
207                            &payload_item_type,
208                            &transactions_local_aux);
209   m_transactions_local=
210       (int64)transactions_local_aux;
211 
212   DBUG_VOID_RETURN;
213 }
214 
215 
Pipeline_stats_member_collector()216 Pipeline_stats_member_collector::Pipeline_stats_member_collector()
217   : m_transactions_waiting_apply(0), m_transactions_certified(0),
218     m_transactions_applied(0), m_transactions_local(0)
219 {
220   mysql_mutex_init(key_GR_LOCK_pipeline_stats_transactions_waiting_apply,
221                    &m_transactions_waiting_apply_lock,
222                    MY_MUTEX_INIT_FAST);
223 }
224 
225 
~Pipeline_stats_member_collector()226 Pipeline_stats_member_collector::~Pipeline_stats_member_collector()
227 {
228   mysql_mutex_destroy(&m_transactions_waiting_apply_lock);
229 }
230 
231 
232 void
increment_transactions_waiting_apply()233 Pipeline_stats_member_collector::increment_transactions_waiting_apply()
234 {
235   mysql_mutex_lock(&m_transactions_waiting_apply_lock);
236   assert(my_atomic_load32(&m_transactions_waiting_apply) >= 0);
237   my_atomic_add32(&m_transactions_waiting_apply, 1);
238   mysql_mutex_unlock(&m_transactions_waiting_apply_lock);
239 }
240 
241 
242 void
decrement_transactions_waiting_apply()243 Pipeline_stats_member_collector::decrement_transactions_waiting_apply()
244 {
245   mysql_mutex_lock(&m_transactions_waiting_apply_lock);
246   if (m_transactions_waiting_apply > 0)
247     my_atomic_add32(&m_transactions_waiting_apply, -1);
248   assert(my_atomic_load32(&m_transactions_waiting_apply) >= 0);
249   mysql_mutex_unlock(&m_transactions_waiting_apply_lock);
250 }
251 
252 
253 void
increment_transactions_certified()254 Pipeline_stats_member_collector::increment_transactions_certified()
255 {
256   my_atomic_add64(&m_transactions_certified, 1);
257 }
258 
259 
260 void
increment_transactions_applied()261 Pipeline_stats_member_collector::increment_transactions_applied()
262 {
263   my_atomic_add64(&m_transactions_applied, 1);
264 }
265 
266 
267 void
increment_transactions_local()268 Pipeline_stats_member_collector::increment_transactions_local()
269 {
270   my_atomic_add64(&m_transactions_local, 1);
271 }
272 
get_transactions_waiting_apply()273 int32 Pipeline_stats_member_collector::get_transactions_waiting_apply()
274 {
275   return my_atomic_load32(&m_transactions_waiting_apply);
276 }
277 
get_transactions_certified()278 int64 Pipeline_stats_member_collector::get_transactions_certified()
279 {
280   return my_atomic_load64(&m_transactions_certified);
281 }
282 
get_transactions_applied()283 int64 Pipeline_stats_member_collector::get_transactions_applied()
284 {
285   return my_atomic_load64(&m_transactions_applied);
286 }
287 
get_transactions_local()288 int64 Pipeline_stats_member_collector::get_transactions_local()
289 {
290   return my_atomic_load64(&m_transactions_local);
291 }
292 
293 void
send_stats_member_message()294 Pipeline_stats_member_collector::send_stats_member_message()
295 {
296   if (local_member_info == NULL)
297     return; /* purecov: inspected */
298   Group_member_info::Group_member_status member_status=
299       local_member_info->get_recovery_status();
300   if (member_status != Group_member_info::MEMBER_ONLINE &&
301       member_status != Group_member_info::MEMBER_IN_RECOVERY)
302     return;
303 
304   Pipeline_stats_member_message message(
305       static_cast<int32>(applier_module->get_message_queue_size()),
306       my_atomic_load32(&m_transactions_waiting_apply),
307       my_atomic_load64(&m_transactions_certified),
308       my_atomic_load64(&m_transactions_applied),
309       my_atomic_load64(&m_transactions_local));
310 
311   enum_gcs_error msg_error= gcs_module->send_message(message, true);
312   if (msg_error != GCS_OK)
313   {
314     log_message(MY_INFORMATION_LEVEL,
315                 "Error while sending stats message"); /* purecov: inspected */
316   }
317 }
318 
319 
Pipeline_member_stats()320 Pipeline_member_stats::Pipeline_member_stats()
321   : m_transactions_waiting_certification(0),
322     m_transactions_waiting_apply(0),
323     m_transactions_certified(0),
324     m_delta_transactions_certified(0),
325     m_transactions_applied(0),
326     m_delta_transactions_applied(0),
327     m_transactions_local(0),
328     m_delta_transactions_local(0),
329     m_transactions_negative_certified(0),
330     m_transactions_rows_validating(0),
331     m_transactions_committed_all_members(),
332     m_transaction_last_conflict_free(),
333     m_stamp(0)
334 {}
335 
336 
Pipeline_member_stats(Pipeline_stats_member_message & msg)337 Pipeline_member_stats::Pipeline_member_stats(Pipeline_stats_member_message &msg)
338   : m_transactions_waiting_certification(msg.get_transactions_waiting_certification()),
339     m_transactions_waiting_apply(msg.get_transactions_waiting_apply()),
340     m_transactions_certified(msg.get_transactions_certified()),
341     m_delta_transactions_certified(0),
342     m_transactions_applied(msg.get_transactions_applied()),
343     m_delta_transactions_applied(0),
344     m_transactions_local(msg.get_transactions_local()),
345     m_delta_transactions_local(0),
346     m_transactions_negative_certified(0),
347     m_transactions_rows_validating(0),
348     m_transactions_committed_all_members(),
349     m_transaction_last_conflict_free(),
350     m_stamp(0)
351 {}
352 
Pipeline_member_stats(Pipeline_stats_member_collector * pipeline_stats,ulonglong applier_queue,ulonglong negative_certified,ulonglong certification_size)353 Pipeline_member_stats::Pipeline_member_stats(
354     Pipeline_stats_member_collector *pipeline_stats, ulonglong applier_queue,
355     ulonglong negative_certified, ulonglong certification_size)
356   : m_transactions_committed_all_members(),
357     m_transaction_last_conflict_free()
358 {
359   m_transactions_waiting_certification= applier_queue;
360   m_transactions_waiting_apply= pipeline_stats->get_transactions_waiting_apply();
361   m_transactions_certified= pipeline_stats->get_transactions_certified();
362   m_delta_transactions_certified= 0;
363   m_transactions_applied= pipeline_stats->get_transactions_applied();
364   m_delta_transactions_applied= 0;
365   m_transactions_local= pipeline_stats->get_transactions_local();
366   m_delta_transactions_local= 0;
367   m_transactions_negative_certified= negative_certified;
368   m_transactions_rows_validating= certification_size;
369   m_stamp= 0;
370 }
371 
~Pipeline_member_stats()372 Pipeline_member_stats::~Pipeline_member_stats()
373 {}
374 
375 
376 void
update_member_stats(Pipeline_stats_member_message & msg,uint64 stamp)377 Pipeline_member_stats::update_member_stats(Pipeline_stats_member_message &msg,
378                                            uint64 stamp)
379 {
380   m_transactions_waiting_certification=
381       msg.get_transactions_waiting_certification();
382 
383   m_transactions_waiting_apply=
384       msg.get_transactions_waiting_apply();
385 
386   int64 previous_transactions_certified= m_transactions_certified;
387   m_transactions_certified= msg.get_transactions_certified();
388   m_delta_transactions_certified=
389       m_transactions_certified - previous_transactions_certified;
390 
391   int64 previous_transactions_applied= m_transactions_applied;
392   m_transactions_applied= msg.get_transactions_applied();
393   m_delta_transactions_applied=
394       m_transactions_applied - previous_transactions_applied;
395 
396   int64 previous_transactions_local= m_transactions_local;
397   m_transactions_local= msg.get_transactions_local();
398   m_delta_transactions_local=
399       m_transactions_local - previous_transactions_local;
400 
401   m_stamp= stamp;
402 }
403 
404 
405 bool
is_flow_control_needed()406 Pipeline_member_stats::is_flow_control_needed()
407 {
408   return (m_transactions_waiting_certification > flow_control_certifier_threshold_var
409           || m_transactions_waiting_apply > flow_control_applier_threshold_var);
410 }
411 
412 
413 int32
get_transactions_waiting_certification()414 Pipeline_member_stats::get_transactions_waiting_certification()
415 {
416   return m_transactions_waiting_certification;
417 }
418 
419 
420 int32
get_transactions_waiting_apply()421 Pipeline_member_stats::get_transactions_waiting_apply()
422 {
423   return m_transactions_waiting_apply;
424 }
425 
426 
427 int64
get_delta_transactions_certified()428 Pipeline_member_stats::get_delta_transactions_certified()
429 {
430   return m_delta_transactions_certified;
431 }
432 
433 
434 int64
get_delta_transactions_applied()435 Pipeline_member_stats::get_delta_transactions_applied()
436 {
437   return m_delta_transactions_applied;
438 }
439 
440 
441 int64
get_delta_transactions_local()442 Pipeline_member_stats::get_delta_transactions_local()
443 {
444   return m_delta_transactions_local;
445 }
446 
get_transactions_negative_certified()447 int64 Pipeline_member_stats::get_transactions_negative_certified()
448 {
449   return m_transactions_negative_certified;
450 }
451 
get_transactions_rows_validating()452 int64 Pipeline_member_stats::get_transactions_rows_validating()
453 {
454   return m_transactions_rows_validating;
455 }
456 
get_transaction_committed_all_members(std::string & value)457 void Pipeline_member_stats::get_transaction_committed_all_members(std::string &value)
458 {
459   value.assign(m_transactions_committed_all_members);
460 }
461 
set_transaction_committed_all_members(char * str,size_t len)462 void Pipeline_member_stats::set_transaction_committed_all_members(char *str, size_t len)
463 {
464   m_transactions_committed_all_members.assign(str, len);
465 }
466 
get_transaction_last_conflict_free(std::string & value)467 void Pipeline_member_stats::get_transaction_last_conflict_free(
468     std::string &value)
469 {
470   value.assign(m_transaction_last_conflict_free);
471 }
472 
set_transaction_last_conflict_free(std::string & value)473 void Pipeline_member_stats::set_transaction_last_conflict_free(
474     std::string &value)
475 {
476   m_transaction_last_conflict_free.assign(value);
477 }
478 
get_transactions_certified()479 int64 Pipeline_member_stats::get_transactions_certified()
480 {
481   return m_transactions_certified;
482 }
483 
484 uint64
get_stamp()485 Pipeline_member_stats::get_stamp()
486 {
487   return m_stamp;
488 }
489 
490 
491 #ifndef NDEBUG
492 void
debug(const char * member,int64 quota_size,int64 quota_used)493 Pipeline_member_stats::debug(const char *member, int64 quota_size,
494                              int64 quota_used)
495 {
496   log_message(MY_INFORMATION_LEVEL, "Flow control - update member stats: "
497       "%s stats: certifier_queue %d, applier_queue %d,"
498       " certified %ld (%ld), applied %ld (%ld), local %ld (%ld), quota %ld (%ld)",
499       member, m_transactions_waiting_certification,
500       m_transactions_waiting_apply, m_transactions_certified,
501       m_delta_transactions_certified, m_transactions_applied,
502       m_delta_transactions_applied, m_transactions_local,
503       m_delta_transactions_local, quota_size, quota_used); /* purecov: inspected */
504 }
505 #endif
506 
507 
Flow_control_module()508 Flow_control_module::Flow_control_module()
509   : m_holds_in_period(0), m_quota_used(0), m_quota_size(0), m_stamp(0)
510 {
511   mysql_mutex_init(key_GR_LOCK_pipeline_stats_flow_control, &m_flow_control_lock, MY_MUTEX_INIT_FAST);
512   mysql_cond_init(key_GR_COND_pipeline_stats_flow_control, &m_flow_control_cond);
513 }
514 
515 
~Flow_control_module()516 Flow_control_module::~Flow_control_module()
517 {
518   mysql_mutex_destroy(&m_flow_control_lock);
519   mysql_cond_destroy(&m_flow_control_cond);
520 }
521 
522 
523 void
flow_control_step()524 Flow_control_module::flow_control_step()
525 {
526   m_stamp++;
527   int32 holds= my_atomic_fas32(&m_holds_in_period, 0);
528 
529   switch(static_cast<Flow_control_mode>(flow_control_mode_var))
530   {
531     case FCM_QUOTA:
532     {
533       /*
534         Postponed transactions
535       */
536       int64 quota_size= my_atomic_fas64(&m_quota_size, 0);
537       int64 quota_used= my_atomic_fas64(&m_quota_used, 0);
538       int64 extra_quota=
539           (quota_size > 0 && quota_used > quota_size) ? quota_used - quota_size : 0;
540 
541       /*
542         Release waiting transactions on do_wait().
543       */
544       if (extra_quota > 0)
545       {
546         mysql_mutex_lock(&m_flow_control_lock);
547         mysql_cond_broadcast(&m_flow_control_cond);
548         mysql_mutex_unlock(&m_flow_control_lock);
549       }
550 
551       if (holds > 0)
552       {
553         uint num_writing_members= 0;
554         int64 min_certifier_capacity= MAXTPS, min_applier_capacity= MAXTPS, safe_capacity= MAXTPS;
555 
556         Flow_control_module_info::iterator it= m_info.begin();
557         while (it != m_info.end())
558         {
559           if (it->second.get_stamp() < (m_stamp - 10))
560           {
561             /*
562               Purge member stats that were not updated on the last
563               10 flow control steps.
564             */
565             m_info.erase(it++);
566           }
567           else
568           {
569             if (flow_control_certifier_threshold_var > 0
570                 && it->second.get_delta_transactions_certified() > 0
571                 && it->second.get_transactions_waiting_certification() - flow_control_certifier_threshold_var > 0
572                 && min_certifier_capacity > it->second.get_delta_transactions_certified())
573               min_certifier_capacity= it->second.get_delta_transactions_certified();
574 
575             if (it->second.get_delta_transactions_certified() > 0)
576               safe_capacity= std::min(safe_capacity, it->second.get_delta_transactions_certified());
577 
578             if (flow_control_applier_threshold_var > 0
579                 && it->second.get_delta_transactions_applied() > 0
580                 && it->second.get_transactions_waiting_apply() - flow_control_applier_threshold_var > 0
581                 && min_applier_capacity > it->second.get_delta_transactions_applied())
582               min_applier_capacity= it->second.get_delta_transactions_applied();
583 
584             if (it->second.get_delta_transactions_applied() > 0)
585               safe_capacity= std::min(safe_capacity, it->second.get_delta_transactions_applied());
586 
587             if (it->second.get_delta_transactions_local() > 0)
588               num_writing_members++;
589 
590             ++it;
591           }
592         }
593 
594         // Avoid division by zero.
595         num_writing_members= num_writing_members > 0 ? num_writing_members : 1;
596         int64 min_capacity= (min_certifier_capacity > 0 && min_certifier_capacity < min_applier_capacity)
597                              ? min_certifier_capacity : min_applier_capacity;
598 
599         // Minimum capacity will never be less than lim_throttle.
600         int64 lim_throttle= static_cast<int64>(0.05 * std::min(flow_control_certifier_threshold_var,
601                                             flow_control_applier_threshold_var));
602         min_capacity= std::max(std::min(min_capacity, safe_capacity), lim_throttle);
603         quota_size= static_cast<int64>((min_capacity * HOLD_FACTOR) / num_writing_members - extra_quota);
604         my_atomic_store64(&m_quota_size, quota_size > 1 ? quota_size : 1);
605       }
606       else
607       {
608         if (quota_size > 0 && (quota_size * RELEASE_FACTOR) < MAXTPS)
609         {
610           int64 quota_size_next= static_cast<int64>(quota_size * RELEASE_FACTOR);
611           quota_size= quota_size_next > quota_size ? quota_size_next : quota_size + 1;
612         }
613         else
614           quota_size= 0;
615 
616         my_atomic_store64(&m_quota_size, quota_size);
617       }
618 
619       my_atomic_store64(&m_quota_used, 0);
620       break;
621     }
622 
623     case FCM_DISABLED:
624       my_atomic_store64(&m_quota_size, 0);
625       my_atomic_store64(&m_quota_used, 0);
626       break;
627 
628     default:
629       assert(0);
630   }
631 }
632 
633 
634 int
handle_stats_data(const uchar * data,uint64 len,const std::string & member_id)635 Flow_control_module::handle_stats_data(const uchar *data,
636                                        uint64 len,
637                                        const std::string& member_id)
638 {
639   DBUG_ENTER("Flow_control_module::handle_stats_data");
640   int error= 0;
641   Pipeline_stats_member_message message(data, len);
642 
643   /*
644     This method is called synchronously by communication layer, so
645     we do not need concurrency control.
646   */
647   Flow_control_module_info::iterator it= m_info.find(member_id);
648   if (it == m_info.end())
649   {
650     Pipeline_member_stats stats;
651 
652     std::pair<Flow_control_module_info::iterator, bool> ret=
653       m_info.insert(std::pair<std::string, Pipeline_member_stats>
654                     (member_id, stats));
655     error= !ret.second;
656     it= ret.first;
657   }
658   it->second.update_member_stats(message, m_stamp);
659 
660   /*
661     Verify if flow control is required.
662   */
663   if (it->second.is_flow_control_needed())
664   {
665     my_atomic_add32(&m_holds_in_period, 1);
666 #ifndef NDEBUG
667     it->second.debug(it->first.c_str(),
668                      my_atomic_load64(&m_quota_size),
669                      my_atomic_load64(&m_quota_used));
670 #endif
671   }
672 
673   DBUG_RETURN(error);
674 }
675 
676 
677 int32
do_wait()678 Flow_control_module::do_wait()
679 {
680   DBUG_ENTER("Flow_control_module::do_wait");
681   int64 quota_size= my_atomic_load64(&m_quota_size);
682   int64 quota_used= my_atomic_add64(&m_quota_used, 1);
683 
684   if (quota_used > quota_size && quota_size != 0)
685   {
686     struct timespec delay;
687     set_timespec(&delay, 1);
688 
689     mysql_mutex_lock(&m_flow_control_lock);
690     mysql_cond_timedwait(&m_flow_control_cond, &m_flow_control_lock, &delay);
691     mysql_mutex_unlock(&m_flow_control_lock);
692   }
693 
694   DBUG_RETURN(0);
695 }
696