1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/cpp/ext/filters/census/client_filter.h"
22
23 #include <string>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/string_view.h"
29 #include "opencensus/stats/stats.h"
30 #include "opencensus/tags/context_util.h"
31 #include "opencensus/tags/tag_key.h"
32 #include "opencensus/tags/tag_map.h"
33
34 #include "src/core/lib/surface/call.h"
35 #include "src/cpp/ext/filters/census/grpc_plugin.h"
36 #include "src/cpp/ext/filters/census/measures.h"
37
38 namespace grpc {
39
40 constexpr uint32_t
41 OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen;
42 constexpr uint32_t
43 OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
44
Init(grpc_call_element *,const grpc_call_element_args * args)45 grpc_error_handle CensusClientCallData::Init(
46 grpc_call_element* /* elem */, const grpc_call_element_args* args) {
47 tracer_ = args->arena->New<OpenCensusCallTracer>(args);
48 GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
49 args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer_;
50 args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) {
51 (static_cast<OpenCensusCallTracer*>(tracer))->~OpenCensusCallTracer();
52 };
53 return GRPC_ERROR_NONE;
54 }
55
StartTransportStreamOpBatch(grpc_call_element * elem,TransportStreamOpBatch * op)56 void CensusClientCallData::StartTransportStreamOpBatch(
57 grpc_call_element* elem, TransportStreamOpBatch* op) {
58 // Note that we are generating the overall call context here instead of in
59 // the constructor of `OpenCensusCallTracer` due to the semantics of
60 // `grpc_census_call_set_context` which allows the application to set the
61 // census context for a call anytime before the first call to
62 // `grpc_call_start_batch`.
63 if (op->op()->send_initial_metadata) {
64 tracer_->GenerateContext();
65 }
66 grpc_call_next_op(elem, op->op());
67 }
68
69 //
70 // OpenCensusCallTracer::OpenCensusCallAttemptTracer
71 //
72
73 namespace {
74
CreateCensusContextForCallAttempt(absl::string_view method,const CensusContext & parent_context)75 CensusContext CreateCensusContextForCallAttempt(
76 absl::string_view method, const CensusContext& parent_context) {
77 GPR_DEBUG_ASSERT(parent_context.Context().IsValid());
78 return CensusContext(absl::StrCat("Attempt.", method), &parent_context.Span(),
79 parent_context.tags());
80 }
81
82 } // namespace
83
OpenCensusCallAttemptTracer(OpenCensusCallTracer * parent,uint64_t attempt_num,bool is_transparent_retry,bool arena_allocated)84 OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
85 OpenCensusCallTracer* parent, uint64_t attempt_num,
86 bool is_transparent_retry, bool arena_allocated)
87 : parent_(parent),
88 arena_allocated_(arena_allocated),
89 context_(CreateCensusContextForCallAttempt(parent_->method_,
90 parent_->context_)),
91 start_time_(absl::Now()) {
92 context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
93 context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
94 memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem));
95 memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem));
96 }
97
98 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata,uint32_t)99 RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata,
100 uint32_t /* flags */) {
101 size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
102 kMaxTraceContextLen);
103 if (tracing_len > 0) {
104 GRPC_LOG_IF_ERROR(
105 "census grpc_filter",
106 grpc_metadata_batch_add_tail(
107 send_initial_metadata, &tracing_bin_,
108 grpc_mdelem_from_slices(
109 GRPC_MDSTR_GRPC_TRACE_BIN,
110 grpc_core::UnmanagedMemorySlice(tracing_buf_, tracing_len)),
111 GRPC_BATCH_GRPC_TRACE_BIN));
112 }
113 grpc_slice tags = grpc_empty_slice();
114 // TODO(unknown): Add in tagging serialization.
115 size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
116 if (encoded_tags_len > 0) {
117 GRPC_LOG_IF_ERROR(
118 "census grpc_filter",
119 grpc_metadata_batch_add_tail(
120 send_initial_metadata, &stats_bin_,
121 grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags),
122 GRPC_BATCH_GRPC_TAGS_BIN));
123 }
124 }
125
RecordSendMessage(const grpc_core::ByteStream &)126 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
127 const grpc_core::ByteStream& /* send_message */) {
128 ++sent_message_count_;
129 }
130
RecordReceivedMessage(const grpc_core::ByteStream &)131 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
132 const grpc_core::ByteStream& /* recv_message */) {
133 ++recv_message_count_;
134 }
135
136 namespace {
137
FilterTrailingMetadata(grpc_metadata_batch * b,uint64_t * elapsed_time)138 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
139 if (b->legacy_index()->named.grpc_server_stats_bin != nullptr) {
140 ServerStatsDeserialize(
141 reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
142 GRPC_MDVALUE(b->legacy_index()->named.grpc_server_stats_bin->md))),
143 GRPC_SLICE_LENGTH(
144 GRPC_MDVALUE(b->legacy_index()->named.grpc_server_stats_bin->md)),
145 elapsed_time);
146 b->Remove(b->legacy_index()->named.grpc_server_stats_bin);
147 }
148 }
149
150 } // namespace
151
152 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats & transport_stream_stats)153 RecordReceivedTrailingMetadata(
154 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
155 const grpc_transport_stream_stats& transport_stream_stats) {
156 FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time_);
157 const uint64_t request_size = transport_stream_stats.outgoing.data_bytes;
158 const uint64_t response_size = transport_stream_stats.incoming.data_bytes;
159 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
160 context_.tags().tags();
161 tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
162 status_code_ = status.code();
163 std::string final_status = absl::StatusCodeToString(status_code_);
164 tags.emplace_back(ClientStatusTagKey(), final_status);
165 ::opencensus::stats::Record(
166 {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
167 {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
168 {RpcClientServerLatency(),
169 ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}},
170 tags);
171 }
172
RecordCancel(grpc_error_handle cancel_error)173 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
174 grpc_error_handle cancel_error) {
175 status_code_ = absl::StatusCode::kCancelled;
176 GRPC_ERROR_UNREF(cancel_error);
177 }
178
RecordEnd(const gpr_timespec &)179 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
180 const gpr_timespec& /* latency */) {
181 double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
182 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
183 context_.tags().tags();
184 tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
185 tags.emplace_back(ClientStatusTagKey(), StatusCodeToString(status_code_));
186 ::opencensus::stats::Record(
187 {{RpcClientRoundtripLatency(), latency_ms},
188 {RpcClientSentMessagesPerRpc(), sent_message_count_},
189 {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
190 tags);
191 if (status_code_ != absl::StatusCode::kOk) {
192 context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_),
193 StatusCodeToString(status_code_));
194 }
195 context_.EndSpan();
196 grpc_core::MutexLock lock(&parent_->mu_);
197 if (--parent_->num_active_rpcs_ == 0) {
198 parent_->time_at_last_attempt_end_ = absl::Now();
199 }
200 if (arena_allocated_) {
201 this->~OpenCensusCallAttemptTracer();
202 } else {
203 delete this;
204 }
205 }
206
207 //
208 // OpenCensusCallTracer
209 //
210
OpenCensusCallTracer(const grpc_call_element_args * args)211 OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args)
212 : call_context_(args->context),
213 path_(grpc_slice_ref_internal(args->path)),
214 method_(GetMethod(&path_)),
215 arena_(args->arena) {}
216
~OpenCensusCallTracer()217 OpenCensusCallTracer::~OpenCensusCallTracer() {
218 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
219 context_.tags().tags();
220 tags.emplace_back(ClientMethodTagKey(), std::string(method_));
221 ::opencensus::stats::Record(
222 {{RpcClientRetriesPerCall(), retries_ - 1}, // exclude first attempt
223 {RpcClientTransparentRetriesPerCall(), transparent_retries_},
224 {RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}},
225 tags);
226 grpc_slice_unref_internal(path_);
227 }
228
GenerateContext()229 void OpenCensusCallTracer::GenerateContext() {
230 auto* parent_context = reinterpret_cast<CensusContext*>(
231 call_context_[GRPC_CONTEXT_TRACING].value);
232 GenerateClientContext(absl::StrCat("Sent.", method_), &context_,
233 (parent_context == nullptr) ? nullptr : parent_context);
234 }
235
236 OpenCensusCallTracer::OpenCensusCallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)237 OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
238 // We allocate the first attempt on the arena and all subsequent attempts on
239 // the heap, so that in the common case we don't require a heap allocation,
240 // nor do we unnecessarily grow the arena.
241 bool is_first_attempt = true;
242 uint64_t attempt_num;
243 {
244 grpc_core::MutexLock lock(&mu_);
245 if (transparent_retries_ != 0 || retries_ != 0) {
246 is_first_attempt = false;
247 if (num_active_rpcs_ == 0) {
248 retry_delay_ += absl::Now() - time_at_last_attempt_end_;
249 }
250 }
251 attempt_num = retries_;
252 if (is_transparent_retry) {
253 ++transparent_retries_;
254 } else {
255 ++retries_;
256 }
257 ++num_active_rpcs_;
258 }
259 if (is_first_attempt) {
260 return arena_->New<OpenCensusCallAttemptTracer>(
261 this, attempt_num, is_transparent_retry, true /* arena_allocated */);
262 }
263 return new OpenCensusCallAttemptTracer(
264 this, attempt_num, is_transparent_retry, false /* arena_allocated */);
265 }
266
267 } // namespace grpc
268