Skip to content

Testing Pipelines

Tests for the example stages and the end-to-end pipeline. Note the mocking of the external HTTP request.

demos/dagster/tests/test_example_pipeline.py
"""
Unit tests for the example pipeline.
"""
from io import BytesIO
from unittest.mock import MagicMock, patch

from dagster import execute_pipeline, build_solid_context
from dagster.core.execution.context.compute import SolidExecutionContext
from pytest import fixture
from requests import Response

from pipelines.example_pipeline import cereal_data_pipeline, download_data


@fixture(scope="session")
def test_data() -> BytesIO:
    with open("tests/test_data.csv", "r+b") as f:
        file_bytes = f.read()
    return BytesIO(file_bytes)


@fixture(scope="session")
def context() -> SolidExecutionContext:
    return build_solid_context()


@patch("pipelines.example_pipeline.requests")
def test_download_data_downloads_data(
    mock_requests: MagicMock, test_data: BytesIO, context: SolidExecutionContext
):
    mock_response = Response()
    mock_response.raw = test_data
    mock_requests.get.return_value = mock_response
    dataset = download_data(context)
    assert len(dataset) == 77
    assert len(dataset[0].keys()) == 16


def test_cereal_data_pipeline():
    result = execute_pipeline(cereal_data_pipeline)
    assert result.success