Airflow DAG example

Here is an example of an Airflow DAG running SnowKill:

from contextlib import closing
from datetime import datetime
from functools import cached_property
from typing import TYPE_CHECKING

from airflow import DAG
from airflow.models.baseoperator import BaseOperator

if TYPE_CHECKING:
    from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
    from airflow.utils.context import Context


# Default connection ids handed to every task in the DAG; Airflow applies
# each entry to operators whose constructor accepts a matching keyword.
default_args = dict(
    snowflake_conn_id="snowflake_default",
    slack_webhook_conn_id="slack_webhook_default",
)


class RunSnowkillOperator(BaseOperator):
    """Run one SnowKill monitoring pass and report new findings to Slack.

    Each execution does the following:

    1. Opens a Snowflake connection.
    2. Evaluates a fixed set of SnowKill check conditions through
       ``SnowKillEngine.check_and_kill_pending_queries``.
    3. Passes the results through ``SnowflakeTableStorage.store_and_remove_duplicate``
       on ``snowkill_target_table``.
    4. Sends one Slack message per remaining result.

    :param snowkill_target_table: Snowflake table handed to
        ``SnowflakeTableStorage`` for storing / deduplicating check results.
    :param snowsight_base_url: Base Snowsight URL handed to ``SlackFormatter``.
    :param slack_webhook_conn_id: Airflow connection id for the Slack webhook.
    :param snowflake_conn_id: Airflow connection id for Snowflake.
    """

    def __init__(
            self,
            *,
            snowkill_target_table: str,
            snowsight_base_url: str,
            slack_webhook_conn_id: str | None = None,
            snowflake_conn_id: str | None = None,
            **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.slack_webhook_conn_id = slack_webhook_conn_id
        self.snowflake_conn_id = snowflake_conn_id
        self.snowkill_target_table = snowkill_target_table
        self.snowsight_base_url = snowsight_base_url

    @cached_property
    def slack_hook(self) -> "SlackWebhookHook":
        """Slack webhook hook, created lazily once per operator instance."""
        # Provider import is deferred so DAG parsing does not pay for it.
        from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
        return SlackWebhookHook(slack_webhook_conn_id=self.slack_webhook_conn_id)

    @cached_property
    def snowflake_hook(self) -> "SnowflakeHook":
        """Snowflake hook, created lazily once per operator instance."""
        from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
        return SnowflakeHook(
            snowflake_conn_id=self.snowflake_conn_id,
            # database="UTILS",
            # schema="MONITOR",
            # role="ADMIN_MONITOR_ROLE",
            # warehouse="ADMIN_MONITOR_WH"
        )

    @staticmethod
    def _build_checks() -> list:
        """Return the SnowKill check conditions evaluated on every run.

        All durations are expressed in seconds.
        """
        import snowkill as sk

        return [
            sk.ExecuteDurationCondition(
                warning_duration=60 * 30,  # 30 minutes for warning
                kill_duration=60 * 60,  # 60 minutes for kill
            ),
            sk.CartesianJoinExplosionCondition(
                min_output_rows=1_000_000,  # join emits at least 1M output rows
                min_explosion_rate=5,  # ratio of output rows to input rows is at least 5x
                warning_duration=60 * 5,  # 5 minutes for warning
                kill_duration=60 * 10,  # 10 minutes for kill
            ),
            sk.JoinExplosionCondition(
                min_output_rows=10_000_000,  # join emits at least 10M output rows
                min_explosion_rate=10,  # ratio of output rows to input rows is at least 10x
                warning_duration=60 * 10,  # 10 minutes for warning
                kill_duration=60 * 20,  # 20 minutes for kill
            ),
            sk.UnionWithoutAllCondition(
                min_input_rows=10_000_000,  # at least 10M input rows for UNION without ALL
                notice_duration=60 * 10,  # 10 minutes for notice
            ),
            sk.StorageSpillingCondition(
                min_local_spilling_gb=50,  # 50Gb spill to local storage
                min_remote_spilling_gb=1,  # 1Gb spill to remote storage
                warning_duration=60 * 10,  # 10 minutes for warning
                kill_duration=60 * 20,  # 20 minutes for kill
            ),
            sk.QueuedDurationCondition(
                notice_duration=60 * 30,  # query was in queue for 30 minutes
            ),
            sk.BlockedDurationCondition(
                notice_duration=60 * 5,  # query was locked by another transaction for 5 minutes
            ),
            sk.EstimatedScanDurationCondition(
                min_estimated_scan_duration=60 * 60 * 2,  # query scan is estimated to take longer than 2 hours
                warning_duration=60 * 10,  # warning after 10 minutes
                kill_duration=60 * 20,  # kill after 20 minutes
            ),
        ]

    def _send_notifications(self, check_results, formatter) -> None:
        """Send one Slack message per check result, attempting every one.

        A failure for one result is logged and does not stop the remaining
        messages. The first failure is re-raised afterwards so the task still
        fails.
        """
        first_error: Exception | None = None
        for result in check_results:
            try:
                self.slack_hook.send(blocks=formatter.format(result))
            except Exception as err:  # boundary: log, keep going, re-raise below
                self.log.exception("Failed to send SnowKill notification to Slack")
                if first_error is None:
                    first_error = err
        if first_error is not None:
            raise first_error

    def execute(self, context: "Context") -> None:
        """Evaluate checks, deduplicate results via storage, and notify Slack."""
        import snowkill as sk

        with closing(self.snowflake_hook.get_conn()) as conn:
            snowkill_engine = sk.SnowKillEngine(conn)
            snowkill_storage = sk.SnowflakeTableStorage(conn, self.snowkill_target_table)
            snowkill_formatter = sk.SlackFormatter(self.snowsight_base_url)

            check_results = snowkill_engine.check_and_kill_pending_queries(self._build_checks())
            self.log.info("[%d] queries matched check conditions", len(check_results))
            # NOTE(review): results are presumably persisted by this call, so a
            # notification that is not delivered below is likely treated as a
            # duplicate (and never re-sent) on the next run.
            check_results = snowkill_storage.store_and_remove_duplicate(check_results)
            self.log.info("[%d] queries remained after store deduplication", len(check_results))

            # Send notification for each new check result
            self._send_notifications(check_results, snowkill_formatter)


# DAG that runs a SnowKill monitoring pass on a fixed cron schedule.
with DAG(
    dag_id="snowkill",
    description="Snowkill performs real time query monitoring in Snowflake.",
    start_date=datetime(2024, 1, 1),  # Set an appropriate start date here
    schedule="*/10 * * * *",  # Every 10 minutes
    catchup=False,  # do not backfill runs missed before deployment / downtime
    default_args=default_args,
) as dag:
    # max_active_tis_per_dag=1 limits this task to one running instance at a
    # time across DAG runs, so passes do not overlap if one runs long.
    RunSnowkillOperator(
        task_id="run_snowkill",
        snowkill_target_table="UTILS.MONITOR.SNOWKILL_LOG",
        snowsight_base_url="https://app.snowflake.com/my-org/my-account",
        max_active_tis_per_dag=1,
    )

Last updated