Replace <public_key> placeholder with contents of public key. Make sure to remove delimiters.
CREATE USER ADMIN_MONITOR
TYPE = SERVICE
RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...'
DEFAULT_ROLE = 'ACCOUNTADMIN';
GRANT ROLE ACCOUNTADMIN TO USER ADMIN_MONITOR;
Role ACCOUNTADMIN is currently required in order for SnowKill to have full access to all SQL queries executed on account. Also, it is required to execute global SHOW LOCKS IN ACCOUNT command.
3) Create table and warehouse for SnowKill log storage in Snowflake
We suggest to start with Snowflake storage for demonstration purposes, since it requires the least amount of additional setup.
You will be able to use an alternative storage option for real production environment, e.g. Postgres or Hybrid Table.
6) Create a Python script with the following content
You may choose any script file name. For example: snowflake_monitor.py
from logging import getLogger, StreamHandler, INFO
from os import getenv
from requests import post
from snowflake.connector import SnowflakeConnection
from snowkill import *
def init_logger():
logger = getLogger("snowkill")
logger.setLevel(INFO)
logger.addHandler(StreamHandler())
return logger
def send_slack_message(slack_token: str, slack_channel: str, message_blocks: List):
response = post(
url="https://slack.com/api/chat.postMessage",
headers={
"Authorization": f"Bearer {slack_token}",
"Content-type": "application/json; charset=utf-8",
},
json={
"channel": slack_channel,
"blocks": message_blocks,
},
)
response.raise_for_status()
return response.json()
"""
Complete example featuring basic usage of SnowKill:
1) Auth using private key file
2) Apply all types of checks
3) Store and deduplicate in Snowflake table
4) Format and send new check results to Slack
"""
logger = init_logger()
connection = SnowflakeConnection(
account=getenv("SNOWFLAKE_ACCOUNT"),
user=getenv("SNOWFLAKE_USER"),
private_key_file=getenv("SNOWFLAKE_PRIVATE_KEY_FILE"),
)
snowkill_engine = SnowKillEngine(connection)
snowkill_storage = SnowflakeTableStorage(connection, getenv("SNOWFLAKE_TARGET_TABLE"))
snowkill_formatter = SlackFormatter(getenv("SNOWSIGHT_BASE_URL"))
checks = [
ExecuteDurationCondition(
warning_duration=60 * 30, # 30 minutes for warning
kill_duration=60 * 60, # 60 minutes for kill
),
CartesianJoinExplosionCondition(
min_output_rows=1_000_000, # join emits at least 1M output rows
min_explosion_rate=5, # ratio of output rows to input rows is at least 5x
warning_duration=60 * 5, # 5 minutes for warning
kill_duration=60 * 10, # 10 minutes for kill
),
JoinExplosionCondition(
min_output_rows=10_000_000, # join emits at least 10M output rows
min_explosion_rate=10, # ratio of output rows to input rows is at least 10x
warning_duration=60 * 10, # 10 minutes for warning
kill_duration=60 * 20, # 20 minutes for kill
),
UnionWithoutAllCondition(
min_input_rows=10_000_000, # at least 10M input rows for UNION without ALL
notice_duration=60 * 10, # 10 minutes for notice
),
StorageSpillingCondition(
min_local_spilling_gb=50, # 50Gb spill to local storage
min_remote_spilling_gb=1, # 1Gb spill to remote storage
warning_duration=60 * 10, # 10 minutes for waring
kill_duration=60 * 20, # 20 minutes for kill
),
QueuedDurationCondition(
notice_duration=60 * 30, # query was in queue for 30 minutes
),
BlockedDurationCondition(
notice_duration=60 * 5, # query was locked by another transaction for 5 minutes
),
EstimatedScanDurationCondition(
min_estimated_scan_duration=60 * 60 * 2, # query scan is estimated to take longer than 2 hours
warning_duration=60 * 10, # warning after 10 minutes
kill_duration=60 * 20, # kill after 20 minutes
),
]
# Apply checks to running, queued, blocked queries
check_results = snowkill_engine.check_and_kill_pending_queries(checks)
logger.info(f"[{len(check_results)}] queries matched check conditions")
# Save successful checks in storage and remove duplicates
check_results = snowkill_storage.store_and_remove_duplicate(check_results)
logger.info(f"[{len(check_results)}] queries remained after store deduplication")
# Send notification for each new check result
for r in check_results:
response = send_slack_message(
slack_token=getenv("SLACK_TOKEN"),
slack_channel=getenv("SLACK_CHANNEL"),
message_blocks=snowkill_formatter.format(r),
)
if response["ok"]:
logger.info(f"Sent Slack notification for query [{r.query.query_id}]")
else:
logger.warning(f"Failed to send Slack notification for query [{r.query.query_id}], error: [{response['error']}]")
7) Set config options using environment variables
Replace example config options with values relevant for your Snowflake and Slack accounts. Execute commands in terminal.
Snowsight base URL, copy-pasted from browser window. It usually consists of http://app.snowflake.com/, followed by organisation name and account name. But it might be different for old accounts and accounts with Private Link.