1 /* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include <mysql/group_replication_priv.h>
24 #include "pipeline_stats.h"
25 #include "plugin_server_include.h"
26 #include "plugin_log.h"
27 #include "plugin.h"
28
29 /*
30 The QUOTA based flow control tries to calculate how many
31 transactions the slowest members can handle, at the certifier or
32 at the applier level, by checking which members have a queue
33 larger than the user-specified thresholds and, on those, checking
34 which one has the lowest number of transactions certified/applied
35 on the last step - let's call it MMT, which stands for Minimum
36 Member Throughput. We then divide MMT by the number of writing
37 members in the last step to specify how many transactions a
38 member can safely send to the group (if a new member starts to
39 write then the quota will be larger for one period but will be
40 corrected on the next).
41 About these factors:
42 1. If we used MMT as the assigned quota (and if MMT represented
43 well the capacity of the nodes) then the queue size would
44 stabilize but would not decrease. To allow a delayed node to
45 catch up on the certifier and/or queues we need to reserve
46 some capacity on the slowest node, which this HOLD_FACTOR
47 represents: 10% reserved to catch up.
48 2. Once the queue is reduced below the user-specified threshold,
49 the nodes would start to issue transactions at full speed
50 even if that full speed meant pilling up many transactions
51 in a single period. To avoid that we introduce the
52 RELEASE_FACTOR (50%), which is enough to let the write
53 capacity to grow quickly but still maintain a relation with
54 the last throttled value so that the oscillation in number
55 of transactions per second is not very steep, letting the
56 throughput oscillate smoothly around the real cluster
57 capacity.
58 */
59 const int64 Flow_control_module::MAXTPS= INT_MAX32;
60 const double Flow_control_module::HOLD_FACTOR= 0.9;
61 const double Flow_control_module::RELEASE_FACTOR= 1.5;
62
63
Pipeline_stats_member_message(int32 transactions_waiting_certification,int32 transactions_waiting_apply,int64 transactions_certified,int64 transactions_applied,int64 transactions_local)64 Pipeline_stats_member_message::Pipeline_stats_member_message(
65 int32 transactions_waiting_certification,
66 int32 transactions_waiting_apply,
67 int64 transactions_certified,
68 int64 transactions_applied,
69 int64 transactions_local)
70 : Plugin_gcs_message(CT_PIPELINE_STATS_MEMBER_MESSAGE),
71 m_transactions_waiting_certification(transactions_waiting_certification),
72 m_transactions_waiting_apply(transactions_waiting_apply),
73 m_transactions_certified(transactions_certified),
74 m_transactions_applied(transactions_applied),
75 m_transactions_local(transactions_local)
76 {}
77
78
Pipeline_stats_member_message(const unsigned char * buf,uint64 len)79 Pipeline_stats_member_message::Pipeline_stats_member_message(const unsigned char *buf, uint64 len)
80 : Plugin_gcs_message(CT_PIPELINE_STATS_MEMBER_MESSAGE),
81 m_transactions_waiting_certification(0),
82 m_transactions_waiting_apply(0),
83 m_transactions_certified(0),
84 m_transactions_applied(0),
85 m_transactions_local(0)
86 {
87 decode(buf, len);
88 }
89
90
~Pipeline_stats_member_message()91 Pipeline_stats_member_message::~Pipeline_stats_member_message()
92 {}
93
94
95 int32
get_transactions_waiting_certification()96 Pipeline_stats_member_message::get_transactions_waiting_certification()
97 {
98 DBUG_ENTER("Pipeline_stats_member_message::get_transactions_waiting_certification");
99 DBUG_RETURN(m_transactions_waiting_certification);
100 }
101
102
103 int64
get_transactions_certified()104 Pipeline_stats_member_message::get_transactions_certified()
105 {
106 DBUG_ENTER("Pipeline_stats_member_message::get_transactions_certified");
107 DBUG_RETURN(m_transactions_certified);
108 }
109
110
111 int32
get_transactions_waiting_apply()112 Pipeline_stats_member_message::get_transactions_waiting_apply()
113 {
114 DBUG_ENTER("Pipeline_stats_member_message::get_transactions_waiting_apply");
115 DBUG_RETURN(m_transactions_waiting_apply);
116 }
117
118
119 int64
get_transactions_applied()120 Pipeline_stats_member_message::get_transactions_applied()
121 {
122 DBUG_ENTER("Pipeline_stats_member_message::get_transactions_applied");
123 DBUG_RETURN(m_transactions_applied);
124 }
125
126
127 int64
get_transactions_local()128 Pipeline_stats_member_message::get_transactions_local()
129 {
130 DBUG_ENTER("Pipeline_stats_member_message::get_transactions_local");
131 DBUG_RETURN(m_transactions_local);
132 }
133
134
135 void
encode_payload(std::vector<unsigned char> * buffer) const136 Pipeline_stats_member_message::encode_payload(std::vector<unsigned char> *buffer) const
137 {
138 DBUG_ENTER("Pipeline_stats_member_message::encode_payload");
139
140 uint32 transactions_waiting_certification_aux=
141 (uint32)m_transactions_waiting_certification;
142 encode_payload_item_int4(buffer, PIT_TRANSACTIONS_WAITING_CERTIFICATION,
143 transactions_waiting_certification_aux);
144
145 uint32 transactions_waiting_apply_aux=
146 (uint32)m_transactions_waiting_apply;
147 encode_payload_item_int4(buffer, PIT_TRANSACTIONS_WAITING_APPLY,
148 transactions_waiting_apply_aux);
149
150 uint64 transactions_certified_aux=
151 (uint64)m_transactions_certified;
152 encode_payload_item_int8(buffer, PIT_TRANSACTIONS_CERTIFIED,
153 transactions_certified_aux);
154
155 uint64 transactions_applied_aux=
156 (uint64)m_transactions_applied;
157 encode_payload_item_int8(buffer, PIT_TRANSACTIONS_APPLIED,
158 transactions_applied_aux);
159
160 uint64 transactions_local_aux=
161 (uint64)m_transactions_local;
162 encode_payload_item_int8(buffer, PIT_TRANSACTIONS_LOCAL,
163 transactions_local_aux);
164
165 DBUG_VOID_RETURN;
166 }
167
168
169 void
decode_payload(const unsigned char * buffer,const unsigned char * end)170 Pipeline_stats_member_message::decode_payload(const unsigned char *buffer,
171 const unsigned char *end)
172 {
173 DBUG_ENTER("Pipeline_stats_member_message::decode_payload");
174 const unsigned char *slider= buffer;
175 uint16 payload_item_type= 0;
176
177 uint32 transactions_waiting_certification_aux= 0;
178 decode_payload_item_int4(&slider,
179 &payload_item_type,
180 &transactions_waiting_certification_aux);
181 m_transactions_waiting_certification=
182 (int32)transactions_waiting_certification_aux;
183
184 uint32 transactions_waiting_apply_aux= 0;
185 decode_payload_item_int4(&slider,
186 &payload_item_type,
187 &transactions_waiting_apply_aux);
188 m_transactions_waiting_apply=
189 (int32)transactions_waiting_apply_aux;
190
191 uint64 transactions_certified_aux= 0;
192 decode_payload_item_int8(&slider,
193 &payload_item_type,
194 &transactions_certified_aux);
195 m_transactions_certified=
196 (int64)transactions_certified_aux;
197
198 uint64 transactions_applied_aux= 0;
199 decode_payload_item_int8(&slider,
200 &payload_item_type,
201 &transactions_applied_aux);
202 m_transactions_applied=
203 (int64)transactions_applied_aux;
204
205 uint64 transactions_local_aux= 0;
206 decode_payload_item_int8(&slider,
207 &payload_item_type,
208 &transactions_local_aux);
209 m_transactions_local=
210 (int64)transactions_local_aux;
211
212 DBUG_VOID_RETURN;
213 }
214
215
Pipeline_stats_member_collector()216 Pipeline_stats_member_collector::Pipeline_stats_member_collector()
217 : m_transactions_waiting_apply(0), m_transactions_certified(0),
218 m_transactions_applied(0), m_transactions_local(0)
219 {
220 mysql_mutex_init(key_GR_LOCK_pipeline_stats_transactions_waiting_apply,
221 &m_transactions_waiting_apply_lock,
222 MY_MUTEX_INIT_FAST);
223 }
224
225
~Pipeline_stats_member_collector()226 Pipeline_stats_member_collector::~Pipeline_stats_member_collector()
227 {
228 mysql_mutex_destroy(&m_transactions_waiting_apply_lock);
229 }
230
231
232 void
increment_transactions_waiting_apply()233 Pipeline_stats_member_collector::increment_transactions_waiting_apply()
234 {
235 mysql_mutex_lock(&m_transactions_waiting_apply_lock);
236 assert(my_atomic_load32(&m_transactions_waiting_apply) >= 0);
237 my_atomic_add32(&m_transactions_waiting_apply, 1);
238 mysql_mutex_unlock(&m_transactions_waiting_apply_lock);
239 }
240
241
242 void
decrement_transactions_waiting_apply()243 Pipeline_stats_member_collector::decrement_transactions_waiting_apply()
244 {
245 mysql_mutex_lock(&m_transactions_waiting_apply_lock);
246 if (m_transactions_waiting_apply > 0)
247 my_atomic_add32(&m_transactions_waiting_apply, -1);
248 assert(my_atomic_load32(&m_transactions_waiting_apply) >= 0);
249 mysql_mutex_unlock(&m_transactions_waiting_apply_lock);
250 }
251
252
253 void
increment_transactions_certified()254 Pipeline_stats_member_collector::increment_transactions_certified()
255 {
256 my_atomic_add64(&m_transactions_certified, 1);
257 }
258
259
260 void
increment_transactions_applied()261 Pipeline_stats_member_collector::increment_transactions_applied()
262 {
263 my_atomic_add64(&m_transactions_applied, 1);
264 }
265
266
267 void
increment_transactions_local()268 Pipeline_stats_member_collector::increment_transactions_local()
269 {
270 my_atomic_add64(&m_transactions_local, 1);
271 }
272
get_transactions_waiting_apply()273 int32 Pipeline_stats_member_collector::get_transactions_waiting_apply()
274 {
275 return my_atomic_load32(&m_transactions_waiting_apply);
276 }
277
get_transactions_certified()278 int64 Pipeline_stats_member_collector::get_transactions_certified()
279 {
280 return my_atomic_load64(&m_transactions_certified);
281 }
282
get_transactions_applied()283 int64 Pipeline_stats_member_collector::get_transactions_applied()
284 {
285 return my_atomic_load64(&m_transactions_applied);
286 }
287
get_transactions_local()288 int64 Pipeline_stats_member_collector::get_transactions_local()
289 {
290 return my_atomic_load64(&m_transactions_local);
291 }
292
293 void
send_stats_member_message()294 Pipeline_stats_member_collector::send_stats_member_message()
295 {
296 if (local_member_info == NULL)
297 return; /* purecov: inspected */
298 Group_member_info::Group_member_status member_status=
299 local_member_info->get_recovery_status();
300 if (member_status != Group_member_info::MEMBER_ONLINE &&
301 member_status != Group_member_info::MEMBER_IN_RECOVERY)
302 return;
303
304 Pipeline_stats_member_message message(
305 static_cast<int32>(applier_module->get_message_queue_size()),
306 my_atomic_load32(&m_transactions_waiting_apply),
307 my_atomic_load64(&m_transactions_certified),
308 my_atomic_load64(&m_transactions_applied),
309 my_atomic_load64(&m_transactions_local));
310
311 enum_gcs_error msg_error= gcs_module->send_message(message, true);
312 if (msg_error != GCS_OK)
313 {
314 log_message(MY_INFORMATION_LEVEL,
315 "Error while sending stats message"); /* purecov: inspected */
316 }
317 }
318
319
Pipeline_member_stats()320 Pipeline_member_stats::Pipeline_member_stats()
321 : m_transactions_waiting_certification(0),
322 m_transactions_waiting_apply(0),
323 m_transactions_certified(0),
324 m_delta_transactions_certified(0),
325 m_transactions_applied(0),
326 m_delta_transactions_applied(0),
327 m_transactions_local(0),
328 m_delta_transactions_local(0),
329 m_transactions_negative_certified(0),
330 m_transactions_rows_validating(0),
331 m_transactions_committed_all_members(),
332 m_transaction_last_conflict_free(),
333 m_stamp(0)
334 {}
335
336
Pipeline_member_stats(Pipeline_stats_member_message & msg)337 Pipeline_member_stats::Pipeline_member_stats(Pipeline_stats_member_message &msg)
338 : m_transactions_waiting_certification(msg.get_transactions_waiting_certification()),
339 m_transactions_waiting_apply(msg.get_transactions_waiting_apply()),
340 m_transactions_certified(msg.get_transactions_certified()),
341 m_delta_transactions_certified(0),
342 m_transactions_applied(msg.get_transactions_applied()),
343 m_delta_transactions_applied(0),
344 m_transactions_local(msg.get_transactions_local()),
345 m_delta_transactions_local(0),
346 m_transactions_negative_certified(0),
347 m_transactions_rows_validating(0),
348 m_transactions_committed_all_members(),
349 m_transaction_last_conflict_free(),
350 m_stamp(0)
351 {}
352
Pipeline_member_stats(Pipeline_stats_member_collector * pipeline_stats,ulonglong applier_queue,ulonglong negative_certified,ulonglong certification_size)353 Pipeline_member_stats::Pipeline_member_stats(
354 Pipeline_stats_member_collector *pipeline_stats, ulonglong applier_queue,
355 ulonglong negative_certified, ulonglong certification_size)
356 : m_transactions_committed_all_members(),
357 m_transaction_last_conflict_free()
358 {
359 m_transactions_waiting_certification= applier_queue;
360 m_transactions_waiting_apply= pipeline_stats->get_transactions_waiting_apply();
361 m_transactions_certified= pipeline_stats->get_transactions_certified();
362 m_delta_transactions_certified= 0;
363 m_transactions_applied= pipeline_stats->get_transactions_applied();
364 m_delta_transactions_applied= 0;
365 m_transactions_local= pipeline_stats->get_transactions_local();
366 m_delta_transactions_local= 0;
367 m_transactions_negative_certified= negative_certified;
368 m_transactions_rows_validating= certification_size;
369 m_stamp= 0;
370 }
371
~Pipeline_member_stats()372 Pipeline_member_stats::~Pipeline_member_stats()
373 {}
374
375
376 void
update_member_stats(Pipeline_stats_member_message & msg,uint64 stamp)377 Pipeline_member_stats::update_member_stats(Pipeline_stats_member_message &msg,
378 uint64 stamp)
379 {
380 m_transactions_waiting_certification=
381 msg.get_transactions_waiting_certification();
382
383 m_transactions_waiting_apply=
384 msg.get_transactions_waiting_apply();
385
386 int64 previous_transactions_certified= m_transactions_certified;
387 m_transactions_certified= msg.get_transactions_certified();
388 m_delta_transactions_certified=
389 m_transactions_certified - previous_transactions_certified;
390
391 int64 previous_transactions_applied= m_transactions_applied;
392 m_transactions_applied= msg.get_transactions_applied();
393 m_delta_transactions_applied=
394 m_transactions_applied - previous_transactions_applied;
395
396 int64 previous_transactions_local= m_transactions_local;
397 m_transactions_local= msg.get_transactions_local();
398 m_delta_transactions_local=
399 m_transactions_local - previous_transactions_local;
400
401 m_stamp= stamp;
402 }
403
404
405 bool
is_flow_control_needed()406 Pipeline_member_stats::is_flow_control_needed()
407 {
408 return (m_transactions_waiting_certification > flow_control_certifier_threshold_var
409 || m_transactions_waiting_apply > flow_control_applier_threshold_var);
410 }
411
412
413 int32
get_transactions_waiting_certification()414 Pipeline_member_stats::get_transactions_waiting_certification()
415 {
416 return m_transactions_waiting_certification;
417 }
418
419
420 int32
get_transactions_waiting_apply()421 Pipeline_member_stats::get_transactions_waiting_apply()
422 {
423 return m_transactions_waiting_apply;
424 }
425
426
427 int64
get_delta_transactions_certified()428 Pipeline_member_stats::get_delta_transactions_certified()
429 {
430 return m_delta_transactions_certified;
431 }
432
433
434 int64
get_delta_transactions_applied()435 Pipeline_member_stats::get_delta_transactions_applied()
436 {
437 return m_delta_transactions_applied;
438 }
439
440
441 int64
get_delta_transactions_local()442 Pipeline_member_stats::get_delta_transactions_local()
443 {
444 return m_delta_transactions_local;
445 }
446
get_transactions_negative_certified()447 int64 Pipeline_member_stats::get_transactions_negative_certified()
448 {
449 return m_transactions_negative_certified;
450 }
451
get_transactions_rows_validating()452 int64 Pipeline_member_stats::get_transactions_rows_validating()
453 {
454 return m_transactions_rows_validating;
455 }
456
get_transaction_committed_all_members(std::string & value)457 void Pipeline_member_stats::get_transaction_committed_all_members(std::string &value)
458 {
459 value.assign(m_transactions_committed_all_members);
460 }
461
set_transaction_committed_all_members(char * str,size_t len)462 void Pipeline_member_stats::set_transaction_committed_all_members(char *str, size_t len)
463 {
464 m_transactions_committed_all_members.assign(str, len);
465 }
466
get_transaction_last_conflict_free(std::string & value)467 void Pipeline_member_stats::get_transaction_last_conflict_free(
468 std::string &value)
469 {
470 value.assign(m_transaction_last_conflict_free);
471 }
472
set_transaction_last_conflict_free(std::string & value)473 void Pipeline_member_stats::set_transaction_last_conflict_free(
474 std::string &value)
475 {
476 m_transaction_last_conflict_free.assign(value);
477 }
478
get_transactions_certified()479 int64 Pipeline_member_stats::get_transactions_certified()
480 {
481 return m_transactions_certified;
482 }
483
484 uint64
get_stamp()485 Pipeline_member_stats::get_stamp()
486 {
487 return m_stamp;
488 }
489
490
491 #ifndef NDEBUG
492 void
debug(const char * member,int64 quota_size,int64 quota_used)493 Pipeline_member_stats::debug(const char *member, int64 quota_size,
494 int64 quota_used)
495 {
496 log_message(MY_INFORMATION_LEVEL, "Flow control - update member stats: "
497 "%s stats: certifier_queue %d, applier_queue %d,"
498 " certified %ld (%ld), applied %ld (%ld), local %ld (%ld), quota %ld (%ld)",
499 member, m_transactions_waiting_certification,
500 m_transactions_waiting_apply, m_transactions_certified,
501 m_delta_transactions_certified, m_transactions_applied,
502 m_delta_transactions_applied, m_transactions_local,
503 m_delta_transactions_local, quota_size, quota_used); /* purecov: inspected */
504 }
505 #endif
506
507
Flow_control_module()508 Flow_control_module::Flow_control_module()
509 : m_holds_in_period(0), m_quota_used(0), m_quota_size(0), m_stamp(0)
510 {
511 mysql_mutex_init(key_GR_LOCK_pipeline_stats_flow_control, &m_flow_control_lock, MY_MUTEX_INIT_FAST);
512 mysql_cond_init(key_GR_COND_pipeline_stats_flow_control, &m_flow_control_cond);
513 }
514
515
~Flow_control_module()516 Flow_control_module::~Flow_control_module()
517 {
518 mysql_mutex_destroy(&m_flow_control_lock);
519 mysql_cond_destroy(&m_flow_control_cond);
520 }
521
522
523 void
flow_control_step()524 Flow_control_module::flow_control_step()
525 {
526 m_stamp++;
527 int32 holds= my_atomic_fas32(&m_holds_in_period, 0);
528
529 switch(static_cast<Flow_control_mode>(flow_control_mode_var))
530 {
531 case FCM_QUOTA:
532 {
533 /*
534 Postponed transactions
535 */
536 int64 quota_size= my_atomic_fas64(&m_quota_size, 0);
537 int64 quota_used= my_atomic_fas64(&m_quota_used, 0);
538 int64 extra_quota=
539 (quota_size > 0 && quota_used > quota_size) ? quota_used - quota_size : 0;
540
541 /*
542 Release waiting transactions on do_wait().
543 */
544 if (extra_quota > 0)
545 {
546 mysql_mutex_lock(&m_flow_control_lock);
547 mysql_cond_broadcast(&m_flow_control_cond);
548 mysql_mutex_unlock(&m_flow_control_lock);
549 }
550
551 if (holds > 0)
552 {
553 uint num_writing_members= 0;
554 int64 min_certifier_capacity= MAXTPS, min_applier_capacity= MAXTPS, safe_capacity= MAXTPS;
555
556 Flow_control_module_info::iterator it= m_info.begin();
557 while (it != m_info.end())
558 {
559 if (it->second.get_stamp() < (m_stamp - 10))
560 {
561 /*
562 Purge member stats that were not updated on the last
563 10 flow control steps.
564 */
565 m_info.erase(it++);
566 }
567 else
568 {
569 if (flow_control_certifier_threshold_var > 0
570 && it->second.get_delta_transactions_certified() > 0
571 && it->second.get_transactions_waiting_certification() - flow_control_certifier_threshold_var > 0
572 && min_certifier_capacity > it->second.get_delta_transactions_certified())
573 min_certifier_capacity= it->second.get_delta_transactions_certified();
574
575 if (it->second.get_delta_transactions_certified() > 0)
576 safe_capacity= std::min(safe_capacity, it->second.get_delta_transactions_certified());
577
578 if (flow_control_applier_threshold_var > 0
579 && it->second.get_delta_transactions_applied() > 0
580 && it->second.get_transactions_waiting_apply() - flow_control_applier_threshold_var > 0
581 && min_applier_capacity > it->second.get_delta_transactions_applied())
582 min_applier_capacity= it->second.get_delta_transactions_applied();
583
584 if (it->second.get_delta_transactions_applied() > 0)
585 safe_capacity= std::min(safe_capacity, it->second.get_delta_transactions_applied());
586
587 if (it->second.get_delta_transactions_local() > 0)
588 num_writing_members++;
589
590 ++it;
591 }
592 }
593
594 // Avoid division by zero.
595 num_writing_members= num_writing_members > 0 ? num_writing_members : 1;
596 int64 min_capacity= (min_certifier_capacity > 0 && min_certifier_capacity < min_applier_capacity)
597 ? min_certifier_capacity : min_applier_capacity;
598
599 // Minimum capacity will never be less than lim_throttle.
600 int64 lim_throttle= static_cast<int64>(0.05 * std::min(flow_control_certifier_threshold_var,
601 flow_control_applier_threshold_var));
602 min_capacity= std::max(std::min(min_capacity, safe_capacity), lim_throttle);
603 quota_size= static_cast<int64>((min_capacity * HOLD_FACTOR) / num_writing_members - extra_quota);
604 my_atomic_store64(&m_quota_size, quota_size > 1 ? quota_size : 1);
605 }
606 else
607 {
608 if (quota_size > 0 && (quota_size * RELEASE_FACTOR) < MAXTPS)
609 {
610 int64 quota_size_next= static_cast<int64>(quota_size * RELEASE_FACTOR);
611 quota_size= quota_size_next > quota_size ? quota_size_next : quota_size + 1;
612 }
613 else
614 quota_size= 0;
615
616 my_atomic_store64(&m_quota_size, quota_size);
617 }
618
619 my_atomic_store64(&m_quota_used, 0);
620 break;
621 }
622
623 case FCM_DISABLED:
624 my_atomic_store64(&m_quota_size, 0);
625 my_atomic_store64(&m_quota_used, 0);
626 break;
627
628 default:
629 assert(0);
630 }
631 }
632
633
634 int
handle_stats_data(const uchar * data,uint64 len,const std::string & member_id)635 Flow_control_module::handle_stats_data(const uchar *data,
636 uint64 len,
637 const std::string& member_id)
638 {
639 DBUG_ENTER("Flow_control_module::handle_stats_data");
640 int error= 0;
641 Pipeline_stats_member_message message(data, len);
642
643 /*
644 This method is called synchronously by communication layer, so
645 we do not need concurrency control.
646 */
647 Flow_control_module_info::iterator it= m_info.find(member_id);
648 if (it == m_info.end())
649 {
650 Pipeline_member_stats stats;
651
652 std::pair<Flow_control_module_info::iterator, bool> ret=
653 m_info.insert(std::pair<std::string, Pipeline_member_stats>
654 (member_id, stats));
655 error= !ret.second;
656 it= ret.first;
657 }
658 it->second.update_member_stats(message, m_stamp);
659
660 /*
661 Verify if flow control is required.
662 */
663 if (it->second.is_flow_control_needed())
664 {
665 my_atomic_add32(&m_holds_in_period, 1);
666 #ifndef NDEBUG
667 it->second.debug(it->first.c_str(),
668 my_atomic_load64(&m_quota_size),
669 my_atomic_load64(&m_quota_used));
670 #endif
671 }
672
673 DBUG_RETURN(error);
674 }
675
676
677 int32
do_wait()678 Flow_control_module::do_wait()
679 {
680 DBUG_ENTER("Flow_control_module::do_wait");
681 int64 quota_size= my_atomic_load64(&m_quota_size);
682 int64 quota_used= my_atomic_add64(&m_quota_used, 1);
683
684 if (quota_used > quota_size && quota_size != 0)
685 {
686 struct timespec delay;
687 set_timespec(&delay, 1);
688
689 mysql_mutex_lock(&m_flow_control_lock);
690 mysql_cond_timedwait(&m_flow_control_cond, &m_flow_control_lock, &delay);
691 mysql_mutex_unlock(&m_flow_control_lock);
692 }
693
694 DBUG_RETURN(0);
695 }
696