1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "services/tracing/perfetto/consumer_host.h"
6 
7 #include <algorithm>
8 #include <cstring>
9 #include <string>
10 #include <tuple>
11 #include <utility>
12 #include <vector>
13 
14 #include "base/logging.h"
15 #include "base/numerics/ranges.h"
16 #include "base/stl_util.h"
17 #include "base/strings/strcat.h"
18 #include "base/strings/string_number_conversions.h"
19 #include "base/strings/string_util.h"
20 #include "base/task/post_task.h"
21 #include "base/task/thread_pool.h"
22 #include "base/trace_event/trace_log.h"
23 #include "base/values.h"
24 #include "build/build_config.h"
25 #include "mojo/public/cpp/bindings/self_owned_receiver.h"
26 #include "mojo/public/cpp/system/wait.h"
27 #include "services/tracing/perfetto/perfetto_service.h"
28 #include "services/tracing/public/cpp/perfetto/perfetto_session.h"
29 #include "services/tracing/public/cpp/trace_event_args_allowlist.h"
30 #include "third_party/perfetto/include/perfetto/ext/trace_processor/export_json.h"
31 #include "third_party/perfetto/include/perfetto/ext/tracing/core/observable_events.h"
32 #include "third_party/perfetto/include/perfetto/ext/tracing/core/slice.h"
33 #include "third_party/perfetto/include/perfetto/ext/tracing/core/trace_packet.h"
34 #include "third_party/perfetto/include/perfetto/ext/tracing/core/trace_stats.h"
35 #include "third_party/perfetto/include/perfetto/trace_processor/basic_types.h"
36 #include "third_party/perfetto/include/perfetto/trace_processor/trace_processor_storage.h"
37 #include "third_party/perfetto/include/perfetto/tracing/core/trace_config.h"
38 #include "third_party/perfetto/protos/perfetto/config/trace_config.pb.h"
39 
40 namespace tracing {
41 
42 namespace {
43 
// How long, in seconds, a tracing session waits for the outstanding
// EnableTracing acknowledgements before its ack timer fires.
constexpr int32_t kEnableTracingTimeoutSeconds = 10;
45 
46 class JsonStringOutputWriter
47     : public perfetto::trace_processor::json::OutputWriter {
48  public:
49   using FlushCallback =
50       base::RepeatingCallback<void(std::string json, bool has_more)>;
51 
JsonStringOutputWriter(FlushCallback flush_callback)52   JsonStringOutputWriter(FlushCallback flush_callback)
53       : flush_callback_(std::move(flush_callback)) {
54     buffer_.reserve(kBufferReserveCapacity);
55   }
56 
~JsonStringOutputWriter()57   ~JsonStringOutputWriter() override {
58     flush_callback_.Run(std::move(buffer_), false);
59   }
60 
AppendString(const std::string & string)61   perfetto::trace_processor::util::Status AppendString(
62       const std::string& string) override {
63     buffer_ += string;
64     if (buffer_.size() > kBufferLimitInBytes) {
65       flush_callback_.Run(std::move(buffer_), true);
66       // Reset the buffer_ after moving it above.
67       buffer_.clear();
68       buffer_.reserve(kBufferReserveCapacity);
69     }
70     return perfetto::trace_processor::util::OkStatus();
71   }
72 
73  private:
74   static constexpr size_t kBufferLimitInBytes = 100 * 1024;
75   // Since we write each string before checking the limit, we'll always go
76   // slightly over and hence we reserve some extra space to avoid most
77   // reallocs.
78   static constexpr size_t kBufferReserveCapacity = kBufferLimitInBytes * 5 / 4;
79 
80   FlushCallback flush_callback_;
81   std::string buffer_;
82 };
83 
84 }  // namespace
85 
86 class ConsumerHost::StreamWriter {
87  public:
88   using Slice = std::string;
89 
CreateTaskRunner()90   static scoped_refptr<base::SequencedTaskRunner> CreateTaskRunner() {
91     return base::ThreadPool::CreateSequencedTaskRunner(
92         {base::WithBaseSyncPrimitives(), base::TaskPriority::BEST_EFFORT});
93   }
94 
StreamWriter(mojo::ScopedDataPipeProducerHandle stream,TracingSession::ReadBuffersCallback callback,base::OnceClosure disconnect_callback,scoped_refptr<base::SequencedTaskRunner> callback_task_runner)95   StreamWriter(mojo::ScopedDataPipeProducerHandle stream,
96                TracingSession::ReadBuffersCallback callback,
97                base::OnceClosure disconnect_callback,
98                scoped_refptr<base::SequencedTaskRunner> callback_task_runner)
99       : stream_(std::move(stream)),
100         read_buffers_callback_(std::move(callback)),
101         disconnect_callback_(std::move(disconnect_callback)),
102         callback_task_runner_(callback_task_runner) {}
103 
WriteToStream(std::unique_ptr<Slice> slice,bool has_more)104   void WriteToStream(std::unique_ptr<Slice> slice, bool has_more) {
105     DCHECK(stream_.is_valid());
106 
107     uint32_t write_position = 0;
108     while (write_position < slice->size()) {
109       uint32_t write_bytes = slice->size() - write_position;
110 
111       MojoResult result =
112           stream_->WriteData(slice->data() + write_position, &write_bytes,
113                              MOJO_WRITE_DATA_FLAG_NONE);
114 
115       if (result == MOJO_RESULT_OK) {
116         write_position += write_bytes;
117         continue;
118       }
119 
120       if (result == MOJO_RESULT_SHOULD_WAIT) {
121         result = mojo::Wait(stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE);
122       }
123 
124       if (result != MOJO_RESULT_OK) {
125         if (!disconnect_callback_.is_null()) {
126           callback_task_runner_->PostTask(FROM_HERE,
127                                           std::move(disconnect_callback_));
128         }
129         return;
130       }
131     }
132 
133     if (!has_more && !read_buffers_callback_.is_null()) {
134       callback_task_runner_->PostTask(FROM_HERE,
135                                       std::move(read_buffers_callback_));
136     }
137   }
138 
139  private:
140   mojo::ScopedDataPipeProducerHandle stream_;
141   TracingSession::ReadBuffersCallback read_buffers_callback_;
142   base::OnceClosure disconnect_callback_;
143   scoped_refptr<base::SequencedTaskRunner> callback_task_runner_;
144 
145   DISALLOW_COPY_AND_ASSIGN(StreamWriter);
146 };
147 
// Creates the session for |host|: registers it with the PerfettoService,
// installs Mojo disconnect handlers, derives the session flags and the PID
// filter from |trace_config|, and then asks the consumer endpoint to enable
// tracing with the (possibly adjusted) config.
ConsumerHost::TracingSession::TracingSession(
    ConsumerHost* host,
    mojo::PendingReceiver<mojom::TracingSessionHost> tracing_session_host,
    mojo::PendingRemote<mojom::TracingSessionClient> tracing_session_client,
    const perfetto::TraceConfig& trace_config,
    mojom::TracingClientPriority priority)
    : host_(host),
      tracing_session_client_(std::move(tracing_session_client)),
      receiver_(this, std::move(tracing_session_host)),
      tracing_priority_(priority) {
  host_->service()->RegisterTracingSession(this);

  // A disconnect on either Mojo endpoint makes the host destroy this session.
  tracing_session_client_.set_disconnect_handler(base::BindOnce(
      &ConsumerHost::DestructTracingSession, base::Unretained(host)));
  receiver_.set_disconnect_handler(base::BindOnce(
      &ConsumerHost::DestructTracingSession, base::Unretained(host)));

  // Either flag is turned on as soon as any data source requests it.
  privacy_filtering_enabled_ = false;
  for (const auto& data_source : trace_config.data_sources()) {
    if (data_source.config().chrome_config().privacy_filtering_enabled()) {
      privacy_filtering_enabled_ = true;
    }
    if (data_source.config().chrome_config().convert_to_legacy_json()) {
      convert_to_legacy_json_ = true;
    }
  }
#if DCHECK_IS_ON()
  if (privacy_filtering_enabled_) {
    // If enabled, filtering must be enabled for all data sources.
    for (const auto& data_source : trace_config.data_sources()) {
      DCHECK(data_source.config().chrome_config().privacy_filtering_enabled());
    }
  }
#endif

  // Collect the PIDs named by the producer name filters of the trace event
  // data source. Only the first data source with that name is considered.
  filtered_pids_.clear();
  for (const auto& ds_config : trace_config.data_sources()) {
    if (ds_config.config().name() == mojom::kTraceEventDataSourceName) {
      for (const auto& filter : ds_config.producer_name_filter()) {
        base::ProcessId pid;
        if (PerfettoService::ParsePidFromProducerName(filter, &pid)) {
          filtered_pids_.insert(pid);
        }
      }
      break;
    }
  }

  // Start out waiting for an ack from every currently active service PID that
  // passes the PID filter.
  pending_enable_tracing_ack_pids_ = host_->service()->active_service_pids();
  base::EraseIf(*pending_enable_tracing_ack_pids_,
                [this](base::ProcessId pid) { return !IsExpectedPid(pid); });

  perfetto::TraceConfig effective_config(trace_config);
  // If we're going to convert the data to JSON, don't enable privacy filtering
  // at the data source level since it will be performed at conversion time
  // (otherwise there's nothing to pass through the allowlist).
  if (convert_to_legacy_json_ && privacy_filtering_enabled_) {
    for (auto& data_source : *effective_config.mutable_data_sources()) {
      auto* chrome_config =
          data_source.mutable_config()->mutable_chrome_config();
      chrome_config->set_privacy_filtering_enabled(false);
      // Argument filtering should still be enabled together with privacy
      // filtering to ensure, for example, that only the expected metadata gets
      // written.
      base::trace_event::TraceConfig base_config(chrome_config->trace_config());
      base_config.EnableArgumentFilter();
      chrome_config->set_trace_config(base_config.ToString());
    }
  }

  host_->consumer_endpoint()->EnableTracing(effective_config);
  // Acks right away if there is nothing to wait for; in that case the pending
  // set is reset and no timeout is armed below.
  MaybeSendEnableTracingAck();

  if (pending_enable_tracing_ack_pids_) {
    // We can't know for sure whether all processes we request to connect to the
    // tracing service will connect back, or if all the connected services will
    // ACK our EnableTracing request eventually, so we'll add a timeout for that
    // case.
    enable_tracing_ack_timer_.Start(
        FROM_HERE, base::TimeDelta::FromSeconds(kEnableTracingTimeoutSeconds),
        this, &ConsumerHost::TracingSession::OnEnableTracingTimeout);
  }
}
231 
~TracingSession()232 ConsumerHost::TracingSession::~TracingSession() {
233   host_->service()->UnregisterTracingSession(this);
234   if (host_->consumer_endpoint()) {
235     host_->consumer_endpoint()->FreeBuffers();
236   }
237 }
238 
OnPerfettoEvents(const perfetto::ObservableEvents & events)239 void ConsumerHost::TracingSession::OnPerfettoEvents(
240     const perfetto::ObservableEvents& events) {
241   if (!pending_enable_tracing_ack_pids_ ||
242       !events.instance_state_changes_size()) {
243     return;
244   }
245 
246   for (const auto& state_change : events.instance_state_changes()) {
247     DataSourceHandle handle(state_change.producer_name(),
248                             state_change.data_source_name());
249     data_source_states_[handle] =
250         state_change.state() ==
251         perfetto::ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
252   }
253 
254   // Data sources are first reported as being stopped before starting, so once
255   // all the data sources we know about have started we can declare tracing
256   // begun.
257   bool all_data_sources_started = std::all_of(
258       data_source_states_.cbegin(), data_source_states_.cend(),
259       [](std::pair<DataSourceHandle, bool> state) { return state.second; });
260   if (!all_data_sources_started)
261     return;
262 
263   for (const auto& it : data_source_states_) {
264     // Attempt to parse the PID out of the producer name.
265     base::ProcessId pid;
266     if (!PerfettoService::ParsePidFromProducerName(it.first.producer_name(),
267                                                    &pid)) {
268       continue;
269     }
270     pending_enable_tracing_ack_pids_->erase(pid);
271   }
272   MaybeSendEnableTracingAck();
273 }
274 
OnActiveServicePidAdded(base::ProcessId pid)275 void ConsumerHost::TracingSession::OnActiveServicePidAdded(
276     base::ProcessId pid) {
277   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
278   if (pending_enable_tracing_ack_pids_ && IsExpectedPid(pid)) {
279     pending_enable_tracing_ack_pids_->insert(pid);
280   }
281 }
282 
OnActiveServicePidRemoved(base::ProcessId pid)283 void ConsumerHost::TracingSession::OnActiveServicePidRemoved(
284     base::ProcessId pid) {
285   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
286   if (pending_enable_tracing_ack_pids_) {
287     pending_enable_tracing_ack_pids_->erase(pid);
288     MaybeSendEnableTracingAck();
289   }
290 }
291 
// Re-evaluates whether the EnableTracing ack can be sent;
// MaybeSendEnableTracingAck() holds the ack back until the service reports
// its active PIDs as initialized.
void ConsumerHost::TracingSession::OnActiveServicePidsInitialized() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  MaybeSendEnableTracingAck();
}
296 
// Disables tracing and stores |on_disabled_callback|, which is run from
// OnTracingDisabled(). Only one such request may be outstanding at a time.
void ConsumerHost::TracingSession::RequestDisableTracing(
    base::OnceClosure on_disabled_callback) {
  DCHECK(!on_disabled_callback_);
  on_disabled_callback_ = std::move(on_disabled_callback);
  DisableTracing();
}
303 
OnEnableTracingTimeout()304 void ConsumerHost::TracingSession::OnEnableTracingTimeout() {
305   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
306   if (!pending_enable_tracing_ack_pids_) {
307     return;
308   }
309 
310   std::stringstream error;
311   error << "Timed out waiting for processes to ack BeginTracing: ";
312   for (auto pid : *pending_enable_tracing_ack_pids_) {
313     error << pid << " ";
314   }
315   LOG(ERROR) << error.rdbuf();
316 
317   DCHECK(tracing_session_client_);
318   tracing_session_client_->OnTracingEnabled();
319   pending_enable_tracing_ack_pids_.reset();
320 }
321 
MaybeSendEnableTracingAck()322 void ConsumerHost::TracingSession::MaybeSendEnableTracingAck() {
323   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
324   if (!pending_enable_tracing_ack_pids_ ||
325       !pending_enable_tracing_ack_pids_->empty() ||
326       !host_->service()->active_service_pids_initialized()) {
327     return;
328   }
329 
330   DCHECK(tracing_session_client_);
331   tracing_session_client_->OnTracingEnabled();
332   pending_enable_tracing_ack_pids_.reset();
333   enable_tracing_ack_timer_.Stop();
334 }
335 
IsExpectedPid(base::ProcessId pid) const336 bool ConsumerHost::TracingSession::IsExpectedPid(base::ProcessId pid) const {
337   return filtered_pids_.empty() || base::Contains(filtered_pids_, pid);
338 }
339 
// Forwards |trace_config| to the host's consumer endpoint.
void ConsumerHost::TracingSession::ChangeTraceConfig(
    const perfetto::TraceConfig& trace_config) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  host_->consumer_endpoint()->ChangeTraceConfig(trace_config);
}
346 
// Asks the host's consumer endpoint to disable tracing.
void ConsumerHost::TracingSession::DisableTracing() {
  host_->consumer_endpoint()->DisableTracing();
}
350 
OnTracingDisabled(const std::string & error)351 void ConsumerHost::TracingSession::OnTracingDisabled(const std::string& error) {
352   DCHECK(tracing_session_client_);
353 
354   if (enable_tracing_ack_timer_.IsRunning()) {
355     enable_tracing_ack_timer_.FireNow();
356   }
357   DCHECK(!pending_enable_tracing_ack_pids_);
358 
359   tracing_session_client_->OnTracingDisabled(
360       /*tracing_succeeded=*/error.empty());
361 
362   if (trace_processor_) {
363     host_->consumer_endpoint()->ReadBuffers();
364   }
365 
366   tracing_enabled_ = false;
367 
368   if (on_disabled_callback_) {
369     std::move(on_disabled_callback_).Run();
370   }
371 }
372 
// Bound as the StreamWriter's disconnect callback; asks the host to destroy
// this session.
void ConsumerHost::TracingSession::OnConsumerClientDisconnected() {
  // The TracingSession will be deleted after this point.
  host_->DestructTracingSession();
}
377 
ReadBuffers(mojo::ScopedDataPipeProducerHandle stream,ReadBuffersCallback callback)378 void ConsumerHost::TracingSession::ReadBuffers(
379     mojo::ScopedDataPipeProducerHandle stream,
380     ReadBuffersCallback callback) {
381   DCHECK(!convert_to_legacy_json_);
382   read_buffers_stream_writer_ = base::SequenceBound<StreamWriter>(
383       StreamWriter::CreateTaskRunner(), std::move(stream), std::move(callback),
384       base::BindOnce(&TracingSession::OnConsumerClientDisconnected,
385                      weak_factory_.GetWeakPtr()),
386       base::SequencedTaskRunnerHandle::Get());
387 
388   host_->consumer_endpoint()->ReadBuffers();
389 }
390 
RequestBufferUsage(RequestBufferUsageCallback callback)391 void ConsumerHost::TracingSession::RequestBufferUsage(
392     RequestBufferUsageCallback callback) {
393   if (!request_buffer_usage_callback_.is_null()) {
394     std::move(callback).Run(false, 0, false);
395     return;
396   }
397 
398   request_buffer_usage_callback_ = std::move(callback);
399   host_->consumer_endpoint()->GetTraceStats();
400 }
401 
DisableTracingAndEmitJson(const std::string & agent_label_filter,mojo::ScopedDataPipeProducerHandle stream,bool privacy_filtering_enabled,DisableTracingAndEmitJsonCallback callback)402 void ConsumerHost::TracingSession::DisableTracingAndEmitJson(
403     const std::string& agent_label_filter,
404     mojo::ScopedDataPipeProducerHandle stream,
405     bool privacy_filtering_enabled,
406     DisableTracingAndEmitJsonCallback callback) {
407   DCHECK(!read_buffers_stream_writer_);
408 
409   read_buffers_stream_writer_ = base::SequenceBound<StreamWriter>(
410       StreamWriter::CreateTaskRunner(), std::move(stream), std::move(callback),
411       base::BindOnce(&TracingSession::OnConsumerClientDisconnected,
412                      weak_factory_.GetWeakPtr()),
413       base::SequencedTaskRunnerHandle::Get());
414 
415   if (privacy_filtering_enabled) {
416     // For filtering/allowlisting to be possible at JSON export time,
417     // filtering must not have been enabled during proto emission time
418     // (or there's nothing to pass through the allowlist).
419     DCHECK(!privacy_filtering_enabled_ || convert_to_legacy_json_);
420     privacy_filtering_enabled_ = true;
421   }
422 
423   json_agent_label_filter_ = agent_label_filter;
424 
425   perfetto::trace_processor::Config processor_config;
426   trace_processor_ =
427       perfetto::trace_processor::TraceProcessorStorage::CreateInstance(
428           processor_config);
429 
430   if (tracing_enabled_) {
431     DisableTracing();
432   } else {
433     host_->consumer_endpoint()->ReadBuffers();
434   }
435 }
436 
// Exports the contents of |trace_processor_| as JSON. The output is delivered
// in chunks to OnJSONTraceData() through a JsonStringOutputWriter. When
// privacy filtering is enabled, TraceLog's argument and metadata filter
// predicates are adapted to the exporter's predicate types; a non-empty agent
// label filter restricts the export to labels equal to it.
void ConsumerHost::TracingSession::ExportJson() {
  // In legacy backend, the trace event agent sets the predicate used by
  // TraceLog. For perfetto backend, ensure that predicate is always set
  // before creating the exporter. The agent can be created later than this
  // point.
  if (base::trace_event::TraceLog::GetInstance()
          ->GetArgumentFilterPredicate()
          .is_null()) {
    base::trace_event::TraceLog::GetInstance()->SetArgumentFilterPredicate(
        base::BindRepeating(&IsTraceEventArgsAllowlisted));
    base::trace_event::TraceLog::GetInstance()->SetMetadataFilterPredicate(
        base::BindRepeating(&IsMetadataAllowlisted));
  }

  // Left default-constructed unless the corresponding filter is set up below.
  perfetto::trace_processor::json::ArgumentFilterPredicate argument_filter;
  perfetto::trace_processor::json::MetadataFilterPredicate metadata_filter;
  perfetto::trace_processor::json::LabelFilterPredicate label_filter;

  if (privacy_filtering_enabled_) {
    auto* trace_log = base::trace_event::TraceLog::GetInstance();
    base::trace_event::ArgumentFilterPredicate argument_filter_predicate =
        trace_log->GetArgumentFilterPredicate();
    // Wrap the base callback so it can be called through the exporter's
    // predicate type; the per-argument name filter it may produce is wrapped
    // the same way.
    argument_filter =
        [argument_filter_predicate](
            const char* category_group_name, const char* event_name,
            perfetto::trace_processor::json::ArgumentNameFilterPredicate*
                name_filter) {
          base::trace_event::ArgumentNameFilterPredicate name_filter_predicate;
          bool result = argument_filter_predicate.Run(
              category_group_name, event_name, &name_filter_predicate);
          if (name_filter_predicate) {
            *name_filter = [name_filter_predicate](const char* arg_name) {
              return name_filter_predicate.Run(arg_name);
            };
          }
          return result;
        };
    base::trace_event::MetadataFilterPredicate metadata_filter_predicate =
        trace_log->GetMetadataFilterPredicate();
    metadata_filter = [metadata_filter_predicate](const char* metadata_name) {
      return metadata_filter_predicate.Run(metadata_name);
    };
  }

  if (!json_agent_label_filter_.empty()) {
    label_filter = [this](const char* label) {
      return strcmp(label, json_agent_label_filter_.c_str()) == 0;
    };
  }

  // The writer hands its remaining buffer to OnJSONTraceData() with
  // has_more == false when it is destroyed at the end of this function.
  JsonStringOutputWriter output_writer(base::BindRepeating(
      &ConsumerHost::TracingSession::OnJSONTraceData, base::Unretained(this)));
  auto status = perfetto::trace_processor::json::ExportJson(
      trace_processor_.get(), &output_writer, argument_filter, metadata_filter,
      label_filter);
  DCHECK(status.ok()) << status.message();
}
494 
OnJSONTraceData(std::string json,bool has_more)495 void ConsumerHost::TracingSession::OnJSONTraceData(std::string json,
496                                                    bool has_more) {
497   auto slice = std::make_unique<StreamWriter::Slice>();
498   slice->swap(json);
499   read_buffers_stream_writer_.AsyncCall(&StreamWriter::WriteToStream)
500       .WithArgs(std::move(slice), has_more);
501 
502   if (!has_more) {
503     read_buffers_stream_writer_.Reset();
504   }
505 }
506 
OnTraceData(std::vector<perfetto::TracePacket> packets,bool has_more)507 void ConsumerHost::TracingSession::OnTraceData(
508     std::vector<perfetto::TracePacket> packets,
509     bool has_more) {
510   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
511 
512   // Calculate space needed for trace chunk. Each packet has a preamble and
513   // payload size.
514   size_t max_size = packets.size() * perfetto::TracePacket::kMaxPreambleBytes;
515   for (const auto& packet : packets) {
516     max_size += packet.size();
517   }
518 
519   if (trace_processor_) {
520     // Copy packets into a trace file chunk.
521     size_t position = 0;
522     std::unique_ptr<uint8_t[]> data(new uint8_t[max_size]);
523     for (perfetto::TracePacket& packet : packets) {
524       char* preamble;
525       size_t preamble_size;
526       std::tie(preamble, preamble_size) = packet.GetProtoPreamble();
527       DCHECK_LT(position + preamble_size, max_size);
528       memcpy(&data[position], preamble, preamble_size);
529       position += preamble_size;
530       for (const perfetto::Slice& slice : packet.slices()) {
531         DCHECK_LT(position + slice.size, max_size);
532         memcpy(&data[position], slice.start, slice.size);
533         position += slice.size;
534       }
535     }
536 
537     auto status = trace_processor_->Parse(std::move(data), position);
538     // TODO(eseckler): There's no way to propagate this error at the moment - If
539     // one occurs on production builds, we silently ignore it and will end up
540     // producing an empty JSON result.
541     DCHECK(status.ok()) << status.message();
542     if (!has_more) {
543       trace_processor_->NotifyEndOfFile();
544       ExportJson();
545       trace_processor_.reset();
546     }
547     return;
548   }
549 
550   // Copy packets into a trace slice.
551   auto chunk = std::make_unique<StreamWriter::Slice>();
552   chunk->reserve(max_size);
553   for (auto& packet : packets) {
554     char* data;
555     size_t size;
556     std::tie(data, size) = packet.GetProtoPreamble();
557     chunk->append(data, size);
558     auto& slices = packet.slices();
559     for (auto& slice : slices) {
560       chunk->append(static_cast<const char*>(slice.start), slice.size);
561     }
562   }
563   read_buffers_stream_writer_.AsyncCall(&StreamWriter::WriteToStream)
564       .WithArgs(std::move(chunk), has_more);
565   if (!has_more) {
566     read_buffers_stream_writer_.Reset();
567   }
568 }
569 
OnTraceStats(bool success,const perfetto::TraceStats & stats)570 void ConsumerHost::TracingSession::OnTraceStats(
571     bool success,
572     const perfetto::TraceStats& stats) {
573   if (!request_buffer_usage_callback_) {
574     return;
575   }
576 
577   if (!success || stats.buffer_stats_size() != 1) {
578     std::move(request_buffer_usage_callback_).Run(false, 0.0f, false);
579     return;
580   }
581   double percent_full = GetTraceBufferUsage(stats);
582   bool data_loss = HasLostData(stats);
583   std::move(request_buffer_usage_callback_).Run(true, percent_full, data_loss);
584 }
585 
Flush(uint32_t timeout,base::OnceCallback<void (bool)> callback)586 void ConsumerHost::TracingSession::Flush(
587     uint32_t timeout,
588     base::OnceCallback<void(bool)> callback) {
589   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
590   flush_callback_ = std::move(callback);
591   base::WeakPtr<TracingSession> weak_this = weak_factory_.GetWeakPtr();
592   host_->consumer_endpoint()->Flush(timeout, [weak_this](bool success) {
593     if (!weak_this) {
594       return;
595     }
596 
597     if (weak_this->flush_callback_) {
598       std::move(weak_this->flush_callback_).Run(success);
599     }
600   });
601 }
602 
603 // static
// Creates a ConsumerHost for |service| and binds it to |receiver| as a
// self-owned receiver.
void ConsumerHost::BindConsumerReceiver(
    PerfettoService* service,
    mojo::PendingReceiver<mojom::ConsumerHost> receiver) {
  mojo::MakeSelfOwnedReceiver(std::make_unique<ConsumerHost>(service),
                              std::move(receiver));
}
610 
// Connects to the Perfetto service as a consumer and subscribes to data source
// instance state changes, which are delivered to OnObservableEvents().
ConsumerHost::ConsumerHost(PerfettoService* service) : service_(service) {
  DETACH_FROM_SEQUENCE(sequence_checker_);
  consumer_endpoint_ =
      service_->GetService()->ConnectConsumer(this, 0 /*uid_t*/);
  consumer_endpoint_->ObserveEvents(
      perfetto::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
}
618 
// Destroys the tracing session (if any) before the rest of the host.
ConsumerHost::~ConsumerHost() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  // Make sure the tracing_session is destroyed first, as it keeps a pointer to
  // the ConsumerHost parent and accesses it on destruction.
  tracing_session_.reset();
}
625 
// Requests a new tracing session from the PerfettoService. The session's
// priority is the highest client priority found among the data sources'
// Chrome configs. The TracingSession itself is created in the callback passed
// to RequestTracingSession(), and only if this host is still alive then.
void ConsumerHost::EnableTracing(
    mojo::PendingReceiver<mojom::TracingSessionHost> tracing_session_host,
    mojo::PendingRemote<mojom::TracingSessionClient> tracing_session_client,
    const perfetto::TraceConfig& trace_config) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!tracing_session_);

  auto priority = mojom::TracingClientPriority::kUnknown;
  for (const auto& data_source : trace_config.data_sources()) {
    // Data sources without a Chrome config do not contribute a priority.
    if (!data_source.has_config() ||
        !data_source.config().has_chrome_config()) {
      continue;
    }
    switch (data_source.config().chrome_config().client_priority()) {
      case perfetto::protos::gen::ChromeConfig::BACKGROUND:
        priority =
            std::max(priority, mojom::TracingClientPriority::kBackground);
        break;
      case perfetto::protos::gen::ChromeConfig::USER_INITIATED:
        priority =
            std::max(priority, mojom::TracingClientPriority::kUserInitiated);
        break;
      default:
      case perfetto::protos::gen::ChromeConfig::UNKNOWN:
        break;
    }
  }

  // We create our new TracingSession async, if the PerfettoService allows
  // us to, after it's stopped any currently running lower or equal priority
  // tracing sessions.
  service_->RequestTracingSession(
      priority, base::BindOnce(
                    [](base::WeakPtr<ConsumerHost> weak_this,
                       mojo::PendingReceiver<mojom::TracingSessionHost>
                           tracing_session_host,
                       mojo::PendingRemote<mojom::TracingSessionClient>
                           tracing_session_client,
                       const perfetto::TraceConfig& trace_config,
                       mojom::TracingClientPriority priority) {
                      // The host may have been destroyed while the request
                      // was pending.
                      if (!weak_this) {
                        return;
                      }

                      weak_this->tracing_session_ =
                          std::make_unique<TracingSession>(
                              weak_this.get(), std::move(tracing_session_host),
                              std::move(tracing_session_client), trace_config,
                              priority);
                    },
                    weak_factory_.GetWeakPtr(), std::move(tracing_session_host),
                    std::move(tracing_session_client), trace_config, priority));
}
679 
// Intentionally empty: nothing to do when the consumer connection is
// established.
void ConsumerHost::OnConnect() {}
681 
// Intentionally empty: nothing to do when the consumer connection goes away.
void ConsumerHost::OnDisconnect() {}
683 
OnTracingDisabled(const std::string & error)684 void ConsumerHost::OnTracingDisabled(const std::string& error) {
685   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
686   if (tracing_session_) {
687     tracing_session_->OnTracingDisabled(error);
688   }
689 }
690 
OnTraceData(std::vector<perfetto::TracePacket> packets,bool has_more)691 void ConsumerHost::OnTraceData(std::vector<perfetto::TracePacket> packets,
692                                bool has_more) {
693   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
694   if (tracing_session_) {
695     tracing_session_->OnTraceData(std::move(packets), has_more);
696   }
697 }
698 
OnObservableEvents(const perfetto::ObservableEvents & events)699 void ConsumerHost::OnObservableEvents(
700     const perfetto::ObservableEvents& events) {
701   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
702   if (tracing_session_) {
703     tracing_session_->OnPerfettoEvents(events);
704   }
705 }
706 
OnTraceStats(bool success,const perfetto::TraceStats & stats)707 void ConsumerHost::OnTraceStats(bool success,
708                                 const perfetto::TraceStats& stats) {
709   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
710   if (tracing_session_) {
711     tracing_session_->OnTraceStats(success, stats);
712   }
713 }
714 
// Destroys the current tracing session, if there is one.
void ConsumerHost::DestructTracingSession() {
  tracing_session_.reset();
}
718 
719 }  // namespace tracing
720