1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 *
7 */
8
9 #include <quic/api/test/Mocks.h>
10
11 #include <folly/portability/GMock.h>
12 #include <folly/portability/GTest.h>
13
14 #include <quic/api/QuicSocket.h>
15 #include <quic/api/QuicTransportBase.h>
16 #include <quic/codec/DefaultConnectionIdAlgo.h>
17 #include <quic/common/test/TestUtils.h>
18 #include <quic/fizz/server/handshake/FizzServerQuicHandshakeContext.h>
19 #include <quic/server/state/ServerStateMachine.h>
20 #include <quic/state/DatagramHandlers.h>
21 #include <quic/state/QuicStreamFunctions.h>
22 #include <quic/state/QuicStreamUtilities.h>
23 #include <quic/state/test/Mocks.h>
24
25 #include <folly/io/async/test/MockAsyncUDPSocket.h>
26
27 using namespace testing;
28 using namespace folly;
29
30 namespace quic {
31 namespace test {
32
33 constexpr uint8_t kStreamIncrement = 0x04;
34 using ByteEvent = QuicTransportBase::ByteEvent;
35 using ByteEventCancellation = QuicTransportBase::ByteEventCancellation;
36
37 enum class TestFrameType : uint8_t {
38 STREAM,
39 CRYPTO,
40 EXPIRED_DATA,
41 REJECTED_DATA,
42 MAX_STREAMS,
43 DATAGRAM
44 };
45
46 // A made up encoding decoding of a stream.
encodeStreamBuffer(StreamId id,StreamBuffer data)47 Buf encodeStreamBuffer(StreamId id, StreamBuffer data) {
48 auto buf = IOBuf::create(10);
49 folly::io::Appender appender(buf.get(), 10);
50 appender.writeBE(static_cast<uint8_t>(TestFrameType::STREAM));
51 appender.writeBE(id);
52 auto dataBuf = data.data.move();
53 dataBuf->coalesce();
54 appender.writeBE<uint32_t>(dataBuf->length());
55 appender.push(dataBuf->coalesce());
56 appender.writeBE<uint64_t>(data.offset);
57 appender.writeBE<uint8_t>(data.eof);
58 return buf;
59 }
60
encodeCryptoBuffer(StreamBuffer data)61 Buf encodeCryptoBuffer(StreamBuffer data) {
62 auto buf = IOBuf::create(10);
63 folly::io::Appender appender(buf.get(), 10);
64 appender.writeBE(static_cast<uint8_t>(TestFrameType::CRYPTO));
65 auto dataBuf = data.data.move();
66 dataBuf->coalesce();
67 appender.writeBE<uint32_t>(dataBuf->length());
68 appender.push(dataBuf->coalesce());
69 appender.writeBE<uint64_t>(data.offset);
70 return buf;
71 }
72
73 // A made up encoding of a MaxStreamsFrame.
encodeMaxStreamsFrame(const MaxStreamsFrame & frame)74 Buf encodeMaxStreamsFrame(const MaxStreamsFrame& frame) {
75 auto buf = IOBuf::create(25);
76 folly::io::Appender appender(buf.get(), 25);
77 appender.writeBE(static_cast<uint8_t>(TestFrameType::MAX_STREAMS));
78 appender.writeBE<uint8_t>(frame.isForBidirectionalStream() ? 1 : 0);
79 appender.writeBE<uint64_t>(frame.maxStreams);
80 return buf;
81 }
82
83 // Build a datagram frame
encodeDatagramFrame(BufQueue data)84 Buf encodeDatagramFrame(BufQueue data) {
85 auto buf = IOBuf::create(10);
86 folly::io::Appender appender(buf.get(), 10);
87 appender.writeBE(static_cast<uint8_t>(TestFrameType::DATAGRAM));
88 auto dataBuf = data.move();
89 dataBuf->coalesce();
90 appender.writeBE<uint32_t>(dataBuf->length());
91 appender.push(dataBuf->coalesce());
92 return buf;
93 }
94
decodeDatagramFrame(folly::io::Cursor & cursor)95 std::pair<Buf, uint32_t> decodeDatagramFrame(folly::io::Cursor& cursor) {
96 Buf outData;
97 auto len = cursor.readBE<uint32_t>();
98 cursor.clone(outData, len);
99 return std::make_pair(std::move(outData), len);
100 }
101
decodeDataBuffer(folly::io::Cursor & cursor)102 std::pair<Buf, uint64_t> decodeDataBuffer(folly::io::Cursor& cursor) {
103 Buf outData;
104 auto len = cursor.readBE<uint32_t>();
105 cursor.clone(outData, len);
106 uint64_t offset = cursor.readBE<uint64_t>();
107 return std::make_pair(std::move(outData), offset);
108 }
109
decodeStreamBuffer(folly::io::Cursor & cursor)110 std::pair<StreamId, StreamBuffer> decodeStreamBuffer(
111 folly::io::Cursor& cursor) {
112 auto streamId = cursor.readBE<StreamId>();
113 auto dataBuffer = decodeDataBuffer(cursor);
114 bool eof = (bool)cursor.readBE<uint8_t>();
115 return std::make_pair(
116 streamId,
117 StreamBuffer(std::move(dataBuffer.first), dataBuffer.second, eof));
118 }
119
decodeCryptoBuffer(folly::io::Cursor & cursor)120 StreamBuffer decodeCryptoBuffer(folly::io::Cursor& cursor) {
121 auto dataBuffer = decodeDataBuffer(cursor);
122 return StreamBuffer(std::move(dataBuffer.first), dataBuffer.second, false);
123 }
124
decodeMaxStreamsFrame(folly::io::Cursor & cursor)125 MaxStreamsFrame decodeMaxStreamsFrame(folly::io::Cursor& cursor) {
126 bool isBidi = cursor.readBE<uint8_t>();
127 auto maxStreams = cursor.readBE<uint64_t>();
128 return MaxStreamsFrame(maxStreams, isBidi);
129 }
130
131 class TestPingCallback : public QuicSocket::PingCallback {
132 public:
pingAcknowledged()133 void pingAcknowledged() noexcept override {}
pingTimeout()134 void pingTimeout() noexcept override {}
135 };
136
137 class TestByteEventCallback : public QuicSocket::ByteEventCallback {
138 public:
139 using HashFn = std::function<size_t(const ByteEvent&)>;
140 using ComparatorFn = std::function<bool(const ByteEvent&, const ByteEvent&)>;
141
142 enum class Status { REGISTERED = 1, RECEIVED = 2, CANCELLED = 3 };
143
onByteEventRegistered(ByteEvent event)144 void onByteEventRegistered(ByteEvent event) override {
145 EXPECT_TRUE(byteEventTracker_.find(event) == byteEventTracker_.end());
146 byteEventTracker_[event] = Status::REGISTERED;
147 }
onByteEvent(ByteEvent event)148 void onByteEvent(ByteEvent event) override {
149 EXPECT_TRUE(byteEventTracker_.find(event) != byteEventTracker_.end());
150 byteEventTracker_[event] = Status::RECEIVED;
151 }
onByteEventCanceled(ByteEventCancellation cancellation)152 void onByteEventCanceled(ByteEventCancellation cancellation) override {
153 const ByteEvent& event = cancellation;
154 EXPECT_TRUE(byteEventTracker_.find(event) != byteEventTracker_.end());
155 byteEventTracker_[event] = Status::CANCELLED;
156 }
157
158 std::unordered_map<ByteEvent, Status, HashFn, ComparatorFn>
getByteEventTracker() const159 getByteEventTracker() const {
160 return byteEventTracker_;
161 }
162
163 private:
164 // Custom hash and comparator functions that use only id, offset and types
165 // (not the srtt)
__anon22113e7d0102(const ByteEvent& e) 166 HashFn hash = [](const ByteEvent& e) {
167 return folly::hash::hash_combine(e.id, e.offset, e.type);
168 };
__anon22113e7d0202(const ByteEvent& lhs, const ByteEvent& rhs) 169 ComparatorFn comparator = [](const ByteEvent& lhs, const ByteEvent& rhs) {
170 return (
171 (lhs.id == rhs.id) && (lhs.offset == rhs.offset) &&
172 (lhs.type == rhs.type));
173 };
174 std::unordered_map<ByteEvent, Status, HashFn, ComparatorFn> byteEventTracker_{
175 /* bucket count */ 4,
176 hash,
177 comparator};
178 };
179
180 static auto
getByteEventMatcher(ByteEvent::Type type,StreamId id,uint64_t offset)181 getByteEventMatcher(ByteEvent::Type type, StreamId id, uint64_t offset) {
182 return AllOf(
183 testing::Field(&ByteEvent::type, testing::Eq(type)),
184 testing::Field(&ByteEvent::id, testing::Eq(id)),
185 testing::Field(&ByteEvent::offset, testing::Eq(offset)));
186 }
187
getByteEventTrackerMatcher(ByteEvent event,TestByteEventCallback::Status status)188 static auto getByteEventTrackerMatcher(
189 ByteEvent event,
190 TestByteEventCallback::Status status) {
191 return Pair(getByteEventMatcher(event.type, event.id, event.offset), status);
192 }
193
194 class TestQuicTransport
195 : public QuicTransportBase,
196 public std::enable_shared_from_this<TestQuicTransport> {
197 public:
TestQuicTransport(folly::EventBase * evb,std::unique_ptr<folly::AsyncUDPSocket> socket,ConnectionCallback & cb)198 TestQuicTransport(
199 folly::EventBase* evb,
200 std::unique_ptr<folly::AsyncUDPSocket> socket,
201 ConnectionCallback& cb)
202 : QuicTransportBase(evb, std::move(socket)) {
203 setConnectionCallback(&cb);
204 auto conn = std::make_unique<QuicServerConnectionState>(
205 FizzServerQuicHandshakeContext::Builder().build());
206 conn->clientConnectionId = ConnectionId({10, 9, 8, 7});
207 conn->version = QuicVersion::MVFST;
208 transportConn = conn.get();
209 conn_.reset(conn.release());
210 aead = test::createNoOpAead();
211 headerCipher = test::createNoOpHeaderCipher();
212 connIdAlgo_ = std::make_unique<DefaultConnectionIdAlgo>();
213 }
214
~TestQuicTransport()215 ~TestQuicTransport() override {
216 resetConnectionCallbacks();
217 // we need to call close in the derived class.
218 closeImpl(
219 std::make_pair(
220 QuicErrorCode(LocalErrorCode::SHUTTING_DOWN),
221 std::string("shutdown")),
222 false);
223 }
224
getLossTimeoutRemainingTime() const225 std::chrono::milliseconds getLossTimeoutRemainingTime() const {
226 return lossTimeout_.getTimeRemaining();
227 }
228
onReadData(const folly::SocketAddress &,NetworkDataSingle && data)229 void onReadData(const folly::SocketAddress&, NetworkDataSingle&& data)
230 override {
231 if (!data.data) {
232 return;
233 }
234 folly::io::Cursor cursor(data.data.get());
235 while (!cursor.isAtEnd()) {
236 // create server chosen connId with processId = 0 and workerId = 0
237 ServerConnectionIdParams params(0, 0, 0);
238 conn_->serverConnectionId = *connIdAlgo_->encodeConnectionId(params);
239 auto type = static_cast<TestFrameType>(cursor.readBE<uint8_t>());
240 if (type == TestFrameType::CRYPTO) {
241 auto cryptoBuffer = decodeCryptoBuffer(cursor);
242 appendDataToReadBuffer(
243 conn_->cryptoState->initialStream, std::move(cryptoBuffer));
244 } else if (type == TestFrameType::MAX_STREAMS) {
245 auto maxStreamsFrame = decodeMaxStreamsFrame(cursor);
246 if (maxStreamsFrame.isForBidirectionalStream()) {
247 conn_->streamManager->setMaxLocalBidirectionalStreams(
248 maxStreamsFrame.maxStreams);
249 } else {
250 conn_->streamManager->setMaxLocalUnidirectionalStreams(
251 maxStreamsFrame.maxStreams);
252 }
253 } else if (type == TestFrameType::DATAGRAM) {
254 auto buffer = decodeDatagramFrame(cursor);
255 auto frame = DatagramFrame(buffer.second, std::move(buffer.first));
256 handleDatagram(*conn_, frame);
257 } else {
258 auto buffer = decodeStreamBuffer(cursor);
259 QuicStreamState* stream = conn_->streamManager->getStream(buffer.first);
260 if (!stream) {
261 continue;
262 }
263 appendDataToReadBuffer(*stream, std::move(buffer.second));
264 conn_->streamManager->updateReadableStreams(*stream);
265 conn_->streamManager->updatePeekableStreams(*stream);
266 }
267 }
268 }
269
writeData()270 void writeData() override {
271 writeQuicDataToSocket(
272 *socket_,
273 *conn_,
274 *conn_->serverConnectionId,
275 *conn_->clientConnectionId,
276 *aead,
277 *headerCipher,
278 *conn_->version,
279 conn_->transportSettings.writeConnectionDataPacketsLimit);
280 }
281
hasWriteCipher() const282 bool hasWriteCipher() const {
283 return conn_->oneRttWriteCipher != nullptr;
284 }
285
sharedGuard()286 std::shared_ptr<QuicTransportBase> sharedGuard() override {
287 return shared_from_this();
288 }
289
getConnectionState()290 QuicConnectionStateBase& getConnectionState() {
291 return *conn_;
292 }
293
closeTransport()294 void closeTransport() {
295 transportClosed = true;
296 }
297
AckTimeout()298 void AckTimeout() {
299 ackTimeoutExpired();
300 }
301
setIdleTimeout()302 void setIdleTimeout() {
303 setIdleTimer();
304 }
305
invokeIdleTimeout()306 void invokeIdleTimeout() {
307 idleTimeout_.timeoutExpired();
308 }
309
invokeAckTimeout()310 void invokeAckTimeout() {
311 ackTimeout_.timeoutExpired();
312 }
313
invokeSendPing(quic::QuicSocket::PingCallback * cb,std::chrono::milliseconds interval)314 void invokeSendPing(
315 quic::QuicSocket::PingCallback* cb,
316 std::chrono::milliseconds interval) {
317 sendPing(cb, interval);
318 }
319
invokeCancelPingTimeout()320 void invokeCancelPingTimeout() {
321 pingTimeout_.cancelTimeout();
322 }
323
invokeHandlePingCallback()324 void invokeHandlePingCallback() {
325 handlePingCallback();
326 }
327
invokeHandleKnobCallbacks()328 void invokeHandleKnobCallbacks() {
329 handleKnobCallbacks();
330 }
331
isPingTimeoutScheduled()332 bool isPingTimeoutScheduled() {
333 if (pingTimeout_.isScheduled()) {
334 return true;
335 }
336 return false;
337 }
338
writeLooper()339 auto& writeLooper() {
340 return writeLooper_;
341 }
342
unbindConnection()343 void unbindConnection() {}
344
onReadError(const folly::AsyncSocketException &)345 void onReadError(const folly::AsyncSocketException&) noexcept {}
346
addDataToStream(StreamId id,StreamBuffer data)347 void addDataToStream(StreamId id, StreamBuffer data) {
348 auto buf = encodeStreamBuffer(id, std::move(data));
349 SocketAddress addr("127.0.0.1", 1000);
350 onNetworkData(addr, NetworkData(std::move(buf), Clock::now()));
351 }
352
addCryptoData(StreamBuffer data)353 void addCryptoData(StreamBuffer data) {
354 auto buf = encodeCryptoBuffer(std::move(data));
355 SocketAddress addr("127.0.0.1", 1000);
356 onNetworkData(addr, NetworkData(std::move(buf), Clock::now()));
357 }
358
addMaxStreamsFrame(MaxStreamsFrame frame)359 void addMaxStreamsFrame(MaxStreamsFrame frame) {
360 auto buf = encodeMaxStreamsFrame(frame);
361 SocketAddress addr("127.0.0.1", 1000);
362 onNetworkData(addr, NetworkData(std::move(buf), Clock::now()));
363 }
364
addStreamReadError(StreamId id,QuicErrorCode ex)365 void addStreamReadError(StreamId id, QuicErrorCode ex) {
366 QuicStreamState* stream = conn_->streamManager->getStream(id);
367 stream->streamReadError = ex;
368 conn_->streamManager->updateReadableStreams(*stream);
369 conn_->streamManager->updatePeekableStreams(*stream);
370 // peekableStreams is updated to contain streams with streamReadError
371 updatePeekLooper();
372 updateReadLooper();
373 }
374
addDatagram(Buf data)375 void addDatagram(Buf data) {
376 auto buf = encodeDatagramFrame(std::move(data));
377 SocketAddress addr("127.0.0.1", 1000);
378 onNetworkData(addr, NetworkData(std::move(buf), Clock::now()));
379 }
380
closeStream(StreamId id)381 void closeStream(StreamId id) {
382 QuicStreamState* stream = conn_->streamManager->getStream(id);
383 stream->sendState = StreamSendState::Closed;
384 stream->recvState = StreamRecvState::Closed;
385 conn_->streamManager->addClosed(id);
386
387 auto deliveryCb = deliveryCallbacks_.find(id);
388 if (deliveryCb != deliveryCallbacks_.end()) {
389 for (auto& cbs : deliveryCb->second) {
390 ByteEvent event = {};
391 event.id = id;
392 event.offset = cbs.offset;
393 event.type = ByteEvent::Type::ACK;
394 event.srtt = stream->conn.lossState.srtt;
395 cbs.callback->onByteEvent(event);
396 if (closeState_ != CloseState::OPEN) {
397 break;
398 }
399 }
400 deliveryCallbacks_.erase(deliveryCb);
401 }
402
403 auto txCallbacksForStream = txCallbacks_.find(id);
404 if (txCallbacksForStream != txCallbacks_.end()) {
405 for (auto& cbs : txCallbacksForStream->second) {
406 ByteEvent event = {};
407 event.id = id;
408 event.offset = cbs.offset;
409 event.type = ByteEvent::Type::TX;
410 cbs.callback->onByteEvent(event);
411 if (closeState_ != CloseState::OPEN) {
412 break;
413 }
414 }
415 txCallbacks_.erase(txCallbacksForStream);
416 }
417
418 SocketAddress addr("127.0.0.1", 1000);
419 // some fake data to trigger close behavior.
420 auto buf = encodeStreamBuffer(
421 id,
422 StreamBuffer(IOBuf::create(0), stream->maxOffsetObserved + 1, true));
423 auto networkData = NetworkData(std::move(buf), Clock::now());
424 onNetworkData(addr, std::move(networkData));
425 }
426
getStream(StreamId id)427 QuicStreamState* getStream(StreamId id) {
428 return conn_->streamManager->getStream(id);
429 }
430
setServerConnectionId()431 void setServerConnectionId() {
432 // create server chosen connId with processId = 0 and workerId = 0
433 ServerConnectionIdParams params(0, 0, 0);
434 conn_->serverConnectionId = *connIdAlgo_->encodeConnectionId(params);
435 }
436
driveReadCallbacks()437 void driveReadCallbacks() {
438 getEventBase()->loopOnce();
439 }
440
getConnectionError()441 QuicErrorCode getConnectionError() {
442 return conn_->localConnectionError->first;
443 }
444
isClosed() const445 bool isClosed() const noexcept {
446 return closeState_ == CloseState::CLOSED;
447 }
448
closeWithoutWrite()449 void closeWithoutWrite() {
450 closeImpl(folly::none, false, false);
451 }
452
invokeWriteSocketData()453 void invokeWriteSocketData() {
454 writeSocketData();
455 }
456
invokeProcessCallbacksAfterNetworkData()457 void invokeProcessCallbacksAfterNetworkData() {
458 processCallbacksAfterNetworkData();
459 }
460
461 // Simulates the delivery of a Byte Event callback, similar to the way it
462 // happens in QuicTransportBase::processCallbacksAfterNetworkData() or
463 // in the runOnEvbAsync lambda in
464 // QuicTransportBase::registerByteEventCallback()
deleteRegisteredByteEvent(StreamId id,uint64_t offset,ByteEventCallback * cb,ByteEvent::Type type)465 bool deleteRegisteredByteEvent(
466 StreamId id,
467 uint64_t offset,
468 ByteEventCallback* cb,
469 ByteEvent::Type type) {
470 auto& byteEventMap = getByteEventMap(type);
471 auto streamByteEventCbIt = byteEventMap.find(id);
472 if (streamByteEventCbIt == byteEventMap.end()) {
473 return false;
474 }
475 auto pos = std::find_if(
476 streamByteEventCbIt->second.begin(),
477 streamByteEventCbIt->second.end(),
478 [offset, cb](const ByteEventDetail& p) {
479 return ((p.offset == offset) && (p.callback == cb));
480 });
481 if (pos == streamByteEventCbIt->second.end()) {
482 return false;
483 }
484 streamByteEventCbIt->second.erase(pos);
485 return true;
486 }
487
enableDatagram()488 void enableDatagram() {
489 conn_->datagramState.maxReadFrameSize = 65535;
490 conn_->datagramState.maxReadBufferSize = 10;
491 }
492
493 QuicServerConnectionState* transportConn;
494 std::unique_ptr<Aead> aead;
495 std::unique_ptr<PacketNumberCipher> headerCipher;
496 std::unique_ptr<ConnectionIdAlgo> connIdAlgo_;
497 bool transportClosed{false};
498 PacketNum packetNum_{0};
499 };
500
501 class QuicTransportImplTest : public Test {
502 public:
SetUp()503 void SetUp() override {
504 evb = std::make_unique<folly::EventBase>();
505 auto socket =
506 std::make_unique<NiceMock<folly::test::MockAsyncUDPSocket>>(evb.get());
507 socketPtr = socket.get();
508 transport = std::make_shared<TestQuicTransport>(
509 evb.get(), std::move(socket), connCallback);
510 auto& conn = *transport->transportConn;
511 conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiLocal =
512 kDefaultStreamWindowSize;
513 conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote =
514 kDefaultStreamWindowSize;
515 conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetUni =
516 kDefaultStreamWindowSize;
517 conn.flowControlState.peerAdvertisedMaxOffset =
518 kDefaultConnectionWindowSize;
519 conn.streamManager->setMaxLocalBidirectionalStreams(
520 kDefaultMaxStreamsBidirectional);
521 conn.streamManager->setMaxLocalUnidirectionalStreams(
522 kDefaultMaxStreamsUnidirectional);
523 }
524
getTxMatcher(StreamId id,uint64_t offset)525 auto getTxMatcher(StreamId id, uint64_t offset) {
526 return MockByteEventCallback::getTxMatcher(id, offset);
527 }
528
getAckMatcher(StreamId id,uint64_t offset)529 auto getAckMatcher(StreamId id, uint64_t offset) {
530 return MockByteEventCallback::getAckMatcher(id, offset);
531 }
532
533 protected:
534 std::unique_ptr<folly::EventBase> evb;
535 NiceMock<MockConnectionCallback> connCallback;
536 TestByteEventCallback byteEventCallback;
537 std::shared_ptr<TestQuicTransport> transport;
538 folly::test::MockAsyncUDPSocket* socketPtr;
539 };
540
541 class QuicTransportImplTestClose : public QuicTransportImplTest,
542 public testing::WithParamInterface<bool> {};
543
544 INSTANTIATE_TEST_CASE_P(
545 QuicTransportImplTest,
546 QuicTransportImplTestClose,
547 Values(true, false));
548
TEST_F(QuicTransportImplTest,AckTimeoutExpiredWillResetTimeoutFlag)549 TEST_F(QuicTransportImplTest, AckTimeoutExpiredWillResetTimeoutFlag) {
550 transport->invokeAckTimeout();
551 EXPECT_FALSE(transport->transportConn->pendingEvents.scheduleAckTimeout);
552 }
553
TEST_F(QuicTransportImplTest,IdleTimeoutExpiredDestroysTransport)554 TEST_F(QuicTransportImplTest, IdleTimeoutExpiredDestroysTransport) {
555 EXPECT_CALL(connCallback, onConnectionEnd()).WillOnce(Invoke([&]() {
556 transport = nullptr;
557 }));
558 transport->invokeIdleTimeout();
559 }
560
TEST_F(QuicTransportImplTest,IdleTimeoutStreamMaessage)561 TEST_F(QuicTransportImplTest, IdleTimeoutStreamMaessage) {
562 auto stream1 = transport->createBidirectionalStream().value();
563 auto stream2 = transport->createBidirectionalStream().value();
564 auto stream3 = transport->createUnidirectionalStream().value();
565 transport->setControlStream(stream3);
566
567 NiceMock<MockReadCallback> readCb1;
568 NiceMock<MockReadCallback> readCb2;
569
570 transport->setReadCallback(stream1, &readCb1);
571 transport->setReadCallback(stream2, &readCb2);
572
573 transport->addDataToStream(
574 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
575 transport->addDataToStream(
576 stream2,
577 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
578 EXPECT_CALL(readCb1, readError(stream1, _))
579 .Times(1)
580 .WillOnce(Invoke([](auto, auto error) {
581 EXPECT_EQ(
582 "Idle timeout, num non control streams: 2", error.second->str());
583 }));
584 transport->invokeIdleTimeout();
585 }
586
TEST_F(QuicTransportImplTest,WriteAckPacketUnsetsLooper)587 TEST_F(QuicTransportImplTest, WriteAckPacketUnsetsLooper) {
588 // start looper in running state first
589 transport->writeLooper()->run(true);
590
591 // Write data which will be acked immediately.
592 PacketNum packetSeen = 10;
593 bool pktHasRetransmittableData = true;
594 bool pktHasCryptoData = true;
595 updateAckState(
596 *transport->transportConn,
597 PacketNumberSpace::Initial,
598 packetSeen,
599 pktHasRetransmittableData,
600 pktHasCryptoData,
601 Clock::now());
602 ASSERT_TRUE(transport->transportConn->ackStates.initialAckState
603 .needsToSendAckImmediately);
604 // Trigger the loop callback. This will trigger writes and we assume this will
605 // write the acks since we have nothing else to write.
606 transport->writeLooper()->runLoopCallback();
607 EXPECT_FALSE(transport->transportConn->pendingEvents.scheduleAckTimeout);
608 EXPECT_FALSE(transport->writeLooper()->isLoopCallbackScheduled());
609 }
610
TEST_F(QuicTransportImplTest,ReadCallbackDataAvailable)611 TEST_F(QuicTransportImplTest, ReadCallbackDataAvailable) {
612 auto stream1 = transport->createBidirectionalStream().value();
613 auto stream2 = transport->createBidirectionalStream().value();
614 StreamId stream3 = 0x6;
615
616 NiceMock<MockReadCallback> readCb1;
617 NiceMock<MockReadCallback> readCb2;
618 NiceMock<MockReadCallback> readCb3;
619
620 transport->setReadCallback(stream1, &readCb1);
621 transport->setReadCallback(stream2, &readCb2);
622
623 transport->addDataToStream(
624 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
625
626 transport->addDataToStream(
627 stream2,
628 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
629
630 transport->addDataToStream(
631 stream3, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
632 transport->setReadCallback(stream3, &readCb3);
633
634 EXPECT_CALL(readCb1, readAvailable(stream1));
635 EXPECT_CALL(readCb3, readAvailable(stream3));
636 transport->driveReadCallbacks();
637
638 transport->addDataToStream(
639 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
640
641 EXPECT_CALL(readCb1, readAvailable(stream1));
642 EXPECT_CALL(readCb2, readAvailable(stream2));
643 EXPECT_CALL(readCb3, readAvailable(stream3));
644 transport->driveReadCallbacks();
645
646 EXPECT_CALL(readCb1, readAvailable(stream1));
647 EXPECT_CALL(readCb2, readAvailable(stream2));
648 EXPECT_CALL(readCb3, readAvailable(stream3));
649 transport->driveReadCallbacks();
650
651 EXPECT_CALL(readCb2, readAvailable(stream2));
652 EXPECT_CALL(readCb3, readAvailable(stream3));
653 transport->setReadCallback(stream1, nullptr);
654 transport->driveReadCallbacks();
655 transport.reset();
656 }
657
TEST_F(QuicTransportImplTest,ReadCallbackDataAvailableNoReap)658 TEST_F(QuicTransportImplTest, ReadCallbackDataAvailableNoReap) {
659 auto stream1 = transport->createBidirectionalStream().value();
660 auto stream2 = transport->createBidirectionalStream().value();
661 StreamId stream3 = 0x6;
662
663 NiceMock<MockReadCallback> readCb1;
664 NiceMock<MockReadCallback> readCb2;
665 NiceMock<MockReadCallback> readCb3;
666
667 transport->setReadCallback(stream1, &readCb1);
668 transport->setReadCallback(stream2, &readCb2);
669
670 transport->addDataToStream(
671 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
672
673 transport->addDataToStream(
674 stream2,
675 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
676
677 transport->addDataToStream(
678 stream3, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
679
680 EXPECT_CALL(readCb1, readAvailable(stream1));
681 transport->driveReadCallbacks();
682
683 transport->setReadCallback(stream3, &readCb3);
684 transport->addDataToStream(
685 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
686
687 EXPECT_CALL(readCb1, readAvailable(stream1));
688 EXPECT_CALL(readCb2, readAvailable(stream2));
689 EXPECT_CALL(readCb3, readAvailable(stream3));
690 transport->driveReadCallbacks();
691
692 EXPECT_CALL(readCb1, readAvailable(stream1));
693 EXPECT_CALL(readCb2, readAvailable(stream2));
694 EXPECT_CALL(readCb3, readAvailable(stream3));
695 transport->driveReadCallbacks();
696
697 EXPECT_CALL(readCb2, readAvailable(stream2));
698 EXPECT_CALL(readCb3, readAvailable(stream3));
699 transport->setReadCallback(stream1, nullptr);
700 transport->driveReadCallbacks();
701 transport.reset();
702 }
703
TEST_F(QuicTransportImplTest,ReadCallbackDataAvailableOrdered)704 TEST_F(QuicTransportImplTest, ReadCallbackDataAvailableOrdered) {
705 auto transportSettings = transport->getTransportSettings();
706 transportSettings.orderedReadCallbacks = true;
707 transport->setTransportSettings(transportSettings);
708
709 auto stream1 = transport->createBidirectionalStream().value();
710 auto stream2 = transport->createBidirectionalStream().value();
711 StreamId stream3 = 0x6;
712
713 InSequence s;
714 NiceMock<MockReadCallback> readCb1;
715 NiceMock<MockReadCallback> readCb2;
716 NiceMock<MockReadCallback> readCb3;
717
718 transport->setReadCallback(stream1, &readCb1);
719 transport->setReadCallback(stream2, &readCb2);
720
721 transport->addDataToStream(
722 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
723
724 transport->addDataToStream(
725 stream2,
726 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
727
728 transport->addDataToStream(
729 stream3, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
730 transport->setReadCallback(stream3, &readCb3);
731
732 EXPECT_CALL(readCb1, readAvailable(stream1));
733 EXPECT_CALL(readCb3, readAvailable(stream3));
734 transport->driveReadCallbacks();
735
736 transport->addDataToStream(
737 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
738
739 EXPECT_CALL(readCb1, readAvailable(stream1));
740 EXPECT_CALL(readCb2, readAvailable(stream2));
741 EXPECT_CALL(readCb3, readAvailable(stream3));
742 transport->driveReadCallbacks();
743
744 EXPECT_CALL(readCb1, readAvailable(stream1));
745 EXPECT_CALL(readCb2, readAvailable(stream2));
746 EXPECT_CALL(readCb3, readAvailable(stream3));
747 transport->driveReadCallbacks();
748
749 EXPECT_CALL(readCb2, readAvailable(stream2));
750 EXPECT_CALL(readCb3, readAvailable(stream3));
751 transport->setReadCallback(stream1, nullptr);
752 transport->driveReadCallbacks();
753 transport.reset();
754 }
755
TEST_F(QuicTransportImplTest,ReadCallbackChangeReadCallback)756 TEST_F(QuicTransportImplTest, ReadCallbackChangeReadCallback) {
757 auto stream1 = transport->createBidirectionalStream().value();
758
759 NiceMock<MockReadCallback> readCb1;
760 NiceMock<MockReadCallback> readCb2;
761
762 EXPECT_TRUE(transport->setReadCallback(stream1, nullptr).hasError());
763
764 transport->setReadCallback(stream1, &readCb1);
765
766 transport->addDataToStream(
767 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
768
769 EXPECT_CALL(readCb1, readAvailable(stream1));
770 transport->driveReadCallbacks();
771
772 transport->setReadCallback(stream1, &readCb2);
773 EXPECT_CALL(readCb2, readAvailable(stream1));
774 transport->driveReadCallbacks();
775
776 auto& conn = transport->getConnectionState();
777 EXPECT_EQ(conn.pendingEvents.frames.size(), 0);
778 transport->setReadCallback(stream1, nullptr);
779 EXPECT_EQ(conn.pendingEvents.frames.size(), 1);
780 EXPECT_CALL(readCb2, readAvailable(_)).Times(0);
781 transport->driveReadCallbacks();
782
783 EXPECT_TRUE(transport->setReadCallback(stream1, &readCb2).hasError());
784
785 transport.reset();
786 }
787
TEST_F(QuicTransportImplTest,ReadCallbackUnsetAll)788 TEST_F(QuicTransportImplTest, ReadCallbackUnsetAll) {
789 auto stream1 = transport->createBidirectionalStream().value();
790 auto stream2 = transport->createBidirectionalStream().value();
791
792 NiceMock<MockReadCallback> readCb1;
793 NiceMock<MockReadCallback> readCb2;
794
795 // Set the read callbacks, and then add data to the stream, and see that the
796 // callbacks are, in fact, called.
797
798 transport->setReadCallback(stream1, &readCb1);
799 transport->setReadCallback(stream2, &readCb2);
800
801 EXPECT_CALL(readCb1, readAvailable(stream1));
802 EXPECT_CALL(readCb2, readAvailable(stream2));
803
804 transport->addDataToStream(
805 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
806 transport->addDataToStream(
807 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
808
809 transport->driveReadCallbacks();
810
811 // Unset all of the read callbacks, then add data to the stream, and see that
812 // the read callbacks are not called.
813
814 transport->unsetAllReadCallbacks();
815
816 EXPECT_CALL(readCb1, readAvailable(stream1)).Times(0);
817 EXPECT_CALL(readCb2, readAvailable(stream2)).Times(0);
818
819 transport->addDataToStream(
820 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
821 transport->addDataToStream(
822 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
823
824 transport->driveReadCallbacks();
825
826 transport.reset();
827 }
828
TEST_F(QuicTransportImplTest,ReadCallbackPauseResume)829 TEST_F(QuicTransportImplTest, ReadCallbackPauseResume) {
830 auto stream1 = transport->createBidirectionalStream().value();
831 auto stream2 = transport->createBidirectionalStream().value();
832 NiceMock<MockReadCallback> readCb1;
833 NiceMock<MockReadCallback> readCb2;
834
835 transport->setReadCallback(stream1, &readCb1);
836 transport->setReadCallback(stream2, &readCb2);
837
838 transport->addDataToStream(
839 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
840 transport->addDataToStream(
841 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
842
843 auto res = transport->pauseRead(stream1);
844 EXPECT_TRUE(res);
845 EXPECT_CALL(readCb1, readAvailable(stream1)).Times(0);
846 EXPECT_CALL(readCb2, readAvailable(stream2));
847 transport->driveReadCallbacks();
848
849 res = transport->resumeRead(stream1);
850 EXPECT_TRUE(res);
851 res = transport->pauseRead(stream2);
852 EXPECT_CALL(readCb1, readAvailable(stream1));
853 EXPECT_CALL(readCb2, readAvailable(stream2)).Times(0);
854 transport->driveReadCallbacks();
855
856 auto stream3 = transport->createBidirectionalStream().value();
857 res = transport->pauseRead(stream3);
858 EXPECT_FALSE(res);
859 EXPECT_EQ(LocalErrorCode::APP_ERROR, res.error());
860 transport.reset();
861 }
862
TEST_F(QuicTransportImplTest,ReadCallbackNoCallbackSet)863 TEST_F(QuicTransportImplTest, ReadCallbackNoCallbackSet) {
864 auto stream1 = transport->createBidirectionalStream().value();
865
866 transport->addDataToStream(
867 stream1,
868 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
869 transport->driveReadCallbacks();
870 transport->addDataToStream(
871 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
872 transport->driveReadCallbacks();
873 transport.reset();
874 }
875
TEST_F(QuicTransportImplTest,ReadCallbackInvalidStream)876 TEST_F(QuicTransportImplTest, ReadCallbackInvalidStream) {
877 NiceMock<MockReadCallback> readCb1;
878 StreamId invalidStream = 10;
879 EXPECT_TRUE(transport->setReadCallback(invalidStream, &readCb1).hasError());
880 transport.reset();
881 }
882
TEST_F(QuicTransportImplTest,ReadData)883 TEST_F(QuicTransportImplTest, ReadData) {
884 auto stream1 = transport->createBidirectionalStream().value();
885
886 NiceMock<MockReadCallback> readCb1;
887 auto readData = folly::IOBuf::copyBuffer("actual stream data");
888
889 transport->setReadCallback(stream1, &readCb1);
890
891 EXPECT_CALL(readCb1, readAvailable(stream1));
892 transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0));
893 transport->driveReadCallbacks();
894
895 transport->read(stream1, 10).thenOrThrow([&](std::pair<Buf, bool> data) {
896 IOBufEqualTo eq;
897 auto expected = readData->clone();
898 expected->trimEnd(expected->length() - 10);
899 EXPECT_TRUE(eq(*data.first, *expected));
900 });
901
902 EXPECT_CALL(readCb1, readAvailable(stream1));
903 transport->driveReadCallbacks();
904 transport->read(stream1, 100).thenOrThrow([&](std::pair<Buf, bool> data) {
905 IOBufEqualTo eq;
906 auto expected = readData->clone();
907 expected->trimStart(10);
908 EXPECT_TRUE(eq(*data.first, *expected));
909 });
910
911 transport->driveReadCallbacks();
912 transport.reset();
913 }
914
915 // TODO The finest copypasta around. We need a better story for parameterizing
916 // unidirectional vs. bidirectional.
TEST_F(QuicTransportImplTest,UnidirectionalReadData)917 TEST_F(QuicTransportImplTest, UnidirectionalReadData) {
918 auto stream1 = 0x6;
919
920 NiceMock<MockReadCallback> readCb1;
921 auto readData = folly::IOBuf::copyBuffer("actual stream data");
922
923 transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0));
924 transport->setReadCallback(stream1, &readCb1);
925 EXPECT_CALL(readCb1, readAvailable(stream1));
926 transport->driveReadCallbacks();
927
928 transport->read(stream1, 10).thenOrThrow([&](std::pair<Buf, bool> data) {
929 IOBufEqualTo eq;
930 auto expected = readData->clone();
931 expected->trimEnd(expected->length() - 10);
932 EXPECT_TRUE(eq(*data.first, *expected));
933 });
934
935 EXPECT_CALL(readCb1, readAvailable(stream1));
936 transport->driveReadCallbacks();
937 transport->read(stream1, 100).thenOrThrow([&](std::pair<Buf, bool> data) {
938 IOBufEqualTo eq;
939 auto expected = readData->clone();
940 expected->trimStart(10);
941 EXPECT_TRUE(eq(*data.first, *expected));
942 });
943
944 transport->driveReadCallbacks();
945 transport.reset();
946 }
947
TEST_F(QuicTransportImplTest,ReadDataUnsetReadCallbackInCallback)948 TEST_F(QuicTransportImplTest, ReadDataUnsetReadCallbackInCallback) {
949 auto stream1 = transport->createBidirectionalStream().value();
950 auto readData = folly::IOBuf::copyBuffer("actual stream data");
951
952 NiceMock<MockReadCallback> readCb1;
953 transport->setReadCallback(stream1, &readCb1);
954
955 transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0, true));
956
957 EXPECT_CALL(readCb1, readAvailable(stream1))
958 .WillOnce(Invoke(
959 [&](StreamId id) { transport->setReadCallback(id, nullptr); }));
960 transport->driveReadCallbacks();
961 transport->driveReadCallbacks();
962 transport->getEventBase()->loop();
963 transport.reset();
964 }
965
TEST_F(QuicTransportImplTest,ReadDataNoCallback)966 TEST_F(QuicTransportImplTest, ReadDataNoCallback) {
967 auto stream1 = transport->createBidirectionalStream().value();
968 auto readData = folly::IOBuf::copyBuffer("actual stream data");
969
970 transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0, true));
971 transport->driveReadCallbacks();
972 transport->read(stream1, 100).thenOrThrow([&](std::pair<Buf, bool> data) {
973 IOBufEqualTo eq;
974 EXPECT_TRUE(eq(*data.first, *readData));
975 EXPECT_TRUE(data.second);
976 });
977 transport.reset();
978 }
979
TEST_F(QuicTransportImplTest,ReadCallbackForClientOutOfOrderStream)980 TEST_F(QuicTransportImplTest, ReadCallbackForClientOutOfOrderStream) {
981 InSequence dummy;
982 StreamId clientOutOfOrderStream = 96;
983 StreamId clientOutOfOrderStream2 = 76;
984
985 auto readData = folly::IOBuf::copyBuffer("actual stream data");
986
987 NiceMock<MockReadCallback> streamRead;
988
989 for (StreamId start = 0x00; start <= clientOutOfOrderStream;
990 start += kStreamIncrement) {
991 EXPECT_CALL(connCallback, onNewBidirectionalStream(start))
992 .WillOnce(Invoke(
993 [&](StreamId id) { transport->setReadCallback(id, &streamRead); }));
994 }
995
996 EXPECT_CALL(streamRead, readAvailable(clientOutOfOrderStream))
997 .WillOnce(Invoke([&](StreamId id) {
998 transport->read(id, 100).thenOrThrow([&](std::pair<Buf, bool> data) {
999 IOBufEqualTo eq;
1000 EXPECT_TRUE(eq(*data.first, *readData));
1001 EXPECT_TRUE(data.second);
1002 });
1003 }));
1004
1005 transport->addDataToStream(
1006 clientOutOfOrderStream, StreamBuffer(readData->clone(), 0, true));
1007
1008 transport->driveReadCallbacks();
1009
1010 transport->addDataToStream(
1011 clientOutOfOrderStream2, StreamBuffer(readData->clone(), 0, true));
1012
1013 EXPECT_CALL(streamRead, readAvailable(clientOutOfOrderStream2))
1014 .WillOnce(Invoke([&](StreamId id) {
1015 transport->read(id, 100).thenOrThrow([&](std::pair<Buf, bool> data) {
1016 IOBufEqualTo eq;
1017 EXPECT_TRUE(eq(*data.first, *readData));
1018 EXPECT_TRUE(data.second);
1019 });
1020 }));
1021 transport->driveReadCallbacks();
1022 transport.reset();
1023 }
1024
TEST_F(QuicTransportImplTest,ReadDataInvalidStream)1025 TEST_F(QuicTransportImplTest, ReadDataInvalidStream) {
1026 StreamId invalidStream = 10;
1027 EXPECT_THROW(
1028 transport->read(invalidStream, 100).thenOrThrow([&](auto) {}),
1029 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
1030 transport.reset();
1031 }
1032
TEST_F(QuicTransportImplTest,ReadError)1033 TEST_F(QuicTransportImplTest, ReadError) {
1034 auto stream1 = transport->createBidirectionalStream().value();
1035
1036 NiceMock<MockReadCallback> readCb1;
1037 auto readData = folly::IOBuf::copyBuffer("actual stream data");
1038
1039 transport->setReadCallback(stream1, &readCb1);
1040
1041 EXPECT_CALL(
1042 readCb1, readError(stream1, IsError(LocalErrorCode::STREAM_CLOSED)));
1043 transport->addStreamReadError(stream1, LocalErrorCode::STREAM_CLOSED);
1044 transport->driveReadCallbacks();
1045 transport.reset();
1046 }
1047
TEST_F(QuicTransportImplTest,ReadCallbackDeleteTransport)1048 TEST_F(QuicTransportImplTest, ReadCallbackDeleteTransport) {
1049 auto stream1 = transport->createBidirectionalStream().value();
1050 auto stream2 = transport->createBidirectionalStream().value();
1051
1052 NiceMock<MockReadCallback> readCb1;
1053 NiceMock<MockReadCallback> readCb2;
1054
1055 transport->setReadCallback(stream1, &readCb1);
1056 transport->setReadCallback(stream2, &readCb2);
1057
1058 transport->addStreamReadError(stream1, LocalErrorCode::NO_ERROR);
1059
1060 transport->addDataToStream(
1061 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
1062
1063 EXPECT_CALL(readCb1, readError(stream1, _)).WillOnce(Invoke([&](auto, auto) {
1064 transport.reset();
1065 }));
1066 EXPECT_CALL(readCb2, readAvailable(stream2));
1067 transport->driveReadCallbacks();
1068 transport.reset();
1069 }
1070
TEST_F(QuicTransportImplTest,onNewBidirectionalStreamCallback)1071 TEST_F(QuicTransportImplTest, onNewBidirectionalStreamCallback) {
1072 auto readData = folly::IOBuf::copyBuffer("actual stream data");
1073
1074 StreamId stream2 = 0x00;
1075 EXPECT_CALL(connCallback, onNewBidirectionalStream(stream2));
1076 transport->addDataToStream(stream2, StreamBuffer(readData->clone(), 0, true));
1077
1078 StreamId stream3 = 0x04;
1079 EXPECT_CALL(connCallback, onNewBidirectionalStream(stream3));
1080 transport->addDataToStream(stream3, StreamBuffer(readData->clone(), 0, true));
1081
1082 StreamId uniStream3 = 0xa;
1083 EXPECT_CALL(
1084 connCallback,
1085 onNewUnidirectionalStream(uniStream3 - 2 * kStreamIncrement));
1086 EXPECT_CALL(
1087 connCallback, onNewUnidirectionalStream(uniStream3 - kStreamIncrement));
1088 EXPECT_CALL(connCallback, onNewUnidirectionalStream(uniStream3));
1089 transport->addDataToStream(
1090 uniStream3, StreamBuffer(readData->clone(), 0, true));
1091 transport.reset();
1092 }
1093
TEST_F(QuicTransportImplTest,onNewStreamCallbackDoesNotRemove)1094 TEST_F(QuicTransportImplTest, onNewStreamCallbackDoesNotRemove) {
1095 auto readData = folly::IOBuf::copyBuffer("actual stream data");
1096 StreamId uniStream1 = 2;
1097 StreamId uniStream2 = uniStream1 + kStreamIncrement;
1098 EXPECT_CALL(connCallback, onNewUnidirectionalStream(uniStream1))
1099 .WillOnce(Invoke([&](StreamId id) {
1100 ASSERT_FALSE(transport->read(id, 100).hasError());
1101 }));
1102 EXPECT_CALL(connCallback, onNewUnidirectionalStream(uniStream2))
1103 .WillOnce(Invoke([&](StreamId id) {
1104 ASSERT_FALSE(transport->read(id, 100).hasError());
1105 }));
1106 transport->addDataToStream(
1107 uniStream1, StreamBuffer(readData->clone(), 0, true));
1108 transport->addDataToStream(
1109 uniStream2, StreamBuffer(readData->clone(), 0, true));
1110 transport.reset();
1111 }
1112
TEST_F(QuicTransportImplTest,onNewBidirectionalStreamStreamOutOfOrder)1113 TEST_F(QuicTransportImplTest, onNewBidirectionalStreamStreamOutOfOrder) {
1114 InSequence dummy;
1115 auto readData = folly::IOBuf::copyBuffer("actual stream data");
1116 StreamId biStream1 = 28;
1117 StreamId uniStream1 = 30;
1118 for (StreamId id = 0x00; id <= biStream1; id += kStreamIncrement) {
1119 EXPECT_CALL(connCallback, onNewBidirectionalStream(id));
1120 }
1121 for (StreamId id = 0x02; id <= uniStream1; id += kStreamIncrement) {
1122 EXPECT_CALL(connCallback, onNewUnidirectionalStream(id));
1123 }
1124 transport->addDataToStream(
1125 biStream1, StreamBuffer(readData->clone(), 0, true));
1126 transport->addDataToStream(
1127 uniStream1, StreamBuffer(readData->clone(), 0, true));
1128
1129 StreamId biStream2 = 56;
1130 StreamId uniStream2 = 38;
1131 for (StreamId id = biStream1 + kStreamIncrement; id <= biStream2;
1132 id += kStreamIncrement) {
1133 EXPECT_CALL(connCallback, onNewBidirectionalStream(id));
1134 }
1135 for (StreamId id = uniStream1 + kStreamIncrement; id <= uniStream2;
1136 id += kStreamIncrement) {
1137 EXPECT_CALL(connCallback, onNewUnidirectionalStream(id));
1138 }
1139 transport->addDataToStream(
1140 biStream2, StreamBuffer(readData->clone(), 0, true));
1141 transport->addDataToStream(
1142 uniStream2, StreamBuffer(readData->clone(), 0, true));
1143 transport.reset();
1144 }
1145
TEST_F(QuicTransportImplTest,onNewBidirectionalStreamSetReadCallback)1146 TEST_F(QuicTransportImplTest, onNewBidirectionalStreamSetReadCallback) {
1147 InSequence dummy;
1148 auto readData = folly::IOBuf::copyBuffer("actual stream data");
1149 transport->addCryptoData(StreamBuffer(readData->clone(), 0, true));
1150
1151 NiceMock<MockReadCallback> stream2Read;
1152 StreamId stream2 = 0x00;
1153 EXPECT_CALL(connCallback, onNewBidirectionalStream(stream2))
1154 .WillOnce(Invoke(
1155 [&](StreamId id) { transport->setReadCallback(id, &stream2Read); }));
1156 transport->addDataToStream(stream2, StreamBuffer(readData->clone(), 0, true));
1157
1158 StreamId stream3 = 0x10;
1159 NiceMock<MockReadCallback> streamRead;
1160 for (StreamId start = stream2 + kStreamIncrement; start <= stream3;
1161 start += kStreamIncrement) {
1162 EXPECT_CALL(connCallback, onNewBidirectionalStream(start))
1163 .WillOnce(Invoke(
1164 [&](StreamId id) { transport->setReadCallback(id, &streamRead); }));
1165 }
1166 transport->addDataToStream(stream3, StreamBuffer(readData->clone(), 0, true));
1167 evb->loopOnce();
1168 transport.reset();
1169 }
1170
TEST_F(QuicTransportImplTest,OnInvalidServerStream)1171 TEST_F(QuicTransportImplTest, OnInvalidServerStream) {
1172 EXPECT_CALL(
1173 connCallback,
1174 onConnectionError(IsError(TransportErrorCode::STREAM_STATE_ERROR)));
1175 auto readData = folly::IOBuf::copyBuffer("actual stream data");
1176 StreamId stream1 = 29;
1177 transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0, true));
1178 EXPECT_TRUE(transport->isClosed());
1179 EXPECT_EQ(
1180 transport->getConnectionError(),
1181 QuicErrorCode(TransportErrorCode::STREAM_STATE_ERROR));
1182 transport.reset();
1183 }
1184
TEST_F(QuicTransportImplTest,CreateStream)1185 TEST_F(QuicTransportImplTest, CreateStream) {
1186 auto streamId = transport->createBidirectionalStream().value();
1187 auto streamId2 = transport->createBidirectionalStream().value();
1188 auto streamId3 = transport->createBidirectionalStream().value();
1189 auto streamId4 = transport->createBidirectionalStream().value();
1190
1191 EXPECT_EQ(streamId2, streamId + kStreamIncrement);
1192 EXPECT_EQ(streamId3, streamId2 + kStreamIncrement);
1193 EXPECT_EQ(streamId4, streamId3 + kStreamIncrement);
1194 transport.reset();
1195 }
1196
TEST_F(QuicTransportImplTest,CreateUnidirectionalStream)1197 TEST_F(QuicTransportImplTest, CreateUnidirectionalStream) {
1198 auto streamId = transport->createUnidirectionalStream().value();
1199 auto streamId2 = transport->createUnidirectionalStream().value();
1200 auto streamId3 = transport->createUnidirectionalStream().value();
1201 auto streamId4 = transport->createUnidirectionalStream().value();
1202
1203 EXPECT_EQ(streamId2, streamId + kStreamIncrement);
1204 EXPECT_EQ(streamId3, streamId2 + kStreamIncrement);
1205 EXPECT_EQ(streamId4, streamId3 + kStreamIncrement);
1206 transport.reset();
1207 }
1208
TEST_F(QuicTransportImplTest,CreateBothStream)1209 TEST_F(QuicTransportImplTest, CreateBothStream) {
1210 auto uniStreamId = transport->createUnidirectionalStream().value();
1211 auto biStreamId = transport->createBidirectionalStream().value();
1212 auto uniStreamId2 = transport->createUnidirectionalStream().value();
1213 auto biStreamId2 = transport->createBidirectionalStream().value();
1214 auto uniStreamId3 = transport->createUnidirectionalStream().value();
1215 auto biStreamId3 = transport->createBidirectionalStream().value();
1216 auto uniStreamId4 = transport->createUnidirectionalStream().value();
1217 auto biStreamId4 = transport->createBidirectionalStream().value();
1218
1219 EXPECT_EQ(uniStreamId2, uniStreamId + kStreamIncrement);
1220 EXPECT_EQ(uniStreamId3, uniStreamId2 + kStreamIncrement);
1221 EXPECT_EQ(uniStreamId4, uniStreamId3 + kStreamIncrement);
1222 EXPECT_EQ(biStreamId2, biStreamId + kStreamIncrement);
1223 EXPECT_EQ(biStreamId3, biStreamId2 + kStreamIncrement);
1224 EXPECT_EQ(biStreamId4, biStreamId3 + kStreamIncrement);
1225 transport.reset();
1226 }
1227
TEST_F(QuicTransportImplTest,CreateStreamLimitsBidirectionalZero)1228 TEST_F(QuicTransportImplTest, CreateStreamLimitsBidirectionalZero) {
1229 transport->transportConn->streamManager->setMaxLocalBidirectionalStreams(
1230 0, true);
1231 EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 0);
1232 auto result = transport->createBidirectionalStream();
1233 ASSERT_FALSE(result);
1234 EXPECT_EQ(result.error(), LocalErrorCode::STREAM_LIMIT_EXCEEDED);
1235 result = transport->createUnidirectionalStream();
1236 EXPECT_TRUE(result);
1237 transport.reset();
1238 }
1239
TEST_F(QuicTransportImplTest,CreateStreamLimitsUnidirectionalZero)1240 TEST_F(QuicTransportImplTest, CreateStreamLimitsUnidirectionalZero) {
1241 transport->transportConn->streamManager->setMaxLocalUnidirectionalStreams(
1242 0, true);
1243 EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 0);
1244 auto result = transport->createUnidirectionalStream();
1245 ASSERT_FALSE(result);
1246 EXPECT_EQ(result.error(), LocalErrorCode::STREAM_LIMIT_EXCEEDED);
1247 result = transport->createBidirectionalStream();
1248 EXPECT_TRUE(result);
1249 transport.reset();
1250 }
1251
TEST_F(QuicTransportImplTest,CreateStreamLimitsBidirectionalFew)1252 TEST_F(QuicTransportImplTest, CreateStreamLimitsBidirectionalFew) {
1253 transport->transportConn->streamManager->setMaxLocalBidirectionalStreams(
1254 10, true);
1255 EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 10);
1256 for (int i = 0; i < 10; i++) {
1257 EXPECT_TRUE(transport->createBidirectionalStream());
1258 EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 10 - (i + 1));
1259 }
1260 auto result = transport->createBidirectionalStream();
1261 ASSERT_FALSE(result);
1262 EXPECT_EQ(result.error(), LocalErrorCode::STREAM_LIMIT_EXCEEDED);
1263 EXPECT_TRUE(transport->createUnidirectionalStream());
1264 transport.reset();
1265 }
1266
TEST_F(QuicTransportImplTest,CreateStreamLimitsUnidirectionalFew)1267 TEST_F(QuicTransportImplTest, CreateStreamLimitsUnidirectionalFew) {
1268 transport->transportConn->streamManager->setMaxLocalUnidirectionalStreams(
1269 10, true);
1270 EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 10);
1271 for (int i = 0; i < 10; i++) {
1272 EXPECT_TRUE(transport->createUnidirectionalStream());
1273 EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 10 - (i + 1));
1274 }
1275 auto result = transport->createUnidirectionalStream();
1276 ASSERT_FALSE(result);
1277 EXPECT_EQ(result.error(), LocalErrorCode::STREAM_LIMIT_EXCEEDED);
1278 EXPECT_TRUE(transport->createBidirectionalStream());
1279 transport.reset();
1280 }
1281
TEST_F(QuicTransportImplTest,onBidiStreamsAvailableCallback)1282 TEST_F(QuicTransportImplTest, onBidiStreamsAvailableCallback) {
1283 transport->transportConn->streamManager->setMaxLocalBidirectionalStreams(
1284 0, /*force=*/true);
1285
1286 EXPECT_CALL(connCallback, onBidirectionalStreamsAvailable(_))
1287 .WillOnce(Invoke([](uint64_t numAvailableStreams) {
1288 EXPECT_EQ(numAvailableStreams, 1);
1289 }));
1290 transport->addMaxStreamsFrame(MaxStreamsFrame(1, /*isBidirectionalIn=*/true));
1291 EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 1);
1292
1293 // same value max streams frame doesn't trigger callback
1294 transport->addMaxStreamsFrame(MaxStreamsFrame(1, /*isBidirectionalIn=*/true));
1295 }
1296
TEST_F(QuicTransportImplTest,onBidiStreamsAvailableCallbackAfterExausted)1297 TEST_F(QuicTransportImplTest, onBidiStreamsAvailableCallbackAfterExausted) {
1298 transport->transportConn->streamManager->setMaxLocalBidirectionalStreams(
1299 0, /*force=*/true);
1300
1301 EXPECT_CALL(connCallback, onBidirectionalStreamsAvailable(_)).Times(2);
1302 transport->addMaxStreamsFrame(MaxStreamsFrame(
1303 1,
1304 /*isBidirectionalIn=*/true));
1305 EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 1);
1306
1307 auto result = transport->createBidirectionalStream();
1308 EXPECT_TRUE(result);
1309 EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 0);
1310
1311 transport->addMaxStreamsFrame(MaxStreamsFrame(
1312 2,
1313 /*isBidirectionalIn=*/true));
1314 }
1315
TEST_F(QuicTransportImplTest,oneUniStreamsAvailableCallback)1316 TEST_F(QuicTransportImplTest, oneUniStreamsAvailableCallback) {
1317 transport->transportConn->streamManager->setMaxLocalUnidirectionalStreams(
1318 0, /*force=*/true);
1319
1320 EXPECT_CALL(connCallback, onUnidirectionalStreamsAvailable(_))
1321 .WillOnce(Invoke([](uint64_t numAvailableStreams) {
1322 EXPECT_EQ(numAvailableStreams, 1);
1323 }));
1324 transport->addMaxStreamsFrame(
1325 MaxStreamsFrame(1, /*isBidirectionalIn=*/false));
1326 EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 1);
1327
1328 // same value max streams frame doesn't trigger callback
1329 transport->addMaxStreamsFrame(
1330 MaxStreamsFrame(1, /*isBidirectionalIn=*/false));
1331 }
1332
TEST_F(QuicTransportImplTest,onUniStreamsAvailableCallbackAfterExausted)1333 TEST_F(QuicTransportImplTest, onUniStreamsAvailableCallbackAfterExausted) {
1334 transport->transportConn->streamManager->setMaxLocalUnidirectionalStreams(
1335 0, /*force=*/true);
1336
1337 EXPECT_CALL(connCallback, onUnidirectionalStreamsAvailable(_)).Times(2);
1338 transport->addMaxStreamsFrame(
1339 MaxStreamsFrame(1, /*isBidirectionalIn=*/false));
1340 EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 1);
1341
1342 auto result = transport->createUnidirectionalStream();
1343 EXPECT_TRUE(result);
1344 EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 0);
1345
1346 transport->addMaxStreamsFrame(
1347 MaxStreamsFrame(2, /*isBidirectionalIn=*/false));
1348 }
1349
TEST_F(QuicTransportImplTest,ReadDataAlsoChecksLossAlarm)1350 TEST_F(QuicTransportImplTest, ReadDataAlsoChecksLossAlarm) {
1351 transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
1352 auto stream = transport->createBidirectionalStream().value();
1353 transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true);
1354 // Artificially stop the write looper so that the read can trigger it.
1355 transport->writeLooper()->stop();
1356 transport->addDataToStream(
1357 stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
1358 EXPECT_TRUE(transport->writeLooper()->isRunning());
1359 // Drive the event loop once to allow for the write looper to continue.
1360 evb->loopOnce();
1361 EXPECT_TRUE(transport->isLossTimeoutScheduled());
1362 transport.reset();
1363 }
1364
TEST_F(QuicTransportImplTest,ConnectionErrorOnWrite)1365 TEST_F(QuicTransportImplTest, ConnectionErrorOnWrite) {
1366 transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
1367 auto stream = transport->createBidirectionalStream().value();
1368 EXPECT_CALL(*socketPtr, write(_, _))
1369 .WillOnce(SetErrnoAndReturn(ENETUNREACH, -1));
1370 transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true, nullptr);
1371 transport->addDataToStream(
1372 stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
1373 evb->loopOnce();
1374
1375 EXPECT_TRUE(transport->isClosed());
1376 EXPECT_EQ(
1377 transport->getConnectionError(),
1378 QuicErrorCode(LocalErrorCode::CONNECTION_ABANDONED));
1379 }
1380
TEST_F(QuicTransportImplTest,ReadErrorUnsanitizedErrorMsg)1381 TEST_F(QuicTransportImplTest, ReadErrorUnsanitizedErrorMsg) {
1382 transport->setServerConnectionId();
1383 transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
1384 auto stream = transport->createBidirectionalStream().value();
1385 MockReadCallback rcb;
1386 transport->setReadCallback(stream, &rcb);
1387 EXPECT_CALL(rcb, readError(stream, _))
1388 .Times(1)
1389 .WillOnce(Invoke(
1390 [](StreamId,
1391 std::pair<QuicErrorCode, folly::Optional<folly::StringPiece>>
1392 error) {
1393 EXPECT_EQ("You need to calm down.", *error.second);
1394 }));
1395
1396 EXPECT_CALL(*socketPtr, write(_, _)).WillOnce(Invoke([](auto&, auto&) {
1397 throw std::runtime_error("You need to calm down.");
1398 return 0;
1399 }));
1400 transport->writeChain(
1401 stream,
1402 folly::IOBuf::copyBuffer("You are being too loud."),
1403 true,
1404 nullptr);
1405 evb->loopOnce();
1406
1407 EXPECT_TRUE(transport->isClosed());
1408 }
1409
TEST_F(QuicTransportImplTest,ConnectionErrorUnhandledException)1410 TEST_F(QuicTransportImplTest, ConnectionErrorUnhandledException) {
1411 transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
1412 auto stream = transport->createBidirectionalStream().value();
1413 EXPECT_CALL(
1414 connCallback,
1415 onConnectionError(std::make_pair(
1416 QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
1417 std::string("Well there's your problem"))));
1418 EXPECT_CALL(*socketPtr, write(_, _)).WillOnce(Invoke([](auto&, auto&) {
1419 throw std::runtime_error("Well there's your problem");
1420 return 0;
1421 }));
1422 transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true, nullptr);
1423 transport->addDataToStream(
1424 stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
1425 evb->loopOnce();
1426
1427 EXPECT_TRUE(transport->isClosed());
1428 EXPECT_EQ(
1429 transport->getConnectionError(),
1430 QuicErrorCode(TransportErrorCode::INTERNAL_ERROR));
1431 }
1432
TEST_F(QuicTransportImplTest,LossTimeoutNoLessThanTickInterval)1433 TEST_F(QuicTransportImplTest, LossTimeoutNoLessThanTickInterval) {
1434 auto tickInterval = evb->timer().getTickInterval();
1435 transport->scheduleLossTimeout(tickInterval - 1ms);
1436 EXPECT_NEAR(
1437 tickInterval.count(),
1438 transport->getLossTimeoutRemainingTime().count(),
1439 2);
1440 }
1441
TEST_F(QuicTransportImplTest,CloseStreamAfterReadError)1442 TEST_F(QuicTransportImplTest, CloseStreamAfterReadError) {
1443 auto qLogger = std::make_shared<FileQLogger>(VantagePoint::Client);
1444 transport->transportConn->qLogger = qLogger;
1445 auto stream1 = transport->createBidirectionalStream().value();
1446
1447 NiceMock<MockReadCallback> readCb1;
1448 transport->setReadCallback(stream1, &readCb1);
1449
1450 transport->addStreamReadError(stream1, LocalErrorCode::NO_ERROR);
1451 transport->closeStream(stream1);
1452
1453 EXPECT_CALL(readCb1, readError(stream1, IsError(LocalErrorCode::NO_ERROR)));
1454 transport->driveReadCallbacks();
1455
1456 EXPECT_FALSE(transport->transportConn->streamManager->streamExists(stream1));
1457 transport.reset();
1458
1459 std::vector<int> indices =
1460 getQLogEventIndices(QLogEventType::TransportStateUpdate, qLogger);
1461 EXPECT_EQ(indices.size(), 1);
1462 auto tmp = std::move(qLogger->logs[indices[0]]);
1463 auto event = dynamic_cast<QLogTransportStateUpdateEvent*>(tmp.get());
1464 EXPECT_EQ(event->update, getClosingStream("1"));
1465 }
1466
TEST_F(QuicTransportImplTest,CloseStreamAfterReadFin)1467 TEST_F(QuicTransportImplTest, CloseStreamAfterReadFin) {
1468 auto stream2 = transport->createBidirectionalStream().value();
1469 NiceMock<MockReadCallback> readCb2;
1470 transport->setReadCallback(stream2, &readCb2);
1471
1472 transport->addDataToStream(
1473 stream2,
1474 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0, true));
1475
1476 EXPECT_CALL(readCb2, readAvailable(stream2)).WillOnce(Invoke([&](StreamId) {
1477 auto data = transport->read(stream2, 100);
1478 EXPECT_TRUE(data->second);
1479 transport->closeStream(stream2);
1480 }));
1481 transport->driveReadCallbacks();
1482 EXPECT_FALSE(transport->transportConn->streamManager->streamExists(stream2));
1483 transport.reset();
1484 }
1485
TEST_F(QuicTransportImplTest,CloseTransportCleansupOutstandingCounters)1486 TEST_F(QuicTransportImplTest, CloseTransportCleansupOutstandingCounters) {
1487 transport->transportConn->outstandings
1488 .packetCount[PacketNumberSpace::Handshake] = 200;
1489 transport->closeNow(folly::none);
1490 EXPECT_EQ(
1491 0,
1492 transport->transportConn->outstandings
1493 .packetCount[PacketNumberSpace::Handshake]);
1494 }
1495
TEST_F(QuicTransportImplTest,DeliveryCallbackUnsetAll)1496 TEST_F(QuicTransportImplTest, DeliveryCallbackUnsetAll) {
1497 auto stream1 = transport->createBidirectionalStream().value();
1498 auto stream2 = transport->createBidirectionalStream().value();
1499 NiceMock<MockDeliveryCallback> dcb1;
1500 NiceMock<MockDeliveryCallback> dcb2;
1501
1502 transport->registerDeliveryCallback(stream1, 10, &dcb1);
1503 transport->registerDeliveryCallback(stream2, 20, &dcb2);
1504
1505 EXPECT_CALL(dcb1, onCanceled(_, _));
1506 EXPECT_CALL(dcb2, onCanceled(_, _));
1507
1508 transport->unsetAllDeliveryCallbacks();
1509
1510 EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
1511 EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
1512
1513 transport->close(folly::none);
1514 }
1515
TEST_F(QuicTransportImplTest,DeliveryCallbackUnsetOne)1516 TEST_F(QuicTransportImplTest, DeliveryCallbackUnsetOne) {
1517 auto stream1 = transport->createBidirectionalStream().value();
1518 auto stream2 = transport->createBidirectionalStream().value();
1519 NiceMock<MockDeliveryCallback> dcb1;
1520 NiceMock<MockDeliveryCallback> dcb2;
1521
1522 transport->registerDeliveryCallback(stream1, 10, &dcb1);
1523 transport->registerDeliveryCallback(stream2, 20, &dcb2);
1524
1525 EXPECT_CALL(dcb1, onCanceled(_, _));
1526 EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
1527
1528 transport->cancelDeliveryCallbacksForStream(stream1);
1529
1530 EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
1531 EXPECT_CALL(dcb2, onCanceled(_, _));
1532
1533 transport->close(folly::none);
1534 }
1535
TEST_F(QuicTransportImplTest,ByteEventCallbacksManagementSingleStream)1536 TEST_F(QuicTransportImplTest, ByteEventCallbacksManagementSingleStream) {
1537 auto stream = transport->createBidirectionalStream().value();
1538 uint64_t offset1 = 10, offset2 = 20;
1539
1540 ByteEvent txEvent1 = {
1541 .id = stream, .offset = offset1, .type = ByteEvent::Type::TX};
1542 ByteEvent txEvent2 = {
1543 .id = stream, .offset = offset2, .type = ByteEvent::Type::TX};
1544 ByteEvent ackEvent1 = {
1545 .id = stream, .offset = offset1, .type = ByteEvent::Type::ACK};
1546 ByteEvent ackEvent2 = {
1547 .id = stream, .offset = offset2, .type = ByteEvent::Type::ACK};
1548
1549 // Register 2 TX and 2 ACK events for the same stream at 2 different offsets
1550 transport->registerTxCallback(
1551 txEvent1.id, txEvent1.offset, &byteEventCallback);
1552 transport->registerTxCallback(
1553 txEvent2.id, txEvent2.offset, &byteEventCallback);
1554 transport->registerByteEventCallback(
1555 ByteEvent::Type::ACK, ackEvent1.id, ackEvent1.offset, &byteEventCallback);
1556 transport->registerByteEventCallback(
1557 ByteEvent::Type::ACK, ackEvent2.id, ackEvent2.offset, &byteEventCallback);
1558 EXPECT_THAT(
1559 byteEventCallback.getByteEventTracker(),
1560 UnorderedElementsAre(
1561 getByteEventTrackerMatcher(
1562 txEvent1, TestByteEventCallback::Status::REGISTERED),
1563 getByteEventTrackerMatcher(
1564 txEvent2, TestByteEventCallback::Status::REGISTERED),
1565 getByteEventTrackerMatcher(
1566 ackEvent1, TestByteEventCallback::Status::REGISTERED),
1567 getByteEventTrackerMatcher(
1568 ackEvent2, TestByteEventCallback::Status::REGISTERED)));
1569
1570 // Registering the same events a second time will result in an error.
1571 // as double registrations are not allowed.
1572 folly::Expected<folly::Unit, LocalErrorCode> ret;
1573 ret = transport->registerTxCallback(
1574 txEvent1.id, txEvent1.offset, &byteEventCallback);
1575 EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
1576 ret = transport->registerTxCallback(
1577 txEvent2.id, txEvent2.offset, &byteEventCallback);
1578 EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
1579 ret = transport->registerByteEventCallback(
1580 ByteEvent::Type::ACK, ackEvent1.id, ackEvent1.offset, &byteEventCallback);
1581 EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
1582 ret = transport->registerByteEventCallback(
1583 ByteEvent::Type::ACK, ackEvent2.id, ackEvent2.offset, &byteEventCallback);
1584 EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
1585 EXPECT_THAT(
1586 byteEventCallback.getByteEventTracker(),
1587 UnorderedElementsAre(
1588 getByteEventTrackerMatcher(
1589 txEvent1, TestByteEventCallback::Status::REGISTERED),
1590 getByteEventTrackerMatcher(
1591 txEvent2, TestByteEventCallback::Status::REGISTERED),
1592 getByteEventTrackerMatcher(
1593 ackEvent1, TestByteEventCallback::Status::REGISTERED),
1594 getByteEventTrackerMatcher(
1595 ackEvent2, TestByteEventCallback::Status::REGISTERED)));
1596
1597 // On the ACK events, the transport usually sets the srtt value. This value
1598 // should have NO EFFECT on the ByteEvent's hash and we still should be able
1599 // to identify the previously registered byte event correctly.
1600 ackEvent1.srtt = std::chrono::microseconds(1000);
1601 ackEvent2.srtt = std::chrono::microseconds(2000);
1602
1603 // Deliver 1 TX and 1 ACK event. Cancel the other TX anc ACK event
1604 byteEventCallback.onByteEvent(txEvent1);
1605 byteEventCallback.onByteEvent(ackEvent2);
1606 byteEventCallback.onByteEventCanceled(txEvent2);
1607 byteEventCallback.onByteEventCanceled((ByteEventCancellation)ackEvent1);
1608
1609 EXPECT_THAT(
1610 byteEventCallback.getByteEventTracker(),
1611 UnorderedElementsAre(
1612 getByteEventTrackerMatcher(
1613 txEvent1, TestByteEventCallback::Status::RECEIVED),
1614 getByteEventTrackerMatcher(
1615 txEvent2, TestByteEventCallback::Status::CANCELLED),
1616 getByteEventTrackerMatcher(
1617 ackEvent1, TestByteEventCallback::Status::CANCELLED),
1618 getByteEventTrackerMatcher(
1619 ackEvent2, TestByteEventCallback::Status::RECEIVED)));
1620 }
1621
TEST_F(QuicTransportImplTest,ByteEventCallbacksManagementDifferentStreams)1622 TEST_F(QuicTransportImplTest, ByteEventCallbacksManagementDifferentStreams) {
1623 auto stream1 = transport->createBidirectionalStream().value();
1624 auto stream2 = transport->createBidirectionalStream().value();
1625
1626 ByteEvent txEvent1 = {
1627 .id = stream1, .offset = 10, .type = ByteEvent::Type::TX};
1628 ByteEvent txEvent2 = {
1629 .id = stream2, .offset = 20, .type = ByteEvent::Type::TX};
1630 ByteEvent ackEvent1 = {
1631 .id = stream1, .offset = 10, .type = ByteEvent::Type::ACK};
1632 ByteEvent ackEvent2 = {
1633 .id = stream2, .offset = 20, .type = ByteEvent::Type::ACK};
1634
1635 EXPECT_THAT(byteEventCallback.getByteEventTracker(), IsEmpty());
1636 // Register 2 TX and 2 ACK events for 2 separate streams.
1637 transport->registerTxCallback(
1638 txEvent1.id, txEvent1.offset, &byteEventCallback);
1639 transport->registerTxCallback(
1640 txEvent2.id, txEvent2.offset, &byteEventCallback);
1641 transport->registerByteEventCallback(
1642 ByteEvent::Type::ACK, ackEvent1.id, ackEvent1.offset, &byteEventCallback);
1643 transport->registerByteEventCallback(
1644 ByteEvent::Type::ACK, ackEvent2.id, ackEvent2.offset, &byteEventCallback);
1645 EXPECT_THAT(
1646 byteEventCallback.getByteEventTracker(),
1647 UnorderedElementsAre(
1648 getByteEventTrackerMatcher(
1649 txEvent1, TestByteEventCallback::Status::REGISTERED),
1650 getByteEventTrackerMatcher(
1651 txEvent2, TestByteEventCallback::Status::REGISTERED),
1652 getByteEventTrackerMatcher(
1653 ackEvent1, TestByteEventCallback::Status::REGISTERED),
1654 getByteEventTrackerMatcher(
1655 ackEvent2, TestByteEventCallback::Status::REGISTERED)));
1656
1657 // On the ACK events, the transport usually sets the srtt value. This value
1658 // should have NO EFFECT on the ByteEvent's hash and we should still be able
1659 // to identify the previously registered byte event correctly.
1660 ackEvent1.srtt = std::chrono::microseconds(1000);
1661 ackEvent2.srtt = std::chrono::microseconds(2000);
1662
1663 // Deliver the TX event for stream 1 and cancel the ACK event for stream 2
1664 byteEventCallback.onByteEvent(txEvent1);
1665 byteEventCallback.onByteEventCanceled((ByteEventCancellation)ackEvent2);
1666
1667 EXPECT_THAT(
1668 byteEventCallback.getByteEventTracker(),
1669 UnorderedElementsAre(
1670 getByteEventTrackerMatcher(
1671 txEvent1, TestByteEventCallback::Status::RECEIVED),
1672 getByteEventTrackerMatcher(
1673 txEvent2, TestByteEventCallback::Status::REGISTERED),
1674 getByteEventTrackerMatcher(
1675 ackEvent1, TestByteEventCallback::Status::REGISTERED),
1676 getByteEventTrackerMatcher(
1677 ackEvent2, TestByteEventCallback::Status::CANCELLED)));
1678
1679 // Deliver the TX event for stream 2 and cancel the ACK event for stream 1
1680 byteEventCallback.onByteEvent(txEvent2);
1681 byteEventCallback.onByteEventCanceled((ByteEventCancellation)ackEvent1);
1682
1683 EXPECT_THAT(
1684 byteEventCallback.getByteEventTracker(),
1685 UnorderedElementsAre(
1686 getByteEventTrackerMatcher(
1687 txEvent1, TestByteEventCallback::Status::RECEIVED),
1688 getByteEventTrackerMatcher(
1689 txEvent2, TestByteEventCallback::Status::RECEIVED),
1690 getByteEventTrackerMatcher(
1691 ackEvent1, TestByteEventCallback::Status::CANCELLED),
1692 getByteEventTrackerMatcher(
1693 ackEvent2, TestByteEventCallback::Status::CANCELLED)));
1694 }
1695
TEST_F(QuicTransportImplTest,RegisterTxDeliveryCallbackLowerThanExpected)1696 TEST_F(QuicTransportImplTest, RegisterTxDeliveryCallbackLowerThanExpected) {
1697 auto stream = transport->createBidirectionalStream().value();
1698 StrictMock<MockByteEventCallback> txcb1;
1699 StrictMock<MockByteEventCallback> txcb2;
1700 StrictMock<MockByteEventCallback> txcb3;
1701 NiceMock<MockDeliveryCallback> dcb1;
1702 NiceMock<MockDeliveryCallback> dcb2;
1703 NiceMock<MockDeliveryCallback> dcb3;
1704
1705 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 10)));
1706 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 20)));
1707 transport->registerTxCallback(stream, 10, &txcb1);
1708 transport->registerTxCallback(stream, 20, &txcb2);
1709 transport->registerDeliveryCallback(stream, 10, &dcb1);
1710 transport->registerDeliveryCallback(stream, 20, &dcb2);
1711 Mock::VerifyAndClearExpectations(&txcb1);
1712 Mock::VerifyAndClearExpectations(&txcb2);
1713
1714 auto streamState = transport->transportConn->streamManager->getStream(stream);
1715 streamState->currentWriteOffset = 7;
1716 streamState->ackedIntervals.insert(0, 6);
1717
1718 EXPECT_CALL(txcb3, onByteEventRegistered(getTxMatcher(stream, 2)));
1719 EXPECT_CALL(txcb3, onByteEvent(getTxMatcher(stream, 2)));
1720 EXPECT_CALL(dcb3, onDeliveryAck(stream, 2, _));
1721 transport->registerTxCallback(stream, 2, &txcb3);
1722 transport->registerDeliveryCallback(stream, 2, &dcb3);
1723 evb->loopOnce();
1724 Mock::VerifyAndClearExpectations(&txcb3);
1725 Mock::VerifyAndClearExpectations(&dcb3);
1726
1727 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream, 10)));
1728 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream, 20)));
1729 EXPECT_CALL(dcb1, onCanceled(_, _));
1730 EXPECT_CALL(dcb2, onCanceled(_, _));
1731 transport->close(folly::none);
1732 Mock::VerifyAndClearExpectations(&txcb1);
1733 Mock::VerifyAndClearExpectations(&txcb2);
1734 Mock::VerifyAndClearExpectations(&txcb3);
1735 Mock::VerifyAndClearExpectations(&dcb1);
1736 Mock::VerifyAndClearExpectations(&dcb2);
1737 Mock::VerifyAndClearExpectations(&dcb3);
1738 }
1739
TEST_F(QuicTransportImplTest,RegisterTxDeliveryCallbackLowerThanExpectedClose)1740 TEST_F(
1741 QuicTransportImplTest,
1742 RegisterTxDeliveryCallbackLowerThanExpectedClose) {
1743 auto stream = transport->createBidirectionalStream().value();
1744 StrictMock<MockByteEventCallback> txcb;
1745 NiceMock<MockDeliveryCallback> dcb;
1746 auto streamState = transport->transportConn->streamManager->getStream(stream);
1747 streamState->currentWriteOffset = 7;
1748
1749 EXPECT_CALL(txcb, onByteEventRegistered(getTxMatcher(stream, 2)));
1750 EXPECT_CALL(txcb, onByteEventCanceled(getTxMatcher(stream, 2)));
1751 EXPECT_CALL(dcb, onCanceled(_, _));
1752 transport->registerTxCallback(stream, 2, &txcb);
1753 transport->registerDeliveryCallback(stream, 2, &dcb);
1754 transport->close(folly::none);
1755 evb->loopOnce();
1756 Mock::VerifyAndClearExpectations(&txcb);
1757 Mock::VerifyAndClearExpectations(&dcb);
1758 }
1759
TEST_F(QuicTransportImplTest,RegisterDeliveryCallbackMultipleRegistrationsTx)1760 TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackMultipleRegistrationsTx) {
1761 auto stream = transport->createBidirectionalStream().value();
1762 StrictMock<MockByteEventCallback> txcb1;
1763 StrictMock<MockByteEventCallback> txcb2;
1764
1765 // Set the current write offset to 7.
1766 auto streamState = transport->transportConn->streamManager->getStream(stream);
1767 streamState->currentWriteOffset = 7;
1768 streamState->ackedIntervals.insert(0, 6);
1769
1770 // Have 2 different recipients register for a callback on the same stream ID
1771 // and offset that is before the curernt write offset, they will both be
1772 // scheduled for immediate delivery.
1773 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3)));
1774 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 3)));
1775 transport->registerTxCallback(stream, 3, &txcb1);
1776 transport->registerTxCallback(stream, 3, &txcb2);
1777 Mock::VerifyAndClearExpectations(&txcb1);
1778 Mock::VerifyAndClearExpectations(&txcb2);
1779
1780 // Now, re-register the same callbacks, it should not go through.
1781 auto ret = transport->registerTxCallback(stream, 3, &txcb1);
1782 EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
1783 ret = transport->registerTxCallback(stream, 3, &txcb2);
1784 EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
1785
1786 // Deliver the first set of registrations.
1787 EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(1);
1788 EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 3))).Times(1);
1789 evb->loopOnce();
1790 Mock::VerifyAndClearExpectations(&txcb1);
1791 Mock::VerifyAndClearExpectations(&txcb2);
1792 }
1793
TEST_F(QuicTransportImplTest,RegisterDeliveryCallbackMultipleRegistrationsAck)1794 TEST_F(
1795 QuicTransportImplTest,
1796 RegisterDeliveryCallbackMultipleRegistrationsAck) {
1797 auto stream = transport->createBidirectionalStream().value();
1798 StrictMock<MockByteEventCallback> txcb1;
1799 StrictMock<MockByteEventCallback> txcb2;
1800
1801 // Set the current write offset to 7.
1802 auto streamState = transport->transportConn->streamManager->getStream(stream);
1803 streamState->currentWriteOffset = 7;
1804 streamState->ackedIntervals.insert(0, 6);
1805
1806 // Have 2 different recipients register for a callback on the same stream ID
1807 // and offset that is before the curernt write offset, they will both be
1808 // scheduled for immediate delivery.
1809 EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3)));
1810 EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 3)));
1811 transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb1);
1812 transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb2);
1813 Mock::VerifyAndClearExpectations(&txcb1);
1814 Mock::VerifyAndClearExpectations(&txcb2);
1815
1816 // Now, re-register the same callbacks, it should not go through.
1817 auto ret = transport->registerByteEventCallback(
1818 ByteEvent::Type::ACK, stream, 3, &txcb1);
1819 EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
1820 ret = transport->registerByteEventCallback(
1821 ByteEvent::Type::ACK, stream, 3, &txcb2);
1822 EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
1823
1824 // Deliver the first set of registrations.
1825 EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(1);
1826 EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 3))).Times(1);
1827 evb->loopOnce();
1828 Mock::VerifyAndClearExpectations(&txcb1);
1829 Mock::VerifyAndClearExpectations(&txcb2);
1830 }
1831
TEST_F(QuicTransportImplTest,RegisterDeliveryCallbackMultipleRecipientsTx)1832 TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackMultipleRecipientsTx) {
1833 auto stream = transport->createBidirectionalStream().value();
1834 StrictMock<MockByteEventCallback> txcb1;
1835 StrictMock<MockByteEventCallback> txcb2;
1836
1837 // Set the current write offset to 7.
1838 auto streamState = transport->transportConn->streamManager->getStream(stream);
1839 streamState->currentWriteOffset = 7;
1840 streamState->ackedIntervals.insert(0, 6);
1841
1842 // Have 2 different recipients register for a callback on the same stream ID
1843 // and offset.
1844 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3)));
1845 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 3)));
1846 transport->registerTxCallback(stream, 3, &txcb1);
1847 transport->registerTxCallback(stream, 3, &txcb2);
1848 Mock::VerifyAndClearExpectations(&txcb1);
1849 Mock::VerifyAndClearExpectations(&txcb2);
1850
1851 // Now, *before* the runOnEvbAsync gets a chance to run, simulate the
1852 // delivery of the callback for txcb1 (offset = 3) by deleting it from the
1853 // outstanding callback queue for this stream ID. This is similar to what
1854 // happens in processCallbacksAfterNetworkData.
1855 bool deleted = transport->deleteRegisteredByteEvent(
1856 stream, 3, &txcb1, ByteEvent::Type::TX);
1857 CHECK_EQ(true, deleted);
1858
1859 // Only the callback for txcb2 should be outstanding now. Run the loop to
1860 // confirm.
1861 EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(0);
1862 EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 3))).Times(1);
1863 evb->loopOnce();
1864 Mock::VerifyAndClearExpectations(&txcb1);
1865 Mock::VerifyAndClearExpectations(&txcb2);
1866 }
1867
TEST_F(QuicTransportImplTest,RegisterDeliveryCallbackMultipleRecipientsAck)1868 TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackMultipleRecipientsAck) {
1869 auto stream = transport->createBidirectionalStream().value();
1870 StrictMock<MockByteEventCallback> txcb1;
1871 StrictMock<MockByteEventCallback> txcb2;
1872
1873 // Set the current write offset to 7.
1874 auto streamState = transport->transportConn->streamManager->getStream(stream);
1875 streamState->currentWriteOffset = 7;
1876 streamState->ackedIntervals.insert(0, 6);
1877
1878 // Have 2 different recipients register for a callback on the same stream ID
1879 // and offset.
1880 EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3)));
1881 EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 3)));
1882 transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb1);
1883 transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb2);
1884 Mock::VerifyAndClearExpectations(&txcb1);
1885 Mock::VerifyAndClearExpectations(&txcb2);
1886
1887 // Now, *before* the runOnEvbAsync gets a chance to run, simulate the
1888 // delivery of the callback for txcb1 (offset = 3) by deleting it from the
1889 // outstanding callback queue for this stream ID. This is similar to what
1890 // happens in processCallbacksAfterNetworkData.
1891 bool deleted = transport->deleteRegisteredByteEvent(
1892 stream, 3, &txcb1, ByteEvent::Type::ACK);
1893 CHECK_EQ(true, deleted);
1894
1895 // Only the callback for txcb2 should be outstanding now. Run the loop to
1896 // confirm.
1897 EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(0);
1898 EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 3))).Times(1);
1899 evb->loopOnce();
1900 Mock::VerifyAndClearExpectations(&txcb1);
1901 Mock::VerifyAndClearExpectations(&txcb2);
1902 }
1903
TEST_F(QuicTransportImplTest,RegisterDeliveryCallbackAsyncDeliveryTx)1904 TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackAsyncDeliveryTx) {
1905 auto stream = transport->createBidirectionalStream().value();
1906 StrictMock<MockByteEventCallback> txcb1;
1907 StrictMock<MockByteEventCallback> txcb2;
1908
1909 // Set the current write offset to 7.
1910 auto streamState = transport->transportConn->streamManager->getStream(stream);
1911 streamState->currentWriteOffset = 7;
1912 streamState->ackedIntervals.insert(0, 6);
1913
1914 // Register tx callbacks for the same stream at offsets 3 (before current
1915 // write offset) and 10 (after current write offset).
1916 // txcb1 (offset = 3) will be scheduled in the lambda (runOnEvbAsync)
1917 // for immediate delivery. txcb2 (offset = 10) will be queued for delivery
1918 // when the actual TX for this offset occurs in the future.
1919 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3)));
1920 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 10)));
1921 transport->registerTxCallback(stream, 3, &txcb1);
1922 transport->registerTxCallback(stream, 10, &txcb2);
1923 Mock::VerifyAndClearExpectations(&txcb1);
1924 Mock::VerifyAndClearExpectations(&txcb2);
1925
1926 // Now, *before* the runOnEvbAsync gets a chance to run, simulate the
1927 // delivery of the callback for txcb1 (offset = 3) by deleting it from the
1928 // outstanding callback queue for this stream ID. This is similar to what
1929 // happens in processCallbacksAfterNetworkData.
1930 bool deleted = transport->deleteRegisteredByteEvent(
1931 stream, 3, &txcb1, ByteEvent::Type::TX);
1932 CHECK_EQ(true, deleted);
1933
1934 // Only txcb2 (offset = 10) should be outstanding now. Run the loop.
1935 // txcb1 (offset = 3) should not be delivered now because it is already
1936 // delivered. txcb2 (offset = 10) should not be delivered because the
1937 // current write offset (7) is still less than the offset requested (10)
1938 EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(0);
1939 EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 10))).Times(0);
1940 evb->loopOnce();
1941 Mock::VerifyAndClearExpectations(&txcb1);
1942 Mock::VerifyAndClearExpectations(&txcb2);
1943
1944 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream, 10)));
1945 transport->close(folly::none);
1946 Mock::VerifyAndClearExpectations(&txcb2);
1947 }
1948
TEST_F(QuicTransportImplTest,RegisterDeliveryCallbackAsyncDeliveryAck)1949 TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackAsyncDeliveryAck) {
1950 auto stream = transport->createBidirectionalStream().value();
1951 StrictMock<MockByteEventCallback> txcb1;
1952 StrictMock<MockByteEventCallback> txcb2;
1953
1954 // Set the current write offset to 7.
1955 auto streamState = transport->transportConn->streamManager->getStream(stream);
1956 streamState->currentWriteOffset = 7;
1957 streamState->ackedIntervals.insert(0, 6);
1958
1959 // Register tx callbacks for the same stream at offsets 3 (before current
1960 // write offset) and 10 (after current write offset).
1961 // txcb1 (offset = 3) will be scheduled in the lambda (runOnEvbAsync)
1962 // for immediate delivery. txcb2 (offset = 10) will be queued for delivery
1963 // when the actual TX for this offset occurs in the future.
1964 EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3)));
1965 EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 10)));
1966 transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb1);
1967 transport->registerByteEventCallback(
1968 ByteEvent::Type::ACK, stream, 10, &txcb2);
1969 Mock::VerifyAndClearExpectations(&txcb1);
1970 Mock::VerifyAndClearExpectations(&txcb2);
1971
1972 // Now, *before* the runOnEvbAsync gets a chance to run, simulate the
1973 // delivery of the callback for txcb1 (offset = 3) by deleting it from the
1974 // outstanding callback queue for this stream ID. This is similar to what
1975 // happens in processCallbacksAfterNetworkData.
1976 bool deleted = transport->deleteRegisteredByteEvent(
1977 stream, 3, &txcb1, ByteEvent::Type::ACK);
1978 CHECK_EQ(true, deleted);
1979
1980 // Only txcb2 (offset = 10) should be outstanding now. Run the loop.
1981 // txcb1 (offset = 3) should not be delivered now because it is already
1982 // delivered. txcb2 (offset = 10) should not be delivered because the
1983 // current write offset (7) is still less than the offset requested (10)
1984 EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(0);
1985 EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 10))).Times(0);
1986 evb->loopOnce();
1987 Mock::VerifyAndClearExpectations(&txcb1);
1988 Mock::VerifyAndClearExpectations(&txcb2);
1989
1990 EXPECT_CALL(txcb2, onByteEventCanceled(getAckMatcher(stream, 10)));
1991 transport->close(folly::none);
1992 Mock::VerifyAndClearExpectations(&txcb2);
1993 }
1994
TEST_F(QuicTransportImplTest,CancelAllByteEventCallbacks)1995 TEST_F(QuicTransportImplTest, CancelAllByteEventCallbacks) {
1996 auto stream1 = transport->createBidirectionalStream().value();
1997 auto stream2 = transport->createBidirectionalStream().value();
1998
1999 NiceMock<MockByteEventCallback> txcb1;
2000 NiceMock<MockByteEventCallback> txcb2;
2001 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
2002 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 20)));
2003 transport->registerTxCallback(stream1, 10, &txcb1);
2004 transport->registerTxCallback(stream2, 20, &txcb2);
2005
2006 NiceMock<MockDeliveryCallback> dcb1;
2007 NiceMock<MockDeliveryCallback> dcb2;
2008 transport->registerDeliveryCallback(stream1, 10, &dcb1);
2009 transport->registerDeliveryCallback(stream2, 20, &dcb2);
2010
2011 EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1));
2012 EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
2013 EXPECT_EQ(
2014 1,
2015 transport->getNumByteEventCallbacksForStream(
2016 ByteEvent::Type::TX, stream1));
2017 EXPECT_EQ(
2018 1,
2019 transport->getNumByteEventCallbacksForStream(
2020 ByteEvent::Type::TX, stream2));
2021 EXPECT_EQ(
2022 1,
2023 transport->getNumByteEventCallbacksForStream(
2024 ByteEvent::Type::ACK, stream1));
2025 EXPECT_EQ(
2026 1,
2027 transport->getNumByteEventCallbacksForStream(
2028 ByteEvent::Type::ACK, stream2));
2029
2030 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
2031 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20)));
2032 EXPECT_CALL(dcb1, onCanceled(_, _));
2033 EXPECT_CALL(dcb2, onCanceled(_, _));
2034
2035 transport->cancelAllByteEventCallbacks();
2036 Mock::VerifyAndClearExpectations(&txcb1);
2037 Mock::VerifyAndClearExpectations(&txcb2);
2038 Mock::VerifyAndClearExpectations(&dcb1);
2039 Mock::VerifyAndClearExpectations(&dcb2);
2040
2041 EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
2042 EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream2));
2043 EXPECT_EQ(
2044 0,
2045 transport->getNumByteEventCallbacksForStream(
2046 ByteEvent::Type::TX, stream1));
2047 EXPECT_EQ(
2048 0,
2049 transport->getNumByteEventCallbacksForStream(
2050 ByteEvent::Type::TX, stream2));
2051 EXPECT_EQ(
2052 0,
2053 transport->getNumByteEventCallbacksForStream(
2054 ByteEvent::Type::ACK, stream1));
2055 EXPECT_EQ(
2056 0,
2057 transport->getNumByteEventCallbacksForStream(
2058 ByteEvent::Type::ACK, stream2));
2059
2060 EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0);
2061 EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0);
2062 EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
2063 EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
2064
2065 transport->close(folly::none);
2066 Mock::VerifyAndClearExpectations(&txcb1);
2067 Mock::VerifyAndClearExpectations(&txcb2);
2068 Mock::VerifyAndClearExpectations(&dcb1);
2069 Mock::VerifyAndClearExpectations(&dcb2);
2070 }
2071
TEST_F(QuicTransportImplTest,CancelByteEventCallbacksForStream)2072 TEST_F(QuicTransportImplTest, CancelByteEventCallbacksForStream) {
2073 auto stream1 = transport->createBidirectionalStream().value();
2074 auto stream2 = transport->createBidirectionalStream().value();
2075 StrictMock<MockByteEventCallback> txcb1;
2076 StrictMock<MockByteEventCallback> txcb2;
2077 NiceMock<MockDeliveryCallback> dcb1;
2078 NiceMock<MockDeliveryCallback> dcb2;
2079
2080 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
2081 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 20)));
2082 transport->registerTxCallback(stream1, 10, &txcb1);
2083 transport->registerTxCallback(stream2, 20, &txcb2);
2084 transport->registerDeliveryCallback(stream1, 10, &dcb1);
2085 transport->registerDeliveryCallback(stream2, 20, &dcb2);
2086
2087 EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1));
2088 EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
2089 EXPECT_EQ(
2090 1,
2091 transport->getNumByteEventCallbacksForStream(
2092 ByteEvent::Type::TX, stream1));
2093 EXPECT_EQ(
2094 1,
2095 transport->getNumByteEventCallbacksForStream(
2096 ByteEvent::Type::TX, stream2));
2097 EXPECT_EQ(
2098 1,
2099 transport->getNumByteEventCallbacksForStream(
2100 ByteEvent::Type::ACK, stream1));
2101 EXPECT_EQ(
2102 1,
2103 transport->getNumByteEventCallbacksForStream(
2104 ByteEvent::Type::ACK, stream2));
2105
2106 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
2107 EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0);
2108 EXPECT_CALL(dcb1, onCanceled(stream1, 10));
2109 EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
2110
2111 transport->cancelByteEventCallbacksForStream(stream1);
2112 Mock::VerifyAndClearExpectations(&txcb1);
2113 Mock::VerifyAndClearExpectations(&txcb2);
2114 Mock::VerifyAndClearExpectations(&dcb1);
2115 Mock::VerifyAndClearExpectations(&dcb2);
2116
2117 EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
2118 EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
2119 EXPECT_EQ(
2120 0,
2121 transport->getNumByteEventCallbacksForStream(
2122 ByteEvent::Type::TX, stream1));
2123 EXPECT_EQ(
2124 1,
2125 transport->getNumByteEventCallbacksForStream(
2126 ByteEvent::Type::TX, stream2));
2127 EXPECT_EQ(
2128 0,
2129 transport->getNumByteEventCallbacksForStream(
2130 ByteEvent::Type::ACK, stream1));
2131 EXPECT_EQ(
2132 1,
2133 transport->getNumByteEventCallbacksForStream(
2134 ByteEvent::Type::ACK, stream2));
2135
2136 EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0);
2137 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20)));
2138 EXPECT_CALL(dcb1, onCanceled(stream1, _)).Times(0);
2139 EXPECT_CALL(dcb2, onCanceled(_, 20));
2140
2141 transport->close(folly::none);
2142 Mock::VerifyAndClearExpectations(&txcb1);
2143 Mock::VerifyAndClearExpectations(&txcb2);
2144 Mock::VerifyAndClearExpectations(&dcb1);
2145 Mock::VerifyAndClearExpectations(&dcb2);
2146 }
2147
TEST_F(QuicTransportImplTest,CancelByteEventCallbacksForStreamWithOffset)2148 TEST_F(QuicTransportImplTest, CancelByteEventCallbacksForStreamWithOffset) {
2149 auto stream1 = transport->createBidirectionalStream().value();
2150 auto stream2 = transport->createBidirectionalStream().value();
2151 StrictMock<MockByteEventCallback> txcb1;
2152 StrictMock<MockByteEventCallback> txcb2;
2153 NiceMock<MockDeliveryCallback> dcb1;
2154 NiceMock<MockDeliveryCallback> dcb2;
2155
2156 EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
2157 EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream2));
2158 EXPECT_EQ(
2159 0,
2160 transport->getNumByteEventCallbacksForStream(
2161 ByteEvent::Type::TX, stream1));
2162 EXPECT_EQ(
2163 0,
2164 transport->getNumByteEventCallbacksForStream(
2165 ByteEvent::Type::TX, stream2));
2166 EXPECT_EQ(
2167 0,
2168 transport->getNumByteEventCallbacksForStream(
2169 ByteEvent::Type::ACK, stream1));
2170 EXPECT_EQ(
2171 0,
2172 transport->getNumByteEventCallbacksForStream(
2173 ByteEvent::Type::ACK, stream2));
2174
2175 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
2176 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 15)));
2177 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 20)));
2178 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 10)));
2179 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 15)));
2180 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 20)));
2181 transport->registerTxCallback(stream1, 10, &txcb1);
2182 transport->registerTxCallback(stream1, 15, &txcb1);
2183 transport->registerTxCallback(stream1, 20, &txcb1);
2184 transport->registerTxCallback(stream2, 10, &txcb2);
2185 transport->registerTxCallback(stream2, 15, &txcb2);
2186 transport->registerTxCallback(stream2, 20, &txcb2);
2187
2188 EXPECT_EQ(3, transport->getNumByteEventCallbacksForStream(stream1));
2189 EXPECT_EQ(3, transport->getNumByteEventCallbacksForStream(stream2));
2190 EXPECT_EQ(
2191 3,
2192 transport->getNumByteEventCallbacksForStream(
2193 ByteEvent::Type::TX, stream1));
2194 EXPECT_EQ(
2195 3,
2196 transport->getNumByteEventCallbacksForStream(
2197 ByteEvent::Type::TX, stream2));
2198 EXPECT_EQ(
2199 0,
2200 transport->getNumByteEventCallbacksForStream(
2201 ByteEvent::Type::ACK, stream1));
2202 EXPECT_EQ(
2203 0,
2204 transport->getNumByteEventCallbacksForStream(
2205 ByteEvent::Type::ACK, stream2));
2206
2207 transport->registerDeliveryCallback(stream1, 10, &dcb1);
2208 transport->registerDeliveryCallback(stream1, 15, &dcb1);
2209 transport->registerDeliveryCallback(stream1, 20, &dcb1);
2210 transport->registerDeliveryCallback(stream2, 10, &dcb2);
2211 transport->registerDeliveryCallback(stream2, 15, &dcb2);
2212 transport->registerDeliveryCallback(stream2, 20, &dcb2);
2213
2214 EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream1));
2215 EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream2));
2216 EXPECT_EQ(
2217 3,
2218 transport->getNumByteEventCallbacksForStream(
2219 ByteEvent::Type::TX, stream1));
2220 EXPECT_EQ(
2221 3,
2222 transport->getNumByteEventCallbacksForStream(
2223 ByteEvent::Type::TX, stream2));
2224 EXPECT_EQ(
2225 3,
2226 transport->getNumByteEventCallbacksForStream(
2227 ByteEvent::Type::ACK, stream1));
2228 EXPECT_EQ(
2229 3,
2230 transport->getNumByteEventCallbacksForStream(
2231 ByteEvent::Type::ACK, stream2));
2232
2233 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
2234 EXPECT_CALL(dcb1, onCanceled(stream1, 10));
2235
2236 // cancels if offset is < (not <=) offset provided
2237 transport->cancelByteEventCallbacksForStream(stream1, 15);
2238 Mock::VerifyAndClearExpectations(&txcb1);
2239 Mock::VerifyAndClearExpectations(&txcb2);
2240 Mock::VerifyAndClearExpectations(&dcb1);
2241 Mock::VerifyAndClearExpectations(&dcb2);
2242
2243 EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream1));
2244 EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream2));
2245 EXPECT_EQ(
2246 2,
2247 transport->getNumByteEventCallbacksForStream(
2248 ByteEvent::Type::TX, stream1));
2249 EXPECT_EQ(
2250 3,
2251 transport->getNumByteEventCallbacksForStream(
2252 ByteEvent::Type::TX, stream2));
2253 EXPECT_EQ(
2254 2,
2255 transport->getNumByteEventCallbacksForStream(
2256 ByteEvent::Type::ACK, stream1));
2257 EXPECT_EQ(
2258 3,
2259 transport->getNumByteEventCallbacksForStream(
2260 ByteEvent::Type::ACK, stream2));
2261
2262 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 15)));
2263 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 20)));
2264 EXPECT_CALL(dcb1, onCanceled(stream1, 15));
2265 EXPECT_CALL(dcb1, onCanceled(stream1, 20));
2266
2267 // cancels if offset is < (not <=) offset provided
2268 transport->cancelByteEventCallbacksForStream(stream1, 21);
2269 Mock::VerifyAndClearExpectations(&txcb1);
2270 Mock::VerifyAndClearExpectations(&txcb2);
2271 Mock::VerifyAndClearExpectations(&dcb1);
2272 Mock::VerifyAndClearExpectations(&dcb2);
2273
2274 EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
2275 EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream2));
2276 EXPECT_EQ(
2277 0,
2278 transport->getNumByteEventCallbacksForStream(
2279 ByteEvent::Type::TX, stream1));
2280 EXPECT_EQ(
2281 3,
2282 transport->getNumByteEventCallbacksForStream(
2283 ByteEvent::Type::TX, stream2));
2284 EXPECT_EQ(
2285 0,
2286 transport->getNumByteEventCallbacksForStream(
2287 ByteEvent::Type::ACK, stream1));
2288 EXPECT_EQ(
2289 3,
2290 transport->getNumByteEventCallbacksForStream(
2291 ByteEvent::Type::ACK, stream2));
2292
2293 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10)));
2294 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 15)));
2295 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20)));
2296 EXPECT_CALL(dcb2, onCanceled(stream2, 10));
2297 EXPECT_CALL(dcb2, onCanceled(stream2, 15));
2298 EXPECT_CALL(dcb2, onCanceled(stream2, 20));
2299
2300 transport->close(folly::none);
2301 Mock::VerifyAndClearExpectations(&txcb1);
2302 Mock::VerifyAndClearExpectations(&txcb2);
2303 Mock::VerifyAndClearExpectations(&dcb1);
2304 Mock::VerifyAndClearExpectations(&dcb2);
2305
2306 EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
2307 EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream2));
2308 EXPECT_EQ(
2309 0,
2310 transport->getNumByteEventCallbacksForStream(
2311 ByteEvent::Type::TX, stream1));
2312 EXPECT_EQ(
2313 0,
2314 transport->getNumByteEventCallbacksForStream(
2315 ByteEvent::Type::TX, stream2));
2316 EXPECT_EQ(
2317 0,
2318 transport->getNumByteEventCallbacksForStream(
2319 ByteEvent::Type::ACK, stream1));
2320 EXPECT_EQ(
2321 0,
2322 transport->getNumByteEventCallbacksForStream(
2323 ByteEvent::Type::ACK, stream2));
2324 }
2325
TEST_F(QuicTransportImplTest,CancelByteEventCallbacksTx)2326 TEST_F(QuicTransportImplTest, CancelByteEventCallbacksTx) {
2327 auto stream1 = transport->createBidirectionalStream().value();
2328 auto stream2 = transport->createBidirectionalStream().value();
2329 StrictMock<MockByteEventCallback> txcb1;
2330 StrictMock<MockByteEventCallback> txcb2;
2331 NiceMock<MockDeliveryCallback> dcb1;
2332 NiceMock<MockDeliveryCallback> dcb2;
2333
2334 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
2335 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 15)));
2336 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 10)));
2337 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 15)));
2338 transport->registerTxCallback(stream1, 10, &txcb1);
2339 transport->registerTxCallback(stream1, 15, &txcb1);
2340 transport->registerTxCallback(stream2, 10, &txcb2);
2341 transport->registerTxCallback(stream2, 15, &txcb2);
2342 transport->registerDeliveryCallback(stream1, 10, &dcb1);
2343 transport->registerDeliveryCallback(stream1, 15, &dcb1);
2344 transport->registerDeliveryCallback(stream2, 10, &dcb2);
2345 transport->registerDeliveryCallback(stream2, 15, &dcb2);
2346
2347 EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream1));
2348 EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream2));
2349 EXPECT_EQ(
2350 2,
2351 transport->getNumByteEventCallbacksForStream(
2352 ByteEvent::Type::TX, stream1));
2353 EXPECT_EQ(
2354 2,
2355 transport->getNumByteEventCallbacksForStream(
2356 ByteEvent::Type::TX, stream2));
2357 EXPECT_EQ(
2358 2,
2359 transport->getNumByteEventCallbacksForStream(
2360 ByteEvent::Type::ACK, stream1));
2361 EXPECT_EQ(
2362 2,
2363 transport->getNumByteEventCallbacksForStream(
2364 ByteEvent::Type::ACK, stream2));
2365
2366 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
2367 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 15)));
2368 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10)));
2369 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 15)));
2370
2371 transport->cancelByteEventCallbacks(ByteEvent::Type::TX);
2372 Mock::VerifyAndClearExpectations(&txcb1);
2373 Mock::VerifyAndClearExpectations(&txcb2);
2374 Mock::VerifyAndClearExpectations(&dcb1);
2375 Mock::VerifyAndClearExpectations(&dcb2);
2376
2377 EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1));
2378 EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
2379 EXPECT_EQ(
2380 0,
2381 transport->getNumByteEventCallbacksForStream(
2382 ByteEvent::Type::TX, stream1));
2383 EXPECT_EQ(
2384 0,
2385 transport->getNumByteEventCallbacksForStream(
2386 ByteEvent::Type::TX, stream2));
2387 EXPECT_EQ(
2388 2,
2389 transport->getNumByteEventCallbacksForStream(
2390 ByteEvent::Type::ACK, stream1));
2391 EXPECT_EQ(
2392 2,
2393 transport->getNumByteEventCallbacksForStream(
2394 ByteEvent::Type::ACK, stream2));
2395
2396 EXPECT_CALL(dcb1, onCanceled(stream1, 10));
2397 EXPECT_CALL(dcb1, onCanceled(stream1, 15));
2398 EXPECT_CALL(dcb2, onCanceled(stream2, 10));
2399 EXPECT_CALL(dcb2, onCanceled(stream2, 15));
2400
2401 transport->close(folly::none);
2402 Mock::VerifyAndClearExpectations(&txcb1);
2403 Mock::VerifyAndClearExpectations(&txcb2);
2404 Mock::VerifyAndClearExpectations(&dcb1);
2405 Mock::VerifyAndClearExpectations(&dcb2);
2406 }
2407
TEST_F(QuicTransportImplTest,CancelByteEventCallbacksDelivery)2408 TEST_F(QuicTransportImplTest, CancelByteEventCallbacksDelivery) {
2409 auto stream1 = transport->createBidirectionalStream().value();
2410 auto stream2 = transport->createBidirectionalStream().value();
2411 StrictMock<MockByteEventCallback> txcb1;
2412 StrictMock<MockByteEventCallback> txcb2;
2413 NiceMock<MockDeliveryCallback> dcb1;
2414 NiceMock<MockDeliveryCallback> dcb2;
2415
2416 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
2417 EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 15)));
2418 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 10)));
2419 EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 15)));
2420 transport->registerTxCallback(stream1, 10, &txcb1);
2421 transport->registerTxCallback(stream1, 15, &txcb1);
2422 transport->registerTxCallback(stream2, 10, &txcb2);
2423 transport->registerTxCallback(stream2, 15, &txcb2);
2424 transport->registerDeliveryCallback(stream1, 10, &dcb1);
2425 transport->registerDeliveryCallback(stream1, 15, &dcb1);
2426 transport->registerDeliveryCallback(stream2, 10, &dcb2);
2427 transport->registerDeliveryCallback(stream2, 15, &dcb2);
2428
2429 EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream1));
2430 EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream2));
2431 EXPECT_EQ(
2432 2,
2433 transport->getNumByteEventCallbacksForStream(
2434 ByteEvent::Type::TX, stream1));
2435 EXPECT_EQ(
2436 2,
2437 transport->getNumByteEventCallbacksForStream(
2438 ByteEvent::Type::TX, stream2));
2439 EXPECT_EQ(
2440 2,
2441 transport->getNumByteEventCallbacksForStream(
2442 ByteEvent::Type::ACK, stream1));
2443 EXPECT_EQ(
2444 2,
2445 transport->getNumByteEventCallbacksForStream(
2446 ByteEvent::Type::ACK, stream2));
2447
2448 EXPECT_CALL(dcb1, onCanceled(stream1, 10));
2449 EXPECT_CALL(dcb1, onCanceled(stream1, 15));
2450 EXPECT_CALL(dcb2, onCanceled(stream2, 10));
2451 EXPECT_CALL(dcb2, onCanceled(stream2, 15));
2452
2453 transport->cancelByteEventCallbacks(ByteEvent::Type::ACK);
2454 Mock::VerifyAndClearExpectations(&txcb1);
2455 Mock::VerifyAndClearExpectations(&txcb2);
2456 Mock::VerifyAndClearExpectations(&dcb1);
2457 Mock::VerifyAndClearExpectations(&dcb2);
2458
2459 EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1));
2460 EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
2461 EXPECT_EQ(
2462 2,
2463 transport->getNumByteEventCallbacksForStream(
2464 ByteEvent::Type::TX, stream1));
2465 EXPECT_EQ(
2466 2,
2467 transport->getNumByteEventCallbacksForStream(
2468 ByteEvent::Type::TX, stream2));
2469 EXPECT_EQ(
2470 0,
2471 transport->getNumByteEventCallbacksForStream(
2472 ByteEvent::Type::ACK, stream1));
2473 EXPECT_EQ(
2474 0,
2475 transport->getNumByteEventCallbacksForStream(
2476 ByteEvent::Type::ACK, stream2));
2477
2478 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
2479 EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 15)));
2480 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10)));
2481 EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 15)));
2482
2483 transport->close(folly::none);
2484 Mock::VerifyAndClearExpectations(&txcb1);
2485 Mock::VerifyAndClearExpectations(&txcb2);
2486 Mock::VerifyAndClearExpectations(&dcb1);
2487 Mock::VerifyAndClearExpectations(&dcb2);
2488 }
2489
TEST_F(QuicTransportImplTest,TestNotifyPendingConnWriteOnCloseWithoutError)2490 TEST_F(QuicTransportImplTest, TestNotifyPendingConnWriteOnCloseWithoutError) {
2491 NiceMock<MockWriteCallback> wcb;
2492 EXPECT_CALL(
2493 wcb,
2494 onConnectionWriteError(IsError(GenericApplicationErrorCode::NO_ERROR)));
2495 transport->notifyPendingWriteOnConnection(&wcb);
2496 transport->close(folly::none);
2497 evb->loopOnce();
2498 }
2499
TEST_P(QuicTransportImplTestClose,TestNotifyPendingConnWriteOnCloseWithError)2500 TEST_P(QuicTransportImplTestClose, TestNotifyPendingConnWriteOnCloseWithError) {
2501 NiceMock<MockWriteCallback> wcb;
2502 transport->notifyPendingWriteOnConnection(&wcb);
2503 if (GetParam()) {
2504 EXPECT_CALL(
2505 wcb,
2506 onConnectionWriteError(
2507 IsAppError(GenericApplicationErrorCode::UNKNOWN)));
2508 transport->close(std::make_pair(
2509 QuicErrorCode(GenericApplicationErrorCode::UNKNOWN),
2510 std::string("Bye")));
2511 } else {
2512 transport->close(folly::none);
2513 }
2514 evb->loopOnce();
2515 }
2516
TEST_F(QuicTransportImplTest,TestNotifyPendingWriteWithActiveCallback)2517 TEST_F(QuicTransportImplTest, TestNotifyPendingWriteWithActiveCallback) {
2518 auto stream = transport->createBidirectionalStream().value();
2519 NiceMock<MockWriteCallback> wcb;
2520 EXPECT_CALL(wcb, onStreamWriteReady(stream, _));
2521 auto ok1 = transport->notifyPendingWriteOnStream(stream, &wcb);
2522 EXPECT_TRUE(ok1.hasValue());
2523 auto ok2 = transport->notifyPendingWriteOnStream(stream, &wcb);
2524 EXPECT_EQ(ok2.error(), quic::LocalErrorCode::CALLBACK_ALREADY_INSTALLED);
2525 evb->loopOnce();
2526 }
2527
TEST_F(QuicTransportImplTest,TestNotifyPendingWriteOnCloseWithoutError)2528 TEST_F(QuicTransportImplTest, TestNotifyPendingWriteOnCloseWithoutError) {
2529 auto stream = transport->createBidirectionalStream().value();
2530 NiceMock<MockWriteCallback> wcb;
2531 EXPECT_CALL(
2532 wcb,
2533 onStreamWriteError(
2534 stream, IsError(GenericApplicationErrorCode::NO_ERROR)));
2535 transport->notifyPendingWriteOnStream(stream, &wcb);
2536 transport->close(folly::none);
2537 evb->loopOnce();
2538 }
2539
TEST_P(QuicTransportImplTestClose,TestNotifyPendingWriteOnCloseWithError)2540 TEST_P(QuicTransportImplTestClose, TestNotifyPendingWriteOnCloseWithError) {
2541 auto stream = transport->createBidirectionalStream().value();
2542 NiceMock<MockWriteCallback> wcb;
2543 transport->notifyPendingWriteOnStream(stream, &wcb);
2544 if (GetParam()) {
2545 EXPECT_CALL(
2546 wcb,
2547 onStreamWriteError(
2548 stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
2549 transport->close(std::make_pair(
2550 QuicErrorCode(GenericApplicationErrorCode::UNKNOWN),
2551 std::string("Bye")));
2552 } else {
2553 transport->close(folly::none);
2554 }
2555 evb->loopOnce();
2556 }
2557
TEST_F(QuicTransportImplTest,TestTransportCloseWithMaxPacketNumber)2558 TEST_F(QuicTransportImplTest, TestTransportCloseWithMaxPacketNumber) {
2559 transport->setServerConnectionId();
2560 transport->transportConn->pendingEvents.closeTransport = false;
2561 EXPECT_NO_THROW(transport->invokeWriteSocketData());
2562
2563 transport->transportConn->pendingEvents.closeTransport = true;
2564 EXPECT_THROW(transport->invokeWriteSocketData(), QuicTransportException);
2565 }
2566
TEST_F(QuicTransportImplTest,TestGracefulCloseWithActiveStream)2567 TEST_F(QuicTransportImplTest, TestGracefulCloseWithActiveStream) {
2568 EXPECT_CALL(connCallback, onConnectionEnd()).Times(0);
2569 EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);
2570
2571 auto stream = transport->createBidirectionalStream().value();
2572 NiceMock<MockWriteCallback> wcb;
2573 NiceMock<MockWriteCallback> wcbConn;
2574 NiceMock<MockReadCallback> rcb;
2575 StrictMock<MockByteEventCallback> txCb;
2576 StrictMock<MockDeliveryCallback> deliveryCb;
2577 EXPECT_CALL(
2578 wcb, onStreamWriteError(stream, IsError(LocalErrorCode::NO_ERROR)));
2579 EXPECT_CALL(
2580 wcbConn, onConnectionWriteError(IsError(LocalErrorCode::NO_ERROR)));
2581 EXPECT_CALL(rcb, readError(stream, IsError(LocalErrorCode::NO_ERROR)));
2582 EXPECT_CALL(deliveryCb, onCanceled(stream, _));
2583 EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 0)));
2584 EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 4)));
2585
2586 transport->notifyPendingWriteOnConnection(&wcbConn);
2587 transport->notifyPendingWriteOnStream(stream, &wcb);
2588 transport->setReadCallback(stream, &rcb);
2589 EXPECT_CALL(*socketPtr, write(_, _))
2590 .WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
2591 transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
2592 EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
2593 EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 4)));
2594 EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
2595 EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
2596 transport->closeGracefully();
2597
2598 ASSERT_FALSE(transport->transportClosed);
2599 EXPECT_FALSE(transport->createBidirectionalStream());
2600
2601 EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError());
2602 EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError());
2603 EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError());
2604 EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 2))).Times(0);
2605 EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError());
2606 EXPECT_TRUE(
2607 transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError());
2608 EXPECT_TRUE(
2609 transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
2610 .hasError());
2611
2612 transport->addDataToStream(
2613 stream, StreamBuffer(IOBuf::copyBuffer("hello"), 0, false));
2614 EXPECT_FALSE(transport->transportConn->streamManager->getStream(stream)
2615 ->readBuffer.empty());
2616
2617 // Close the last stream.
2618 // TODO: replace this when we call conn callbacks.
2619 // EXPECT_CALL(connCallback, onConnectionEnd());
2620 transport->closeStream(stream);
2621 ASSERT_TRUE(transport->transportClosed);
2622
2623 evb->loopOnce();
2624 }
2625
TEST_F(QuicTransportImplTest,TestGracefulCloseWithNoActiveStream)2626 TEST_F(QuicTransportImplTest, TestGracefulCloseWithNoActiveStream) {
2627 auto stream = transport->createBidirectionalStream().value();
2628 NiceMock<MockWriteCallback> wcb;
2629 NiceMock<MockWriteCallback> wcbConn;
2630 NiceMock<MockReadCallback> rcb;
2631 NiceMock<MockDeliveryCallback> deliveryCb;
2632 NiceMock<MockByteEventCallback> txCb;
2633 EXPECT_CALL(
2634 rcb, readError(stream, IsError(GenericApplicationErrorCode::NO_ERROR)));
2635 EXPECT_CALL(deliveryCb, onDeliveryAck(stream, _, _));
2636 EXPECT_CALL(txCb, onByteEvent(getTxMatcher(stream, 0)));
2637 EXPECT_CALL(txCb, onByteEvent(getTxMatcher(stream, 4)));
2638
2639 EXPECT_CALL(connCallback, onConnectionEnd()).Times(0);
2640 EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);
2641
2642 transport->setReadCallback(stream, &rcb);
2643 EXPECT_CALL(*socketPtr, write(_, _))
2644 .WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
2645 transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
2646 EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
2647 EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 4)));
2648 EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
2649 EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
2650
2651 // Close the last stream.
2652 auto streamState = transport->transportConn->streamManager->getStream(stream);
2653 // Fake that the data was TXed and delivered to keep all the state
2654 // consistent.
2655 streamState->currentWriteOffset = 7;
2656 transport->transportConn->streamManager->addTx(stream);
2657 transport->transportConn->streamManager->addDeliverable(stream);
2658 transport->closeStream(stream);
2659 transport->close(folly::none);
2660
2661 ASSERT_TRUE(transport->transportClosed);
2662 EXPECT_FALSE(transport->createBidirectionalStream());
2663
2664 EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError());
2665 EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError());
2666 EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError());
2667 EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 2))).Times(0);
2668 EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError());
2669 EXPECT_TRUE(
2670 transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError());
2671 EXPECT_TRUE(
2672 transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
2673 .hasError());
2674 }
2675
TEST_F(QuicTransportImplTest,TestImmediateClose)2676 TEST_F(QuicTransportImplTest, TestImmediateClose) {
2677 auto stream = transport->createBidirectionalStream().value();
2678 NiceMock<MockWriteCallback> wcb;
2679 NiceMock<MockWriteCallback> wcbConn;
2680 NiceMock<MockReadCallback> rcb;
2681 NiceMock<MockPeekCallback> pcb;
2682 NiceMock<MockDeliveryCallback> deliveryCb;
2683 NiceMock<MockByteEventCallback> txCb;
2684 EXPECT_CALL(
2685 wcb,
2686 onStreamWriteError(
2687 stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
2688 EXPECT_CALL(
2689 wcbConn,
2690 onConnectionWriteError(IsAppError(GenericApplicationErrorCode::UNKNOWN)));
2691 EXPECT_CALL(
2692 rcb, readError(stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
2693 EXPECT_CALL(
2694 pcb, peekError(stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
2695 EXPECT_CALL(deliveryCb, onCanceled(stream, _));
2696 EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 0)));
2697 EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 4)));
2698
2699 EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);
2700
2701 transport->notifyPendingWriteOnConnection(&wcbConn);
2702 transport->notifyPendingWriteOnStream(stream, &wcb);
2703 transport->setReadCallback(stream, &rcb);
2704 transport->setPeekCallback(stream, &pcb);
2705 EXPECT_CALL(*socketPtr, write(_, _))
2706 .WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
2707 transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
2708 EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
2709 EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 4)));
2710 EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
2711 EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
2712 transport->close(std::make_pair(
2713 QuicErrorCode(GenericApplicationErrorCode::UNKNOWN),
2714 std::string("Error")));
2715
2716 ASSERT_TRUE(transport->transportClosed);
2717 EXPECT_FALSE(transport->createBidirectionalStream());
2718
2719 EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError());
2720 EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError());
2721 EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError());
2722 EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 2))).Times(0);
2723 EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError());
2724 EXPECT_TRUE(
2725 transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError());
2726 EXPECT_TRUE(
2727 transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
2728 .hasError());
2729
2730 transport->addDataToStream(
2731 stream, StreamBuffer(IOBuf::copyBuffer("hello"), 0, false));
2732 EXPECT_EQ(
2733 transport->transportConn->streamManager->getStream(stream), nullptr);
2734 evb->loopOnce();
2735 }
2736
TEST_F(QuicTransportImplTest,ResetStreamUnsetWriteCallback)2737 TEST_F(QuicTransportImplTest, ResetStreamUnsetWriteCallback) {
2738 auto stream = transport->createBidirectionalStream().value();
2739 NiceMock<MockWriteCallback> wcb;
2740 EXPECT_CALL(wcb, onStreamWriteError(stream, _)).Times(0);
2741 transport->notifyPendingWriteOnStream(stream, &wcb);
2742 EXPECT_FALSE(
2743 transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
2744 .hasError());
2745 evb->loopOnce();
2746 }
2747
TEST_F(QuicTransportImplTest,ResetAllNonControlStreams)2748 TEST_F(QuicTransportImplTest, ResetAllNonControlStreams) {
2749 auto stream1 = transport->createBidirectionalStream().value();
2750 ASSERT_FALSE(transport->setControlStream(stream1));
2751 NiceMock<MockWriteCallback> wcb1;
2752 NiceMock<MockReadCallback> rcb1;
2753 EXPECT_CALL(wcb1, onStreamWriteError(stream1, _)).Times(0);
2754 EXPECT_CALL(rcb1, readError(stream1, _)).Times(0);
2755 transport->notifyPendingWriteOnStream(stream1, &wcb1);
2756 transport->setReadCallback(stream1, &rcb1);
2757
2758 auto stream2 = transport->createBidirectionalStream().value();
2759 NiceMock<MockWriteCallback> wcb2;
2760 NiceMock<MockReadCallback> rcb2;
2761 EXPECT_CALL(wcb2, onStreamWriteError(stream2, _)).Times(1);
2762 EXPECT_CALL(rcb2, readError(stream2, _)).Times(1);
2763 transport->notifyPendingWriteOnStream(stream2, &wcb2);
2764 transport->setReadCallback(stream2, &rcb2);
2765
2766 auto stream3 = transport->createUnidirectionalStream().value();
2767 NiceMock<MockWriteCallback> wcb3;
2768 transport->notifyPendingWriteOnStream(stream3, &wcb3);
2769 EXPECT_CALL(wcb3, onStreamWriteError(stream3, _)).Times(1);
2770
2771 auto stream4 = transport->createBidirectionalStream().value();
2772 NiceMock<MockWriteCallback> wcb4;
2773 NiceMock<MockReadCallback> rcb4;
2774 EXPECT_CALL(wcb4, onStreamWriteError(stream4, _))
2775 .WillOnce(Invoke(
2776 [&](auto, auto) { transport->setReadCallback(stream4, nullptr); }));
2777 EXPECT_CALL(rcb4, readError(_, _)).Times(0);
2778 transport->notifyPendingWriteOnStream(stream4, &wcb4);
2779 transport->setReadCallback(stream4, &rcb4);
2780
2781 transport->resetNonControlStreams(
2782 GenericApplicationErrorCode::UNKNOWN, "bye bye");
2783 evb->loopOnce();
2784
2785 // Have to manually unset the read callbacks so they aren't use-after-freed.
2786 transport->unsetAllReadCallbacks();
2787 }
2788
TEST_F(QuicTransportImplTest,DestroyWithoutClosing)2789 TEST_F(QuicTransportImplTest, DestroyWithoutClosing) {
2790 EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);
2791 EXPECT_CALL(connCallback, onConnectionEnd()).Times(0);
2792 transport.reset();
2793 }
2794
TEST_F(QuicTransportImplTest,UncleanShutdownEventBase)2795 TEST_F(QuicTransportImplTest, UncleanShutdownEventBase) {
2796 // if abruptly shutting down the eventbase we should avoid scheduling
2797 // any new timer.
2798 transport->setIdleTimeout();
2799 evb.reset();
2800 }
2801
TEST_F(QuicTransportImplTest,GetLocalAddressBoundSocket)2802 TEST_F(QuicTransportImplTest, GetLocalAddressBoundSocket) {
2803 SocketAddress addr("127.0.0.1", 443);
2804 EXPECT_CALL(*socketPtr, isBound()).WillOnce(Return(true));
2805 EXPECT_CALL(*socketPtr, address()).WillRepeatedly(ReturnRef(addr));
2806 SocketAddress localAddr = transport->getLocalAddress();
2807 EXPECT_TRUE(localAddr == addr);
2808 }
2809
TEST_F(QuicTransportImplTest,GetLocalAddressUnboundSocket)2810 TEST_F(QuicTransportImplTest, GetLocalAddressUnboundSocket) {
2811 EXPECT_CALL(*socketPtr, isBound()).WillOnce(Return(false));
2812 SocketAddress localAddr = transport->getLocalAddress();
2813 EXPECT_FALSE(localAddr.isInitialized());
2814 }
2815
TEST_F(QuicTransportImplTest,GetLocalAddressBadSocket)2816 TEST_F(QuicTransportImplTest, GetLocalAddressBadSocket) {
2817 auto badTransport =
2818 std::make_shared<TestQuicTransport>(evb.get(), nullptr, connCallback);
2819 badTransport->closeWithoutWrite();
2820 SocketAddress localAddr = badTransport->getLocalAddress();
2821 EXPECT_FALSE(localAddr.isInitialized());
2822 }
2823
TEST_F(QuicTransportImplTest,AsyncStreamFlowControlWrite)2824 TEST_F(QuicTransportImplTest, AsyncStreamFlowControlWrite) {
2825 transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
2826 auto stream = transport->createBidirectionalStream().value();
2827 auto streamState = transport->transportConn->streamManager->getStream(stream);
2828 transport->setServerConnectionId();
2829 transport->writeLooper()->stop();
2830 streamState->flowControlState.advertisedMaxOffset = 0; // Easier to calculate
2831 transport->setStreamFlowControlWindow(stream, 4000);
2832 EXPECT_EQ(0, streamState->flowControlState.advertisedMaxOffset);
2833 // Loop it:
2834 EXPECT_TRUE(transport->writeLooper()->isRunning());
2835 transport->writeLooper()->runLoopCallback();
2836 EXPECT_EQ(4000, streamState->flowControlState.advertisedMaxOffset);
2837 }
2838
TEST_F(QuicTransportImplTest,ExceptionInWriteLooperDoesNotCrash)2839 TEST_F(QuicTransportImplTest, ExceptionInWriteLooperDoesNotCrash) {
2840 auto stream = transport->createBidirectionalStream().value();
2841 transport->setReadCallback(stream, nullptr);
2842 transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, nullptr);
2843 transport->addDataToStream(
2844 stream, StreamBuffer(IOBuf::copyBuffer("hello"), 0, false));
2845 EXPECT_CALL(*socketPtr, write(_, _)).WillOnce(SetErrnoAndReturn(EBADF, -1));
2846 EXPECT_CALL(connCallback, onConnectionError(_)).WillOnce(Invoke([&](auto) {
2847 transport.reset();
2848 }));
2849 transport->writeLooper()->runLoopCallback();
2850 }
2851
2852 class QuicTransportImplTestUniBidi : public QuicTransportImplTest,
2853 public testing::WithParamInterface<bool> {
2854 };
2855
createStream(std::shared_ptr<TestQuicTransport> transport,bool unidirectional)2856 quic::StreamId createStream(
2857 std::shared_ptr<TestQuicTransport> transport,
2858 bool unidirectional) {
2859 if (unidirectional) {
2860 return transport->createUnidirectionalStream().value();
2861 } else {
2862 return transport->createBidirectionalStream().value();
2863 }
2864 }
2865
2866 INSTANTIATE_TEST_CASE_P(
2867 QuicTransportImplTest,
2868 QuicTransportImplTestUniBidi,
2869 Values(true, false));
2870
TEST_P(QuicTransportImplTestUniBidi,AppIdleTest)2871 TEST_P(QuicTransportImplTestUniBidi, AppIdleTest) {
2872 auto& conn = transport->getConnectionState();
2873 auto mockCongestionController =
2874 std::make_unique<NiceMock<MockCongestionController>>();
2875 auto rawCongestionController = mockCongestionController.get();
2876 conn.congestionController = std::move(mockCongestionController);
2877
2878 EXPECT_CALL(*rawCongestionController, setAppIdle(false, _)).Times(0);
2879 auto stream = createStream(transport, GetParam());
2880
2881 EXPECT_CALL(*rawCongestionController, setAppIdle(true, _));
2882 transport->closeStream(stream);
2883 }
2884
TEST_P(QuicTransportImplTestUniBidi,AppIdleTestControlStreams)2885 TEST_P(QuicTransportImplTestUniBidi, AppIdleTestControlStreams) {
2886 auto& conn = transport->getConnectionState();
2887 auto mockCongestionController =
2888 std::make_unique<NiceMock<MockCongestionController>>();
2889 auto rawCongestionController = mockCongestionController.get();
2890 conn.congestionController = std::move(mockCongestionController);
2891
2892 EXPECT_CALL(*rawCongestionController, setAppIdle(false, _)).Times(0);
2893 auto stream = createStream(transport, GetParam());
2894 ASSERT_TRUE(stream);
2895
2896 auto ctrlStream1 = createStream(transport, GetParam());
2897 ASSERT_TRUE(ctrlStream1);
2898 transport->setControlStream(ctrlStream1);
2899 auto ctrlStream2 = createStream(transport, GetParam());
2900 ASSERT_TRUE(ctrlStream2);
2901 transport->setControlStream(ctrlStream2);
2902
2903 EXPECT_CALL(*rawCongestionController, setAppIdle(true, _));
2904 transport->closeStream(stream);
2905 }
2906
TEST_P(QuicTransportImplTestUniBidi,AppIdleTestOnlyControlStreams)2907 TEST_P(QuicTransportImplTestUniBidi, AppIdleTestOnlyControlStreams) {
2908 auto& conn = transport->getConnectionState();
2909 auto mockCongestionController =
2910 std::make_unique<NiceMock<MockCongestionController>>();
2911 auto rawCongestionController = mockCongestionController.get();
2912 conn.congestionController = std::move(mockCongestionController);
2913
2914 auto ctrlStream1 = createStream(transport, GetParam());
2915 EXPECT_CALL(*rawCongestionController, setAppIdle(true, _)).Times(1);
2916 transport->setControlStream(ctrlStream1);
2917 EXPECT_CALL(*rawCongestionController, setAppIdle(false, _)).Times(1);
2918 auto ctrlStream2 = createStream(transport, GetParam());
2919 EXPECT_CALL(*rawCongestionController, setAppIdle(true, _)).Times(1);
2920 transport->setControlStream(ctrlStream2);
2921
2922 EXPECT_CALL(*rawCongestionController, setAppIdle(_, _)).Times(0);
2923 transport->closeStream(ctrlStream1);
2924 transport->closeStream(ctrlStream2);
2925 }
2926
TEST_F(QuicTransportImplTest,UnidirectionalInvalidReadFuncs)2927 TEST_F(QuicTransportImplTest, UnidirectionalInvalidReadFuncs) {
2928 auto stream = transport->createUnidirectionalStream().value();
2929 EXPECT_THROW(
2930 transport->read(stream, 100).thenOrThrow([&](auto) {}),
2931 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2932 EXPECT_THROW(
2933 transport->setReadCallback(stream, nullptr).thenOrThrow([&](auto) {}),
2934 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2935 EXPECT_THROW(
2936 transport->pauseRead(stream).thenOrThrow([&](auto) {}),
2937 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2938 EXPECT_THROW(
2939 transport->resumeRead(stream).thenOrThrow([&](auto) {}),
2940 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2941 EXPECT_THROW(
2942 transport->stopSending(stream, GenericApplicationErrorCode::UNKNOWN)
2943 .thenOrThrow([&](auto) {}),
2944 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2945 }
2946
TEST_F(QuicTransportImplTest,UnidirectionalInvalidWriteFuncs)2947 TEST_F(QuicTransportImplTest, UnidirectionalInvalidWriteFuncs) {
2948 auto readData = folly::IOBuf::copyBuffer("actual stream data");
2949 StreamId stream = 0x6;
2950 transport->addDataToStream(stream, StreamBuffer(readData->clone(), 0, true));
2951 EXPECT_THROW(
2952 transport->getStreamWriteOffset(stream).thenOrThrow([&](auto) {}),
2953 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2954 EXPECT_THROW(
2955 transport->getStreamWriteBufferedBytes(stream).thenOrThrow([&](auto) {}),
2956 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2957 EXPECT_THROW(
2958 transport->notifyPendingWriteOnStream(stream, nullptr)
2959 .thenOrThrow([&](auto) {}),
2960 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2961 EXPECT_THROW(
2962 transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), false)
2963 .thenOrThrow([&](auto) {}),
2964 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2965 EXPECT_THROW(
2966 transport->registerDeliveryCallback(stream, 0, nullptr)
2967 .thenOrThrow([&](auto) {}),
2968 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2969 EXPECT_THROW(
2970 transport->registerTxCallback(stream, 0, nullptr).thenOrThrow([&](auto) {
2971 }),
2972 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2973 EXPECT_THROW(
2974 transport
2975 ->registerByteEventCallback(ByteEvent::Type::ACK, stream, 0, nullptr)
2976 .thenOrThrow([&](auto) {}),
2977 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2978 EXPECT_THROW(
2979 transport
2980 ->registerByteEventCallback(ByteEvent::Type::TX, stream, 0, nullptr)
2981 .thenOrThrow([&](auto) {}),
2982 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2983 EXPECT_THROW(
2984 transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
2985 .thenOrThrow([&](auto) {}),
2986 folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
2987 }
2988
TEST_P(QuicTransportImplTestUniBidi,IsServerStream)2989 TEST_P(QuicTransportImplTestUniBidi, IsServerStream) {
2990 auto stream = createStream(transport, GetParam());
2991 EXPECT_TRUE(transport->isServerStream(stream));
2992 }
2993
TEST_P(QuicTransportImplTestUniBidi,IsClientStream)2994 TEST_P(QuicTransportImplTestUniBidi, IsClientStream) {
2995 auto stream = createStream(transport, GetParam());
2996 EXPECT_FALSE(transport->isClientStream(stream));
2997 }
2998
TEST_F(QuicTransportImplTest,IsUnidirectionalStream)2999 TEST_F(QuicTransportImplTest, IsUnidirectionalStream) {
3000 auto stream = transport->createUnidirectionalStream().value();
3001 EXPECT_TRUE(transport->isUnidirectionalStream(stream));
3002 }
3003
TEST_F(QuicTransportImplTest,IsBidirectionalStream)3004 TEST_F(QuicTransportImplTest, IsBidirectionalStream) {
3005 auto stream = transport->createBidirectionalStream().value();
3006 EXPECT_TRUE(transport->isBidirectionalStream(stream));
3007 }
3008
TEST_F(QuicTransportImplTest,PeekCallbackDataAvailable)3009 TEST_F(QuicTransportImplTest, PeekCallbackDataAvailable) {
3010 auto stream1 = transport->createBidirectionalStream().value();
3011 auto stream2 = transport->createBidirectionalStream().value();
3012
3013 NiceMock<MockPeekCallback> peekCb1;
3014 NiceMock<MockPeekCallback> peekCb2;
3015
3016 transport->setPeekCallback(stream1, &peekCb1);
3017 transport->setPeekCallback(stream2, &peekCb2);
3018
3019 transport->addDataToStream(
3020 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3021
3022 transport->addDataToStream(
3023 stream2,
3024 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
3025
3026 EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
3027 transport->driveReadCallbacks();
3028
3029 transport->addDataToStream(
3030 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3031
3032 EXPECT_CALL(peekCb2, onDataAvailable(stream2, _));
3033 transport->driveReadCallbacks();
3034
3035 transport->addDataToStream(
3036 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3037
3038 transport->addDataToStream(
3039 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3040
3041 EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
3042 EXPECT_CALL(peekCb2, onDataAvailable(stream2, _));
3043 transport->driveReadCallbacks();
3044
3045 transport->setPeekCallback(stream1, nullptr);
3046 transport->setPeekCallback(stream2, nullptr);
3047 transport->driveReadCallbacks();
3048
3049 transport.reset();
3050 }
3051
TEST_F(QuicTransportImplTest,PeekError)3052 TEST_F(QuicTransportImplTest, PeekError) {
3053 auto stream1 = transport->createBidirectionalStream().value();
3054
3055 NiceMock<MockPeekCallback> peekCb1;
3056 transport->setPeekCallback(stream1, &peekCb1);
3057
3058 transport->addDataToStream(
3059 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3060 transport->addStreamReadError(stream1, LocalErrorCode::STREAM_CLOSED);
3061
3062 EXPECT_CALL(
3063 peekCb1, peekError(stream1, IsError(LocalErrorCode::STREAM_CLOSED)));
3064
3065 transport->driveReadCallbacks();
3066
3067 EXPECT_CALL(peekCb1, peekError(stream1, _));
3068
3069 transport.reset();
3070 }
3071
TEST_F(QuicTransportImplTest,PeekCallbackUnsetAll)3072 TEST_F(QuicTransportImplTest, PeekCallbackUnsetAll) {
3073 auto stream1 = transport->createBidirectionalStream().value();
3074 auto stream2 = transport->createBidirectionalStream().value();
3075
3076 NiceMock<MockPeekCallback> peekCb1;
3077 NiceMock<MockPeekCallback> peekCb2;
3078
3079 // Set the peek callbacks and add data to the streams, and see that the
3080 // callbacks do indeed fire
3081
3082 transport->setPeekCallback(stream1, &peekCb1);
3083 transport->setPeekCallback(stream2, &peekCb2);
3084
3085 transport->addDataToStream(
3086 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3087 transport->addDataToStream(
3088 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3089
3090 EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
3091 EXPECT_CALL(peekCb2, onDataAvailable(stream2, _));
3092
3093 transport->driveReadCallbacks();
3094
3095 // unset all of the peek callbacks and see that the callbacks don't fire
3096 // after data is added to the streams
3097
3098 transport->unsetAllPeekCallbacks();
3099
3100 transport->addDataToStream(
3101 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3102 transport->addDataToStream(
3103 stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3104
3105 EXPECT_CALL(peekCb1, onDataAvailable(stream1, _)).Times(0);
3106 EXPECT_CALL(peekCb2, onDataAvailable(stream2, _)).Times(0);
3107
3108 transport->driveReadCallbacks();
3109 }
3110
TEST_F(QuicTransportImplTest,PeekCallbackChangePeekCallback)3111 TEST_F(QuicTransportImplTest, PeekCallbackChangePeekCallback) {
3112 InSequence enforceOrder;
3113
3114 auto stream1 = transport->createBidirectionalStream().value();
3115
3116 NiceMock<MockPeekCallback> peekCb1;
3117 NiceMock<MockPeekCallback> peekCb2;
3118
3119 transport->setPeekCallback(stream1, &peekCb1);
3120
3121 transport->addDataToStream(
3122 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3123
3124 EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
3125 transport->driveReadCallbacks();
3126
3127 transport->setPeekCallback(stream1, &peekCb2);
3128 transport->addDataToStream(
3129 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3130
3131 EXPECT_CALL(peekCb2, onDataAvailable(stream1, _));
3132 transport->driveReadCallbacks();
3133 transport.reset();
3134 }
3135
TEST_F(QuicTransportImplTest,PeekCallbackPauseResume)3136 TEST_F(QuicTransportImplTest, PeekCallbackPauseResume) {
3137 InSequence enforceOrder;
3138
3139 auto stream1 = transport->createBidirectionalStream().value();
3140 NiceMock<MockPeekCallback> peekCb1;
3141
3142 transport->setPeekCallback(stream1, &peekCb1);
3143
3144 transport->addDataToStream(
3145 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3146
3147 auto res = transport->pausePeek(stream1);
3148 EXPECT_TRUE(res);
3149 EXPECT_CALL(peekCb1, onDataAvailable(stream1, _)).Times(0);
3150 transport->driveReadCallbacks();
3151
3152 res = transport->resumePeek(stream1);
3153 EXPECT_TRUE(res);
3154 EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
3155 transport->driveReadCallbacks();
3156
3157 auto stream2 = transport->createBidirectionalStream().value();
3158 res = transport->pausePeek(stream2);
3159 EXPECT_FALSE(res);
3160 EXPECT_EQ(LocalErrorCode::APP_ERROR, res.error());
3161 transport.reset();
3162 }
3163
TEST_F(QuicTransportImplTest,PeekCallbackNoCallbackSet)3164 TEST_F(QuicTransportImplTest, PeekCallbackNoCallbackSet) {
3165 auto stream1 = transport->createBidirectionalStream().value();
3166
3167 transport->addDataToStream(
3168 stream1,
3169 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
3170 transport->driveReadCallbacks();
3171 transport->addDataToStream(
3172 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3173 transport->driveReadCallbacks();
3174 transport.reset();
3175 }
3176
TEST_F(QuicTransportImplTest,PeekCallbackInvalidStream)3177 TEST_F(QuicTransportImplTest, PeekCallbackInvalidStream) {
3178 NiceMock<MockPeekCallback> peekCb1;
3179 StreamId invalidStream = 10;
3180 EXPECT_TRUE(transport->setPeekCallback(invalidStream, &peekCb1).hasError());
3181 transport.reset();
3182 }
3183
TEST_F(QuicTransportImplTest,PeekData)3184 TEST_F(QuicTransportImplTest, PeekData) {
3185 InSequence enforceOrder;
3186
3187 auto stream1 = transport->createBidirectionalStream().value();
3188
3189 NiceMock<MockPeekCallback> peekCb1;
3190 auto peekData = folly::IOBuf::copyBuffer("actual stream data");
3191
3192 transport->setPeekCallback(stream1, &peekCb1);
3193
3194 EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
3195 transport->addDataToStream(stream1, StreamBuffer(peekData->clone(), 0));
3196 transport->driveReadCallbacks();
3197
3198 bool cbCalled = false;
3199 auto peekCallback = [&](StreamId id,
3200 const folly::Range<PeekIterator>& range) {
3201 cbCalled = true;
3202 EXPECT_EQ(id, stream1);
3203 EXPECT_EQ(range.size(), 1);
3204 auto bufClone = range[0].data.front()->clone();
3205 EXPECT_EQ("actual stream data", bufClone->moveToFbString().toStdString());
3206 };
3207
3208 transport->peek(stream1, peekCallback);
3209 EXPECT_TRUE(cbCalled);
3210 transport.reset();
3211 }
3212
TEST_F(QuicTransportImplTest,PeekDataWithError)3213 TEST_F(QuicTransportImplTest, PeekDataWithError) {
3214 InSequence enforceOrder;
3215
3216 auto streamId = transport->createBidirectionalStream().value();
3217 auto peekData = folly::IOBuf::copyBuffer("actual stream data");
3218 transport->addDataToStream(streamId, StreamBuffer(peekData->clone(), 0));
3219
3220 bool cbCalled = false;
3221 auto peekCallback = [&](StreamId, const folly::Range<PeekIterator>&) {
3222 cbCalled = true;
3223 };
3224
3225 // Same local error code should be returned.
3226 transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
3227 auto result = transport->peek(streamId, peekCallback);
3228 EXPECT_FALSE(cbCalled);
3229 EXPECT_TRUE(result.hasError());
3230 EXPECT_EQ(LocalErrorCode::NO_ERROR, result.error());
3231
3232 // LocalErrorCode::INTERNAL_ERROR should be returned.
3233 transport->addStreamReadError(
3234 streamId, TransportErrorCode::FLOW_CONTROL_ERROR);
3235 result = transport->peek(streamId, peekCallback);
3236 EXPECT_FALSE(cbCalled);
3237 EXPECT_TRUE(result.hasError());
3238 EXPECT_EQ(LocalErrorCode::INTERNAL_ERROR, result.error());
3239
3240 transport.reset();
3241 }
3242
TEST_F(QuicTransportImplTest,ConsumeDataWithError)3243 TEST_F(QuicTransportImplTest, ConsumeDataWithError) {
3244 InSequence enforceOrder;
3245
3246 auto streamId = transport->createBidirectionalStream().value();
3247 auto peekData = folly::IOBuf::copyBuffer("actual stream data");
3248 transport->addDataToStream(streamId, StreamBuffer(peekData->clone(), 0));
3249
3250 // Same local error code should be returned.
3251 transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
3252 auto result = transport->consume(streamId, 1);
3253 EXPECT_TRUE(result.hasError());
3254 EXPECT_EQ(LocalErrorCode::NO_ERROR, result.error());
3255
3256 // LocalErrorCode::INTERNAL_ERROR should be returned.
3257 transport->addStreamReadError(
3258 streamId, TransportErrorCode::FLOW_CONTROL_ERROR);
3259 result = transport->consume(streamId, 1);
3260 EXPECT_TRUE(result.hasError());
3261 EXPECT_EQ(LocalErrorCode::INTERNAL_ERROR, result.error());
3262
3263 transport.reset();
3264 }
3265
TEST_F(QuicTransportImplTest,PeekConsumeReadTest)3266 TEST_F(QuicTransportImplTest, PeekConsumeReadTest) {
3267 InSequence enforceOrder;
3268
3269 auto stream1 = transport->createBidirectionalStream().value();
3270 auto readData = folly::IOBuf::copyBuffer("actual stream data");
3271
3272 NiceMock<MockPeekCallback> peekCb;
3273 NiceMock<MockReadCallback> readCb;
3274
3275 transport->setPeekCallback(stream1, &peekCb);
3276 transport->setReadCallback(stream1, &readCb);
3277
3278 transport->addDataToStream(
3279 stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3280
3281 // Both peek and read should be called.
3282 EXPECT_CALL(readCb, readAvailable(stream1));
3283 EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
3284 transport->driveReadCallbacks();
3285
3286 // Only read should be called
3287 EXPECT_CALL(readCb, readAvailable(stream1));
3288 transport->driveReadCallbacks();
3289
3290 // Consume 5 bytes.
3291 transport->consume(stream1, 5);
3292
3293 // Both peek and read should be called.
3294 // Read - because it is called every time
3295 // Peek - because the peekable range has changed
3296 EXPECT_CALL(readCb, readAvailable(stream1));
3297 EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
3298 transport->driveReadCallbacks();
3299
3300 // Read 10 bytes.
3301 transport->read(stream1, 10).thenOrThrow([&](std::pair<Buf, bool> data) {
3302 EXPECT_EQ("l stream d", data.first->moveToFbString().toStdString());
3303 });
3304
3305 // Both peek and read should be called.
3306 // Read - because it is called every time
3307 // Peek - because the peekable range has changed
3308 EXPECT_CALL(readCb, readAvailable(stream1));
3309 EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
3310 transport->driveReadCallbacks();
3311
3312 // Only read should be called.
3313 EXPECT_CALL(readCb, readAvailable(stream1));
3314 transport->driveReadCallbacks();
3315
3316 // Consume the rest of the data.
3317 // Only 3 bytes left, try consuming 42.
3318 transport->consume(stream1, 42);
3319
3320 // Neither read nor peek should be called.
3321 EXPECT_CALL(readCb, readAvailable(stream1)).Times(0);
3322 EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0);
3323 transport->driveReadCallbacks();
3324
3325 // Add more data, this time with a gap.
3326 auto buf1 = IOBuf::copyBuffer("I just met you and this is crazy.");
3327 auto buf2 = IOBuf::copyBuffer(" Here is my number, so call");
3328 auto buf3 = IOBuf::copyBuffer(" me maybe.");
3329 transport->addDataToStream(stream1, StreamBuffer(buf1->clone(), 0));
3330 transport->addDataToStream(
3331 stream1,
3332 StreamBuffer(
3333 buf3->clone(),
3334 buf1->computeChainDataLength() + buf2->computeChainDataLength()));
3335
3336 // Both peek and read should be called.
3337 EXPECT_CALL(readCb, readAvailable(stream1));
3338 EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
3339 transport->driveReadCallbacks();
3340
3341 // Consume left part.
3342 transport->consume(stream1, buf1->computeChainDataLength());
3343
3344 // Only peek should be called.
3345 EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
3346 transport->driveReadCallbacks();
3347
3348 // Fill in the gap.
3349 transport->addDataToStream(
3350 stream1, StreamBuffer(buf2->clone(), buf1->computeChainDataLength()));
3351
3352 // Both peek and read should be called.
3353 EXPECT_CALL(readCb, readAvailable(stream1));
3354 EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
3355 transport->driveReadCallbacks();
3356
3357 // Read the rest of the buffer.
3358 transport->read(stream1, 0).thenOrThrow([&](std::pair<Buf, bool> data) {
3359 EXPECT_EQ(
3360 " Here is my number, so call me maybe.",
3361 data.first->moveToFbString().toStdString());
3362 });
3363
3364 // Neither read nor peek should be called.
3365 EXPECT_CALL(readCb, readAvailable(stream1)).Times(0);
3366 EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0);
3367 transport->driveReadCallbacks();
3368
3369 transport.reset();
3370 }
3371
TEST_F(QuicTransportImplTest,UpdatePeekableListNoDataTest)3372 TEST_F(QuicTransportImplTest, UpdatePeekableListNoDataTest) {
3373 auto streamId = transport->createBidirectionalStream().value();
3374 const auto& conn = transport->transportConn;
3375 auto stream = transport->getStream(streamId);
3376
3377 // Insert streamId into the list.
3378 conn->streamManager->peekableStreams().insert(streamId);
3379 // After the call the streamId should be removed
3380 // from the list since there is no peekable data in the stream.
3381 conn->streamManager->updatePeekableStreams(*stream);
3382 EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId));
3383 }
3384
TEST_F(QuicTransportImplTest,UpdatePeekableListWithDataTest)3385 TEST_F(QuicTransportImplTest, UpdatePeekableListWithDataTest) {
3386 auto streamId = transport->createBidirectionalStream().value();
3387 const auto& conn = transport->transportConn;
3388 auto stream = transport->getStream(streamId);
3389
3390 // Add some data to the stream.
3391 transport->addDataToStream(
3392 streamId,
3393 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3394
3395 // streamId is in the list after the above call.
3396 EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId));
3397
3398 // After the call the streamId shall remain
3399 // in the list since there is data in the stream.
3400 conn->streamManager->updatePeekableStreams(*stream);
3401 EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId));
3402 }
3403
TEST_F(QuicTransportImplTest,UpdatePeekableListEmptyListTest)3404 TEST_F(QuicTransportImplTest, UpdatePeekableListEmptyListTest) {
3405 auto streamId = transport->createBidirectionalStream().value();
3406 const auto& conn = transport->transportConn;
3407 auto stream = transport->getStream(streamId);
3408
3409 // Add some data to the stream.
3410 transport->addDataToStream(
3411 streamId,
3412 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3413
3414 // Erase streamId from the list.
3415 conn->streamManager->peekableStreams().erase(streamId);
3416 EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId));
3417
3418 // After the call the streamId should be added to the list
3419 // because there is data in the stream and the streamId is
3420 // not in the list.
3421 conn->streamManager->updatePeekableStreams(*stream);
3422 EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId));
3423 }
3424
TEST_F(QuicTransportImplTest,UpdatePeekableListWithStreamErrorTest)3425 TEST_F(QuicTransportImplTest, UpdatePeekableListWithStreamErrorTest) {
3426 auto streamId = transport->createBidirectionalStream().value();
3427 const auto& conn = transport->transportConn;
3428 // Add some data to the stream.
3429 transport->addDataToStream(
3430 streamId,
3431 StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
3432
3433 // streamId is in the list.
3434 EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId));
3435
3436 transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
3437
3438 // peekableStreams is updated to allow stream with streamReadError.
3439 // So the streamId shall be in the list
3440 EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId));
3441 }
3442
TEST_F(QuicTransportImplTest,SuccessfulPing)3443 TEST_F(QuicTransportImplTest, SuccessfulPing) {
3444 auto conn = transport->transportConn;
3445 std::chrono::milliseconds interval(10);
3446 TestPingCallback pingCallback;
3447 transport->invokeSendPing(&pingCallback, interval);
3448 EXPECT_EQ(transport->isPingTimeoutScheduled(), true);
3449 EXPECT_EQ(conn->pendingEvents.cancelPingTimeout, false);
3450 conn->pendingEvents.cancelPingTimeout = true;
3451 transport->invokeHandlePingCallback();
3452 evb->loopOnce();
3453 EXPECT_EQ(transport->isPingTimeoutScheduled(), false);
3454 EXPECT_EQ(conn->pendingEvents.cancelPingTimeout, false);
3455 }
3456
TEST_F(QuicTransportImplTest,FailedPing)3457 TEST_F(QuicTransportImplTest, FailedPing) {
3458 auto conn = transport->transportConn;
3459 std::chrono::milliseconds interval(10);
3460 TestPingCallback pingCallback;
3461 transport->invokeSendPing(&pingCallback, interval);
3462 EXPECT_EQ(transport->isPingTimeoutScheduled(), true);
3463 EXPECT_EQ(conn->pendingEvents.cancelPingTimeout, false);
3464 conn->pendingEvents.cancelPingTimeout = true;
3465 transport->invokeCancelPingTimeout();
3466 transport->invokeHandlePingCallback();
3467 EXPECT_EQ(conn->pendingEvents.cancelPingTimeout, false);
3468 }
3469
TEST_F(QuicTransportImplTest,HandleKnobCallbacks)3470 TEST_F(QuicTransportImplTest, HandleKnobCallbacks) {
3471 auto conn = transport->transportConn;
3472
3473 // attach an observer to the socket
3474 Observer::Config config = {};
3475 config.knobFrameEvents = true;
3476 auto cb = std::make_unique<StrictMock<MockObserver>>(config);
3477 EXPECT_CALL(*cb, observerAttach(transport.get()));
3478 transport->addObserver(cb.get());
3479 Mock::VerifyAndClearExpectations(cb.get());
3480
3481 // set test knob frame
3482 uint64_t knobSpace = 0xfaceb00c;
3483 uint64_t knobId = 42;
3484 folly::StringPiece data = "test knob data";
3485 Buf buf(folly::IOBuf::create(data.size()));
3486 memcpy(buf->writableData(), data.data(), data.size());
3487 buf->append(data.size());
3488 conn->pendingEvents.knobs.emplace_back(
3489 KnobFrame(knobSpace, knobId, std::move(buf)));
3490
3491 EXPECT_CALL(connCallback, onKnobMock(knobSpace, knobId, _))
3492 .WillOnce(Invoke([](Unused, Unused, Unused) { /* do nothing */ }));
3493 EXPECT_CALL(*cb, knobFrameReceived(transport.get(), _)).Times(1);
3494 transport->invokeHandleKnobCallbacks();
3495 evb->loopOnce();
3496 EXPECT_EQ(conn->pendingEvents.knobs.size(), 0);
3497
3498 // detach the observer from the socket
3499 EXPECT_CALL(*cb, observerDetach(transport.get()));
3500 EXPECT_TRUE(transport->removeObserver(cb.get()));
3501 Mock::VerifyAndClearExpectations(cb.get());
3502 }
3503
TEST_F(QuicTransportImplTest,StreamWriteCallbackUnregister)3504 TEST_F(QuicTransportImplTest, StreamWriteCallbackUnregister) {
3505 auto stream = transport->createBidirectionalStream().value();
3506 // Unset before set
3507 EXPECT_FALSE(transport->unregisterStreamWriteCallback(stream));
3508
3509 // Set
3510 auto wcb = std::make_unique<MockWriteCallback>();
3511 EXPECT_CALL(*wcb, onStreamWriteReady(stream, _)).Times(1);
3512 auto result = transport->notifyPendingWriteOnStream(stream, wcb.get());
3513 EXPECT_TRUE(result);
3514 evb->loopOnce();
3515
3516 // Set then unset
3517 EXPECT_CALL(*wcb, onStreamWriteReady(stream, _)).Times(0);
3518 result = transport->notifyPendingWriteOnStream(stream, wcb.get());
3519 EXPECT_TRUE(result);
3520 EXPECT_TRUE(transport->unregisterStreamWriteCallback(stream));
3521 evb->loopOnce();
3522
3523 // Set, close, unset
3524 result = transport->notifyPendingWriteOnStream(stream, wcb.get());
3525 EXPECT_TRUE(result);
3526 MockReadCallback rcb;
3527 transport->setReadCallback(stream, &rcb);
3528 // ReadCallback kills WriteCallback
3529 EXPECT_CALL(rcb, readError(stream, _))
3530 .WillOnce(Invoke([&](StreamId stream, auto) {
3531 EXPECT_TRUE(transport->unregisterStreamWriteCallback(stream));
3532 wcb.reset();
3533 }));
3534 transport->close(folly::none);
3535 evb->loopOnce();
3536 }
3537
TEST_F(QuicTransportImplTest,ObserverAttachRemove)3538 TEST_F(QuicTransportImplTest, ObserverAttachRemove) {
3539 auto cb = std::make_unique<StrictMock<MockObserver>>();
3540 EXPECT_CALL(*cb, observerAttach(transport.get()));
3541 transport->addObserver(cb.get());
3542 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
3543 EXPECT_CALL(*cb, observerDetach(transport.get()));
3544 EXPECT_TRUE(transport->removeObserver(cb.get()));
3545 Mock::VerifyAndClearExpectations(cb.get());
3546 EXPECT_THAT(transport->getObservers(), IsEmpty());
3547 }
3548
TEST_F(QuicTransportImplTest,ObserverAttachRemoveMultiple)3549 TEST_F(QuicTransportImplTest, ObserverAttachRemoveMultiple) {
3550 auto cb1 = std::make_unique<StrictMock<MockObserver>>();
3551 EXPECT_CALL(*cb1, observerAttach(transport.get()));
3552 transport->addObserver(cb1.get());
3553 Mock::VerifyAndClearExpectations(cb1.get());
3554 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
3555
3556 auto cb2 = std::make_unique<StrictMock<MockObserver>>();
3557 EXPECT_CALL(*cb2, observerAttach(transport.get()));
3558 transport->addObserver(cb2.get());
3559 Mock::VerifyAndClearExpectations(cb2.get());
3560 EXPECT_THAT(
3561 transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
3562
3563 EXPECT_CALL(*cb1, observerDetach(transport.get()));
3564 EXPECT_TRUE(transport->removeObserver(cb1.get()));
3565 Mock::VerifyAndClearExpectations(cb1.get());
3566 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb2.get()));
3567
3568 EXPECT_CALL(*cb2, observerDetach(transport.get()));
3569 EXPECT_TRUE(transport->removeObserver(cb2.get()));
3570 Mock::VerifyAndClearExpectations(cb2.get());
3571 EXPECT_THAT(transport->getObservers(), IsEmpty());
3572 }
3573
TEST_F(QuicTransportImplTest,ObserverAttachRemoveMultipleReverse)3574 TEST_F(QuicTransportImplTest, ObserverAttachRemoveMultipleReverse) {
3575 auto cb1 = std::make_unique<StrictMock<MockObserver>>();
3576 EXPECT_CALL(*cb1, observerAttach(transport.get()));
3577 transport->addObserver(cb1.get());
3578 Mock::VerifyAndClearExpectations(cb1.get());
3579 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
3580
3581 auto cb2 = std::make_unique<StrictMock<MockObserver>>();
3582 EXPECT_CALL(*cb2, observerAttach(transport.get()));
3583 transport->addObserver(cb2.get());
3584 Mock::VerifyAndClearExpectations(cb2.get());
3585 EXPECT_THAT(
3586 transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
3587
3588 EXPECT_CALL(*cb2, observerDetach(transport.get()));
3589 EXPECT_TRUE(transport->removeObserver(cb2.get()));
3590 Mock::VerifyAndClearExpectations(cb2.get());
3591 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
3592
3593 EXPECT_CALL(*cb1, observerDetach(transport.get()));
3594 EXPECT_TRUE(transport->removeObserver(cb1.get()));
3595 Mock::VerifyAndClearExpectations(cb1.get());
3596 EXPECT_THAT(transport->getObservers(), IsEmpty());
3597 }
3598
TEST_F(QuicTransportImplTest,ObserverRemoveMissing)3599 TEST_F(QuicTransportImplTest, ObserverRemoveMissing) {
3600 auto cb = std::make_unique<StrictMock<MockObserver>>();
3601 EXPECT_FALSE(transport->removeObserver(cb.get()));
3602 EXPECT_THAT(transport->getObservers(), IsEmpty());
3603 }
3604
TEST_F(QuicTransportImplTest,ObserverDestroyTransport)3605 TEST_F(QuicTransportImplTest, ObserverDestroyTransport) {
3606 auto cb = std::make_unique<StrictMock<MockObserver>>();
3607 EXPECT_CALL(*cb, observerAttach(transport.get()));
3608 transport->addObserver(cb.get());
3609 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
3610 InSequence s;
3611 EXPECT_CALL(*cb, close(transport.get(), _)).Times(2);
3612 EXPECT_CALL(*cb, destroy(transport.get()));
3613 transport = nullptr;
3614 Mock::VerifyAndClearExpectations(cb.get());
3615 }
3616
TEST_F(QuicTransportImplTest,ObserverCloseNoErrorThenDestroyTransport)3617 TEST_F(QuicTransportImplTest, ObserverCloseNoErrorThenDestroyTransport) {
3618 auto cb = std::make_unique<StrictMock<MockObserver>>();
3619 EXPECT_CALL(*cb, observerAttach(transport.get()));
3620 transport->addObserver(cb.get());
3621 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
3622
3623 const std::pair<QuicErrorCode, std::string> defaultError = std::make_pair(
3624 GenericApplicationErrorCode::NO_ERROR,
3625 toString(GenericApplicationErrorCode::NO_ERROR));
3626 EXPECT_CALL(
3627 *cb,
3628 close(
3629 transport.get(),
3630 folly::Optional<std::pair<QuicErrorCode, std::string>>(
3631 defaultError)));
3632 transport->close(folly::none);
3633 Mock::VerifyAndClearExpectations(cb.get());
3634 InSequence s;
3635 EXPECT_CALL(*cb, close(transport.get(), _)).Times(2);
3636 EXPECT_CALL(*cb, destroy(transport.get()));
3637 transport = nullptr;
3638 Mock::VerifyAndClearExpectations(cb.get());
3639 }
3640
TEST_F(QuicTransportImplTest,ObserverCloseWithErrorThenDestroyTransport)3641 TEST_F(QuicTransportImplTest, ObserverCloseWithErrorThenDestroyTransport) {
3642 auto cb = std::make_unique<StrictMock<MockObserver>>();
3643 EXPECT_CALL(*cb, observerAttach(transport.get()));
3644 transport->addObserver(cb.get());
3645 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
3646
3647 const auto testError = std::make_pair(
3648 QuicErrorCode(LocalErrorCode::CONNECTION_RESET),
3649 std::string("testError"));
3650 EXPECT_CALL(
3651 *cb,
3652 close(
3653 transport.get(),
3654 folly::Optional<std::pair<QuicErrorCode, std::string>>(testError)));
3655 transport->close(testError);
3656 Mock::VerifyAndClearExpectations(cb.get());
3657 InSequence s;
3658 EXPECT_CALL(*cb, close(transport.get(), _)).Times(2);
3659 EXPECT_CALL(*cb, destroy(transport.get()));
3660 transport = nullptr;
3661 Mock::VerifyAndClearExpectations(cb.get());
3662 }
3663
TEST_F(QuicTransportImplTest,ObserverDetachObserverImmediately)3664 TEST_F(QuicTransportImplTest, ObserverDetachObserverImmediately) {
3665 auto cb = std::make_unique<StrictMock<MockObserver>>();
3666 EXPECT_CALL(*cb, observerAttach(transport.get()));
3667 transport->addObserver(cb.get());
3668 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
3669
3670 EXPECT_CALL(*cb, observerDetach(transport.get()));
3671 EXPECT_TRUE(transport->removeObserver(cb.get()));
3672 Mock::VerifyAndClearExpectations(cb.get());
3673 EXPECT_THAT(transport->getObservers(), IsEmpty());
3674 }
3675
TEST_F(QuicTransportImplTest,ObserverDetachObserverAfterTransportClose)3676 TEST_F(QuicTransportImplTest, ObserverDetachObserverAfterTransportClose) {
3677 auto cb = std::make_unique<StrictMock<MockObserver>>();
3678 EXPECT_CALL(*cb, observerAttach(transport.get()));
3679 transport->addObserver(cb.get());
3680 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
3681
3682 EXPECT_CALL(*cb, close(transport.get(), _));
3683 transport->close(folly::none);
3684 Mock::VerifyAndClearExpectations(cb.get());
3685
3686 EXPECT_CALL(*cb, observerDetach(transport.get()));
3687 EXPECT_TRUE(transport->removeObserver(cb.get()));
3688 Mock::VerifyAndClearExpectations(cb.get());
3689 EXPECT_THAT(transport->getObservers(), IsEmpty());
3690 }
3691
TEST_F(QuicTransportImplTest,ObserverDetachObserverOnCloseDuringTransportDestroy)3692 TEST_F(
3693 QuicTransportImplTest,
3694 ObserverDetachObserverOnCloseDuringTransportDestroy) {
3695 auto cb = std::make_unique<StrictMock<MockObserver>>();
3696 EXPECT_CALL(*cb, observerAttach(transport.get()));
3697 transport->addObserver(cb.get());
3698 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
3699
3700 InSequence s;
3701 EXPECT_CALL(*cb, close(transport.get(), _))
3702 .WillOnce(Invoke([&cb](auto callbackTransport, auto /* errorOpt */) {
3703 EXPECT_TRUE(callbackTransport->removeObserver(cb.get()));
3704 }));
3705 EXPECT_CALL(*cb, observerDetach(transport.get()));
3706 transport = nullptr;
3707 Mock::VerifyAndClearExpectations(cb.get());
3708 }
3709
TEST_F(QuicTransportImplTest,ObserverMultipleAttachRemove)3710 TEST_F(QuicTransportImplTest, ObserverMultipleAttachRemove) {
3711 auto cb1 = std::make_unique<StrictMock<MockObserver>>();
3712 EXPECT_CALL(*cb1, observerAttach(transport.get()));
3713 transport->addObserver(cb1.get());
3714 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
3715
3716 auto cb2 = std::make_unique<StrictMock<MockObserver>>();
3717 EXPECT_CALL(*cb2, observerAttach(transport.get()));
3718 transport->addObserver(cb2.get());
3719 EXPECT_THAT(
3720 transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
3721
3722 EXPECT_CALL(*cb2, observerDetach(transport.get()));
3723 EXPECT_TRUE(transport->removeObserver(cb2.get()));
3724 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
3725 Mock::VerifyAndClearExpectations(cb1.get());
3726 Mock::VerifyAndClearExpectations(cb2.get());
3727
3728 EXPECT_CALL(*cb1, observerDetach(transport.get()));
3729 EXPECT_TRUE(transport->removeObserver(cb1.get()));
3730 EXPECT_THAT(transport->getObservers(), IsEmpty());
3731 Mock::VerifyAndClearExpectations(cb1.get());
3732 Mock::VerifyAndClearExpectations(cb2.get());
3733
3734 transport = nullptr;
3735 }
3736
TEST_F(QuicTransportImplTest,ObserverMultipleAttachDestroyTransport)3737 TEST_F(QuicTransportImplTest, ObserverMultipleAttachDestroyTransport) {
3738 auto cb1 = std::make_unique<StrictMock<MockObserver>>();
3739 EXPECT_CALL(*cb1, observerAttach(transport.get()));
3740 transport->addObserver(cb1.get());
3741 EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
3742
3743 auto cb2 = std::make_unique<StrictMock<MockObserver>>();
3744 EXPECT_CALL(*cb2, observerAttach(transport.get()));
3745 transport->addObserver(cb2.get());
3746 EXPECT_THAT(
3747 transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
3748
3749 InSequence s;
3750 EXPECT_CALL(*cb1, close(transport.get(), _));
3751 EXPECT_CALL(*cb2, close(transport.get(), _));
3752 EXPECT_CALL(*cb1, close(transport.get(), _));
3753 EXPECT_CALL(*cb2, close(transport.get(), _));
3754 EXPECT_CALL(*cb1, destroy(transport.get()));
3755 EXPECT_CALL(*cb2, destroy(transport.get()));
3756 transport = nullptr;
3757 Mock::VerifyAndClearExpectations(cb1.get());
3758 Mock::VerifyAndClearExpectations(cb2.get());
3759 }
3760
TEST_F(QuicTransportImplTest,ObserverDetachAndAttachEvb)3761 TEST_F(QuicTransportImplTest, ObserverDetachAndAttachEvb) {
3762 Observer::Config config = {};
3763 config.evbEvents = true;
3764 auto cb = std::make_unique<StrictMock<MockObserver>>(config);
3765 folly::EventBase evb2;
3766
3767 EXPECT_CALL(*cb, observerAttach(transport.get()));
3768 transport->addObserver(cb.get());
3769 Mock::VerifyAndClearExpectations(cb.get());
3770
3771 // Detach the event base evb and attach a new event base evb2
3772 EXPECT_CALL(*cb, evbDetach(transport.get(), evb.get()));
3773 transport->detachEventBase();
3774 EXPECT_EQ(nullptr, transport->getEventBase());
3775 Mock::VerifyAndClearExpectations(cb.get());
3776
3777 EXPECT_CALL(*cb, evbAttach(transport.get(), &evb2));
3778 transport->attachEventBase(&evb2);
3779 EXPECT_EQ(&evb2, transport->getEventBase());
3780 Mock::VerifyAndClearExpectations(cb.get());
3781
3782 // Detach the event base evb and re-attach the old event base evb
3783 EXPECT_CALL(*cb, evbDetach(transport.get(), &evb2));
3784 transport->detachEventBase();
3785 EXPECT_EQ(nullptr, transport->getEventBase());
3786 Mock::VerifyAndClearExpectations(cb.get());
3787
3788 EXPECT_CALL(*cb, evbAttach(transport.get(), evb.get()));
3789 transport->attachEventBase(evb.get());
3790 EXPECT_EQ(evb.get(), transport->getEventBase());
3791 Mock::VerifyAndClearExpectations(cb.get());
3792
3793 EXPECT_CALL(*cb, observerDetach(transport.get()));
3794 EXPECT_TRUE(transport->removeObserver(cb.get()));
3795 Mock::VerifyAndClearExpectations(cb.get());
3796 }
3797
TEST_F(QuicTransportImplTest,ImplementationObserverCallbacksDeleted)3798 TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksDeleted) {
3799 auto noopCallback = [](QuicSocket*) {};
3800 transport->transportConn->pendingCallbacks.emplace_back(noopCallback);
3801 EXPECT_EQ(1, size(transport->transportConn->pendingCallbacks));
3802 transport->invokeProcessCallbacksAfterNetworkData();
3803 EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
3804 }
3805
TEST_F(QuicTransportImplTest,ImplementationObserverCallbacksInvoked)3806 TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksInvoked) {
3807 uint32_t callbacksInvoked = 0;
3808 auto countingCallback = [&](QuicSocket*) { callbacksInvoked++; };
3809
3810 for (int i = 0; i < 2; i++) {
3811 transport->transportConn->pendingCallbacks.emplace_back(countingCallback);
3812 }
3813 EXPECT_EQ(2, size(transport->transportConn->pendingCallbacks));
3814 transport->invokeProcessCallbacksAfterNetworkData();
3815
3816 EXPECT_EQ(2, callbacksInvoked);
3817 EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
3818 }
3819
TEST_F(QuicTransportImplTest,ImplementationObserverCallbacksCorrectQuicSocket)3820 TEST_F(
3821 QuicTransportImplTest,
3822 ImplementationObserverCallbacksCorrectQuicSocket) {
3823 QuicSocket* returnedSocket = nullptr;
3824 auto func = [&](QuicSocket* qSocket) { returnedSocket = qSocket; };
3825
3826 EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
3827 transport->transportConn->pendingCallbacks.emplace_back(func);
3828 EXPECT_EQ(1, size(transport->transportConn->pendingCallbacks));
3829
3830 transport->invokeProcessCallbacksAfterNetworkData();
3831 EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
3832
3833 EXPECT_EQ(transport.get(), returnedSocket);
3834 }
3835
TEST_F(QuicTransportImplTest,GetConnectionStatsSmoke)3836 TEST_F(QuicTransportImplTest, GetConnectionStatsSmoke) {
3837 auto stats = transport->getConnectionsStats();
3838 EXPECT_EQ(stats.congestionController, CongestionControlType::Cubic);
3839 EXPECT_EQ(stats.clientConnectionId, "0a090807");
3840 }
3841
TEST_F(QuicTransportImplTest,DatagramCallbackDatagramAvailable)3842 TEST_F(QuicTransportImplTest, DatagramCallbackDatagramAvailable) {
3843 NiceMock<MockDatagramCallback> datagramCb;
3844 transport->enableDatagram();
3845 transport->setDatagramCallback(&datagramCb);
3846 transport->addDatagram(folly::IOBuf::copyBuffer("datagram payload"));
3847 EXPECT_CALL(datagramCb, onDatagramsAvailable());
3848 transport->driveReadCallbacks();
3849 }
3850
TEST_F(QuicTransportImplTest,ZeroLengthDatagram)3851 TEST_F(QuicTransportImplTest, ZeroLengthDatagram) {
3852 NiceMock<MockDatagramCallback> datagramCb;
3853 transport->enableDatagram();
3854 transport->setDatagramCallback(&datagramCb);
3855 transport->addDatagram(folly::IOBuf::copyBuffer(""));
3856 EXPECT_CALL(datagramCb, onDatagramsAvailable());
3857 transport->driveReadCallbacks();
3858 auto datagrams = transport->readDatagrams();
3859 EXPECT_FALSE(datagrams.hasError());
3860 EXPECT_EQ(datagrams->size(), 1);
3861 EXPECT_TRUE(datagrams->front() != nullptr);
3862 EXPECT_EQ(datagrams->front()->computeChainDataLength(), 0);
3863 }
3864
TEST_F(QuicTransportImplTest,Cmsgs)3865 TEST_F(QuicTransportImplTest, Cmsgs) {
3866 transport->setServerConnectionId();
3867 folly::SocketOptionMap cmsgs;
3868 cmsgs[{IPPROTO_IP, IP_TOS}] = 123;
3869 EXPECT_CALL(*socketPtr, setCmsgs(_)).Times(1);
3870 transport->setCmsgs(cmsgs);
3871
3872 EXPECT_CALL(*socketPtr, appendCmsgs(_)).Times(1);
3873 transport->appendCmsgs(cmsgs);
3874 }
3875
TEST_F(QuicTransportImplTest,BackgroundModeChangeWithStreamChanges)3876 TEST_F(QuicTransportImplTest, BackgroundModeChangeWithStreamChanges) {
3877 // Verify that background mode is correctly turned on and off
3878 // based upon stream creation, priority changes, stream removal.
3879 // For different steps try local (uni/bi)directional streams and remote
3880 // streams
3881 InSequence s;
3882 auto& conn = transport->getConnectionState();
3883 auto mockCongestionController =
3884 std::make_unique<NiceMock<MockCongestionController>>();
3885 auto rawCongestionController = mockCongestionController.get();
3886 conn.congestionController = std::move(mockCongestionController);
3887 auto& manager = *conn.streamManager;
3888 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(_))
3889 .Times(0); // Backgound params not set
3890 auto stream = manager.createNextUnidirectionalStream().value();
3891 manager.setStreamPriority(stream->id, 1, false);
3892
3893 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
3894 .Times(1); // On setting the background params
3895 transport->setBackgroundModeParameters(1, 0.5);
3896
3897 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
3898 .Times(1); // On removing a closed stream
3899 stream->sendState = StreamSendState::Closed;
3900 stream->recvState = StreamRecvState::Closed;
3901 manager.removeClosedStream(stream->id);
3902
3903 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
3904 .Times(2); // On stream creation - create two streams - one bidirectional
3905 auto stream2Id = manager.createNextUnidirectionalStream().value()->id;
3906 auto stream3id = manager.createNextBidirectionalStream().value()->id;
3907
3908 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
3909 .Times(1); // On increasing the priority of one of the streams
3910 manager.setStreamPriority(stream3id, 0, false);
3911
3912 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
3913 .Times(1); // a new lower priority stream does not affect the utlization
3914 // factor
3915 auto streamLower = manager.createNextBidirectionalStream().value();
3916
3917 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
3918 .Times(1); // On removing a closed stream
3919 streamLower->sendState = StreamSendState::Closed;
3920 streamLower->recvState = StreamRecvState::Closed;
3921 manager.removeClosedStream(streamLower->id);
3922
3923 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
3924 .Times(1); // On removing a closed stream
3925 CHECK_NOTNULL(manager.getStream(stream3id))->sendState =
3926 StreamSendState::Closed;
3927 CHECK_NOTNULL(manager.getStream(stream3id))->recvState =
3928 StreamRecvState::Closed;
3929 manager.removeClosedStream(stream3id);
3930
3931 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
3932 .Times(1); // On stream creation - remote stream
3933 auto peerStreamId = 20;
3934 ASSERT_TRUE(isRemoteStream(conn.nodeType, peerStreamId));
3935 auto stream4 = manager.getStream(peerStreamId);
3936
3937 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
3938 .Times(1); // On clearing the background parameters
3939 transport->clearBackgroundModeParameters();
3940
3941 EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(_))
3942 .Times(0); // Background params not set
3943 stream4->sendState = StreamSendState::Closed;
3944 stream4->recvState = StreamRecvState::Closed;
3945 manager.removeClosedStream(stream4->id);
3946 CHECK_NOTNULL(manager.getStream(stream2Id))->sendState =
3947 StreamSendState::Closed;
3948 CHECK_NOTNULL(manager.getStream(stream2Id))->recvState =
3949 StreamRecvState::Closed;
3950 manager.removeClosedStream(stream2Id);
3951 }
3952
3953 } // namespace test
3954 } // namespace quic
3955