Logging
Utilities around logging data which may or may not contain private content.
CompliantLogger
Subclass of the default logging class with an explicit category
parameter
on all logging methods. It will pass an extra
param with prefix
key
(value depending on whether category
is public or private) to the
handlers.
The default value for data category
is PRIVATE
for all methods.
Implementation is inspired by: https://github.com/python/cpython/blob/3.8/Lib/logging/__init__.py
metric(self, value, step=None, name=None, description=None, max_rows=250, category=<DataCategory.PRIVATE: 1>)
Converts most datatypes into a metric and logs them to AML Metric for the RunContext (if available) or directly to log
Note: Private Data will not be sent to metrics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value to log (can be vaex/pandas/spark dataframe, numpy array, list, dict, int/float) |
required |
step |
str | int |
Step value used for single value items. Defaults to None. |
None |
name |
str |
Name under which the metric should be logged. Defaults to None. |
None |
description |
str |
Description for the metric provided to the run context. Defaults to None. |
None |
max_rows |
int |
Defines the number of rows to batch table metrics (only required for table metrics). Defaults to 250. |
250 |
category |
DataCategory |
Category of the data (logging to AML requires this to be set to PUBLIC explicitly). Defaults to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Source code in shrike/compliant_logging/logging.py
def metric(
    self,
    value,
    step=None,
    name=None,
    description=None,
    max_rows=250,
    category=DataCategory.PRIVATE,
):
    """
    Converts most datatypes into a metric and logs them to AML Metric
    for the RunContext (if available) or directly to log.

    Note: Private Data will not be sent to metrics!

    Args:
        value (Any): The value to log
            (can be vaex/pandas/spark dataframe, numpy array, list, dict,
            int/float)
        step (str | int, optional): Step value used for single value items.
            Defaults to None.
        name (str, optional): Name under which the metric should be logged.
            Defaults to None.
        description (str, optional): Description for the metric provided
            to the run context. Defaults to None.
        max_rows (int, optional): Defines the number of rows to batch table
            metrics (only required for table metrics). Defaults to 250.
        category (DataCategory, optional): Category of the data
            (logging to AML requires this to be set to PUBLIC explicitly).
            Defaults to DataCategory.PRIVATE.
    """
    # auto-generate a distinguishable name if none was given
    if name is None:
        name = f"metric_{self.metric_count}"
        self.metric_count += 1
    if description is None:
        description = ""
    # retrieve AML Context (None when not running inside an AML run)
    run = self._get_aml_context()
    # nothing to log
    if value is None:
        self.error(
            f"Value provided for metric {name} is None, skipping (step: {step})"
        )
        return
    # scalar values are logged directly (to AML if public, otherwise to text)
    if isinstance(value, (float, int)):
        if run is not None and category == DataCategory.PUBLIC:
            if step:
                run.log(name=name, value=value, description=description, step=step)
            else:
                run.log(name=name, value=value, description=description)
        else:
            # FIX: message previously read "NumbericMetric"
            self.info(
                f"NumericMetric | {name}:{step} | {value}",
                category=category,
            )
        return
    # materialize supported dataframe types into plain dictionaries
    if is_vaex_dataframe(value):
        value = collect_vaex_dataframe(value)
    elif is_spark_dataframe(value):
        value = collect_spark_dataframe(value)
    elif is_pandas_dataframe(value):
        value = collect_pandas_dataframe(value)
    # dictionary data: a table ({col: [values]}) or a group of scalars
    if isinstance(value, dict):
        if len(value) == 0:
            self.warning(f"Dictionary Value for Metric {name} is empty. Skipping.")
            return
        # determine the distinct value types contained in the dict
        type_set = list(set([type(v) for v in value.values()]))
        if len(type_set) == 1 and type_set[0] == list:
            # table-style metric
            if run is not None and category == DataCategory.PUBLIC:
                run.log_table(name, value, description)
            else:
                # log the table manually as text rows
                col_names = " | ".join(
                    [f"{('' if col is None else col):15}" for col in value.keys()]
                )
                header = f"TableMetric | Index | {col_names} |"
                self.info(f"TableMetric | {name}", category=category)
                self.info(header, category=category)
                self.info("-" * len(header), category=category)
                # FIX: use a local row count instead of clobbering the
                # `max_rows` parameter
                num_rows = max([len(value[col]) for col in value])
                for i in range(num_rows):
                    row_str = f"TableMetric | {i:05}"
                    for key in value:
                        col = value[key]
                        # FIX: compare against None so falsy cells (0, "")
                        # stay visible instead of being blanked
                        col = col[i] if i < len(col) and col[i] is not None else ""
                        row_str += f" | {str(col):15}"
                    self.info(row_str, category=category)
        elif len(type_set) == 1 and type_set[0] in [int, float]:
            # dict of scalars: log each entry as its own namespaced metric
            for key, val in value.items():
                key = name + "/" + key
                # FIX: `category` was previously passed positionally and
                # landed in the `max_rows` slot of the recursive call
                self.metric(val, step, key, description, category=category)
        else:
            # mixed or unsupported value types
            self.warning(
                (
                    "The provided dictionary for metric"
                    f" {name} appears to be unstructured!"
                ),
                category=category,
            )
        return
    # convert array-like datatypes into plain lists
    if is_numpy_array(value):
        value = numpy_array_to_list(value)
    if is_pandas_series(value):
        value = pandas_series_to_list(value)
    # log list data
    if isinstance(value, (list, tuple)):
        value = list(value)
        if len(value) == 0:
            self.warning(f"List Value for Metric {name} is empty. Skipping.")
            return
        if run is not None and category == DataCategory.PUBLIC:
            run.log_list(name=name, value=value, description=description)
        else:
            # FIX: forward the category (was previously dropped, so public
            # data would be logged with the private default)
            self.info(f"ListMetric | {name} | {value}", category=category)
        return
    self.warning(
        f"Value {value} of the provided metric {name} has an unknown type",
        category=category,
    )
metric_accuracy_table(self, name, value, description=None, col_predict=None, col_target=None, probability_thresholds=5, percentile_thresholds=[0.0, 0.01, 0.24, 0.98, 1.0], class_labels=None, category=<DataCategory.PRIVATE: 1>)
Equivalent of the Run.log_accuracy_table
function.
Logs the data for an accuracy table to the metrics.
In the dataframe case, the col_predict
value has to contain the prediction
probabilities for the target class!
Note: Private Data will not be sent to metrics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
dict | table |
Either a dictionary in AML-defined format or a table that provides accuracy values. |
required |
name |
str |
Name of the metric. Defaults to None. |
required |
description |
str |
Description of the metric. Defaults to None. |
None |
col_predict |
str | int |
Name or Id of the predicted probabilities for the target class. This is only required if DataFrame is passed. Defaults to None. |
None |
col_target |
str | int |
Name or id of the target value column. This is only required if DataFrame is passed. Defaults to None. |
None |
probability_thresholds |
list | int |
Either a list of thresholds or a number of evenly spaced threshold points. Defaults to 5. |
5 |
percentile_thresholds |
list | int |
Either a list of thresholds or a number of evenly spaced threshold points. Defaults to a list. |
[0.0, 0.01, 0.24, 0.98, 1.0] |
category |
DataCategory |
Classification of the data category. Defaults to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Source code in shrike/compliant_logging/logging.py
def metric_accuracy_table(
    self,
    name,
    value,
    description=None,
    col_predict=None,
    col_target=None,
    probability_thresholds=5,
    percentile_thresholds=[0.0, 0.01, 0.24, 0.98, 1.0],
    class_labels=None,
    category=DataCategory.PRIVATE,
):
    """
    Equivalent of the `Run.log_accuracy_table` function.
    Logs the data for an accuracy table to the metrics.
    In the dataframe case, the `col_predict` value has to contain the prediction
    probabilities for the **target** class!

    Note: Private Data will not be sent to metrics!

    Args:
        name (str): Name of the metric.
        value (dict | table): Either a dictionary in AML-defined format
            or a table that provides accuracy values.
        description (str, optional): Description of the metric. Defaults to None.
        col_predict (str | int, optional): Name or id of the predicted
            probabilities for the target class. Only required if a DataFrame
            is passed. Defaults to None.
        col_target (str | int, optional): Name or id of the target value column.
            Only required if a DataFrame is passed. Defaults to None.
        probability_thresholds (list | int, optional): Either a list of
            thresholds or a number of evenly spaced threshold points.
            Defaults to 5.
        percentile_thresholds (list | int, optional): Either a list of
            thresholds or a number of evenly spaced threshold points.
            Defaults to a list. (The default list is only read, never mutated.)
        class_labels (list, optional): Labels reported for the classes.
            Defaults to the sorted distinct values of `col_target`.
        category (DataCategory, optional): Classification of the data category.
            Defaults to DataCategory.PRIVATE.

    Raises:
        PublicRuntimeError: If an unknown value type is passed, required
            columns are missing, or the data cannot be parsed.
    """
    # retrieve the context
    run = self._get_aml_context()
    # convert data if not already pre-computed (AML dicts carry "schema_type")
    if not isinstance(value, dict) or "schema_type" not in value:
        # materialize supported dataframe types
        if is_vaex_dataframe(value):
            value = collect_vaex_dataframe(value)
        if is_spark_dataframe(value):
            value = collect_spark_dataframe(value)
        if is_pandas_dataframe(value):
            value = collect_pandas_dataframe(value)
        # check if datatype matches
        if not isinstance(value, dict):
            raise PublicRuntimeError("Unkown value-type passed to accuracy_table!")
        # convert the data
        try:
            import pandas as pd

            # create the dataframe
            df = pd.DataFrame.from_dict(value)
            # column checks
            if None in [col_predict, col_target]:
                raise PublicRuntimeError(
                    "If table is passed to accuracy_table it requires all "
                    + "columns to be present!"
                )
            # check the class list (sort to make sure it is aligned)
            class_list = list(df[col_target].unique())
            class_list.sort()
            if class_labels is None:
                class_labels = class_list
            # expand integer threshold counts into evenly spaced ranges
            if isinstance(probability_thresholds, int):
                probability_thresholds = floating_range(probability_thresholds)
            if isinstance(percentile_thresholds, int):
                percentile_thresholds = floating_range(percentile_thresholds)
            # compute one-vs-rest truth matrices for each class
            prob_tables = []
            perc_tables = []
            for cl in class_list:
                # thresholds on the raw probabilities
                prob_tables.append(
                    self._compute_truth_matrix(
                        df[col_predict], df[col_target], cl, probability_thresholds
                    )
                )
                # per-class probability: p for the class, 1-p for the rest
                cl_proba = (df[col_predict] * (df[col_target] == cl)) + (
                    (1 - df[col_predict]) * (df[col_target] != cl)
                )
                cl_percentile = list(cl_proba.quantile(percentile_thresholds))
                perc_tables.append(
                    self._compute_truth_matrix(
                        df[col_predict], df[col_target], cl, cl_percentile
                    )
                )
            # generate data in the AML accuracy_table schema
            value = {
                "schema_type": "accuracy_table",
                "schema_version": "1.0.1",
                "data": {
                    "probability_tables": prob_tables,
                    # NOTE(review): key spelled "precentile_tables" in the
                    # original — confirm against the AML schema before renaming
                    "precentile_tables": perc_tables,
                    "probability_thresholds": probability_thresholds,
                    "percentile_thresholds": percentile_thresholds,
                    "class_labels": class_labels,
                },
            }
        except PublicRuntimeError:
            # FIX: do not mask the explicit column-check error above with the
            # generic import/parsing message below
            raise
        except Exception as e:
            raise PublicRuntimeError(
                "Unable to import pandas and parse the given data table! "
                + "Make sure that libraries are available and "
                + "correct data is passed."
            ) from e
    # log the data
    if category == DataCategory.PUBLIC and run is not None:
        run.log_accuracy_table(name, value, description)
    else:
        self.warning("Logging Accuracy Tables to text is not yet implemented")
metric_confusion_matrix(self, name, value, idx_true=None, idx_pred=None, labels=None, description=None, category=<DataCategory.PRIVATE: 1>)
Equivalent of the Run.log_confusion_matrix
function.
Logs or generates a confusion matrix to the AML logs.
Note: Private Data will not be sent to metrics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
dict | DataFrame |
Data to be used for the confusion_matrix |
required |
idx_true |
int | str |
Name or id of the target column. Defaults to None. |
None |
idx_pred |
int | str |
Name or id of the prediction column. Defaults to None. |
None |
labels |
list |
List of labels used for the rows. Defaults to None. |
None |
name |
str |
Name of the metric. Defaults to None. |
required |
description |
str |
Description of the metric. Defaults to None. |
None |
category |
DataCategory |
Classification of the data. Defaults to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Exceptions:
Type | Description |
---|---|
PublicRuntimeError |
Raised when an unknown value type is passed to the function. |
PublicRuntimeError |
Raised when numpy/scikit-learn are unavailable or the data cannot be parsed. |
Source code in shrike/compliant_logging/logging.py
def metric_confusion_matrix(
    self,
    name,
    value,
    idx_true=None,
    idx_pred=None,
    labels=None,
    description=None,
    category=DataCategory.PRIVATE,
):
    """
    Equivalent of the `Run.log_confusion_matrix` function.
    Logs or generates a confusion matrix to the AML logs.

    Note: Private Data will not be sent to metrics!

    Args:
        name (str): Name of the metric.
        value (dict | DataFrame): Data to be used for the confusion_matrix.
        idx_true (int | str, optional): Name or id of the target column.
            Defaults to None.
        idx_pred (int | str, optional): Name or id of the prediction column.
            Defaults to None.
        labels (list, optional): List of labels used for the rows.
            Defaults to None.
        description (str, optional): Description of the metric. Defaults to None.
        category (DataCategory, optional): Classification of the data.
            Defaults to DataCategory.PRIVATE.

    Raises:
        PublicRuntimeError: If an unknown value type is passed.
        PublicRuntimeError: If numpy/scikit-learn are unavailable or the data
            cannot be parsed.
    """
    # retrieve the context
    run = self._get_aml_context()
    # convert data if not already pre-computed (AML dicts carry schema keys)
    if (
        not isinstance(value, dict)
        or "schema_type" not in value
        or "schema_version" not in value
    ):
        # materialize supported dataframe types
        if is_vaex_dataframe(value):
            value = collect_vaex_dataframe(value)
        if is_spark_dataframe(value):
            value = collect_spark_dataframe(value)
        if is_pandas_dataframe(value):
            value = collect_pandas_dataframe(value)
        # check if datatype matches
        if not isinstance(value, dict):
            raise PublicRuntimeError(
                "Unkown value-type passed to Run.log_confusion_matrix!"
            )
        # convert the data
        try:
            # imported lazily so the module works without these libraries
            import numpy as np
            from sklearn.metrics import confusion_matrix

            # resolve column names into positional row indices
            if isinstance(idx_true, str):
                idx_true = list(value.keys()).index(idx_true)
            if isinstance(idx_pred, str):
                idx_pred = list(value.keys()).index(idx_pred)
            # retrieve truth / prediction rows
            value = np.array(list(value.values()))
            val_true = value[idx_true]
            val_pred = value[idx_pred]
            # compute matrix
            mat = confusion_matrix(val_true, val_pred)
            # generate labels as distinct values
            if labels is None:
                # FIX: sklearn's confusion_matrix is indexed by the sorted
                # union of y_true and y_pred; deriving labels from val_true
                # alone could yield fewer labels than matrix rows
                labels = np.unique(np.concatenate((val_true, val_pred)))
            # generate the dict in the AML confusion_matrix schema
            value = {
                "schema_type": "confusion_matrix",
                "schema_version": "1.0.0",
                "data": {"class_labels": labels, "matrix": mat},
            }
        except Exception as e:
            # FIX: chain the original exception for debuggability
            raise PublicRuntimeError(
                "Unable to import numpy & scikit and parse the given data table! "
                + "Make sure that libraries are available and correct "
                + "data is passed."
            ) from e
    # log the data
    if category == DataCategory.PUBLIC and run is not None:
        run.log_confusion_matrix(name, value, description)
    else:
        self.warning("Logging Confusion Matrices to text is not yet implemented")
metric_image(self, name=None, plot=None, path=None, description=None, category=<DataCategory.PRIVATE: 1>)
Logs an image to the AML Metrics. Note that this is only possible for public data when AML Run context is available
Note: Private Data will not be sent to metrics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
plot |
pyplot.Plot |
The plot that should be logged |
None |
path |
str |
Optional Path to the image. Defaults to None. |
None |
name |
str |
Name of the image. Defaults to None. |
None |
description |
str |
Description of the metric. Defaults to None. |
None |
category |
DataCategory |
Category under which this image is logged Defaults to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Source code in shrike/compliant_logging/logging.py
def metric_image(
    self,
    name=None,
    plot=None,
    path=None,
    description=None,
    category=DataCategory.PRIVATE,
):
    """
    Logs an image to the AML Metrics.
    Note that this is only possible for public data when
    AML Run context is available.

    Note: Private Data will not be sent to metrics!

    Args:
        plot (pyplot.Plot): The plot that should be logged.
        path (str, optional): Optional path to the image. Defaults to None.
        name (str, optional): Name of the image. Defaults to None.
        description (str, optional): Description of the metric. Defaults to None.
        category (DataCategory, optional): Category under which this image is
            logged. Defaults to DataCategory.PRIVATE.
    """
    # retrieve the run context
    run = self._get_aml_context()
    # images can only be logged as public data
    if category != DataCategory.PUBLIC:
        self.warning(f"Unable to log image metric {name} as private, skipping.")
        return
    # FIX: previously `run.log_image` was called even when no AML run context
    # was available, raising an AttributeError on None
    if run is None:
        self.warning(
            f"No AML run context available for image metric {name}, skipping."
        )
        return
    # auto-generate a name if none was given
    if name is None:
        name = f"metric_{self.metric_count}"
        self.metric_count += 1
    if description is None:
        description = ""
    # log the image
    run.log_image(name=name, path=path, plot=plot, description=description)
metric_list(self, name, value, description=None, category=<DataCategory.PRIVATE: 1>)
Equivalent to the Run.log_list
.
Logs a list of values for a single metric.
Note: Private Data will not be sent to metrics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
list |
List values to log |
required |
name |
str |
Name of the metric. Defaults to None. |
required |
description |
str |
Description of the metric. Defaults to None. |
None |
category |
DataCategory |
DataCategory to log the data as. Defaults to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Source code in shrike/compliant_logging/logging.py
def metric_list(self, name, value, description=None, category=DataCategory.PRIVATE):
    """
    Equivalent to the `Run.log_list`.
    Logs a list of values for a single metric.

    Note: Private Data will not be sent to metrics!

    Args:
        name (str): Name of the metric.
        value (list): List values to log.
        description (str, optional): Description of the metric. Defaults to None.
        category (DataCategory, optional): DataCategory to log the data as.
            Defaults to DataCategory.PRIVATE.
    """
    # thin wrapper: the generic metric handler already knows how to log lists
    self.metric(
        value,
        name=name,
        description=description,
        category=category,
    )
metric_predictions(self, name, value, description=None, col_predict=None, col_target=None, bin_edges=5, category=<DataCategory.PRIVATE: 1>)
Equivalent of Run.log_predictions
function.
This will log regression prediction histogram from dict or dataframe.
Note: Private Data will not be sent to metrics!
For the dataframe case the prediction error is computed as the absolute difference between prediction and target.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the metric |
required |
value |
dict | DataFrame |
The data to log |
required |
description |
str |
Description of the metric. Defaults to ''. |
None |
col_predict |
str | int |
Id or Name of the prediction column. Defaults to None. |
None |
bin_edges |
list |
List of edge boundaries for logging. Defaults to None. |
5 |
category |
DataCategory |
Privacy Classification of the data. Defaults to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Exceptions:
Type | Description |
---|---|
PublicRuntimeError |
If the data is not in the right format or required parameters are not passed. |
Source code in shrike/compliant_logging/logging.py
def metric_predictions(
    self,
    name,
    value,
    description=None,
    col_predict=None,
    col_target=None,
    bin_edges=5,
    category=DataCategory.PRIVATE,
):
    """
    Equivalent of `Run.log_predictions` function.
    This will log regression prediction histogram from dict or dataframe.

    Note: Private Data will not be sent to metrics!

    For the dataframe case the prediction error is computed as the absolute
    difference between prediction and target.

    Args:
        name (str): Name of the metric.
        value (dict | DataFrame): The data to log.
        description (str, optional): Description of the metric. Defaults to ''.
        col_predict (str | int, optional): Id or name of the prediction column.
            Defaults to None.
        col_target (str | int, optional): Id or name of the target column.
            Defaults to None.
        bin_edges (list | int, optional): List of edge boundaries for logging,
            or a number of evenly spaced edges. Defaults to 5.
        category (DataCategory, optional): Privacy classification of the data.
            Defaults to DataCategory.PRIVATE.

    Raises:
        PublicRuntimeError: If the data is not in the right format or required
            parameters are not passed.
    """
    # retrieve the context
    run = self._get_aml_context()
    # convert data if not already pre-computed (AML dicts carry schema keys)
    if (
        not isinstance(value, dict)
        or "schema_type" not in value
        or "schema_version" not in value
    ):
        # materialize supported dataframe types
        if is_vaex_dataframe(value):
            value = collect_vaex_dataframe(value)
        if is_spark_dataframe(value):
            value = collect_spark_dataframe(value)
        if is_pandas_dataframe(value):
            value = collect_pandas_dataframe(value)
        # check if datatype matches
        if not isinstance(value, dict):
            raise PublicRuntimeError("Unkown value-type passed to predictions!")
        # convert the data
        try:
            import pandas as pd

            # create the dataframe
            df = pd.DataFrame.from_dict(value)
            # column checks
            if None in [col_predict, col_target]:
                raise PublicRuntimeError(
                    "The col_predict and col_target columns are both required."
                )
            # expand an integer edge count into an evenly spaced range
            if isinstance(bin_edges, int):
                bin_edges = floating_range(bin_edges)
            # bucket targets into bins and compute the absolute error
            df["bin"] = pd.cut(df[col_target], bin_edges)
            df["error"] = (df[col_predict] - df[col_target]).abs()
            # generate data in the AML predictions schema
            value = {
                "schema_type": "predictions",
                "schema_version": "1.0.0",
                "data": {
                    "bin_averages": list(df.groupby("bin")[col_target].mean()),
                    "bin_errors": list(df.groupby("bin")["error"].sum()),
                    "bin_counts": list(df.groupby("bin")[col_target].count()),
                    "bin_edges": bin_edges,
                },
            }
        except PublicRuntimeError:
            # FIX: keep the specific column-check error instead of masking it
            # with the generic import/parsing message below
            raise
        except Exception as e:
            raise PublicRuntimeError(
                "Unable to import pandas and parse the given data! "
                + "Make sure that libraries are available and correct "
                + "data is passed."
            ) from e
    # log the data
    if category == DataCategory.PUBLIC and run is not None:
        run.log_predictions(name, value, description)
    else:
        self.warning("Logging Predictions to text is not yet implemented")
metric_residual(self, name, value, description=None, col_predict=None, col_target=None, bin_edges=5, category=<DataCategory.PRIVATE: 1>)
Equivalent on the Run.log_residuals
functions.
Logs residual values for a list of edges
Note: Private Data will not be sent to metrics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the metric |
required |
value |
dict | DataFrame |
Values to contain the residuals |
required |
description |
str |
Description of the dataframe. Defaults to ''. |
None |
col_target |
str |
Name of the target column (if value is a df). Defaults to None. |
None |
bin_edges |
list | int |
List of edges towards the bins. Defaults to 5. |
5 |
category |
DataCategory |
Privacy Classification of the data. Defaults to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Exceptions:
Type | Description |
---|---|
PublicRuntimeError |
Thrown when data is in an unknown format or required params are not provided |
Source code in shrike/compliant_logging/logging.py
def metric_residual(
    self,
    name,
    value,
    description=None,
    col_predict=None,
    col_target=None,
    bin_edges=5,
    category=DataCategory.PRIVATE,
):
    """
    Equivalent on the `Run.log_residuals` functions.
    Logs residual values for a list of edges.

    Note: Private Data will not be sent to metrics!

    Args:
        name (str): Name of the metric.
        value (dict | DataFrame): Values to contain the residuals.
        description (str, optional): Description of the dataframe. Defaults to ''.
        col_predict (str, optional): Name of the prediction column (if value is
            a df). Defaults to None.
        col_target (str, optional): Name of the target column (if value is a df).
            Defaults to None.
        bin_edges (list | int, optional): List of edges towards the bins.
            Defaults to 5.
        category (DataCategory, optional): Privacy classification of the data.
            Defaults to DataCategory.PRIVATE.

    Raises:
        PublicRuntimeError: Thrown when data is in an unknown format or required
            params are not provided.
    """
    # retrieve the context
    run = self._get_aml_context()
    # convert data if not already pre-computed (AML dicts carry schema keys)
    if (
        not isinstance(value, dict)
        or "schema_type" not in value
        or "schema_version" not in value
    ):
        # materialize supported dataframe types
        if is_vaex_dataframe(value):
            value = collect_vaex_dataframe(value)
        if is_spark_dataframe(value):
            value = collect_spark_dataframe(value)
        if is_pandas_dataframe(value):
            value = collect_pandas_dataframe(value)
        # check if datatype matches
        if not isinstance(value, dict):
            raise PublicRuntimeError(
                "Unkown value-type passed to Run.log_residuals()!"
            )
        # convert the data
        try:
            import pandas as pd

            # create the dataframe
            df = pd.DataFrame.from_dict(value)
            # column checks
            if None in [col_predict, col_target]:
                raise PublicRuntimeError(
                    "The col_predict and col_target columns are both required."
                )
            # expand an integer edge count into an evenly spaced range
            if isinstance(bin_edges, int):
                bin_edges = floating_range(bin_edges)
            # compute the residual per row and bucket targets into bins
            df["residual"] = df[col_predict] - df[col_target]
            df["bin"] = pd.cut(df[col_target], bin_edges)
            # generate data in the AML residuals schema
            value = {
                "schema_type": "residuals",
                "schema_version": "1.0.0",
                "data": {
                    "bin_edges": bin_edges,
                    # NOTE(review): "bin_counts" is filled with the *sum* of
                    # residuals per bin; the sibling metric_predictions uses
                    # .count() for its bin_counts — confirm against the AML
                    # residuals schema before changing.
                    "bin_counts": list(df.groupby("bin")["residual"].sum()),
                },
            }
        except PublicRuntimeError:
            # FIX: keep the specific column-check error instead of masking it
            # with the generic import/parsing message below
            raise
        except Exception as e:
            raise PublicRuntimeError(
                "Unable to import pandas and parse the given data! "
                + "Make sure that libraries are available and correct "
                + "data is passed."
            ) from e
    # log the data
    if category == DataCategory.PUBLIC and run is not None:
        run.log_residuals(name, value, description)
    else:
        self.warning("Logging Residuals to text is not yet implemented")
metric_row(self, name, description=None, category=<DataCategory.PRIVATE: 1>, **kwargs)
Equivalent of the Run.log_row
function.
Logs a single row of a table to the metrics.
Note: Private Data will not be sent to metrics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the metric. |
required |
description |
str |
Description of the metric. |
None |
category |
DataCategory |
Classification of the data. Defaults to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Source code in shrike/compliant_logging/logging.py
def metric_row(
    self, name, description=None, category=DataCategory.PRIVATE, **kwargs
):
    """
    Equivalent of the `Run.log_row` function.
    Logs a single row of a table to the metrics.

    Note: Private Data will not be sent to metrics!

    Args:
        name (str): Name of the metric.
        description (str): Description of the metric.
        category (DataCategory, optional): Classification of the data.
            Defaults to DataCategory.PRIVATE.
        **kwargs: Column name / cell value pairs making up the row.
    """
    # obtain the AML run context (None when unavailable)
    run = self._get_aml_context()
    # public data with a run context goes to AML; everything else to text
    if run is not None and category == DataCategory.PUBLIC:
        run.log_row(name=name, description=description, **kwargs)
    else:
        cells = " | ".join(f"{r}:{c}" for r, c in kwargs.items())
        self.info(f"RowMetric | {name} | " + cells, category=category)
metric_table(self, name, value, description=None, category=<DataCategory.PRIVATE: 1>)
Equivalent to the Run.log_table
function.
Logs a table in dict format {rows: [values]} to metrics.
Note: Private Data will not be sent to metrics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the metric. |
required |
value |
dict |
Dictionary representation of the table. |
required |
description |
str |
Description of the metric. Defaults to None. |
None |
category |
DataCategory |
Category to log the data. Default to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Source code in shrike/compliant_logging/logging.py
def metric_table(
    self, name, value, description=None, category=DataCategory.PRIVATE
):
    """
    Equivalent to the `Run.log_table` function.
    Logs a table in dict format {rows: [values]} to metrics.

    Note: Private Data will not be sent to metrics!

    Args:
        name (str): Name of the metric.
        value (dict): Dictionary representation of the table.
        description (str, optional): Description of the metric. Defaults to None.
        category (DataCategory, optional): Category to log the data.
            Default to DataCategory.PRIVATE.
    """
    # the generic metric handler detects the {col: [values]} table layout
    self.metric(
        value,
        name=name,
        description=description,
        category=category,
    )
metric_value(self, name, value, description=None, step=None, category=<DataCategory.PRIVATE: 1>)
Equivalent to the Run.log
function.
Logs a single value to a metric
Note: Private Data will not be sent to metrics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the metric |
required |
value |
Any |
value to log |
required |
description |
str |
Description of the metric. Defaults to None. |
None |
step |
int |
Step of the current metric. Defaults to None. |
None |
category |
DataCategory |
Data category to make sure no data leaks. Defaults to DataCategory.PRIVATE. |
<DataCategory.PRIVATE: 1> |
Source code in shrike/compliant_logging/logging.py
def metric_value(
    self, name, value, description=None, step=None, category=DataCategory.PRIVATE
):
    """
    Equivalent to the `Run.log` function.
    Logs a single value to a metric.

    Note: Private Data will not be sent to metrics!

    Args:
        name (str): Name of the metric.
        value (Any): Value to log.
        description (str, optional): Description of the metric. Defaults to None.
        step (int, optional): Step of the current metric. Defaults to None.
        category (DataCategory, optional): Data category to make sure no data
            leaks. Defaults to DataCategory.PRIVATE.
    """
    # forward everything to the generic metric handler with explicit keywords
    self.metric(
        value=value,
        step=step,
        name=name,
        description=description,
        category=category,
    )
enable_compliant_logging(prefix='SystemLog:', use_aml_metrics=False, **kwargs)
The default format is logging.BASIC_FORMAT
(%(levelname)s:%(name)s:%(message)s
).
All other kwargs are passed to logging.basicConfig
. Sets the default
logger class and root logger to be compliant. This means the format
string %(prefix)
will work.
Set the format using the format
kwarg.
If running in Python >= 3.8, will attempt to add force=True
to the kwargs
for logging.basicConfig.
After calling this method, use the kwarg category
to pass in a value of
DataCategory
to denote data category. The default is PRIVATE
. That is,
if no changes are made to an existing set of log statements, the log output
should be the same.
The standard implementation of the logging API is a good reference: https://github.com/python/cpython/blob/3.9/Lib/logging/__init__.py
Source code in shrike/compliant_logging/logging.py
def enable_compliant_logging(
    prefix: str = "SystemLog:", use_aml_metrics: bool = False, **kwargs
) -> None:
    """
    Sets the default logger class and root logger to be compliant. This means
    the format string `%(prefix)` will work.

    The default format is `logging.BASIC_FORMAT` (`%(levelname)s:%(name)s:%(message)s`).
    All other kwargs are passed to `logging.basicConfig`.
    Set the format using the `format` kwarg.

    If running in Python >= 3.8, will attempt to add `force=True` to the kwargs
    for logging.basicConfig.

    After calling this method, use the kwarg `category` to pass in a value of
    `DataCategory` to denote data category. The default is `PRIVATE`. That is,
    if no changes are made to an existing set of log statements, the log output
    should be the same.

    The standard implementation of the logging API is a good reference:
    https://github.com/python/cpython/blob/3.9/Lib/logging/__init__.py
    """
    # store the prefix in the module-global used by the handlers
    set_prefix(prefix)
    if "format" not in kwargs:
        # prepend the compliance prefix to the standard basic format
        kwargs["format"] = f"%(prefix)s{logging.BASIC_FORMAT}"
    # Ensure that all loggers created via `logging.getLogger` are instances of
    # the `CompliantLogger` class.
    logging.setLoggerClass(CompliantLogger)
    # existing handlers imply basicConfig was already called; warn (on stderr,
    # with the prefix) rather than fail
    if len(logging.root.handlers) > 0:
        p = get_prefix()
        for line in _logging_basic_config_set_warning.splitlines():
            print(f"{p}{line}", file=sys.stderr)
    # force=True makes basicConfig replace pre-existing handlers (3.8+ only)
    if "force" not in kwargs and sys.version_info >= (3, 8):
        kwargs["force"] = True
    # swap the root logger for a CompliantLogger, keeping existing handlers
    old_root = logging.root
    root = CompliantLogger(logging.root.name, use_aml_metrics)
    root.handlers = old_root.handlers
    logging.root = root
    logging.Logger.root = root  # type: ignore
    # re-create the manager so new loggers attach to the compliant root
    logging.Logger.manager = logging.Manager(root)  # type: ignore
    # https://github.com/kivy/kivy/issues/6733
    logging.basicConfig(**kwargs)
enable_confidential_logging(prefix='SystemLog:', use_aml_metrics=False, **kwargs)
This function is a duplicate of the function enable_compliant_logging
.
We encourage users to use enable_compliant_logging
.
Source code in shrike/compliant_logging/logging.py
def enable_confidential_logging(
    prefix: str = "SystemLog:", use_aml_metrics: bool = False, **kwargs
) -> None:
    """
    Deprecated duplicate of `enable_compliant_logging`.
    We encourage users to use `enable_compliant_logging` instead.
    """
    # emit the deprecation notice on stderr, then delegate
    deprecation_notice = (
        f"{prefix} The function enable_confidential_logging() is on the way"
        " to deprecation. Please use enable_compliant_logging() instead."
    )
    print(deprecation_notice, file=sys.stderr)
    enable_compliant_logging(prefix, use_aml_metrics, **kwargs)
floating_range(buckets)
Computes a equal distributed list of bucket thresholds
Parameters:
Name | Type | Description | Default |
---|---|---|---|
buckets |
int |
Number of buckets |
required |
Returns:
Type | Description |
---|---|
List |
List of bucket thresholds of length buckets |
Source code in shrike/compliant_logging/logging.py
def floating_range(buckets):
    """
    Computes an evenly distributed list of bucket thresholds in [0, 1].

    Args:
        buckets (int): Number of thresholds to generate (must be >= 2).

    Returns:
        List: List of bucket thresholds of length `buckets`, evenly spaced
        from 0.0 to 1.0 inclusive.
    """
    # FIX: the previous implementation stepped in truncated integer percents
    # (`range(0, 100, int(100 / (buckets - 1)))` + [100]), which produced
    # `buckets + 1` values whenever 100 was not divisible by (buckets - 1),
    # e.g. buckets=7 yielded 8 thresholds.
    return [i / (buckets - 1) for i in range(buckets)]
get_aml_context()
Obtains the AML Context
Source code in shrike/compliant_logging/logging.py
def get_aml_context():
    """
    Obtains the AML Context.

    Returns the module-global `_AML_RUN`, which is populated by
    `set_aml_context`; presumably None when that has not been called —
    TODO confirm the initial value of `_AML_RUN` (not visible in this chunk).
    """
    return _AML_RUN
get_prefix()
Obtain the current global prefix to use when logging public (non-private) data.
Source code in shrike/compliant_logging/logging.py
def get_prefix() -> Optional[str]:
    """
    Obtain the current global prefix to use when logging public (non-private)
    data.

    Returns the module-global `_PREFIX` set via `set_prefix`; may be None if
    no prefix has been set — TODO confirm the initial value of `_PREFIX`.
    """
    return _PREFIX
is_eyesoff()
Returns a boolean of whether current workspace is eyes-off.
Source code in shrike/compliant_logging/logging.py
def is_eyesoff() -> bool:
    """
    Returns a boolean of whether the current workspace is eyes-off.

    Reads the cluster tenant and subscription ids from the AzureML Batch AI
    environment variables (empty string when unset) and delegates the
    decision to `is_eyesoff_helper`.
    """
    cluster_tenant = os.environ.get("AZ_BATCHAI_CLUSTER_TENANT_ID", "")
    cluster_subscription = os.environ.get(
        "AZ_BATCHAI_CLUSTER_SUBSCRIPTION_ID", ""
    )
    return is_eyesoff_helper(cluster_tenant, cluster_subscription)
set_aml_context()
Retrieves the AML Context, should be bundled in a try-catch.
Source code in shrike/compliant_logging/logging.py
def set_aml_context() -> None:
    """
    Retrieves the AML Context and caches it in the module-global `_AML_RUN`.
    Should be bundled in a try-catch, since the azureml import raises when
    the AzureML SDK is not installed.
    """
    global _AML_RUN
    # imported lazily so the module stays usable without the AzureML SDK
    from azureml.core.run import Run
    _AML_RUN = Run.get_context()
set_prefix(prefix)
Set the global prefix to use when logging public (non-private) data.
This method is thread-safe.
Source code in shrike/compliant_logging/logging.py
def set_prefix(prefix: str) -> None:
    """
    Set the global prefix to use when logging public (non-private) data.
    This method is thread-safe.

    Args:
        prefix: The prefix string stored in the module-global `_PREFIX`.
    """
    # _LOCK serializes writers of the module-global prefix
    with _LOCK:
        global _PREFIX
        _PREFIX = prefix