/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/shared_memory_arbiter_impl.h"

#include <bitset>
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/shared_memory_abi.h"
#include "perfetto/ext/tracing/core/trace_packet.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "src/base/test/gtest_test_suite.h"
#include "src/base/test/test_task_runner.h"
#include "src/tracing/core/patch_list.h"
#include "src/tracing/test/aligned_buffer_test.h"
#include "src/tracing/test/fake_producer_endpoint.h"
#include "test/gtest_and_gmock.h"

#include "protos/perfetto/trace/test_event.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"

namespace perfetto {

using testing::Invoke;
using testing::_;

class MockProducerEndpoint : public TracingService::ProducerEndpoint {
 public:
  void RegisterDataSource(const DataSourceDescriptor&) override {}
  void UnregisterDataSource(const std::string&) override {}
  void NotifyFlushComplete(FlushRequestID) override {}
  void NotifyDataSourceStarted(DataSourceInstanceID) override {}
  void NotifyDataSourceStopped(DataSourceInstanceID) override {}
  void ActivateTriggers(const std::vector<std::string>&) {}
  SharedMemory* shared_memory() const override { return nullptr; }
  size_t shared_buffer_page_size_kb() const override { return 0; }
  std::unique_ptr<TraceWriter> CreateTraceWriter(
      BufferID,
      BufferExhaustedPolicy) override {
    return nullptr;
  }
  SharedMemoryArbiter* MaybeSharedMemoryArbiter() override { return nullptr; }
  bool IsShmemProvidedByProducer() const override { return false; }

  MOCK_METHOD2(CommitData, void(const CommitDataRequest&, CommitDataCallback));
  MOCK_METHOD2(RegisterTraceWriter, void(uint32_t, uint32_t));
  MOCK_METHOD1(UnregisterTraceWriter, void(uint32_t));
};

class SharedMemoryArbiterImplTest : public AlignedBufferTest {
 public:
  void SetUp() override {
    AlignedBufferTest::SetUp();
    task_runner_.reset(new base::TestTaskRunner());
    arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                               &mock_producer_endpoint_,
                                               task_runner_.get()));
  }

  bool IsArbiterFullyBound() { return arbiter_->fully_bound_; }

  void TearDown() override {
    arbiter_.reset();
    task_runner_.reset();
  }

  std::unique_ptr<base::TestTaskRunner> task_runner_;
  std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
  MockProducerEndpoint mock_producer_endpoint_;
  std::function<void(const std::vector<uint32_t>&)> on_pages_complete_;
};

size_t const kPageSizes[] = {4096, 65536};
INSTANTIATE_TEST_SUITE_P(PageSize,
                         SharedMemoryArbiterImplTest,
                         ::testing::ValuesIn(kPageSizes));
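// Each test in this suite runs twice, parameterized by SMB page size: once
// with 4 KiB pages and once with 64 KiB pages.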

// The buffer has 14 pages (kNumPages), each of which is partitioned into 14
// chunks. The test requests 30 chunks (2 full pages + 2 chunks from a 3rd
// page) and releases them in different batches. It checks the consistency of
// the batches and the releasing order.
TEST_P(SharedMemoryArbiterImplTest, GetAndReturnChunks) {
  SharedMemoryArbiterImpl::set_default_layout_for_testing(
      SharedMemoryABI::PageLayout::kPageDiv14);
  static constexpr size_t kTotChunks = kNumPages * 14;
  SharedMemoryABI::Chunk chunks[kTotChunks];
  for (size_t i = 0; i < 14 * 2 + 2; i++) {
    chunks[i] = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kStall);
    ASSERT_TRUE(chunks[i].is_valid());
  }

  // Now return the first 28 chunks (2 full pages) and only the 2nd chunk of
  // the 3rd page. Chunks are released in interleaved order: 1, 0, 3, 2, 5, 4,
  // 7, 6, ... Check that the notification callback is posted and the order is
  // consistent.
  auto on_commit_1 = task_runner_->CreateCheckpoint("on_commit_1");
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([on_commit_1](const CommitDataRequest& req,
                                     MockProducerEndpoint::CommitDataCallback) {
        ASSERT_EQ(14 * 2 + 1, req.chunks_to_move_size());
        for (size_t i = 0; i < 14 * 2; i++) {
          ASSERT_EQ(i / 14, req.chunks_to_move()[i].page());
          ASSERT_EQ((i % 14) ^ 1, req.chunks_to_move()[i].chunk());
          ASSERT_EQ(i % 5 + 1, req.chunks_to_move()[i].target_buffer());
        }
        ASSERT_EQ(2u, req.chunks_to_move()[28].page());
        ASSERT_EQ(1u, req.chunks_to_move()[28].chunk());
        ASSERT_EQ(42u, req.chunks_to_move()[28].target_buffer());
        on_commit_1();
      }));
  PatchList ignored;
  for (size_t i = 0; i < 14 * 2; i++) {
    arbiter_->ReturnCompletedChunk(std::move(chunks[i ^ 1]), i % 5 + 1,
                                   &ignored);
  }
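  // chunks[29] is the 2nd chunk (index 1) of the 3rd page (index 2); return it
  // into target buffer 42, matching the last entry checked above.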
  arbiter_->ReturnCompletedChunk(std::move(chunks[29]), 42, &ignored);
  task_runner_->RunUntilCheckpoint("on_commit_1");

  // Then release the 1st chunk of the 3rd page, and check that we get a
  // notification for that as well.
  auto on_commit_2 = task_runner_->CreateCheckpoint("on_commit_2");
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([on_commit_2](const CommitDataRequest& req,
                                     MockProducerEndpoint::CommitDataCallback) {
        ASSERT_EQ(1, req.chunks_to_move_size());
        ASSERT_EQ(2u, req.chunks_to_move()[0].page());
        ASSERT_EQ(0u, req.chunks_to_move()[0].chunk());
        ASSERT_EQ(43u, req.chunks_to_move()[0].target_buffer());
        on_commit_2();
      }));
  arbiter_->ReturnCompletedChunk(std::move(chunks[28]), 43, &ignored);
  task_runner_->RunUntilCheckpoint("on_commit_2");
}

// Helper for verifying trace writer id allocations.
class TraceWriterIdChecker : public FakeProducerEndpoint {
 public:
  TraceWriterIdChecker(std::function<void()> checkpoint)
      : checkpoint_(std::move(checkpoint)) {}

  void RegisterTraceWriter(uint32_t id, uint32_t) override {
    EXPECT_GT(id, 0u);
    EXPECT_LE(id, kMaxWriterID);
    if (id > 0 && id <= kMaxWriterID) {
      registered_ids_.set(id - 1);
    }
  }

  void UnregisterTraceWriter(uint32_t id) override {
    if (++unregister_calls_ == kMaxWriterID)
      checkpoint_();

    EXPECT_GT(id, 0u);
    EXPECT_LE(id, kMaxWriterID);
    if (id > 0 && id <= kMaxWriterID) {
      unregistered_ids_.set(id - 1);
    }
  }

  // Bit N corresponds to id N + 1.
  std::bitset<kMaxWriterID> registered_ids_;
  std::bitset<kMaxWriterID> unregistered_ids_;

  int unregister_calls_ = 0;

 private:
  std::function<void()> checkpoint_;
};

// Check that we can actually create up to kMaxWriterID TraceWriter(s).
TEST_P(SharedMemoryArbiterImplTest, WriterIDsAllocation) {
  auto checkpoint = task_runner_->CreateCheckpoint("last_unregistered");

  TraceWriterIdChecker id_checking_endpoint(checkpoint);
  arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                             &id_checking_endpoint,
                                             task_runner_.get()));
  {
    std::map<WriterID, std::unique_ptr<TraceWriter>> writers;

    for (size_t i = 0; i < kMaxWriterID; i++) {
      std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(1);
      ASSERT_TRUE(writer);
      WriterID writer_id = writer->writer_id();
      ASSERT_TRUE(writers.emplace(writer_id, std::move(writer)).second);
    }

    // A further call should return a null implementation of TraceWriter, as
    // writer IDs are exhausted.
    ASSERT_EQ(arbiter_->CreateTraceWriter(1)->writer_id(), 0);
  }
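  // Leaving the scope destroys all writers, which should enqueue one
  // UnregisterTraceWriter() call per writer; the checkpoint fires after the
  // kMaxWriterID-th unregistration.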

  // This should run the Register/UnregisterTraceWriter tasks enqueued by the
  // memory arbiter.
  task_runner_->RunUntilCheckpoint("last_unregistered", 15000);

  EXPECT_TRUE(id_checking_endpoint.registered_ids_.all());
  EXPECT_TRUE(id_checking_endpoint.unregistered_ids_.all());
}

// Verify that getting a new chunk doesn't stall when kDrop policy is chosen.
TEST_P(SharedMemoryArbiterImplTest, BufferExhaustedPolicyDrop) {
  // Grab all chunks in the SMB.
  SharedMemoryArbiterImpl::set_default_layout_for_testing(
      SharedMemoryABI::PageLayout::kPageDiv1);
  static constexpr size_t kTotChunks = kNumPages;
  SharedMemoryABI::Chunk chunks[kTotChunks];
  for (size_t i = 0; i < kTotChunks; i++) {
    chunks[i] = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
    ASSERT_TRUE(chunks[i].is_valid());
  }

  // SMB is exhausted, thus GetNewChunk() should return an invalid chunk. In
  // kStall mode, this would stall.
  SharedMemoryABI::Chunk invalid_chunk =
      arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
  ASSERT_FALSE(invalid_chunk.is_valid());

  // Returning the chunk is not enough to be able to reacquire it.
  PatchList ignored;
  arbiter_->ReturnCompletedChunk(std::move(chunks[0]), 1, &ignored);

  invalid_chunk = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
  ASSERT_FALSE(invalid_chunk.is_valid());

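  // Simulate the service side: acquire the returned chunk for reading and
  // release it back as free, which is what makes it available again.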
  // After releasing the chunk as free, we can reacquire it.
  chunks[0] =
      arbiter_->shmem_abi_for_testing()->TryAcquireChunkForReading(0, 0);
  ASSERT_TRUE(chunks[0].is_valid());
  arbiter_->shmem_abi_for_testing()->ReleaseChunkAsFree(std::move(chunks[0]));

  chunks[0] = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
  ASSERT_TRUE(chunks[0].is_valid());
}

TEST_P(SharedMemoryArbiterImplTest, CreateUnboundAndBind) {
  auto checkpoint_writer = task_runner_->CreateCheckpoint("writer_registered");
  auto checkpoint_flush = task_runner_->CreateCheckpoint("flush_completed");

  // Create an unbound arbiter and bind immediately.
  arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                             nullptr, nullptr));
  arbiter_->BindToProducerEndpoint(&mock_producer_endpoint_,
                                   task_runner_.get());
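  // With no pending startup buffer reservations, binding the endpoint is
  // enough to make the arbiter fully bound.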
  EXPECT_TRUE(IsArbiterFullyBound());

  // Trace writer should be registered in a non-delayed task.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 1))
      .WillOnce(testing::InvokeWithoutArgs(checkpoint_writer));
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(1);
  task_runner_->RunUntilCheckpoint("writer_registered", 5000);

  // Commits/flushes should be sent right away.
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(testing::InvokeArgument<1>());
  writer->Flush(checkpoint_flush);
  task_runner_->RunUntilCheckpoint("flush_completed", 5000);
}

TEST_P(SharedMemoryArbiterImplTest, StartupTracing) {
  constexpr uint16_t kTargetBufferReservationId1 = 1;
  constexpr uint16_t kTargetBufferReservationId2 = 2;

  // Create an unbound arbiter and a startup writer.
  arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                             nullptr, nullptr));
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);

  // Write two packets while unbound and flush the chunk after each packet. The
  // writer will return the chunk to the arbiter and grab a new chunk for the
  // second packet. The flush should only add the chunk into the queued commit
  // request.
  for (int i = 0; i < 2; i++) {
    {
      auto packet = writer->NewTracePacket();
      packet->set_for_testing()->set_str("foo");
    }
    writer->Flush();
  }

  // Bind to the producer endpoint. This should not register the trace writer
  // yet, because its buffer reservation is still unbound.
  arbiter_->BindToProducerEndpoint(&mock_producer_endpoint_,
                                   task_runner_.get());
  EXPECT_FALSE(IsArbiterFullyBound());

  // Write another packet into another chunk and queue it.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  bool flush_completed = false;
  writer->Flush([&flush_completed] { flush_completed = true; });

  // Bind the buffer reservation to a buffer. Trace writer should be registered
  // and queued commits flushed.
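  // The queued request is expected to carry three chunks: the two flushed
  // while fully unbound plus the one flushed after binding the endpoint, all
  // now resolving to buffer 42.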
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 42));
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([](const CommitDataRequest& req,
                          MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(3, req.chunks_to_move_size());
        EXPECT_EQ(42u, req.chunks_to_move()[0].target_buffer());
        EXPECT_EQ(42u, req.chunks_to_move()[1].target_buffer());
        EXPECT_EQ(42u, req.chunks_to_move()[2].target_buffer());
        callback();
      }));

  arbiter_->BindStartupTargetBuffer(kTargetBufferReservationId1, 42);
  EXPECT_TRUE(IsArbiterFullyBound());

  testing::Mock::VerifyAndClearExpectations(&mock_producer_endpoint_);
  EXPECT_TRUE(flush_completed);

  // Creating a new startup writer for the same buffer posts an immediate task
  // to register it.
  auto checkpoint_register1b =
      task_runner_->CreateCheckpoint("writer1b_registered");
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 42))
      .WillOnce(testing::InvokeWithoutArgs(checkpoint_register1b));
  std::unique_ptr<TraceWriter> writer1b =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);
  task_runner_->RunUntilCheckpoint("writer1b_registered", 5000);

  // And a commit on this new writer should be flushed to the right buffer,
  // too.
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([](const CommitDataRequest& req,
                          MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(1, req.chunks_to_move_size());
        EXPECT_EQ(42u, req.chunks_to_move()[0].target_buffer());
        callback();
      }));
  {
    auto packet = writer1b->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  flush_completed = false;
  writer1b->Flush([&flush_completed] { flush_completed = true; });

  testing::Mock::VerifyAndClearExpectations(&mock_producer_endpoint_);
  EXPECT_TRUE(flush_completed);

  // Create another startup writer for another target buffer, which puts the
  // arbiter back into unbound state.
  std::unique_ptr<TraceWriter> writer2 =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId2);
  EXPECT_FALSE(IsArbiterFullyBound());

  // Write a chunk into both writers. Both should be queued up into the next
  // commit request.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  writer->Flush();
  {
    auto packet = writer2->NewTracePacket();
    packet->set_for_testing()->set_str("bar");
  }
  flush_completed = false;
  writer2->Flush([&flush_completed] { flush_completed = true; });
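  // Neither commit is sent yet: while reservation 2 is unbound, the arbiter
  // queues all commits, including the one targeting the already-resolved
  // buffer 42.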

  // Destroy the first trace writer, which should cause the arbiter to post a
  // task to unregister it.
  auto checkpoint_writer =
      task_runner_->CreateCheckpoint("writer_unregistered");
  EXPECT_CALL(mock_producer_endpoint_,
              UnregisterTraceWriter(writer->writer_id()))
      .WillOnce(testing::InvokeWithoutArgs(checkpoint_writer));
  writer.reset();
  task_runner_->RunUntilCheckpoint("writer_unregistered", 5000);

  // Bind the second buffer reservation to a buffer. Second trace writer should
  // be registered and queued commits flushed.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 23));
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([](const CommitDataRequest& req,
                          MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(2, req.chunks_to_move_size());
        EXPECT_EQ(42u, req.chunks_to_move()[0].target_buffer());
        EXPECT_EQ(23u, req.chunks_to_move()[1].target_buffer());
        callback();
      }));

  arbiter_->BindStartupTargetBuffer(kTargetBufferReservationId2, 23);
  EXPECT_TRUE(IsArbiterFullyBound());

  testing::Mock::VerifyAndClearExpectations(&mock_producer_endpoint_);
  EXPECT_TRUE(flush_completed);
}

TEST_P(SharedMemoryArbiterImplTest, AbortStartupTracingForReservation) {
  constexpr uint16_t kTargetBufferReservationId1 = 1;
  constexpr uint16_t kTargetBufferReservationId2 = 2;

  // Create an unbound arbiter and two startup writers.
  arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                             nullptr, nullptr));
  SharedMemoryABI* shmem_abi = arbiter_->shmem_abi_for_testing();
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);
  std::unique_ptr<TraceWriter> writer2 =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);

  // Write two packets while unbound and flush the chunk after each packet. The
  // writer will return the chunk to the arbiter and grab a new chunk for the
  // second packet. The flush should only add the chunk into the queued commit
  // request.
  for (int i = 0; i < 2; i++) {
    {
      auto packet = writer->NewTracePacket();
      packet->set_for_testing()->set_str("foo");
    }
    writer->Flush();
  }

  // Abort the first session. This should resolve the two chunks committed up
  // to this point to an invalid target buffer (ID 0). They will remain
  // buffered until the arbiter is bound to an endpoint.
  arbiter_->AbortStartupTracingForReservation(kTargetBufferReservationId1);

  // Destroy a writer that was created before the abort. This should not cause
  // crashes.
  writer2.reset();

  // Bind to the producer endpoint. The trace writer should not be registered
  // as its target buffer is invalid. Since no startup sessions are active
  // anymore, the arbiter should be fully bound. The commit data request is
  // flushed.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, _)).Times(0);
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([shmem_abi](const CommitDataRequest& req,
                                   MockProducerEndpoint::CommitDataCallback) {
        ASSERT_EQ(2, req.chunks_to_move_size());
        for (size_t i = 0; i < 2; i++) {
          EXPECT_EQ(0u, req.chunks_to_move()[i].target_buffer());
          SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
              req.chunks_to_move()[i].page(), req.chunks_to_move()[i].chunk());
          shmem_abi->ReleaseChunkAsFree(std::move(chunk));
        }
      }));
  arbiter_->BindToProducerEndpoint(&mock_producer_endpoint_,
                                   task_runner_.get());
  EXPECT_TRUE(IsArbiterFullyBound());

  // The SMB should be free again, as no writer holds on to any chunk anymore.
  for (size_t i = 0; i < shmem_abi->num_pages(); i++)
    EXPECT_TRUE(shmem_abi->is_page_free(i));

  // Write another packet into another chunk and commit it. The commit should
  // reach the endpoint with an invalid target buffer (ID 0).
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([shmem_abi](
                           const CommitDataRequest& req,
                           MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(1, req.chunks_to_move_size());
        EXPECT_EQ(0u, req.chunks_to_move()[0].target_buffer());
        SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
            req.chunks_to_move()[0].page(), req.chunks_to_move()[0].chunk());
        shmem_abi->ReleaseChunkAsFree(std::move(chunk));
        callback();
      }));
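  // The arbiter is fully bound and the mock invokes the commit callback
  // inline, so the flush callback should run before Flush() returns.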
  bool flush_completed = false;
  writer->Flush([&flush_completed] { flush_completed = true; });
  EXPECT_TRUE(flush_completed);

  // Creating a new startup writer for the same buffer does not cause it to
  // register.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, _)).Times(0);
  std::unique_ptr<TraceWriter> writer1b =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);

  // And a commit on this new writer should again be flushed to the invalid
  // target buffer.
  {
    auto packet = writer1b->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([shmem_abi](
                           const CommitDataRequest& req,
                           MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(1, req.chunks_to_move_size());
        EXPECT_EQ(0u, req.chunks_to_move()[0].target_buffer());
        SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
            req.chunks_to_move()[0].page(), req.chunks_to_move()[0].chunk());
        shmem_abi->ReleaseChunkAsFree(std::move(chunk));
        callback();
      }));
  flush_completed = false;
  writer1b->Flush([&flush_completed] { flush_completed = true; });
  EXPECT_TRUE(flush_completed);

  // Create another startup writer for another target buffer, which puts the
  // arbiter back into unbound state.
  std::unique_ptr<TraceWriter> writer3 =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId2);
  EXPECT_FALSE(IsArbiterFullyBound());

  // Write a chunk into both writers. Both should be queued up into the next
  // commit request.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  writer->Flush();
  {
    auto packet = writer3->NewTracePacket();
    packet->set_for_testing()->set_str("bar");
  }
  flush_completed = false;
  writer3->Flush([&flush_completed] { flush_completed = true; });

  // Destroy the first trace writer, which should cause the arbiter to post a
  // task to unregister it.
  auto checkpoint_writer =
      task_runner_->CreateCheckpoint("writer_unregistered");
  EXPECT_CALL(mock_producer_endpoint_,
              UnregisterTraceWriter(writer->writer_id()))
      .WillOnce(testing::InvokeWithoutArgs(checkpoint_writer));
  writer.reset();
  task_runner_->RunUntilCheckpoint("writer_unregistered", 5000);

  // Abort the second session. Its commits should now also be associated with
  // target buffer 0, and both writers' commits flushed.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, _)).Times(0);
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([shmem_abi](
                           const CommitDataRequest& req,
                           MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(2, req.chunks_to_move_size());
        for (size_t i = 0; i < 2; i++) {
          EXPECT_EQ(0u, req.chunks_to_move()[i].target_buffer());
          SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
              req.chunks_to_move()[i].page(), req.chunks_to_move()[i].chunk());
          shmem_abi->ReleaseChunkAsFree(std::move(chunk));
        }
        callback();
      }));

  arbiter_->AbortStartupTracingForReservation(kTargetBufferReservationId2);
  EXPECT_TRUE(IsArbiterFullyBound());
  EXPECT_TRUE(flush_completed);

  // SMB should be free again, as no writer holds on to any chunk anymore.
  for (size_t i = 0; i < shmem_abi->num_pages(); i++)
    EXPECT_TRUE(shmem_abi->is_page_free(i));
}

// TODO(primiano): add multi-threaded tests.

}  // namespace perfetto