🛠️ Implementation details

SnowKill is a tool for data engineers and DevOps engineers.

A SnowKill application is usually implemented as a Python script consisting of multiple interchangeable parts. These parts are meant to be extended and adapted for your organization's specific needs.

The parts are:

  • Engine - accepts a Snowflake connection, communicates with the REST API and performs checks.

  • Condition(s) - implement logic for specific query check conditions.

  • Formatter(s) - implement notification formatting for matched queries.

  • Storage(s) - implement log storage and deduplication functionality.

SnowKill does not have built-in functions to send formatted notifications. There are many other packages, SDKs and cloud-native services solving this problem. However, we provide some basic examples and guidance to get you started.

Python Script structure

1) Open Snowflake connection

You should pre-configure an administration user for SnowKill with the correct privileges.

You have full control over the construction of the connection object, so you can use any connection method supported by Snowflake. For example:

from os import getenv

from snowflake.connector import SnowflakeConnection

connection = SnowflakeConnection(
    account=getenv("SNOWFLAKE_ACCOUNT"),
    user=getenv("SNOWFLAKE_USER"),
    password=getenv("SNOWFLAKE_PASSWORD"),
)
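Since the credentials come from environment variables, it can help to fail fast when one of them is missing before opening the connection. A minimal stdlib sketch (the variable names follow the example above):

```python
from os import environ, getenv

def check_required_env(names):
    """Return the names of required environment variables that are unset or empty."""
    return [name for name in names if not getenv(name)]

# Validate before constructing the connection
required = ["SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD"]
missing = check_required_env(required)
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```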

2) Init engine, storage and formatter

In this example we're going to store logs in Snowflake and format messages for Slack.

You may use any other built-in storage and built-in formatter.

You may also create your own custom storage or custom formatter.

from snowkill import *

snowkill_engine = SnowKillEngine(connection)
snowkill_storage = SnowflakeTableStorage(connection, getenv("SNOWFLAKE_TARGET_TABLE"))
snowkill_formatter = SlackFormatter(getenv("SNOWSIGHT_BASE_URL"))

3) Define conditions

You may use built-in conditions and / or create your own custom conditions.

You may use query filters to narrow down conditions. If a query filter is not specified, the condition is applied to ALL queries running on the Snowflake account.
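The include/exclude patterns used later in this guide (e.g. STARTUP_*, *some_fancy_tag*) are shell-style wildcards. Conceptually, an include/exclude filter behaves like the sketch below; the actual QueryFilter semantics are defined by the package, so treat this purely as an illustration:

```python
from fnmatch import fnmatch

def matches_filter(value, include_patterns=None, exclude_patterns=None):
    """Illustrative include/exclude matching with shell-style wildcards."""
    if include_patterns and not any(fnmatch(value, p) for p in include_patterns):
        return False
    if exclude_patterns and any(fnmatch(value, p) for p in exclude_patterns):
        return False
    return True

# No patterns specified -> matches every query
print(matches_filter("ANY_USER"))  # -> True
```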

Here is a good set of conditions to start with:

checks = [
    ExecuteDurationCondition(
        warning_duration=60 * 30,  # 30 minutes for warning
        kill_duration=60 * 60,  # 60 minutes for kill
    ),
    CartesianJoinExplosionCondition(
        min_output_rows=10_000_000,  # join emits at least 10M output rows
        min_explosion_rate=10,  # ratio of output rows to input rows is at least 10x
        warning_duration=60 * 10,  # 10 minutes for warning
        kill_duration=60 * 20,  # 20 minutes for kill
    ),
    JoinExplosionCondition(
        min_output_rows=10_000_000,  # join emits at least 10M output rows
        min_explosion_rate=10,  # ratio of output rows to input rows is at least 10x
        warning_duration=60 * 10,  # 10 minutes for warning
        kill_duration=60 * 20,  # 20 minutes for kill
    ),
    UnionWithoutAllCondition(
        min_input_rows=10_000_000,  # at least 10M input rows for UNION without ALL
        notice_duration=60 * 10,  # 10 minutes for notice
    ),
    StorageSpillingCondition(
        min_local_spilling_gb=50,  # 50 GB spilled to local storage
        min_remote_spilling_gb=1,  # 1 GB spilled to remote storage
        warning_duration=60 * 10,  # 10 minutes for warning
        kill_duration=60 * 20,  # 20 minutes for kill
    ),
    QueuedDurationCondition(
        notice_duration=60 * 30,  # query was in queue for 30 minutes
    ),
    BlockedDurationCondition(
        notice_duration=60 * 5,  # query was blocked by another transaction for 5 minutes
    ),
    EstimatedScanDurationCondition(
        min_estimated_scan_duration=60 * 60 * 2,  # query scan is estimated to take longer than 2 hours
        warning_duration=60 * 10,  # warning after 10 minutes
        kill_duration=60 * 20,  # kill after 20 minutes
    ),
]
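The 60 * N expressions above are plain seconds. If you prefer, naming the units makes the thresholds easier to audit (a cosmetic suggestion, not part of the snowkill API):

```python
# Durations are passed to conditions as seconds
MINUTE = 60
HOUR = 60 * MINUTE

# e.g. warning after 30 minutes, kill after 1 hour:
warning_duration = 30 * MINUTE  # 1800 seconds
kill_duration = 1 * HOUR  # 3600 seconds
```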

4) Optionally allow some conditions to "kill" queries

By default, no queries are "killed". You should explicitly set enable_kill=True and specify kill_duration for each individual condition.

If a query would be killed according to the conditions, but enable_kill was not set, the query is reported as "would be killed" instead of being actually killed. You may read more about this on the "Check result levels" page.

You may additionally narrow down kill conditions using enable_kill_query_filter.

For example:

checks = [
    ExecuteDurationCondition(
        warning_duration=60 * 30,  # 30 minutes for warning
        kill_duration=60 * 60,  # 60 minutes for kill
        query_filter=QueryFilter(
            include_user_name=["STARTUP_*"],  # Apply to users with names starting with prefix `STARTUP_`
            exclude_user_name=["STARTUP_ADMIN", "STARTUP_SCRIPT"],  # Exclude some specific users
        ),
    ),
    QueuedDurationCondition(
        notice_duration=60 * 30,  # query was in queue for 30 minutes
        kill_duration=60 * 60 * 3,  # query was in queue for 3 hours
        query_filter=QueryFilter(
            include_warehouse_name=["ANALYST_WH", "LOOKER_WH"],  # Apply to specific warehouses
            include_query_tag=["*some_fancy_tag*"],  # Apply to specific query tag
        ),
        enable_kill=True,  # queries can be actually killed
        enable_kill_query_filter=QueryFilter(
            exclude_sql_text=["*--no-kill*"],  # Do not kill queries with SQL comment `--no-kill` (stop word)
        ),
    ),
]

5) Run checks

Run checks using the engine function:

# Apply checks to running, queued, blocked queries
check_results = snowkill_engine.check_and_kill_pending_queries(checks)

It returns a list of Check Results, one entry for each query matching the conditions.

If one query matches multiple conditions at the same time, only the result with the highest level is returned.
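Conceptually, picking the result with the highest level is a max over an ordered severity scale. A sketch (the level names and ordering here are illustrative; see the "Check result levels" page for the actual scale):

```python
# Illustrative ordering, lowest to highest severity
LEVEL_ORDER = ["NOTICE", "WARNING", "WOULD_KILL", "KILL"]

def highest_level_result(results):
    """Return the result with the highest level; results are (level, condition_name) pairs."""
    return max(results, key=lambda r: LEVEL_ORDER.index(r[0]))

print(highest_level_result([("WARNING", "ExecuteDuration"), ("KILL", "JoinExplosion")]))
# -> ('KILL', 'JoinExplosion')
```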

6) Run deduplication and store logs

Run deduplication using the storage function:

# Save successful checks in storage and remove duplicates
check_results = snowkill_storage.store_and_remove_duplicate(check_results)

It returns a list of filtered Check Results; results that were already reported earlier are removed.

Problematic queries may run for a long time. We have to remember which queries were reported previously, so we do not send the same notifications again.

However, if a query was previously reported with a lower level and the new result has a higher level, the query is reported again with the higher level.
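The deduplication behaviour described above amounts to remembering, per query, the highest level already reported, and letting a result through only if it raises that level. A conceptual illustration (not the package's actual storage code; level names are assumptions):

```python
LEVEL_ORDER = ["NOTICE", "WARNING", "WOULD_KILL", "KILL"]  # illustrative scale

def deduplicate(results, reported):
    """Keep only results that are new or escalate a previously reported level.

    results: list of (query_id, level) pairs; reported: dict of query_id -> level.
    """
    kept = []
    for query_id, level in results:
        previous = reported.get(query_id)
        if previous is None or LEVEL_ORDER.index(level) > LEVEL_ORDER.index(previous):
            reported[query_id] = level
            kept.append((query_id, level))
    return kept

state = {}
print(deduplicate([("q1", "WARNING")], state))  # first sighting -> kept
print(deduplicate([("q1", "WARNING")], state))  # same level again -> dropped
print(deduplicate([("q1", "KILL")], state))     # escalated level -> kept again
```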

7) Format messages and send notifications

Format and send notifications using the list of filtered Check Results.

The formatter returns "message blocks", which can be passed directly to Slack API calls.

For example:

# Send a notification for each new check result
# (send_slack_message is a helper function; see the Slack integration page)
for r in check_results:
    response = send_slack_message(
        slack_token=getenv("SLACK_TOKEN"),
        slack_channel=getenv("SLACK_CHANNEL"),
        message_blocks=snowkill_formatter.format(r),
    )

    if response["ok"]:
        logger.info(f"Sent Slack notification for query [{r.query.query_id}]")
    else:
        logger.warning(f"Failed to send Slack notification for query [{r.query.query_id}], error: [{response['error']}]")

You may read a bit more about Slack integration on a dedicated documentation page.
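For reference, a send_slack_message helper like the one used in the loop above can be sketched with only the standard library and Slack's chat.postMessage endpoint. This is a minimal illustration under the assumption that you pass a bot token and the formatter's message blocks; the dedicated documentation page covers the recommended approach:

```python
import json
from urllib.request import Request, urlopen

def build_slack_payload(slack_channel, message_blocks):
    """Build the JSON body for Slack's chat.postMessage API."""
    return {"channel": slack_channel, "blocks": message_blocks}

def send_slack_message(slack_token, slack_channel, message_blocks):
    """Post message blocks via chat.postMessage and return the parsed JSON response."""
    request = Request(
        "https://slack.com/api/chat.postMessage",
        data=json.dumps(build_slack_payload(slack_channel, message_blocks)).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {slack_token}",
            "Content-Type": "application/json; charset=utf-8",
        },
    )
    with urlopen(request) as response:
        return json.load(response)  # contains an "ok" field, checked in the loop above
```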
