1"""Unit tests verifying put-related delivery stream APIs.""" 2import boto3 3import sure # noqa pylint: disable=unused-import 4 5from moto import mock_firehose 6from moto import mock_s3 7from moto.core import ACCOUNT_ID 8from moto.core.utils import get_random_hex 9from tests.test_firehose.test_firehose import TEST_REGION 10from tests.test_firehose.test_firehose import sample_s3_dest_config 11from tests.test_firehose.test_firehose_destination_types import ( 12 create_redshift_delivery_stream, 13) 14 15S3_LOCATION_CONSTRAINT = "us-west-1" 16 17 18@mock_firehose 19def test_put_record_redshift_destination(): 20 """Test invocations of put_record() to a Redshift destination. 21 22 At the moment, for Redshift or Elasticsearch destinations, the data 23 is just thrown away 24 """ 25 client = boto3.client("firehose", region_name=TEST_REGION) 26 27 stream_name = f"test_put_record_{get_random_hex(6)}" 28 create_redshift_delivery_stream(client, stream_name) 29 result = client.put_record( 30 DeliveryStreamName=stream_name, Record={"Data": "some test data"} 31 ) 32 assert set(result.keys()) == {"RecordId", "Encrypted", "ResponseMetadata"} 33 34 35@mock_firehose 36def test_put_record_batch_redshift_destination(): 37 """Test invocations of put_record_batch() to a Redshift destination. 38 39 At the moment, for Redshift or Elasticsearch destinations, the data 40 is just thrown away 41 """ 42 client = boto3.client("firehose", region_name=TEST_REGION) 43 44 stream_name = f"test_put_record_{get_random_hex(6)}" 45 create_redshift_delivery_stream(client, stream_name) 46 records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}] 47 result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records) 48 assert set(result.keys()) == { 49 "FailedPutCount", 50 "Encrypted", 51 "RequestResponses", 52 "ResponseMetadata", 53 } 54 assert result["FailedPutCount"] == 0 55 assert result["Encrypted"] is False 56 for response in result["RequestResponses"]: 57 assert set(response.keys()) == {"RecordId"} 58 59 60@mock_firehose 61def test_put_record_http_destination(): 62 """Test invocations of put_record() to a Http destination.""" 63 client = boto3.client("firehose", region_name=TEST_REGION) 64 s3_dest_config = sample_s3_dest_config() 65 66 stream_name = f"test_put_record_{get_random_hex(6)}" 67 client.create_delivery_stream( 68 DeliveryStreamName=stream_name, 69 HttpEndpointDestinationConfiguration={ 70 "EndpointConfiguration": {"Url": "https://google.com"}, 71 "S3Configuration": s3_dest_config, 72 }, 73 ) 74 result = client.put_record( 75 DeliveryStreamName=stream_name, Record={"Data": "some test data"} 76 ) 77 assert set(result.keys()) == {"RecordId", "Encrypted", "ResponseMetadata"} 78 79 80@mock_firehose 81def test_put_record_batch_http_destination(): 82 """Test invocations of put_record_batch() to a Http destination.""" 83 client = boto3.client("firehose", region_name=TEST_REGION) 84 s3_dest_config = sample_s3_dest_config() 85 86 stream_name = f"test_put_record_{get_random_hex(6)}" 87 client.create_delivery_stream( 88 DeliveryStreamName=stream_name, 89 HttpEndpointDestinationConfiguration={ 90 "EndpointConfiguration": {"Url": "https://google.com"}, 91 "S3Configuration": s3_dest_config, 92 }, 93 ) 94 records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}] 95 result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records) 96 assert set(result.keys()) == { 97 "FailedPutCount", 98 "Encrypted", 99 "RequestResponses", 100 "ResponseMetadata", 101 } 102 assert result["FailedPutCount"] == 0 103 assert result["Encrypted"] is False 104 for response in result["RequestResponses"]: 105 assert set(response.keys()) == {"RecordId"} 106 107 108@mock_s3 109@mock_firehose 110def test_put_record_batch_extended_s3_destination(): 111 """Test invocations of put_record_batch() to a S3 destination.""" 112 client = boto3.client("firehose", region_name=TEST_REGION) 113 114 # Create a S3 bucket. 115 bucket_name = "firehosetestbucket" 116 s3_client = boto3.client("s3", region_name=TEST_REGION) 117 s3_client.create_bucket( 118 Bucket=bucket_name, 119 CreateBucketConfiguration={"LocationConstraint": S3_LOCATION_CONSTRAINT}, 120 ) 121 122 stream_name = f"test_put_record_{get_random_hex(6)}" 123 client.create_delivery_stream( 124 DeliveryStreamName=stream_name, 125 ExtendedS3DestinationConfiguration={ 126 "RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/firehose-test-role", 127 "BucketARN": f"arn:aws:s3::{bucket_name}", 128 }, 129 ) 130 records = [{"Data": "one"}, {"Data": "two"}, {"Data": "three"}] 131 result = client.put_record_batch(DeliveryStreamName=stream_name, Records=records) 132 assert set(result.keys()) == { 133 "FailedPutCount", 134 "Encrypted", 135 "RequestResponses", 136 "ResponseMetadata", 137 } 138 assert result["FailedPutCount"] == 0 139 assert result["Encrypted"] is False 140 for response in result["RequestResponses"]: 141 assert set(response.keys()) == {"RecordId"} 142 143 # Pull data from S3 bucket. 144 bucket_objects = s3_client.list_objects_v2(Bucket=bucket_name) 145 response = s3_client.get_object( 146 Bucket=bucket_name, Key=bucket_objects["Contents"][0]["Key"] 147 ) 148 assert response["Body"].read() == b"onetwothree" 149