1 /*
2 Copyright 2019 Google LLC
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 https://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "apib/apib_reporting.h"
18
19 #include <arpa/inet.h>
20 #include <netdb.h>
21 #include <netinet/in.h>
22 #include <sys/socket.h>
23 #include <sys/types.h>
24 #include <unistd.h>
25
26 #include <algorithm>
27 #include <atomic>
28 #include <cassert>
29 #include <cmath>
30 #include <iomanip>
31 #include <iostream>
32 #include <mutex>
33 #include <numeric>
34 #include <valarray>
35 #include <vector>
36
37 #include "absl/strings/numbers.h"
38 #include "absl/strings/str_format.h"
39 #include "apib/apib_cpu.h"
40 #include "apib/apib_time.h"
41
42 using absl::StrFormat;
43 using std::cerr;
44 using std::endl;
45
46 namespace apib {
47
48 static const std::string kCPUCmd("cpu\n");
49 static const std::string kMemCmd("mem\n");
50
51 static std::mutex latch;
52 static volatile bool reporting = 0;
53 static bool cpuAvailable = false;
54 static std::atomic_int_fast32_t socketErrors;
55 static std::atomic_int_fast32_t connectionsOpened;
56
57 static int_fast32_t successfulRequests;
58 static int_fast32_t unsuccessfulRequests;
59
60 static int64_t startTime;
61 static int64_t stopTime;
62 static int64_t intervalStartTime;
63
64 static std::vector<std::unique_ptr<Counters>> accumulatedResults;
65
66 static std::vector<double> clientSamples;
67 static std::vector<double> remoteSamples;
68 static std::vector<double> remote2Samples;
69
70 static double clientMem = 0.0;
71 static double remoteMem = 0.0;
72 static double remote2Mem = 0.0;
73
74 static CPUUsage cpuUsage;
75 static int remoteCpuSocket = 0;
76 static int remote2CpuSocket = 0;
77 static std::string remoteMonitorHost;
78 static std::string remote2MonitorHost;
79
80 static int64_t totalBytesSent = 0LL;
81 static int64_t totalBytesReceived = 0LL;
82
connectMonitor(absl::string_view hn,int * fd)83 static void connectMonitor(absl::string_view hn, int* fd) {
84 assert(fd != NULL);
85
86 const auto colonPos = hn.find(':');
87 if ((colonPos == absl::string_view::npos) || (colonPos == hn.size())) {
88 cerr << "Invalid monitor host \"" << hn << "\"\n";
89 return;
90 }
91
92 const std::string hostName = std::string(hn.substr(0, colonPos));
93 absl::string_view portStr = hn.substr(colonPos + 1, hn.size() - colonPos - 1);
94 int port;
95 const bool portStrOk = absl::SimpleAtoi(portStr, &port);
96 if (!portStrOk) {
97 cerr << "Invalid monitor host and port \"" << hn << "\"\n";
98 return;
99 }
100
101 struct addrinfo hints;
102 // For now, look up only IP V4 addresses
103 hints.ai_family = AF_INET;
104 hints.ai_socktype = SOCK_STREAM;
105 hints.ai_protocol = 0;
106 hints.ai_flags = 0;
107
108 struct addrinfo* hostInfo = NULL;
109 int err = getaddrinfo(hostName.c_str(), nullptr, &hints, &hostInfo);
110 if (err != 0) {
111 cerr << "Cannot look up remote monitoring host: " << errno << endl;
112 goto done;
113 }
114
115 *fd = socket(hostInfo->ai_family, SOCK_STREAM, 0);
116 assert(*fd > 0);
117
118 // IP4 and IP6 versions of this should have port in same place
119 ((struct sockaddr_in*)hostInfo->ai_addr)->sin_port = htons(port);
120
121 err = connect(*fd, hostInfo->ai_addr, hostInfo->ai_addrlen);
122 if (err != 0) {
123 cerr << "Connection error: " << errno << endl
124 << "Cannot connect to remote monitoring host \"" << hostName
125 << " on port " << port << endl;
126 close(*fd);
127 goto done;
128 }
129
130 done:
131 if (hostInfo != NULL) {
132 freeaddrinfo(hostInfo);
133 }
134 }
135
// Sends "cmd" to the monitoring server connected on *fd and parses the reply
// as a floating-point number.
//
// Returns the parsed value. If the write is short or fails, or the read
// returns no data, an error is written to stderr, the socket is closed,
// *fd is reset to 0 and 0.0 is returned.
static double getRemoteStat(const std::string& cmd, int* fd) {
  assert(fd != nullptr);

  // Shared failure path: report, drop the connection, return a neutral value.
  const auto fail = [fd](const char* what) {
    std::cerr << what << errno << std::endl;
    close(*fd);
    *fd = 0;
    return 0.0;
  };

  const ssize_t written = write(*fd, cmd.data(), cmd.size());
  if (written != static_cast<ssize_t>(cmd.size())) {
    return fail("Error writing to monitoring server: ");
  }

  char buf[64];
  // Leave room for a terminator: read() does not add one, and strtod would
  // otherwise scan past the end of the buffer.
  const ssize_t received = read(*fd, buf, sizeof(buf) - 1);
  if (received <= 0) {
    return fail("Error reading from monitoring server: ");
  }
  buf[received] = '\0';

  return strtod(buf, nullptr);
}
160
RecordSocketError(void)161 void RecordSocketError(void) {
162 if (!reporting) {
163 return;
164 }
165 socketErrors++;
166 }
167
RecordConnectionOpen(void)168 void RecordConnectionOpen(void) {
169 if (!reporting) {
170 return;
171 }
172 connectionsOpened++;
173 }
174
RecordByteCounts(int64_t sent,int64_t received)175 void RecordByteCounts(int64_t sent, int64_t received) {
176 totalBytesSent += sent;
177 totalBytesReceived += received;
178 }
179
RecordInit(const std::string & monitorHost,const std::string & host2)180 void RecordInit(const std::string& monitorHost, const std::string& host2) {
181 int err = cpu_Init();
182 cpuAvailable = (err == 0);
183 remoteMonitorHost = monitorHost;
184 remote2MonitorHost = host2;
185 }
186
RecordStart(bool startReporting,const ThreadList & threads)187 void RecordStart(bool startReporting, const ThreadList& threads) {
188 /* When we warm up we want to zero these out before continuing */
189 std::lock_guard<std::mutex> lock(latch);
190 successfulRequests = 0;
191 unsuccessfulRequests = 0;
192 socketErrors = 0;
193 connectionsOpened = 0;
194 totalBytesSent = 0;
195 totalBytesReceived = 0;
196 accumulatedResults.clear();
197
198 // We also want to zero out each thread's counters
199 // since they may have started already!
200 for (auto it = threads.cbegin(); it != threads.cend(); it++) {
201 Counters* c = (*it)->exchangeCounters();
202 delete c;
203 }
204
205 reporting = startReporting;
206 cpu_GetUsage(&cpuUsage);
207
208 if (!remoteMonitorHost.empty()) {
209 if (remoteCpuSocket == 0) {
210 connectMonitor(remoteMonitorHost, &remoteCpuSocket);
211 } else {
212 // Just re-set the CPU time
213 getRemoteStat(kCPUCmd, &remoteCpuSocket);
214 }
215 }
216 if (!remote2MonitorHost.empty()) {
217 if (remote2CpuSocket == 0) {
218 connectMonitor(remote2MonitorHost, &remote2CpuSocket);
219 } else {
220 // Just re-set the CPU time
221 getRemoteStat(kCPUCmd, &remote2CpuSocket);
222 }
223 }
224
225 startTime = GetTime();
226 intervalStartTime = startTime;
227
228 clientSamples.clear();
229 remoteSamples.clear();
230 remote2Samples.clear();
231 }
232
RecordStop(const ThreadList & threads)233 void RecordStop(const ThreadList& threads) {
234 SampleCPU();
235 clientMem = cpu_GetMemoryUsage();
236 if (remoteCpuSocket != 0) {
237 remoteMem = getRemoteStat(kMemCmd, &remoteCpuSocket);
238 }
239 if (remote2CpuSocket != 0) {
240 remote2Mem = getRemoteStat(kMemCmd, &remote2CpuSocket);
241 }
242
243 reporting = false;
244 for (auto it = threads.cbegin(); it != threads.cend(); it++) {
245 Counters* c = (*it)->exchangeCounters();
246 totalBytesReceived += c->bytesRead;
247 totalBytesSent += c->bytesWritten;
248 successfulRequests += c->successfulRequests;
249 unsuccessfulRequests += c->failedRequests;
250 accumulatedResults.push_back(std::unique_ptr<Counters>(c));
251 }
252 stopTime = GetTime();
253 }
254
ReportIntervalResults(const ThreadList & threads)255 BenchmarkIntervalResults ReportIntervalResults(const ThreadList& threads) {
256 int_fast32_t intervalSuccesses = 0LL;
257 int_fast32_t intervalFailures = 0LL;
258 const int64_t now = GetTime();
259
260 for (auto it = threads.cbegin(); it != threads.cend(); it++) {
261 Counters* c = (*it)->exchangeCounters();
262 totalBytesReceived += c->bytesRead;
263 totalBytesSent += c->bytesWritten;
264 intervalSuccesses += c->successfulRequests;
265 intervalFailures += c->failedRequests;
266 accumulatedResults.push_back(std::unique_ptr<Counters>(c));
267 }
268
269 // "exchangeCounters" clears thread-specific counters. Transfer new totals
270 // to the grand total for an accurate result.
271 successfulRequests += intervalSuccesses;
272 unsuccessfulRequests += intervalFailures;
273
274 BenchmarkIntervalResults r;
275 r.successfulRequests = intervalSuccesses;
276 r.intervalTime = Seconds(now - intervalStartTime);
277 r.elapsedTime = Seconds(now - startTime);
278 r.averageThroughput = (double)r.successfulRequests / r.intervalTime;
279 intervalStartTime = now;
280 return r;
281 }
282
SampleCPU()283 void SampleCPU() {
284 if (remoteCpuSocket != 0) {
285 const double remoteCpu = getRemoteStat(kCPUCmd, &remoteCpuSocket);
286 remoteSamples.push_back(remoteCpu);
287 }
288 if (remote2CpuSocket != 0) {
289 const double remote2Cpu = getRemoteStat(kCPUCmd, &remote2CpuSocket);
290 remote2Samples.push_back(remote2Cpu);
291 }
292 const double cpu = cpu_GetInterval(&cpuUsage);
293 clientSamples.push_back(cpu);
294 }
295
ReportInterval(std::ostream & out,const ThreadList & threads,int totalDuration,bool warmup)296 void ReportInterval(std::ostream& out, const ThreadList& threads,
297 int totalDuration, bool warmup) {
298 double cpu = 0.0;
299 double remoteCpu = 0.0;
300 double remote2Cpu = 0.0;
301
302 if (remoteCpuSocket != 0) {
303 remoteCpu = getRemoteStat(kCPUCmd, &remoteCpuSocket);
304 remoteSamples.push_back(remoteCpu);
305 }
306 if (remote2CpuSocket != 0) {
307 remote2Cpu = getRemoteStat(kCPUCmd, &remote2CpuSocket);
308 remote2Samples.push_back(remote2Cpu);
309 }
310 cpu = cpu_GetInterval(&cpuUsage);
311 clientSamples.push_back(cpu);
312
313 const BenchmarkIntervalResults r = ReportIntervalResults(threads);
314 const std::string warm = (warmup ? "Warming up: " : "");
315
316 out << StrFormat("%s(%.0f / %i) %.3f", warm, r.elapsedTime, totalDuration,
317 r.averageThroughput);
318 if (cpu > 0.0) {
319 out << StrFormat(" %.0f%% cpu", cpu * 100.0);
320 }
321 if (remoteCpu > 0.0) {
322 out << StrFormat(" %.0f%% remote cpu", remoteCpu * 100.0);
323 }
324 out << endl;
325 }
326
// Returns the latency at the given percentile of "latencies", which must
// already be sorted in ascending order (ReportResults sorts before calling).
//
// "percent" is clamped to [0, 100]: 0 yields the smallest value and 100 the
// largest. Returns 0 when "latencies" is empty.
static int64_t getLatencyPercent(const std::vector<int_fast64_t>& latencies,
                                 int percent) {
  if (latencies.empty()) {
    return 0;
  }
  const size_t last = latencies.size() - 1;
  if (percent >= 100) {
    return latencies[last];
  }
  if (percent <= 0) {
    return latencies.front();
  }
  // Integer arithmetic avoids floating-point rounding that could land one
  // slot low; the min() keeps the index in range.
  const size_t index =
      (latencies.size() * static_cast<size_t>(percent)) / 100;
  return latencies[std::min(index, last)];
}
338
// Returns the integer mean of "latencies" (the division truncates), or 0 when
// the vector is empty.
static int64_t getAverageLatency(const std::vector<int_fast64_t>& latencies) {
  if (latencies.empty()) {
    return 0LL;
  }
  long long sum = 0LL;
  for (const int_fast64_t sample : latencies) {
    sum += sample;
  }
  return sum / static_cast<int64_t>(latencies.size());
}
346
getLatencyStdDev(const std::vector<int_fast64_t> & latencies)347 static double getLatencyStdDev(const std::vector<int_fast64_t>& latencies) {
348 if (latencies.empty()) {
349 return 0.0;
350 }
351 unsigned long avg = Milliseconds(getAverageLatency(latencies));
352 double differences = 0.0;
353
354 std::for_each(latencies.begin(), latencies.end(),
355 [&differences, avg](const int_fast64_t& l) {
356 differences += pow(Milliseconds(l) - avg, 2.0);
357 });
358
359 return sqrt(differences / (double)latencies.size());
360 }
361
// Returns the arithmetic mean of the samples in "s", or 0.0 when there are
// none.
static double getAverageCpu(const std::vector<double>& s) {
  if (s.empty()) {
    return 0.0;
  }
  const double sum = std::accumulate(s.cbegin(), s.cend(), 0.0);
  return sum / s.size();
}
370
// Returns the largest sample in "s", or 0.0 when there are none.
static double getMaxCpu(const std::vector<double>& s) {
  if (s.empty()) {
    return 0.0;
  }
  double largest = s.front();
  for (const double sample : s) {
    if (largest < sample) {
      largest = sample;
    }
  }
  return largest;
}
377
ReportResults()378 BenchmarkResults ReportResults() {
379 std::vector<int_fast64_t> allLatencies;
380 for (auto it = accumulatedResults.begin(); it != accumulatedResults.end();
381 it++) {
382 for (auto lit = (*it)->latencies.begin(); lit != (*it)->latencies.end();
383 lit++) {
384 allLatencies.push_back(*lit);
385 }
386 }
387 std::sort(allLatencies.begin(), allLatencies.end());
388
389 BenchmarkResults r;
390 std::lock_guard<std::mutex> lock(latch);
391
392 r.completedRequests = successfulRequests + unsuccessfulRequests;
393 r.successfulRequests = successfulRequests;
394 r.unsuccessfulRequests = unsuccessfulRequests;
395 r.socketErrors = socketErrors;
396 r.connectionsOpened = connectionsOpened;
397 r.totalBytesSent = totalBytesSent;
398 r.totalBytesReceived = totalBytesReceived;
399
400 const int64_t rawElapsed = stopTime - startTime;
401 r.elapsedTime = Seconds(rawElapsed);
402 r.averageLatency = Milliseconds(getAverageLatency(allLatencies));
403 r.latencyStdDev = getLatencyStdDev(allLatencies);
404 for (int i = 0; i < 101; i++) {
405 r.latencies[i] = Milliseconds(getLatencyPercent(allLatencies, i));
406 }
407 r.averageThroughput = (double)r.completedRequests / r.elapsedTime;
408 r.averageSendBandwidth = (totalBytesSent * 8.0 / 1048576.0) / r.elapsedTime;
409 r.averageReceiveBandwidth =
410 (totalBytesReceived * 8.0 / 1048576.0) / r.elapsedTime;
411 return r;
412 }
413
PrintFullResults(std::ostream & out)414 void PrintFullResults(std::ostream& out) {
415 const BenchmarkResults r = ReportResults();
416
417 out << StrFormat("Duration: %.3f seconds\n", r.elapsedTime);
418 out << StrFormat("Attempted requests: %i\n", r.completedRequests);
419 out << StrFormat("Successful requests: %i\n", r.successfulRequests);
420 out << StrFormat("Non-200 results: %i\n", r.unsuccessfulRequests);
421 out << StrFormat("Connections opened: %i\n", r.connectionsOpened);
422 out << StrFormat("Socket errors: %i\n", r.socketErrors);
423 out << '\n';
424 out << StrFormat("Throughput: %.3f requests/second\n",
425 r.averageThroughput);
426 out << StrFormat("Average latency: %.3f milliseconds\n",
427 r.averageLatency);
428 out << StrFormat("Minimum latency: %.3f milliseconds\n", r.latencies[0]);
429 out << StrFormat("Maximum latency: %.3f milliseconds\n",
430 r.latencies[100]);
431 out << StrFormat("Latency std. dev: %.3f milliseconds\n",
432 r.latencyStdDev);
433 out << StrFormat("50%% latency: %.3f milliseconds\n",
434 r.latencies[50]);
435 out << StrFormat("90%% latency: %.3f milliseconds\n",
436 r.latencies[90]);
437 out << StrFormat("98%% latency: %.3f milliseconds\n",
438 r.latencies[98]);
439 out << StrFormat("99%% latency: %.3f milliseconds\n",
440 r.latencies[99]);
441 out << '\n';
442 if (!clientSamples.empty()) {
443 out << StrFormat("Client CPU average: %.0f%%\n",
444 getAverageCpu(clientSamples) * 100.0);
445 out << StrFormat("Client CPU max: %.0f%%\n",
446 getMaxCpu(clientSamples) * 100.0);
447 }
448 out << StrFormat("Client memory usage: %.0f%%\n", clientMem * 100.0);
449 if (!remoteSamples.empty()) {
450 out << StrFormat("Remote CPU average: %.0f%%\n",
451 getAverageCpu(remoteSamples) * 100.0);
452 out << StrFormat("Remote CPU max: %.0f%%\n",
453 getMaxCpu(remoteSamples) * 100.0);
454 out << StrFormat("Remote memory usage: %.0f%%\n", remoteMem * 100.0);
455 }
456 if (!remote2Samples.empty()) {
457 out << StrFormat("Remote 2 CPU average: %.0f%%\n",
458 getAverageCpu(remote2Samples) * 100.0);
459 out << StrFormat("Remote 2 CPU max: %.0f%%\n",
460 getMaxCpu(remote2Samples) * 100.0);
461 out << StrFormat("Remote 2 memory usage: %.0f%%\n", remote2Mem * 100.0);
462 }
463 out << '\n';
464 out << StrFormat("Total bytes sent: %.2f megabytes\n",
465 r.totalBytesSent / 1048576.0);
466 out << StrFormat("Total bytes received: %.2f megabytes\n",
467 r.totalBytesReceived / 1048576.0);
468 out << StrFormat("Send bandwidth: %.2f megabits / second\n",
469 r.averageSendBandwidth);
470 out << StrFormat("Receive bandwidth: %.2f megabits / second\n",
471 r.averageReceiveBandwidth);
472 }
473
PrintShortResults(std::ostream & out,const std::string & runName,size_t numThreads,int connections)474 void PrintShortResults(std::ostream& out, const std::string& runName,
475 size_t numThreads, int connections) {
476 const BenchmarkResults r = ReportResults();
477
478 // See "PrintReportingHeader for column names
479 out << StrFormat(
480 "%s,%.3f,%.3f,%i,%i,%.3f,%i,%i,%i,%i,%.3f,%.3f,%.3f,%.3f,%.3f,%."
481 "3f,%.3f,%.0f,%.0f,%.0f,%.0f,%.0f,%.0f,%.2f,%.2f\n",
482 runName, r.averageThroughput, r.averageLatency, numThreads, connections,
483 r.elapsedTime, r.completedRequests, r.successfulRequests, r.socketErrors,
484 r.connectionsOpened, r.latencies[0], r.latencies[100], r.latencies[50],
485 r.latencies[90], r.latencies[98], r.latencies[99], r.latencyStdDev,
486 getAverageCpu(clientSamples) * 100.0,
487 getAverageCpu(remoteSamples) * 100.0,
488 getAverageCpu(remote2Samples) * 100.0, clientMem * 100.0,
489 remoteMem * 100.0, remote2Mem * 100.0, r.averageSendBandwidth,
490 r.averageReceiveBandwidth);
491 }
492
// Writes the comma-separated column header that matches the lines produced
// by PrintShortResults.
void PrintReportingHeader(std::ostream& out) {
  static const char kHeader[] =
      "Name,Throughput,Avg. Latency,Threads,Connections,Duration,"
      "Completed,Successful,Errors,Sockets,"
      "Min. latency,Max. latency,"
      "50% Latency,90% Latency,98% Latency,99% Latency,"
      "Latency Std Dev,"
      "Avg Client CPU,Avg Server CPU,Avg Server 2 CPU,"
      "Client Mem Usage,Server Mem,Server 2 Mem,"
      "Avg. Send Bandwidth,Avg. Recv. Bandwidth\n";
  out << kHeader;
}
502
EndReporting()503 void EndReporting() {
504 if (remoteCpuSocket != 0) {
505 close(remoteCpuSocket);
506 }
507 if (remote2CpuSocket != 0) {
508 close(remote2CpuSocket);
509 }
510 }
511
512 } // namespace apib
513