Retail Transaction Data Real Time Using Kafka & AWS

Tri Juhari
7 min read · Sep 16, 2023

In this post, I will share the steps taken and the key learnings from building the pipeline for this project. The project exposed me to working with real-time data using Kafka and AWS services. It is suitable for beginners: if you are new to AWS services, an easy way to start is to follow the step-by-step instructions in this article.

Objective

Develop an ETL (Extract, Transform, Load) pipeline using Python and AWS services to explore the functionality of real-time data in Kafka and how to implement it using AWS services.

This project uses a retail transaction dataset. Amazon Elastic Compute Cloud (EC2) provides the virtual environment, which includes an operating system, applications, and computing resources such as CPU, RAM, and storage. Amazon S3 is the storage location for the data, AWS Glue serves as the data catalog, and Amazon Athena is used for data exploration and interactive queries.

Key Learnings

What you can learn from this project:

  • How to implement Kafka as a platform for real-time data streaming
  • How to extract data and store it in S3
  • How to use AWS Glue as a data catalog
  • How to use Athena for data exploration and interactive queries

Architecture Diagram

Tools used: Python, AWS Services (EC2, S3, Glue, Athena)

This is a brief overview of what each service does:

Amazon EC2: a cloud computing service that provides virtual servers to run applications.

Amazon S3: a highly scalable, durable, and available object storage service. Each file is called an object, and objects are stored in buckets.

Crawler: A component of AWS Glue that automatically scans and analyzes data sources to infer their schema and create metadata tables.

Glue Data Catalog: Fully managed metadata repository provided by AWS Glue. It acts as a central repository for storing and organizing metadata about various data sources, including tables, schemas, and partitions. You can use the Glue Data Catalog without the Crawler: if you already have the metadata, or prefer to define and manage it manually, you can create and populate tables in the Glue Data Catalog directly.

Athena: Interactive query service to analyze data stored in various sources using standard SQL queries. You can query data from the Glue Data Catalog, S3 and other supported data sources.

Kafka Installation on EC2

The first step is to have an AWS account. If you don’t have one, you can sign up for one. Once you have an account, log in and search for the EC2 service. This is the virtual machine (VM) that will be used to run Kafka.

The next step is to create an instance by clicking Launch Instance. Then, enter the instance name, which can be the name of the project you are working on. Next, select the operating system that will be used as the base to run Kafka. Don’t forget to create the key pair as well, which is used to connect from your local computer to the EC2 instance over SSH. Once the instance has been created, it will appear on the dashboard page and its status will change to running.

Connecting Amazon EC2 and Kafka

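Next, we need to connect our local computer to the instance. The step-by-step guide can be found at this link. After installing Kafka, it is important to configure Kafka to advertise the public IP of the VM (in a typical setup this is the advertised.listeners setting in config/server.properties), so that clients outside the instance can reach the broker. Next, start Zookeeper: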

bin/zookeeper-server-start.sh config/zookeeper.properties

Then, start the Kafka Server.

bin/kafka-server-start.sh config/server.properties

The next step is to create a topic in Kafka. A topic is a logical grouping of messages. Messages are published to a topic and then consumed by one or more applications. Kafka topics can be used to decouple applications and to provide a way to store and process data in a distributed manner.

bin/kafka-topics.sh --create --topic Transaction --bootstrap-server 13.236.191.210:9092 --replication-factor 1 --partitions 1

Because this project is only for learning purposes, both the replication factor and the number of partitions are set to 1. The replication factor is the number of copies of each partition that Kafka keeps, with each copy stored on a different broker. With a replication factor of 1 there is only a single copy, so if the broker holding it fails, the data becomes unavailable. For anything beyond learning, a replication factor of 2 or more is recommended (3 is a common production choice), so that the data can still be read from another broker when one fails.
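
To make the effect of the replication factor concrete, here is a small sketch in plain Python. It is a simplified model and not Kafka's actual replica assignment algorithm: the broker names and the round-robin placement are assumptions made for illustration. It only shows why a single copy is lost with its broker while a second copy keeps the partition readable.

```python
def assign_replicas(num_partitions, replication_factor, brokers):
    """Place each partition's replicas on distinct brokers, round-robin."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    assignment = {}
    for partition in range(num_partitions):
        assignment[partition] = [
            brokers[(partition + i) % len(brokers)]
            for i in range(replication_factor)
        ]
    return assignment


def available_partitions(assignment, failed_broker):
    """Return the partitions that still have at least one live replica."""
    return [
        partition
        for partition, replicas in assignment.items()
        if any(broker != failed_broker for broker in replicas)
    ]


brokers = ["broker-0", "broker-1", "broker-2"]  # hypothetical broker names

single = assign_replicas(num_partitions=1, replication_factor=1, brokers=brokers)
replicated = assign_replicas(num_partitions=1, replication_factor=2, brokers=brokers)

print(available_partitions(single, failed_broker="broker-0"))      # []
print(available_partitions(replicated, failed_broker="broker-0"))  # [0]
```

With one replica, the only copy lives on the failed broker, so no partition remains available. With two replicas, the second copy on another broker keeps partition 0 readable.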

Kafka Producer

The topic has been created. Next, start the Kafka producer. A Kafka producer works by connecting to a Kafka cluster. Once connected to a Kafka cluster, a Kafka producer can send messages to a Kafka topic. Messages sent by a Kafka producer can contain any data, depending on the needs of the application.

bin/kafka-console-producer.sh --topic Transaction --bootstrap-server <ip address>:9092

Kafka Consumer

With the Kafka producer running, the next step is to run the Kafka consumer using the command below. A Kafka consumer works by subscribing to a Kafka topic. After subscribing to a topic, a Kafka consumer will begin receiving messages from that topic. The messages received by a Kafka consumer can be processed in various ways, depending on the needs of the application.

bin/kafka-console-consumer.sh --topic Transaction --bootstrap-server <ip address>:9092
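
The relationship between a topic, a producer, and a consumer can be sketched with a toy in-memory model. This is not Kafka code: the `Topic` and `Consumer` classes and the sample records are hypothetical. It only illustrates the idea that a topic is an append-only log and that each consumer tracks its own read position (offset). The two consumers here stand in for two separate consumer groups.

```python
class Topic:
    """A toy, single-partition topic: an append-only list of messages."""

    def __init__(self, name):
        self.name = name
        self.log = []

    def append(self, message):
        self.log.append(message)
        return len(self.log) - 1  # offset of the new message


class Consumer:
    """Reads from a topic and remembers its own position (offset)."""

    def __init__(self, topic):
        self.topic = topic
        self.offset = 0

    def poll(self):
        messages = self.topic.log[self.offset:]
        self.offset = len(self.topic.log)
        return messages


topic = Topic("Transaction")
topic.append({"item": "apple", "qty": 2})  # hypothetical records
topic.append({"item": "bread", "qty": 1})

billing = Consumer(topic)
analytics = Consumer(topic)

first_batch = billing.poll()      # both messages
topic.append({"item": "milk", "qty": 3})
second_batch = billing.poll()     # only the new message
full_batch = analytics.poll()     # all three, read independently

print(len(first_batch), len(second_batch), len(full_batch))
```

Because the producer only appends and each consumer only advances its own offset, neither side needs to know about the other, which is the decoupling described above.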

An illustration of how Kafka works

Retail Transaction Data Realtime

Kafka has been successfully launched. Next, we will attempt to implement the use of Kafka using retail transaction data. Here’s a preview of the data that will be used.

Amazon S3 Configuration

Data storage is needed to hold all of the data that is processed through the Kafka platform. S3 meets several needs, including:

  • Secure and reliable storage of large amounts of data.
  • Fast and easy access to data from anywhere.
  • Easy data management and organization.

The first step is to create an S3 bucket. An S3 bucket is a container for storing objects. To create an S3 bucket, open the S3 console and click the Create bucket button. After you create an S3 bucket, a dashboard will appear that lists all of the buckets that you have created. This dashboard is a convenient way to view and manage your buckets.
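
The bucket can also be created from Python instead of the console. The sketch below assumes the boto3 library and configured AWS credentials, neither of which the steps above require. The helper takes the S3 client as an argument, and the region shown in the usage comment is a placeholder, not the one used in this project.

```python
def create_bucket(s3_client, bucket_name, region):
    """Create a bucket, handling the us-east-1 special case."""
    kwargs = {"Bucket": bucket_name}
    if region != "us-east-1":
        # Outside us-east-1 the region must be passed as a location constraint
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return s3_client.create_bucket(**kwargs)


# Usage (requires boto3 and AWS credentials; the region is an assumption):
#   import boto3
#   region = "eu-west-1"
#   create_bucket(boto3.client("s3", region_name=region),
#                 "retail-transaction-kafka", region)
```

Bucket names are globally unique, so the name used in this article may already be taken and you may need to pick your own.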

Initiating Real-Time Data Production

Here is the code that can be used to initiate and execute the data production process through Kafka. In this code, we use the Kafka Python library to connect to a Kafka broker, send messages, and produce data to a specified topic. The data production process through Kafka is a crucial component of streaming data architecture, enabling applications to send and receive data in real-time through the reliable Kafka infrastructure.

import time
from json import dumps

import pandas as pd
from kafka import KafkaProducer

# Connect to the broker and serialize each message as UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers=[':9092'],  # change ip here
    value_serializer=lambda x: dumps(x).encode('utf-8'))

df = pd.read_csv("indexProcessed.csv")

start_time = time.time()  # Get the current time in seconds

while True:
    # Sample one random row for each message so the stream is not a single repeated record
    dict_stock = df.sample(1).to_dict(orient='records')[0]
    producer.send('Transaction', value=dict_stock)
    print('a')

    elapsed_time = time.time() - start_time
    if elapsed_time >= 4:
        break  # Break out of the loop after 4 seconds

# Make sure all buffered messages are delivered before exiting
producer.flush()

Initiating Real-Time Data Consuming

This code represents the implementation for consuming data through Kafka and subsequently storing it in an S3 data repository. Within this code, the Kafka Python library is employed to establish a connection with Kafka brokers, consume messages, and then seamlessly transfer and archive the acquired data into an S3 storage system.

 
import json

from kafka import KafkaConsumer
from s3fs import S3FileSystem

# Subscribe to the topic and decode each message from UTF-8 encoded JSON
consumer = KafkaConsumer(
    'Transaction',
    bootstrap_servers=[':9092'],  # add your IP here
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

s3 = S3FileSystem()

# Write each consumed message to its own JSON object in the bucket
for index, values in enumerate(consumer):
    with s3.open("s3://retail-transaction-kafka/stock_market_.{}.json".format(index), 'w') as file:
        json.dump(values.value, file)

After both scripts run successfully, the consumed messages are saved as JSON files and appear in the S3 bucket.
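
The producer and consumer only work together if the serializer and deserializer are exact inverses. The sketch below checks that round trip without a broker, using only the standard library. The sample record is hypothetical, and `object_key` simply mirrors the file name pattern used in the consumer.

```python
import json


def serialize(record):
    """Same logic as the producer's value_serializer."""
    return json.dumps(record).encode("utf-8")


def deserialize(payload):
    """Same logic as the consumer's value_deserializer."""
    return json.loads(payload.decode("utf-8"))


def object_key(index):
    """File name pattern the consumer uses for each message."""
    return "stock_market_.{}.json".format(index)


record = {"invoice": "A001", "quantity": 3, "price": 2.5}  # hypothetical record
payload = serialize(record)

print(type(payload).__name__)          # bytes
print(deserialize(payload) == record)  # True
print(object_key(0))                   # stock_market_.0.json
```

Kafka itself only transports bytes, which is why both sides have to agree on the encoding.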

AWS Glue Configuration

After the data has been stored in the S3 bucket, the next step involves leveraging AWS Glue for data processing. AWS Glue offers a robust set of tools, including crawlers, to automatically discover and catalog the data stored in your S3 bucket. These crawlers play a pivotal role in identifying the data’s structure and schema, making it easier to transform and analyze the data effectively.
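
A crawler can also be created and run from Python rather than the console. This is a minimal sketch that assumes boto3 and an IAM role that Glue is allowed to use. The crawler name, role ARN, and database name in the usage comment are placeholders, not values from this project.

```python
import time


def run_crawler(glue_client, name, role_arn, database, s3_path, poll_seconds=10):
    """Create a crawler over an S3 path, start it, and wait until it finishes."""
    glue_client.create_crawler(
        Name=name,
        Role=role_arn,
        DatabaseName=database,
        Targets={"S3Targets": [{"Path": s3_path}]},
    )
    glue_client.start_crawler(Name=name)
    # The crawler returns to the READY state once the run has finished
    while glue_client.get_crawler(Name=name)["Crawler"]["State"] != "READY":
        time.sleep(poll_seconds)


# Usage (requires boto3 and AWS credentials; all names are placeholders):
#   import boto3
#   run_crawler(boto3.client("glue"), "retail-crawler",
#               "arn:aws:iam::<account-id>:role/<glue-role>",
#               "retail_db", "s3://retail-transaction-kafka/")
```

When the crawler finishes, the inferred table should be visible in the Glue Data Catalog under the chosen database.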

Interactive Query with Athena

After creating and running the crawler, the next step is to use Amazon Athena. Athena allows you to run SQL queries on the data that has been cataloged in the AWS Glue Data Catalog. With Athena, you can analyze data stored in S3 without managing a separate database infrastructure. This simplifies data exploration and lets you gain insights from the data that was cataloged by the crawler and stored in S3.
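
Queries can be run from the Athena console, or from Python as in the sketch below. It assumes boto3 and an S3 location where Athena may write its results. The database, table, and output location in the usage comment are placeholders. The helper returns only the first page of results, and for a SELECT the first row holds the column headers.

```python
import time


def run_query(athena_client, sql, database, output_location, poll_seconds=1):
    """Run a SQL query in Athena and return the first page of rows as lists."""
    started = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = started["QueryExecutionId"]

    # Poll until the query reaches a terminal state
    while True:
        status = athena_client.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)

    if state != "SUCCEEDED":
        raise RuntimeError("query finished in state " + state)

    result = athena_client.get_query_results(QueryExecutionId=query_id)
    return [
        [cell.get("VarCharValue") for cell in row["Data"]]
        for row in result["ResultSet"]["Rows"]
    ]


# Usage (requires boto3 and AWS credentials; all names are placeholders):
#   import boto3
#   rows = run_query(boto3.client("athena"),
#                    "SELECT * FROM <table> LIMIT 10",
#                    "retail_db", "s3://<results-bucket>/athena/")
```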
