Airflow DAG example
Here is an example of an Airflow DAG that runs SnowKill:
from contextlib import closing
from datetime import datetime
from functools import cached_property
from typing import TYPE_CHECKING
from airflow import DAG
from airflow.models.baseoperator import BaseOperator
if TYPE_CHECKING:
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.utils.context import Context
# Passed to the DAG below as ``default_args``; the keys match the connection-id
# keyword arguments accepted by RunSnowkillOperator.
default_args = dict(
    snowflake_conn_id="snowflake_default",
    slack_webhook_conn_id="slack_webhook_default",
)
class RunSnowkillOperator(BaseOperator):
    """Airflow operator that performs one SnowKill monitoring pass.

    Each execution opens a Snowflake connection, evaluates the SnowKill
    conditions listed in ``execute`` through
    ``SnowKillEngine.check_and_kill_pending_queries``, passes the results
    through ``SnowflakeTableStorage.store_and_remove_duplicate`` and sends one
    Slack webhook message for every result that remains.

    :param snowkill_target_table: Table name handed to ``SnowflakeTableStorage``.
    :param snowsight_base_url: Base URL handed to ``SlackFormatter``.
    :param slack_webhook_conn_id: Airflow connection id used to build the
        Slack webhook hook.
    :param snowflake_conn_id: Airflow connection id used to build the
        Snowflake hook.
    """

    def __init__(
        self,
        *,
        snowkill_target_table: str,
        snowsight_base_url: str,
        slack_webhook_conn_id: str | None = None,
        snowflake_conn_id: str | None = None,
        **kwargs,
    ) -> None:
        # Everything not consumed here (task_id, etc.) goes to BaseOperator.
        super().__init__(**kwargs)

        # SnowKill settings.
        self.snowkill_target_table = snowkill_target_table
        self.snowsight_base_url = snowsight_base_url

        # Airflow connection ids.
        self.snowflake_conn_id = snowflake_conn_id
        self.slack_webhook_conn_id = slack_webhook_conn_id

    @cached_property
    def slack_hook(self) -> "SlackWebhookHook":
        """Slack webhook hook, created on first access and cached afterwards."""
        # Imported inside the property rather than at module import time.
        from airflow.providers.slack.hooks import slack_webhook

        return slack_webhook.SlackWebhookHook(
            slack_webhook_conn_id=self.slack_webhook_conn_id,
        )

    @cached_property
    def snowflake_hook(self) -> "SnowflakeHook":
        """Snowflake hook, created on first access and cached afterwards."""
        # Imported inside the property rather than at module import time.
        from airflow.providers.snowflake.hooks import snowflake

        return snowflake.SnowflakeHook(
            snowflake_conn_id=self.snowflake_conn_id,
            # Optional overrides, uncomment and adjust as needed:
            # database="UTILS",
            # schema="MONITOR",
            # role="ADMIN_MONITOR_ROLE",
            # warehouse="ADMIN_MONITOR_WH"
        )

    def execute(self, context: "Context") -> None:
        """Run the SnowKill checks once and notify Slack about each new result."""
        import snowkill as sk

        # All SnowKill durations below are expressed in seconds.
        minute = 60
        hour = 60 * minute

        # closing() guarantees the Snowflake connection is closed on exit.
        with closing(self.snowflake_hook.get_conn()) as connection:
            engine = sk.SnowKillEngine(connection)
            storage = sk.SnowflakeTableStorage(connection, self.snowkill_target_table)
            formatter = sk.SlackFormatter(self.snowsight_base_url)

            conditions = [
                # Long-running queries: warn after 30 minutes, kill after 60.
                sk.ExecuteDurationCondition(
                    warning_duration=30 * minute,
                    kill_duration=60 * minute,
                ),
                # Cartesian join emitting at least 1M rows with an output/input
                # row ratio of at least 5x: warn after 5 minutes, kill after 10.
                sk.CartesianJoinExplosionCondition(
                    min_output_rows=1_000_000,
                    min_explosion_rate=5,
                    warning_duration=5 * minute,
                    kill_duration=10 * minute,
                ),
                # Join emitting at least 10M rows with an output/input row
                # ratio of at least 10x: warn after 10 minutes, kill after 20.
                sk.JoinExplosionCondition(
                    min_output_rows=10_000_000,
                    min_explosion_rate=10,
                    warning_duration=10 * minute,
                    kill_duration=20 * minute,
                ),
                # UNION without ALL over at least 10M input rows: notice after
                # 10 minutes.
                sk.UnionWithoutAllCondition(
                    min_input_rows=10_000_000,
                    notice_duration=10 * minute,
                ),
                # Spilling of 50 GB to local or 1 GB to remote storage: warn
                # after 10 minutes, kill after 20.
                sk.StorageSpillingCondition(
                    min_local_spilling_gb=50,
                    min_remote_spilling_gb=1,
                    warning_duration=10 * minute,
                    kill_duration=20 * minute,
                ),
                # Query has been queued for 30 minutes.
                sk.QueuedDurationCondition(
                    notice_duration=30 * minute,
                ),
                # Query has been blocked by another transaction for 5 minutes.
                sk.BlockedDurationCondition(
                    notice_duration=5 * minute,
                ),
                # Scan estimated to take longer than 2 hours: warn after
                # 10 minutes, kill after 20.
                sk.EstimatedScanDurationCondition(
                    min_estimated_scan_duration=2 * hour,
                    warning_duration=10 * minute,
                    kill_duration=20 * minute,
                ),
            ]

            matched = engine.check_and_kill_pending_queries(conditions)
            self.log.info(f"[{len(matched)}] queries matched check conditions")

            # Only results still present after this step are reported to Slack.
            remaining = storage.store_and_remove_duplicate(matched)
            self.log.info(f"[{len(remaining)}] queries remained after store deduplication")

            # One Slack message per remaining check result.
            for check_result in remaining:
                self.slack_hook.send(blocks=formatter.format(check_result))
# Module-level DAG object; the single task below is attached to it through the
# explicit ``dag=`` argument.
dag = DAG(
    dag_id="snowkill",
    description="Snowkill performs real time query monitoring in Snowflake.",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),  # Set an appropriate start date here
    schedule="*/10 * * * *",  # Every 10 minutes
    catchup=False,
)

RunSnowkillOperator(
    dag=dag,
    task_id="run_snowkill",
    snowkill_target_table="UTILS.MONITOR.SNOWKILL_LOG",
    snowsight_base_url="https://app.snowflake.com/my-org/my-account",
    max_active_tis_per_dag=1,
)
Last updated