Updated on 2025-04-16

Building a simple data processing pipeline in Python

Data processing pipelines are a common concept in data analysis and engineering. A pipeline connects data collection, processing, storage, and other steps so that data flows through them automatically. In this article we build a simple data processing pipeline in Python step by step, with a flowchart to show how the data flows.

What is a data processing pipeline?

A data processing pipeline is a sequence of data processing steps that runs from data acquisition to the final output, where each step consumes the output of the previous one. A well-designed pipeline makes data processing more efficient, repeatable, and automated. For example, you can collect data from an API, clean and process it, and then store the result in a database for later analysis.

Basic steps of a data processing pipeline

Let's build a simple Python data processing pipeline that contains the following steps:

  1. Data collection: Get raw data from the API.
  2. Data cleaning: Filter and process the original data to remove invalid data.
  3. Data conversion: Convert data into a structure suitable for storage and analysis.
  4. Data storage: Save the cleaned and converted data to the database.
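The four steps above can be read as functions chained together, each one receiving the output of the one before it. The following is a minimal sketch of that idea using in-memory stand-ins instead of a real API or database. The names `run_pipeline`, `collect`, `clean`, `convert`, and `store` are hypothetical and are not part of the code built later in this article.

```python
from functools import reduce

def run_pipeline(steps, initial=None):
    """Pass a value through each step in order and return the final result."""
    return reduce(lambda value, step: step(value), steps, initial)

# Hypothetical stand-in steps that work on plain Python lists.
def collect(_):
    # Pretend these records came from an API.
    return [{"id": 1, "title": "a"}, {"id": 2, "title": None}]

def clean(records):
    # Drop records that contain a missing value.
    return [r for r in records if all(v is not None for v in r.values())]

def convert(records):
    # Rename "id" to "post_id".
    return [{"post_id": r["id"], "title": r["title"]} for r in records]

stored = []  # stands in for a database table

def store(records):
    stored.extend(records)
    return len(records)

count = run_pipeline([collect, clean, convert, store])
print(count, stored)
```

Keeping every step as a function with one input and one output makes it easy to test a step on its own or to swap it for another implementation.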

Flowchart

The following figure shows the workflow of the data processing pipeline we want to build:

+-----------------+      +-----------------------+      +-------------------+      +--------------------+
| Data collection | ---> | Data cleaning         | ---> | Data conversion   | ---> | Data storage       |
| (API request)   |      | (Remove invalid data) |      | (Structured data) |      | (Save to database) |
+-----------------+      +-----------------------+      +-------------------+      +--------------------+

Code examples for building a data processing pipeline

We will use some common libraries in Python to implement the above pipeline. Here are the libraries we want to use:

  • requests: Used to get data from the API.
  • pandas: Used for data cleaning and conversion.
  • sqlite3: Used to store data into a SQLite database.

Step 1: Data collection

First, we will get the data from an open API. Here we use a simple example to get some sample data from JSONPlaceholder.

import requests
import pandas as pd
import sqlite3

# Data collection - get data from the API
def fetch_data():
    # NOTE: the host part of the URL is missing in the source; prepend the
    # JSONPlaceholder base URL before running.
    url = "/posts"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        raise Exception(f"Failed to fetch data: {response.status_code}")

# Call the data collection function
data = fetch_data()
print(f"Number of records fetched: {len(data)}")
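Network calls can fail temporarily, so a collection step is often wrapped in a retry loop. The sketch below shows one possible approach using only the standard library. `with_retries` and `flaky_fetch` are hypothetical helpers introduced for illustration; the fake fetch function lets the example run without network access.

```python
import time

def with_retries(fetch, retries=3, delay=1.0):
    """Call fetch() until it succeeds or the attempts run out."""
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            return fetch()
        except Exception as exc:
            last_error = exc
            if attempt < retries:
                # Wait a little longer after each failed attempt.
                time.sleep(delay * attempt)
    raise RuntimeError(f"Giving up after {retries} attempts") from last_error

# A fake fetch function that fails twice and then succeeds.
calls = {"n": 0}

def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return [{"id": 1}]

result = with_retries(flaky_fetch, retries=3, delay=0)
print(result, calls["n"])
```

In the pipeline this could be used as `with_retries(fetch_data)`. It is also worth passing a `timeout` argument to `requests.get` so that a stalled connection does not block the pipeline indefinitely.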

Step 2: Data cleaning

Next, we use Pandas to convert the raw data into a DataFrame and apply a simple cleaning step: removing rows that contain null values.

# Data cleaning - clean the data with Pandas
def clean_data(data):
    df = pd.DataFrame(data)
    # Delete rows containing empty values
    df.dropna(inplace=True)
    return df

# Call the data cleaning function
df_cleaned = clean_data(data)
print(f"Cleaned data: \n{df_cleaned.head()}")
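Dropping every row with any null value is the simplest rule, but real data often needs something more selective. As a sketch under assumed sample data (the three records below are invented for illustration and only mimic the shape of the posts), the following restricts the null check to required columns and also removes duplicate posts.

```python
import pandas as pd

# Invented sample records in the same shape as the posts data.
raw = [
    {"userId": 1, "id": 1, "title": "first", "body": "text"},
    {"userId": 1, "id": 2, "title": None, "body": "text"},      # missing title
    {"userId": 1, "id": 1, "title": "first", "body": "text"},   # same id as the first record
]

df = pd.DataFrame(raw)
print(df.isna().sum())  # number of missing values per column

cleaned = (
    df.dropna(subset=["title", "body"])  # drop rows missing a required field
      .drop_duplicates(subset=["id"])    # keep the first row for each post id
      .reset_index(drop=True)
)
print(cleaned)
```

Checking `isna().sum()` before cleaning shows how much data a rule would discard, which helps when deciding whether dropping rows is acceptable.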

Step 3: Data conversion

In this step, we structure the data so that it can be stored in the database easily. Here we keep only the useful columns and rename them; converting columns to more appropriate data types also belongs in this step.

# Data conversion - process and structure the data
def transform_data(df):
    # Keep only the columns we need
    df_transformed = df[["userId", "id", "title", "body"]]
    # Rename the columns for readability. rename() returns a new DataFrame,
    # which avoids the SettingWithCopyWarning of an in-place rename on a slice.
    df_transformed = df_transformed.rename(columns={"userId": "user_id", "id": "post_id"})
    return df_transformed

# Call the data conversion function
df_transformed = transform_data(df_cleaned)
print(f"Converted data: \n{df_transformed.head()}")
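The conversion step is also the natural place to fix data types. The sketch below assumes, purely for illustration, that the ID columns arrived as strings and that titles carry stray whitespace; the sample values are invented.

```python
import pandas as pd

# Invented sample data: IDs stored as strings, titles with extra spaces.
df = pd.DataFrame({
    "user_id": ["1", "2"],
    "post_id": ["10", "11"],
    "title": ["  Hello ", "World"],
})

# astype() returns a new DataFrame with the listed columns converted.
converted = df.astype({"user_id": "int64", "post_id": "int64"})
# Remove leading and trailing whitespace from the titles.
converted["title"] = converted["title"].str.strip()

print(converted.dtypes)
print(converted)
```

Converting types before storage means the database table gets integer columns instead of text, which keeps later queries and joins simpler.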

Step 4: Data storage

Finally, we store the data in a SQLite database. SQLite is a lightweight, file-based relational database that suits small projects and tests.

# Data storage - save data to a SQLite database
def store_data(df):
    # Create a connection to the SQLite database
    conn = sqlite3.connect("data_pipeline.db")
    # Store data in a table named 'posts'
    df.to_sql("posts", conn, if_exists="replace", index=False)
    # Close the database connection
    conn.close()
    print("The data has been successfully stored in the database")

# Call the data storage function
store_data(df_transformed)
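After storing, it is useful to read the data back and confirm that the table contains what you expect. The following sketch uses an in-memory SQLite database with a few invented rows so that it runs on its own; the table layout is an assumption that mirrors the renamed columns above.

```python
import sqlite3

# An in-memory database stands in for data_pipeline.db in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE posts (user_id INTEGER, post_id INTEGER, title TEXT, body TEXT)"
)
conn.executemany(
    "INSERT INTO posts VALUES (?, ?, ?, ?)",
    [(1, 1, "first", "text"), (1, 2, "second", "text"), (2, 3, "third", "text")],
)
conn.commit()

# Total number of stored rows.
row_count = conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
# Number of posts per user.
per_user = conn.execute(
    "SELECT user_id, COUNT(*) FROM posts GROUP BY user_id ORDER BY user_id"
).fetchall()
conn.close()

print(row_count, per_user)
```

To check the real output of the pipeline, connect to "data_pipeline.db" instead of ":memory:" and run the same SELECT statements.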

Complete code example

Here is the complete code that brings all the steps together:

import requests
import pandas as pd
import sqlite3

# Data collection
def fetch_data():
    # NOTE: the host part of the URL is missing in the source; prepend the
    # JSONPlaceholder base URL before running.
    url = "/posts"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        raise Exception(f"Failed to fetch data: {response.status_code}")

# Data cleaning
def clean_data(data):
    df = pd.DataFrame(data)
    df.dropna(inplace=True)
    return df

# Data conversion
def transform_data(df):
    df_transformed = df[["userId", "id", "title", "body"]]
    # rename() returns a new DataFrame instead of modifying the slice in place
    df_transformed = df_transformed.rename(columns={"userId": "user_id", "id": "post_id"})
    return df_transformed

# Data storage
def store_data(df):
    conn = sqlite3.connect("data_pipeline.db")
    df.to_sql("posts", conn, if_exists="replace", index=False)
    conn.close()
    print("The data has been successfully stored in the database")

# Build the data processing pipeline
def data_pipeline():
    data = fetch_data()
    df_cleaned = clean_data(data)
    df_transformed = transform_data(df_cleaned)
    store_data(df_transformed)

# Run the data processing pipeline
data_pipeline()
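When a pipeline runs unattended, it helps to know which stage failed and how long each stage took. One possible way to do this, sketched below with the standard `logging` module, is to wrap each stage in a small helper. `run_stage` is a hypothetical name, and the lambda stages are stand-ins so that the example runs without an API or database.

```python
import logging
import time

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger("pipeline")

def run_stage(name, func, *args):
    """Run one stage, log how long it took, and re-raise any failure."""
    start = time.perf_counter()
    try:
        result = func(*args)
    except Exception:
        # log.exception records the traceback along with the message.
        log.exception("stage %s failed", name)
        raise
    log.info("stage %s finished in %.3fs", name, time.perf_counter() - start)
    return result

# Stand-in stages operating on a plain list.
data = run_stage("collect", lambda: [3, None, 1])
cleaned = run_stage("clean", lambda rows: [r for r in rows if r is not None], data)
print(cleaned)
```

In the pipeline above, the same helper could wrap the real functions, for example `run_stage("collect", fetch_data)` followed by `run_stage("clean", clean_data, data)`.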

Summary

In this article we built a simple data processing pipeline in Python. Data collection, data cleaning, data conversion, and data storage are connected into one complete data flow. With Requests, Pandas, and SQLite, these steps can be automated, which makes data analysis more efficient and easier to repeat.
