This article is a step-by-step tutorial on creating and testing a Kinesis Firehose stream with Boto3 in order to ingest streaming data into the AWS platform.
As the volume and the number of sources of streaming data keep growing, companies need to load and process data in near real-time in order to exploit its potential. In a typical scenario, a data producer generates input data (logs, clickstreams, IoT readings, etc.) which needs to be ingested. AWS Kinesis Firehose is a fully managed service for ingesting, transforming and loading streaming data into other AWS services such as Amazon S3 and Redshift, enabling near real-time analytics or further processing. This tutorial focuses on the programmatic implementation of a Kinesis Firehose delivery stream, which is the underlying Firehose entity, using Boto3 (the AWS SDK for Python). The full code is available on GitHub.
A climate sensor measures temperature-related data which are returned via a JSON API response. In order to process data records in near real-time, each measurement record should be ingested and stored in S3 for further processing. This can be achieved by a Kinesis Firehose delivery stream.
The solution to the problem consists of multiple steps:
- Create a configuration file where credentials and other parameters are stored
- Create an S3 bucket where the streaming data will be stored
- Create an Identity and Access Management (IAM) role to allow Kinesis Firehose to write to S3
- Create a policy granting write permission into the S3 bucket created above
- Attach the policy to the IAM role
- Create a Firehose delivery stream
- Test the Firehose delivery stream.
In order to replicate this tutorial, the following requirements should be met:
- An AWS account should be set up
- An AWS user with Admin permissions should have been created (if not set up yet, please refer to the AWS IAM documentation), and a set of credentials (Access Key ID and Secret Access Key) should be available
- Python and Boto3 (AWS SDK for Python) should be installed on the user’s machine.
Create a configuration file where credentials and other parameters are stored
AWS best practices recommend keeping credentials and other settings in a separate file (“dl.cfg” in our case) that is never made public (e.g. pushed to GitHub), so that they are not exposed. It is also convenient to keep in this file all the parameters used by the other AWS services; those parameters are described in the following sections.
Once all the needed parameters have been stored in the configuration file, they can be read in the main file.
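As a minimal sketch of this step, the file could be parsed with Python's standard configparser module. The section and key names below (AWS, S3, IAM, FIREHOSE and their keys) are assumptions made for illustration, as is the load_settings helper; only the file name dl.cfg comes from this tutorial.

```python
import configparser

# Hypothetical layout of dl.cfg; section and key names are assumptions.
EXAMPLE_CFG = """
[AWS]
KEY = <your-access-key-id>
SECRET = <your-secret-access-key>
REGION = us-east-1

[S3]
BUCKET_NAME = my-firehose-destination-bucket

[IAM]
ROLE_NAME = firehose-to-s3-role
POLICY_NAME = firehose-to-s3-policy

[FIREHOSE]
STREAM_NAME = climate-sensor-stream
"""


def load_settings(path="dl.cfg"):
    """Read the configuration file and return a ConfigParser object."""
    config = configparser.ConfigParser()
    # read() returns the list of files it managed to parse
    if not config.read(path):
        raise FileNotFoundError(f"Configuration file not found: {path}")
    return config
```

With a file laid out like EXAMPLE_CFG, a value would then be read in the main file as load_settings()["S3"]["BUCKET_NAME"].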
Create an S3 bucket where the streaming data will be stored
In this section, an S3 bucket that will serve as a destination for the Kinesis Firehose delivery stream will be created. This can be achieved through the following steps:
- Define Boto3 S3 client to programmatically access S3
- Define a function to create a new bucket
- Run the function.
The Boto3 S3 client enables the user to programmatically access AWS S3 resources using the credentials provided. Since there cannot be more than one S3 bucket with a given name, the function that creates a new S3 bucket first checks the S3 buckets accessible to the user and returns a negative message if the bucket already exists. If it does not exist, the function proceeds with the creation of a new bucket. Note that bucket names are unique across all AWS accounts, so the creation can still fail if the name is already taken by another account. Although several other optional parameters can be set (such as Access Control List, region of creation, permissions and ownership), for the sake of simplicity only the bucket name will be specified in this case.
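The three steps above could be sketched as follows. This is an illustrative version, not necessarily the original code: the helper names make_s3_client and create_bucket are hypothetical, and Boto3 is imported lazily so that the bucket logic can be exercised without AWS access.

```python
def make_s3_client(key, secret, region):
    """Build a Boto3 S3 client from explicit credentials (requires boto3)."""
    import boto3  # imported lazily so create_bucket can be tested without AWS

    return boto3.client(
        "s3",
        aws_access_key_id=key,
        aws_secret_access_key=secret,
        region_name=region,
    )


def create_bucket(s3_client, bucket_name):
    """Create the bucket unless the caller already owns one with that name.

    Returns True if a bucket was created, False if it already existed.
    """
    # list_buckets only returns buckets owned by the calling account
    existing = [b["Name"] for b in s3_client.list_buckets()["Buckets"]]
    if bucket_name in existing:
        print(f"Bucket {bucket_name} already exists, nothing to do")
        return False
    # Only the name is passed, as described in the text. Outside us-east-1,
    # S3 also expects CreateBucketConfiguration={"LocationConstraint": region}.
    s3_client.create_bucket(Bucket=bucket_name)
    print(f"Bucket {bucket_name} created")
    return True
```

Passing the client as an argument keeps the function independent of how credentials are loaded, which also makes it easy to test with a stand-in object.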
Create an Identity and Access Management role to allow Kinesis Firehose to write to S3
In order to allow Kinesis Firehose to write data into an S3 bucket, an IAM role should be created. Roles are AWS identities that can be granted permissions through attached policies. This role will allow AWS services to be called on behalf of the user. As with S3, the IAM service can be accessed from Boto3 through a dedicated client. The code for this step performs the following operations:
- Define a client to control IAM
- Check if any role with the name defined in the config file already exists and, if it does, delete it
- Create a new role intended for Kinesis Firehose.
Role names must be unique within an AWS account, which is why an existing role with the same name is deleted before the new one is created.
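A possible sketch of the check, delete and create sequence is shown below. The function name recreate_firehose_role is hypothetical, and iam_client is assumed to be a Boto3 IAM client, e.g. boto3.client("iam", ...). The sketch only detaches managed policies before deleting; a role that also has inline policies or instance profiles would need extra cleanup.

```python
import json

# Trust policy letting the Firehose service assume the role
FIREHOSE_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "firehose.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}


def recreate_firehose_role(iam_client, role_name):
    """Delete any role called role_name, then create it again for Firehose.

    Returns the ARN of the new role.
    """
    try:
        iam_client.get_role(RoleName=role_name)
    except iam_client.exceptions.NoSuchEntityException:
        pass  # no role with this name yet
    else:
        # A role cannot be deleted while managed policies are attached to it
        attached = iam_client.list_attached_role_policies(RoleName=role_name)
        for policy in attached["AttachedPolicies"]:
            iam_client.detach_role_policy(
                RoleName=role_name, PolicyArn=policy["PolicyArn"]
            )
        iam_client.delete_role(RoleName=role_name)

    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(FIREHOSE_TRUST_POLICY),
        Description="Allows Kinesis Firehose to write to S3",
    )
    return response["Role"]["Arn"]
```

The returned ARN is what the delivery stream later needs in order to assume the role.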
Create a policy granting write permission into the S3 bucket created above
In order for Firehose to write ingested data into the S3 destination bucket, it is necessary to create a policy that grants this permission and attach it to the role created above. Policies are JSON documents declaring the permissions associated with specific AWS resources. The code for this step will:
- Iterate over current policies to identify any policy having the same name as the one to be created
- If any policy is found, iterate over the roles to which the policy is attached
- Detach the policy from any role it is attached to
- Delete all the versions of the policy
- Create the new policy and attach it to the role.
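The cleanup steps above, followed by the creation and attachment of the policy, might look like the sketch below. The function names are hypothetical and iam_client is assumed to be a Boto3 IAM client. The list of S3 actions follows what AWS documents for Firehose delivery to S3; the policy used in the original code may differ.

```python
import json


def build_s3_write_policy(bucket_name):
    """Return a policy document granting write access to one bucket."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject",
                ],
                # the bucket itself and every object inside it
                "Resource": [bucket_arn, f"{bucket_arn}/*"],
            }
        ],
    }


def delete_policy_if_exists(iam_client, policy_name):
    """Detach and delete any customer-managed policy called policy_name."""
    # Scope="Local" limits the listing to customer-managed policies. Only the
    # first page of results is inspected; use a paginator for large accounts.
    for policy in iam_client.list_policies(Scope="Local")["Policies"]:
        if policy["PolicyName"] != policy_name:
            continue
        arn = policy["Arn"]
        entities = iam_client.list_entities_for_policy(PolicyArn=arn)
        for role in entities["PolicyRoles"]:
            iam_client.detach_role_policy(RoleName=role["RoleName"], PolicyArn=arn)
        versions = iam_client.list_policy_versions(PolicyArn=arn)["Versions"]
        for version in versions:
            # the default version is removed together with the policy itself
            if not version["IsDefaultVersion"]:
                iam_client.delete_policy_version(
                    PolicyArn=arn, VersionId=version["VersionId"]
                )
        iam_client.delete_policy(PolicyArn=arn)


def create_and_attach_policy(iam_client, policy_name, role_name, bucket_name):
    """Recreate the policy and attach it to the role; returns the policy ARN."""
    delete_policy_if_exists(iam_client, policy_name)
    response = iam_client.create_policy(
        PolicyName=policy_name,
        PolicyDocument=json.dumps(build_s3_write_policy(bucket_name)),
    )
    arn = response["Policy"]["Arn"]
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=arn)
    return arn
```

Keeping the policy document in its own function makes it easy to inspect or adjust the granted actions without touching the IAM calls.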
Create a Firehose delivery stream
Once the destination bucket, the role and the policy have been set up, the Firehose stream can be created. Since there cannot be two streams with the same name in a given region, the code will check for any stream with the same name and, if one exists, delete it. Then, the new stream will be created. For the sake of simplicity, only the mandatory parameters will be defined in this tutorial, but other configuration features such as encryption, compression and record processing can be defined too. In particular, the code for this step defines:
- The stream name, which uniquely identifies the delivery stream
- The source as Direct Put (with this option, the producer writes directly to the stream)
- The S3 destination bucket
- The buffer size (5 MB) and the buffer interval (60 s): whichever is reached first determines how often records are delivered to S3.
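The parameters listed above could be passed to Boto3 roughly as in the following sketch. The helper names and the polling defaults (poll_seconds, max_polls) are assumptions, and firehose_client is assumed to be a Boto3 Firehose client, e.g. boto3.client("firehose", ...). Because stream deletion is asynchronous, the sketch waits for the old stream to disappear before reusing its name.

```python
import time


def stream_exists(firehose_client, stream_name):
    """Return True if the stream exists in the client's region."""
    try:
        firehose_client.describe_delivery_stream(DeliveryStreamName=stream_name)
        return True
    except firehose_client.exceptions.ResourceNotFoundException:
        return False


def recreate_delivery_stream(firehose_client, stream_name, role_arn, bucket_name,
                             poll_seconds=10, max_polls=30):
    """Delete any stream called stream_name, then create a Direct Put stream.

    Returns the ARN of the new delivery stream.
    """
    if stream_exists(firehose_client, stream_name):
        firehose_client.delete_delivery_stream(DeliveryStreamName=stream_name)
        # Deletion is asynchronous, so wait until the name is free again
        for _ in range(max_polls):
            if not stream_exists(firehose_client, stream_name):
                break
            time.sleep(poll_seconds)
        else:
            raise TimeoutError(f"Stream {stream_name} was not deleted in time")

    response = firehose_client.create_delivery_stream(
        DeliveryStreamName=stream_name,
        DeliveryStreamType="DirectPut",
        ExtendedS3DestinationConfiguration={
            "RoleARN": role_arn,
            "BucketARN": f"arn:aws:s3:::{bucket_name}",
            # flush to S3 after 5 MB or 60 s, whichever comes first
            "BufferingHints": {"SizeInMBs": 5, "IntervalInSeconds": 60},
        },
    )
    return response["DeliveryStreamARN"]
```

If the IAM role was created only a few seconds earlier, the creation call may fail until the role has propagated, in which case a short wait and a retry is usually enough.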
Test the Firehose stream
After the stream has been created, some sample records will be produced and sent to the stream using the PutRecord operation (put_record in Boto3).
The monitoring dashboard in the AWS console shows metrics about the streaming data ingestion. It confirms that records are being ingested with a 5 MB buffer size and delivered to S3 as expected.
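A test producer could be sketched as follows. The record fields (sensor_id, timestamp, temperature_celsius) and the helper names are made up for illustration and only stand in for the climate sensor's JSON response; firehose_client is assumed to be a Boto3 Firehose client.

```python
import json
import random
from datetime import datetime, timezone


def make_sample_record(sensor_id="sensor-1"):
    """Build one made-up temperature measurement (field names are illustrative)."""
    return {
        "sensor_id": sensor_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "temperature_celsius": round(random.uniform(15.0, 30.0), 2),
    }


def send_records(firehose_client, stream_name, records):
    """Send each record with put_record and return the list of record IDs."""
    record_ids = []
    for record in records:
        # Firehose concatenates records as-is, so a trailing newline keeps
        # one JSON document per line in the delivered S3 objects
        payload = (json.dumps(record) + "\n").encode("utf-8")
        response = firehose_client.put_record(
            DeliveryStreamName=stream_name,
            Record={"Data": payload},
        )
        record_ids.append(response["RecordId"])
    return record_ids
```

For example, send_records(client, "climate-sensor-stream", [make_sample_record() for _ in range(10)]) would push ten measurements, which should appear in the bucket once the buffer size or interval is reached.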
Delete the Firehose stream
If the stream was created for testing purposes, it is recommended to delete it to avoid incurring unexpected charges. This can be achieved with the following code.
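A minimal sketch of the deletion, assuming firehose_client is a Boto3 Firehose client (the wrapper function itself is hypothetical):

```python
def delete_delivery_stream(firehose_client, stream_name):
    """Request deletion of the stream; returns False if it did not exist."""
    try:
        firehose_client.delete_delivery_stream(DeliveryStreamName=stream_name)
    except firehose_client.exceptions.ResourceNotFoundException:
        print(f"Stream {stream_name} not found, nothing to delete")
        return False
    print(f"Deletion of stream {stream_name} requested")
    return True
```

Deleting the stream does not remove the S3 bucket or the objects already delivered to it, so these should be cleaned up separately if they are no longer needed.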
This tutorial has shown how to create a fully working AWS Kinesis Firehose delivery stream that writes streaming data generated by a producer into an S3 bucket in near real-time, using Boto3. First, an S3 destination bucket was created and the relevant IAM permissions were established. Once the Firehose stream was created, it was tested by generating data and ingesting it into the bucket, and the AWS Management Console was used to verify that everything was working correctly.
The code proposed can be modified to enhance the ingestion, for instance by replacing the Firehose near real-time stream with a real-time Kinesis Data Stream or by introducing a post-ingestion processing step with Kinesis Data Analytics.