1"""Unit tests verifying put-related delivery stream APIs."""
2import boto3
3import sure  # noqa pylint: disable=unused-import
4
5from moto import mock_firehose
6from moto import mock_s3
7from moto.core import ACCOUNT_ID
8from moto.core.utils import get_random_hex
9from tests.test_firehose.test_firehose import TEST_REGION
10from tests.test_firehose.test_firehose import sample_s3_dest_config
11from tests.test_firehose.test_firehose_destination_types import (
12    create_redshift_delivery_stream,
13)
14
# Region passed as the LocationConstraint when the S3 test bucket is created.
S3_LOCATION_CONSTRAINT = "us-west-1"
16
17
@mock_firehose
def test_put_record_redshift_destination():
    """Check the response shape of put_record() for a Redshift stream.

    At the moment, for Redshift or Elasticsearch destinations, the data
    is just thrown away, so only the set of top-level response keys is
    verified.
    """
    firehose = boto3.client("firehose", region_name=TEST_REGION)
    delivery_stream = f"test_put_record_{get_random_hex(6)}"
    create_redshift_delivery_stream(firehose, delivery_stream)

    response = firehose.put_record(
        DeliveryStreamName=delivery_stream,
        Record={"Data": "some test data"},
    )

    expected_keys = {"RecordId", "Encrypted", "ResponseMetadata"}
    assert set(response.keys()) == expected_keys
33
34
@mock_firehose
def test_put_record_batch_redshift_destination():
    """Test invocations of put_record_batch() to a Redshift destination.

    At the moment, for Redshift or Elasticsearch destinations, the data
    is just thrown away, so only the shape of the response is verified:
    the top-level keys, the failure count, the encryption flag, and one
    ``RecordId`` entry per submitted record.
    """
    client = boto3.client("firehose", region_name=TEST_REGION)

    stream_name = f"test_put_record_{get_random_hex(6)}"
    create_redshift_delivery_stream(client, stream_name)
    records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}]
    result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records)
    assert set(result.keys()) == {
        "FailedPutCount",
        "Encrypted",
        "RequestResponses",
        "ResponseMetadata",
    }
    assert result["FailedPutCount"] == 0
    assert result["Encrypted"] is False
    # One response entry is expected per submitted record; without this
    # check the loop below would pass vacuously on an empty list.
    assert len(result["RequestResponses"]) == len(records)
    for record_response in result["RequestResponses"]:
        assert set(record_response.keys()) == {"RecordId"}
58
59
@mock_firehose
def test_put_record_http_destination():
    """Check put_record() against a stream with an HTTP endpoint destination.

    Only the set of top-level keys in the response is verified.
    """
    firehose = boto3.client("firehose", region_name=TEST_REGION)
    http_destination = {
        "EndpointConfiguration": {"Url": "https://google.com"},
        "S3Configuration": sample_s3_dest_config(),
    }
    delivery_stream = f"test_put_record_{get_random_hex(6)}"
    firehose.create_delivery_stream(
        DeliveryStreamName=delivery_stream,
        HttpEndpointDestinationConfiguration=http_destination,
    )

    response = firehose.put_record(
        DeliveryStreamName=delivery_stream,
        Record={"Data": "some test data"},
    )

    expected_keys = {"RecordId", "Encrypted", "ResponseMetadata"}
    assert set(response.keys()) == expected_keys
78
79
@mock_firehose
def test_put_record_batch_http_destination():
    """Test invocations of put_record_batch() to a Http destination.

    Verifies the response shape: the top-level keys, a zero failure
    count, the encryption flag, and one ``RecordId`` entry per submitted
    record.
    """
    client = boto3.client("firehose", region_name=TEST_REGION)
    s3_dest_config = sample_s3_dest_config()

    stream_name = f"test_put_record_{get_random_hex(6)}"
    client.create_delivery_stream(
        DeliveryStreamName=stream_name,
        HttpEndpointDestinationConfiguration={
            "EndpointConfiguration": {"Url": "https://google.com"},
            "S3Configuration": s3_dest_config,
        },
    )
    records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}]
    result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records)
    assert set(result.keys()) == {
        "FailedPutCount",
        "Encrypted",
        "RequestResponses",
        "ResponseMetadata",
    }
    assert result["FailedPutCount"] == 0
    assert result["Encrypted"] is False
    # One response entry is expected per submitted record; without this
    # check the loop below would pass vacuously on an empty list.
    assert len(result["RequestResponses"]) == len(records)
    for record_response in result["RequestResponses"]:
        assert set(record_response.keys()) == {"RecordId"}
106
107
@mock_s3
@mock_firehose
def test_put_record_batch_extended_s3_destination():
    """Test invocations of put_record_batch() to a S3 destination.

    Besides the response shape, this checks that the batch ends up in the
    destination bucket as a single object whose body is the concatenation
    of the submitted records.
    """
    client = boto3.client("firehose", region_name=TEST_REGION)

    # Create a S3 bucket.
    bucket_name = "firehosetestbucket"
    s3_client = boto3.client("s3", region_name=TEST_REGION)
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": S3_LOCATION_CONSTRAINT},
    )

    stream_name = f"test_put_record_{get_random_hex(6)}"
    client.create_delivery_stream(
        DeliveryStreamName=stream_name,
        ExtendedS3DestinationConfiguration={
            "RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose-test-role",
            # S3 bucket ARNs leave both the region and account fields
            # empty, hence three consecutive colons before the name.
            "BucketARN": f"arn:aws:s3:::{bucket_name}",
        },
    )
    records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}]
    result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records)
    assert set(result.keys()) == {
        "FailedPutCount",
        "Encrypted",
        "RequestResponses",
        "ResponseMetadata",
    }
    assert result["FailedPutCount"] == 0
    assert result["Encrypted"] is False
    # One response entry is expected per submitted record; without this
    # check the loop below would pass vacuously on an empty list.
    assert len(result["RequestResponses"]) == len(records)
    for record_response in result["RequestResponses"]:
        assert set(record_response.keys()) == {"RecordId"}

    # Pull data from S3 bucket.
    bucket_objects = s3_client.list_objects_v2(Bucket=bucket_name)
    # The single batch should have produced exactly one object; make that
    # explicit rather than silently reading whichever object is listed first.
    assert len(bucket_objects["Contents"]) == 1
    s3_object = s3_client.get_object(
        Bucket=bucket_name, Key=bucket_objects["Contents"][0]["Key"]
    )
    assert s3_object["Body"].read() == b"onetwothree"
149