../complete-elt-using-dbt-airflow-and-postgres

Complete ELT using DBT, Airflow, and Postgres

This series started off with wanting to explore dbt a little, and I figured if I was going to do that I should attempt to make something demo-worthy and so the plan is to have a full blown observable dbt on K8s deployment using Airflow for orchestration.

This article is my own personal journey through getting this set up and working. I have tried my best to make this tutorial-like, however if you try to just execute the code blocks sequentially you will not end up with a complete system. If you are attempting to follow along to create the system with me as you go you will need to clone my repo. Now the part I have been struggling with is how to go through the DBT tutorial as the code in the repo will only be in it’s final state and wont show the checkpoints along the way - so I suggest for the dbt part that you create a folder named jaffle-shop at the root of my cloned repo and follow along there, referring to the dbt documentation where necessary.

Tools

There is a bunch of commands that I will probably use throughout this document, I’ve tried to capture them all here in case you do attempt to follow along.

P.S. I’m on MacOS.

Arkade (K8s-related pacakage manager)

  • kubectl
  • kubens
  • kubectx
  • helm
  • yq

Brew (OS package manager)

  • libpq (for psql)
  • direnv

Pipx (Isolated python package manager)

  • sqlfluff (pipx install sqlfluff && pipx inject sqlfluff sqlfluff-templater-dbt)

Installing Minikube

The official documentation is easy enough to follow, except I started Minikube with the following command:

minikube start --memory 8192 --cpus 2

The reason for this is that by default you only get 2048 MiB of RAM and 2 cpus, and that is not enough for my demo which deploys quite a lot of stuff!.

Enabling ingress

At some point we will need to expose the Airflow UI and some other tools so we will use Minikube’s handy command to enable an ingress controller.

minikube addons enable ingress

Just as an aside, the ingress controller can be accessed via the Minikube IP which you can get via the following command:

minikube ip

If you were to execute something like curl $(minikube ip) you should get a 404, this proves that ingress (nginx) is working - although it has no rules and so it just returns a 404.

Configuring kubectl

We will be working in the dok (dbt on Kubernetes) namespace from here on out, so I like to set up kubectl to interact with that namespace without having to specify the namespace every time I execute a command. There is a bunch of ways to do this, but we have the technology and so let’s do this with kubens.

kubectl create namesapce dok
kubens dok

Installing Airflow

Let’s go for an easy win and set up Airflow first, even though it wont be used until the very end.

I intend to eventually be able to deploy this as a single application so I will be building up a wrapper Helm Chart to deploy everything. This does wrap the official Airflow Helm Chart.

Now we need to actually install Airflow into our local Minikube cluster. For that we will use the following Helm command.

helm upgrade --install dbt-on-k8s . \
    --create-namespace \
    --namespace dok \
    -f values.yaml \
    --set "airflow.enabled=true" 

helm upgrade --install is used here because you can use it whether you are updating an existing chart or installing one for the very first time. It’s just better.

Now that we have deployed Airflow (and before we take a look at it) let’s discuss what we have configured in the values.yaml. Just for the sake of completeness the values.yaml is the place where Helm Chart’s expect things to be specified that will be templated into the Kubernetes Manifests.

executor: This has been set to KubernetesExecutor, this means that Airflow will use Kubernetes Pod’s to execute jobs, as opposed to other methods including just executing the command locally in a separate shell.

ingress: This is so that I can navigate to the Airflow Webserver UI, it created a Kubernetes resource that allows ingress into the cluster from the outside via a hostname.

If you are following along you will need to update the ingress.web.hosts.name[0] section as it references MY Minikube private IP address and chances are yours is not the same. If you are interested in how this is working without a real domain you can read more about nip.io here.

You can quickly update the ip with this handy command:

yq -i e ".airflow.ingress.web.hosts.0.name |= \"airflow-$(minikube ip).nip.io\"" values.yaml 

Logging in and checking out the UI

The most exciting part, now you can log in by navigating to your ingress (mine is at https://airflow-192.168.64.5.nip.io/home). You will need to ignore the fact that the certificate is untrusted and then you should see the following page and you can log in with the default user/pass (admin/admin).

airflow-ui

Setting up a data warehouse (Postgres)

This will be our data warehouse (DWH), and yes, I have gone a little overboard - a single Postgres database would have been fine. I however, have opted to deploy a cluster.

I am using this Postgres Operator and basic guide to deploy and manage my Postgres cluster. I am just using an example deployment and much more rigorous configuration should take place before using this in production!

We can now update our deployment to enable the Postgres Operator with the following command. We are installing the operator and deploying a cluster all in one step!

helm upgrade --install dbt-on-k8s . \
    --create-namespace \
    --namespace dok \
    -f values.yaml \
    --set "airflow.enabled=true" \
    --set "postgres-operator.enabled=true" \
    --set "postgres_dwh.enabled=true"

After a few minutes you should have a postgres cluster set up, including a database named dwh with two schemas, dev and prod. We are just setting this up in advance so that we can simulate deploying to dev and prod when we use dbt. I don’t know how ergonomic or necessary this is - but we are doing it anyway!

Any time we interact with the DWH we will probably use the dev schema.

You will know that your cluster is up and ready to go when the STATUS is running.

kubectl get postgresqls

Opening a connection to our Postgres cluster

Alright still more set up to do before getting on with the dbt tutorial. We need to be able to interact with our beautiful Postgres cluster. We have a few options here. We can just set up a port-forward using kubectl or we can expose the database via an Ingress (capital I as this is the Kubernetes Ingress object). I am going to opt for port-forwarding because I am impatient to run dbt. You can set this up as follows:

If you are familiar with Kubernetes, you might venture off on your own (ignoring the documentation) when attempting to port-forward to the Postgres cluster (like I did). This will set you up for failure if you attempt to port-forward the service as the labels are not configured to point to the master. There is a reason for this that I don’t understand at this point. You can reach the cluster through the service when you are inside the cluster, however when you are outside the cluster you need to port-forward to the actual master node ¯\_(ツ)_/¯.

# get name of master pod of acid-minimal-cluster
export PGMASTER=$(kubectl get pods -o jsonpath={.items..metadata.name} -l application=spilo,cluster-name=acid-minimal-cluster,spilo-role=master)

# set up port forward (note we are using port 6432 locally)
kubectl port-forward $PGMASTER 6432:5432

Now that we have the port-forwarding setup we can go ahead and use psql to connect to the cluster.

export PGPASSWORD=$(kubectl get secret dwh-dev-owner-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do -o 'jsonpath={.data.password}' | base64 -d)
export PGSSLMODE=require

# Note the subtle name difference where '-' is replaced with '_'
psql -d dwh -U dwh_dev_owner_user -h localhost -p 6432

You can try out a few things, although there isn’t anything much there! You should see our demo schema though!

# List our tables (there are none)
\dt+

# List our schemas
\dn+

dbt

Finally we made it! We are going to follow along with the dbt tutorial, I will document everything I am doing here, but if something has changed between my writing and you reading (which it inevitably will) you should get along fine by looking at the original dbt tutorial.

We are not using the cloud part of dbt, we will be using dbt-core and the cli.

Install dbt

You need to install the dbt tool. You can find how to here. I have installed it into it’s own python virtualenv, this makes the most sense especially when you are working in a team and you want to make sure everyone is using the same version. Needless to say you should be deploying via CI and it should also have the same version.

I have opted to put the virtualenv at the root of my repo, this helps my IDE of choice (vscode) auto-detect the correct python environment, my dbt package will be nested which would usually not be the case, it’s only because I also have the Helm Chart next to it.

# Create the virtualenv
python -m venv .venv

# This will install the Postgres adapter and dbt-core
pip install dbt-postgres

Creating a dbt project

Following the tutorial this will set us up a cookie-cutter project that we can start working in. Bear in mind that we are also changing into the project directory and all commands from here on out will assume this!

dbt init jaffle_shop
cd jaffle_shop

Setting up some other ‘stuff’

I have opted to set up a bunch of other stuff in the jaffle_shop directory that I would have in a real repository.

The first thing I have set up is the .envrc which is loaded by direnv whenever I change into the directory. This conveniently sets some environment variables containing things like, Postgres settings, database user/pass (extracted from Kubernetes), the Minikube IP etc. I will assume the environment is set up correctly in other parts of the project.

Another thing I have set up is sqlfluff. I don’t have a lot of experience with this, but it is a linter and auto-formatter for SQL that supports dbt’s jinja templating syntax, because I hate having to make decisions about formatting, and having to worry about being consistent with the rest of my team.

But I digress.

Connecting dbt to our DWH

The settings for dbt are, by default, in the home directory in ~/.dbt/profiles.yml. I however, have set the environment variable DBT_PROFILES_DIR to point to the jaffle_shop directory and I will be using a local profiles.yml. As I mentioned above I have set the .envrc to load the user/pass information for our DWH into the environment, it is from the environment that I then get the user/pass information into the profiles.yml like so:

jaffle_shop:
  outputs:

    dev:
      type: postgres
      threads: 30
      host: localhost
      port: 6432
      user: "{{ env_var('DBT_DWH_DEV_USER') }}"
      pass: "{{ env_var('DBT_DWH_DEV_PASSWORD') }}"
      dbname: dwh
      schema: dev
      ...

If you decide not to go for an environment variable based approach then you can extract the password from the kubernetes secret like this:

kubectl get secret dwh-owner-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do -o 'jsonpath={.data.password}' | base64 -d

Note the first bit of the secret name is the user albeit with the - replaced with _.

Now that you have this configuration you can use dbt’s handy command to check that we have everything correct.

# Check the dev target (this is the default so the `-t dev` is superfluous)
dbt debug -t dev
# Check the prod target
dbt debug -t prod

You will need to address any issues that arise. Make sure your port-forward is still running!

Running dbt

Wooo! now we are getting on with actually running dbt. I will probably diverge from the tutorial and instead use it more as a spiritual guide.The reason I am doing this is an attempt to implement dbt’s best practice recommendations from the get-go.

For a bit more information about how I decided to structure my project directory.

To start with we need to do two things, the first is to run dbt seed which will load some raw csv’s into our database (we will need this later).

dbt seed -t dev

Again, the target selection -t dev is unnecessary as it is the default. We have now seeded some data from the csv’s in our seeds/ folder. We will use this to do some modelling with, but first lets check out what happened in the database using the following command (relies on the .envrc file setting some values).

psql postgresql://$DBT_DWH_DEV_USER:$DBT_DWH_DEV_PASSWORD@localhost:6432/dwh

Now that we are in, lets check out our tables!

dwh=> \dt
                List of relations
 Schema |   Name    | Type  |       Owner        
--------+-----------+-------+--------------------
 dev    | customers | table | dwh_dev_owner_user
 dev    | orders    | table | dwh_dev_owner_user
 dev    | payments  | table | dwh_dev_owner_user
(3 rows)

As you can see we have three tables, one for each file that was in our seeds/ directory. Now how did dbt know which schema to put these in and what to name our tables. It puts the tables in the schema specified for the target in profiles.yml in this case dev. It creates the name based off of the file name. So it’s nothing too magical!

Lets seed prod now as well.

You could also seed this data into a schema that both the dev and the prod user have access to, but I was having trouble configuring permissions to allow this with the Kubernetes Postgres Operator, and I didn’t want to have to manually intervene with the DWH to mess with permissions after creation.

dbt seed -t prod

Doing some modelling

First thing, lets configure our sources, this is so that we can reference data that already exists in our DWH i.e. the seed data. In a real DWH you would configure all of your data sources that were loaded in outside of dbt as sources so that you could then reference them using the dbt source macro.

-- models/staging/fake/src_fake.yml
version: 2
sources:
  - name: fake
    schema: "{{ target.schema }}"
    tables:
      - name: orders
      - name: customers
      - name: payments

Here we have declared our sources, we have nested them in the fake folder, which is the name of our source, and we have declared the names of the tables (the ones we seeded). As you can see the schema is specified dynamically depending on the targets schema. If we had a common schema for our seed data we could just hard code it, but we seed our data into the dev and the prod schemas, and so we need this to be dynamic.

So now we have created the ability to reference our source tables in dbt we can make some models. These will be staging models and will do extremely basic rename and selects as per the recommendation. For example lets take a look at how we stage the customers table from the fake source.

-- models/staging/fake/stg_fake__customers.sql
SELECT
    id AS customer_id,
    first_name,
    last_name
FROM
    {{ source(
        'fake',
        'customers'
    ) }}

As you can see we’ve done barely anything, we have used the macro to reference the customer table source, and we have renamed the id column to customer_id.

I also want to point out the naming convention and directory hierarchy. We have models/staging/<SOURCENAME>/<ABBREVIATION_SOURCENAME__TABLENAME>. This is detailed in the links I provided above.

Now lets run dbt and create these source tables, I will be running dbt with a -s argument so that I can select ONLY the staging tables, this is because I have other models that I don’t want to run yet.

dbt run -s jaffle_shop.staging.fake

Log in to your database to check out the new stuff you’ve created!

Everything is created as a view by default so we can just list the views to see all of our staged tables!

dwh=> \dv
                    List of relations
 Schema |        Name         | Type |       Owner        
--------+---------------------+------+--------------------
 dev    | stg_fake__customers | view | dwh_dev_owner_user
 dev    | stg_fake__orders    | view | dwh_dev_owner_user
 dev    | stg_fake__payments  | view | dwh_dev_owner_user
(3 rows)

Making our first model

Lets add the following model to models/customers.sql.

-- models/customers.sql
with customers as (

    select
        id as customer_id,
        first_name,
        last_name

    from {{ ref('raw_customers') }}

),

orders as (

    select
        id as order_id,
        user_id as customer_id,
        order_date,
        status

    from {{ ref('raw_orders') }}

),

customer_orders as (

    select
        customer_id,

        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders

    from orders

    group by 1

),


final as (

    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders

    from customers

    left join customer_orders using (customer_id)

)

select * from final
EOF

Now I did have to modify the references here for the tables in the from statements. I have replaced this with the {{ ref('table_name') }} syntax.

Lets execute our models again and hopefully we will have a customers view created!

dbt run

Let’s check it out using psql

select * from customers limit 5;
 customer_id | first_name | last_name | first_order_date | most_recent_order_date | number_of_orders
-------------+------------+-----------+------------------+------------------------+------------------
           1 | Michael    | P.        | 2018-01-01       | 2018-02-10             |                2
           2 | Shawn      | M.        | 2018-01-11       | 2018-01-11             |                1
           3 | Kathleen   | P.        | 2018-01-02       | 2018-03-11             |                3
           4 | Jimmy      | C.        |                  |                        |                0
           5 | Katherine  | R.        |                  |                        |                0
(5 rows)

Delete the examples and cleanup the dbt_project.yml

Just quickly lets delete the example models that exist.

rm -rf models/examples

And clean up the reference to them in the models.jaffle_shop section of dbt_project.yml. this should result in the following:

# ...
models:
  jaffle_shop:

Creating some more models

Now we are going to go a bit further and stage our raw tables, we are also going to rename some columns, don’t worry this is all as per the dbt tutorial!

-- models/stg_customers.sql
select
    id as customer_id,
    first_name,
    last_name
from {{ ref('raw_customers') }}
-- models/stg_orders.sql
select
    id as order_id,
    user_id as customer_id,
    order_date,
    status
from {{ ref('raw_orders') }}

Now that we have this we are actually going to reference these tables in our original customers.sql model to use these staged tables instead!

-- models/customers.sql
with customers as (

    select * from {{ ref('stg_customers') }}

),

orders as (

    select * from {{ ref('stg_orders') }}

),

customer_orders as (

    select
        customer_id,

        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders

    from orders

    group by 1

),


final as (

    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders

    from customers

    left join customer_orders using (customer_id)

)

select * from final

And now we can create this all.

dbt run

Running dbt tests

This is a bit of a selling point for dbt in my opinion, baked in ability to write tests for your DWH. Any type of testing can increase our confidence around the pipelines we have created and this is a great first step.

So lets go ahead and define our tests!

-- models/schema.yml
version: 2

models:
  - name: customers
    description: One record per customer
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: first_order_date
        description: NULL when a customer has not yet placed an order.

  - name: stg_customers
    description: This model cleans up customer data
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null

  - name: stg_orders
    description: This model cleans up order data
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']

And run them! which is so so simple. Now there is no excuses for not writing tests and checks!

dbt test

Examine the dbt docs

You may have noticed that just above in the models/schema.yaml we included some descriptions of various things including columns and models. This will help be sucked up into the documentation when we generate it. This is pretty amazing and another awesome selling point of dbt.

dbt docs generate
dbt docs serve

This should have opened your default browser, otherwise you can navigate to it via http://localhost:8080 and check out all of the cool stuff, namely the lineage graph:

lineage

Finally merge your branch back in to your main branch via a Pull Request / Merge Request.

Tags: k8s airflow dbt