1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <string>
20 #include <thread>  // NOLINT
21 #include <vector>
22 
23 #include "absl/strings/str_cat.h"
24 #include "absl/strings/string_view.h"
25 #include "gmock/gmock.h"
26 #include "gtest/gtest.h"
27 #include "include/grpc++/grpc++.h"
28 #include "include/grpcpp/opencensus.h"
29 #include "opencensus/stats/stats.h"
30 #include "opencensus/stats/tag_key.h"
31 #include "opencensus/stats/testing/test_utils.h"
32 #include "opencensus/tags/tag_map.h"
33 #include "opencensus/tags/with_tag_map.h"
34 #include "src/cpp/ext/filters/census/grpc_plugin.h"
35 #include "src/proto/grpc/testing/echo.grpc.pb.h"
36 #include "test/core/util/test_config.h"
37 
38 namespace grpc {
39 namespace testing {
40 namespace {
41 
42 using ::opencensus::stats::Aggregation;
43 using ::opencensus::stats::Distribution;
44 using ::opencensus::stats::View;
45 using ::opencensus::stats::ViewDescriptor;
46 using ::opencensus::stats::testing::TestUtils;
47 using ::opencensus::tags::TagKey;
48 using ::opencensus::tags::WithTagMap;
49 
50 static const auto TEST_TAG_KEY = TagKey::Register("my_key");
51 static const auto TEST_TAG_VALUE = "my_value";
52 
53 class EchoServer final : public EchoTestService::Service {
Echo(::grpc::ServerContext * context,const EchoRequest * request,EchoResponse * response)54   ::grpc::Status Echo(::grpc::ServerContext* context,
55                       const EchoRequest* request,
56                       EchoResponse* response) override {
57     if (request->param().expected_error().code() == 0) {
58       response->set_message(request->message());
59       return ::grpc::Status::OK;
60     } else {
61       return ::grpc::Status(static_cast<::grpc::StatusCode>(
62                                 request->param().expected_error().code()),
63                             "");
64     }
65   }
66 };
67 
68 class StatsPluginEnd2EndTest : public ::testing::Test {
69  protected:
SetUpTestCase()70   static void SetUpTestCase() { RegisterOpenCensusPlugin(); }
71 
SetUp()72   void SetUp() override {
73     // Set up a synchronous server on a different thread to avoid the asynch
74     // interface.
75     ::grpc::ServerBuilder builder;
76     int port;
77     // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
78     builder.AddListeningPort("0.0.0.0:0", ::grpc::InsecureServerCredentials(),
79                              &port);
80     builder.RegisterService(&service_);
81     server_ = builder.BuildAndStart();
82     ASSERT_NE(nullptr, server_);
83     ASSERT_NE(0, port);
84     server_address_ = absl::StrCat("localhost:", port);
85     server_thread_ = std::thread(&StatsPluginEnd2EndTest::RunServerLoop, this);
86 
87     stub_ = EchoTestService::NewStub(::grpc::CreateChannel(
88         server_address_, ::grpc::InsecureChannelCredentials()));
89   }
90 
TearDown()91   void TearDown() override {
92     server_->Shutdown();
93     server_thread_.join();
94   }
95 
RunServerLoop()96   void RunServerLoop() { server_->Wait(); }
97 
98   const std::string client_method_name_ = "grpc.testing.EchoTestService/Echo";
99   const std::string server_method_name_ = "grpc.testing.EchoTestService/Echo";
100 
101   std::string server_address_;
102   EchoServer service_;
103   std::unique_ptr<grpc::Server> server_;
104   std::thread server_thread_;
105 
106   std::unique_ptr<EchoTestService::Stub> stub_;
107 };
108 
TEST_F(StatsPluginEnd2EndTest,ErrorCount)109 TEST_F(StatsPluginEnd2EndTest, ErrorCount) {
110   const auto client_method_descriptor =
111       ViewDescriptor()
112           .set_measure(kRpcClientRoundtripLatencyMeasureName)
113           .set_name("client_method")
114           .set_aggregation(Aggregation::Count())
115           .add_column(ClientMethodTagKey())
116           .add_column(TEST_TAG_KEY);
117   View client_method_view(client_method_descriptor);
118   const auto server_method_descriptor =
119       ViewDescriptor()
120           .set_measure(kRpcServerServerLatencyMeasureName)
121           .set_name("server_method")
122           .set_aggregation(Aggregation::Count())
123           .add_column(ServerMethodTagKey());
124   //.add_column(TEST_TAG_KEY);
125   View server_method_view(server_method_descriptor);
126 
127   const auto client_status_descriptor =
128       ViewDescriptor()
129           .set_measure(kRpcClientRoundtripLatencyMeasureName)
130           .set_name("client_status")
131           .set_aggregation(Aggregation::Count())
132           .add_column(ClientStatusTagKey())
133           .add_column(TEST_TAG_KEY);
134   View client_status_view(client_status_descriptor);
135   const auto server_status_descriptor =
136       ViewDescriptor()
137           .set_measure(kRpcServerServerLatencyMeasureName)
138           .set_name("server_status")
139           .set_aggregation(Aggregation::Count())
140           .add_column(ServerStatusTagKey());
141   View server_status_view(server_status_descriptor);
142 
143   // Cover all valid statuses.
144   for (int i = 0; i <= 16; ++i) {
145     EchoRequest request;
146     request.set_message("foo");
147     request.mutable_param()->mutable_expected_error()->set_code(i);
148     EchoResponse response;
149     ::grpc::ClientContext context;
150     {
151       WithTagMap tags({{TEST_TAG_KEY, TEST_TAG_VALUE}});
152       ::grpc::Status status = stub_->Echo(&context, request, &response);
153     }
154   }
155   absl::SleepFor(absl::Milliseconds(500));
156   TestUtils::Flush();
157 
158   // Client side views can be tagged with custom tags.
159   EXPECT_THAT(
160       client_method_view.GetData().int_data(),
161       ::testing::UnorderedElementsAre(::testing::Pair(
162           ::testing::ElementsAre(client_method_name_, TEST_TAG_VALUE), 17)));
163   // TODO: Implement server view tagging with custom tags.
164   EXPECT_THAT(server_method_view.GetData().int_data(),
165               ::testing::UnorderedElementsAre(::testing::Pair(
166                   ::testing::ElementsAre(server_method_name_), 17)));
167 
168   // Client side views can be tagged with custom tags.
169   auto client_tags = {
170       ::testing::Pair(::testing::ElementsAre("OK", TEST_TAG_VALUE), 1),
171       ::testing::Pair(::testing::ElementsAre("CANCELLED", TEST_TAG_VALUE), 1),
172       ::testing::Pair(::testing::ElementsAre("UNKNOWN", TEST_TAG_VALUE), 1),
173       ::testing::Pair(
174           ::testing::ElementsAre("INVALID_ARGUMENT", TEST_TAG_VALUE), 1),
175       ::testing::Pair(
176           ::testing::ElementsAre("DEADLINE_EXCEEDED", TEST_TAG_VALUE), 1),
177       ::testing::Pair(::testing::ElementsAre("NOT_FOUND", TEST_TAG_VALUE), 1),
178       ::testing::Pair(::testing::ElementsAre("ALREADY_EXISTS", TEST_TAG_VALUE),
179                       1),
180       ::testing::Pair(
181           ::testing::ElementsAre("PERMISSION_DENIED", TEST_TAG_VALUE), 1),
182       ::testing::Pair(::testing::ElementsAre("UNAUTHENTICATED", TEST_TAG_VALUE),
183                       1),
184       ::testing::Pair(
185           ::testing::ElementsAre("RESOURCE_EXHAUSTED", TEST_TAG_VALUE), 1),
186       ::testing::Pair(
187           ::testing::ElementsAre("FAILED_PRECONDITION", TEST_TAG_VALUE), 1),
188       ::testing::Pair(::testing::ElementsAre("ABORTED", TEST_TAG_VALUE), 1),
189       ::testing::Pair(::testing::ElementsAre("OUT_OF_RANGE", TEST_TAG_VALUE),
190                       1),
191       ::testing::Pair(::testing::ElementsAre("UNIMPLEMENTED", TEST_TAG_VALUE),
192                       1),
193       ::testing::Pair(::testing::ElementsAre("INTERNAL", TEST_TAG_VALUE), 1),
194       ::testing::Pair(::testing::ElementsAre("UNAVAILABLE", TEST_TAG_VALUE), 1),
195       ::testing::Pair(::testing::ElementsAre("DATA_LOSS", TEST_TAG_VALUE), 1),
196   };
197 
198   // TODO: Implement server view tagging with custom tags.
199   auto server_tags = {
200       ::testing::Pair(::testing::ElementsAre("OK"), 1),
201       ::testing::Pair(::testing::ElementsAre("CANCELLED"), 1),
202       ::testing::Pair(::testing::ElementsAre("UNKNOWN"), 1),
203       ::testing::Pair(::testing::ElementsAre("INVALID_ARGUMENT"), 1),
204       ::testing::Pair(::testing::ElementsAre("DEADLINE_EXCEEDED"), 1),
205       ::testing::Pair(::testing::ElementsAre("NOT_FOUND"), 1),
206       ::testing::Pair(::testing::ElementsAre("ALREADY_EXISTS"), 1),
207       ::testing::Pair(::testing::ElementsAre("PERMISSION_DENIED"), 1),
208       ::testing::Pair(::testing::ElementsAre("UNAUTHENTICATED"), 1),
209       ::testing::Pair(::testing::ElementsAre("RESOURCE_EXHAUSTED"), 1),
210       ::testing::Pair(::testing::ElementsAre("FAILED_PRECONDITION"), 1),
211       ::testing::Pair(::testing::ElementsAre("ABORTED"), 1),
212       ::testing::Pair(::testing::ElementsAre("OUT_OF_RANGE"), 1),
213       ::testing::Pair(::testing::ElementsAre("UNIMPLEMENTED"), 1),
214       ::testing::Pair(::testing::ElementsAre("INTERNAL"), 1),
215       ::testing::Pair(::testing::ElementsAre("UNAVAILABLE"), 1),
216       ::testing::Pair(::testing::ElementsAre("DATA_LOSS"), 1),
217   };
218 
219   EXPECT_THAT(client_status_view.GetData().int_data(),
220               ::testing::UnorderedElementsAreArray(client_tags));
221   EXPECT_THAT(server_status_view.GetData().int_data(),
222               ::testing::UnorderedElementsAreArray(server_tags));
223 }
224 
// Makes a single successful Echo RPC and checks that each of the four
// bytes-per-RPC views (client/server x sent/received) holds exactly one sample
// with a positive mean, keyed by the method name.
TEST_F(StatsPluginEnd2EndTest, RequestReceivedBytesPerRpc) {
  using ::testing::AllOf;
  using ::testing::ElementsAre;
  using ::testing::Gt;
  using ::testing::Pair;
  using ::testing::Property;
  using ::testing::UnorderedElementsAre;

  View client_sent_bytes_per_rpc_view(ClientSentBytesPerRpcCumulative());
  View client_received_bytes_per_rpc_view(
      ClientReceivedBytesPerRpcCumulative());
  View server_sent_bytes_per_rpc_view(ServerSentBytesPerRpcCumulative());
  View server_received_bytes_per_rpc_view(
      ServerReceivedBytesPerRpcCumulative());

  {
    EchoRequest request;
    request.set_message("foo");
    EchoResponse response;
    ::grpc::ClientContext context;
    const ::grpc::Status status = stub_->Echo(&context, request, &response);
    ASSERT_TRUE(status.ok());
    EXPECT_EQ("foo", response.message());
  }
  absl::SleepFor(absl::Milliseconds(500));
  TestUtils::Flush();

  // Every view is expected to contain the same shape of data: one recorded
  // RPC whose byte count is greater than zero.
  const auto one_sample_with_positive_mean =
      AllOf(Property(&Distribution::count, 1),
            Property(&Distribution::mean, Gt(0.0)));

  EXPECT_THAT(client_received_bytes_per_rpc_view.GetData().distribution_data(),
              UnorderedElementsAre(Pair(ElementsAre(client_method_name_),
                                        one_sample_with_positive_mean)));
  EXPECT_THAT(client_sent_bytes_per_rpc_view.GetData().distribution_data(),
              UnorderedElementsAre(Pair(ElementsAre(client_method_name_),
                                        one_sample_with_positive_mean)));
  EXPECT_THAT(server_received_bytes_per_rpc_view.GetData().distribution_data(),
              UnorderedElementsAre(Pair(ElementsAre(server_method_name_),
                                        one_sample_with_positive_mean)));
  EXPECT_THAT(server_sent_bytes_per_rpc_view.GetData().distribution_data(),
              UnorderedElementsAre(Pair(ElementsAre(server_method_name_),
                                        one_sample_with_positive_mean)));
}
270 
TEST_F(StatsPluginEnd2EndTest,Latency)271 TEST_F(StatsPluginEnd2EndTest, Latency) {
272   View client_latency_view(ClientRoundtripLatencyCumulative());
273   View client_server_latency_view(ClientServerLatencyCumulative());
274   View server_server_latency_view(ServerServerLatencyCumulative());
275 
276   const absl::Time start_time = absl::Now();
277   {
278     EchoRequest request;
279     request.set_message("foo");
280     EchoResponse response;
281     ::grpc::ClientContext context;
282     ::grpc::Status status = stub_->Echo(&context, request, &response);
283     ASSERT_TRUE(status.ok());
284     EXPECT_EQ("foo", response.message());
285   }
286   // We do not know exact latency/elapsed time, but we know it is less than the
287   // entire time spent making the RPC.
288   const double max_time = absl::ToDoubleMilliseconds(absl::Now() - start_time);
289 
290   absl::SleepFor(absl::Milliseconds(500));
291   TestUtils::Flush();
292 
293   EXPECT_THAT(
294       client_latency_view.GetData().distribution_data(),
295       ::testing::UnorderedElementsAre(::testing::Pair(
296           ::testing::ElementsAre(client_method_name_),
297           ::testing::AllOf(
298               ::testing::Property(&Distribution::count, 1),
299               ::testing::Property(&Distribution::mean, ::testing::Gt(0.0)),
300               ::testing::Property(&Distribution::mean,
301                                   ::testing::Lt(max_time))))));
302 
303   // Elapsed time is a subinterval of total latency.
304   const auto client_latency = client_latency_view.GetData()
305                                   .distribution_data()
306                                   .find({client_method_name_})
307                                   ->second.mean();
308   EXPECT_THAT(
309       client_server_latency_view.GetData().distribution_data(),
310       ::testing::UnorderedElementsAre(::testing::Pair(
311           ::testing::ElementsAre(client_method_name_),
312           ::testing::AllOf(
313               ::testing::Property(&Distribution::count, 1),
314               ::testing::Property(&Distribution::mean, ::testing::Gt(0.0)),
315               ::testing::Property(&Distribution::mean,
316                                   ::testing::Lt(client_latency))))));
317 
318   // client server elapsed time should be the same value propagated to the
319   // client.
320   const auto client_elapsed_time = client_server_latency_view.GetData()
321                                        .distribution_data()
322                                        .find({client_method_name_})
323                                        ->second.mean();
324   EXPECT_THAT(
325       server_server_latency_view.GetData().distribution_data(),
326       ::testing::UnorderedElementsAre(::testing::Pair(
327           ::testing::ElementsAre(server_method_name_),
328           ::testing::AllOf(
329               ::testing::Property(&Distribution::count, 1),
330               ::testing::Property(&Distribution::mean,
331                                   ::testing::DoubleEq(client_elapsed_time))))));
332 }
333 
// Makes five successful Echo RPCs and, after each one, checks that the client
// and server completed-RPC views report the running total under the
// (method, "OK") key.
TEST_F(StatsPluginEnd2EndTest, CompletedRpcs) {
  using ::testing::ElementsAre;
  using ::testing::Pair;
  using ::testing::UnorderedElementsAre;

  View client_completed_rpcs_view(ClientCompletedRpcsCumulative());
  View server_completed_rpcs_view(ServerCompletedRpcsCumulative());

  EchoRequest request;
  request.set_message("foo");
  EchoResponse response;
  const int count = 5;
  // `completed` is the number of RPCs finished once this iteration's call
  // returns.
  for (int completed = 1; completed <= count; ++completed) {
    {
      ::grpc::ClientContext context;
      const ::grpc::Status status = stub_->Echo(&context, request, &response);
      ASSERT_TRUE(status.ok());
      EXPECT_EQ("foo", response.message());
    }
    absl::SleepFor(absl::Milliseconds(500));
    TestUtils::Flush();

    EXPECT_THAT(client_completed_rpcs_view.GetData().int_data(),
                UnorderedElementsAre(
                    Pair(ElementsAre(client_method_name_, "OK"), completed)));
    EXPECT_THAT(server_completed_rpcs_view.GetData().int_data(),
                UnorderedElementsAre(
                    Pair(ElementsAre(server_method_name_, "OK"), completed)));
  }
}
360 
TEST_F(StatsPluginEnd2EndTest,RequestReceivedMessagesPerRpc)361 TEST_F(StatsPluginEnd2EndTest, RequestReceivedMessagesPerRpc) {
362   // TODO: Use streaming RPCs.
363   View client_received_messages_per_rpc_view(
364       ClientSentMessagesPerRpcCumulative());
365   View client_sent_messages_per_rpc_view(
366       ClientReceivedMessagesPerRpcCumulative());
367   View server_received_messages_per_rpc_view(
368       ServerSentMessagesPerRpcCumulative());
369   View server_sent_messages_per_rpc_view(
370       ServerReceivedMessagesPerRpcCumulative());
371 
372   EchoRequest request;
373   request.set_message("foo");
374   EchoResponse response;
375   const int count = 5;
376   for (int i = 0; i < count; ++i) {
377     {
378       ::grpc::ClientContext context;
379       ::grpc::Status status = stub_->Echo(&context, request, &response);
380       ASSERT_TRUE(status.ok());
381       EXPECT_EQ("foo", response.message());
382     }
383     absl::SleepFor(absl::Milliseconds(500));
384     TestUtils::Flush();
385 
386     EXPECT_THAT(
387         client_received_messages_per_rpc_view.GetData().distribution_data(),
388         ::testing::UnorderedElementsAre(::testing::Pair(
389             ::testing::ElementsAre(client_method_name_),
390             ::testing::AllOf(::testing::Property(&Distribution::count, i + 1),
391                              ::testing::Property(&Distribution::mean,
392                                                  ::testing::DoubleEq(1.0))))));
393     EXPECT_THAT(
394         client_sent_messages_per_rpc_view.GetData().distribution_data(),
395         ::testing::UnorderedElementsAre(::testing::Pair(
396             ::testing::ElementsAre(client_method_name_),
397             ::testing::AllOf(::testing::Property(&Distribution::count, i + 1),
398                              ::testing::Property(&Distribution::mean,
399                                                  ::testing::DoubleEq(1.0))))));
400     EXPECT_THAT(
401         server_received_messages_per_rpc_view.GetData().distribution_data(),
402         ::testing::UnorderedElementsAre(::testing::Pair(
403             ::testing::ElementsAre(server_method_name_),
404             ::testing::AllOf(::testing::Property(&Distribution::count, i + 1),
405                              ::testing::Property(&Distribution::mean,
406                                                  ::testing::DoubleEq(1.0))))));
407     EXPECT_THAT(
408         server_sent_messages_per_rpc_view.GetData().distribution_data(),
409         ::testing::UnorderedElementsAre(::testing::Pair(
410             ::testing::ElementsAre(server_method_name_),
411             ::testing::AllOf(::testing::Property(&Distribution::count, i + 1),
412                              ::testing::Property(&Distribution::mean,
413                                                  ::testing::DoubleEq(1.0))))));
414   }
415 }
416 
417 }  // namespace
418 }  // namespace testing
419 }  // namespace grpc
420 
main(int argc,char ** argv)421 int main(int argc, char** argv) {
422   grpc::testing::TestEnvironment env(argc, argv);
423   ::testing::InitGoogleTest(&argc, argv);
424   return RUN_ALL_TESTS();
425 }
426