Real-time Sales Aggregation and Visualization with Poetry, Python, WebSocket and Streamlit
In this article, we will walk through a Python project that demonstrates real-time sales data aggregation and visualization using WebSocket and Streamlit. We will explain the logic behind the code and share feedback on best practices along the way.
We will be using Poetry as our dependency management tool. Poetry helps us manage project dependencies and makes it easier to maintain reproducible builds. It also simplifies the process of managing virtual environments.
Project setup
To set up the project, create a new directory and initialize it with Poetry:
$ mkdir real-time-sales && cd real-time-sales
$ poetry init
This creates a pyproject.toml file, which Poetry uses to manage the project's dependencies. We can then add the libraries used throughout this article:
$ poetry add websockets streamlit pandas plotly
Our project consists of two main parts: real-time sales data generation and WebSocket server implementation. We will use SQLite as our database for storing sales data and hourly aggregations.
First, let’s define the necessary imports:
import asyncio
import websockets
import sqlite3
import random
from datetime import datetime
import json
from collections import deque
SQLite Database Setup
The setup_database function creates an SQLite database and initializes two tables: sales and hourly_aggregation. The sales table stores individual sales records, while the hourly_aggregation table stores the aggregated sales data per hour.
def setup_database():
    conn = sqlite3.connect('sales.db')
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS sales
                 (timestamp DATETIME, ticket_id INTEGER, num_tickets INTEGER, total_amount REAL)''')
    c.execute('''CREATE TABLE IF NOT EXISTS hourly_aggregation
                 (hour_key TEXT, total_tickets INTEGER, total_amount REAL)''')
    conn.commit()
    return conn, c
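As a quick sanity check, we can run the same schema against an in-memory database (':memory:' here is just for the check; the article's version writes to sales.db) and confirm that both tables were created:

```python
import sqlite3

def setup_database(db_path=':memory:'):
    # Same schema as in the article; db_path defaults to an in-memory
    # database so this check leaves no file behind.
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS sales
                 (timestamp DATETIME, ticket_id INTEGER, num_tickets INTEGER, total_amount REAL)''')
    c.execute('''CREATE TABLE IF NOT EXISTS hourly_aggregation
                 (hour_key TEXT, total_tickets INTEGER, total_amount REAL)''')
    conn.commit()
    return conn, c

conn, c = setup_database()
c.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
print([row[0] for row in c.fetchall()])  # ['hourly_aggregation', 'sales']
conn.close()
```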
Generating Random Sales Data
The generate_sales_data function generates a random sales record with the current timestamp and random ticket_id, num_tickets, and total_amount values.
def generate_sales_data():
    timestamp = datetime.now()
    ticket_id = random.randint(1, 1000)
    num_tickets = random.randint(1, 10)
    total_amount = num_tickets * random.uniform(10, 50)
    return {'timestamp': timestamp, 'ticket_id': ticket_id, 'num_tickets': num_tickets, 'total_amount': total_amount}
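A quick check of the generator's output shape (the function is re-declared here so the snippet is self-contained): every record carries the four fields the sales table expects, and total_amount is bounded by num_tickets times the unit-price range.

```python
import random
from datetime import datetime

def generate_sales_data():
    timestamp = datetime.now()
    ticket_id = random.randint(1, 1000)
    num_tickets = random.randint(1, 10)
    total_amount = num_tickets * random.uniform(10, 50)
    return {'timestamp': timestamp, 'ticket_id': ticket_id,
            'num_tickets': num_tickets, 'total_amount': total_amount}

sale = generate_sales_data()
# The four columns of the sales table, with total_amount in
# [10 * num_tickets, 50 * num_tickets].
assert set(sale) == {'timestamp', 'ticket_id', 'num_tickets', 'total_amount'}
assert 1 <= sale['num_tickets'] <= 10
```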
Inserting Sales Data into the Database
The insert_sales_data function inserts the generated sales data into the sales table.
def insert_sales_data(conn, c, sales_data):
    c.execute("INSERT INTO sales (timestamp, ticket_id, num_tickets, total_amount) VALUES (?, ?, ?, ?)",
              (sales_data['timestamp'], sales_data['ticket_id'], sales_data['num_tickets'], sales_data['total_amount']))
    conn.commit()
Updating Hourly Aggregation
The update_hourly_aggregation function derives the hour_key from the sale's timestamp and updates the hourly_aggregation table accordingly.
def update_hourly_aggregation(conn, c, sales_data):
    hour_key = sales_data['timestamp'].replace(minute=0, second=0, microsecond=0)
    c.execute("SELECT * FROM hourly_aggregation WHERE hour_key=?", (hour_key,))
    row = c.fetchone()
    if row:
        c.execute("UPDATE hourly_aggregation SET total_tickets=total_tickets+?, total_amount=total_amount+? WHERE hour_key=?",
                  (sales_data['num_tickets'], sales_data['total_amount'], hour_key))
    else:
        c.execute("INSERT INTO hourly_aggregation (hour_key, total_tickets, total_amount) VALUES (?, ?, ?)",
                  (hour_key, sales_data['num_tickets'], sales_data['total_amount']))
    conn.commit()
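The bucketing step is worth highlighting: replace(minute=0, second=0, microsecond=0) truncates a timestamp to the top of its hour, so every sale within the same hour maps to the same hour_key. With an illustrative timestamp:

```python
from datetime import datetime

ts = datetime(2024, 5, 17, 14, 37, 22)
# Zeroing out minutes, seconds, and microseconds leaves only the hour.
hour_key = ts.replace(minute=0, second=0, microsecond=0)
print(hour_key)  # 2024-05-17 14:00:00
```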
WebSocket Server
The WebSocket server is responsible for broadcasting sales data to connected clients. It uses asyncio to handle multiple clients concurrently.
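The snippets below rely on two pieces of module-level shared state, which must be declared before the server code runs (the names come from the code itself):

```python
from collections import deque

# Clients currently connected to the WebSocket server.
connected_clients = set()
# Sales records waiting to be broadcast; appended by main(),
# drained by broadcast_sales_data().
sales_data_queue = deque()
```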
The broadcast_sales_data function sends sales data to all connected clients whenever there is data available in the sales_data_queue.
async def broadcast_sales_data():
    while True:
        if sales_data_queue and connected_clients:
            data = json.dumps(sales_data_queue.popleft(), default=str)
            for websocket in connected_clients:
                await websocket.send(data)
        await asyncio.sleep(0.1)
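One detail to note: json.dumps(..., default=str) is what makes each record serializable, since the timestamp field is a datetime object that JSON cannot encode natively. Clients therefore receive the timestamp as a plain string. A small round-trip sketch with example values:

```python
import json
from datetime import datetime

sale = {'timestamp': datetime(2024, 5, 17, 14, 30), 'ticket_id': 42,
        'num_tickets': 3, 'total_amount': 97.5}

# default=str is called for any value json can't serialize natively,
# so the datetime is encoded as its str() form.
message = json.dumps(sale, default=str)
received = json.loads(message)
print(received['timestamp'])  # '2024-05-17 14:30:00'
```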
The websocket_handler function manages WebSocket client connections, registering each client on connect and removing it on disconnect.
async def websocket_handler(websocket, path):
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            pass
    finally:
        connected_clients.remove(websocket)
Main Function for WebSocket Server
The main function is the core of the WebSocket server. It sets up the database, starts the WebSocket server, and periodically generates random sales data, which is then inserted into the database and broadcast to connected clients.
async def main():
    conn, c = setup_database()
    try:
        start_server = websockets.serve(websocket_handler, "localhost", 6789)
        await start_server
        asyncio.create_task(broadcast_sales_data())
        while True:
            sales_data = generate_sales_data()
            insert_sales_data(conn, c, sales_data)
            update_hourly_aggregation(conn, c, sales_data)
            sales_data_queue.append(sales_data)
            await asyncio.sleep(5)
    finally:
        conn.close()

if __name__ == '__main__':
    asyncio.run(main())
Streamlit App for Data Visualization
Now, let's move on to the Streamlit app for visualizing the hourly sales aggregation data. The app lives in its own script and relies on a few imports and constants that the snippets below use (the refresh interval value here is just an example):
import time
import sqlite3
import pandas as pd
import plotly.express as px
import streamlit as st

DB_PATH = 'sales.db'
REFRESH_INTERVAL_SECONDS = 5
Fetching Hourly Aggregation Data
The get_hourly_aggregation_data function retrieves the hourly aggregation data from the SQLite database as a pandas DataFrame.
def get_hourly_aggregation_data():
    with sqlite3.connect(DB_PATH) as conn:
        query = "SELECT * FROM hourly_aggregation"
        df = pd.read_sql_query(query, conn)
    return df
Visualizing Data with Streamlit and Plotly Express
The visualize_data function creates a bar chart with Plotly Express to visualize the total sales amount per hour.
def visualize_data(data):
    if not data.empty:
        fig = px.bar(data, x='hour_key', y='total_amount', title="Total Sales Amount per Hour")
        st.plotly_chart(fig, use_container_width=True)
    else:
        st.write("No data available yet.")
Main Function for Streamlit App
Finally, the main function for the Streamlit app fetches the data, visualizes it, waits for the refresh interval, and reruns the script. Note that st.experimental_rerun has been renamed to st.rerun in recent Streamlit releases.
def main():
    data = get_hourly_aggregation_data()
    visualize_data(data)
    time.sleep(REFRESH_INTERVAL_SECONDS)
    st.experimental_rerun()

if __name__ == "__main__":
    main()
Results
And tadaaa! We now have a dashboard showing sales aggregated per hour. It refreshes continuously, so we can watch store insights evolve in real time 🔥.
Conclusion
To recap, we dissected the code into smaller components and explained each part in detail. The WebSocket server generates random sales data, inserts it into an SQLite database, updates the hourly aggregations, and broadcasts the data to connected clients. The Streamlit app fetches the hourly aggregation data from the same database and visualizes it with Plotly Express.
I hope you find this little side project fun, see you soon 🤟