1 /** 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * SPDX-License-Identifier: Apache-2.0. 4 */ 5 6 #include <aws/external/gtest.h> 7 #include <aws/event-stream/event_stream.h> 8 #include <aws/core/http/standard/StandardHttpRequest.h> 9 #include <aws/core/http/standard/StandardHttpResponse.h> 10 #include <aws/core/client/AWSError.h> 11 #include <aws/core/client/CoreErrors.h> 12 #include <aws/core/client/AWSErrorMarshaller.h> 13 #include <aws/core/utils/event/EventStream.h> 14 #include <aws/testing/mocks/event/MockEventStreamHandler.h> 15 #include <aws/testing/mocks/event/MockEventStreamDecoder.h> 16 #include <aws/core/utils/memory/AWSMemory.h> 17 18 namespace 19 { 20 using namespace Aws::Utils; 21 using namespace Aws::Utils::Event; 22 using namespace Aws::Utils::Stream; 23 using namespace Aws::Http; 24 using namespace Aws::Http::Standard; 25 using namespace Aws::Client; 26 27 const static char ALLOCATION_TAG[] = "EventStreamTests"; 28 const static char ERROR_RAW[] = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" 29 "<Error>" 30 "<Code>IncompleteSignatureException</Code>" 31 "<Message>Message</Message>" 32 "<Resource>Resource</Resource>" 33 "<RequestId>RequestId</RequestId>" 34 "</Error>"; 35 36 class EventStreamTest : public ::testing::Test 37 { 38 public: 39 static aws_event_stream_message eventStreamMessage; 40 static aws_array_list eventStreamHeaders; 41 42 protected: SetUpTestCase()43 static void SetUpTestCase() 44 { 45 // Assemble Records Message 46 // Headers 47 ASSERT_EQ(AWS_OP_SUCCESS, aws_event_stream_headers_list_init(&eventStreamHeaders, Aws::get_aws_allocator())); 48 Aws::Http::HeaderValueCollection headers; 49 headers.insert(Aws::Http::HeaderValuePair(":event-type", "Records")); 50 headers.insert(Aws::Http::HeaderValuePair(":content-type", "application/octet-stream")); 51 headers.insert(Aws::Http::HeaderValuePair(":message-type", "event")); 52 for (const auto& header : headers) 53 { 54 ASSERT_EQ(AWS_OP_SUCCESS, aws_event_stream_add_string_header(&eventStreamHeaders, header.first.c_str(), 55 static_cast<uint8_t>(header.first.size()), 56 header.second.c_str(), static_cast<uint16_t>(header.second.size()), false/*copy*/)); 57 } 58 // Payload 59 const char* payload = "Records"; 60 aws_byte_buf payloadBuf = aws_byte_buf_from_array(reinterpret_cast<const uint8_t*>(payload), strlen(payload)); 61 ASSERT_EQ(AWS_OP_SUCCESS, aws_event_stream_message_init(&eventStreamMessage, Aws::get_aws_allocator(), &eventStreamHeaders, &payloadBuf)); 62 } 63 TearDownTestCase()64 static void TearDownTestCase() 65 { 66 aws_event_stream_message_clean_up(&eventStreamMessage); 67 aws_event_stream_headers_list_cleanup(&eventStreamHeaders); 68 } 69 }; 70 71 aws_event_stream_message EventStreamTest::eventStreamMessage; 72 aws_array_list EventStreamTest::eventStreamHeaders; 73 TEST_F(EventStreamTest,TestEventStreamDestructor)74 TEST_F(EventStreamTest, TestEventStreamDestructor) 75 { 76 MockEventStreamHandler handler; 77 MockEventStreamDecoder decoder(&handler); 78 79 const uint8_t* data_raw = aws_event_stream_message_buffer(&eventStreamMessage); 80 { 81 EventDecoderStream stream(decoder); 82 stream.write(reinterpret_cast<const char*>(data_raw), aws_event_stream_message_total_length(&eventStreamMessage)); 83 } 84 85 ASSERT_EQ(1u, handler.m_onPayloadSegmentCount); 86 ASSERT_EQ(1u, handler.m_onCompletePayloadCount); 87 ASSERT_EQ(1u, handler.m_onPreludeReceivedCount); 88 ASSERT_EQ(3u, handler.m_onHeaderReceivedCount); 89 ASSERT_EQ(0u, handler.m_requestLevelErrorsCount); 90 ASSERT_EQ(0u, handler.m_internalErrorsCount); 91 92 ASSERT_EQ(1u, handler.m_onRecordsCount); 93 } 94 TEST_F(EventStreamTest,TestEventStreamFlush)95 TEST_F(EventStreamTest, TestEventStreamFlush) 96 { 97 MockEventStreamHandler handler; 98 MockEventStreamDecoder decoder(&handler); 99 100 const uint8_t* data_raw = aws_event_stream_message_buffer(&eventStreamMessage); 101 EventDecoderStream stream(decoder); 102 103 size_t preludeLength = 4/*total byte-length*/ + 4/*headers byte-length*/ + 4/*prelude crc*/; 104 size_t headersLength = aws_event_stream_message_headers_len(&eventStreamMessage); 105 size_t payloadLength = aws_event_stream_message_payload_len(&eventStreamMessage); 106 107 // Write prelude and headers to stream. 108 size_t partialLength = preludeLength + headersLength; 109 110 stream.write(reinterpret_cast<const char*>(data_raw), partialLength); 111 stream.flush(); 112 113 ASSERT_EQ(0u, handler.m_onPayloadSegmentCount); 114 ASSERT_EQ(0u, handler.m_onCompletePayloadCount); 115 ASSERT_EQ(1u, handler.m_onPreludeReceivedCount); 116 ASSERT_EQ(3u, handler.m_onHeaderReceivedCount); 117 ASSERT_EQ(0u, handler.m_requestLevelErrorsCount); 118 ASSERT_EQ(0u, handler.m_internalErrorsCount); 119 120 ASSERT_EQ(0u, handler.m_onRecordsCount); 121 122 // Write payload to stream. 123 data_raw += partialLength; 124 partialLength = payloadLength + 4/*message crc*/; 125 126 stream.write(reinterpret_cast<const char*>(data_raw), partialLength); 127 stream.flush(); 128 129 ASSERT_EQ(1u, handler.m_onPayloadSegmentCount); 130 ASSERT_EQ(1u, handler.m_onCompletePayloadCount); 131 ASSERT_EQ(1u, handler.m_onPreludeReceivedCount); 132 ASSERT_EQ(3u, handler.m_onHeaderReceivedCount); 133 ASSERT_EQ(0u, handler.m_requestLevelErrorsCount); 134 ASSERT_EQ(0u, handler.m_internalErrorsCount); 135 136 ASSERT_EQ(1u, handler.m_onRecordsCount); 137 } 138 TEST_F(EventStreamTest,TestEventStreamLargePayload)139 TEST_F(EventStreamTest, TestEventStreamLargePayload) 140 { 141 MockEventStreamHandler handler; 142 MockEventStreamDecoder decoder(&handler); 143 144 const uint8_t* data_raw = aws_event_stream_message_buffer(&eventStreamMessage); 145 size_t totalLength = aws_event_stream_message_total_length(&eventStreamMessage); 146 EventDecoderStream stream(decoder, totalLength / 2); 147 148 stream.write(reinterpret_cast<const char*>(data_raw), totalLength); 149 stream.flush(); 150 151 ASSERT_EQ(1u, handler.m_onPayloadSegmentCount); 152 ASSERT_EQ(1u, handler.m_onCompletePayloadCount); 153 ASSERT_EQ(1u, handler.m_onPreludeReceivedCount); 154 ASSERT_EQ(3u, handler.m_onHeaderReceivedCount); 155 ASSERT_EQ(0u, handler.m_requestLevelErrorsCount); 156 ASSERT_EQ(0u, handler.m_internalErrorsCount); 157 158 ASSERT_EQ(1u, handler.m_onRecordsCount); 159 } 160 TEST_F(EventStreamTest,TestXmlErrorPayloadWithAwsString)161 TEST_F(EventStreamTest, TestXmlErrorPayloadWithAwsString) 162 { 163 MockEventStreamHandler handler; 164 MockEventStreamDecoder decoder(&handler); 165 166 EventDecoderStream stream(decoder); 167 stream.write(ERROR_RAW, sizeof(ERROR_RAW)); 168 stream.flush(); 169 170 Aws::StringStream ss; 171 ss << stream.rdbuf(); 172 ASSERT_STREQ(ERROR_RAW, ss.str().c_str()); 173 174 ASSERT_EQ(0u, handler.m_onPayloadSegmentCount); 175 ASSERT_EQ(0u, handler.m_onCompletePayloadCount); 176 // Underlying Event Stream Decoder will parse xml error unsuccessfully, an invalid prelude with CRC mismatch error will not trigger onPreludeReceived(). 177 ASSERT_EQ(0u, handler.m_onPreludeReceivedCount); 178 ASSERT_EQ(0u, handler.m_onHeaderReceivedCount); 179 ASSERT_EQ(0u, handler.m_requestLevelErrorsCount); 180 ASSERT_EQ(1u, handler.m_internalErrorsCount); 181 ASSERT_EQ(EventStreamErrors::EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE, handler.m_error); 182 ASSERT_TRUE(handler.m_errorMessage.find("CRC Mismatch.") == 0); 183 } 184 TEST_F(EventStreamTest,TestXmlErrorPayloadWithAwsErrorMarshaller)185 TEST_F(EventStreamTest, TestXmlErrorPayloadWithAwsErrorMarshaller) 186 { 187 MockEventStreamHandler handler; 188 MockEventStreamDecoder decoder(&handler); 189 190 auto stream = Aws::New<EventDecoderStream>(ALLOCATION_TAG, decoder); 191 stream->write(ERROR_RAW, sizeof(ERROR_RAW)); 192 stream->flush(); 193 194 auto fakeRequest = Aws::MakeShared<StandardHttpRequest>(ALLOCATION_TAG, 195 "/some/uri", Aws::Http::HttpMethod::HTTP_POST); 196 fakeRequest->SetResponseStreamFactory([=] { return stream; }); 197 198 StandardHttpResponse response(fakeRequest); 199 200 XmlErrorMarshaller awsErrorMarshaller; 201 AWSError<CoreErrors> error = awsErrorMarshaller.Marshall(response); 202 203 ASSERT_EQ(CoreErrors::INCOMPLETE_SIGNATURE, error.GetErrorType()); 204 ASSERT_STREQ("IncompleteSignatureException", error.GetExceptionName().c_str()); 205 ASSERT_STREQ("Message", error.GetMessage().c_str()); 206 ASSERT_FALSE(error.ShouldRetry()); 207 208 ASSERT_EQ(0u, handler.m_onPayloadSegmentCount); 209 ASSERT_EQ(0u, handler.m_onCompletePayloadCount); 210 // Underlying Event Stream Decoder will parse xml error unsuccessfully, an invalid prelude with CRC mismatch error will not trigger onPreludeReceived(). 211 ASSERT_EQ(0u, handler.m_onPreludeReceivedCount); 212 ASSERT_EQ(0u, handler.m_onHeaderReceivedCount); 213 ASSERT_EQ(0u, handler.m_requestLevelErrorsCount); 214 ASSERT_EQ(1u, handler.m_internalErrorsCount); 215 ASSERT_EQ(EventStreamErrors::EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE, handler.m_error); 216 ASSERT_TRUE(handler.m_errorMessage.find("CRC Mismatch.") == 0); 217 } 218 TEST_F(EventStreamTest,TestEncodingEvents)219 TEST_F(EventStreamTest, TestEncodingEvents) 220 { 221 Aws::Client::AWSNullSigner nullSigner; 222 EventEncoderStream io; 223 io.SetSigner(&nullSigner); 224 const char payloadString[] = "Amazon Web Services, Inc."; 225 io.SetSignatureSeed("deadbeef"); 226 constexpr long iterations = 5; 227 for (int i = 0; i < iterations; i++) 228 { 229 io.write(payloadString, sizeof(payloadString)); 230 } 231 232 io.flush(); 233 ASSERT_TRUE(io); 234 235 char output[1024]; 236 io.readsome(output, sizeof(output)); 237 ASSERT_GE(io.gcount(), static_cast<long>(sizeof(payloadString) * iterations)); 238 io.Close(); 239 ASSERT_TRUE(io.eof()); 240 } 241 TEST_F(EventStreamTest,EncodingEventsDecodesCorrectly)242 TEST_F(EventStreamTest, EncodingEventsDecodesCorrectly) 243 { 244 struct MockHandler : Aws::Utils::Event::EventStreamHandler 245 { 246 void OnEvent() override { m_payloads.push_back(GetEventPayloadAsString()); } 247 248 Aws::Vector<Aws::String> GetPayload() const { return m_payloads; } 249 250 Aws::Vector<Aws::String> m_payloads; 251 }; 252 253 // write the payload to the stream and create an event out of it 254 Aws::Client::AWSNullSigner nullSigner; 255 EventEncoderStream io; 256 io.SetSigner(&nullSigner); 257 io.SetSignatureSeed("deadbeef"); 258 const char payloadString[] = "Amazon Web Services, Inc."; 259 Event::Message msg; 260 msg.InsertEventHeader(":message-type", Aws::String("event")); 261 msg.WriteEventPayload(payloadString); 262 io.WriteEvent(msg); 263 264 io.flush(); 265 ASSERT_TRUE(io); 266 267 // read the event bits and attempt to deserialize them 268 char output[1024]; 269 io.readsome(output, sizeof(output)); 270 io.Close(); 271 ASSERT_TRUE(io.eof()); 272 273 // verify that we get the same message out 274 MockHandler handler; 275 EventStreamDecoder decoder(&handler); 276 EventDecoderStream s(decoder); 277 s.write(output, io.gcount()); 278 s.flush(); 279 ASSERT_EQ(1u, handler.m_payloads.size()); // this verifies we received the signed message 280 s.write(handler.m_payloads[0].data(), handler.m_payloads[0].length()); // unwrap the signed message to get the 281 // actual message 282 s.flush(); 283 ASSERT_EQ(2u, handler.m_payloads.size()); 284 ASSERT_STREQ(payloadString, handler.m_payloads[1].c_str()); 285 } 286 } 287