
Create centralised way to publish production versions of data to Azure #123

Open
penelopeysm opened this issue Jun 26, 2024 · 4 comments · May be fixed by #131

penelopeysm commented Jun 26, 2024

Right now, our production data is in the Azure blob storage container https://popgetter.blob.core.windows.net/popgetter-dagster-test/test_2 and one of us will populate this by setting ENV=prod and running all the Dagster pipelines locally :)

I think it's useful to have a single, centralised way to generate all production data and upload it to another Azure blob storage container (one with a less test-y name :-)). There are several benefits to this:

  1. Reproducibility — It is clear which data is being uploaded and how it is being generated.
  2. Handles the top-level countries.txt file cleanly — The CLI uses this file to determine which countries are present, since it cannot traverse the Azure directory structure. Right now the file is being generated manually, which can easily lead to inconsistencies between what it says and the data that is actually there.
  3. Statelessness — The pipeline should wipe the entire blob storage container before re-uploading everything. That way we don't end up with some data updated and others not (which would be bad if e.g. the metadata schema is changed). A rough sketch of this step is shown after this list.
  4. Continuous deployment — The pipeline can be automatically triggered by new versions/releases on GitHub.
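
For point 3, here's a minimal sketch of what the wipe-and-reupload step could look like, using azure.storage.blob and assuming the SAS token is available in the same SAS_TOKEN environment variable used further down (the container URL and local output directory below are placeholders, not the real names):

import os
from pathlib import Path

from azure.storage.blob import ContainerClient

# Placeholder values -- the real container URL and output directory would come
# from the pipeline's configuration rather than being hardcoded here.
CONTAINER_URL = "https://popgetter.blob.core.windows.net/some-production-container"
LOCAL_OUTPUT_DIR = Path("dagster_output")

container = ContainerClient.from_container_url(
    CONTAINER_URL, credential=os.environ["SAS_TOKEN"]
)

# 1. Wipe everything currently in the container so no stale data survives
for blob in container.list_blobs():
    container.delete_blob(blob.name)

# 2. Re-upload the freshly generated production data
for path in LOCAL_OUTPUT_DIR.rglob("*"):
    if path.is_file():
        with open(path, "rb") as f:
            container.upload_blob(
                name=path.relative_to(LOCAL_OUTPUT_DIR).as_posix(),
                data=f,
                overwrite=True,
            )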

I can throw together a quick Dockerfile for this and maybe investigate running this on GitHub Actions / Azure!

GHA has usage limits (https://docs.github.com/en/actions/learn-github-actions/usage-limits-billing-and-administration); in particular, "each job in a workflow can run for up to 6 hours of execution time" so it is not a deployment method that will scale well if we have many countries to run. I think for what we have now (BE + NI) it is still workable.

penelopeysm self-assigned this Jun 26, 2024

penelopeysm commented Jun 26, 2024

Installing fiona on a Docker image on an M1 Mac is not fun 🫠

Edit: apt-get update && apt-get install -y libgdal-dev g++ takes care of the fiona dependencies


penelopeysm commented Jun 26, 2024

Next problem: from the CLI, the way to run all partitions of a job is to do dagster job backfill -j <job_name> --noprompt, but:

  • if the partitions have already been determined (e.g. through a previous run) and there are N partitions, that causes unpartitioned assets to be materialised N times 😢 (see Fix performance issues with Belgium DAG #59)
  • if the partitions have not yet been determined, then it crashes with max_str_len = max(len(x) for x in partitions) ValueError: max() iterable argument is empty. You need to run dagster job launch -j <job_name> first, which runs all the unpartitioned assets at the start and populates the list of dynamic partitions. As mentioned above, the subsequent backfill then tries to run all the unpartitioned assets N times, and e.g. for the final metrics asset that means it will fail N-1 times until the last derived_metrics partition has completed 🙄

Not sure how to get the sensors to run. I got them to run by doing dagster-daemon run after the assets have been materialised, which is basically the same as doing dagster dev offline, but this is finicky because there's no indication of when the sensors have finished running. It doesn't seem like there's a dagster sensor run command; you can only turn them on/off, and then the daemon is the one that's actually responsible for running them.
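
(One possible workaround, just a sketch and not something that's wired up yet: Dagster's sensor-testing API lets you evaluate a sensor in-process, which at least gives a synchronous point where you know the sensor has run. The sensor name here is a placeholder.)

from dagster import DagsterInstance, build_sensor_context
from . import defs

instance = DagsterInstance.get()

# "metadata_sensor" is a placeholder name; the real sensor definitions would
# be looked up from popgetter's Definitions object.
sensor = defs.get_sensor_def("metadata_sensor")

# Evaluate the sensor once, synchronously, rather than leaving it to the daemon
result = sensor.evaluate_tick(build_sensor_context(instance=instance))
print(result.run_requests)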

Oh, and if that wasn't complicated enough: dagster job runs stuff in the background, so there's no obvious way of telling when it's done, either.

I don't know if it is possible and/or makes more sense to trigger the asset materialisations using Python code. If so, we could in theory parse the DAG, inspect each asset to check whether it's partitioned or not, and then run it accordingly. This seems like something that would be really useful upstream, but I think it is also a lot of work.


penelopeysm commented Jul 1, 2024

Code to run a job without the issue above!!!!!!

# import the top level defs thing from popgetter
from . import defs
import time
from dagster import materialize, DagsterInstance, DynamicPartitionsDefinition

job_name = "job"   # Replace with whatever
job = defs.get_job_def(job_name)

# Required for persisting outputs in $DAGSTER_HOME/storage
instance = DagsterInstance.get()

# Note: _graph_def and _dependencies are private Dagster APIs, so this may
# break between versions. Maps each node in the job graph to its upstream
# dependencies.
dependency_list = job._graph_def._dependencies

# All asset definitions in the job, keyed by node (asset) name
all_assets = {node_handle.name: definition
              for node_handle, definition in
              job._asset_layer.assets_defs_by_node_handle.items()}


def find_materialisable_asset_names(dep_list, done_asset_names) -> set[str]:
    """Return the names of assets whose upstream dependencies have all been materialised."""
    materialisable_asset_names = set()

    for asset, dep_dict in dep_list.items():
        if asset.name in done_asset_names:
            continue

        if all(dep.node in done_asset_names for dep in dep_dict.values()):
            materialisable_asset_names.add(asset.name)

    return materialisable_asset_names


print("--------------------")
materialised_asset_names = set()
while len(materialised_asset_names) < len(all_assets):
    time.sleep(0.5)
    asset_names_to_materialise = find_materialisable_asset_names(dependency_list, materialised_asset_names)

    if len(asset_names_to_materialise) == 0:
        print("No more assets to materialise")
        break

    asset_name_to_materialise = asset_names_to_materialise.pop()
    asset_to_materialise = all_assets.get(asset_name_to_materialise)

    print(f"Materialising: {asset_name_to_materialise}")

    partitions_def = asset_to_materialise.partitions_def

    # https://docs.dagster.io/_apidocs/execution#dagster.materialize -- note
    # that the `assets` keyword argument needs to include upstream assets as
    # well. We use `selection` to specify the asset that is actually being
    # materialised.
    if partitions_def is None:
        # Unpartitioned
        materialize(assets=[asset_to_materialise,
                            *(all_assets.get(k) for k in materialised_asset_names)],
                    selection=[asset_to_materialise],
                    instance=instance)
        materialised_asset_names.add(asset_name_to_materialise)

    else:
        # Partitioned
        if not isinstance(partitions_def, DynamicPartitionsDefinition):
            raise NotImplementedError("Non-dynamic partitions not implemented yet")
        partition_names = instance.get_dynamic_partitions(partitions_def.name)

        for partition in partition_names:
            materialize(assets=[asset_to_materialise,
                                *(all_assets.get(k) for k in materialised_asset_names)],
                        selection=[asset_to_materialise],
                        partition_key=partition,
                        instance=instance)
        materialised_asset_names.add(asset_name_to_materialise)

penelopeysm added a commit that referenced this issue Jul 1, 2024
penelopeysm added a commit that referenced this issue Jul 1, 2024

penelopeysm commented Jul 1, 2024

Progress!!!!!!!

Run locally with:

git fetch
git checkout origin/automated-publishing

# activate your venv
# optionally, launch `dagster dev` so that you can watch this go on in the web UI

set -a; source .env; set +a
POPGETTER_COUNTRIES=bel,gb_nir python -m popgetter.run all

Or use the Dockerfile:

git fetch
git checkout origin/automated-publishing
export SAS_TOKEN=(whatever it should be)
docker build -t popgetter-prod . --build-arg "SAS_TOKEN=$SAS_TOKEN"
docker run -it popgetter-prod

Yet to figure out:

  • How to not hardcode the list of jobs
  • Running the cloud sensors so that they're serialised into the right place
  • Generating the countries.txt file (shouldn't really be that hard)
    • This should probably be an asset of its own; a rough sketch follows this list
  • How to avoid passing the Azure SAS key as a plaintext secret in the container (would be very awkward)
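
For the countries.txt bullet, a minimal sketch of what such an asset could look like, assuming the list of countries is taken from the same POPGETTER_COUNTRIES environment variable used above (the asset name and output handling are placeholders):

import os

from dagster import asset

# Sketch only: derive countries.txt from the same environment variable used to
# select which countries are run, so the file can't drift from the data that
# was actually generated.
@asset
def countries_txt() -> str:
    countries = os.environ["POPGETTER_COUNTRIES"].split(",")
    return "\n".join(sorted(countries)) + "\n"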

penelopeysm linked a pull request Jul 2, 2024 that will close this issue
penelopeysm added a commit that referenced this issue Jul 12, 2024