1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "services/tracing/perfetto/consumer_host.h"
6
#include <algorithm>
#include <cstring>
#include <limits>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
13
14 #include "base/logging.h"
15 #include "base/numerics/ranges.h"
16 #include "base/stl_util.h"
17 #include "base/strings/strcat.h"
18 #include "base/strings/string_number_conversions.h"
19 #include "base/strings/string_util.h"
20 #include "base/task/post_task.h"
21 #include "base/task/thread_pool.h"
22 #include "base/trace_event/trace_log.h"
23 #include "base/values.h"
24 #include "build/build_config.h"
25 #include "mojo/public/cpp/bindings/self_owned_receiver.h"
26 #include "mojo/public/cpp/system/wait.h"
27 #include "services/tracing/perfetto/perfetto_service.h"
28 #include "services/tracing/public/cpp/perfetto/perfetto_session.h"
29 #include "services/tracing/public/cpp/trace_event_args_allowlist.h"
30 #include "third_party/perfetto/include/perfetto/ext/trace_processor/export_json.h"
31 #include "third_party/perfetto/include/perfetto/ext/tracing/core/observable_events.h"
32 #include "third_party/perfetto/include/perfetto/ext/tracing/core/slice.h"
33 #include "third_party/perfetto/include/perfetto/ext/tracing/core/trace_packet.h"
34 #include "third_party/perfetto/include/perfetto/ext/tracing/core/trace_stats.h"
35 #include "third_party/perfetto/include/perfetto/trace_processor/basic_types.h"
36 #include "third_party/perfetto/include/perfetto/trace_processor/trace_processor_storage.h"
37 #include "third_party/perfetto/include/perfetto/tracing/core/trace_config.h"
38 #include "third_party/perfetto/protos/perfetto/config/trace_config.pb.h"
39
40 namespace tracing {
41
42 namespace {
43
// Delay, in seconds, used for the timer that stops waiting for EnableTracing
// acknowledgements and reports tracing as enabled regardless.
constexpr int32_t kEnableTracingTimeoutSeconds = 10;
45
46 class JsonStringOutputWriter
47 : public perfetto::trace_processor::json::OutputWriter {
48 public:
49 using FlushCallback =
50 base::RepeatingCallback<void(std::string json, bool has_more)>;
51
JsonStringOutputWriter(FlushCallback flush_callback)52 JsonStringOutputWriter(FlushCallback flush_callback)
53 : flush_callback_(std::move(flush_callback)) {
54 buffer_.reserve(kBufferReserveCapacity);
55 }
56
~JsonStringOutputWriter()57 ~JsonStringOutputWriter() override {
58 flush_callback_.Run(std::move(buffer_), false);
59 }
60
AppendString(const std::string & string)61 perfetto::trace_processor::util::Status AppendString(
62 const std::string& string) override {
63 buffer_ += string;
64 if (buffer_.size() > kBufferLimitInBytes) {
65 flush_callback_.Run(std::move(buffer_), true);
66 // Reset the buffer_ after moving it above.
67 buffer_.clear();
68 buffer_.reserve(kBufferReserveCapacity);
69 }
70 return perfetto::trace_processor::util::OkStatus();
71 }
72
73 private:
74 static constexpr size_t kBufferLimitInBytes = 100 * 1024;
75 // Since we write each string before checking the limit, we'll always go
76 // slightly over and hence we reserve some extra space to avoid most
77 // reallocs.
78 static constexpr size_t kBufferReserveCapacity = kBufferLimitInBytes * 5 / 4;
79
80 FlushCallback flush_callback_;
81 std::string buffer_;
82 };
83
84 } // namespace
85
86 class ConsumerHost::StreamWriter {
87 public:
88 using Slice = std::string;
89
CreateTaskRunner()90 static scoped_refptr<base::SequencedTaskRunner> CreateTaskRunner() {
91 return base::ThreadPool::CreateSequencedTaskRunner(
92 {base::WithBaseSyncPrimitives(), base::TaskPriority::BEST_EFFORT});
93 }
94
StreamWriter(mojo::ScopedDataPipeProducerHandle stream,TracingSession::ReadBuffersCallback callback,base::OnceClosure disconnect_callback,scoped_refptr<base::SequencedTaskRunner> callback_task_runner)95 StreamWriter(mojo::ScopedDataPipeProducerHandle stream,
96 TracingSession::ReadBuffersCallback callback,
97 base::OnceClosure disconnect_callback,
98 scoped_refptr<base::SequencedTaskRunner> callback_task_runner)
99 : stream_(std::move(stream)),
100 read_buffers_callback_(std::move(callback)),
101 disconnect_callback_(std::move(disconnect_callback)),
102 callback_task_runner_(callback_task_runner) {}
103
WriteToStream(std::unique_ptr<Slice> slice,bool has_more)104 void WriteToStream(std::unique_ptr<Slice> slice, bool has_more) {
105 DCHECK(stream_.is_valid());
106
107 uint32_t write_position = 0;
108 while (write_position < slice->size()) {
109 uint32_t write_bytes = slice->size() - write_position;
110
111 MojoResult result =
112 stream_->WriteData(slice->data() + write_position, &write_bytes,
113 MOJO_WRITE_DATA_FLAG_NONE);
114
115 if (result == MOJO_RESULT_OK) {
116 write_position += write_bytes;
117 continue;
118 }
119
120 if (result == MOJO_RESULT_SHOULD_WAIT) {
121 result = mojo::Wait(stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE);
122 }
123
124 if (result != MOJO_RESULT_OK) {
125 if (!disconnect_callback_.is_null()) {
126 callback_task_runner_->PostTask(FROM_HERE,
127 std::move(disconnect_callback_));
128 }
129 return;
130 }
131 }
132
133 if (!has_more && !read_buffers_callback_.is_null()) {
134 callback_task_runner_->PostTask(FROM_HERE,
135 std::move(read_buffers_callback_));
136 }
137 }
138
139 private:
140 mojo::ScopedDataPipeProducerHandle stream_;
141 TracingSession::ReadBuffersCallback read_buffers_callback_;
142 base::OnceClosure disconnect_callback_;
143 scoped_refptr<base::SequencedTaskRunner> callback_task_runner_;
144
145 DISALLOW_COPY_AND_ASSIGN(StreamWriter);
146 };
147
// Creates a tracing session owned by |host| and immediately enables tracing
// on the host's consumer endpoint.
//
// |tracing_session_host| / |tracing_session_client| are the Mojo endpoints of
// this session, |trace_config| is the requested configuration and |priority|
// is stored as the session's priority. The statement order below matters:
// the session registers with the service, derives its state from the config,
// enables tracing, and only then decides whether the ack timeout is needed.
ConsumerHost::TracingSession::TracingSession(
    ConsumerHost* host,
    mojo::PendingReceiver<mojom::TracingSessionHost> tracing_session_host,
    mojo::PendingRemote<mojom::TracingSessionClient> tracing_session_client,
    const perfetto::TraceConfig& trace_config,
    mojom::TracingClientPriority priority)
    : host_(host),
      tracing_session_client_(std::move(tracing_session_client)),
      receiver_(this, std::move(tracing_session_host)),
      tracing_priority_(priority) {
  host_->service()->RegisterTracingSession(this);

  // A disconnect on either Mojo endpoint makes the host destroy this session.
  // NOTE(review): base::Unretained(host) assumes |host| outlives both
  // endpoints, which are members of this session - confirm.
  tracing_session_client_.set_disconnect_handler(base::BindOnce(
      &ConsumerHost::DestructTracingSession, base::Unretained(host)));
  receiver_.set_disconnect_handler(base::BindOnce(
      &ConsumerHost::DestructTracingSession, base::Unretained(host)));

  // Privacy filtering / legacy JSON conversion are turned on for the session
  // as soon as any data source requests them.
  privacy_filtering_enabled_ = false;
  for (const auto& data_source : trace_config.data_sources()) {
    if (data_source.config().chrome_config().privacy_filtering_enabled()) {
      privacy_filtering_enabled_ = true;
    }
    if (data_source.config().chrome_config().convert_to_legacy_json()) {
      convert_to_legacy_json_ = true;
    }
  }
#if DCHECK_IS_ON()
  if (privacy_filtering_enabled_) {
    // If enabled, filtering must be enabled for all data sources.
    for (const auto& data_source : trace_config.data_sources()) {
      DCHECK(data_source.config().chrome_config().privacy_filtering_enabled());
    }
  }
#endif

  // Collect the PIDs named by the producer-name filters of the first data
  // source called mojom::kTraceEventDataSourceName; later data sources with
  // that name are ignored because of the break.
  filtered_pids_.clear();
  for (const auto& ds_config : trace_config.data_sources()) {
    if (ds_config.config().name() == mojom::kTraceEventDataSourceName) {
      for (const auto& filter : ds_config.producer_name_filter()) {
        base::ProcessId pid;
        if (PerfettoService::ParsePidFromProducerName(filter, &pid)) {
          filtered_pids_.insert(pid);
        }
      }
      break;
    }
  }

  // Start out waiting for an ack from every currently active service PID that
  // this session expects (see IsExpectedPid()).
  pending_enable_tracing_ack_pids_ = host_->service()->active_service_pids();
  base::EraseIf(*pending_enable_tracing_ack_pids_,
                [this](base::ProcessId pid) { return !IsExpectedPid(pid); });

  perfetto::TraceConfig effective_config(trace_config);
  // If we're going to convert the data to JSON, don't enable privacy filtering
  // at the data source level since it will be performed at conversion time
  // (otherwise there's nothing to pass through the allowlist).
  if (convert_to_legacy_json_ && privacy_filtering_enabled_) {
    for (auto& data_source : *effective_config.mutable_data_sources()) {
      auto* chrome_config =
          data_source.mutable_config()->mutable_chrome_config();
      chrome_config->set_privacy_filtering_enabled(false);
      // Argument filtering should still be enabled together with privacy
      // filtering to ensure, for example, that only the expected metadata gets
      // written.
      base::trace_event::TraceConfig base_config(chrome_config->trace_config());
      base_config.EnableArgumentFilter();
      chrome_config->set_trace_config(base_config.ToString());
    }
  }

  host_->consumer_endpoint()->EnableTracing(effective_config);
  // Sends the ack right away if there is nothing to wait for; in that case
  // |pending_enable_tracing_ack_pids_| is reset and no timer is started below.
  MaybeSendEnableTracingAck();

  if (pending_enable_tracing_ack_pids_) {
    // We can't know for sure whether all processes we request to connect to the
    // tracing service will connect back, or if all the connected services will
    // ACK our EnableTracing request eventually, so we'll add a timeout for that
    // case.
    enable_tracing_ack_timer_.Start(
        FROM_HERE, base::TimeDelta::FromSeconds(kEnableTracingTimeoutSeconds),
        this, &ConsumerHost::TracingSession::OnEnableTracingTimeout);
  }
}
231
~TracingSession()232 ConsumerHost::TracingSession::~TracingSession() {
233 host_->service()->UnregisterTracingSession(this);
234 if (host_->consumer_endpoint()) {
235 host_->consumer_endpoint()->FreeBuffers();
236 }
237 }
238
OnPerfettoEvents(const perfetto::ObservableEvents & events)239 void ConsumerHost::TracingSession::OnPerfettoEvents(
240 const perfetto::ObservableEvents& events) {
241 if (!pending_enable_tracing_ack_pids_ ||
242 !events.instance_state_changes_size()) {
243 return;
244 }
245
246 for (const auto& state_change : events.instance_state_changes()) {
247 DataSourceHandle handle(state_change.producer_name(),
248 state_change.data_source_name());
249 data_source_states_[handle] =
250 state_change.state() ==
251 perfetto::ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
252 }
253
254 // Data sources are first reported as being stopped before starting, so once
255 // all the data sources we know about have started we can declare tracing
256 // begun.
257 bool all_data_sources_started = std::all_of(
258 data_source_states_.cbegin(), data_source_states_.cend(),
259 [](std::pair<DataSourceHandle, bool> state) { return state.second; });
260 if (!all_data_sources_started)
261 return;
262
263 for (const auto& it : data_source_states_) {
264 // Attempt to parse the PID out of the producer name.
265 base::ProcessId pid;
266 if (!PerfettoService::ParsePidFromProducerName(it.first.producer_name(),
267 &pid)) {
268 continue;
269 }
270 pending_enable_tracing_ack_pids_->erase(pid);
271 }
272 MaybeSendEnableTracingAck();
273 }
274
OnActiveServicePidAdded(base::ProcessId pid)275 void ConsumerHost::TracingSession::OnActiveServicePidAdded(
276 base::ProcessId pid) {
277 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
278 if (pending_enable_tracing_ack_pids_ && IsExpectedPid(pid)) {
279 pending_enable_tracing_ack_pids_->insert(pid);
280 }
281 }
282
OnActiveServicePidRemoved(base::ProcessId pid)283 void ConsumerHost::TracingSession::OnActiveServicePidRemoved(
284 base::ProcessId pid) {
285 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
286 if (pending_enable_tracing_ack_pids_) {
287 pending_enable_tracing_ack_pids_->erase(pid);
288 MaybeSendEnableTracingAck();
289 }
290 }
291
OnActiveServicePidsInitialized()292 void ConsumerHost::TracingSession::OnActiveServicePidsInitialized() {
293 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
294 MaybeSendEnableTracingAck();
295 }
296
RequestDisableTracing(base::OnceClosure on_disabled_callback)297 void ConsumerHost::TracingSession::RequestDisableTracing(
298 base::OnceClosure on_disabled_callback) {
299 DCHECK(!on_disabled_callback_);
300 on_disabled_callback_ = std::move(on_disabled_callback);
301 DisableTracing();
302 }
303
OnEnableTracingTimeout()304 void ConsumerHost::TracingSession::OnEnableTracingTimeout() {
305 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
306 if (!pending_enable_tracing_ack_pids_) {
307 return;
308 }
309
310 std::stringstream error;
311 error << "Timed out waiting for processes to ack BeginTracing: ";
312 for (auto pid : *pending_enable_tracing_ack_pids_) {
313 error << pid << " ";
314 }
315 LOG(ERROR) << error.rdbuf();
316
317 DCHECK(tracing_session_client_);
318 tracing_session_client_->OnTracingEnabled();
319 pending_enable_tracing_ack_pids_.reset();
320 }
321
MaybeSendEnableTracingAck()322 void ConsumerHost::TracingSession::MaybeSendEnableTracingAck() {
323 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
324 if (!pending_enable_tracing_ack_pids_ ||
325 !pending_enable_tracing_ack_pids_->empty() ||
326 !host_->service()->active_service_pids_initialized()) {
327 return;
328 }
329
330 DCHECK(tracing_session_client_);
331 tracing_session_client_->OnTracingEnabled();
332 pending_enable_tracing_ack_pids_.reset();
333 enable_tracing_ack_timer_.Stop();
334 }
335
IsExpectedPid(base::ProcessId pid) const336 bool ConsumerHost::TracingSession::IsExpectedPid(base::ProcessId pid) const {
337 return filtered_pids_.empty() || base::Contains(filtered_pids_, pid);
338 }
339
ChangeTraceConfig(const perfetto::TraceConfig & trace_config)340 void ConsumerHost::TracingSession::ChangeTraceConfig(
341 const perfetto::TraceConfig& trace_config) {
342 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
343
344 host_->consumer_endpoint()->ChangeTraceConfig(trace_config);
345 }
346
DisableTracing()347 void ConsumerHost::TracingSession::DisableTracing() {
348 host_->consumer_endpoint()->DisableTracing();
349 }
350
OnTracingDisabled(const std::string & error)351 void ConsumerHost::TracingSession::OnTracingDisabled(const std::string& error) {
352 DCHECK(tracing_session_client_);
353
354 if (enable_tracing_ack_timer_.IsRunning()) {
355 enable_tracing_ack_timer_.FireNow();
356 }
357 DCHECK(!pending_enable_tracing_ack_pids_);
358
359 tracing_session_client_->OnTracingDisabled(
360 /*tracing_succeeded=*/error.empty());
361
362 if (trace_processor_) {
363 host_->consumer_endpoint()->ReadBuffers();
364 }
365
366 tracing_enabled_ = false;
367
368 if (on_disabled_callback_) {
369 std::move(on_disabled_callback_).Run();
370 }
371 }
372
OnConsumerClientDisconnected()373 void ConsumerHost::TracingSession::OnConsumerClientDisconnected() {
374 // The TracingSession will be deleted after this point.
375 host_->DestructTracingSession();
376 }
377
ReadBuffers(mojo::ScopedDataPipeProducerHandle stream,ReadBuffersCallback callback)378 void ConsumerHost::TracingSession::ReadBuffers(
379 mojo::ScopedDataPipeProducerHandle stream,
380 ReadBuffersCallback callback) {
381 DCHECK(!convert_to_legacy_json_);
382 read_buffers_stream_writer_ = base::SequenceBound<StreamWriter>(
383 StreamWriter::CreateTaskRunner(), std::move(stream), std::move(callback),
384 base::BindOnce(&TracingSession::OnConsumerClientDisconnected,
385 weak_factory_.GetWeakPtr()),
386 base::SequencedTaskRunnerHandle::Get());
387
388 host_->consumer_endpoint()->ReadBuffers();
389 }
390
RequestBufferUsage(RequestBufferUsageCallback callback)391 void ConsumerHost::TracingSession::RequestBufferUsage(
392 RequestBufferUsageCallback callback) {
393 if (!request_buffer_usage_callback_.is_null()) {
394 std::move(callback).Run(false, 0, false);
395 return;
396 }
397
398 request_buffer_usage_callback_ = std::move(callback);
399 host_->consumer_endpoint()->GetTraceStats();
400 }
401
DisableTracingAndEmitJson(const std::string & agent_label_filter,mojo::ScopedDataPipeProducerHandle stream,bool privacy_filtering_enabled,DisableTracingAndEmitJsonCallback callback)402 void ConsumerHost::TracingSession::DisableTracingAndEmitJson(
403 const std::string& agent_label_filter,
404 mojo::ScopedDataPipeProducerHandle stream,
405 bool privacy_filtering_enabled,
406 DisableTracingAndEmitJsonCallback callback) {
407 DCHECK(!read_buffers_stream_writer_);
408
409 read_buffers_stream_writer_ = base::SequenceBound<StreamWriter>(
410 StreamWriter::CreateTaskRunner(), std::move(stream), std::move(callback),
411 base::BindOnce(&TracingSession::OnConsumerClientDisconnected,
412 weak_factory_.GetWeakPtr()),
413 base::SequencedTaskRunnerHandle::Get());
414
415 if (privacy_filtering_enabled) {
416 // For filtering/allowlisting to be possible at JSON export time,
417 // filtering must not have been enabled during proto emission time
418 // (or there's nothing to pass through the allowlist).
419 DCHECK(!privacy_filtering_enabled_ || convert_to_legacy_json_);
420 privacy_filtering_enabled_ = true;
421 }
422
423 json_agent_label_filter_ = agent_label_filter;
424
425 perfetto::trace_processor::Config processor_config;
426 trace_processor_ =
427 perfetto::trace_processor::TraceProcessorStorage::CreateInstance(
428 processor_config);
429
430 if (tracing_enabled_) {
431 DisableTracing();
432 } else {
433 host_->consumer_endpoint()->ReadBuffers();
434 }
435 }
436
ExportJson()437 void ConsumerHost::TracingSession::ExportJson() {
438 // In legacy backend, the trace event agent sets the predicate used by
439 // TraceLog. For perfetto backend, ensure that predicate is always set
440 // before creating the exporter. The agent can be created later than this
441 // point.
442 if (base::trace_event::TraceLog::GetInstance()
443 ->GetArgumentFilterPredicate()
444 .is_null()) {
445 base::trace_event::TraceLog::GetInstance()->SetArgumentFilterPredicate(
446 base::BindRepeating(&IsTraceEventArgsAllowlisted));
447 base::trace_event::TraceLog::GetInstance()->SetMetadataFilterPredicate(
448 base::BindRepeating(&IsMetadataAllowlisted));
449 }
450
451 perfetto::trace_processor::json::ArgumentFilterPredicate argument_filter;
452 perfetto::trace_processor::json::MetadataFilterPredicate metadata_filter;
453 perfetto::trace_processor::json::LabelFilterPredicate label_filter;
454
455 if (privacy_filtering_enabled_) {
456 auto* trace_log = base::trace_event::TraceLog::GetInstance();
457 base::trace_event::ArgumentFilterPredicate argument_filter_predicate =
458 trace_log->GetArgumentFilterPredicate();
459 argument_filter =
460 [argument_filter_predicate](
461 const char* category_group_name, const char* event_name,
462 perfetto::trace_processor::json::ArgumentNameFilterPredicate*
463 name_filter) {
464 base::trace_event::ArgumentNameFilterPredicate name_filter_predicate;
465 bool result = argument_filter_predicate.Run(
466 category_group_name, event_name, &name_filter_predicate);
467 if (name_filter_predicate) {
468 *name_filter = [name_filter_predicate](const char* arg_name) {
469 return name_filter_predicate.Run(arg_name);
470 };
471 }
472 return result;
473 };
474 base::trace_event::MetadataFilterPredicate metadata_filter_predicate =
475 trace_log->GetMetadataFilterPredicate();
476 metadata_filter = [metadata_filter_predicate](const char* metadata_name) {
477 return metadata_filter_predicate.Run(metadata_name);
478 };
479 }
480
481 if (!json_agent_label_filter_.empty()) {
482 label_filter = [this](const char* label) {
483 return strcmp(label, json_agent_label_filter_.c_str()) == 0;
484 };
485 }
486
487 JsonStringOutputWriter output_writer(base::BindRepeating(
488 &ConsumerHost::TracingSession::OnJSONTraceData, base::Unretained(this)));
489 auto status = perfetto::trace_processor::json::ExportJson(
490 trace_processor_.get(), &output_writer, argument_filter, metadata_filter,
491 label_filter);
492 DCHECK(status.ok()) << status.message();
493 }
494
OnJSONTraceData(std::string json,bool has_more)495 void ConsumerHost::TracingSession::OnJSONTraceData(std::string json,
496 bool has_more) {
497 auto slice = std::make_unique<StreamWriter::Slice>();
498 slice->swap(json);
499 read_buffers_stream_writer_.AsyncCall(&StreamWriter::WriteToStream)
500 .WithArgs(std::move(slice), has_more);
501
502 if (!has_more) {
503 read_buffers_stream_writer_.Reset();
504 }
505 }
506
OnTraceData(std::vector<perfetto::TracePacket> packets,bool has_more)507 void ConsumerHost::TracingSession::OnTraceData(
508 std::vector<perfetto::TracePacket> packets,
509 bool has_more) {
510 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
511
512 // Calculate space needed for trace chunk. Each packet has a preamble and
513 // payload size.
514 size_t max_size = packets.size() * perfetto::TracePacket::kMaxPreambleBytes;
515 for (const auto& packet : packets) {
516 max_size += packet.size();
517 }
518
519 if (trace_processor_) {
520 // Copy packets into a trace file chunk.
521 size_t position = 0;
522 std::unique_ptr<uint8_t[]> data(new uint8_t[max_size]);
523 for (perfetto::TracePacket& packet : packets) {
524 char* preamble;
525 size_t preamble_size;
526 std::tie(preamble, preamble_size) = packet.GetProtoPreamble();
527 DCHECK_LT(position + preamble_size, max_size);
528 memcpy(&data[position], preamble, preamble_size);
529 position += preamble_size;
530 for (const perfetto::Slice& slice : packet.slices()) {
531 DCHECK_LT(position + slice.size, max_size);
532 memcpy(&data[position], slice.start, slice.size);
533 position += slice.size;
534 }
535 }
536
537 auto status = trace_processor_->Parse(std::move(data), position);
538 // TODO(eseckler): There's no way to propagate this error at the moment - If
539 // one occurs on production builds, we silently ignore it and will end up
540 // producing an empty JSON result.
541 DCHECK(status.ok()) << status.message();
542 if (!has_more) {
543 trace_processor_->NotifyEndOfFile();
544 ExportJson();
545 trace_processor_.reset();
546 }
547 return;
548 }
549
550 // Copy packets into a trace slice.
551 auto chunk = std::make_unique<StreamWriter::Slice>();
552 chunk->reserve(max_size);
553 for (auto& packet : packets) {
554 char* data;
555 size_t size;
556 std::tie(data, size) = packet.GetProtoPreamble();
557 chunk->append(data, size);
558 auto& slices = packet.slices();
559 for (auto& slice : slices) {
560 chunk->append(static_cast<const char*>(slice.start), slice.size);
561 }
562 }
563 read_buffers_stream_writer_.AsyncCall(&StreamWriter::WriteToStream)
564 .WithArgs(std::move(chunk), has_more);
565 if (!has_more) {
566 read_buffers_stream_writer_.Reset();
567 }
568 }
569
OnTraceStats(bool success,const perfetto::TraceStats & stats)570 void ConsumerHost::TracingSession::OnTraceStats(
571 bool success,
572 const perfetto::TraceStats& stats) {
573 if (!request_buffer_usage_callback_) {
574 return;
575 }
576
577 if (!success || stats.buffer_stats_size() != 1) {
578 std::move(request_buffer_usage_callback_).Run(false, 0.0f, false);
579 return;
580 }
581 double percent_full = GetTraceBufferUsage(stats);
582 bool data_loss = HasLostData(stats);
583 std::move(request_buffer_usage_callback_).Run(true, percent_full, data_loss);
584 }
585
Flush(uint32_t timeout,base::OnceCallback<void (bool)> callback)586 void ConsumerHost::TracingSession::Flush(
587 uint32_t timeout,
588 base::OnceCallback<void(bool)> callback) {
589 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
590 flush_callback_ = std::move(callback);
591 base::WeakPtr<TracingSession> weak_this = weak_factory_.GetWeakPtr();
592 host_->consumer_endpoint()->Flush(timeout, [weak_this](bool success) {
593 if (!weak_this) {
594 return;
595 }
596
597 if (weak_this->flush_callback_) {
598 std::move(weak_this->flush_callback_).Run(success);
599 }
600 });
601 }
602
603 // static
BindConsumerReceiver(PerfettoService * service,mojo::PendingReceiver<mojom::ConsumerHost> receiver)604 void ConsumerHost::BindConsumerReceiver(
605 PerfettoService* service,
606 mojo::PendingReceiver<mojom::ConsumerHost> receiver) {
607 mojo::MakeSelfOwnedReceiver(std::make_unique<ConsumerHost>(service),
608 std::move(receiver));
609 }
610
ConsumerHost(PerfettoService * service)611 ConsumerHost::ConsumerHost(PerfettoService* service) : service_(service) {
612 DETACH_FROM_SEQUENCE(sequence_checker_);
613 consumer_endpoint_ =
614 service_->GetService()->ConnectConsumer(this, 0 /*uid_t*/);
615 consumer_endpoint_->ObserveEvents(
616 perfetto::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
617 }
618
~ConsumerHost()619 ConsumerHost::~ConsumerHost() {
620 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
621 // Make sure the tracing_session is destroyed first, as it keeps a pointer to
622 // the ConsumerHost parent and accesses it on destruction.
623 tracing_session_.reset();
624 }
625
EnableTracing(mojo::PendingReceiver<mojom::TracingSessionHost> tracing_session_host,mojo::PendingRemote<mojom::TracingSessionClient> tracing_session_client,const perfetto::TraceConfig & trace_config)626 void ConsumerHost::EnableTracing(
627 mojo::PendingReceiver<mojom::TracingSessionHost> tracing_session_host,
628 mojo::PendingRemote<mojom::TracingSessionClient> tracing_session_client,
629 const perfetto::TraceConfig& trace_config) {
630 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
631 DCHECK(!tracing_session_);
632
633 auto priority = mojom::TracingClientPriority::kUnknown;
634 for (const auto& data_source : trace_config.data_sources()) {
635 if (!data_source.has_config() ||
636 !data_source.config().has_chrome_config()) {
637 continue;
638 }
639 switch (data_source.config().chrome_config().client_priority()) {
640 case perfetto::protos::gen::ChromeConfig::BACKGROUND:
641 priority =
642 std::max(priority, mojom::TracingClientPriority::kBackground);
643 break;
644 case perfetto::protos::gen::ChromeConfig::USER_INITIATED:
645 priority =
646 std::max(priority, mojom::TracingClientPriority::kUserInitiated);
647 break;
648 default:
649 case perfetto::protos::gen::ChromeConfig::UNKNOWN:
650 break;
651 }
652 }
653
654 // We create our new TracingSession async, if the PerfettoService allows
655 // us to, after it's stopped any currently running lower or equal priority
656 // tracing sessions.
657 service_->RequestTracingSession(
658 priority, base::BindOnce(
659 [](base::WeakPtr<ConsumerHost> weak_this,
660 mojo::PendingReceiver<mojom::TracingSessionHost>
661 tracing_session_host,
662 mojo::PendingRemote<mojom::TracingSessionClient>
663 tracing_session_client,
664 const perfetto::TraceConfig& trace_config,
665 mojom::TracingClientPriority priority) {
666 if (!weak_this) {
667 return;
668 }
669
670 weak_this->tracing_session_ =
671 std::make_unique<TracingSession>(
672 weak_this.get(), std::move(tracing_session_host),
673 std::move(tracing_session_client), trace_config,
674 priority);
675 },
676 weak_factory_.GetWeakPtr(), std::move(tracing_session_host),
677 std::move(tracing_session_client), trace_config, priority));
678 }
679
OnConnect()680 void ConsumerHost::OnConnect() {}
681
OnDisconnect()682 void ConsumerHost::OnDisconnect() {}
683
OnTracingDisabled(const std::string & error)684 void ConsumerHost::OnTracingDisabled(const std::string& error) {
685 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
686 if (tracing_session_) {
687 tracing_session_->OnTracingDisabled(error);
688 }
689 }
690
OnTraceData(std::vector<perfetto::TracePacket> packets,bool has_more)691 void ConsumerHost::OnTraceData(std::vector<perfetto::TracePacket> packets,
692 bool has_more) {
693 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
694 if (tracing_session_) {
695 tracing_session_->OnTraceData(std::move(packets), has_more);
696 }
697 }
698
OnObservableEvents(const perfetto::ObservableEvents & events)699 void ConsumerHost::OnObservableEvents(
700 const perfetto::ObservableEvents& events) {
701 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
702 if (tracing_session_) {
703 tracing_session_->OnPerfettoEvents(events);
704 }
705 }
706
OnTraceStats(bool success,const perfetto::TraceStats & stats)707 void ConsumerHost::OnTraceStats(bool success,
708 const perfetto::TraceStats& stats) {
709 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
710 if (tracing_session_) {
711 tracing_session_->OnTraceStats(success, stats);
712 }
713 }
714
DestructTracingSession()715 void ConsumerHost::DestructTracingSession() {
716 tracing_session_.reset();
717 }
718
719 } // namespace tracing
720