1 /*
2 Copyright 2019 Google LLC
3 
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7 
8     https://www.apache.org/licenses/LICENSE-2.0
9 
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 
17 #include "apib/apib_reporting.h"
18 
19 #include <arpa/inet.h>
20 #include <netdb.h>
21 #include <netinet/in.h>
22 #include <sys/socket.h>
23 #include <sys/types.h>
24 #include <unistd.h>
25 
26 #include <algorithm>
27 #include <atomic>
28 #include <cassert>
29 #include <cmath>
30 #include <iomanip>
31 #include <iostream>
32 #include <mutex>
33 #include <numeric>
34 #include <valarray>
35 #include <vector>
36 
37 #include "absl/strings/numbers.h"
38 #include "absl/strings/str_format.h"
39 #include "apib/apib_cpu.h"
40 #include "apib/apib_time.h"
41 
42 using absl::StrFormat;
43 using std::cerr;
44 using std::endl;
45 
46 namespace apib {
47 
48 static const std::string kCPUCmd("cpu\n");
49 static const std::string kMemCmd("mem\n");
50 
51 static std::mutex latch;
52 static volatile bool reporting = 0;
53 static bool cpuAvailable = false;
54 static std::atomic_int_fast32_t socketErrors;
55 static std::atomic_int_fast32_t connectionsOpened;
56 
57 static int_fast32_t successfulRequests;
58 static int_fast32_t unsuccessfulRequests;
59 
60 static int64_t startTime;
61 static int64_t stopTime;
62 static int64_t intervalStartTime;
63 
64 static std::vector<std::unique_ptr<Counters>> accumulatedResults;
65 
66 static std::vector<double> clientSamples;
67 static std::vector<double> remoteSamples;
68 static std::vector<double> remote2Samples;
69 
70 static double clientMem = 0.0;
71 static double remoteMem = 0.0;
72 static double remote2Mem = 0.0;
73 
74 static CPUUsage cpuUsage;
75 static int remoteCpuSocket = 0;
76 static int remote2CpuSocket = 0;
77 static std::string remoteMonitorHost;
78 static std::string remote2MonitorHost;
79 
80 static int64_t totalBytesSent = 0LL;
81 static int64_t totalBytesReceived = 0LL;
82 
connectMonitor(absl::string_view hn,int * fd)83 static void connectMonitor(absl::string_view hn, int* fd) {
84   assert(fd != NULL);
85 
86   const auto colonPos = hn.find(':');
87   if ((colonPos == absl::string_view::npos) || (colonPos == hn.size())) {
88     cerr << "Invalid monitor host \"" << hn << "\"\n";
89     return;
90   }
91 
92   const std::string hostName = std::string(hn.substr(0, colonPos));
93   absl::string_view portStr = hn.substr(colonPos + 1, hn.size() - colonPos - 1);
94   int port;
95   const bool portStrOk = absl::SimpleAtoi(portStr, &port);
96   if (!portStrOk) {
97     cerr << "Invalid monitor host and port \"" << hn << "\"\n";
98     return;
99   }
100 
101   struct addrinfo hints;
102   // For now, look up only IP V4 addresses
103   hints.ai_family = AF_INET;
104   hints.ai_socktype = SOCK_STREAM;
105   hints.ai_protocol = 0;
106   hints.ai_flags = 0;
107 
108   struct addrinfo* hostInfo = NULL;
109   int err = getaddrinfo(hostName.c_str(), nullptr, &hints, &hostInfo);
110   if (err != 0) {
111     cerr << "Cannot look up remote monitoring host: " << errno << endl;
112     goto done;
113   }
114 
115   *fd = socket(hostInfo->ai_family, SOCK_STREAM, 0);
116   assert(*fd > 0);
117 
118   // IP4 and IP6 versions of this should have port in same place
119   ((struct sockaddr_in*)hostInfo->ai_addr)->sin_port = htons(port);
120 
121   err = connect(*fd, hostInfo->ai_addr, hostInfo->ai_addrlen);
122   if (err != 0) {
123     cerr << "Connection error: " << errno << endl
124          << "Cannot connect to remote monitoring host \"" << hostName
125          << " on port " << port << endl;
126     close(*fd);
127     goto done;
128   }
129 
130 done:
131   if (hostInfo != NULL) {
132     freeaddrinfo(hostInfo);
133   }
134 }
135 
// Sends "cmd" to the monitoring server on "*fd" and parses the reply as a
// floating-point number.
//
// Returns the parsed value on success. If the command cannot be written in
// full, or the read fails or hits end-of-file, an error is written to
// stderr, the socket is closed, "*fd" is reset to 0 (this file's "not
// connected" value) and 0.0 is returned.
static double getRemoteStat(const std::string& cmd, int* fd) {
  assert(fd != nullptr);

  // Shared failure path: drop the connection so callers stop using it.
  const auto fail = [fd]() {
    close(*fd);
    *fd = 0;
    return 0.0;
  };

  const ssize_t written = write(*fd, cmd.data(), cmd.size());
  if (written != static_cast<ssize_t>(cmd.size())) {
    std::cerr << "Error writing to monitoring server: " << errno << std::endl;
    return fail();
  }

  // Leave room for a terminator: strtod needs a NUL-terminated string and
  // read() does not add one.
  char buf[64];
  const ssize_t got = read(*fd, buf, sizeof(buf) - 1);
  if (got <= 0) {
    std::cerr << "Error reading from monitoring server: " << errno
              << std::endl;
    return fail();
  }
  buf[got] = '\0';

  return strtod(buf, nullptr);
}
160 
RecordSocketError(void)161 void RecordSocketError(void) {
162   if (!reporting) {
163     return;
164   }
165   socketErrors++;
166 }
167 
RecordConnectionOpen(void)168 void RecordConnectionOpen(void) {
169   if (!reporting) {
170     return;
171   }
172   connectionsOpened++;
173 }
174 
RecordByteCounts(int64_t sent,int64_t received)175 void RecordByteCounts(int64_t sent, int64_t received) {
176   totalBytesSent += sent;
177   totalBytesReceived += received;
178 }
179 
RecordInit(const std::string & monitorHost,const std::string & host2)180 void RecordInit(const std::string& monitorHost, const std::string& host2) {
181   int err = cpu_Init();
182   cpuAvailable = (err == 0);
183   remoteMonitorHost = monitorHost;
184   remote2MonitorHost = host2;
185 }
186 
RecordStart(bool startReporting,const ThreadList & threads)187 void RecordStart(bool startReporting, const ThreadList& threads) {
188   /* When we warm up we want to zero these out before continuing */
189   std::lock_guard<std::mutex> lock(latch);
190   successfulRequests = 0;
191   unsuccessfulRequests = 0;
192   socketErrors = 0;
193   connectionsOpened = 0;
194   totalBytesSent = 0;
195   totalBytesReceived = 0;
196   accumulatedResults.clear();
197 
198   // We also want to zero out each thread's counters
199   // since they may have started already!
200   for (auto it = threads.cbegin(); it != threads.cend(); it++) {
201     Counters* c = (*it)->exchangeCounters();
202     delete c;
203   }
204 
205   reporting = startReporting;
206   cpu_GetUsage(&cpuUsage);
207 
208   if (!remoteMonitorHost.empty()) {
209     if (remoteCpuSocket == 0) {
210       connectMonitor(remoteMonitorHost, &remoteCpuSocket);
211     } else {
212       // Just re-set the CPU time
213       getRemoteStat(kCPUCmd, &remoteCpuSocket);
214     }
215   }
216   if (!remote2MonitorHost.empty()) {
217     if (remote2CpuSocket == 0) {
218       connectMonitor(remote2MonitorHost, &remote2CpuSocket);
219     } else {
220       // Just re-set the CPU time
221       getRemoteStat(kCPUCmd, &remote2CpuSocket);
222     }
223   }
224 
225   startTime = GetTime();
226   intervalStartTime = startTime;
227 
228   clientSamples.clear();
229   remoteSamples.clear();
230   remote2Samples.clear();
231 }
232 
RecordStop(const ThreadList & threads)233 void RecordStop(const ThreadList& threads) {
234   SampleCPU();
235   clientMem = cpu_GetMemoryUsage();
236   if (remoteCpuSocket != 0) {
237     remoteMem = getRemoteStat(kMemCmd, &remoteCpuSocket);
238   }
239   if (remote2CpuSocket != 0) {
240     remote2Mem = getRemoteStat(kMemCmd, &remote2CpuSocket);
241   }
242 
243   reporting = false;
244   for (auto it = threads.cbegin(); it != threads.cend(); it++) {
245     Counters* c = (*it)->exchangeCounters();
246     totalBytesReceived += c->bytesRead;
247     totalBytesSent += c->bytesWritten;
248     successfulRequests += c->successfulRequests;
249     unsuccessfulRequests += c->failedRequests;
250     accumulatedResults.push_back(std::unique_ptr<Counters>(c));
251   }
252   stopTime = GetTime();
253 }
254 
ReportIntervalResults(const ThreadList & threads)255 BenchmarkIntervalResults ReportIntervalResults(const ThreadList& threads) {
256   int_fast32_t intervalSuccesses = 0LL;
257   int_fast32_t intervalFailures = 0LL;
258   const int64_t now = GetTime();
259 
260   for (auto it = threads.cbegin(); it != threads.cend(); it++) {
261     Counters* c = (*it)->exchangeCounters();
262     totalBytesReceived += c->bytesRead;
263     totalBytesSent += c->bytesWritten;
264     intervalSuccesses += c->successfulRequests;
265     intervalFailures += c->failedRequests;
266     accumulatedResults.push_back(std::unique_ptr<Counters>(c));
267   }
268 
269   // "exchangeCounters" clears thread-specific counters. Transfer new totals
270   // to the grand total for an accurate result.
271   successfulRequests += intervalSuccesses;
272   unsuccessfulRequests += intervalFailures;
273 
274   BenchmarkIntervalResults r;
275   r.successfulRequests = intervalSuccesses;
276   r.intervalTime = Seconds(now - intervalStartTime);
277   r.elapsedTime = Seconds(now - startTime);
278   r.averageThroughput = (double)r.successfulRequests / r.intervalTime;
279   intervalStartTime = now;
280   return r;
281 }
282 
SampleCPU()283 void SampleCPU() {
284   if (remoteCpuSocket != 0) {
285     const double remoteCpu = getRemoteStat(kCPUCmd, &remoteCpuSocket);
286     remoteSamples.push_back(remoteCpu);
287   }
288   if (remote2CpuSocket != 0) {
289     const double remote2Cpu = getRemoteStat(kCPUCmd, &remote2CpuSocket);
290     remote2Samples.push_back(remote2Cpu);
291   }
292   const double cpu = cpu_GetInterval(&cpuUsage);
293   clientSamples.push_back(cpu);
294 }
295 
ReportInterval(std::ostream & out,const ThreadList & threads,int totalDuration,bool warmup)296 void ReportInterval(std::ostream& out, const ThreadList& threads,
297                     int totalDuration, bool warmup) {
298   double cpu = 0.0;
299   double remoteCpu = 0.0;
300   double remote2Cpu = 0.0;
301 
302   if (remoteCpuSocket != 0) {
303     remoteCpu = getRemoteStat(kCPUCmd, &remoteCpuSocket);
304     remoteSamples.push_back(remoteCpu);
305   }
306   if (remote2CpuSocket != 0) {
307     remote2Cpu = getRemoteStat(kCPUCmd, &remote2CpuSocket);
308     remote2Samples.push_back(remote2Cpu);
309   }
310   cpu = cpu_GetInterval(&cpuUsage);
311   clientSamples.push_back(cpu);
312 
313   const BenchmarkIntervalResults r = ReportIntervalResults(threads);
314   const std::string warm = (warmup ? "Warming up: " : "");
315 
316   out << StrFormat("%s(%.0f / %i) %.3f", warm, r.elapsedTime, totalDuration,
317                    r.averageThroughput);
318   if (cpu > 0.0) {
319     out << StrFormat(" %.0f%% cpu", cpu * 100.0);
320   }
321   if (remoteCpu > 0.0) {
322     out << StrFormat(" %.0f%% remote cpu", remoteCpu * 100.0);
323   }
324   out << endl;
325 }
326 
// Returns the latency at the given percentile of an ascending-sorted list.
//
// latencies: samples sorted in ascending order.
// percent:   percentile to look up. Values <= 0 return the smallest
//            sample and values >= 100 return the largest, so an
//            out-of-range request can never index past the vector.
//
// Returns 0 when "latencies" is empty.
//
// The index is floor(size * percent / 100), computed in integer arithmetic.
// The equivalent floating-point expression (size / 100.0) * percent can
// land just below a whole number (58 samples at 50% gives 28.99...), which
// would truncate to the wrong element.
static int64_t getLatencyPercent(const std::vector<int_fast64_t>& latencies,
                                 int percent) {
  if (latencies.empty()) {
    return 0;
  }
  if (percent >= 100) {
    return latencies.back();
  }
  if (percent <= 0) {
    return latencies.front();
  }
  // percent is in [1, 99] here, so index < latencies.size().
  const size_t index =
      (latencies.size() * static_cast<size_t>(percent)) / 100;
  return latencies[index];
}
338 
// Returns the integer mean of the samples (the quotient is truncated
// toward zero), or 0 when there are no samples.
static int64_t getAverageLatency(const std::vector<int_fast64_t>& latencies) {
  if (latencies.empty()) {
    return 0LL;
  }
  long long total = 0LL;
  for (const int_fast64_t sample : latencies) {
    total += sample;
  }
  return total / static_cast<int64_t>(latencies.size());
}
346 
getLatencyStdDev(const std::vector<int_fast64_t> & latencies)347 static double getLatencyStdDev(const std::vector<int_fast64_t>& latencies) {
348   if (latencies.empty()) {
349     return 0.0;
350   }
351   unsigned long avg = Milliseconds(getAverageLatency(latencies));
352   double differences = 0.0;
353 
354   std::for_each(latencies.begin(), latencies.end(),
355                 [&differences, avg](const int_fast64_t& l) {
356                   differences += pow(Milliseconds(l) - avg, 2.0);
357                 });
358 
359   return sqrt(differences / (double)latencies.size());
360 }
361 
// Returns the arithmetic mean of the samples, or 0.0 when there are none.
static double getAverageCpu(const std::vector<double>& s) {
  if (s.empty()) {
    return 0.0;
  }
  // Left-to-right sum starting from 0.0.
  const double sum = std::accumulate(s.cbegin(), s.cend(), 0.0);
  return sum / s.size();
}
370 
// Returns the largest sample, or 0.0 when there are none.
static double getMaxCpu(const std::vector<double>& s) {
  if (s.empty()) {
    return 0.0;
  }
  double largest = s.front();
  for (const double sample : s) {
    if (largest < sample) {
      largest = sample;
    }
  }
  return largest;
}
377 
ReportResults()378 BenchmarkResults ReportResults() {
379   std::vector<int_fast64_t> allLatencies;
380   for (auto it = accumulatedResults.begin(); it != accumulatedResults.end();
381        it++) {
382     for (auto lit = (*it)->latencies.begin(); lit != (*it)->latencies.end();
383          lit++) {
384       allLatencies.push_back(*lit);
385     }
386   }
387   std::sort(allLatencies.begin(), allLatencies.end());
388 
389   BenchmarkResults r;
390   std::lock_guard<std::mutex> lock(latch);
391 
392   r.completedRequests = successfulRequests + unsuccessfulRequests;
393   r.successfulRequests = successfulRequests;
394   r.unsuccessfulRequests = unsuccessfulRequests;
395   r.socketErrors = socketErrors;
396   r.connectionsOpened = connectionsOpened;
397   r.totalBytesSent = totalBytesSent;
398   r.totalBytesReceived = totalBytesReceived;
399 
400   const int64_t rawElapsed = stopTime - startTime;
401   r.elapsedTime = Seconds(rawElapsed);
402   r.averageLatency = Milliseconds(getAverageLatency(allLatencies));
403   r.latencyStdDev = getLatencyStdDev(allLatencies);
404   for (int i = 0; i < 101; i++) {
405     r.latencies[i] = Milliseconds(getLatencyPercent(allLatencies, i));
406   }
407   r.averageThroughput = (double)r.completedRequests / r.elapsedTime;
408   r.averageSendBandwidth = (totalBytesSent * 8.0 / 1048576.0) / r.elapsedTime;
409   r.averageReceiveBandwidth =
410       (totalBytesReceived * 8.0 / 1048576.0) / r.elapsedTime;
411   return r;
412 }
413 
PrintFullResults(std::ostream & out)414 void PrintFullResults(std::ostream& out) {
415   const BenchmarkResults r = ReportResults();
416 
417   out << StrFormat("Duration:             %.3f seconds\n", r.elapsedTime);
418   out << StrFormat("Attempted requests:   %i\n", r.completedRequests);
419   out << StrFormat("Successful requests:  %i\n", r.successfulRequests);
420   out << StrFormat("Non-200 results:      %i\n", r.unsuccessfulRequests);
421   out << StrFormat("Connections opened:   %i\n", r.connectionsOpened);
422   out << StrFormat("Socket errors:        %i\n", r.socketErrors);
423   out << '\n';
424   out << StrFormat("Throughput:           %.3f requests/second\n",
425                    r.averageThroughput);
426   out << StrFormat("Average latency:      %.3f milliseconds\n",
427                    r.averageLatency);
428   out << StrFormat("Minimum latency:      %.3f milliseconds\n", r.latencies[0]);
429   out << StrFormat("Maximum latency:      %.3f milliseconds\n",
430                    r.latencies[100]);
431   out << StrFormat("Latency std. dev:     %.3f milliseconds\n",
432                    r.latencyStdDev);
433   out << StrFormat("50%% latency:          %.3f milliseconds\n",
434                    r.latencies[50]);
435   out << StrFormat("90%% latency:          %.3f milliseconds\n",
436                    r.latencies[90]);
437   out << StrFormat("98%% latency:          %.3f milliseconds\n",
438                    r.latencies[98]);
439   out << StrFormat("99%% latency:          %.3f milliseconds\n",
440                    r.latencies[99]);
441   out << '\n';
442   if (!clientSamples.empty()) {
443     out << StrFormat("Client CPU average:   %.0f%%\n",
444                      getAverageCpu(clientSamples) * 100.0);
445     out << StrFormat("Client CPU max:       %.0f%%\n",
446                      getMaxCpu(clientSamples) * 100.0);
447   }
448   out << StrFormat("Client memory usage:  %.0f%%\n", clientMem * 100.0);
449   if (!remoteSamples.empty()) {
450     out << StrFormat("Remote CPU average:   %.0f%%\n",
451                      getAverageCpu(remoteSamples) * 100.0);
452     out << StrFormat("Remote CPU max:       %.0f%%\n",
453                      getMaxCpu(remoteSamples) * 100.0);
454     out << StrFormat("Remote memory usage:  %.0f%%\n", remoteMem * 100.0);
455   }
456   if (!remote2Samples.empty()) {
457     out << StrFormat("Remote 2 CPU average:   %.0f%%\n",
458                      getAverageCpu(remote2Samples) * 100.0);
459     out << StrFormat("Remote 2 CPU max:       %.0f%%\n",
460                      getMaxCpu(remote2Samples) * 100.0);
461     out << StrFormat("Remote 2 memory usage:  %.0f%%\n", remote2Mem * 100.0);
462   }
463   out << '\n';
464   out << StrFormat("Total bytes sent:     %.2f megabytes\n",
465                    r.totalBytesSent / 1048576.0);
466   out << StrFormat("Total bytes received: %.2f megabytes\n",
467                    r.totalBytesReceived / 1048576.0);
468   out << StrFormat("Send bandwidth:       %.2f megabits / second\n",
469                    r.averageSendBandwidth);
470   out << StrFormat("Receive bandwidth:    %.2f megabits / second\n",
471                    r.averageReceiveBandwidth);
472 }
473 
PrintShortResults(std::ostream & out,const std::string & runName,size_t numThreads,int connections)474 void PrintShortResults(std::ostream& out, const std::string& runName,
475                        size_t numThreads, int connections) {
476   const BenchmarkResults r = ReportResults();
477 
478   // See "PrintReportingHeader for column names
479   out << StrFormat(
480       "%s,%.3f,%.3f,%i,%i,%.3f,%i,%i,%i,%i,%.3f,%.3f,%.3f,%.3f,%.3f,%."
481       "3f,%.3f,%.0f,%.0f,%.0f,%.0f,%.0f,%.0f,%.2f,%.2f\n",
482       runName, r.averageThroughput, r.averageLatency, numThreads, connections,
483       r.elapsedTime, r.completedRequests, r.successfulRequests, r.socketErrors,
484       r.connectionsOpened, r.latencies[0], r.latencies[100], r.latencies[50],
485       r.latencies[90], r.latencies[98], r.latencies[99], r.latencyStdDev,
486       getAverageCpu(clientSamples) * 100.0,
487       getAverageCpu(remoteSamples) * 100.0,
488       getAverageCpu(remote2Samples) * 100.0, clientMem * 100.0,
489       remoteMem * 100.0, remote2Mem * 100.0, r.averageSendBandwidth,
490       r.averageReceiveBandwidth);
491 }
492 
// Writes the comma-separated column header that matches the lines
// produced by PrintShortResults, followed by a newline.
void PrintReportingHeader(std::ostream& out) {
  static const char kHeader[] =
      "Name,Throughput,Avg. Latency,"
      "Threads,Connections,Duration,"
      "Completed,Successful,Errors,Sockets,"
      "Min. latency,Max. latency,"
      "50% Latency,90% Latency,98% Latency,99% Latency,"
      "Latency Std Dev,"
      "Avg Client CPU,Avg Server CPU,Avg Server 2 CPU,"
      "Client Mem Usage,Server Mem,Server 2 Mem,"
      "Avg. Send Bandwidth,Avg. Recv. Bandwidth\n";
  out << kHeader;
}
502 
EndReporting()503 void EndReporting() {
504   if (remoteCpuSocket != 0) {
505     close(remoteCpuSocket);
506   }
507   if (remote2CpuSocket != 0) {
508     close(remote2CpuSocket);
509   }
510 }
511 
512 }  // namespace apib
513