🚩Getting started
Requirements
Python 3.8+
Python requests package (likely to be installed already)
Active Snowflake account (you may use a free trial account for testing)
Active Slack workspace for notifications
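The first two requirements can be checked before installing anything. The snippet below is a minimal sketch using only the standard library; the helper name `check_requirements` is hypothetical and is not part of SnowKill.

```python
import sys
from importlib.util import find_spec


def check_requirements(min_version=(3, 8), packages=("requests",)):
    """Return a list of human-readable problems; an empty list means all good."""
    problems = []

    # Compare only (major, minor) against the minimum supported version
    if sys.version_info[:2] < min_version:
        found = ".".join(map(str, sys.version_info[:2]))
        wanted = ".".join(map(str, min_version))
        problems.append(f"Python {wanted}+ is required, found {found}")

    # find_spec returns None when a top-level package cannot be found
    for name in packages:
        if find_spec(name) is None:
            problems.append(f"Package '{name}' is not installed")

    return problems


for problem in check_requirements():
    print(problem)
```

Nothing is printed when both requirements are met.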
1) Install SnowKill
pip install snowkill
2) Generate key pair for service account
Generate and securely store private and public keys for a service account: https://docs.snowflake.com/en/user-guide/key-pair-auth#generate-the-private-key
3) Create new administrator user in Snowflake
Replace the RSA_PUBLIC_KEY value below with the contents of your public key. Make sure to remove the delimiters (the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- lines).
CREATE USER ADMIN_MONITOR
    TYPE = SERVICE
    RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...'
    DEFAULT_ROLE = 'ACCOUNTADMIN';

GRANT ROLE ACCOUNTADMIN TO USER ADMIN_MONITOR;
The ACCOUNTADMIN role is currently required for SnowKill to have full access to all SQL queries executed on the account. It is also required to execute the global SHOW LOCKS IN ACCOUNT command.
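Removing the delimiters from the public key by hand is easy to get wrong, so here is a minimal sketch that does it with the standard library. The function name `strip_pem_delimiters` is hypothetical, the key body in the example is shortened and made up, and joining the body into a single line is a formatting choice assumed here for convenience.

```python
def strip_pem_delimiters(pem_text: str) -> str:
    """Return the base64 body of a PEM block as a single line."""
    body_lines = [
        line.strip()
        for line in pem_text.splitlines()
        # Skip blank lines and the "-----BEGIN ...-----" / "-----END ...-----" delimiters
        if line.strip() and not line.strip().startswith("-----")
    ]
    return "".join(body_lines)


# Shortened, made-up key body used purely for illustration
example_pem = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqh
kiG9w0BAQEFAA
-----END PUBLIC KEY-----
"""

print(strip_pem_delimiters(example_pem))  # MIIBIjANBgkqhkiG9w0BAQEFAA
```

The printed string is what goes between the quotes of RSA_PUBLIC_KEY.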
3) Create table and warehouse for SnowKill log storage in Snowflake
We suggest starting with Snowflake storage for demonstration purposes, since it requires the least additional setup.
For a real production environment, you can switch to an alternative storage option, e.g. Postgres or a Hybrid Table.
CREATE DATABASE UTILS;
CREATE SCHEMA UTILS.MONITOR;

CREATE TABLE UTILS.MONITOR.SNOWKILL_LOG
(
    query_id VARCHAR(16777216),
    check_result_level NUMBER(38,0),
    check_result_name VARCHAR(16777216),
    check_result_description VARCHAR(16777216),
    check_result_time TIMESTAMP_NTZ(6)
);

CREATE WAREHOUSE ADMIN_MONITOR_WH
    WAREHOUSE_SIZE = XSMALL
    AUTO_SUSPEND = 60
    INITIALLY_SUSPENDED = TRUE;

ALTER USER ADMIN_MONITOR SET
    DEFAULT_WAREHOUSE = ADMIN_MONITOR_WH;
4) Create Slack application and obtain auth token
Go to https://api.slack.com/apps and log in.
Press the "Create New App" button and choose "From an app manifest".
Choose a workspace and press "Next".
Choose the "YAML" tab, paste the settings below, and press "Next".
display_information:
  name: snowflake-monitor
  description: Monitoring of running, queued and blocked queries in Snowflake
  background_color: "#00a5e0"
features:
  bot_user:
    display_name: Snowflake Monitor
    always_online: true
oauth_config:
  scopes:
    bot:
      - chat:write
      - chat:write.public
settings:
  org_deploy_enabled: false
  socket_mode_enabled: false
  token_rotation_enabled: false
Press "Create".
Press the "Install to Workspace" button, then press "Allow".
Go to "Settings -> Install App" and copy the contents of the "Bot User OAuth Token" field. You will need it to send notifications from the Python script.
Here is a video tutorial which might help if you are new to Slack Apps: https://www.youtube.com/watch?v=q3SBz_eqOq0
Optionally, you may go to "Settings -> Basic Information", scroll down to the "Display Information" section and upload this App Icon:
[image: App Icon]
It will make notification messages look better.
5) Create a channel for notifications in Slack
The suggested channel name is: #snowflake-monitor
How to create a channel: https://slack.com/intl/en-gb/help/articles/201402297-Create-a-channel
6) Create a Python script with the following content
You may choose any script file name. For example: snowflake_monitor.py
from logging import getLogger, StreamHandler, INFO
from os import getenv
from requests import post
from snowflake.connector import SnowflakeConnection
from snowkill import *
from typing import List  # used in the type hint of send_slack_message


def init_logger():
    logger = getLogger("snowkill")
    logger.setLevel(INFO)
    logger.addHandler(StreamHandler())

    return logger


def send_slack_message(slack_token: str, slack_channel: str, message_blocks: List):
    response = post(
        url="https://slack.com/api/chat.postMessage",
        headers={
            "Authorization": f"Bearer {slack_token}",
            "Content-type": "application/json; charset=utf-8",
        },
        json={
            "channel": slack_channel,
            "blocks": message_blocks,
        },
    )

    response.raise_for_status()

    return response.json()


"""
Complete example featuring basic usage of SnowKill:
1) Auth using private key file
2) Apply all types of checks
3) Store and deduplicate in Snowflake table
4) Format and send new check results to Slack
"""
logger = init_logger()

connection = SnowflakeConnection(
    account=getenv("SNOWFLAKE_ACCOUNT"),
    user=getenv("SNOWFLAKE_USER"),
    private_key_file=getenv("SNOWFLAKE_PRIVATE_KEY_FILE"),
)

snowkill_engine = SnowKillEngine(connection)
snowkill_storage = SnowflakeTableStorage(connection, getenv("SNOWFLAKE_TARGET_TABLE"))
snowkill_formatter = SlackFormatter(getenv("SNOWSIGHT_BASE_URL"))

checks = [
    ExecuteDurationCondition(
        warning_duration=60 * 30,  # 30 minutes for warning
        kill_duration=60 * 60,  # 60 minutes for kill
    ),
    CartesianJoinExplosionCondition(
        min_output_rows=1_000_000,  # join emits at least 1M output rows
        min_explosion_rate=5,  # ratio of output rows to input rows is at least 5x
        warning_duration=60 * 5,  # 5 minutes for warning
        kill_duration=60 * 10,  # 10 minutes for kill
    ),
    JoinExplosionCondition(
        min_output_rows=10_000_000,  # join emits at least 10M output rows
        min_explosion_rate=10,  # ratio of output rows to input rows is at least 10x
        warning_duration=60 * 10,  # 10 minutes for warning
        kill_duration=60 * 20,  # 20 minutes for kill
    ),
    UnionWithoutAllCondition(
        min_input_rows=10_000_000,  # at least 10M input rows for UNION without ALL
        notice_duration=60 * 10,  # 10 minutes for notice
    ),
    StorageSpillingCondition(
        min_local_spilling_gb=50,  # 50 GB spilled to local storage
        min_remote_spilling_gb=1,  # 1 GB spilled to remote storage
        warning_duration=60 * 10,  # 10 minutes for warning
        kill_duration=60 * 20,  # 20 minutes for kill
    ),
    QueuedDurationCondition(
        notice_duration=60 * 30,  # query was in queue for 30 minutes
    ),
    BlockedDurationCondition(
        notice_duration=60 * 5,  # query was blocked by another transaction for 5 minutes
    ),
    EstimatedScanDurationCondition(
        min_estimated_scan_duration=60 * 60 * 2,  # query scan is estimated to take longer than 2 hours
        warning_duration=60 * 10,  # warning after 10 minutes
        kill_duration=60 * 20,  # kill after 20 minutes
    ),
]

# Apply checks to running, queued, blocked queries
check_results = snowkill_engine.check_and_kill_pending_queries(checks)
logger.info(f"[{len(check_results)}] queries matched check conditions")

# Save successful checks in storage and remove duplicates
check_results = snowkill_storage.store_and_remove_duplicate(check_results)
logger.info(f"[{len(check_results)}] queries remained after store deduplication")

# Send notification for each new check result
for r in check_results:
    response = send_slack_message(
        slack_token=getenv("SLACK_TOKEN"),
        slack_channel=getenv("SLACK_CHANNEL"),
        message_blocks=snowkill_formatter.format(r),
    )

    if response["ok"]:
        logger.info(f"Sent Slack notification for query [{r.query.query_id}]")
    else:
        logger.warning(f"Failed to send Slack notification for query [{r.query.query_id}], error: [{response['error']}]")
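The "store and remove duplicates" step in the script is what keeps repeated runs from sending the same notification again. The sketch below illustrates the general idea with an in-memory set. It is not SnowKill's implementation: the dictionary shape, the `(query_id, level)` key and the function name `remove_already_seen` are all assumptions made for illustration.

```python
from typing import Dict, Iterable, List, Set, Tuple


def remove_already_seen(
    results: Iterable[Dict],
    seen: Set[Tuple[str, int]],
) -> List[Dict]:
    """Keep only results whose (query_id, level) pair has not been stored yet."""
    fresh = []
    for result in results:
        key = (result["query_id"], result["level"])
        if key in seen:
            continue  # already reported at this level, skip the repeat
        seen.add(key)  # "store" the result so later runs ignore it
        fresh.append(result)
    return fresh


seen_keys: Set[Tuple[str, int]] = set()
first_run = [{"query_id": "q1", "level": 1}]
second_run = [{"query_id": "q1", "level": 1}, {"query_id": "q1", "level": 2}]

print(len(remove_already_seen(first_run, seen_keys)))   # 1
print(len(remove_already_seen(second_run, seen_keys)))  # 1
```

In this sketch, the second run reports the query only once more, because it reached a new level; the repeat at the old level is dropped.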
7) Set config options using environment variables
Replace the example config options with values relevant to your Snowflake and Slack accounts, then execute the commands in a terminal.
Snowflake credentials:
export SNOWFLAKE_ACCOUNT=<account_identifier>
export SNOWFLAKE_USER=admin_monitor
export SNOWFLAKE_PRIVATE_KEY_FILE=<private_key_file_location>
Snowsight base URL, copy-pasted from the browser window. It usually consists of https://app.snowflake.com/, followed by the organisation name and account name, but it might be different for old accounts and accounts with Private Link.
export SNOWSIGHT_BASE_URL="https://app.snowflake.com/<org>/<account>/"
Snowflake table name to store logs:
export SNOWFLAKE_TARGET_TABLE=utils.monitor.snowkill_log
Slack credentials:
export SLACK_TOKEN="xoxb-XXXXXXXXXXXX"
export SLACK_CHANNEL="#snowflake-monitor"
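A missing variable makes `getenv` return `None`, which tends to surface later as a confusing connection or Slack error. The following minimal sketch checks all seven variables up front; the variable names come from the script above, while the helper `find_missing_vars` is hypothetical.

```python
import os
from typing import List, Mapping

REQUIRED_VARS = (
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PRIVATE_KEY_FILE",
    "SNOWSIGHT_BASE_URL",
    "SNOWFLAKE_TARGET_TABLE",
    "SLACK_TOKEN",
    "SLACK_CHANNEL",
)


def find_missing_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = find_missing_vars()
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
else:
    print("All required environment variables are set")
```

Passing a plain dictionary instead of `os.environ` makes the helper easy to try out without touching the real environment.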
8) Start a "bad" query in Snowflake and run python script
Here is a good example of a "bad" query, which uses the standard SNOWFLAKE_SAMPLE_DATA database, which is always available for newly created accounts.
SELECT *
FROM snowflake_sample_data.tpch_sf1000.customer a
JOIN snowflake_sample_data.tpch_sf100.customer b ON (a.c_custkey > b.c_custkey);
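To see why this query is "bad", note that the join condition is an inequality, so each row on one side matches many rows on the other. The toy sketch below counts the output rows by brute force on two tiny key ranges. The function name is hypothetical, and the explosion rate is computed here as output rows divided by total input rows, which is an assumption and may not match how SnowKill measures it.

```python
def count_inequality_join_rows(left_keys, right_keys):
    """Count rows produced by JOIN ... ON left.key > right.key (brute force)."""
    return sum(1 for a in left_keys for b in right_keys if a > b)


left = range(1, 101)   # 100 rows standing in for the larger table
right = range(1, 11)   # 10 rows standing in for the smaller table

output_rows = count_inequality_join_rows(left, right)
input_rows = len(left) + len(right)

print(output_rows)                          # 945
print(round(output_rows / input_rows, 2))   # 8.59
```

Even with 110 input rows the join emits 945 rows; on the sample tables the same pattern grows far beyond what any warehouse can finish quickly.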
After ~5 minutes, run the Python script a few times:
python snowflake_monitor.py
If everything works correctly, you should see a log message about a query with a cartesian join. You should also receive a notification in the Slack channel.
Congratulations!
Now you are ready to learn more about specific conditions, filters, alternative storage, and notification options.
Also, you may now browse other complete examples on GitHub.