Custom conditions

To create a custom condition, implement one of the abstract classes derived from AbstractQueryCondition. Which class to choose depends on the query status: RUNNING, BLOCKED or QUEUED.

Each condition has a number of default parameters which are always available and described here. On top of that, you may add custom parameters by overriding the __init__ method in the child class. For example:

from snowkill import AbstractRunningQueryCondition

class EstimatedScanDurationCondition(AbstractRunningQueryCondition):
    def __init__(self, *, min_estimated_scan_duration: int, **kwargs):
        super().__init__(**kwargs)

        self.min_estimated_scan_duration = min_estimated_scan_duration

Make sure to keep the **kwargs argument and the super().__init__(**kwargs) call; this is important.
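To see why the forwarding matters, here is a minimal self-contained sketch with a stand-in base class (FakeBaseCondition is illustrative only; the real snowkill base classes accept the default duration parameters used in the examples below):

```python
class FakeBaseCondition:
    # Stand-in for a snowkill abstract condition class, illustration only
    def __init__(self, *, notice_duration=None, warning_duration=None, kill_duration=None):
        self.notice_duration = notice_duration
        self.warning_duration = warning_duration
        self.kill_duration = kill_duration


class EstimatedScanDurationCondition(FakeBaseCondition):
    def __init__(self, *, min_estimated_scan_duration: int, **kwargs):
        # Forwarding **kwargs delivers the default parameters to the base class
        super().__init__(**kwargs)

        self.min_estimated_scan_duration = min_estimated_scan_duration


condition = EstimatedScanDurationCondition(
    min_estimated_scan_duration=600,
    warning_duration=300,  # default parameter, consumed by the base class
)
```

Dropping the **kwargs argument or the super().__init__(**kwargs) call would break the default parameters, since they are consumed by the base class constructor.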

You should also implement the abstract method check_custom_logic. Its signature differs depending on the query status, but it always returns an optional tuple with 2 elements:

  1. Check result level (CheckResultLevel);

  2. String description.

It should return None if the query did not match any conditions.

1) RUNNING queries

Extend the abstract class AbstractRunningQueryCondition.

Implement the method check_custom_logic, which accepts two arguments:

  • query (Query) - meta-information about the running query

  • query_plan (QueryPlan) - query plan of the running query

This is a complete example implementing a Storage Spilling condition for running queries:

from snowkill import AbstractRunningQueryCondition, Query, QueryPlan, CheckResultLevel

class StorageSpillingCondition(AbstractRunningQueryCondition):
    def __init__(self, *, min_local_spilling_gb: float, min_remote_spilling_gb: float, **kwargs):
        super().__init__(**kwargs)

        self.min_local_spilling_gb = min_local_spilling_gb
        self.min_remote_spilling_gb = min_remote_spilling_gb

    def check_custom_logic(self, query: Query, query_plan: QueryPlan):
        running_step = query_plan.get_running_step()

        if not running_step.statistics_spilling:
            return None

        local_spilling_gb = 0
        remote_spilling_gb = 0

        if "Bytes spilled to local storage" in running_step.statistics_spilling:
            local_spilling_gb = running_step.statistics_spilling["Bytes spilled to local storage"].value / 1024 / 1024 / 1024

        if "Bytes spilled to remote storage" in running_step.statistics_spilling:
            remote_spilling_gb = running_step.statistics_spilling["Bytes spilled to remote storage"].value / 1024 / 1024 / 1024

        if remote_spilling_gb > self.min_remote_spilling_gb:
            description = f"Query spilled at least [{remote_spilling_gb:.1f}] GB to remote storage"
        elif local_spilling_gb > self.min_local_spilling_gb:
            description = f"Query spilled at least [{local_spilling_gb:.1f}] GB to local storage"
        else:
            return None

        if self.kill_duration and query.execute_duration >= self.kill_duration:
            return CheckResultLevel.KILL, description

        if self.warning_duration and query.execute_duration >= self.warning_duration:
            return CheckResultLevel.WARNING, description

        if self.notice_duration and query.execute_duration >= self.notice_duration:
            return CheckResultLevel.NOTICE, description
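The conversion and threshold logic above can also be exercised in isolation. This standalone sketch (no snowkill imports; the function name and arguments are illustrative) reproduces the bytes-to-gigabytes conversion and the remote-before-local precedence:

```python
from typing import Optional

GB = 1024 ** 3  # bytes per gigabyte


def describe_spilling(
    local_bytes: int,
    remote_bytes: int,
    min_local_gb: float,
    min_remote_gb: float,
) -> Optional[str]:
    local_gb = local_bytes / GB
    remote_gb = remote_bytes / GB

    # Remote spilling is checked first, since it is generally
    # more harmful to performance than local spilling
    if remote_gb > min_remote_gb:
        return f"Query spilled at least [{remote_gb:.1f}] GB to remote storage"

    if local_gb > min_local_gb:
        return f"Query spilled at least [{local_gb:.1f}] GB to local storage"

    return None


print(describe_spilling(6 * GB, 0, min_local_gb=5.0, min_remote_gb=1.0))
# prints: Query spilled at least [6.0] GB to local storage
```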

2) BLOCKED queries

Extend the abstract class AbstractBlockedQueryCondition.

Implement the method check_custom_logic, which accepts three arguments:

  • query (Query) - meta-information about the blocked query

  • holding_lock (HoldingLock) - information about the lock that caused the query to be blocked

  • holding_query (Query) - meta-information about the query holding the lock

Please note that holding_lock and holding_query are both optional and may be unavailable, either due to race conditions or because Snowflake cannot provide information about a specific lock. Make sure to take this into account when writing your code.

This is a complete example implementing a Blocked Duration condition for blocked queries:

from snowkill import AbstractBlockedQueryCondition, Query, CheckResultLevel, HoldingLock
from typing import Optional


class BlockedDurationCondition(AbstractBlockedQueryCondition):
    def check_custom_logic(
        self,
        waiting_query: Query,
        holding_lock: Optional[HoldingLock],
        holding_query: Optional[Query],
    ):
        if self.kill_duration and waiting_query.total_duration >= self.kill_duration:
            return (
                CheckResultLevel.KILL,
                f"Query was blocked longer than [{self.kill_duration}] seconds",
            )

        if self.warning_duration and waiting_query.total_duration >= self.warning_duration:
            return (
                CheckResultLevel.WARNING,
                f"Query was blocked longer than [{self.warning_duration}] seconds",
            )

        if self.notice_duration and waiting_query.total_duration >= self.notice_duration:
            return (
                CheckResultLevel.NOTICE,
                f"Query was blocked longer than [{self.notice_duration}] seconds",
            )
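All of these examples end with the same kill / warning / notice escalation ladder. Distilled into a standalone sketch (the Level enum is a stand-in for snowkill's CheckResultLevel), it boils down to:

```python
from enum import Enum
from typing import Optional


class Level(Enum):
    # Stand-in for snowkill's CheckResultLevel
    NOTICE = 1
    WARNING = 2
    KILL = 3


def pick_level(
    duration: int,
    notice_duration: Optional[int],
    warning_duration: Optional[int],
    kill_duration: Optional[int],
) -> Optional[Level]:
    # The most severe threshold is checked first, so a very long query
    # escalates straight to KILL instead of stopping at NOTICE
    if kill_duration and duration >= kill_duration:
        return Level.KILL

    if warning_duration and duration >= warning_duration:
        return Level.WARNING

    if notice_duration and duration >= notice_duration:
        return Level.NOTICE

    return None
```

A threshold left as None is skipped, which matches how unset default durations behave in the examples above.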

3) QUEUED queries

Extend the abstract class AbstractQueuedQueryCondition.

Implement the method check_custom_logic, which accepts one argument:

  • query (Query) - meta-information about the queued query

This is a complete example implementing a Queued Duration condition for queued queries:

from snowkill import AbstractQueuedQueryCondition, Query, CheckResultLevel


class QueuedDurationCondition(AbstractQueuedQueryCondition):
    def check_custom_logic(self, query: Query):
        if self.kill_duration and query.queued_duration >= self.kill_duration:
            return CheckResultLevel.KILL, f"Query was queued longer than [{self.kill_duration}] seconds"

        if self.warning_duration and query.queued_duration >= self.warning_duration:
            return CheckResultLevel.WARNING, f"Query was queued longer than [{self.warning_duration}] seconds"

        if self.notice_duration and query.queued_duration >= self.notice_duration:
            return CheckResultLevel.NOTICE, f"Query was queued longer than [{self.notice_duration}] seconds"

Notes

  • Conditions can be executed in multiple threads in parallel. Do not rely on any global state outside of the check_custom_logic method.

Debugging

You may use the special debug helpers dataclass_to_json_str(obj) and dataclass_to_dict_recursive(obj), which are exposed by the snowkill package.

These helpers convert SnowKill data structures into a human-readable JSON string or into a Python dictionary.
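Conceptually, they behave much like the standard-library combination below (a rough stand-in only; the actual snowkill helpers are not implemented this way, and FakeQuery is an illustrative structure, not a real snowkill dataclass):

```python
import dataclasses
import json


@dataclasses.dataclass
class FakeQuery:
    # Illustrative structure for the sake of the example
    query_id: str
    execute_duration: int


q = FakeQuery(query_id="01aa-bb", execute_duration=120)

# dataclass_to_dict_recursive(obj) is roughly dataclasses.asdict(obj)
print(dataclasses.asdict(q))

# dataclass_to_json_str(obj) is roughly json.dumps(...) with indentation
print(json.dumps(dataclasses.asdict(q), indent=2))
```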
