/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/tracing_service_impl.h"

#include "perfetto/base/build_config.h"

#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include <regex>
#include <unordered_set>

#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
#include <sys/uio.h>
#include <sys/utsname.h>
#include <unistd.h>
#endif

#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
#include <sys/system_properties.h>
#endif

#include <algorithm>

#include "perfetto/base/build_config.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/file_utils.h"
#include "perfetto/ext/base/metatrace.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/base/watchdog.h"
#include "perfetto/ext/tracing/core/consumer.h"
#include "perfetto/ext/tracing/core/observable_events.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/ext/tracing/core/shared_memory.h"
#include "perfetto/ext/tracing/core/shared_memory_abi.h"
#include "perfetto/ext/tracing/core/trace_packet.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/protozero/scattered_heap_buffer.h"
#include "perfetto/protozero/static_buffer.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "perfetto/tracing/core/tracing_service_state.h"
#include "src/tracing/core/packet_stream_validator.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
#include "src/tracing/core/trace_buffer.h"

#include "protos/perfetto/common/trace_stats.pbzero.h"
#include "protos/perfetto/config/trace_config.pbzero.h"
#include "protos/perfetto/trace/clock_snapshot.pbzero.h"
#include "protos/perfetto/trace/system_info.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"
#include "protos/perfetto/trace/trigger.pbzero.h"

// General note: this class must assume that Producers are malicious and will
// try to crash / exploit this class. We can trust pointers because they come
// from the IPC layer, but we should never assume that the producer calls
// come in the right order or that their arguments are sane / within bounds.

namespace perfetto {

namespace {
constexpr int kMaxBuffersPerConsumer = 128;
constexpr base::TimeMillis kSnapshotsInterval(10 * 1000);
constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
constexpr int kMaxConcurrentTracingSessions = 15;
constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
constexpr int64_t kMinSecondsBetweenTracesGuardrail = 5 * 60;

constexpr uint32_t kMillisPerHour = 3600000;
constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;

// These apply only if enable_extra_guardrails is true.
constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;

#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
struct iovec {
  void* iov_base;  // Address
  size_t iov_len;  // Block size
};

// Simple implementation of writev. Note that this does not give the atomicity
// guarantees of a real writev, but we don't depend on these (we aren't writing
// to the same file from another thread).
ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
  ssize_t total_size = 0;
  for (int i = 0; i < iovcnt; ++i) {
    ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
    if (current_size != static_cast<ssize_t>(iov[i].iov_len))
      return -1;
    total_size += current_size;
  }
  return total_size;
}

#define IOV_MAX 1024  // Linux compatible limit.

// uid checking is a no-op on Windows and NaCl.
uid_t getuid() {
  return 0;
}
uid_t geteuid() {
  return 0;
}
#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) ||
        // PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)

// Partially encodes a CommitDataRequest in an int32 for the purposes of
// metatracing. Note that it encodes only the bottom 10 bits of the producer id
// (which is technically 16 bits wide).
//
// Format (by bit range):
// [   31 ][         30 ][             29:20 ][            19:10 ][        9:0]
// [unused][has flush id][num chunks to patch][num chunks to move][producer id]
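//
// For example (illustrative values, not taken from real traffic): producer id
// 3 with 2 chunks to move, 1 chunk to patch and a flush id set encodes to
// (1 << 30) | (1 << 20) | (2 << 10) | 3 = 0x40100803.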
static int32_t EncodeCommitDataRequest(ProducerID producer_id,
                                       const CommitDataRequest& req_untrusted) {
  uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
  uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
  uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;

  uint32_t mask = (1 << 10) - 1;
  uint32_t acc = 0;
  acc |= has_flush_id << 30;
  acc |= (cpatch & mask) << 20;
  acc |= (cmov & mask) << 10;
  acc |= (producer_id & mask);
  return static_cast<int32_t>(acc);
}

void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
                              std::vector<uint8_t> packet) {
  Slice slice = Slice::Allocate(packet.size());
  memcpy(slice.own_data(), packet.data(), packet.size());
  packets->emplace_back();
  packets->back().AddSlice(std::move(slice));
}

std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
    size_t shm_size,
    size_t page_size) {
  // Theoretically the max page size supported by the ABI is 64KB.
  // However, the current implementation of TraceBuffer (the non-shared
  // userspace buffer where the service copies data) supports at most
  // 32K. Setting 64K "works" from the producer<>consumer viewpoint
  // but then causes the data to be discarded when copying it into
  // TraceBuffer.
  constexpr size_t kMaxPageSize = 32 * 1024;
  static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");

  if (page_size == 0)
    page_size = TracingServiceImpl::kDefaultShmPageSize;
  if (shm_size == 0)
    shm_size = TracingServiceImpl::kDefaultShmSize;

  page_size = std::min<size_t>(page_size, kMaxPageSize);
  shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);

  // Page size has to be multiple of system's page size.
  bool page_size_is_valid = page_size >= base::kPageSize;
  page_size_is_valid &= page_size % base::kPageSize == 0;

  // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
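  // (Assuming a 4 KB system page size, that means 4, 8, 16 or 32 KB pages.)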
  size_t num_pages = page_size / base::kPageSize;
  page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;

  if (!page_size_is_valid || shm_size < page_size ||
      shm_size % page_size != 0) {
    return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
                           TracingServiceImpl::kDefaultShmPageSize);
  }
  return std::make_tuple(shm_size, page_size);
}

bool NameMatchesFilter(const std::string& name,
                       const std::vector<std::string>& name_filter,
                       const std::vector<std::string>& name_regex_filter) {
  bool filter_is_set = !name_filter.empty() || !name_regex_filter.empty();
  if (!filter_is_set)
    return true;
  bool filter_matches = std::find(name_filter.begin(), name_filter.end(),
                                  name) != name_filter.end();
  bool filter_regex_matches =
      std::find_if(name_regex_filter.begin(), name_regex_filter.end(),
                   [&](const std::string& regex) {
                     return std::regex_match(
                         name, std::regex(regex, std::regex::extended));
                   }) != name_regex_filter.end();
  return filter_matches || filter_regex_matches;
}

}  // namespace

// These constants, instead, are defined in the header because they are used by
// tests.
constexpr size_t TracingServiceImpl::kDefaultShmSize;
constexpr size_t TracingServiceImpl::kDefaultShmPageSize;

constexpr size_t TracingServiceImpl::kMaxShmSize;
constexpr uint32_t TracingServiceImpl::kDataSourceStopTimeoutMs;
constexpr uint8_t TracingServiceImpl::kSyncMarker[];

// static
std::unique_ptr<TracingService> TracingService::CreateInstance(
    std::unique_ptr<SharedMemory::Factory> shm_factory,
    base::TaskRunner* task_runner) {
  return std::unique_ptr<TracingService>(
      new TracingServiceImpl(std::move(shm_factory), task_runner));
}

TracingServiceImpl::TracingServiceImpl(
    std::unique_ptr<SharedMemory::Factory> shm_factory,
    base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      shm_factory_(std::move(shm_factory)),
      uid_(getuid()),
      buffer_ids_(kMaxTraceBufferID),
      weak_ptr_factory_(this) {
  PERFETTO_DCHECK(task_runner_);
}

TracingServiceImpl::~TracingServiceImpl() {
  // TODO(fmayer): handle teardown of all Producer.
}

std::unique_ptr<TracingService::ProducerEndpoint>
TracingServiceImpl::ConnectProducer(Producer* producer,
                                    uid_t uid,
                                    const std::string& producer_name,
                                    size_t shared_memory_size_hint_bytes,
                                    bool in_process,
                                    ProducerSMBScrapingMode smb_scraping_mode,
                                    size_t shared_memory_page_size_hint_bytes,
                                    std::unique_ptr<SharedMemory> shm) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  if (lockdown_mode_ && uid != geteuid()) {
    PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
                  static_cast<unsigned long>(uid));
    return nullptr;
  }

  if (producers_.size() >= kMaxProducerID) {
    PERFETTO_DFATAL("Too many producers.");
    return nullptr;
  }
  const ProducerID id = GetNextProducerID();
  PERFETTO_DLOG("Producer %" PRIu16 " connected", id);

  bool smb_scraping_enabled = smb_scraping_enabled_;
  switch (smb_scraping_mode) {
    case ProducerSMBScrapingMode::kDefault:
      break;
    case ProducerSMBScrapingMode::kEnabled:
      smb_scraping_enabled = true;
      break;
    case ProducerSMBScrapingMode::kDisabled:
      smb_scraping_enabled = false;
      break;
  }

  std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
      id, uid, this, task_runner_, producer, producer_name, in_process,
      smb_scraping_enabled));
  auto it_and_inserted = producers_.emplace(id, endpoint.get());
  PERFETTO_DCHECK(it_and_inserted.second);
  endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
  endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;

  // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
  // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
  task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_));

  if (shm) {
    // The producer supplied an SMB. This is used only by Chrome; in the most
    // common case the SMB is created by the service and passed via
    // OnTracingSetup(). Verify that it is correctly sized before we attempt to
    // use it. The transport layer has to verify the integrity of the SMB (e.g.
    // ensure that the producer can't resize it after the fact).
    size_t shm_size, page_size;
    std::tie(shm_size, page_size) =
        EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
    if (shm_size == shm->size() &&
        page_size == endpoint->shmem_page_size_hint_bytes_) {
      PERFETTO_DLOG(
          "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
          shm_size / 1024, endpoint->name_.c_str());
      endpoint->SetupSharedMemory(std::move(shm), page_size,
                                  /*provided_by_producer=*/true);
    } else {
      PERFETTO_LOG(
          "Discarding incorrectly sized producer-provided SMB for producer "
          "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
          "B total, %zu B page size; suggested corrected sizes: %zu B total, "
          "%zu B page size",
          endpoint->name_.c_str(), shm->size(),
          endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
      shm.reset();
    }
  }

  return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
}

void TracingServiceImpl::DisconnectProducer(ProducerID id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
  PERFETTO_DCHECK(producers_.count(id));

  // Scrape remaining chunks for this producer to ensure we don't lose data.
  if (auto* producer = GetProducer(id)) {
    for (auto& session_id_and_session : tracing_sessions_)
      ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
  }

  for (auto it = data_sources_.begin(); it != data_sources_.end();) {
    auto next = it;
    next++;
    if (it->second.producer_id == id)
      UnregisterDataSource(id, it->second.descriptor.name());
    it = next;
  }

  producers_.erase(id);
  UpdateMemoryGuardrail();
}

TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
    ProducerID id) const {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto it = producers_.find(id);
  if (it == producers_.end())
    return nullptr;
  return it->second;
}

std::unique_ptr<TracingService::ConsumerEndpoint>
TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p connected", reinterpret_cast<void*>(consumer));
  std::unique_ptr<ConsumerEndpointImpl> endpoint(
      new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
  auto it_and_inserted = consumers_.emplace(endpoint.get());
  PERFETTO_DCHECK(it_and_inserted.second);
  // The consumer might go away before we're able to send the connect
  // notification; if that is the case, just bail out.
  auto weak_ptr = endpoint->GetWeakPtr();
  task_runner_->PostTask([weak_ptr] {
    if (!weak_ptr) {
      return;
    }
    weak_ptr->consumer_->OnConnect();
  });
  return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
}

void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
  PERFETTO_DCHECK(consumers_.count(consumer));

  // TODO(primiano): Check that this is safe (what happens if there are
  // ReadBuffers() calls posted in the meantime? They need to become noop).
  if (consumer->tracing_session_id_)
    FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
  consumers_.erase(consumer);

  // At this point no more pointers to |consumer| should be around.
  PERFETTO_DCHECK(!std::any_of(
      tracing_sessions_.begin(), tracing_sessions_.end(),
      [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
        return kv.second.consumer_maybe_null == consumer;
      }));
}

bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
                                        const std::string& key) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
  PERFETTO_DCHECK(consumers_.count(consumer));

  TracingSessionID tsid = consumer->tracing_session_id_;
  TracingSession* tracing_session;
  if (!tsid || !(tracing_session = GetTracingSession(tsid)))
    return false;

  if (GetDetachedSession(consumer->uid_, key)) {
    PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
                  key.c_str());
    return false;
  }

  PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
  tracing_session->consumer_maybe_null = nullptr;
  tracing_session->detach_key = key;
  consumer->tracing_session_id_ = 0;
  return true;
}

bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
                                        const std::string& key) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p attaching to session %s",
                reinterpret_cast<void*>(consumer), key.c_str());
  PERFETTO_DCHECK(consumers_.count(consumer));

  if (consumer->tracing_session_id_) {
    PERFETTO_ELOG(
422         "Cannot reattach consumer to session %s"
423         " while it already attached tracing session ID %" PRIu64,
424         key.c_str(), consumer->tracing_session_id_);
    return false;
  }

  auto* tracing_session = GetDetachedSession(consumer->uid_, key);
  if (!tracing_session) {
    PERFETTO_ELOG(
        "Failed to attach consumer, session '%s' not found for uid %d",
        key.c_str(), static_cast<int>(consumer->uid_));
    return false;
  }

  consumer->tracing_session_id_ = tracing_session->id;
  tracing_session->consumer_maybe_null = consumer;
  tracing_session->detach_key.clear();
  return true;
}

bool TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
                                       const TraceConfig& cfg,
                                       base::ScopedFile fd) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Enabling tracing for consumer %p",
                reinterpret_cast<void*>(consumer));
  if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
    lockdown_mode_ = true;
  if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
    lockdown_mode_ = false;
  TracingSession* tracing_session =
      GetTracingSession(consumer->tracing_session_id_);
  if (tracing_session) {
    PERFETTO_DLOG(
        "A Consumer is trying to EnableTracing() but another tracing session "
        "is already active (forgot a call to FreeBuffers() ?)");
    return false;
  }

  const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
                                       ? kGuardrailsMaxTracingDurationMillis
                                       : kMaxTracingDurationMillis;
  if (cfg.duration_ms() > max_duration_ms) {
    PERFETTO_ELOG("Requested too long trace (%" PRIu32 " ms > %" PRIu32 " ms)",
                  cfg.duration_ms(), max_duration_ms);
    return false;
  }

  const bool has_trigger_config = cfg.trigger_config().trigger_mode() !=
                                  TraceConfig::TriggerConfig::UNSPECIFIED;
  if (has_trigger_config && (cfg.trigger_config().trigger_timeout_ms() == 0 ||
                             cfg.trigger_config().trigger_timeout_ms() >
                                 kGuardrailsMaxTracingDurationMillis)) {
    PERFETTO_ELOG(
476         "Traces with START_TRACING triggers must provide a positive "
477         "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
478         cfg.trigger_config().trigger_timeout_ms());
    return false;
  }

  if (has_trigger_config && cfg.duration_ms() != 0) {
    PERFETTO_ELOG(
        "duration_ms was set; this must not be set for traces with triggers.");
    return false;
  }

  std::unordered_set<std::string> triggers;
  for (const auto& trigger : cfg.trigger_config().triggers()) {
    if (!triggers.insert(trigger.name()).second) {
      PERFETTO_ELOG("Duplicate trigger name: %s", trigger.name().c_str());
      return false;
    }
  }

  if (cfg.enable_extra_guardrails()) {
    if (cfg.deferred_start()) {
      PERFETTO_ELOG(
          "deferred_start=true is not supported in unsupervised traces");
      return false;
    }
    uint64_t buf_size_sum = 0;
    for (const auto& buf : cfg.buffers())
      buf_size_sum += buf.size_kb();
    if (buf_size_sum > kGuardrailsMaxTracingBufferSizeKb) {
      PERFETTO_ELOG("Requested too large trace buffer (%" PRIu64
                    " kB > %" PRIu32 " kB)",
                    buf_size_sum, kGuardrailsMaxTracingBufferSizeKb);
      return false;
    }
  }

  if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
    PERFETTO_ELOG("Too many buffers configured (%d)", cfg.buffers_size());
    return false;
  }

  if (!cfg.unique_session_name().empty()) {
    const std::string& name = cfg.unique_session_name();
    for (auto& kv : tracing_sessions_) {
      if (kv.second.config.unique_session_name() == name) {
        PERFETTO_ELOG(
            "A trace with this unique session name (%s) already exists",
            name.c_str());
        return false;
      }
    }
  }

  if (cfg.enable_extra_guardrails()) {
    // unique_session_name can be empty
    const std::string& name = cfg.unique_session_name();
    int64_t now_s = base::GetBootTimeS().count();

    // Remove any entries where the time limit has passed so this map doesn't
    // grow indefinitely:
    std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
    for (auto it = sessions.cbegin(); it != sessions.cend();) {
      if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
        it = sessions.erase(it);
      } else {
        ++it;
      }
    }

    int64_t& previous_s = session_to_last_trace_s_[name];
    if (previous_s == 0) {
      previous_s = now_s;
    } else {
      PERFETTO_ELOG(
          "A trace with unique session name \"%s\" began less than %" PRId64
          "s ago (%" PRId64 "s)",
          name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
      return false;
    }
  }

  const long sessions_for_uid = std::count_if(
      tracing_sessions_.begin(), tracing_sessions_.end(),
      [consumer](const decltype(tracing_sessions_)::value_type& s) {
        return s.second.consumer_uid == consumer->uid_;
      });
  if (sessions_for_uid >= kMaxConcurrentTracingSessionsPerUid) {
564     PERFETTO_ELOG("Too many concurrent tracing sesions (%ld) for uid %d",
                  sessions_for_uid, static_cast<int>(consumer->uid_));
    return false;
  }

  // TODO(primiano): This is a workaround to prevent a producer from getting
  // stuck in a state where it stalls by design, by having more TraceWriterImpl
  // instances than free pages in the buffer. This is really a bug in
  // trace_probes and the way it handles stalls in the shmem buffer.
  if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
    PERFETTO_ELOG("Too many concurrent tracing sessions (%zu)",
                  tracing_sessions_.size());
    return false;
  }

  const TracingSessionID tsid = ++last_tracing_session_id_;
  tracing_session =
      &tracing_sessions_.emplace(tsid, TracingSession(tsid, consumer, cfg))
           .first->second;

  if (cfg.write_into_file()) {
    if (!fd) {
      PERFETTO_ELOG(
          "The TraceConfig had write_into_file==true but no fd was passed");
      tracing_sessions_.erase(tsid);
      return false;
    }
    tracing_session->write_into_file = std::move(fd);
    uint32_t write_period_ms = cfg.file_write_period_ms();
    if (write_period_ms == 0)
      write_period_ms = kDefaultWriteIntoFilePeriodMs;
    if (write_period_ms < min_write_period_ms_)
      write_period_ms = min_write_period_ms_;
    tracing_session->write_period_ms = write_period_ms;
    tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
    tracing_session->bytes_written_into_file = 0;
  }

  // Initialize the log buffers.
  bool did_allocate_all_buffers = true;

  // Allocate the trace buffers. Also create a map to translate a consumer
  // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
  // corresponding BufferID, which is a global ID namespace for the service and
  // all producers.
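  // E.g. a data source configured with target_buffer == 1 will write into the
  // buffer whose global ID ends up in buffers_index[1].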
  size_t total_buf_size_kb = 0;
  const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
  tracing_session->buffers_index.reserve(num_buffers);
  for (size_t i = 0; i < num_buffers; i++) {
    const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
    BufferID global_id = buffer_ids_.Allocate();
    if (!global_id) {
      did_allocate_all_buffers = false;  // We ran out of IDs.
      break;
    }
    tracing_session->buffers_index.push_back(global_id);
    const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u;
    total_buf_size_kb += buffer_cfg.size_kb();
    TraceBuffer::OverwritePolicy policy =
        buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
            ? TraceBuffer::kDiscard
            : TraceBuffer::kOverwrite;
    auto it_and_inserted = buffers_.emplace(
        global_id, TraceBuffer::Create(buf_size_bytes, policy));
    PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
    std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
    if (!trace_buffer) {
      did_allocate_all_buffers = false;
      break;
    }
  }

  UpdateMemoryGuardrail();

  // This can happen if either:
  // - All the kMaxTraceBufferID slots are taken.
  // - OOM, or, more realistically, we exhausted virtual memory.
  // In any case, free all the previously allocated buffers and abort.
  // TODO(fmayer): add a test to cover this case, this is quite subtle.
  if (!did_allocate_all_buffers) {
    for (BufferID global_id : tracing_session->buffers_index) {
      buffer_ids_.Free(global_id);
      buffers_.erase(global_id);
    }
    tracing_sessions_.erase(tsid);
    return false;
  }

  consumer->tracing_session_id_ = tsid;

  // Setup the data sources on the producers without starting them.
  for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
    // Scan all the registered data sources with a matching name.
    auto range = data_sources_.equal_range(cfg_data_source.config().name());
    for (auto it = range.first; it != range.second; it++) {
      TraceConfig::ProducerConfig producer_config;
      for (auto& config : cfg.producers()) {
        if (GetProducer(it->second.producer_id)->name_ ==
            config.producer_name()) {
          producer_config = config;
          break;
        }
      }
      SetupDataSource(cfg_data_source, producer_config, it->second,
                      tracing_session);
    }
  }

  bool has_start_trigger = false;
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  switch (cfg.trigger_config().trigger_mode()) {
    case TraceConfig::TriggerConfig::UNSPECIFIED:
      // No triggers are specified, so this isn't a trace that uses triggers.
      PERFETTO_DCHECK(!has_trigger_config);
      break;
    case TraceConfig::TriggerConfig::START_TRACING:
      // For traces which use START_TRACING triggers we need to ensure that the
      // tracing session will be cleaned up when it times out.
      has_start_trigger = true;
      task_runner_->PostDelayedTask(
          [weak_this, tsid]() {
            if (weak_this)
              weak_this->OnStartTriggersTimeout(tsid);
          },
          cfg.trigger_config().trigger_timeout_ms());
      break;
    case TraceConfig::TriggerConfig::STOP_TRACING:
      // Update the tracing_session's duration_ms to ensure that if no trigger
      // is received, the session will still end and be cleaned up when the
      // trigger timeout expires.
      //
      // TODO(nuskos): Refactor this so that rather than modifying the config
      // we have a field we look at on the tracing_session.
      tracing_session->config.set_duration_ms(
          cfg.trigger_config().trigger_timeout_ms());
      break;
  }

  tracing_session->state = TracingSession::CONFIGURED;
  PERFETTO_LOG(
      "Configured tracing session %" PRIu64
      ", #sources:%zu, duration:%d ms, #buffers:%d, total "
      "buffer size:%zu KB, total sessions:%zu, uid:%d session name: \"%s\"",
      tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
      cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
      static_cast<unsigned int>(consumer->uid_),
      cfg.unique_session_name().c_str());

  // Start the data sources, unless this is a case of early setup + fast
  // triggering, either through TraceConfig.deferred_start or
  // TraceConfig.trigger_config(). If both are specified, whichever one occurs
  // first will initiate the trace.
  if (!cfg.deferred_start() && !has_start_trigger)
    return StartTracing(tsid);

  return true;
}

void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
                                           const TraceConfig& updated_cfg) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session =
      GetTracingSession(consumer->tracing_session_id_);
  PERFETTO_DCHECK(tracing_session);

  if ((tracing_session->state != TracingSession::STARTED) &&
      (tracing_session->state != TracingSession::CONFIGURED)) {
    PERFETTO_ELOG(
        "ChangeTraceConfig() was called for a tracing session which isn't "
        "running.");
    return;
  }

  // We only support updating producer_name_{,regex}_filter (and pass-through
  // configs) for now; null out any changeable fields and make sure the rest are
  // identical.
  TraceConfig new_config_copy(updated_cfg);
  for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
    ds_cfg.clear_producer_name_filter();
    ds_cfg.clear_producer_name_regex_filter();
  }

  TraceConfig current_config_copy(tracing_session->config);
  for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
    ds_cfg.clear_producer_name_filter();
    ds_cfg.clear_producer_name_regex_filter();
  }

  if (new_config_copy != current_config_copy) {
    PERFETTO_LOG(
        "ChangeTraceConfig() was called with a config containing unsupported "
        "changes; only adding to the producer_name_{,regex}_filter is "
        "currently supported and will have an effect.");
  }

  for (TraceConfig::DataSource& cfg_data_source :
       *tracing_session->config.mutable_data_sources()) {
    // Find the updated producer_filter in the new config.
    std::vector<std::string> new_producer_name_filter;
    std::vector<std::string> new_producer_name_regex_filter;
    bool found_data_source = false;
    for (auto it : updated_cfg.data_sources()) {
      if (cfg_data_source.config().name() == it.config().name()) {
        new_producer_name_filter = it.producer_name_filter();
        new_producer_name_regex_filter = it.producer_name_regex_filter();
        found_data_source = true;
        break;
      }
    }

    // Bail out if data source not present in the new config.
    if (!found_data_source) {
      PERFETTO_ELOG(
          "ChangeTraceConfig() called without a current data source also "
          "present in the new config: %s",
          cfg_data_source.config().name().c_str());
      continue;
    }

    // TODO(oysteine): Just replacing the filter means that if
    // there are any filter entries which were present in the original config,
    // but removed from the config passed to ChangeTraceConfig, any matching
    // producers will keep producing but newly added producers after this
    // point will never start.
    *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
    *cfg_data_source.mutable_producer_name_regex_filter() =
        new_producer_name_regex_filter;

    // Scan all the registered data sources with a matching name.
    auto range = data_sources_.equal_range(cfg_data_source.config().name());
    for (auto it = range.first; it != range.second; it++) {
      ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
      PERFETTO_DCHECK(producer);

      // Check if the producer name of this data source is present
      // in the name filters. We currently only support new filters, not
      // removing old ones.
      if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
                             new_producer_name_regex_filter)) {
        continue;
      }

      bool already_setup = false;
      auto& ds_instances = tracing_session->data_source_instances;
      for (auto instance_it = ds_instances.begin();
           instance_it != ds_instances.end(); ++instance_it) {
        if (instance_it->first == it->second.producer_id &&
            instance_it->second.data_source_name ==
                cfg_data_source.config().name()) {
          already_setup = true;
          break;
        }
      }

      if (already_setup)
        continue;

      // If it wasn't previously setup, set it up now.
      // (The per-producer config is optional).
      TraceConfig::ProducerConfig producer_config;
      for (auto& config : tracing_session->config.producers()) {
        if (producer->name_ == config.producer_name()) {
          producer_config = config;
          break;
        }
      }

      DataSourceInstance* ds_inst = SetupDataSource(
          cfg_data_source, producer_config, it->second, tracing_session);

      if (ds_inst && tracing_session->state == TracingSession::STARTED)
        StartDataSourceInstance(producer, tracing_session, ds_inst);
    }
  }
}

bool TracingServiceImpl::StartTracing(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    PERFETTO_DLOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
    return false;
  }

  if (tracing_session->state != TracingSession::CONFIGURED) {
    PERFETTO_DLOG("StartTracing() failed, invalid session state: %d",
                  tracing_session->state);
    return false;
  }

  tracing_session->state = TracingSession::STARTED;

  if (!tracing_session->config.builtin_data_sources()
           .disable_clock_snapshotting()) {
    SnapshotClocks(&tracing_session->initial_clock_snapshot_,
                   /*set_root_timestamp=*/true);
  }

  // Trigger delayed task if the trace is time limited.
  const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
  if (trace_duration_ms > 0) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostDelayedTask(
        [weak_this, tsid] {
          // Skip the flush entirely if the trace session doesn't exist
          // anymore. This prevents misleading error messages from being
          // logged.
          if (!weak_this)
            return;
          auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
          if (!tracing_session_ptr)
            return;
          // If this trace was using STOP_TRACING triggers and we've seen
          // one, then the trigger overrides the normal timeout. In this
          // case we just return and let the other task clean up this trace.
          if (tracing_session_ptr->config.trigger_config().trigger_mode() ==
                  TraceConfig::TriggerConfig::STOP_TRACING &&
              !tracing_session_ptr->received_triggers.empty())
            return;
          // In all other cases (START_TRACING or no triggers) we flush
          // after |trace_duration_ms| unconditionally.
          weak_this->FlushAndDisableTracing(tsid);
        },
        trace_duration_ms);
  }

  // Start the periodic drain tasks if we are supposed to save the trace into a
  // file.
  if (tracing_session->config.write_into_file()) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostDelayedTask(
        [weak_this, tsid] {
          if (weak_this)
            weak_this->ReadBuffers(tsid, nullptr);
        },
        tracing_session->delay_to_next_write_period_ms());
  }

  // Start the periodic flush tasks if the config specified a flush period.
  if (tracing_session->config.flush_period_ms())
    PeriodicFlushTask(tsid, /*post_next_only=*/true);

  // Start the periodic incremental state clear tasks if the config specified a
  // period.
  if (tracing_session->config.incremental_state_config().clear_period_ms()) {
    PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
  }

  for (auto& kv : tracing_session->data_source_instances) {
    ProducerID producer_id = kv.first;
    DataSourceInstance& data_source = kv.second;
    ProducerEndpointImpl* producer = GetProducer(producer_id);
    if (!producer) {
      PERFETTO_DFATAL("Producer does not exist.");
      continue;
    }
    StartDataSourceInstance(producer, tracing_session, &data_source);
  }
  return true;
}

void TracingServiceImpl::StartDataSourceInstance(
    ProducerEndpointImpl* producer,
    TracingSession* tracing_session,
    TracingServiceImpl::DataSourceInstance* instance) {
  PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
  if (instance->will_notify_on_start) {
    instance->state = DataSourceInstance::STARTING;
  } else {
    instance->state = DataSourceInstance::STARTED;
  }
  if (tracing_session->consumer_maybe_null) {
    tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
        *producer, *instance);
  }
  producer->StartDataSource(instance->instance_id, instance->config);
}

// DisableTracing just stops the data sources but doesn't free up any buffer.
// This is to allow the consumer to freeze the buffers (by stopping the trace)
// and then drain the buffers. The actual teardown of the TracingSession happens
// in FreeBuffers().
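// The typical sequence, from the consumer's point of view, is therefore:
// DisableTracing() -> [data sources ack the stop] -> ReadBuffers() ->
// FreeBuffers().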
void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
                                        bool disable_immediately) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    // Can happen if the consumer calls this before EnableTracing() or after
    // FreeBuffers().
    PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
    return;
  }

  switch (tracing_session->state) {
    // Spurious call to DisableTracing() while already disabled, nothing to do.
    case TracingSession::DISABLED:
      PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
      return;

    // This is either:
    // A) The case of a graceful DisableTracing() call followed by a call to
    //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
    //    to forcefully transition into the disabled state without waiting for
    //    the outstanding acks because the buffers are going to be destroyed
    //    soon.
    // B) A spurious call, iff |disable_immediately| == false, in which case
    //    there is nothing to do.
    case TracingSession::DISABLING_WAITING_STOP_ACKS:
      PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
      if (disable_immediately)
        DisableTracingNotifyConsumerAndFlushFile(tracing_session);
      return;

    // Continues below.
    case TracingSession::CONFIGURED:
      // If the session didn't even start there is no need to orchestrate a
      // graceful stop of data sources.
      disable_immediately = true;
      break;

    // This is the nominal case, continues below.
    case TracingSession::STARTED:
      break;
  }

  for (auto& data_source_inst : tracing_session->data_source_instances) {
    const ProducerID producer_id = data_source_inst.first;
    DataSourceInstance& instance = data_source_inst.second;
    ProducerEndpointImpl* producer = GetProducer(producer_id);
    PERFETTO_DCHECK(producer);
    PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
                    instance.state == DataSourceInstance::STARTING ||
                    instance.state == DataSourceInstance::STARTED);
    StopDataSourceInstance(producer, tracing_session, &instance,
                           disable_immediately);
  }

  // Either this request is flagged with |disable_immediately| or there are no
  // data sources that are requesting a final handshake. In both cases just mark
  // the session as disabled immediately, notify the consumer and flush the
  // trace file (if used).
  if (tracing_session->AllDataSourceInstancesStopped())
    return DisableTracingNotifyConsumerAndFlushFile(tracing_session);

  tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, tsid] {
        if (weak_this)
          weak_this->OnDisableTracingTimeout(tsid);
      },
      tracing_session->data_source_stop_timeout_ms());

  // Deliberately NOT removing the session from |tracing_sessions_|, it's still
  // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
}

void TracingServiceImpl::NotifyDataSourceStarted(
    ProducerID producer_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  for (auto& kv : tracing_sessions_) {
    TracingSession& tracing_session = kv.second;
    DataSourceInstance* instance =
        tracing_session.GetDataSourceInstance(producer_id, instance_id);

    if (!instance)
      continue;

    // If the tracing session was already stopped, ignore this notification.
    if (tracing_session.state != TracingSession::STARTED)
      continue;

    if (instance->state != DataSourceInstance::STARTING) {
      PERFETTO_ELOG("Started data source instance in incorrect state: %d",
                    instance->state);
      continue;
    }

    instance->state = DataSourceInstance::STARTED;

    ProducerEndpointImpl* producer = GetProducer(producer_id);
    PERFETTO_DCHECK(producer);
    if (tracing_session.consumer_maybe_null) {
      tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
          *producer, *instance);
    }
  }  // for (tracing_session)
}

void TracingServiceImpl::NotifyDataSourceStopped(
    ProducerID producer_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  for (auto& kv : tracing_sessions_) {
    TracingSession& tracing_session = kv.second;
    DataSourceInstance* instance =
        tracing_session.GetDataSourceInstance(producer_id, instance_id);

    if (!instance)
      continue;

    if (instance->state != DataSourceInstance::STOPPING) {
      PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
                    instance->state);
      continue;
    }

    instance->state = DataSourceInstance::STOPPED;

    ProducerEndpointImpl* producer = GetProducer(producer_id);
    PERFETTO_DCHECK(producer);
    if (tracing_session.consumer_maybe_null) {
      tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
          *producer, *instance);
    }

    if (!tracing_session.AllDataSourceInstancesStopped())
      continue;

    if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
      continue;

    // All data sources acked the termination.
    DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
  }  // for (tracing_session)
}

void TracingServiceImpl::ActivateTriggers(
    ProducerID producer_id,
    const std::vector<std::string>& triggers) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);
  for (const auto& trigger_name : triggers) {
    for (auto& id_and_tracing_session : tracing_sessions_) {
      auto& tracing_session = id_and_tracing_session.second;
      TracingSessionID tsid = id_and_tracing_session.first;
      auto iter = std::find_if(
          tracing_session.config.trigger_config().triggers().begin(),
          tracing_session.config.trigger_config().triggers().end(),
          [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
            return trigger.name() == trigger_name;
          });
      if (iter == tracing_session.config.trigger_config().triggers().end()) {
        continue;
      }

      // If this trigger requires a certain producer to have sent it (i.e. it
      // has a non-empty producer_name_regex()), ensure that the producer who
      // sent this trigger matches.
      if (!iter->producer_name_regex().empty() &&
          !std::regex_match(
              producer->name_,
              std::regex(iter->producer_name_regex(), std::regex::extended))) {
        continue;
      }

      const bool triggers_already_received =
          !tracing_session.received_triggers.empty();
      tracing_session.received_triggers.push_back(
          {static_cast<uint64_t>(base::GetBootTimeNs().count()), iter->name(),
           producer->name_, producer->uid_});
      auto weak_this = weak_ptr_factory_.GetWeakPtr();
      switch (tracing_session.config.trigger_config().trigger_mode()) {
        case TraceConfig::TriggerConfig::START_TRACING:
          // If the session has already been triggered and moved past
          // CONFIGURED then we don't need to repeat StartTracing. This would
          // work fine (StartTracing would return false) but would add error
          // logs.
          if (tracing_session.state != TracingSession::CONFIGURED)
            break;

          PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
                        " with duration of %" PRIu32 "ms.",
                        iter->name().c_str(), tsid, iter->stop_delay_ms());
          // We override the trace duration to be the trigger's requested
          // value, this ensures that the trace will end after this amount
          // of time has passed.
          tracing_session.config.set_duration_ms(iter->stop_delay_ms());
          StartTracing(tsid);
          break;
        case TraceConfig::TriggerConfig::STOP_TRACING:
          // Only stop the trace once to avoid confusing log messages, i.e.
          // once we've hit the first trigger we've already posted the task to
          // FlushAndDisable. So all future triggers will just break out.
          if (triggers_already_received)
            break;

          PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
                        " with duration of %" PRIu32 "ms.",
                        iter->name().c_str(), tsid, iter->stop_delay_ms());
          // Now that we've seen a trigger we need to stop, flush, and disable
          // this session after the configured |stop_delay_ms|.
          task_runner_->PostDelayedTask(
              [weak_this, tsid] {
                // Skip the flush entirely if the trace session doesn't exist
                // anymore. This prevents misleading error messages from being
                // logged.
                if (weak_this && weak_this->GetTracingSession(tsid))
                  weak_this->FlushAndDisableTracing(tsid);
              },
              // If |stop_delay_ms| is zero this task will be executed
              // immediately.
              iter->stop_delay_ms());
          break;
        case TraceConfig::TriggerConfig::UNSPECIFIED:
          PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
          break;
      }
    }
  }
}

// Always invoked kDataSourceStopTimeoutMs after DisableTracing(). In nominal
// conditions all data sources should have acked the stop and this will early
// out.
void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session ||
      tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
    return;  // Tracing session was successfully disabled.
  }

  PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
                tsid);
  PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
  DisableTracingNotifyConsumerAndFlushFile(tracing_session);
}

void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
    TracingSession* tracing_session) {
  PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
  for (auto& inst_kv : tracing_session->data_source_instances) {
    if (inst_kv.second.state == DataSourceInstance::STOPPED)
      continue;
    inst_kv.second.state = DataSourceInstance::STOPPED;
    ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
    PERFETTO_DCHECK(producer);
    if (tracing_session->consumer_maybe_null) {
      tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
          *producer, inst_kv.second);
    }
  }
  tracing_session->state = TracingSession::DISABLED;

  // Scrape any remaining chunks that weren't flushed by the producers.
  for (auto& producer_id_and_producer : producers_)
    ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);

  if (tracing_session->write_into_file) {
    tracing_session->write_period_ms = 0;
    ReadBuffers(tracing_session->id, nullptr);
  }

  if (tracing_session->consumer_maybe_null)
    tracing_session->consumer_maybe_null->NotifyOnTracingDisabled();
}

void TracingServiceImpl::Flush(TracingSessionID tsid,
                               uint32_t timeout_ms,
                               ConsumerEndpoint::FlushCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
    return;
  }

  if (!timeout_ms)
    timeout_ms = tracing_session->flush_timeout_ms();

  if (tracing_session->pending_flushes.size() > 1000) {
    PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
                  tracing_session->pending_flushes.size());
    callback(false);
    return;
  }

  FlushRequestID flush_request_id = ++last_flush_request_id_;
  PendingFlush& pending_flush =
      tracing_session->pending_flushes
          .emplace_hint(tracing_session->pending_flushes.end(),
                        flush_request_id, PendingFlush(std::move(callback)))
          ->second;

  // Send a flush request to each producer involved in the tracing session. In
  // order to issue a flush request we have to build a map of all data source
  // instance ids enabled for each producer.
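  // E.g. {producer 1 -> [instance 4, instance 7], producer 2 -> [instance 9]}.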
1252   std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
1253   for (const auto& data_source_inst : tracing_session->data_source_instances) {
1254     const ProducerID producer_id = data_source_inst.first;
1255     const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
1256     flush_map[producer_id].push_back(ds_inst_id);
1257   }
1258 
1259   for (const auto& kv : flush_map) {
1260     ProducerID producer_id = kv.first;
1261     ProducerEndpointImpl* producer = GetProducer(producer_id);
1262     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1263     producer->Flush(flush_request_id, data_sources);
1264     pending_flush.producers.insert(producer_id);
1265   }
1266 
1267   // If there are no producers to flush (realistically this happens only in
1268   // some tests) fire OnFlushTimeout() straight away, without waiting.
1269   if (flush_map.empty())
1270     timeout_ms = 0;
1271 
1272   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1273   task_runner_->PostDelayedTask(
1274       [weak_this, tsid, flush_request_id] {
1275         if (weak_this)
1276           weak_this->OnFlushTimeout(tsid, flush_request_id);
1277       },
1278       timeout_ms);
1279 }
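
// Illustrative sketch (not part of the service): from a consumer's
// perspective, the flow above surfaces as a single callback that reports
// whether every involved producer acked within |timeout_ms|. The endpoint
// variable below is hypothetical client code built on ConsumerEndpoint.
//
//   consumer_endpoint->Flush(5000 /*timeout_ms*/, [](bool success) {
//     if (!success)
//       PERFETTO_ELOG("Flush did not complete before the timeout");
//   });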

void TracingServiceImpl::NotifyFlushDoneForProducer(
    ProducerID producer_id,
    FlushRequestID flush_request_id) {
  for (auto& kv : tracing_sessions_) {
    // Remove all pending flushes <= |flush_request_id| for |producer_id|.
    auto& pending_flushes = kv.second.pending_flushes;
    auto end_it = pending_flushes.upper_bound(flush_request_id);
    for (auto it = pending_flushes.begin(); it != end_it;) {
      PendingFlush& pending_flush = it->second;
      pending_flush.producers.erase(producer_id);
      if (pending_flush.producers.empty()) {
        auto weak_this = weak_ptr_factory_.GetWeakPtr();
        TracingSessionID tsid = kv.first;
        auto callback = std::move(pending_flush.callback);
        task_runner_->PostTask([weak_this, tsid, callback]() {
          if (weak_this) {
            weak_this->CompleteFlush(tsid, std::move(callback),
                                     /*success=*/true);
          }
        });
        it = pending_flushes.erase(it);
      } else {
        it++;
      }
    }  // for (pending_flushes)
  }    // for (tracing_session)
}
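
// Worked example of the bookkeeping above (illustrative only): if a session
// has pending flush requests {3, 5, 7} and producer P acks request 5, the
// upper_bound(5) iterator stops before 7, so P is erased from the producer
// sets of requests 3 and 5 only. Whichever of those sets becomes empty
// completes its flush; request 7 keeps waiting for P's next ack.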

void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
                                        FlushRequestID flush_request_id) {
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session)
    return;
  auto it = tracing_session->pending_flushes.find(flush_request_id);
  if (it == tracing_session->pending_flushes.end())
    return;  // Nominal case: flush was completed and acked on time.

  // If there were no producers to flush, consider it a success.
  bool success = it->second.producers.empty();

  auto callback = std::move(it->second.callback);
  tracing_session->pending_flushes.erase(it);
  CompleteFlush(tsid, std::move(callback), success);
}

void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
                                       ConsumerEndpoint::FlushCallback callback,
                                       bool success) {
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (tracing_session) {
    // Producers may not have been able to flush all their data, even if they
    // indicated flush completion. If possible, also collect uncommitted chunks
    // to make sure we have everything they wrote so far.
    for (auto& producer_id_and_producer : producers_) {
      ScrapeSharedMemoryBuffers(tracing_session,
                                producer_id_and_producer.second);
    }
  }
  callback(success);
}

void TracingServiceImpl::ScrapeSharedMemoryBuffers(
    TracingSession* tracing_session,
    ProducerEndpointImpl* producer) {
  if (!producer->smb_scraping_enabled_)
    return;

  // Can't copy chunks if we don't know about any trace writers.
  if (producer->writers_.empty())
    return;

  // Performance optimization: On flush or session disconnect, this method is
  // called for each producer. If the producer doesn't participate in the
  // session, there's no need to scrape its chunks right now. We can tell if a
  // producer participates in the session by checking if the producer is allowed
  // to write into the session's log buffers.
  const auto& session_buffers = tracing_session->buffers_index;
  bool producer_in_session =
      std::any_of(session_buffers.begin(), session_buffers.end(),
                  [producer](BufferID buffer_id) {
                    return producer->allowed_target_buffers_.count(buffer_id);
                  });
  if (!producer_in_session)
    return;

  PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);

  // Find and copy any uncommitted chunks from the SMB.
  //
  // In nominal conditions, the page layout of the used SMB pages should never
  // change because the service is the only one that is supposed to modify used
  // pages (to make them free again).
  //
  // However, the code here needs to deal with the case of a malicious producer
  // altering the SMB in unpredictable ways. Thankfully the SMB size is
  // immutable, so a chunk will always point to some valid memory, even if the
  // producer alters the intended layout and chunk header concurrently.
  // Ultimately a malicious producer altering the SMB's chunk layout while we
  // are iterating in this function is not any different from the case of a
  // malicious producer asking to commit a chunk made of random data, which is
  // something this class has to deal with regardless.
  //
  // The only legitimate mutations that can happen from sane producers,
  // concurrently with this function, are:
  //   A. free pages being partitioned,
  //   B. free chunks being migrated to kChunkBeingWritten,
  //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.

  SharedMemoryABI* abi = &producer->shmem_abi_;
  // num_pages() is immutable after the SMB is initialized and cannot be changed
  // by a producer, even a malicious one.
  for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
    uint32_t layout = abi->GetPageLayout(page_idx);

    uint32_t used_chunks = abi->GetUsedChunks(layout);  // Returns a bitmap.
    // Skip empty pages.
    if (used_chunks == 0)
      continue;

    // Scrape the chunks that are currently used. These should be either in
    // state kChunkBeingWritten or kChunkComplete.
    for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
      if (!(used_chunks & 1))
        continue;

      SharedMemoryABI::ChunkState state =
          SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
      PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
                      state == SharedMemoryABI::kChunkComplete);
      bool chunk_complete = state == SharedMemoryABI::kChunkComplete;

      SharedMemoryABI::Chunk chunk =
          abi->GetChunkUnchecked(page_idx, layout, chunk_idx);

      uint16_t packet_count;
      uint8_t flags;
      // GetPacketCountAndFlags has acquire_load semantics.
      std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();

      // It only makes sense to copy an incomplete chunk if there's at least
      // one full packet available. (The producer may not have completed the
      // last packet in it yet, so we need at least 2.)
      if (!chunk_complete && packet_count < 2)
        continue;

      // At this point, it is safe to access the remaining header fields of
      // the chunk. Even if the chunk was only just transferred from
      // kChunkFree into kChunkBeingWritten state, the header should be
      // written completely once the packet count increased above 1 (it was
      // reset to 0 by the service when the chunk was freed).

      WriterID writer_id = chunk.writer_id();
      base::Optional<BufferID> target_buffer_id =
          producer->buffer_id_for_writer(writer_id);

      // We can only scrape this chunk if we know which log buffer to copy it
      // into.
      if (!target_buffer_id)
        continue;

      // Skip chunks that don't belong to the requested tracing session.
      bool target_buffer_belongs_to_session =
          std::find(session_buffers.begin(), session_buffers.end(),
                    *target_buffer_id) != session_buffers.end();
      if (!target_buffer_belongs_to_session)
        continue;

      uint32_t chunk_id =
          chunk.header()->chunk_id.load(std::memory_order_relaxed);

      CopyProducerPageIntoLogBuffer(
          producer->id_, producer->uid_, writer_id, chunk_id, *target_buffer_id,
          packet_count, flags, chunk_complete, chunk.payload_begin(),
          chunk.payload_size());
    }
  }
}
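
// Worked example of the bitmap walk above (illustrative only): with a page
// layout that has 4 chunks and used_chunks == 0b0101, the loop visits
// chunk_idx 0 (bit set, scraped if eligible), 1 (bit clear, skipped) and
// 2 (bit set, scraped if eligible), then stops because the right-shifted
// bitmap becomes zero.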

void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  Flush(tsid, 0, [weak_this, tsid](bool success) {
    PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64,
                  success, tsid);
    if (!weak_this)
      return;
    TracingSession* session = weak_this->GetTracingSession(tsid);
    if (session->consumer_maybe_null) {
      // If the consumer is still attached, just disable the session but give it
      // a chance to read the contents.
      weak_this->DisableTracing(tsid);
    } else {
      // If the consumer detached, destroy the session. If the consumer did
      // start the session in long-tracing mode, the service will have saved
      // the contents to the passed file. If not, the contents will be
      // destroyed.
      weak_this->FreeBuffers(tsid);
    }
  });
}

void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
                                           bool post_next_only) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session || tracing_session->state != TracingSession::STARTED)
    return;

  uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, tsid] {
        if (weak_this)
          weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
      },
      flush_period_ms - (base::GetWallTimeMs().count() % flush_period_ms));

  if (post_next_only)
    return;

  PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
  Flush(tsid, 0, [](bool success) {
    if (!success)
      PERFETTO_ELOG("Periodic flush timed out");
  });
}
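
// Worked example of the delay computed above (illustrative numbers): with
// flush_period_ms == 5000 and a wall clock of 12300 ms, the next task is
// posted 5000 - (12300 % 5000) == 2700 ms from now, so periodic flushes stay
// aligned to multiples of the period (15000, 20000, ...) rather than drifting
// by however long each flush takes.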

void TracingServiceImpl::PeriodicClearIncrementalStateTask(
    TracingSessionID tsid,
    bool post_next_only) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session || tracing_session->state != TracingSession::STARTED)
    return;

  uint32_t clear_period_ms =
      tracing_session->config.incremental_state_config().clear_period_ms();
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, tsid] {
        if (weak_this)
          weak_this->PeriodicClearIncrementalStateTask(
              tsid, /*post_next_only=*/false);
      },
      clear_period_ms - (base::GetWallTimeMs().count() % clear_period_ms));

  if (post_next_only)
    return;

  PERFETTO_DLOG(
      "Performing periodic incremental state clear for trace session %" PRIu64,
      tsid);

  // Queue the IPCs to producers with active data sources that opted in.
  std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
  for (const auto& kv : tracing_session->data_source_instances) {
    ProducerID producer_id = kv.first;
    const DataSourceInstance& data_source = kv.second;
    if (data_source.handles_incremental_state_clear)
      clear_map[producer_id].push_back(data_source.instance_id);
  }

  for (const auto& kv : clear_map) {
    ProducerID producer_id = kv.first;
    const std::vector<DataSourceInstanceID>& data_sources = kv.second;
    ProducerEndpointImpl* producer = GetProducer(producer_id);
    if (!producer) {
      PERFETTO_DFATAL("Producer does not exist.");
      continue;
    }
    producer->ClearIncrementalState(data_sources);
  }
}

// Note: when this is called to write into a file passed when starting tracing,
// |consumer| will be == nullptr (as opposed to the case of a consumer asking
// to send the trace data back over IPC).
bool TracingServiceImpl::ReadBuffers(TracingSessionID tsid,
                                     ConsumerEndpointImpl* consumer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    // This will be hit systematically from the PostDelayedTask when directly
    // writing into the file (in which case consumer == nullptr). Suppress the
    // log in this case as it's just spam.
    if (consumer) {
      PERFETTO_DLOG("Cannot ReadBuffers(): no tracing session is active");
    }
    return false;
  }

  // When a tracing session is waiting for a trigger, it is considered empty. If
  // a tracing session finishes and moves into DISABLED without ever receiving a
  // trigger, the trace should never return any data, including synthetic
  // packets such as the TraceConfig and clock snapshots. So we bail out early
  // and let the consumer know there is no data.
  if (!tracing_session->config.trigger_config().triggers().empty() &&
      tracing_session->received_triggers.empty()) {
    PERFETTO_DLOG(
        "ReadBuffers(): tracing session has not received a trigger yet.");
    return false;
  }

  // This can happen if the file is closed by a previous task because it reaches
  // |max_file_size_bytes|.
  if (!tracing_session->write_into_file && !consumer)
    return false;

  if (tracing_session->write_into_file && consumer) {
    // If the consumer enabled tracing and asked to save the contents into the
    // passed file, it makes little sense to also try to read the buffers over
    // IPC, as that would just steal data from the periodic draining task.
    PERFETTO_DFATAL("Consumer trying to read from write_into_file session.");
    return false;
  }

  std::vector<TracePacket> packets;
  packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.

  std::move(tracing_session->initial_clock_snapshot_.begin(),
            tracing_session->initial_clock_snapshot_.end(),
            std::back_inserter(packets));
  tracing_session->initial_clock_snapshot_.clear();

  base::TimeMillis now = base::GetWallTimeMs();
  if (now >= tracing_session->last_snapshot_time + kSnapshotsInterval) {
    tracing_session->last_snapshot_time = now;
    // Don't emit the stats immediately, but instead wait until no more trace
    // data is available to read. That way, any problems that occur while
    // reading from the buffers are reflected in the emitted stats. This is
    // particularly important for use cases where ReadBuffers is only ever
    // called after the tracing session is stopped.
    tracing_session->should_emit_stats = true;
    SnapshotSyncMarker(&packets);

    if (!tracing_session->config.builtin_data_sources()
             .disable_clock_snapshotting()) {
      // We don't want to put a root timestamp in this snapshot as the packet
      // may be very out of order with respect to the actual trace packets
      // since consuming the trace may happen at any point after it starts.
      SnapshotClocks(&packets, /*set_root_timestamp=*/false);
    }
  }
  if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
    MaybeEmitTraceConfig(tracing_session, &packets);
    MaybeEmitReceivedTriggers(tracing_session, &packets);
  }
  if (!tracing_session->config.builtin_data_sources().disable_system_info())
    MaybeEmitSystemInfo(tracing_session, &packets);

  size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).
  size_t total_slices = 0;   // SUM(#slices in |packets|).

  // Add up size for packets added by the Maybe* calls above.
  for (const TracePacket& packet : packets) {
    packets_bytes += packet.size();
    total_slices += packet.slices().size();
  }

  // This is a rough threshold to determine how much to read from the buffer in
  // each task. This is to avoid executing a single huge sending task for too
  // long and risk hitting the watchdog. This is *not* an upper bound: we just
  // stop accumulating new packets and PostTask *after* we cross this threshold.
  // This constant essentially balances the PostTask and IPC overhead vs the
  // responsiveness of the service. An extremely small value will cause one IPC
  // and one PostTask for each slice but will keep the service extremely
  // responsive. An extremely large value will batch the send for the full
  // buffer into one large task, hit the blocking send() once the socket
  // buffers are full and hang the service for a bit (until the consumer
  // catches up).
  static constexpr size_t kApproxBytesPerTask = 32768;
  bool did_hit_threshold = false;

  // TODO(primiano): Extend the ReadBuffers API to allow reading only some
  // buffers, not all of them in one go.
  for (size_t buf_idx = 0;
       buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
       buf_idx++) {
    auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
    if (tbuf_iter == buffers_.end()) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    TraceBuffer& tbuf = *tbuf_iter->second;
    tbuf.BeginRead();
    while (!did_hit_threshold) {
      TracePacket packet;
      TraceBuffer::PacketSequenceProperties sequence_properties{};
      bool previous_packet_dropped;
      if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
                                    &previous_packet_dropped)) {
        break;
      }
      PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
      PERFETTO_DCHECK(sequence_properties.writer_id != 0);
      PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
      PERFETTO_DCHECK(packet.size() > 0);
      if (!PacketStreamValidator::Validate(packet.slices())) {
        tracing_session->invalid_packets++;
        PERFETTO_DLOG("Dropping invalid packet");
        continue;
      }

      // Append a slice with the trusted field data. This can't be spoofed
      // because above we validated that the existing slices don't contain any
      // trusted fields. For added safety we append instead of prepending
      // because according to protobuf semantics, if the same field is
      // encountered multiple times the last instance takes priority. Note that
      // truncated packets are also rejected, so the producer can't give us a
      // partial packet (e.g., a truncated string) which only becomes valid when
      // the trusted data is appended here.
      Slice slice = Slice::Allocate(32);
      protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
          slice.own_data(), slice.size);
      trusted_packet->set_trusted_uid(
          static_cast<int32_t>(sequence_properties.producer_uid_trusted));
      trusted_packet->set_trusted_packet_sequence_id(
          tracing_session->GetPacketSequenceID(
              sequence_properties.producer_id_trusted,
              sequence_properties.writer_id));
      if (previous_packet_dropped)
        trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
      slice.size = trusted_packet.Finalize();
      packet.AddSlice(std::move(slice));

      // Append the packet (inclusive of the trusted uid) to |packets|.
      packets_bytes += packet.size();
      total_slices += packet.slices().size();
      did_hit_threshold = packets_bytes >= kApproxBytesPerTask &&
                          !tracing_session->write_into_file;
      packets.emplace_back(std::move(packet));
    }  // for(packets...)
  }    // for(buffers...)

  const bool has_more = did_hit_threshold;
  if (!has_more && tracing_session->should_emit_stats) {
    size_t prev_packets_size = packets.size();
    SnapshotStats(tracing_session, &packets);
    tracing_session->should_emit_stats = false;

    // Add sizes of packets emitted by SnapshotStats.
    for (size_t i = prev_packets_size; i < packets.size(); ++i) {
      packets_bytes += packets[i].size();
      total_slices += packets[i].slices().size();
    }
  }

  // If the caller asked us to write into a file by setting
  // |write_into_file| == true in the trace config, drain the packets read
  // (if any) into the given file descriptor.
  if (tracing_session->write_into_file) {
    const uint64_t max_size = tracing_session->max_file_size_bytes
                                  ? tracing_session->max_file_size_bytes
                                  : std::numeric_limits<size_t>::max();

    // When writing into a file, the file should look like a root trace.proto
    // message. Each packet should be prepended with a proto preamble stating
    // its field id (within trace.proto) and size. Hence the addition below.
    const size_t max_iovecs = total_slices + packets.size();

    size_t num_iovecs = 0;
    bool stop_writing_into_file = tracing_session->write_period_ms == 0;
    std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
    size_t num_iovecs_at_last_packet = 0;
    uint64_t bytes_about_to_be_written = 0;
    for (TracePacket& packet : packets) {
      std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
          packet.GetProtoPreamble();
      bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
      num_iovecs++;
      for (const Slice& slice : packet.slices()) {
        // writev() doesn't change the passed pointer. However, struct iovec
        // takes a non-const ptr because it's the same struct used by readv().
        // Hence the const_cast here.
        char* start = static_cast<char*>(const_cast<void*>(slice.start));
        bytes_about_to_be_written += slice.size;
        iovecs[num_iovecs++] = {start, slice.size};
      }

      if (tracing_session->bytes_written_into_file +
              bytes_about_to_be_written >=
          max_size) {
        stop_writing_into_file = true;
        num_iovecs = num_iovecs_at_last_packet;
        break;
      }

      num_iovecs_at_last_packet = num_iovecs;
    }
    PERFETTO_DCHECK(num_iovecs <= max_iovecs);
    int fd = *tracing_session->write_into_file;

    uint64_t total_wr_size = 0;

    // writev() can take at most IOV_MAX entries per call. Batch them.
    constexpr size_t kIOVMax = IOV_MAX;
    for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
      int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
      ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
      if (wr_size <= 0) {
        PERFETTO_ELOG("writev() failed (errno: %d, %s)", errno, strerror(errno));
        stop_writing_into_file = true;
        break;
      }
      total_wr_size += static_cast<size_t>(wr_size);
    }

    tracing_session->bytes_written_into_file += total_wr_size;

    PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
                  (total_wr_size + 1023) / 1024, stop_writing_into_file);
    if (stop_writing_into_file) {
      // Ensure all data was written to the file before we close it.
      base::FlushFile(fd);
      tracing_session->write_into_file.reset();
      tracing_session->write_period_ms = 0;
      if (tracing_session->state == TracingSession::STARTED)
        DisableTracing(tsid);
      return true;
    }

    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostDelayedTask(
        [weak_this, tsid] {
          if (weak_this)
            weak_this->ReadBuffers(tsid, nullptr);
        },
        tracing_session->delay_to_next_write_period_ms());
    return true;
  }  // if (tracing_session->write_into_file)

  if (has_more) {
    auto weak_consumer = consumer->GetWeakPtr();
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, weak_consumer, tsid] {
      if (!weak_this || !weak_consumer)
        return;
      weak_this->ReadBuffers(tsid, weak_consumer.get());
    });
  }

  // Keep this as a tail call, just in case the consumer re-enters.
  consumer->consumer_->OnTraceData(std::move(packets), has_more);
  return true;
}
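
// Illustrative note on the file layout produced above: the output is a valid
// root Trace message, i.e. a sequence of length-delimited TracePacket fields.
// For example, a packet whose payload is 100 bytes is preceded by the two
// preamble bytes {0x0a, 0x64} (field 1, wire type 2, length 100) followed by
// the 100 payload bytes; GetProtoPreamble() yields that preamble for each
// packet before its slices are appended to the iovec array.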

void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
    return;  // TODO(primiano): signal failure?
  }
  DisableTracing(tsid, /*disable_immediately=*/true);

  PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
  tracing_session->data_source_instances.clear();

  for (auto& producer_entry : producers_) {
    ProducerEndpointImpl* producer = producer_entry.second;
    producer->OnFreeBuffers(tracing_session->buffers_index);
  }

  for (BufferID buffer_id : tracing_session->buffers_index) {
    buffer_ids_.Free(buffer_id);
    PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
    buffers_.erase(buffer_id);
  }
  bool notify_traceur = tracing_session->config.notify_traceur();
  tracing_sessions_.erase(tsid);
  UpdateMemoryGuardrail();

  PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
               tracing_sessions_.size());

#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  static const char kTraceurProp[] = "sys.trace.trace_end_signal";
  if (notify_traceur && __system_property_set(kTraceurProp, "1"))
    PERFETTO_ELOG("Failed to setprop %s=1", kTraceurProp);
#else
  base::ignore_result(notify_traceur);
#endif
}

void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
                                            const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
                producer_id, desc.name().c_str());

  PERFETTO_DCHECK(!desc.name().empty());
  auto reg_ds = data_sources_.emplace(desc.name(),
                                      RegisteredDataSource{producer_id, desc});

  // If there are existing tracing sessions, we need to check if the new
  // data source is enabled by any of them.
  if (tracing_sessions_.empty())
    return;

  ProducerEndpointImpl* producer = GetProducer(producer_id);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    return;
  }

  for (auto& iter : tracing_sessions_) {
    TracingSession& tracing_session = iter.second;
    if (tracing_session.state != TracingSession::STARTED &&
        tracing_session.state != TracingSession::CONFIGURED) {
      continue;
    }

    TraceConfig::ProducerConfig producer_config;
    for (auto& config : tracing_session.config.producers()) {
      if (producer->name_ == config.producer_name()) {
        producer_config = config;
        break;
      }
    }
    for (const TraceConfig::DataSource& cfg_data_source :
         tracing_session.config.data_sources()) {
      if (cfg_data_source.config().name() != desc.name())
        continue;
      DataSourceInstance* ds_inst = SetupDataSource(
          cfg_data_source, producer_config, reg_ds->second, &tracing_session);
      if (ds_inst && tracing_session.state == TracingSession::STARTED)
        StartDataSourceInstance(producer, &tracing_session, ds_inst);
    }
  }
}

void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
                                                TracingSession* tracing_session,
                                                DataSourceInstance* instance,
                                                bool disable_immediately) {
  const DataSourceInstanceID ds_inst_id = instance->instance_id;
  if (instance->will_notify_on_stop && !disable_immediately) {
    instance->state = DataSourceInstance::STOPPING;
  } else {
    instance->state = DataSourceInstance::STOPPED;
  }
  if (tracing_session->consumer_maybe_null) {
    tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
        *producer, *instance);
  }
  producer->StopDataSource(ds_inst_id);
}

void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
                                              const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
                producer_id, name.c_str());
  PERFETTO_CHECK(producer_id);
  ProducerEndpointImpl* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);
  for (auto& kv : tracing_sessions_) {
    auto& ds_instances = kv.second.data_source_instances;
    for (auto it = ds_instances.begin(); it != ds_instances.end();) {
      if (it->first == producer_id && it->second.data_source_name == name) {
        DataSourceInstanceID ds_inst_id = it->second.instance_id;
        if (it->second.state != DataSourceInstance::STOPPED) {
          if (it->second.state != DataSourceInstance::STOPPING)
            StopDataSourceInstance(producer, &kv.second, &it->second,
                                   /* disable_immediately = */ false);
          // Mark the instance as stopped immediately, since we are
          // unregistering it below.
          if (it->second.state == DataSourceInstance::STOPPING)
            NotifyDataSourceStopped(producer_id, ds_inst_id);
        }
        it = ds_instances.erase(it);
      } else {
        ++it;
      }
    }  // for (data_source_instances)
  }    // for (tracing_session)

  for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
    if (it->second.producer_id == producer_id &&
        it->second.descriptor.name() == name) {
      data_sources_.erase(it);
      return;
    }
  }

  PERFETTO_DFATAL(
      "Tried to unregister a non-existent data source \"%s\" for "
      "producer %" PRIu16,
      name.c_str(), producer_id);
}

TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
    const TraceConfig::DataSource& cfg_data_source,
    const TraceConfig::ProducerConfig& producer_config,
    const RegisteredDataSource& data_source,
    TracingSession* tracing_session) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
  PERFETTO_DCHECK(producer);
  // An existing producer that is not ftrace could have registered itself as
  // ftrace; we must not enable it in that case.
  if (lockdown_mode_ && producer->uid_ != uid_) {
    PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
    return nullptr;
  }
  // TODO(primiano): Add tests for registration ordering (data sources vs
  // consumers).
  if (!NameMatchesFilter(producer->name_,
                         cfg_data_source.producer_name_filter(),
                         cfg_data_source.producer_name_regex_filter())) {
    PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
                  cfg_data_source.config().name().c_str(),
                  producer->name_.c_str());
    return nullptr;
  }

  auto relative_buffer_id = cfg_data_source.config().target_buffer();
  if (relative_buffer_id >= tracing_session->num_buffers()) {
    PERFETTO_LOG(
        "The TraceConfig for DataSource %s specified a target_buffer out of "
        "bounds (%d). Skipping it.",
        cfg_data_source.config().name().c_str(), relative_buffer_id);
    return nullptr;
  }

  // Create a copy of the DataSourceConfig specified in the trace config. This
  // will be passed to the producer after translating the |target_buffer| id.
  // The |target_buffer| parameter passed by the consumer in the trace config is
  // relative to the buffers declared in the same trace config. This has to be
  // translated to the global BufferID before passing it to the producers, which
  // don't know anything about tracing sessions and consumers.

  DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
  auto insert_iter = tracing_session->data_source_instances.emplace(
      std::piecewise_construct,  //
      std::forward_as_tuple(producer->id_),
      std::forward_as_tuple(
          inst_id,
          cfg_data_source.config(),  //  Deliberate copy.
          data_source.descriptor.name(),
          data_source.descriptor.will_notify_on_start(),
          data_source.descriptor.will_notify_on_stop(),
          data_source.descriptor.handles_incremental_state_clear()));
  DataSourceInstance* ds_instance = &insert_iter->second;

  // New data source instance starts out in CONFIGURED state.
  if (tracing_session->consumer_maybe_null) {
    tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
        *producer, *ds_instance);
  }

  DataSourceConfig& ds_config = ds_instance->config;
  ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
  ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
  ds_config.set_enable_extra_guardrails(
      tracing_session->config.enable_extra_guardrails());
  ds_config.set_tracing_session_id(tracing_session->id);
  BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
  PERFETTO_DCHECK(global_id);
  ds_config.set_target_buffer(global_id);

  PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
                ds_config.name().c_str(), global_id);
  if (!producer->shared_memory()) {
    // Determine the SMB page size. Must be an integer multiple of 4k.
    // As for the SMB size below, the decision tree is as follows:
    // 1. Give priority to what is defined in the trace config.
    // 2. If unset give priority to the hint passed by the producer.
    // 3. Keep within bounds and ensure it's a multiple of 4k.
    size_t page_size = producer_config.page_size_kb() * 1024;
    if (page_size == 0)
      page_size = producer->shmem_page_size_hint_bytes_;

    // Determine the SMB size. Must be an integer multiple of the SMB page size.
    // The decision tree is as follows:
    // 1. Give priority to what is defined in the trace config.
    // 2. If unset give priority to the hint passed by the producer.
    // 3. Keep within bounds and ensure it's a multiple of the page size.
    size_t shm_size = producer_config.shm_size_kb() * 1024;
    if (shm_size == 0)
      shm_size = producer->shmem_size_hint_bytes_;

    auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
    if (valid_sizes != std::tie(shm_size, page_size)) {
      PERFETTO_DLOG(
          "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
          "back to shm_size %zu page_size %zu.",
          shm_size, page_size, std::get<0>(valid_sizes),
          std::get<1>(valid_sizes));
    }
    std::tie(shm_size, page_size) = valid_sizes;

    // TODO(primiano): right now Create() will suicide in case of OOM if the
    // mmap fails. We should instead gracefully fail the request and tell the
    // client to go away.
    PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
                  producer->name_.c_str());
    auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
    producer->SetupSharedMemory(std::move(shared_memory), page_size,
                                /*provided_by_producer=*/false);
  }
  producer->SetupDataSource(inst_id, ds_config);
  return ds_instance;
}
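
// Worked example of the SMB sizing above (illustrative numbers): if the
// TraceConfig leaves page_size_kb and shm_size_kb unset (0) and the producer
// hinted 4 KB pages and a 1 MB buffer, those hints are used. If the hints are
// also 0 or out of range, EnsureValidShmSizes() is expected to fall back to
// its defaults and adjust the values so the SMB size is a multiple of the
// page size, which is what the "Falling back" log above reports.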

// Note: all the fields except the *_trusted ones are untrusted, as in, the
// Producer might be lying / returning garbage contents. |src| and |size| can
// be trusted in terms of being a valid pointer, but not the contents.
void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
    ProducerID producer_id_trusted,
    uid_t producer_uid_trusted,
    WriterID writer_id,
    ChunkID chunk_id,
    BufferID buffer_id,
    uint16_t num_fragments,
    uint8_t chunk_flags,
    bool chunk_complete,
    const uint8_t* src,
    size_t size) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    chunks_discarded_++;
    return;
  }

  TraceBuffer* buf = GetBufferByID(buffer_id);
  if (!buf) {
    PERFETTO_DLOG("Could not find target buffer %" PRIu16
                  " for producer %" PRIu16,
                  buffer_id, producer_id_trusted);
    chunks_discarded_++;
    return;
  }

  // Verify that the producer is actually allowed to write into the target
  // buffer specified in the request. This prevents a malicious producer from
  // injecting data into a log buffer that belongs to a tracing session the
  // producer is not part of.
  if (!producer->is_allowed_target_buffer(buffer_id)) {
    PERFETTO_ELOG("Producer %" PRIu16
                  " tried to write into forbidden target buffer %" PRIu16,
                  producer_id_trusted, buffer_id);
    PERFETTO_DFATAL("Forbidden target buffer");
    chunks_discarded_++;
    return;
  }

  // If the writer was registered by the producer, it should only write into the
  // buffer it was registered with.
  base::Optional<BufferID> associated_buffer =
      producer->buffer_id_for_writer(writer_id);
  if (associated_buffer && *associated_buffer != buffer_id) {
    PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
                  " was registered to write into target buffer %" PRIu16
                  ", but tried to write into buffer %" PRIu16,
                  writer_id, producer_id_trusted, *associated_buffer,
                  buffer_id);
    PERFETTO_DFATAL("Wrong target buffer");
    chunks_discarded_++;
    return;
  }

  buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted, writer_id,
                          chunk_id, num_fragments, chunk_flags, chunk_complete,
                          src, size);
}

void TracingServiceImpl::ApplyChunkPatches(
    ProducerID producer_id_trusted,
    const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  for (const auto& chunk : chunks_to_patch) {
    const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
    const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
    TraceBuffer* buf =
        GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
    static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
                  "Add a '|| chunk_id > kMaxChunkID' below if this fails");
    if (!writer_id || writer_id > kMaxWriterID || !buf) {
      // This can genuinely happen when the trace is stopped. The producers
      // might see the stop signal with some delay and keep sending the
      // leftover patches for a short while afterwards.
      PERFETTO_DLOG(
          "Received invalid chunks_to_patch request from Producer: %" PRIu16
          ", BufferID: %" PRIu32 " ChunkID: %" PRIu32 " WriterID: %" PRIu16,
          producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
      patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
      continue;
    }

    // Note, there's no need to validate that the producer is allowed to write
    // to the specified buffer ID (or that it's the correct buffer ID for a
    // registered TraceWriter). That's because TraceBuffer uses the producer ID
    // and writer ID to look up the chunk to patch. If the producer specifies an
    // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
    // patches. Because the producer ID is trusted, there's also no way for a
    // malicious producer to patch another producer's data.

    // Speculate on the fact that there are going to be a limited number of
    // patches per request, so we can allocate the |patches| array on the stack.
    std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
    if (chunk.patches().size() > patches.size()) {
      PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
                    chunk.patches().size());
      PERFETTO_DFATAL("Too many patches");
      patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
      continue;
    }

    size_t i = 0;
    for (const auto& patch : chunk.patches()) {
      const std::string& patch_data = patch.data();
      if (patch_data.size() != patches[i].data.size()) {
        PERFETTO_ELOG("Received patch from producer: %" PRIu16
                      " of unexpected size %zu",
                      producer_id_trusted, patch_data.size());
        patches_discarded_++;
        continue;
      }
      patches[i].offset_untrusted = patch.offset();
      memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
      i++;
    }
    buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
                               &patches[0], i, chunk.has_more_patches());
  }
}
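
// Illustrative example of a patch (hypothetical values): a producer that
// committed a chunk before knowing the final size of a fragmented packet
// leaves a placeholder in the already-committed data and later sends a patch
// such as {offset_untrusted = 16, data = {0x2a, 0x00, 0x00, 0x00}} to
// backfill it. TryPatchChunkContents() only applies the write if the
// (producer, writer, chunk) lookup succeeds and the untrusted offset stays
// within the chunk's bounds.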

TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
    uid_t uid,
    const std::string& key) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  for (auto& kv : tracing_sessions_) {
    TracingSession* session = &kv.second;
    if (session->consumer_uid == uid && session->detach_key == key) {
      PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
      return session;
    }
  }
  return nullptr;
}

TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
    TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
  if (it == tracing_sessions_.end())
    return nullptr;
  return &it->second;
}

ProducerID TracingServiceImpl::GetNextProducerID() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_CHECK(producers_.size() < kMaxProducerID);
  do {
    ++last_producer_id_;
  } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
  PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
  return last_producer_id_;
}

TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
  auto buf_iter = buffers_.find(buffer_id);
  if (buf_iter == buffers_.end())
    return nullptr;
  return &*buf_iter->second;
}

void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
  // Skip the flush entirely if the trace session doesn't exist anymore, to
  // prevent misleading error messages from being logged.
  //
  // If the trace has started from a trigger, we rely on the trigger's
  // |stop_delay_ms|, so don't flush and disable if we've moved beyond the
  // CONFIGURED state.
  auto* tracing_session_ptr = GetTracingSession(tsid);
  if (tracing_session_ptr &&
      tracing_session_ptr->state == TracingSession::CONFIGURED) {
    PERFETTO_DLOG("Disabling TracingSession %" PRIu64
                  " since no triggers activated.",
                  tsid);
    // No data should be returned from ReadBuffers() regardless of whether we
    // call FreeBuffers() or DisableTracing(). This is because in STOP_TRACING
    // we need this promise in either case, and using DisableTracing() allows
    // a graceful shutdown. Consumers can follow their normal path and check
    // the buffers through ReadBuffers(); the code won't hang because the
    // tracing session will still be alive, just disabled.
    DisableTracing(tsid);
  }
}

void TracingServiceImpl::UpdateMemoryGuardrail() {
#if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
  uint64_t total_buffer_bytes = 0;

  // Sum up all the shared memory buffers.
  for (const auto& id_to_producer : producers_) {
    if (id_to_producer.second->shared_memory())
      total_buffer_bytes += id_to_producer.second->shared_memory()->size();
  }

  // Sum up all the trace buffers.
  for (const auto& id_to_buffer : buffers_) {
    total_buffer_bytes += id_to_buffer.second->size();
  }

  // Set the guardrail to 32MB + the sum of all the buffers over a 30 second
  // interval.
  uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
  base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
#endif
}
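
// Worked example of the limit above (illustrative numbers): with two
// producers holding a 1 MB shared memory buffer each and one session using
// two 32 MB trace buffers, the watchdog limit becomes
// kWatchdogDefaultMemorySlack (32 MB per the comment above) + 2 MB + 64 MB,
// enforced over the 30 second window passed to SetMemoryLimit().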

void TracingServiceImpl::SnapshotSyncMarker(std::vector<TracePacket>* packets) {
  // The sync markers are used to tokenize large traces efficiently.
  // See description in trace_packet.proto.
  if (sync_marker_packet_size_ == 0) {
    // The marker ABI expects that the marker is written after the uid.
    // Protozero guarantees that fields are written in the same order as the
    // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
    protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
        &sync_marker_packet_[0], sizeof(sync_marker_packet_));
    packet->set_trusted_uid(static_cast<int32_t>(uid_));
    packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);

    // Keep this last.
    packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
    sync_marker_packet_size_ = packet.Finalize();
  }
  packets->emplace_back();
  packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
}
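
// Illustrative note: because kSyncMarker is a fixed, unlikely byte sequence
// written after the trusted fields, a reader that has lost track of packet
// boundaries (e.g. when seeking into a very large trace file) can scan the
// raw bytes for the marker and resume tokenizing from the packet that
// contains it, which is the efficient tokenization mentioned above.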

void TracingServiceImpl::SnapshotClocks(std::vector<TracePacket>* packets,
                                        bool set_root_timestamp) {
  protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
  uint64_t root_timestamp_ns = 0;
  auto* clock_snapshot = packet->set_clock_snapshot();

#if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) && \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&    \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_FREEBSD) && \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
  struct {
    clockid_t id;
    protos::pbzero::ClockSnapshot::Clock::BuiltinClocks type;
    struct timespec ts;
  } clocks[] = {
      {CLOCK_BOOTTIME, protos::pbzero::ClockSnapshot::Clock::BOOTTIME, {0, 0}},
      {CLOCK_REALTIME_COARSE,
       protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE,
       {0, 0}},
      {CLOCK_MONOTONIC_COARSE,
       protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE,
       {0, 0}},
      {CLOCK_REALTIME, protos::pbzero::ClockSnapshot::Clock::REALTIME, {0, 0}},
      {CLOCK_MONOTONIC,
       protos::pbzero::ClockSnapshot::Clock::MONOTONIC,
       {0, 0}},
      {CLOCK_MONOTONIC_RAW,
       protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW,
       {0, 0}},
      {CLOCK_PROCESS_CPUTIME_ID,
       protos::pbzero::ClockSnapshot::Clock::PROCESS_CPUTIME,
       {0, 0}},
      {CLOCK_THREAD_CPUTIME_ID,
       protos::pbzero::ClockSnapshot::Clock::THREAD_CPUTIME,
       {0, 0}},
  };
  // First snapshot all the clocks as atomically as we can.
  for (auto& clock : clocks) {
    if (clock_gettime(clock.id, &clock.ts) == -1)
      PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
  }
  for (auto& clock : clocks) {
    if (set_root_timestamp &&
        clock.type == protos::pbzero::ClockSnapshot::Clock::BOOTTIME) {
      root_timestamp_ns =
          static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count());
    }
    auto* c = clock_snapshot->add_clocks();
    c->set_clock_id(static_cast<uint32_t>(clock.type));
    c->set_timestamp(
        static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count()));
  }
#else  // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) &&
       // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&
       // !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
  auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
  if (set_root_timestamp)
    root_timestamp_ns = wall_time_ns;
  auto* c = clock_snapshot->add_clocks();
  c->set_clock_id(protos::pbzero::ClockSnapshot::Clock::MONOTONIC);
  c->set_timestamp(wall_time_ns);
  // The default trace clock is boot time, so we always need to emit a path to
  // it. However since we don't actually have a boot time source on these
  // platforms, pretend that wall time equals boot time.
  c = clock_snapshot->add_clocks();
  c->set_clock_id(protos::pbzero::ClockSnapshot::Clock::BOOTTIME);
  c->set_timestamp(wall_time_ns);
#endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) &&
        // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)

  if (root_timestamp_ns)
    packet->set_timestamp(root_timestamp_ns);
  packet->set_trusted_uid(static_cast<int32_t>(uid_));
  packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);

  SerializeAndAppendPacket(packets, packet.SerializeAsArray());
}
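
// Illustrative use of the snapshot emitted above (hypothetical numbers): if
// one snapshot records BOOTTIME = 1000 ns and REALTIME = 5000 ns, a consumer
// can convert a REALTIME timestamp t into the trace's boot-time domain as
// t - 5000 + 1000, because all the clocks in a single snapshot are read
// back-to-back and can be treated as describing the same instant.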
2398 
SnapshotStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)2399 void TracingServiceImpl::SnapshotStats(TracingSession* tracing_session,
2400                                        std::vector<TracePacket>* packets) {
2401   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2402   packet->set_trusted_uid(static_cast<int32_t>(uid_));
2403   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2404   GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
2405   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2406 }
2407 
TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
  TraceStats trace_stats;
  trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
  trace_stats.set_producers_seen(last_producer_id_);
  trace_stats.set_data_sources_registered(
      static_cast<uint32_t>(data_sources_.size()));
  trace_stats.set_data_sources_seen(last_data_source_instance_id_);
  trace_stats.set_tracing_sessions(
      static_cast<uint32_t>(tracing_sessions_.size()));
  trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
  trace_stats.set_chunks_discarded(chunks_discarded_);
  trace_stats.set_patches_discarded(patches_discarded_);
  trace_stats.set_invalid_packets(tracing_session->invalid_packets);

  for (BufferID buf_id : tracing_session->buffers_index) {
    TraceBuffer* buf = GetBufferByID(buf_id);
    if (!buf) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    *trace_stats.add_buffer_stats() = buf->stats();
  }  // for (buf in session).
  return trace_stats;
}

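// Emits the session's TraceConfig into the trace, at most once per tracing
// session (guarded by |did_emit_config|).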
void TracingServiceImpl::MaybeEmitTraceConfig(
    TracingSession* tracing_session,
    std::vector<TracePacket>* packets) {
  if (tracing_session->did_emit_config)
    return;
  tracing_session->did_emit_config = true;
  protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
  packet->set_trusted_uid(static_cast<int32_t>(uid_));
  packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
  tracing_session->config.Serialize(packet->set_trace_config());
  SerializeAndAppendPacket(packets, packet.SerializeAsArray());
}

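// Emits a SystemInfo packet (uname data where available and, on Android, the
// build fingerprint) at most once per tracing session.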
void TracingServiceImpl::MaybeEmitSystemInfo(
    TracingSession* tracing_session,
    std::vector<TracePacket>* packets) {
  if (tracing_session->did_emit_system_info)
    return;
  tracing_session->did_emit_system_info = true;
  protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
  auto* info = packet->set_system_info();
  base::ignore_result(info);  // For PERFETTO_OS_WIN.
#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
  struct utsname uname_info;
  if (uname(&uname_info) == 0) {
    auto* utsname_info = info->set_utsname();
    utsname_info->set_sysname(uname_info.sysname);
    utsname_info->set_version(uname_info.version);
    utsname_info->set_machine(uname_info.machine);
    utsname_info->set_release(uname_info.release);
  }
#endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&
        // !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  char value[PROP_VALUE_MAX];
  if (__system_property_get("ro.build.fingerprint", value)) {
    info->set_android_build_fingerprint(value);
  } else {
    PERFETTO_ELOG("Unable to read ro.build.fingerprint");
  }
#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  packet->set_trusted_uid(static_cast<int32_t>(uid_));
  packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
  SerializeAndAppendPacket(packets, packet.SerializeAsArray());
}

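// Emits a Trigger packet for every trigger received since the last call,
// tracking progress via |num_triggers_emitted_into_trace| so that each
// trigger is written into the trace exactly once.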
void TracingServiceImpl::MaybeEmitReceivedTriggers(
    TracingSession* tracing_session,
    std::vector<TracePacket>* packets) {
  PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
                  tracing_session->received_triggers.size());
  for (size_t i = tracing_session->num_triggers_emitted_into_trace;
       i < tracing_session->received_triggers.size(); ++i) {
    const auto& info = tracing_session->received_triggers[i];
    protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
    auto* trigger = packet->set_trigger();
    trigger->set_trigger_name(info.trigger_name);
    trigger->set_producer_name(info.producer_name);
    trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));

    packet->set_timestamp(info.boot_time_ns);
    packet->set_trusted_uid(static_cast<int32_t>(uid_));
    packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
    SerializeAndAppendPacket(packets, packet.SerializeAsArray());
    ++tracing_session->num_triggers_emitted_into_trace;
  }
}

////////////////////////////////////////////////////////////////////////////////
// TracingServiceImpl::ConsumerEndpointImpl implementation
////////////////////////////////////////////////////////////////////////////////

TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Consumer* consumer,
    uid_t uid)
    : task_runner_(task_runner),
      service_(service),
      consumer_(consumer),
      uid_(uid),
      weak_ptr_factory_(this) {}

TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
  service_->DisconnectConsumer(this);
  consumer_->OnDisconnect();
}

void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto weak_this = GetWeakPtr();
  task_runner_->PostTask([weak_this] {
    if (weak_this)
      weak_this->consumer_->OnTracingDisabled();
  });
}

void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
    const TraceConfig& cfg,
    base::ScopedFile fd) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!service_->EnableTracing(this, cfg, std::move(fd)))
    NotifyOnTracingDisabled();
}

void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
    const TraceConfig& cfg) {
  if (!tracing_session_id_) {
    PERFETTO_LOG(
        "Consumer called ChangeTraceConfig() but tracing was "
        "not active");
    return;
  }
  service_->ChangeTraceConfig(this, cfg);
}

void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!tracing_session_id_) {
    PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
    return;
  }
  service_->StartTracing(tracing_session_id_);
}

void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!tracing_session_id_) {
    PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
    return;
  }
  service_->DisableTracing(tracing_session_id_);
}

void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!tracing_session_id_) {
    PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
    consumer_->OnTraceData({}, /* has_more = */ false);
    return;
  }
  if (!service_->ReadBuffers(tracing_session_id_, this)) {
    consumer_->OnTraceData({}, /* has_more = */ false);
  }
}

void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!tracing_session_id_) {
    PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
    return;
  }
  service_->FreeBuffers(tracing_session_id_);
  tracing_session_id_ = 0;
}

void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
                                                     FlushCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!tracing_session_id_) {
    PERFETTO_LOG("Consumer called Flush() but tracing was not active");
    return;
  }
  service_->Flush(tracing_session_id_, timeout_ms, callback);
}

void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  bool success = service_->DetachConsumer(this, key);
  auto weak_this = GetWeakPtr();
  task_runner_->PostTask([weak_this, success] {
    if (weak_this)
      weak_this->consumer_->OnDetach(success);
  });
}

void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  bool success = service_->AttachConsumer(this, key);
  auto weak_this = GetWeakPtr();
  task_runner_->PostTask([weak_this, success] {
    if (!weak_this)
      return;
    Consumer* consumer = weak_this->consumer_;
    TracingSession* session =
        weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
    if (!session) {
      consumer->OnAttach(false, TraceConfig());
      return;
    }
    consumer->OnAttach(success, session->config);
  });
}

void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  bool success = false;
  TraceStats stats;
  TracingSession* session = service_->GetTracingSession(tracing_session_id_);
  if (session) {
    success = true;
    stats = service_->GetTraceStats(session);
  }
  auto weak_this = GetWeakPtr();
  task_runner_->PostTask([weak_this, success, stats] {
    if (weak_this)
      weak_this->consumer_->OnTraceStats(success, stats);
  });
}

void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
    uint32_t events_mask) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  observable_events_mask_ = events_mask;
  TracingSession* session = service_->GetTracingSession(tracing_session_id_);
  if (!session)
    return;

  if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
    // Issue initial states.
    for (const auto& kv : session->data_source_instances) {
      ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
      PERFETTO_DCHECK(producer);
      OnDataSourceInstanceStateChange(*producer, kv.second);
    }
  }
}

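// Forwards a data source state change to the consumer, but only if the
// consumer subscribed to TYPE_DATA_SOURCES_INSTANCES via ObserveEvents() and
// the instance reached a reportable state (CONFIGURED, STARTED or STOPPED).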
void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
    const ProducerEndpointImpl& producer,
    const DataSourceInstance& instance) {
  if (!(observable_events_mask_ &
        ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
    return;
  }

  if (instance.state != DataSourceInstance::CONFIGURED &&
      instance.state != DataSourceInstance::STARTED &&
      instance.state != DataSourceInstance::STOPPED) {
    return;
  }

  auto* observable_events = AddObservableEvents();
  auto* change = observable_events->add_instance_state_changes();
  change->set_producer_name(producer.name_);
  change->set_data_source_name(instance.data_source_name);
  if (instance.state == DataSourceInstance::STARTED) {
    change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
  } else {
    change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
  }
}

base::WeakPtr<TracingServiceImpl::ConsumerEndpointImpl>
TracingServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  return weak_ptr_factory_.GetWeakPtr();
}

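// Lazily creates the pending ObservableEvents proto and posts a task to
// deliver it to the consumer. State changes occurring before that task runs
// are batched into a single OnObservableEvents() callback.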
ObservableEvents*
TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!observable_events_) {
    observable_events_.reset(new ObservableEvents());
    auto weak_this = GetWeakPtr();
    task_runner_->PostTask([weak_this] {
      if (!weak_this)
        return;

      // Move into a temporary to allow reentrancy in OnObservableEvents.
      auto observable_events = std::move(weak_this->observable_events_);
      weak_this->consumer_->OnObservableEvents(*observable_events);
    });
  }
  return observable_events_.get();
}

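// Replies to the consumer with a snapshot of the service state: number of
// sessions (total and started), connected producers and registered data
// sources.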
void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
    QueryServiceStateCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingServiceState svc_state;

  const auto& sessions = service_->tracing_sessions_;
  svc_state.set_num_sessions(static_cast<int>(sessions.size()));

  int num_started = 0;
  for (const auto& kv : sessions)
    num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
  svc_state.set_num_sessions_started(static_cast<int>(num_started));

  for (const auto& kv : service_->producers_) {
    auto* producer = svc_state.add_producers();
    producer->set_id(static_cast<int>(kv.first));
    producer->set_name(kv.second->name_);
    producer->set_uid(static_cast<int32_t>(kv.second->uid_));
  }

  for (const auto& kv : service_->data_sources_) {
    const auto& registered_data_source = kv.second;
    auto* data_source = svc_state.add_data_sources();
    *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
    data_source->set_producer_id(
        static_cast<int>(registered_data_source.producer_id));
  }
  callback(/*success=*/true, svc_state);
}

////////////////////////////////////////////////////////////////////////////////
// TracingServiceImpl::ProducerEndpointImpl implementation
////////////////////////////////////////////////////////////////////////////////

TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
    ProducerID id,
    uid_t uid,
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Producer* producer,
    const std::string& producer_name,
    bool in_process,
    bool smb_scraping_enabled)
    : id_(id),
      uid_(uid),
      service_(service),
      task_runner_(task_runner),
      producer_(producer),
      name_(producer_name),
      in_process_(in_process),
      smb_scraping_enabled_(smb_scraping_enabled),
      weak_ptr_factory_(this) {}

TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
  service_->DisconnectProducer(id_);
  producer_->OnDisconnect();
}

void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (desc.name().empty()) {
    PERFETTO_DLOG("Received RegisterDataSource() with empty name");
    return;
  }

  service_->RegisterDataSource(id_, desc);
}

void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
    const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->UnregisterDataSource(id_, name);
}

void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
    uint32_t writer_id,
    uint32_t target_buffer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(!buffer_id_for_writer(static_cast<WriterID>(writer_id)));
  writers_[static_cast<WriterID>(writer_id)] =
      static_cast<BufferID>(target_buffer);
}

void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
    uint32_t writer_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(buffer_id_for_writer(static_cast<WriterID>(writer_id)));
  writers_.erase(static_cast<WriterID>(writer_id));
}

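// Handles a CommitDataRequest coming from an (untrusted) producer: moves the
// completed chunks listed in the request from the producer's shared memory
// buffer into the central trace buffers, applies the requested chunk patches
// and, if the request references a flush, notifies the service that the flush
// completed for this producer. Out-of-range or incomplete chunks are skipped.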
void TracingServiceImpl::ProducerEndpointImpl::CommitData(
    const CommitDataRequest& req_untrusted,
    CommitDataCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
    PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
                               EncodeCommitDataRequest(id_, req_untrusted));
  }

  if (!shared_memory_) {
    PERFETTO_DLOG(
        "Attempted to commit data before the shared memory was allocated.");
    return;
  }
  PERFETTO_DCHECK(shmem_abi_.is_valid());
  for (const auto& entry : req_untrusted.chunks_to_move()) {
    const uint32_t page_idx = entry.page();
    if (page_idx >= shmem_abi_.num_pages())
      continue;  // A buggy or malicious producer.

    SharedMemoryABI::Chunk chunk =
        shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
    if (!chunk.is_valid()) {
      PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
                    entry.page(), entry.chunk());
      continue;
    }

    // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
    // the ABI contract expects the producer to not touch the chunk anymore
    // (until the service marks that as free). This is why all the reads below
    // are just memory_order_relaxed. Also, the code here assumes that all this
    // data can be malicious and just gives up if anything is malformed.
    BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
    const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
    WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
    ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
    auto packets = chunk_header.packets.load(std::memory_order_relaxed);
    uint16_t num_fragments = packets.count;
    uint8_t chunk_flags = packets.flags;

    service_->CopyProducerPageIntoLogBuffer(
        id_, uid_, writer_id, chunk_id, buffer_id, num_fragments, chunk_flags,
        /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());

    // This one has release-store semantics.
    shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
  }  // for(chunks_to_move)

  service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());

  if (req_untrusted.flush_request_id()) {
    service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
  }

  // Keep this invocation last. ProducerIPCService::CommitData() relies on this
  // callback being invoked within the same callstack and not posted. If this
  // changes, the code there needs to be changed accordingly.
  if (callback)
    callback();
}

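// Adopts the shared memory buffer for this producer (either allocated by the
// service or provided by the producer itself, per |provided_by_producer|),
// initializes the shared memory ABI view over it and, for in-process
// producers, creates the in-process SharedMemoryArbiter.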
void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
    std::unique_ptr<SharedMemory> shared_memory,
    size_t page_size_bytes,
    bool provided_by_producer) {
  PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
  PERFETTO_DCHECK(page_size_bytes % 1024 == 0);

  shared_memory_ = std::move(shared_memory);
  shared_buffer_page_size_kb_ = page_size_bytes / 1024;
  is_shmem_provided_by_producer_ = provided_by_producer;

  shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
                        shared_memory_->size(),
                        shared_buffer_page_size_kb() * 1024);
  if (in_process_) {
    inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
        shared_memory_->start(), shared_memory_->size(),
        shared_buffer_page_size_kb_ * 1024, this, task_runner_));
  }

  OnTracingSetup();
  service_->UpdateMemoryGuardrail();
}

SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  return shared_memory_.get();
}

size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
    const {
  return shared_buffer_page_size_kb_;
}

void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
    const std::vector<std::string>& triggers) {
  service_->ActivateTriggers(id_, triggers);
}

void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
    DataSourceInstanceID ds_inst_id) {
  // TODO(primiano): Once we support tearing down the SMB, we should send the
  // Producer a TearDownTracing here if all its data sources have been
  // disabled (see b/77532839 and aosp/655179 PS1).
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_inst_id] {
    if (weak_this)
      weak_this->producer_->StopDataSource(ds_inst_id);
  });
}

SharedMemoryArbiter*
TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
  if (!inproc_shmem_arbiter_) {
    PERFETTO_FATAL(
        "The in-process SharedMemoryArbiter can only be used when "
        "CreateProducer has been called with in_process=true and after tracing "
        "has started.");
  }

  PERFETTO_DCHECK(in_process_);
  return inproc_shmem_arbiter_.get();
}

bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
    const {
  return is_shmem_provided_by_producer_;
}

// Can be called on any thread.
std::unique_ptr<TraceWriter>
TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
    BufferID buf_id,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
  return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
                                                       buffer_exhausted_policy);
}

void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
    FlushRequestID id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
  return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
}

void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this] {
    if (weak_this)
      weak_this->producer_->OnTracingSetup();
  });
}

void TracingServiceImpl::ProducerEndpointImpl::Flush(
    FlushRequestID flush_request_id,
    const std::vector<DataSourceInstanceID>& data_sources) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
    if (weak_this) {
      weak_this->producer_->Flush(flush_request_id, data_sources.data(),
                                  data_sources.size());
    }
  });
}

void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
    DataSourceInstanceID ds_id,
    const DataSourceConfig& config) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, config] {
    if (weak_this)
      weak_this->producer_->SetupDataSource(ds_id, std::move(config));
  });
}

void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
    DataSourceInstanceID ds_id,
    const DataSourceConfig& config) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, config] {
    if (weak_this)
      weak_this->producer_->StartDataSource(ds_id, std::move(config));
  });
}

void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
    DataSourceInstanceID data_source_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->NotifyDataSourceStarted(id_, data_source_id);
}

void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
    DataSourceInstanceID data_source_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->NotifyDataSourceStopped(id_, data_source_id);
}

void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
    const std::vector<BufferID>& target_buffers) {
  if (allowed_target_buffers_.empty())
    return;
  for (BufferID buffer : target_buffers)
    allowed_target_buffers_.erase(buffer);
}

void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
    const std::vector<DataSourceInstanceID>& data_sources) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, data_sources] {
    if (weak_this) {
      weak_this->producer_->ClearIncrementalState(data_sources.data(),
                                                  data_sources.size());
    }
  });
}

////////////////////////////////////////////////////////////////////////////////
// TracingServiceImpl::TracingSession implementation
////////////////////////////////////////////////////////////////////////////////

TracingServiceImpl::TracingSession::TracingSession(
    TracingSessionID session_id,
    ConsumerEndpointImpl* consumer,
    const TraceConfig& new_config)
    : id(session_id),
      consumer_maybe_null(consumer),
      consumer_uid(consumer->uid_),
      config(new_config) {}

}  // namespace perfetto