Canary helper
Canary helper code
get_repo_info()
[EXPERIMENTAL] Obtains info on the current repo the code is in.
Returns:

Type | Description |
---|---|
dict | git metadata |
Source code in shrike/pipeline/canary_helper.py
```python
def get_repo_info():
    """[EXPERIMENTAL] Obtains info on the current repo the code is in.

    Returns:
        dict: git meta data"""
    try:
        import git

        repo = git.Repo(search_parent_directories=True)
        branch = repo.active_branch
        head = repo.head

        return {
            "git": repo.remotes.origin.url,
            "branch": branch.name,
            "commit": head.commit.hexsha,
            "last_known_author": head.commit.author.name,
        }
    except:
        return {"git": "n/a"}
```
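A minimal usage sketch (not part of `canary_helper.py`; the experiment name and the `workspace`/`pipeline` objects are assumptions): call `get_repo_info()` and attach the returned metadata as tags on a submitted pipeline run, so a canary run records which commit produced it.

```python
# Hypothetical usage sketch: tag a submitted canary run with git metadata.
# `workspace` and `pipeline` are assumed to be built elsewhere.
from azureml.core import Experiment
from shrike.pipeline.canary_helper import get_repo_info

repo_info = get_repo_info()  # e.g. {"git": "...", "branch": "...", "commit": "...", "last_known_author": "..."}

experiment = Experiment(workspace, "canary")  # hypothetical experiment name
pipeline_run = experiment.submit(pipeline)

for key, value in repo_info.items():
    pipeline_run.tag(key, str(value))  # Run.tag records provenance on the run
```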
test_pipeline_step_metrics(pipeline_run, expected_metrics)
Tests a pipeline run against a set of expected metrics.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
pipeline_run | PipelineRun | the AzureML pipeline run | required |
expected_metrics | dict | defines the tests to execute | required |
Returns:

Type | Description |
---|---|
List | errors collected during tests |
!!! notes "Example entries in expected_metrics"
    `"SelectJsonField" : [{"row" : {"name" : "output", "key" : "size", "value" : 369559}}]`
    tests module "SelectJsonField" for a metric row named "output" and checks that its key "size" has the value 369559.

    `"tokenizerparallel" : [{"metric" : {"key" : "Failed Items", "value" : 0}}]`
    tests module "tokenizerparallel" for a metric named "Failed Items", whose value must be 0.
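Putting the two documented entry types together, an `expected_metrics` dict could look like the sketch below (assembled from the entries above; the module and metric names are the documented examples, not fixed requirements).

```python
# Expected-metrics dict assembled from the documented example entries.
expected_metrics = {
    # "row" test: metric row "output" must have key "size" with value 369559
    "SelectJsonField": [
        {"row": {"name": "output", "key": "size", "value": 369559}}
    ],
    # "metric" test: metric "Failed Items" must equal 0
    "tokenizerparallel": [
        {"metric": {"key": "Failed Items", "value": 0}}
    ],
}
```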
Source code in shrike/pipeline/canary_helper.py
```python
def test_pipeline_step_metrics(pipeline_run, expected_metrics):
    """Tests a pipeline run against a set of expected metrics.

    Args:
        pipeline_run (PipelineRun): the AzureML pipeline run
        expected_metrics (dict): defines the tests to execute

    Returns:
        List: errors collected during tests

    Notes:
        example entries in expected_metrics
        "SelectJsonField" : [{"row" : {"name" : "output", "key" : "size", "value" : 369559}}],
            tests module "SelectJsonField" for a metric row named "output", checks key "size" must have value 369559
        "tokenizerparallel" : [{"metric" : {"key" : "Failed Items", "value" : 0}}],
            tests module "tokenizerparallel" for a metric named "Failed Items", value must be 0
    """
    errors = []

    log.info("Looping through PipelineRun steps to test metrics...")
    for step in pipeline_run.get_steps():
        log.info(f"Checking status of step {step.name}...")
        observed_metrics = step.get_metrics()
        log.info(f"Step Metrics: {observed_metrics}")

        status = step.get_status()
        if status != "Finished":
            errors.append(f"Pipeline step {step.name} status is {status} != Finished")

        if step.name in expected_metrics:
            for expected_metric_test in expected_metrics[step.name]:
                if "row" in expected_metric_test:
                    log.info(f"Checking metrics, looking for {expected_metric_test}")
                    row_key = expected_metric_test["row"]["name"]
                    metric_key = expected_metric_test["row"]["key"]
                    expected_value = expected_metric_test["row"]["value"]

                    if row_key not in observed_metrics:
                        errors.append(
                            f"Step {step.name} metric row '{row_key}' not available in observed metrics {observed_metrics}"
                        )
                    elif metric_key not in observed_metrics[row_key]:
                        errors.append(
                            f"Step {step.name} metric row '{row_key}' does not have a metric '{metric_key}' in observed metrics {observed_metrics[row_key]}"
                        )
                    elif observed_metrics[row_key][metric_key] != expected_value:
                        errors.append(
                            f"Step {step.name} metric row '{row_key}' - metric '{metric_key}' - does not have expected value {expected_value} in observed metrics {observed_metrics[row_key]}"
                        )

                if "metric" in expected_metric_test:
                    log.info(f"Checking metrics, looking for {expected_metric_test}")
                    metric_key = expected_metric_test["metric"]["key"]
                    expected_value = expected_metric_test["metric"]["value"]

                    if metric_key not in observed_metrics:
                        errors.append(
                            f"Step {step.name} metric '{metric_key}' not available in observed metrics {observed_metrics}"
                        )
                    elif observed_metrics[metric_key] != expected_value:
                        errors.append(
                            f"Step {step.name} metric row '{metric_key}' does not have expected value {expected_value} in observed metrics {observed_metrics[metric_key]}"
                        )

    return errors
```
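As a usage sketch (hypothetical, not from the library): run the metric checks on a completed pipeline run and fail the canary if any mismatches were collected. It assumes `pipeline_run` is a finished `PipelineRun` obtained elsewhere and `expected_metrics` is a dict like the one sketched above.

```python
# Hypothetical canary check on a completed `pipeline_run`.
from shrike.pipeline.canary_helper import test_pipeline_step_metrics

errors = test_pipeline_step_metrics(pipeline_run, expected_metrics)
if errors:
    # fail the canary loudly, listing every mismatch that was collected
    raise AssertionError("Metric checks failed:\n" + "\n".join(errors))
```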
test_pipeline_step_output(pipeline_run, step_name, output_name, **kwargs)
Verify a given pipeline output for some basic checks.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
pipeline_run | PipelineRun | the pipeline run | required |
step_name | str | name of the step to check | required |
output_name | str | name of the output to check | required |
**kwargs | | Arbitrary keyword arguments defining the test | {} |
!!! kwargs
    length (int): verify the number of files in the output; a negative value only checks that the output is non-empty
Returns:

Type | Description |
---|---|
dict | test results; collected errors are listed under the "errors" key, and lookup failures are reported under "exception" |
Source code in shrike/pipeline/canary_helper.py
```python
def test_pipeline_step_output(pipeline_run, step_name, output_name, **kwargs):
    """Verify a given pipeline output for some basic checks.

    Args:
        pipeline_run (PipelineRun): the pipeline run
        step_name (str): name of the step to check
        output_name (str): name of the output to check
        **kwargs: Arbitrary keyword arguments defining the test

    Kwargs:
        length (int) : to verify the length

    Returns:
        dict: results
    """
    pipeline_step = pipeline_run.find_step_run(step_name)

    results = {"errors": []}

    if not pipeline_step:
        results[
            "exception"
        ] = f"Could not find step {step_name} in pipeline {pipeline_run._run_id}."
        return results

    output_port = pipeline_step[0].get_output_data(output_name)

    if not output_port:
        results[
            "exception"
        ] = f"Could not find output {output_name} in step {step_name} in pipeline {pipeline_run._run_id}."
        return results

    data_reference = output_port._data_reference
    data_path = DataPath(
        datastore=data_reference.datastore,
        path_on_datastore=data_reference.path_on_datastore,
        name=data_reference.data_reference_name,
    )

    if kwargs.get("length"):
        expected_length = kwargs.get("length")
        log.info(
            f"Checking count={expected_length} of files for step {step_name} output {output_name}..."
        )
        data_set = Dataset.File.from_files(data_path)
        files_list = data_set.to_path()

        if expected_length < 0:
            # test any length > 0
            results["length"] = {"expected": ">0", "observed": len(files_list)}
            if results["length"]["observed"] == 0:
                message = """Length mismatch in output {output_name} in step {step_name} in pipeline {run_id}. Expected len {a} found {b}.""".format(
                    output_name=output_name,
                    step_name=step_name,
                    run_id=pipeline_run._run_id,
                    b=results["length"]["observed"],
                    a=results["length"]["expected"],
                )
                # logging.error(message)
                results["errors"].append(message)
        else:
            results["length"] = {
                "expected": expected_length,
                "observed": len(files_list),
            }
            if results["length"]["observed"] != results["length"]["expected"]:
                message = """Length mismatch in output {output_name} in step {step_name} in pipeline {run_id}. Expected len {a} found {b}.""".format(
                    output_name=output_name,
                    step_name=step_name,
                    run_id=pipeline_run._run_id,
                    b=results["length"]["observed"],
                    a=results["length"]["expected"],
                )
                # logging.error(message)
                results["errors"].append(message)

    return results
```
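A final usage sketch (step and output names are hypothetical): check that the "output" port of a "SelectJsonField" step produced at least one file by passing a negative `length`, which the code above treats as "any count greater than zero".

```python
# Hypothetical output check on a completed `pipeline_run`.
from shrike.pipeline.canary_helper import test_pipeline_step_output

results = test_pipeline_step_output(
    pipeline_run,                 # a completed PipelineRun, obtained elsewhere
    step_name="SelectJsonField",  # hypothetical step name
    output_name="output",         # hypothetical output port name
    length=-1,                    # negative length: only require a non-empty output
)

if "exception" in results:
    raise AssertionError(results["exception"])
if results["errors"]:
    raise AssertionError("\n".join(results["errors"]))
```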