🚩 Getting started

Requirements

To follow this guide you need a Snowflake account with ACCOUNTADMIN access, a Slack workspace where you are allowed to create apps, and a recent version of Python with pip.

1) Install SnowKill

pip install snowkill
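
If you want to confirm the installation succeeded, a quick import check is enough. This is a minimal sketch; it only assumes that the package exposes the SnowKillEngine class used later in this guide.

# minimal sanity check: the import should succeed after "pip install snowkill"
from snowkill import SnowKillEngine

print("snowkill package is available")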

2) Create administration user in Snowflake account

Replace the <password> placeholder with a randomly generated password of your choice.

CREATE USER ADMIN_MONITOR
PASSWORD = '<password>'
DEFAULT_ROLE = 'ACCOUNTADMIN';

GRANT ROLE ACCOUNTADMIN TO USER ADMIN_MONITOR;

3) Create table and warehouse for SnowKill log storage in Snowflake

We suggest starting with Snowflake storage for demonstration purposes, since it requires the least amount of additional setup.

You can switch to an alternative storage option, e.g. Postgres, for a real production environment.

CREATE DATABASE UTILS;
CREATE SCHEMA UTILS.MONITOR;

CREATE TABLE UTILS.MONITOR.SNOWKILL_LOG
(
    query_id VARCHAR(16777216),
    check_result_level NUMBER(38,0),
    check_result_name VARCHAR(16777216),
    check_result_description VARCHAR(16777216),
    check_result_time TIMESTAMP_NTZ(6)
);

CREATE WAREHOUSE ADMIN_MONITOR_WH
WAREHOUSE_SIZE = XSMALL
AUTO_SUSPEND = 60
INITIALLY_SUSPENDED = TRUE;

ALTER USER ADMIN_MONITOR SET
DEFAULT_WAREHOUSE = ADMIN_MONITOR_WH;
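
Optionally, you can verify the new user, warehouse and table with a short Python snippet before wiring up the full script below. This is a minimal sketch using snowflake-connector-python; the account identifier is a placeholder, and <password> is the password chosen in step 2.

from snowflake.connector import connect

# placeholders: use your own account identifier and the ADMIN_MONITOR password from step 2
connection = connect(
    account="<account_identifier>",
    user="ADMIN_MONITOR",
    password="<password>",
    role="ACCOUNTADMIN",
    warehouse="ADMIN_MONITOR_WH",
)

cursor = connection.cursor()

# a freshly created log table should return a count of 0
cursor.execute("SELECT COUNT(*) FROM UTILS.MONITOR.SNOWKILL_LOG")
print(cursor.fetchone())

connection.close()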

4) Create Slack application and obtain auth token

  • Press "Create New App" button, choose "From an app manifest".

  • Choose workspace, press "Next".

  • Choose "YAML" tab. Copy-paste settings below. Press "Next".

display_information:
  name: snowflake-monitor
  description: Monitoring of running, queued and blocked queries in Snowflake
  background_color: "#00a5e0"
features:
  bot_user:
    display_name: Snowflake Monitor
    always_online: true
oauth_config:
  scopes:
    bot:
      - chat:write
      - chat:write.public
settings:
  org_deploy_enabled: false
  socket_mode_enabled: false
  token_rotation_enabled: false
  • Press "Create".

  • Press "Install to Workspace" button, press "Allow".

  • Go to "Settings -> Install App" and copy the contents of Bot User OAuth Token field. You will need it to send notifications from Python script.

Here is a video tutorial which might help you if you're new to Slack apps: https://www.youtube.com/watch?v=q3SBz_eqOq0

Optionally, you may go to "Settings -> Basic Information", scroll down to "Display Information" section and upload this App Icon:

It will make notification messages look better.

5) Create a channel for notifications in Slack

The suggested channel name is: #snowflake-monitor

How to create a channel: https://slack.com/intl/en-gb/help/articles/201402297-Create-a-channel
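
Before wiring SnowKill into the notification flow, you can check that the bot token and the channel work together by sending a one-off test message. The sketch below calls the same chat.postMessage endpoint that the script in the next step uses; the token and channel values are placeholders.

from requests import post

# placeholders: use your Bot User OAuth Token and the channel created above
response = post(
    url="https://slack.com/api/chat.postMessage",
    headers={"Authorization": "Bearer xoxb-XXXXXXXXXXXX"},
    json={"channel": "#snowflake-monitor", "text": "SnowKill test message"},
)

# expect {"ok": true, ...} if the token is valid and the app can post to the channel
print(response.json())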

6) Create a Python script with the following content

You may choose any script file name. For example: snowflake_monitor.py

from logging import getLogger, StreamHandler, INFO
from os import getenv
from typing import List
from requests import post
from snowflake.connector import SnowflakeConnection
from snowkill import *


def init_logger():
    logger = getLogger("snowkill")

    logger.setLevel(INFO)
    logger.addHandler(StreamHandler())

    return logger


def send_slack_message(slack_token: str, slack_channel: str, message_blocks: List):
    response = post(
        url="https://slack.com/api/chat.postMessage",
        headers={
            "Authorization": f"Bearer {slack_token}",
            "Content-type": "application/json; charset=utf-8",
        },
        json={
            "channel": slack_channel,
            "blocks": message_blocks,
        },
    )

    response.raise_for_status()

    return response.json()


"""
Complete example featuring basic usage of SnowKill:

1) Authorise by password
2) Apply all types of checks
3) Store and deduplicate in Snowflake table
4) Format and send new check results to Slack
"""
logger = init_logger()

connection = SnowflakeConnection(
    account=getenv("SNOWFLAKE_ACCOUNT"),
    user=getenv("SNOWFLAKE_USER"),
    password=getenv("SNOWFLAKE_PASSWORD"),
)

snowkill_engine = SnowKillEngine(connection)
snowkill_storage = SnowflakeTableStorage(connection, getenv("SNOWFLAKE_TARGET_TABLE"))
snowkill_formatter = SlackFormatter(getenv("SNOWSIGHT_BASE_URL"))

checks = [
    ExecuteDurationCondition(
        warning_duration=60 * 30,  # 30 minutes for warning
        kill_duration=60 * 60,  # 60 minutes for kill
    ),
    CartesianJoinExplosionCondition(
        min_output_rows=1_000_000,  # join emits at least 1M output rows
        min_explosion_rate=5,  # ratio of output rows to input rows is at least 5x
        warning_duration=60 * 5,  # 5 minutes for warning
        kill_duration=60 * 10,  # 10 minutes for kill
    ),
    JoinExplosionCondition(
        min_output_rows=10_000_000,  # join emits at least 10M output rows
        min_explosion_rate=10,  # ratio of output rows to input rows is at least 10x
        warning_duration=60 * 10,  # 10 minutes for warning
        kill_duration=60 * 20,  # 20 minutes for kill
    ),
    UnionWithoutAllCondition(
        min_input_rows=10_000_000,  # at least 10M input rows for UNION without ALL
        notice_duration=60 * 10,  # 10 minutes for notice
    ),
    StorageSpillingCondition(
        min_local_spilling_gb=50,  # 50Gb spill to local storage
        min_remote_spilling_gb=1,  # 1Gb spill to remote storage
        warning_duration=60 * 10,  # 10 minutes for warning
        kill_duration=60 * 20,  # 20 minutes for kill
    ),
    QueuedDurationCondition(
        notice_duration=60 * 30,  # query was in queue for 30 minutes
    ),
    BlockedDurationCondition(
        notice_duration=60 * 5,  # query was locked by another transaction for 5 minutes
    ),
    EstimatedScanDurationCondition(
        min_estimated_scan_duration=60 * 60 * 2,  # query scan is estimated to take longer than 2 hours
        warning_duration=60 * 10,  # warning after 10 minutes
        kill_duration=60 * 20,  # kill after 20 minutes
    ),
]

# Apply checks to running, queued, blocked queries
check_results = snowkill_engine.check_and_kill_pending_queries(checks)
logger.info(f"[{len(check_results)}] queries matched check conditions")

# Save successful checks in storage and remove duplicates
check_results = snowkill_storage.store_and_remove_duplicate(check_results)
logger.info(f"[{len(check_results)}] queries remained after store deduplication")

# Send notification for each new check result
for r in check_results:
    response = send_slack_message(
        slack_token=getenv("SLACK_TOKEN"),
        slack_channel=getenv("SLACK_CHANNEL"),
        message_blocks=snowkill_formatter.format(r),
    )

    if response["ok"]:
        logger.info(f"Sent Slack notification for query [{r.query.query_id}]")
    else:
        logger.warning(f"Failed to send Slack notification for query [{r.query.query_id}], error: [{response['error']}]")

7) Set config options using environment variables

Replace example config options with values relevant for your Snowflake and Slack accounts. Execute commands in terminal.

Snowflake credentials:

export SNOWFLAKE_ACCOUNT=ginfuwg-us67634
export SNOWFLAKE_USER=admin_monitor
export SNOWFLAKE_PASSWORD=<password>

Snowsight base URL, copy-pasted from the browser window. It usually consists of https://app.snowflake.com/, followed by the organisation name and account name. But it might be different for old accounts and accounts with Private Link.

export SNOWSIGHT_BASE_URL="https://app.snowflake.com/ginfuwg/us67634/"

Snowflake table name to store logs:

export SNOWFLAKE_TARGET_TABLE=utils.monitor.snowkill_log

Slack credentials:

export SLACK_TOKEN="xoxb-XXXXXXXXXXXX"
export SLACK_CHANNEL="#snowflake-monitor"
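
If you prefer the script to fail fast on missing configuration, you can add a small check like this near the top of snowflake_monitor.py. It is an optional sketch, not part of SnowKill itself; it only references the environment variable names used in this guide.

from os import getenv

REQUIRED_VARIABLES = [
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWSIGHT_BASE_URL",
    "SNOWFLAKE_TARGET_TABLE",
    "SLACK_TOKEN",
    "SLACK_CHANNEL",
]

# collect any variables that are unset or empty and stop early with a clear error
missing = [name for name in REQUIRED_VARIABLES if not getenv(name)]

if missing:
    raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")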

8) Start a "bad" query in Snowflake and run the Python script

Here is a good example of a "bad" query. It uses the standard SNOWFLAKE_SAMPLE_DATA database, which is always available in newly created accounts.

SELECT *
FROM snowflake_sample_data.tpch_sf1000.customer a
    JOIN snowflake_sample_data.tpch_sf100.customer b ON (a.c_custkey > b.c_custkey);

After ~5 minutes, run the Python script a few times:

python snowflake_monitor.py

If everything works correctly, you should see a log message about a query matching the cartesian join explosion condition. You should also receive a notification in the Slack channel.


Congratulations!

Now you are ready to learn more about specific conditions, filters, alternative storage and notification options.

Also, you may now browse other complete examples on GitHub.
