A few weeks with Dagster

My first official venture into the world of data pipelines and ML Ops

I have recently been setting up my first ML Ops infrastructure. The space is a bit of a minefield, with so many components including data pipeline tools, feature stores, model registries, model serving tools, data monitoring and more.

🤯

You have to start somewhere, and after a bit of thinking and planning I decided that a solid data pipeline tool would be a good place to begin. The space is a bit scattered right now, and I am sure it will look very different in a couple of years' time, but one of the more modern tools that seems to be getting traction is Dagster.

Here are some gotchas that I had to deal with when setting up Dagster for the first time. I think I am mostly writing them here so I can refer back to them if I need to set up Dagster again one day!

Disclaimer: I am no infrastructure or data engineering expert. So there is a very good chance that (1) my points below are super obvious to you, and (2) there are much easier ways of achieving the same thing. If so, drop me a line and let me know! 🤗

TL;DR

Bloody loving Dagster. Highly recommended. The documentation is a bit lacking as they are moving pretty quickly and the open source deployment guide could do with a bit of TLC, but I am impressed so far.

Deploying Dagster

Unsurprisingly, the Dagster team puts a fair bit of focus on encouraging you to use their cloud offering. However, I didn't really have any budget for that, so I ended up going with the open source version, deployed to our Kubernetes cluster using Helm.

Documentation

Although the Dagster documentation is OK, the product is moving super quickly and, as such, the documentation is always running behind the actual product. Moreover, the "deployment" section of the documentation is lacking even further, given that it is completely independent of the Cloud offering.

It probably doesn't help that I am no Helm expert either 😆.

Referencing the user repo correctly

To specify the location of all your Dagster definitions, etc., you need to set dagsterApiGrpcArgs in your values.yaml file. Using Python modules seems to be the new done thing, but most of the guides' examples still reference loading definitions from a Python file. Anyway, to use module-based deployments, the values.yaml for your chart deployment should look like this:

dagsterApiGrpcArgs:
    - "-m"
    - "dagster_user_code"

where your Dockerfile will probably end with something like this

COPY dagster_user_code ./dagster_user_code

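It is important to note that the entries in dagsterApiGrpcArgs are arguments to the gRPC server command (dagster api grpc), so they are not necessarily the same as the arguments you pass to the other dagster command line commands. Watch out.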

Setting image tag for user code deployment

If you are like me, you are using some CI/CD tool to:

  • build your Dagster user code's Docker image with some sensible image tag,
  • push it to some container registry (like the GitLab container registry),
  • and use that image to serve your user code via Helm.

However, annoyingly the Helm chart Dagster provides does not give you a variable that you can use to set the image tag to look for, so instead you have to set this manually at the command line when you deploy your chart.

The bit I was struggling with was that the deployments attribute in values.yaml is a list, so you have to specify the index. E.g. your helm upgrade/install command will look like this (the quotes stop shells such as zsh from treating the square brackets as a glob pattern):

helm upgrade .... --set "deployments[0].image.tag=$IMAGE_TAG_FROM_EARLIER_CI_CD_STEP" dagster/dagster-user-deployments -f values.yaml

I spent way too much time figuring this out when I was initially working on this 🙈.
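If your CI/CD step is a script rather than a raw shell line, one option is to build the command as an argument list, which sidesteps shell quoting of the square brackets altogether. This is only a sketch: the release name, the function name and the example tag are hypothetical, and the chart name assumes the user code chart published in the Dagster Helm repo.

```python
import shlex
from typing import List


def helm_upgrade_command(
    release: str,
    chart: str,
    image_tag: str,
    values_file: str = "values.yaml",
    deployment_index: int = 0,
) -> List[str]:
    """Build the helm upgrade argv, overriding the image tag of one deployment."""
    return [
        "helm",
        "upgrade",
        release,
        chart,
        "-f",
        values_file,
        # deployments is a list in values.yaml, hence the explicit index
        "--set",
        f"deployments[{deployment_index}].image.tag={image_tag}",
    ]


cmd = helm_upgrade_command(
    release="dagster-user-code",  # hypothetical release name
    chart="dagster/dagster-user-deployments",
    image_tag="abc123",  # hypothetical tag from an earlier CI/CD step
)
# shlex.join gives a copy-pasteable shell line with the brackets quoted
printable = shlex.join(cmd)
# To actually deploy: subprocess.run(cmd, check=True)
```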

Sharing PostgreSQL without writing to the public schema

I didn't see the value of spinning up a new AWS RDS instance for Dagster specifically, so I decided to use our existing RDS instance. However, if you are not careful, Dagster will just add its tables to your public schema (this definitely didn't happen to me 🙈).

Instead you have got to specify the search path in your values.yaml file, e.g.

postgresql:
  enabled: false # <----- Disable the default postgresql deployment. However, you still need to set the settings below
  ...
  postgresqlParams: {
    options: "-c search_path=dagster",  # <----- only look into dagster schema
  }

Of course, don't forget to set the host, etc., and add the credentials to the dagster-postgres-credentials secret (which you might want to create manually).

I wish this alternative was the default.
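To make the options setting a bit less magical: my assumption is that these params end up as query parameters on the Postgres connection URL, where options is a standard libpq connection parameter and "-c search_path=dagster" sets the search path for every connection. The sketch below just builds such a URL with the standard library. The function name, host, database and credentials are all made up, and presumably the dagster schema has to exist before Dagster can create its tables in it.

```python
from urllib.parse import quote, urlencode


def postgres_url(user: str, password: str, host: str, db_name: str, schema: str, port: int = 5432) -> str:
    """Build a libpq-style URL that pins the search path to a single schema."""
    # "options" is passed to the server at connection time;
    # "-c name=value" sets a run-time parameter for that session.
    params = urlencode({"options": f"-c search_path={schema}"}, quote_via=quote)
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"postgresql://{credentials}@{host}:{port}/{db_name}?{params}"


# Hypothetical values, for illustration only
url = postgres_url("dagster", "s3cret", "db.example.com", "analytics", schema="dagster")
```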

Separating out the user deployment from the Dagster deployment

From my understanding, at the bare minimum, Dagster needs three components:

  1. Dagster daemon: this is the component that runs your pipelines, spins up new pods, etc.
  2. Dagster user code: this is the component that contains your Dagster definitions, etc. You could have several of these, e.g. for different projects and teams.
  3. Dagit: this is the visual component that lets you view the DAGs, re-run jobs and so on.

By default, the Dagster values.yaml has support for dagster-user-deployments. If you use that deployment, every time you want to deploy your user code, you have to deploy the Dagster daemon and Dagit too. Sorry, what? Why? I wish it was just removed and was not even an option.

So what you need to do is use a separate chart to deploy the Dagster user code (the chart is called dagster/dagster-user-deployments), and then disable dagster-user-deployments in the dagster/dagster chart. Confusingly, you disable it like this:

dagster-user-deployments:
  enabled: true  # <----- Set this to true, even though you are disabling it
  enableSubchart: false  # <----- this is what actually disables the actual user deployment.

I don't know if I missed a memo here, but I found this super confusing.

Dagster definitions, etc

Solids and Repositories

Dagster has had some major API changes over its lifetime: out are solids and repositories, and in are assets and definitions. E.g.

In 1.1.6, we introduced Definitions, which replaces repositories. While repositories will continue to work, we recommend migrating to Definitions. Refer to the Code locations documentation for more info.

This means you cannot rely on a lot of old tutorials and code examples. As an added bonus, since ChatGPT was trained using data from back in 2021, you certainly cannot use it for any helpful Dagster tips 😆.

So good luck with that.

Online serving of assets

IO Managers are components of Dagster that manage materialisation of assets. Something like S3 is a sensible storage space for your assets.

But remember I am using Dagster as part of my ML Ops - so one of my assets corresponds to parameters that are used by my model and I don't want to load the whole asset if I am interested in a subset of those parameters.

This is where a database-backed IO manager comes in. I can write thousands of rows to the database, but during online serving, I can load just the particular rows I am interested in.

Data pipeline .... -> S3-backed asset  -> Postgres-backed asset
                            |                       | 
                        off-line serving     online serving

But guess what, there is no Postgres IO manager in Dagster. There is a DbIOManager, but that's marked as private, so I decided to roll my own. There are a couple of gotchas though, which I discuss below.

I wish this was officially supported.
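To illustrate the idea (not the IO manager itself), here is a small sketch of "write everything offline, read a slice online". It uses sqlite3 from the standard library as a stand-in for Postgres purely so that it runs anywhere. The table, column and function names are hypothetical.

```python
import sqlite3
from typing import Dict, Iterable, Tuple


def write_parameters(conn: sqlite3.Connection, rows: Iterable[Tuple[str, str, float]]) -> None:
    """Pipeline side: persist every (entity_id, name, value) row of the asset."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS model_parameters ("
        "entity_id TEXT, name TEXT, value REAL, PRIMARY KEY (entity_id, name))"
    )
    # Re-materialising the asset overwrites existing rows instead of duplicating them
    conn.executemany("INSERT OR REPLACE INTO model_parameters VALUES (?, ?, ?)", rows)
    conn.commit()


def load_parameters(conn: sqlite3.Connection, entity_id: str) -> Dict[str, float]:
    """Online side: fetch only the rows for one entity, not the whole asset."""
    cursor = conn.execute(
        "SELECT name, value FROM model_parameters WHERE entity_id = ?", (entity_id,)
    )
    return dict(cursor.fetchall())


conn = sqlite3.connect(":memory:")
write_parameters(conn, [("a", "weight", 0.5), ("a", "bias", 1.0), ("b", "weight", 0.7)])
params_for_a = load_parameters(conn, "a")
```

In a real IO manager the write would live in handle_output, while the online service would query the table directly without going through Dagster at all.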

contexts

There are three types of context that I have had to deal with in Dagster (there may be more). I wish they had more similar APIs, but sadly they don't.

  • OutputContext: the context during writing an asset in say an IO manager
  • InputContext: the context during loading assets in say an IO manager
  • AssetExecutionContext: the context during asset materialisation

Watch out for which one you have!

asset_partitions_time_window

If you are partitioning your assets by a time window, there is a handy property that gives you the time window. However, be aware that if you don't have a time window partition, it raises an exception.

I have not found a consistent way of checking whether I have a time partition, so I have had to manually check the type of my partitions definition:

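from dagster import MultiPartitionsDefinition, PartitionsDefinition, TimeWindowPartitionsDefinition


def has_timebased_partitions(partition_definition: PartitionsDefinition) -> bool:
    # A plain time window definition (daily, monthly, ...) qualifies directly.
    if isinstance(partition_definition, TimeWindowPartitionsDefinition):
        return True
    # A multi-dimensional definition qualifies if any of its dimensions is time based.
    if isinstance(partition_definition, MultiPartitionsDefinition):
        for dimension in partition_definition.partitions_defs:
            if isinstance(dimension.partitions_def, TimeWindowPartitionsDefinition):
                return True

    return False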

Notice that if you have a MultiPartitionsDefinition, you have to check each of the underlying definitions explicitly.

Moreover, if you have a MultiPartitionsDefinition and you have access to an OutputContext or an AssetExecutionContext, you can simply ask for context.asset_partitions_time_window. For an InputContext, however, you have to query the time window on the actual partitions definition:

if has_timebased_partitions(context.asset_partitions_def):
    # for output context dagster lets me get the time window from context
    # in multidimension case but for input context it does not - thanks Dagster.
    if isinstance(context.asset_partitions_def, MultiPartitionsDefinition):
        partition_time_window = context.asset_partitions_def.time_window_for_partition_key(
            context.partition_key
        )
    else:
        partition_time_window = context.asset_partitions_time_window
else:
    partition_time_window = None

Not sure if this is an oversight or a requirement that I am missing, but there it is.

Partition "dimension" names

Another quirk to note is that if you have a single-dimension partition, there isn't an explicit name for it, whereas for multi partitions, each dimension has a name. This makes it a bit awkward because you have to write code that handles both cases. For the single-dimension case I have ended up setting a dimension name in the metadata:

@asset(
    io_manager_key="postgres_io_manager",
    partitions_def=MonthlyPartitionsDefinition(start_date=START_OF_TIME, end_offset=1),
    metadata={"table": "....", "dimension": "created_at"},
)

Partition Mapping and typing of assets

These are pretty handy. For me, I am interested in doing a rolling average of some of my metrics, and with these, I can load a few more partitions to manipulate in my operation, so I have this asset definition:

@asset(
    io_manager_key="io_manager",
    partitions_def=MonthlyPartitionsDefinition(start_date=START_OF_TIME, end_offset=1),
    ins={
        "deliveries": AssetIn(
            metadata={"allow_missing_partitions": True},
            partition_mapping=TimeWindowPartitionMapping(start_offset=-6),
        )
    },
)

However, note that the type of the input will suddenly be different: e.g. if each partition of your input asset is a pandas.DataFrame, then the input to this new asset will be a dictionary of pandas.DataFrames.

However, if you have allow_missing_partitions set to True, you could get just a single pandas.DataFrame if there is only one partition, so you end up with code like this:

  if isinstance(deliveries, dict):
      deliveries_df = pd.concat(deliveries.values())
  else:
      deliveries_df = cast(pd.DataFrame, deliveries)
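The same normalisation can be pulled into a small helper. This sketch also guards against an empty dictionary, since pd.concat raises a ValueError when given nothing to concatenate. Whether Dagster can actually hand you an empty dictionary when every partition is missing is an assumption on my part, and the helper name is hypothetical.

```python
from typing import Dict, Union

import pandas as pd


def as_single_frame(deliveries: Union[pd.DataFrame, Dict[str, pd.DataFrame]]) -> pd.DataFrame:
    """Collapse a partition-mapped input into one DataFrame."""
    if isinstance(deliveries, pd.DataFrame):
        # Only one partition was loaded, so there is nothing to combine.
        return deliveries
    frames = list(deliveries.values())
    if not frames:
        # pd.concat([]) raises ValueError, so return an empty frame instead.
        return pd.DataFrame()
    # ignore_index avoids duplicate index labels across partitions
    return pd.concat(frames, ignore_index=True)


january = pd.DataFrame({"n_deliveries": [1]})
february = pd.DataFrame({"n_deliveries": [2, 3]})
combined = as_single_frame({"2023-01-01": january, "2023-02-01": february})
```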

end_offset=1

Ooh, this one drove me crazy for a good few minutes. By default, the MonthlyPartitionsDefinition will create partitions excluding the current month. If you want data from the current month to be included, you have to set end_offset=1.
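To spell the rule out, here is a plain Python model of it (not Dagster code): a monthly partition only exists once its month has fully ended, and end_offset adds that many extra partitions at the end. The function name is made up, and the sketch assumes the start date is the first day of a month.

```python
from datetime import date
from typing import List


def monthly_partition_keys(start: date, today: date, end_offset: int = 0) -> List[str]:
    """Model of monthly partition keys: completed months plus end_offset extra ones."""
    # Number of months that have fully ended between start and today
    completed = (today.year - start.year) * 12 + (today.month - start.month)
    count = max(completed + end_offset, 0)

    keys = []
    year, month = start.year, start.month
    for _ in range(count):
        keys.append(f"{year:04d}-{month:02d}-01")
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return keys


mid_march = date(2023, 3, 15)
default_keys = monthly_partition_keys(date(2023, 1, 1), mid_march)
with_current_month = monthly_partition_keys(date(2023, 1, 1), mid_march, end_offset=1)
```

In the middle of March, the default gives partitions for January and February only, while end_offset=1 adds the March partition as well.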

Summary

It's early days in my Dagster journey, so I am not sure how it will go. I am particularly concerned about major changes being introduced in their API that will make my code obsolete. As I mentioned, they have already had some major changes, from solids to assets and from repositories to definitions.

Other users have also raised concerns about the open source vs cloud offering, and the Dagster team have not committed to keeping all the components free, but hopefully all the core features will stay free or at a reduced cost for self-hosted versions.

Anyway, I am committed now, so let's see how it goes.

Good luck to the Dagster team!

p.s. if you do have any comments about my issues above, please please do reach out!