Kubernetes is a container orchestration system for automating deployment, scaling, and management of containerized applications. Dagster uses Kubernetes in combination with Helm, a package manager for Kubernetes applications. Using Helm, users specify the configuration of required Kubernetes resources to deploy Dagster through a values file or command-line overrides. References to values.yaml
in the following sections refer to Dagster's values.yaml
.
Dagster publishes a fully-featured Helm chart to manage installing and running a production-grade Kubernetes deployment of Dagster. For each Dagster component in the chart, Dagster publishes a corresponding Docker image on DockerHub.
kubectl
should be configured with your desired Kubernetes cluster. You should understand the basics of Helm, and Helm 3 should be installed. If you are creating your own user code images, Docker should be installed as well.
The Dagster Helm chart is versioned with the same version numbers as the Dagster Python library, and ideally should only be used together when the version numbers match.
In the following tutorial, we install the most recent version of the Dagster Helm chart. To use an older version of the Chart, a --version
flag can be passed to helm upgrade
. If you are using a chart version before 0.11.13, you will also need to update the tags of the Dagster provided images to match the Chart version. After 0.11.13, this will automatically be done for you.
Component Name | Type | Image |
---|---|---|
Daemon | Deployment | dagster/dagster-celery-k8s (released weekly) |
Dagit | Deployment behind a Service | dagster/dagster-celery-k8s (released weekly) |
Database | PostgreSQL | postgres (Optional) |
Run Worker | Job | User-provided or dagster/user-code-example (released weekly) |
User Code Deployment | Deployment behind a Service | User-provided or dagster/user-code-example (released weekly) |
The daemon periodically checks the Runs table in PostgreSQL for Pipeline Runs in that are ready to be launched. The daemon also runs the dagster-native scheduler, which has exactly-once guarantees.
The Daemon launches the run via the K8sRunLauncher
, creating a Run Worker Job with the image specified in the User Code Deployment.
The Dagit webserver communicates with the User Code Deployments via gRPC to fetch information needed to populate the Dagit UI. Dagit does not load or execute user-written code to ensure robustness, and will remain available even when user code contains errors. Dagit frequently checks whether the User Code Deployment has been updated; and if so, the new information is fetched.
Dagit can be horizontally scaled by setting the dagit.replicaCount
field in the values.yaml
.
By default, it is configured with a K8sRunLauncher
, which creates a new Kubernetes Job per pipeline run.
The user can connect an external database (i.e. using a cloud provider's managed database service, like RDS) or run PostgreSQL on Kubernetes. This database stores Pipeline Runs, Events, Schedules, etc and powers much of the real-time and historical data visible in Dagit. In order to maintain a referenceable history of events, we recommend connecting an external database for most use cases.
The Run Worker is responsible for executing the solids in topological order. The Run Worker uses the same image as the User Code Deployment at the time the run was requested. The Run Worker uses ephemeral compute, and completes once the run is finished. Events that occur during the run are written to the database, and are displayed in Dagit.
The Run Worker jobs and pods are not automatically deleted so that users are able to inspect results. It is up to the user to delete old jobs and pods after noting their status.
The Executor determines how the run worker will execute each step of a pipeline. Different executors offer different levels of isolation and concurrency. Common choices are in_process_executor
(all steps run serially in a single process in the single pod), multiprocess_executor
(multiple processes in the single pod), and kubernetes-job (each step runs in a separate pod). For the latter, you can choose between the celery_k8s_job_executor
, and the k8s_job_executor
which removes the Celery dependency. Generally, increasing isolation incurs some additional overhead per step (e.g. starting up a new Kubernetes job vs starting a new process within a pod). Different executors can be configured per-run in the execution
block.
A User Code Deployment runs a gRPC server and responds to Dagit's requests for information (such as: "List all of the pipelines in each repository" or "What is the dependency structure of pipeline X?"). The user-provided image for the User Code Deployment must contain a repository definition and all of the packages needed to execute within the repository.
Users can have multiple User Code Deployments. A common pattern is for each User Code Deployment to correspond to a different repository.
This component can be updated independently from other Dagster components, including Dagit. As a result, updates to repositories can occur without causing downtime to any other repository or to Dagit. After updating, if there is an error with any repository, an error is surfaced for that repository within Dagit; all other repositories and Dagit will still operate normally.
We'll use docker-desktop to set up a local k8s cluster to develop against; feel free to substitute with another k8s cluster as desired.
First, configure the kubectl
CLI to point to the local k8s cluster set up by docker-desktop
.
$ kubectl config set-context dagster --namespace default --cluster docker-desktop --user=docker-desktop
$ kubectl config use-context dagster
Skip this step if using Dagster's example User Code image dagster/user-code-example.
Build a Docker image containing your Dagster repository and any dependencies needed to execute the business logic in your code.
For reference, here is an example Dockerfile and the corresponding User Code directory. Here, we install all the Dagster-related dependencies in the Dockerfile, and then copy over the directory with the implementation of the Dagster repository into the root folder. We'll need to remember the path of this repository in a subsequent step to setup the gRPC server as a deployment.
For projects with many dependencies, it is recommended that you publish your Python project as a package and install that package in your Dockerfile.
Skip this step if using Dagster's example User Code image.
Publish the image to a registry that is accessible from the Kubernetes cluster, such as AWS ECR or DockerHub.
The dagster/user-code-example uses an S3 IO Manager.
Therefore, if you'd like to run the pipeline in the default
mode, you'll need an AWS S3 bucket available, and access to a pair of AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
values. This is because the IO Manager uses boto.
This tutorial also has the option of using minio
to mock an S3 endpoint locally in K8s. Note that this option utilizes host.docker.internal
to access a host from within Docker - this behavior has only been tested for MacOS, so may need different configuration for other platforms.
Skip this step if you'd like to use minio for a local S3 endpoint
If using S3, create a bucket in your AWS account -- for this tutorial, we'll create a bucket called test-bucket
. Also, keep your AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
credentials handy. Now, you can create your k8s secrets:
$ kubectl create secret generic dagster-aws-access-key-id --from-literal=AWS_ACCESS_KEY_ID=<YOUR ACCESS KEY ID>
$ kubectl create secret generic dagster-aws-secret-access-key --from-literal=AWS_SECRET_ACCESS_KEY=<SECRET ACCESS KEY>
Skip this step if you're using AWS S3
First, set up minio locally:
brew install minio/stable/minio # server
brew install minio/stable/mc # client
mkdir $HOME/miniodata # Prepare a directory for data
minio server $HOME/miniodata # start a server with default user/pass and no TLS
mc --insecure alias set minio http://localhost:9000 minioadmin minioadmin
# See it work
mc ls minio
date > date1.txt # create a sample file
mc cp date1.txt minio://testbucket/date1.txt
export AWS_ACCESS_KEY_ID="minioadmin"
export AWS_SECRET_ACCESS_KEY="minioadmin"
# See the aws cli work
aws --endpoint-url http://localhost:9000 s3 mb s3://test-bucket
aws --endpoint-url http://localhost:9000 s3 cp date1.txt s3://test-bucket/
Now, create your k8s AWS secrets:
$ kubectl create secret generic dagster-aws-access-key-id --from-literal=AWS_ACCESS_KEY_ID=minioadmin
$ kubectl create secret generic dagster-aws-secret-access-key --from-literal=AWS_SECRET_ACCESS_KEY=minioadmin
The Dagster chart repository contains the versioned charts for all Dagster releases. Add the remote url under the namespace dagster
to install the Dagster charts.
$ helm repo add dagster https://dagster-io.github.io/helm
Update the dagster-user-deployments.deployments
section of the Dagster chart's values.yaml
to include your deployment. Here, we can specify the configuration of the Kubernetes Deployment that will create the gRPC server for Dagit and the Daemon to access the User Code. The gRPC server is created through the arguments passed to dagsterApiGrpcArgs
, which expects a list of arguments for dagster api grpc
.
To get access to the Dagster values.yaml
, run:
$ helm show values dagster/dagster > values.yaml
The following snippet works for Dagster's example User Code image. Since our Dockerfile contains the repository definition in a path, we specify arguments for the gRPC server to find this path under dagsterApiGrpcArgs
. Note that if you haven't set up an S3 endpoint, you can only run the pipeline in test
mode.
dagster-user-deployments:
enabled: true
deployments:
- name: "k8s-example-user-code-1"
image:
repository: "docker.io/dagster/user-code-example"
tag: latest
pullPolicy: Always
dagsterApiGrpcArgs:
- "--python-file"
- "/example_project/example_repo/repo.py"
port: 3030
dagsterApiGrpcArgs
also supports loading repository definitions from a package name. To find the applicable arguments, read here.
default
mode (Optional)#You'll need a slightly different configuration to run the default
mode as well. This is because the user code uses an AWS S3IOManager
in the default
mode, and therefore you'll need to provide the user code k8s pods with AWS S3 credentials.
See the set up S3 section for setup instructions. The below snippet works for both AWS S3 and a local S3 endpoint via minio
.
dagster-user-deployments:
enabled: true
deployments:
- name: "k8s-example-user-code-1"
image:
repository: "docker.io/dagster/user-code-example"
tag: latest
pullPolicy: Always
dagsterApiGrpcArgs:
- "--python-file"
- "/example_project/example_repo/repo.py"
port: 3030
envSecrets:
- name: dagster-aws-access-key-id
- name: dagster-aws-secret-access-key
runLauncher:
type: K8sRunLauncher
config:
k8sRunLauncher:
envSecrets:
- name: dagster-aws-access-key-id
- name: dagster-aws-secret-access-key
Install the Helm chart and create a release. Below, we've named our release dagster
. We use helm upgrade --install
to create the release if it does not exist; otherwise, the existing dagster
release will be modified:
helm upgrade --install dagster dagster/dagster -f /path/to/values.yaml
Helm will launch several pods including PostgreSQL. You can check the status of the installation with kubectl
- note that it might take a few minutes for the pods to move to a Running
state.
If everything worked correctly, you should see output like the following:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
dagster-dagit-645b7d59f8-6lwxh 1/1 Running 0 11m
dagster-k8s-example-user-code-1-88764b4f4-ds7tn 1/1 Running 0 9m24s
dagster-postgresql-0 1/1 Running 0 17m
After Helm has successfully installed all the required kubernetes resources, start port forwarding to the Dagit pod via:
export DAGIT_POD_NAME=$(kubectl get pods --namespace default \
-l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" \
-o jsonpath="{.items[0].metadata.name}")
kubectl --namespace default port-forward $DAGIT_POD_NAME 8080:80
Now try running a pipeline. Visit http://127.0.0.1:8080, navigate to the playground, select the test
mode, and click Launch Execution.
solids:
multiply_the_word:
config:
factor: 0
inputs:
word: ""
You can introspect the jobs that were launched with kubectl
:
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
dagster-run-5ee8a0b3-7ca5-44e6-97a6-8f4bd86ee630 1/1 4s 11s
Now, you can try a full run. Using the default
mode, provide the following config:
resources:
io_manager:
config:
s3_bucket: "test-bucket"
solids:
multiply_the_word:
config:
factor: 0
inputs:
word: ""
# Go to the playground and prepare a configuration
resources:
io_manager:
config:
s3_bucket: "test-bucket"
s3:
config:
# This use of host.docker.internal is unique to Mac
endpoint_url: http://host.docker.internal:9000
region_name: us-east-1
solids:
multiply_the_word:
config:
factor: 0
inputs:
word: ""
Again, you can view the launched jobs:
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
dagster-run-5ee8a0b3-7ca5-44e6-97a6-8f4bd86ee630 1/1 4s 11s
dagster-run-733baf75-fab2-4366-9542-0172fa4ebc1f 1/1 4s 100s
Some of the following commands will be useful if you'd like to debug issues with deploying on Helm:
# Get the Dagit pod's name
$ export DAGIT_POD_NAME=$(kubectl get pods --namespace default \
-l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" \
-o jsonpath="{.items[0].metadata.name}")
# Start a shell in the dagit pod
$ kubectl exec --stdin --tty $DAGIT_POD_NAME -- /bin/bash
# Get debug data from $RUN_ID
$ kubectl exec $DAGIT_POD_NAME -- dagster debug export $RUN_ID debug_info.gzip
# Get a list of recently failed runs
$ kubectl exec $DAGIT_POD -- dagster debug export fakename fakename.gzip
# Get debug output of a failed run
$ kubectl exec $DAGIT_POD -- dagster debug export 360d7882-e631-4ac7-8632-43c75cb4d426 debug.gzip
# Extract the debug.gzip from the pod
$ kubectl cp $DAGIT_POD:debug.gzip debug.gzip
# List config maps
$ kubectl get configmap # Make note of the "user-deployments" configmap
$ kubectl get configmap dagster-dagster-user-deployments-$NAME
We deployed Dagster, configured with the default K8sRunLauncher
, onto a Kubernetes cluster using Helm.