In order to create a custom condition, implement one of the abstract classes derived from AbstractQueryCondition. The specific class depends on the query status: RUNNING, BLOCKED or QUEUED.
Each condition has a number of default parameters which are always available and described here. On top of that, you may add custom parameters by overriding the __init__ method in a child class. For example:
```python
from snowkill import AbstractRunningQueryCondition


class EstimatedScanDurationCondition(AbstractRunningQueryCondition):
    def __init__(self, *, min_estimated_scan_duration: int, **kwargs):
        super().__init__(**kwargs)

        self.min_estimated_scan_duration = min_estimated_scan_duration
```
Make sure to keep the **kwargs argument and the super().__init__(**kwargs) call; this is important, because the default parameters are passed to the parent class through them.
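For instance, a condition defined this way might be constructed like so. This is a minimal sketch: notice_duration, warning_duration and kill_duration are assumed here to be among the default parameters mentioned above (based on how they are used via self.* in the examples below), and the values, in seconds, are purely illustrative.

```python
condition = EstimatedScanDurationCondition(
    min_estimated_scan_duration=600,  # custom parameter consumed by __init__
    notice_duration=600,              # assumed default parameters, forwarded
    warning_duration=1200,            # to the parent class through **kwargs
    kill_duration=1800,
)
```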
You should also implement the abstract method check_custom_logic. Its signature is different depending on the query status, but it always returns an optional tuple with 2 elements:
CheckResultLevel - severity level of the check result (NOTICE, WARNING or KILL in the examples below)
str - human-readable description of the check result
It should return None if the query did not match any conditions.
1) RUNNING queries
Implement class AbstractRunningQueryCondition.
Implement method check_custom_logic, which accepts two arguments:
query (Query) - meta-information about the running query
query_plan (QueryPlan) - query plan of the running query
This is a complete example implementing a Storage Spilling condition for running queries:
```python
from snowkill import AbstractRunningQueryCondition, Query, QueryPlan, CheckResultLevel


class StorageSpillingCondition(AbstractRunningQueryCondition):
    def __init__(self, *, min_local_spilling_gb: float, min_remote_spilling_gb: float, **kwargs):
        super().__init__(**kwargs)

        self.min_local_spilling_gb = min_local_spilling_gb
        self.min_remote_spilling_gb = min_remote_spilling_gb

    def check_custom_logic(self, query: Query, query_plan: QueryPlan):
        running_step = query_plan.get_running_step()

        # No spilling statistics means there is nothing to check
        if not running_step.statistics_spilling:
            return None

        local_spilling_gb = 0
        remote_spilling_gb = 0

        # Convert spilled bytes to gigabytes
        if "Bytes spilled to local storage" in running_step.statistics_spilling:
            local_spilling_gb = running_step.statistics_spilling["Bytes spilled to local storage"].value / 1024 / 1024 / 1024

        if "Bytes spilled to remote storage" in running_step.statistics_spilling:
            remote_spilling_gb = running_step.statistics_spilling["Bytes spilled to remote storage"].value / 1024 / 1024 / 1024

        # Remote spilling is more expensive, so it is checked first
        if remote_spilling_gb > self.min_remote_spilling_gb:
            description = f"Query spilled at least [{remote_spilling_gb:.1f}] Gb to remote storage"
        elif local_spilling_gb > self.min_local_spilling_gb:
            description = f"Query spilled at least [{local_spilling_gb:.1f}] Gb to local storage"
        else:
            return None

        # Escalate the result level based on how long the query has been executing
        if self.kill_duration and query.execute_duration >= self.kill_duration:
            return CheckResultLevel.KILL, description

        if self.warning_duration and query.execute_duration >= self.warning_duration:
            return CheckResultLevel.WARNING, description

        if self.notice_duration and query.execute_duration >= self.notice_duration:
            return CheckResultLevel.NOTICE, description
```
2) BLOCKED queries
Implement class AbstractBlockedQueryCondition.
Implement method check_custom_logic, which accepts three arguments:
waiting_query (Query) - meta-information about the blocked query
holding_lock (HoldingLock) - information about the lock which caused the query to be blocked
holding_query (Query) - meta-information about the query holding the lock
Please note that holding_lock and holding_query are both optional and may be None, either due to race conditions or because Snowflake is unable to provide information about a specific lock. Make sure to take this into account while writing the code.
This is a complete example implementing a Blocked Duration condition for blocked queries:
```python
from snowkill import AbstractBlockedQueryCondition, Query, CheckResultLevel, HoldingLock
from typing import Optional


class BlockedDurationCondition(AbstractBlockedQueryCondition):
    def check_custom_logic(
        self,
        waiting_query: Query,
        holding_lock: Optional[HoldingLock],
        holding_query: Optional[Query],
    ):
        if self.kill_duration and waiting_query.total_duration >= self.kill_duration:
            return (
                CheckResultLevel.KILL,
                f"Query was blocked longer than [{self.kill_duration}] seconds",
            )

        if self.warning_duration and waiting_query.total_duration >= self.warning_duration:
            return (
                CheckResultLevel.WARNING,
                f"Query was blocked longer than [{self.warning_duration}] seconds",
            )

        if self.notice_duration and waiting_query.total_duration >= self.notice_duration:
            return (
                CheckResultLevel.NOTICE,
                f"Query was blocked longer than [{self.notice_duration}] seconds",
            )
```
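Note that the complete example above does not inspect holding_lock or holding_query at all. If your condition does, guard against None values first. Below is a minimal sketch of that pattern; StrictBlockedDurationCondition is a hypothetical name and the description text is illustrative.

```python
from snowkill import AbstractBlockedQueryCondition, Query, CheckResultLevel, HoldingLock
from typing import Optional


class StrictBlockedDurationCondition(AbstractBlockedQueryCondition):
    def check_custom_logic(
        self,
        waiting_query: Query,
        holding_lock: Optional[HoldingLock],
        holding_query: Optional[Query],
    ):
        # Either value may be missing due to race conditions or because
        # Snowflake could not provide information about the specific lock
        if holding_lock is None or holding_query is None:
            return None

        if self.notice_duration and waiting_query.total_duration >= self.notice_duration:
            return CheckResultLevel.NOTICE, "Query is blocked and the holding query is known"

        return None
```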
3) QUEUED queries
Implement class AbstractQueuedQueryCondition.
Implement method check_custom_logic, which accepts one argument:
query (Query) - meta-information about the queued query
This is a complete example implementing a Queued Duration condition for queued queries:
```python
from snowkill import AbstractQueuedQueryCondition, Query, CheckResultLevel


class QueuedDurationCondition(AbstractQueuedQueryCondition):
    def check_custom_logic(self, query: Query):
        if self.kill_duration and query.queued_duration >= self.kill_duration:
            return CheckResultLevel.KILL, f"Query was queued longer than [{self.kill_duration}] seconds"

        if self.warning_duration and query.queued_duration >= self.warning_duration:
            return CheckResultLevel.WARNING, f"Query was queued longer than [{self.warning_duration}] seconds"

        if self.notice_duration and query.queued_duration >= self.notice_duration:
            return CheckResultLevel.NOTICE, f"Query was queued longer than [{self.notice_duration}] seconds"
```
Notes
Conditions can be executed in multiple threads in parallel. You should not rely on any global state outside of the check_custom_logic method.
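As an illustration of this rule, the sketch below contrasts safe and unsafe state handling; the module-level matched_queries list shown as an anti-pattern, the ThreadSafeCondition name, and the queued_threshold parameter are all hypothetical.

```python
from snowkill import AbstractQueuedQueryCondition, Query, CheckResultLevel

# Anti-pattern: mutable module-level state is shared by all worker threads
# and must not be read or written inside check_custom_logic
matched_queries = []


class ThreadSafeCondition(AbstractQueuedQueryCondition):
    def __init__(self, *, queued_threshold: int, **kwargs):
        super().__init__(**kwargs)

        # Safe: read-only configuration stored on the instance during __init__
        self.queued_threshold = queued_threshold

    def check_custom_logic(self, query: Query):
        # Safe: local variables and read-only instance attributes only
        if query.queued_duration >= self.queued_threshold:
            # matched_queries.append(query)  # unsafe: race condition across threads
            return CheckResultLevel.NOTICE, "Query exceeded the queued threshold"

        return None
```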
Debugging
You may use the special debug functions dataclass_to_json_str(obj) and dataclass_to_dict_recursive(obj), which are exposed by the snowkill package.
These functions convert SnowKill data structures into a human-readable JSON string or a Python dictionary.
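For example, while developing a condition for running queries, you might dump the entire query plan to see exactly which fields are available. This is a minimal sketch: DebugPrintCondition is a hypothetical name, and printing to stdout is just one way to inspect the output.

```python
from snowkill import AbstractRunningQueryCondition, Query, QueryPlan, dataclass_to_json_str


class DebugPrintCondition(AbstractRunningQueryCondition):
    def check_custom_logic(self, query: Query, query_plan: QueryPlan):
        # Dump query meta-information and the query plan as human-readable JSON
        print(dataclass_to_json_str(query))
        print(dataclass_to_json_str(query_plan))

        return None  # never matches; used only for inspection
```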