Data extraction, transformation, and loading (ETL) is a process in which data is extracted from a source, transformed into a format that is suitable for analysis, and then loaded into a destination. ETL is a critical step in the data pipeline because it ensures that the data is clean, consistent, and ready for analysis.

We will use the following tools to do ETL:

  • PostgreSQL: Postgres is an open-source relational database management system that is widely used for storing and managing data. We will use it to store the data that we extract, transform, and load.
  • dbt: dbt is a command-line tool that enables data analysts and engineers to transform data in their warehouse more effectively. dbt allows you to write SQL queries to transform your data and then run those queries in a reproducible and scalable way.
  • Airflow: Apache Airflow is an open-source platform that allows you to programmatically author, schedule, and monitor workflows. We will use Apache Airflow to orchestrate our ETL process and ensure that it runs smoothly.
  • Basic cloud services: we will use object storage such as AWS S3, Google Cloud Storage, Azure Blob Storage, or DigitalOcean Spaces to store the raw data that we extract, transform, and load.

Step 0: upload data to the cloud

As a machine learning engineer or data scientist, most of the time you will be teaming up with database engineers or data engineers to build data pipelines. Very often, the data warehouse is handed to you by the data engineers. In big firms, raw data is normally streamed in and then stored in a data warehouse. In small firms, raw data is normally stored in a cloud storage service such as AWS S3, Google Cloud Storage, Azure Blob Storage, or DigitalOcean Spaces.

Again, most of the time you do not need to know how to upload big data to the cloud, but it is good to know how to do it. Large data files such as Parquet or CSV are normally stored in an S3 bucket, and in that case I recommend using either awscli or s3cmd to upload the data. Here are two references you can use to learn how to use s3cmd:

Sometimes you might have a small data file such as a CSV or JSON. In this case, you can use psycopg2 to upload the data directly into the Postgres database on the cloud. Here is my project structure:

(venv) ➜  dbt-postgresql tree -L 1
.
├── Holidu  # dbt folder
├── data
│   └── prices-data-2022.csv
├── logs
├── poetry.lock
├── pyproject.toml
├── src
│   └── upload_data.py
└── venv

We will use upload_data.py as a small example to show you how to upload data to the Postgres database on the cloud. For most data files, the SQL script we run via psycopg2 could do the data type inference for you. However, prices-data-2022.csv (UK house price data) does not have column names in the first row, so we need to specify the column names and data types in the SQL script ourselves. Here is the content of upload_data.py:

import os
import timeit
import psycopg2
import pandas as pd
from loguru import logger
from dotenv import load_dotenv


load_dotenv()


db_host = os.getenv("POSTGRESQL_DB_HOST")
db_user = os.getenv("POSTGRESQL_DB_USER")
db_password = os.getenv("POSTGRESQL_DB_PASSWORD")
database_name = "Holidu"  # Database name


def upload_data():
    # the csv file has no header row, so read it without column names
    df = pd.read_csv("../data/prices-data-2022.csv", header=None)
    print(df.head())

    # create connection
    pg_conn = psycopg2.connect(
        dbname=database_name, user=db_user, password=db_password, host=db_host
    )
    cur = pg_conn.cursor()

    sql = """
    CREATE TABLE IF NOT EXISTS public.uk_house_price (
        transaction_id text PRIMARY KEY,
        price text,
        date_of_transfer text,
        postcode text,
        property_type text,
        old_or_new text,
        duration text,
        paon text,
        saon text,
        street text,
        locality text,
        town_or_city text,
        district text,
        county text,
        ppd_category_type text,
        record_status text
    )
    """

    cur.execute(sql)
    pg_conn.commit()


    st_time = timeit.default_timer()
    # upload data
    with open("../data/prices-data-2020.csv", "r") as f:
        # useing copy_expert to upload data
        # there is no header in the csv file
        cur.copy_expert("COPY uk_house_price FROM STDIN WITH CSV", f)

    pg_conn.commit()
    cur.close()
    ed_time = timeit.default_timer()
    
    logger.info(f"Time taken to upload data: {ed_time - st_time:.2f} seconds")

    pg_conn.close()

    logger.info("Data uploaded successfully")


if __name__ == "__main__":
    upload_data()

For a CSV file of around 100 MB, it takes around 91 seconds to upload the data to the Postgres database on the cloud. This is much slower than uploading the file to object storage with s3cmd. One could also use dbt seed to load the data into the database, but that is even slower than using psycopg2. Therefore, I recommend using s3cmd to upload the data to the cloud.
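
For reference, here is roughly what the object-storage upload looks like with s3cmd or the AWS CLI. The bucket name and key prefix below are placeholders, and you need to configure your credentials first:

# configure credentials once with `s3cmd --configure` or `aws configure`
s3cmd put data/prices-data-2022.csv s3://your-bucket/raw/prices-data-2022.csv

# the same upload with the AWS CLI
aws s3 cp data/prices-data-2022.csv s3://your-bucket/raw/prices-data-2022.csv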

ETL with dbt

Assume that you have already created a database called Holidu on the cloud and you have already uploaded the data to the database. After installing dbt, it is a good habit to set up the environment variables in the .env file. Here is the content of the .env file:

POSTGRESQL_DB_HOST=your_postgres_host
POSTGRESQL_DB_USER=your_postgres_user
POSTGRESQL_DB_PASSWORD=your_postgres_password
POSTGRESQL_DB_NAME=Holidu

Please make sure there is no space around the = sign. Then we can run the following commands to source the .env file and initialize the dbt project:

source .env
# please open a new terminal to run the following command
dbt init Holidu
cd Holidu

Now go to ~/.dbt/profiles.yml and add the following content, filling in host, user, and pass with your own values:

Holidu:
  outputs:
    dev:
      dbname: Holidu
      host: ""
      pass: ""
      port: 5432
      schema: public
      threads: 5
      type: postgres
      user: ""
  target: dev
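
Hard-coding credentials in profiles.yml is not ideal. As a sketch, assuming the variables from the .env file are exported into your shell (for example with set -a; source .env; set +a), you can let dbt read them through its built-in env_var() function:

Holidu:
  outputs:
    dev:
      type: postgres
      dbname: Holidu
      host: "{{ env_var('POSTGRESQL_DB_HOST') }}"
      user: "{{ env_var('POSTGRESQL_DB_USER') }}"
      pass: "{{ env_var('POSTGRESQL_DB_PASSWORD') }}"
      port: 5432
      schema: public
      threads: 5
  target: dev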

Then we can run the following commands to check the connection and run the example models that dbt init created:

# at the Holidu folder
dbt debug
# you should see 'All checks passed!'
dbt run

Here is my project structure:

(venv) ➜  Holidu tree -L 1
.
├── README.md
├── analyses
├── dbt_project.yml
├── logs
├── macros
├── models
├── seeds
├── snapshots
├── target
└── tests

Key concepts of dbt Models

  • Sources: Raw data tables/views in your warehouse (see the sketch after this list)
  • Models: Transformation of raw data into business logic
  • Snapshots: Historical records of your data
  • Seeds: Raw data that is version controlled in your dbt project
  • Tests: Assertions that your data meets expectations
  • Documentation: Descriptions of your data and models
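
To make sources and tests concrete, here is a small sketch. The file name models/staging/schema.yml and the source name raw are my own choices; it declares the uk_house_price table we uploaded earlier as a source and attaches two simple tests to its primary key:

# models/staging/schema.yml
version: 2

sources:
  - name: raw             # referenced as source('raw', ...)
    schema: public        # the schema where uk_house_price lives
    tables:
      - name: uk_house_price
        columns:
          - name: transaction_id
            tests:
              - not_null
              - unique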

The core of dbt is the model. Models are SQL select statements that transform your data in some way; they are defined in .sql files in the models/ directory. You might wonder why we need to write SQL in separate files. The answer is that dbt lets you build a data transformation pipeline in a modular way: by writing SQL in separate files, you build up a library of SQL transformations that can be reused across your project.
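
For example, a minimal staging model for the house price data could look like the one below. The file name models/staging/stg_uk_house_price.sql and the casts are my own assumptions; it selects from the source declared above and turns the raw text columns into proper types:

-- models/staging/stg_uk_house_price.sql
select
    transaction_id,
    price::numeric          as price,
    date_of_transfer::date  as date_of_transfer,
    upper(postcode)         as postcode,
    property_type,
    town_or_city,
    district,
    county
from {{ source('raw', 'uk_house_price') }}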

If you have already done some data science projects, you might have encountered the problem of copy-pasting SQL code across different notebooks, or having to rewrite some SQL because you forgot how you did it last time. If your project spans multiple data sources, it is very easy to lose track of how your data is transformed. dbt solves this problem by letting you define your data transformations in a modular way: dbt builds a DAG of dependencies between your models from the ref() calls, and the whole pipeline can then be scheduled and monitored with Airflow.
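
To see the DAG in action, here is a hypothetical downstream model that builds on the staging model above. dbt infers the dependency from the ref() call, so stg_uk_house_price always runs before this model:

-- models/marts/avg_price_by_county.sql
select
    county,
    date_trunc('year', date_of_transfer) as year,
    avg(price)                           as avg_price,
    count(*)                             as n_transactions
from {{ ref('stg_uk_house_price') }}
group by 1, 2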