Ask AI

You are viewing an unreleased or outdated version of the documentation

Asset sensors#

Asset sensors allow you to instigate runs when materializations occur.


Relevant APIs#

NameDescription
RunRequestThe sensor evaluation function can yield one or more run requests. Each run request creates a job run.
SkipReasonIf a sensor evaluation doesn't yield any run requests, it can instead yield a skip reason to log why the evaluation was skipped or why there were no events to be processed.
@asset_sensorThe decorator used to define an asset sensor. The decorated function is an evaluation function that takes in a SensorEvaluationContext and an asset materialization event. The decorator returns an AssetSensorDefinition
AssetSensorDefinitionA special sensor definition class for asset sensors. You almost never want to use initialize this class directly. Instead, you should use the @asset_sensor which returns a AssetSensorDefinition
@freshness_policy_sensorThe decorator used to define a freshness policy sensor. The decorated function is an evaluation function that takes in a FreshnessPolicySensorContext. The decorator returns a FreshnessPolicySensorDefinition
FreshnessPolicySensorDefinitionA special sensor definition class for freshness policy sensors. You almost never want to use initialize this class directly. Instead, you should use the @freshness_policy_sensor which returns a FreshnessPolicySensorDefinition
---

Defining an asset sensor#

An asset sensor checks for new AssetMaterialization events for a particular asset key. This can be used to kick off a job that computes downstream assets or notifies appropriate stakeholders.

One benefit of this pattern is that it enables cross-job and even cross-code-location dependencies. Each job run instigated by an asset sensor is agnostic to the job that caused it.

Dagster provides a special asset sensor definition format for sensors that fire a single RunRequest based on a single asset materialization. Here is an example of a sensor that generates a RunRequest for every materialization for the asset key my_table:

from dagster import (
    AssetKey,
    EventLogEntry,
    RunConfig,
    SensorEvaluationContext,
    asset_sensor,
)


@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
    yield RunRequest(
        run_key=context.cursor,
        run_config=RunConfig(
            ops={
                "read_materialization": ReadMaterializationConfig(
                    asset_key=list(asset_event.dagster_event.asset_key.path)
                )
            }
        ),
    )

Testing an asset sensor#

To write a unit test for an asset sensor, you first need to construct a context object using a test instance that can log AssetMaterialization events.

You can then directly invoke your asset sensor with that context and evaluate the results.

from dagster import DagsterInstance, build_sensor_context, materialize


def test_my_asset_sensor():
    @asset
    def my_table():
        return 1

    instance = DagsterInstance.ephemeral()
    ctx = build_sensor_context(instance)

    result = list(my_asset_sensor(ctx))
    assert len(result) == 1
    assert isinstance(result[0], SkipReason)

    materialize([my_table], instance=instance)

    result = list(my_asset_sensor(ctx))
    assert len(result) == 1
    assert isinstance(result[0], RunRequest)

Freshness policy sensors#

Freshness policy sensors are experimental.

A freshness policy sensor checks the freshness of a given selection of assets on each tick, and performs some action in response to that status.

FreshnessPolicySensorContext has a current_minutes_late property, specifying how many minutes late the asset is with respect to its FreshnessPolicy, as well as previous_minutes_late, the number of minutes late that asset was on the previous sensor tick. Each tick, the decorated function will be run for each asset within the asset_selection that has a FreshnessPolicy defined.

Currently, freshness policy sensors do not support returning or yielding values (such as RunRequests) from their execution function.

Here is an example of a sensor that will send a single alert once an asset is 10 minutes later than its configured policy allows, and a single alert once that asset is on time again.

from dagster import FreshnessPolicySensorContext, freshness_policy_sensor


@freshness_policy_sensor(asset_selection=AssetSelection.all())
def my_freshness_alerting_sensor(context: FreshnessPolicySensorContext):
    if context.minutes_overdue is None or context.previous_minutes_overdue is None:
        return

    if context.minutes_overdue >= 10 and context.previous_minutes_overdue < 10:
        send_alert(
            f"Asset with key {context.asset_key} is now more than 10 minutes overdue."
        )
    elif context.minutes_overdue == 0 and context.previous_minutes_overdue >= 10:
        send_alert(f"Asset with key {context.asset_key} is now on time.")