
Managing Serverless Jobs using Dagster


Dagster is an orchestrator designed for developing and maintaining data assets, such as tables, data sets, machine learning models, and reports.

It allows you to declare jobs that you want to run and the data assets that those jobs produce or update. Dagster then helps you run your jobs at the right time and keep your assets up-to-date.
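
In practice, an asset is declared as a decorated Python function whose return value Dagster materializes and tracks. As a minimal, illustrative sketch (the asset name here is hypothetical):

    from dagster import asset

    # Dagster materializes and tracks the return value of this function,
    # and re-runs it whenever the asset needs to be refreshed.
    @asset
    def top_articles():
        return ["article-1", "article-2"]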

In this tutorial, we will create a Serverless Job using Dagster to scrape the Hacker News website and retrieve the title, author, and URL of the top articles.

First, we will initialize the Dagster project locally and add the scraper's source code to it. Then, we will set up the environment, build the image and push it to the Scaleway registry. Finally, we will execute Dagster locally to scrape the Hacker News top stories through the Dagster UI.

Before you start

To complete the actions presented below, you must have:

  • A Scaleway account logged into the console
  • An API key (access key and secret key)
  • An Object Storage bucket
  • A Serverless SQL Database
  • A Container Registry namespace
  • Docker installed on your local machine
  • Python 3 and pip installed on your local machine

Creating the Dagster project

We will install Dagster using the pip package installer. Refer to the Dagster documentation for more information or alternative installation methods.

  1. Run the following command to install the latest version of Dagster:

    pip install dagster dagster-webserver
  2. Run the following command to create the Dagster project:

    dagster project scaffold --name my-dagster-project
    cd my-dagster-project
  3. Update the install_requires list in the setup.py file with the code below to add the required dependencies to the project:

    install_requires=[
        "dagster",
        "dagster-cloud",
        "dagster_aws",
        "dagster_scaleway~=0.1.3",
        "dagster_postgres",
        "pandas",
        "scaleway",
        "pendulum==2.0.3",
    ],
  4. Run the following command to install the dependencies locally:

    pip install .
  5. Create the Dockerfile to build the container image:

    cat <<EOF > Dockerfile
    FROM python:3.11-slim-bookworm
    WORKDIR /app
    COPY . .
    RUN pip install .
    EOF
  6. Copy the scraper code below and paste it into the my-dagster-project/assets.py file:

    import pandas as pd
    import requests
    import os
    
    from dagster_aws.s3 import S3PickleIOManager, S3Resource
    
    from dagster import Definitions, asset, MetadataValue, Output
    import scaleway
    
    client = scaleway.Client.from_config_file_and_env()
    
    S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
    
    
    @asset
    def hackernews_top_story_ids():
        """
        Get top stories from the HackerNews top stories endpoint.
        API Docs: https://github.com/HackerNews/API#new-top-and-best-stories
        """
        top_story_ids = requests.get(
            "https://hacker-news.firebaseio.com/v0/topstories.json"
        ).json()
        return top_story_ids[:10]
    
    
    # Asset dependencies can be inferred from parameter names
    @asset
    def hackernews_top_stories(hackernews_top_story_ids):
        """Get items based on story IDs from the Hacker News items endpoint"""
        results = []
        for item_id in hackernews_top_story_ids:
            item = requests.get(
                f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
            ).json()
            results.append(item)
    
        df = pd.DataFrame(results)
    
        # The recorded metadata can be customized
        metadata = {
            "num_records": len(df),
            "preview": MetadataValue.md(df[["title", "by", "url"]].to_markdown()),
        }
    
        return Output(value=df, metadata=metadata)
    
    
    defs = Definitions(
        assets=[hackernews_top_story_ids, hackernews_top_stories],
        resources={
            "io_manager": S3PickleIOManager(
                s3_resource=S3Resource(
                    region_name="fr-par",
                    endpoint_url="https://s3.fr-par.scw.cloud",
                    aws_access_key_id=client.access_key,
                    aws_secret_access_key=client.secret_key,
                ),
                s3_bucket=S3_BUCKET_NAME,
            ),
        },
    )
  7. Replace the content of the my-dagster-project/__init__.py file with the code below to load the definitions from the assets.py file (an optional local check follows this list):

    from dagster import Definitions, load_assets_from_modules
    
    from . import assets
    
    all_assets = load_assets_from_modules([assets])
    
    defs = assets.defs
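
Before building the container image, you can optionally check that the project loads and that the assets run locally. The snippet below is a minimal sketch, assuming the scaffolded package is named my_dagster_project and that your Scaleway credentials and S3_BUCKET_NAME are available, since assets.py creates a client at import time. It materializes both assets in-process with Dagster's default filesystem IO manager:

    # smoke_test.py (hypothetical): materialize both assets in-process,
    # using Dagster's default filesystem IO manager instead of S3.
    from dagster import materialize

    from my_dagster_project.assets import (
        hackernews_top_story_ids,
        hackernews_top_stories,
    )

    result = materialize([hackernews_top_story_ids, hackernews_top_stories])
    assert result.success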

Setting up the environment and creating the image

  1. Retrieve the following elements:

     • the connection string of your Serverless SQL Database
     • your Scaleway Project ID
     • the access key and secret key of your API key
     • the name of your Object Storage bucket

  2. Export the environment variables below (a quick way to verify them follows this list):

    export PG_CONN_STRING=<your_serverless_database_connection_string>
    export SCW_DEFAULT_PROJECT_ID=<your_scaleway_project_id>
    export SCW_ACCESS_KEY=<your_scaleway_access_key>
    export SCW_SECRET_KEY=<your_scaleway_secret_key>
    export S3_BUCKET_NAME=<your_bucket_name>
  3. Build and push the image to your container registry namespace:

    docker build -t rg.fr-par.scw.cloud/<your-namespace>/dagster-scaleway-example:latest . &&
    docker push rg.fr-par.scw.cloud/<your-namespace>/dagster-scaleway-example:latest
    Note

    You can find the name and endpoint of the container registry namespace you created in the Scaleway console.

  4. Create a dagster.yaml file at the root of the Dagster project to store the configuration, replacing the docker_image value with the image you just pushed:

    run_launcher:
      module: dagster_scaleway
      class: ScalewayServerlessJobRunLauncher
      config:
        docker_image: rg.fr-par.scw.cloud/<your-namespace>/dagster-scaleway-example:latest
        env_vars:
          - PG_CONN_STRING
          - SCW_DEFAULT_PROJECT_ID
          - SCW_ACCESS_KEY
          - SCW_SECRET_KEY
          - S3_BUCKET_NAME

    storage:
      postgres:
        postgres_url:
          # Can be obtained from the Scaleway console
          env: PG_CONN_STRING
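
The run launcher forwards the variables listed under env_vars to every Serverless Job it creates, so it is worth confirming they are all set before launching a run. Below is a minimal sketch (a hypothetical check_env.py, not part of the Dagster project):

    import os

    # Verify that the variables listed under env_vars in dagster.yaml
    # are present in the current shell before starting Dagster.
    required = [
        "PG_CONN_STRING",
        "SCW_DEFAULT_PROJECT_ID",
        "SCW_ACCESS_KEY",
        "SCW_SECRET_KEY",
        "S3_BUCKET_NAME",
    ]
    missing = [name for name in required if not os.getenv(name)]
    if missing:
        raise SystemExit(f"Missing environment variables: {missing}")
    print("All required environment variables are set.")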

Executing Serverless Jobs using the Dagster interface

  1. Run the command below to launch the Dagster user interface locally:

    dagster dev

    Once the output displays the Dagster UI URL, copy it and open it in your browser.

    Note

    By default, the Dagster UI is hosted at http://127.0.0.1:3000.

  2. In the Dagster UI, click Materialize all on the right to create the Dagster assets and start your Serverless Job.

  3. Click Runs in the top menu to display the list of past and current job runs.

  4. Click the Run ID you just created by materializing the assets to access the Run details page. From here, you can monitor the execution of your job run.

  5. Once the job run is done, click Show Markdown in the INFO column to display a preview of the title, author, and URL of the top Hacker News articles.

The pickled DataFrame produced by the job is stored in the Object Storage bucket. Each new run overwrites the existing file in the bucket.
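
If you want to inspect the stored object outside Dagster, you can read it back with boto3 (installed alongside dagster_aws). The sketch below makes an assumption about the object key: it uses the S3PickleIOManager's default dagster prefix followed by the asset name, but the exact key can differ, so check your bucket's contents in the Scaleway console first:

    import os
    import pickle

    import boto3

    # Read back the pickled DataFrame written by the S3PickleIOManager.
    s3 = boto3.client(
        "s3",
        region_name="fr-par",
        endpoint_url="https://s3.fr-par.scw.cloud",
        aws_access_key_id=os.environ["SCW_ACCESS_KEY"],
        aws_secret_access_key=os.environ["SCW_SECRET_KEY"],
    )
    obj = s3.get_object(
        Bucket=os.environ["S3_BUCKET_NAME"],
        Key="dagster/hackernews_top_stories",  # assumed key; verify in the console
    )
    df = pickle.loads(obj["Body"].read())
    print(df[["title", "by", "url"]])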

The Serverless SQL Database handles communication between the different Dagster components, in particular notifying the UI when the scheduler has finished executing a job. It also stores the events that occur during execution.

Important

Dagster writes small chunks of data to the Serverless SQL Database as long as the UI is running. Stop the UI from the terminal to let your database stop automatically.

Going further

Troubleshooting

  • Make sure that you have exported the environment variables, as they are cleared at the end of a terminal session.

  • Make sure the assets.py and __init__.py files contain the code provided above.

  • Make sure the dagster.yaml file is in the root folder of your Dagster project.
