1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/cpp/ext/filters/census/client_filter.h"
22 
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/string_view.h"
29 #include "opencensus/stats/stats.h"
30 #include "opencensus/tags/context_util.h"
31 #include "opencensus/tags/tag_key.h"
32 #include "opencensus/tags/tag_map.h"
33 
34 #include "src/core/lib/surface/call.h"
35 #include "src/cpp/ext/filters/census/grpc_plugin.h"
36 #include "src/cpp/ext/filters/census/measures.h"
37 
38 namespace grpc {
39 
40 constexpr uint32_t
41     OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen;
42 constexpr uint32_t
43     OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
44 
Init(grpc_call_element *,const grpc_call_element_args * args)45 grpc_error_handle CensusClientCallData::Init(
46     grpc_call_element* /* elem */, const grpc_call_element_args* args) {
47   tracer_ = args->arena->New<OpenCensusCallTracer>(args);
48   GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
49   args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer_;
50   args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) {
51     (static_cast<OpenCensusCallTracer*>(tracer))->~OpenCensusCallTracer();
52   };
53   return GRPC_ERROR_NONE;
54 }
55 
StartTransportStreamOpBatch(grpc_call_element * elem,TransportStreamOpBatch * op)56 void CensusClientCallData::StartTransportStreamOpBatch(
57     grpc_call_element* elem, TransportStreamOpBatch* op) {
58   // Note that we are generating the overall call context here instead of in
59   // the constructor of `OpenCensusCallTracer` due to the semantics of
60   // `grpc_census_call_set_context` which allows the application to set the
61   // census context for a call anytime before the first call to
62   // `grpc_call_start_batch`.
63   if (op->op()->send_initial_metadata) {
64     tracer_->GenerateContext();
65   }
66   grpc_call_next_op(elem, op->op());
67 }
68 
69 //
70 // OpenCensusCallTracer::OpenCensusCallAttemptTracer
71 //
72 
73 namespace {
74 
CreateCensusContextForCallAttempt(absl::string_view method,const CensusContext & parent_context)75 CensusContext CreateCensusContextForCallAttempt(
76     absl::string_view method, const CensusContext& parent_context) {
77   GPR_DEBUG_ASSERT(parent_context.Context().IsValid());
78   return CensusContext(absl::StrCat("Attempt.", method), &parent_context.Span(),
79                        parent_context.tags());
80 }
81 
82 }  // namespace
83 
OpenCensusCallAttemptTracer(OpenCensusCallTracer * parent,uint64_t attempt_num,bool is_transparent_retry,bool arena_allocated)84 OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
85     OpenCensusCallTracer* parent, uint64_t attempt_num,
86     bool is_transparent_retry, bool arena_allocated)
87     : parent_(parent),
88       arena_allocated_(arena_allocated),
89       context_(CreateCensusContextForCallAttempt(parent_->method_,
90                                                  parent_->context_)),
91       start_time_(absl::Now()) {
92   context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
93   context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
94   memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem));
95   memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem));
96 }
97 
98 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata,uint32_t)99     RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata,
100                               uint32_t /* flags */) {
101   size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
102                                              kMaxTraceContextLen);
103   if (tracing_len > 0) {
104     GRPC_LOG_IF_ERROR(
105         "census grpc_filter",
106         grpc_metadata_batch_add_tail(
107             send_initial_metadata, &tracing_bin_,
108             grpc_mdelem_from_slices(
109                 GRPC_MDSTR_GRPC_TRACE_BIN,
110                 grpc_core::UnmanagedMemorySlice(tracing_buf_, tracing_len)),
111             GRPC_BATCH_GRPC_TRACE_BIN));
112   }
113   grpc_slice tags = grpc_empty_slice();
114   // TODO(unknown): Add in tagging serialization.
115   size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
116   if (encoded_tags_len > 0) {
117     GRPC_LOG_IF_ERROR(
118         "census grpc_filter",
119         grpc_metadata_batch_add_tail(
120             send_initial_metadata, &stats_bin_,
121             grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags),
122             GRPC_BATCH_GRPC_TAGS_BIN));
123   }
124 }
125 
RecordSendMessage(const grpc_core::ByteStream &)126 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
127     const grpc_core::ByteStream& /* send_message */) {
128   ++sent_message_count_;
129 }
130 
RecordReceivedMessage(const grpc_core::ByteStream &)131 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
132     const grpc_core::ByteStream& /* recv_message */) {
133   ++recv_message_count_;
134 }
135 
136 namespace {
137 
FilterTrailingMetadata(grpc_metadata_batch * b,uint64_t * elapsed_time)138 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
139   if (b->legacy_index()->named.grpc_server_stats_bin != nullptr) {
140     ServerStatsDeserialize(
141         reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
142             GRPC_MDVALUE(b->legacy_index()->named.grpc_server_stats_bin->md))),
143         GRPC_SLICE_LENGTH(
144             GRPC_MDVALUE(b->legacy_index()->named.grpc_server_stats_bin->md)),
145         elapsed_time);
146     b->Remove(b->legacy_index()->named.grpc_server_stats_bin);
147   }
148 }
149 
150 }  // namespace
151 
152 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats & transport_stream_stats)153     RecordReceivedTrailingMetadata(
154         absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
155         const grpc_transport_stream_stats& transport_stream_stats) {
156   FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time_);
157   const uint64_t request_size = transport_stream_stats.outgoing.data_bytes;
158   const uint64_t response_size = transport_stream_stats.incoming.data_bytes;
159   std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
160       context_.tags().tags();
161   tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
162   status_code_ = status.code();
163   std::string final_status = absl::StatusCodeToString(status_code_);
164   tags.emplace_back(ClientStatusTagKey(), final_status);
165   ::opencensus::stats::Record(
166       {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
167        {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
168        {RpcClientServerLatency(),
169         ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}},
170       tags);
171 }
172 
RecordCancel(grpc_error_handle cancel_error)173 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
174     grpc_error_handle cancel_error) {
175   status_code_ = absl::StatusCode::kCancelled;
176   GRPC_ERROR_UNREF(cancel_error);
177 }
178 
RecordEnd(const gpr_timespec &)179 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
180     const gpr_timespec& /* latency */) {
181   double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
182   std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
183       context_.tags().tags();
184   tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
185   tags.emplace_back(ClientStatusTagKey(), StatusCodeToString(status_code_));
186   ::opencensus::stats::Record(
187       {{RpcClientRoundtripLatency(), latency_ms},
188        {RpcClientSentMessagesPerRpc(), sent_message_count_},
189        {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
190       tags);
191   if (status_code_ != absl::StatusCode::kOk) {
192     context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_),
193                               StatusCodeToString(status_code_));
194   }
195   context_.EndSpan();
196   grpc_core::MutexLock lock(&parent_->mu_);
197   if (--parent_->num_active_rpcs_ == 0) {
198     parent_->time_at_last_attempt_end_ = absl::Now();
199   }
200   if (arena_allocated_) {
201     this->~OpenCensusCallAttemptTracer();
202   } else {
203     delete this;
204   }
205 }
206 
207 //
208 // OpenCensusCallTracer
209 //
210 
OpenCensusCallTracer(const grpc_call_element_args * args)211 OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args)
212     : call_context_(args->context),
213       path_(grpc_slice_ref_internal(args->path)),
214       method_(GetMethod(&path_)),
215       arena_(args->arena) {}
216 
~OpenCensusCallTracer()217 OpenCensusCallTracer::~OpenCensusCallTracer() {
218   std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
219       context_.tags().tags();
220   tags.emplace_back(ClientMethodTagKey(), std::string(method_));
221   ::opencensus::stats::Record(
222       {{RpcClientRetriesPerCall(), retries_ - 1},  // exclude first attempt
223        {RpcClientTransparentRetriesPerCall(), transparent_retries_},
224        {RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}},
225       tags);
226   grpc_slice_unref_internal(path_);
227 }
228 
GenerateContext()229 void OpenCensusCallTracer::GenerateContext() {
230   auto* parent_context = reinterpret_cast<CensusContext*>(
231       call_context_[GRPC_CONTEXT_TRACING].value);
232   GenerateClientContext(absl::StrCat("Sent.", method_), &context_,
233                         (parent_context == nullptr) ? nullptr : parent_context);
234 }
235 
236 OpenCensusCallTracer::OpenCensusCallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)237 OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
238   // We allocate the first attempt on the arena and all subsequent attempts on
239   // the heap, so that in the common case we don't require a heap allocation,
240   // nor do we unnecessarily grow the arena.
241   bool is_first_attempt = true;
242   uint64_t attempt_num;
243   {
244     grpc_core::MutexLock lock(&mu_);
245     if (transparent_retries_ != 0 || retries_ != 0) {
246       is_first_attempt = false;
247       if (num_active_rpcs_ == 0) {
248         retry_delay_ += absl::Now() - time_at_last_attempt_end_;
249       }
250     }
251     attempt_num = retries_;
252     if (is_transparent_retry) {
253       ++transparent_retries_;
254     } else {
255       ++retries_;
256     }
257     ++num_active_rpcs_;
258   }
259   if (is_first_attempt) {
260     return arena_->New<OpenCensusCallAttemptTracer>(
261         this, attempt_num, is_transparent_retry, true /* arena_allocated */);
262   }
263   return new OpenCensusCallAttemptTracer(
264       this, attempt_num, is_transparent_retry, false /* arena_allocated */);
265 }
266 
267 }  // namespace grpc
268