1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6 #include <aws/external/gtest.h>
7 #include <aws/event-stream/event_stream.h>
8 #include <aws/core/http/standard/StandardHttpRequest.h>
9 #include <aws/core/http/standard/StandardHttpResponse.h>
10 #include <aws/core/client/AWSError.h>
11 #include <aws/core/client/CoreErrors.h>
12 #include <aws/core/client/AWSErrorMarshaller.h>
13 #include <aws/core/utils/event/EventStream.h>
14 #include <aws/testing/mocks/event/MockEventStreamHandler.h>
15 #include <aws/testing/mocks/event/MockEventStreamDecoder.h>
16 #include <aws/core/utils/memory/AWSMemory.h>
17 
18 namespace
19 {
20     using namespace Aws::Utils;
21     using namespace Aws::Utils::Event;
22     using namespace Aws::Utils::Stream;
23     using namespace Aws::Http;
24     using namespace Aws::Http::Standard;
25     using namespace Aws::Client;
26 
27     const static char ALLOCATION_TAG[] = "EventStreamTests";
28     const static char ERROR_RAW[] = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
29                                     "<Error>"
30                                     "<Code>IncompleteSignatureException</Code>"
31                                     "<Message>Message</Message>"
32                                     "<Resource>Resource</Resource>"
33                                     "<RequestId>RequestId</RequestId>"
34                                     "</Error>";
35 
36     class EventStreamTest : public ::testing::Test
37     {
38     public:
39         static aws_event_stream_message eventStreamMessage;
40         static aws_array_list eventStreamHeaders;
41 
42     protected:
SetUpTestCase()43         static void SetUpTestCase()
44         {
45             // Assemble Records Message
46             // Headers
47             ASSERT_EQ(AWS_OP_SUCCESS, aws_event_stream_headers_list_init(&eventStreamHeaders, Aws::get_aws_allocator()));
48             Aws::Http::HeaderValueCollection headers;
49             headers.insert(Aws::Http::HeaderValuePair(":event-type", "Records"));
50             headers.insert(Aws::Http::HeaderValuePair(":content-type", "application/octet-stream"));
51             headers.insert(Aws::Http::HeaderValuePair(":message-type", "event"));
52             for (const auto& header : headers)
53             {
54                 ASSERT_EQ(AWS_OP_SUCCESS, aws_event_stream_add_string_header(&eventStreamHeaders, header.first.c_str(),
55                             static_cast<uint8_t>(header.first.size()),
56                             header.second.c_str(), static_cast<uint16_t>(header.second.size()), false/*copy*/));
57             }
58             // Payload
59             const char* payload = "Records";
60             aws_byte_buf payloadBuf = aws_byte_buf_from_array(reinterpret_cast<const uint8_t*>(payload), strlen(payload));
61             ASSERT_EQ(AWS_OP_SUCCESS, aws_event_stream_message_init(&eventStreamMessage, Aws::get_aws_allocator(), &eventStreamHeaders, &payloadBuf));
62         }
63 
TearDownTestCase()64         static void TearDownTestCase()
65         {
66             aws_event_stream_message_clean_up(&eventStreamMessage);
67             aws_event_stream_headers_list_cleanup(&eventStreamHeaders);
68         }
69     };
70 
71     aws_event_stream_message EventStreamTest::eventStreamMessage;
72     aws_array_list EventStreamTest::eventStreamHeaders;
73 
TEST_F(EventStreamTest,TestEventStreamDestructor)74     TEST_F(EventStreamTest, TestEventStreamDestructor)
75     {
76         MockEventStreamHandler handler;
77         MockEventStreamDecoder decoder(&handler);
78 
79         const uint8_t* data_raw = aws_event_stream_message_buffer(&eventStreamMessage);
80         {
81             EventDecoderStream stream(decoder);
82             stream.write(reinterpret_cast<const char*>(data_raw), aws_event_stream_message_total_length(&eventStreamMessage));
83         }
84 
85         ASSERT_EQ(1u, handler.m_onPayloadSegmentCount);
86         ASSERT_EQ(1u, handler.m_onCompletePayloadCount);
87         ASSERT_EQ(1u, handler.m_onPreludeReceivedCount);
88         ASSERT_EQ(3u, handler.m_onHeaderReceivedCount);
89         ASSERT_EQ(0u, handler.m_requestLevelErrorsCount);
90         ASSERT_EQ(0u, handler.m_internalErrorsCount);
91 
92         ASSERT_EQ(1u, handler.m_onRecordsCount);
93     }
94 
TEST_F(EventStreamTest,TestEventStreamFlush)95     TEST_F(EventStreamTest, TestEventStreamFlush)
96     {
97         MockEventStreamHandler handler;
98         MockEventStreamDecoder decoder(&handler);
99 
100         const uint8_t* data_raw = aws_event_stream_message_buffer(&eventStreamMessage);
101         EventDecoderStream stream(decoder);
102 
103         size_t preludeLength = 4/*total byte-length*/ + 4/*headers byte-length*/ + 4/*prelude crc*/;
104         size_t headersLength = aws_event_stream_message_headers_len(&eventStreamMessage);
105         size_t payloadLength = aws_event_stream_message_payload_len(&eventStreamMessage);
106 
107         // Write prelude and headers to stream.
108         size_t partialLength = preludeLength + headersLength;
109 
110         stream.write(reinterpret_cast<const char*>(data_raw), partialLength);
111         stream.flush();
112 
113         ASSERT_EQ(0u, handler.m_onPayloadSegmentCount);
114         ASSERT_EQ(0u, handler.m_onCompletePayloadCount);
115         ASSERT_EQ(1u, handler.m_onPreludeReceivedCount);
116         ASSERT_EQ(3u, handler.m_onHeaderReceivedCount);
117         ASSERT_EQ(0u, handler.m_requestLevelErrorsCount);
118         ASSERT_EQ(0u, handler.m_internalErrorsCount);
119 
120         ASSERT_EQ(0u, handler.m_onRecordsCount);
121 
122         // Write payload to stream.
123         data_raw += partialLength;
124         partialLength = payloadLength + 4/*message crc*/;
125 
126         stream.write(reinterpret_cast<const char*>(data_raw), partialLength);
127         stream.flush();
128 
129         ASSERT_EQ(1u, handler.m_onPayloadSegmentCount);
130         ASSERT_EQ(1u, handler.m_onCompletePayloadCount);
131         ASSERT_EQ(1u, handler.m_onPreludeReceivedCount);
132         ASSERT_EQ(3u, handler.m_onHeaderReceivedCount);
133         ASSERT_EQ(0u, handler.m_requestLevelErrorsCount);
134         ASSERT_EQ(0u, handler.m_internalErrorsCount);
135 
136         ASSERT_EQ(1u, handler.m_onRecordsCount);
137     }
138 
TEST_F(EventStreamTest,TestEventStreamLargePayload)139     TEST_F(EventStreamTest, TestEventStreamLargePayload)
140     {
141         MockEventStreamHandler handler;
142         MockEventStreamDecoder decoder(&handler);
143 
144         const uint8_t* data_raw = aws_event_stream_message_buffer(&eventStreamMessage);
145         size_t totalLength = aws_event_stream_message_total_length(&eventStreamMessage);
146         EventDecoderStream stream(decoder, totalLength / 2);
147 
148         stream.write(reinterpret_cast<const char*>(data_raw), totalLength);
149         stream.flush();
150 
151         ASSERT_EQ(1u, handler.m_onPayloadSegmentCount);
152         ASSERT_EQ(1u, handler.m_onCompletePayloadCount);
153         ASSERT_EQ(1u, handler.m_onPreludeReceivedCount);
154         ASSERT_EQ(3u, handler.m_onHeaderReceivedCount);
155         ASSERT_EQ(0u, handler.m_requestLevelErrorsCount);
156         ASSERT_EQ(0u, handler.m_internalErrorsCount);
157 
158         ASSERT_EQ(1u, handler.m_onRecordsCount);
159     }
160 
TEST_F(EventStreamTest,TestXmlErrorPayloadWithAwsString)161     TEST_F(EventStreamTest, TestXmlErrorPayloadWithAwsString)
162     {
163         MockEventStreamHandler handler;
164         MockEventStreamDecoder decoder(&handler);
165 
166         EventDecoderStream stream(decoder);
167         stream.write(ERROR_RAW, sizeof(ERROR_RAW));
168         stream.flush();
169 
170         Aws::StringStream ss;
171         ss << stream.rdbuf();
172         ASSERT_STREQ(ERROR_RAW, ss.str().c_str());
173 
174         ASSERT_EQ(0u, handler.m_onPayloadSegmentCount);
175         ASSERT_EQ(0u, handler.m_onCompletePayloadCount);
176         // Underlying Event Stream Decoder will parse xml error unsuccessfully, an invalid prelude with CRC mismatch error will not trigger onPreludeReceived().
177         ASSERT_EQ(0u, handler.m_onPreludeReceivedCount);
178         ASSERT_EQ(0u, handler.m_onHeaderReceivedCount);
179         ASSERT_EQ(0u, handler.m_requestLevelErrorsCount);
180         ASSERT_EQ(1u, handler.m_internalErrorsCount);
181         ASSERT_EQ(EventStreamErrors::EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE, handler.m_error);
182         ASSERT_TRUE(handler.m_errorMessage.find("CRC Mismatch.") == 0);
183     }
184 
TEST_F(EventStreamTest,TestXmlErrorPayloadWithAwsErrorMarshaller)185     TEST_F(EventStreamTest, TestXmlErrorPayloadWithAwsErrorMarshaller)
186     {
187         MockEventStreamHandler handler;
188         MockEventStreamDecoder decoder(&handler);
189 
190         auto stream = Aws::New<EventDecoderStream>(ALLOCATION_TAG, decoder);
191         stream->write(ERROR_RAW, sizeof(ERROR_RAW));
192         stream->flush();
193 
194         auto fakeRequest = Aws::MakeShared<StandardHttpRequest>(ALLOCATION_TAG,
195             "/some/uri", Aws::Http::HttpMethod::HTTP_POST);
196         fakeRequest->SetResponseStreamFactory([=] { return stream; });
197 
198         StandardHttpResponse response(fakeRequest);
199 
200         XmlErrorMarshaller awsErrorMarshaller;
201         AWSError<CoreErrors> error = awsErrorMarshaller.Marshall(response);
202 
203         ASSERT_EQ(CoreErrors::INCOMPLETE_SIGNATURE, error.GetErrorType());
204         ASSERT_STREQ("IncompleteSignatureException", error.GetExceptionName().c_str());
205         ASSERT_STREQ("Message", error.GetMessage().c_str());
206         ASSERT_FALSE(error.ShouldRetry());
207 
208         ASSERT_EQ(0u, handler.m_onPayloadSegmentCount);
209         ASSERT_EQ(0u, handler.m_onCompletePayloadCount);
210         // Underlying Event Stream Decoder will parse xml error unsuccessfully, an invalid prelude with CRC mismatch error will not trigger onPreludeReceived().
211         ASSERT_EQ(0u, handler.m_onPreludeReceivedCount);
212         ASSERT_EQ(0u, handler.m_onHeaderReceivedCount);
213         ASSERT_EQ(0u, handler.m_requestLevelErrorsCount);
214         ASSERT_EQ(1u, handler.m_internalErrorsCount);
215         ASSERT_EQ(EventStreamErrors::EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE, handler.m_error);
216         ASSERT_TRUE(handler.m_errorMessage.find("CRC Mismatch.") == 0);
217     }
218 
TEST_F(EventStreamTest,TestEncodingEvents)219     TEST_F(EventStreamTest, TestEncodingEvents)
220     {
221         Aws::Client::AWSNullSigner nullSigner;
222         EventEncoderStream io;
223         io.SetSigner(&nullSigner);
224         const char payloadString[] = "Amazon Web Services, Inc.";
225         io.SetSignatureSeed("deadbeef");
226         constexpr long iterations = 5;
227         for (int i = 0; i < iterations; i++)
228         {
229             io.write(payloadString, sizeof(payloadString));
230         }
231 
232         io.flush();
233         ASSERT_TRUE(io);
234 
235         char output[1024];
236         io.readsome(output, sizeof(output));
237         ASSERT_GE(io.gcount(), static_cast<long>(sizeof(payloadString) * iterations));
238         io.Close();
239         ASSERT_TRUE(io.eof());
240     }
241 
TEST_F(EventStreamTest,EncodingEventsDecodesCorrectly)242     TEST_F(EventStreamTest, EncodingEventsDecodesCorrectly)
243     {
244         struct MockHandler : Aws::Utils::Event::EventStreamHandler
245         {
246             void OnEvent() override { m_payloads.push_back(GetEventPayloadAsString()); }
247 
248             Aws::Vector<Aws::String> GetPayload() const { return m_payloads; }
249 
250             Aws::Vector<Aws::String> m_payloads;
251         };
252 
253         // write the payload to the stream and create an event out of it
254         Aws::Client::AWSNullSigner nullSigner;
255         EventEncoderStream io;
256         io.SetSigner(&nullSigner);
257         io.SetSignatureSeed("deadbeef");
258         const char payloadString[] = "Amazon Web Services, Inc.";
259         Event::Message msg;
260         msg.InsertEventHeader(":message-type", Aws::String("event"));
261         msg.WriteEventPayload(payloadString);
262         io.WriteEvent(msg);
263 
264         io.flush();
265         ASSERT_TRUE(io);
266 
267         // read the event bits and attempt to deserialize them
268         char output[1024];
269         io.readsome(output, sizeof(output));
270         io.Close();
271         ASSERT_TRUE(io.eof());
272 
273         // verify that we get the same message out
274         MockHandler handler;
275         EventStreamDecoder decoder(&handler);
276         EventDecoderStream s(decoder);
277         s.write(output, io.gcount());
278         s.flush();
279         ASSERT_EQ(1u, handler.m_payloads.size()); // this verifies we received the signed message
280         s.write(handler.m_payloads[0].data(), handler.m_payloads[0].length()); // unwrap the signed message to get the
281                                                                                // actual message
282         s.flush();
283         ASSERT_EQ(2u, handler.m_payloads.size());
284         ASSERT_STREQ(payloadString, handler.m_payloads[1].c_str());
285     }
286 }
287