Source code for fairops.mlops.models

import importlib
import time
from collections import defaultdict

import pandas as pd


# Check for MLflow and W&B availability.
# A bare ``import importlib`` does not guarantee that the ``importlib.util``
# submodule is bound as an attribute, so import it explicitly before calling
# ``find_spec``; otherwise the lookup below can fail with AttributeError.
import importlib.util

mlflow_available = importlib.util.find_spec("mlflow") is not None
wandb_available = importlib.util.find_spec("wandb") is not None

# Conditional imports: each name is always defined at module level, bound to
# the module when it can be located and to None otherwise.
if mlflow_available:
    import mlflow
else:
    mlflow = None

if wandb_available:
    import wandb
else:
    wandb = None


[docs] class LoggedMetric: def __init__(self, key: str, value: float, step: int | None = None, timestamp: int | None = None, run_id: str | None = None, experiment_id: str | None = None): self.key = key self.value = value self.step = step self.timestamp = timestamp self.run_id = run_id self.experiment_id = experiment_id if self.experiment_id is None: self.experiment_id = mlflow.active_run().info.experiment_id self.timestamp = self.timestamp if self.timestamp is not None else int(time.time()) if self.run_id is None: self.run_id = mlflow.active_run().info.run_id def __repr__(self): return f"LoggedMetric(key={self.key}, value={self.value}, step={self.step}, timestamp={self.timestamp}, run_id={self.run_id}, experiment_id={self.experiment_id})"
[docs] def to_dict(self): """Convert to dictionary for easy logging/exporting.""" return { "run_id": self.run_id, "key": self.key, "value": self.value, "step": self.step, "timestamp": self.timestamp, "experiment_id": self.experiment_id }
[docs] class LoggedMetrics: def __init__(self): # Hierarchical storage: {experiment_id -> {run_id -> {key -> {step -> [LoggedMetric]}}}} self.metrics = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))
[docs] def add_metric(self, metric: LoggedMetric): """Store a new metric.""" self.metrics[metric.experiment_id][metric.run_id][metric.key][metric.step] = metric
[docs] def get_metrics(self, experiment_id: str | None = None, run_id: str | None = None, key: str | None = None, step: int | None = None): """Retrieve stored metrics for a given experiment_id, run_id, key, and/or step.""" if experiment_id and experiment_id in self.metrics: if run_id and run_id in self.metrics[experiment_id]: if key and key in self.metrics[experiment_id][run_id]: if step is not None: return self.metrics[experiment_id][run_id][key].get(step, []) return self.metrics[experiment_id][run_id][key] return self.metrics[experiment_id][run_id] return self.metrics[experiment_id] return self.metrics # Return all metrics if no filters applied
[docs] def to_dict(self): """Export logged metrics to a JSON file while preserving hierarchy.""" structured_data = [ { "experiment_id": experiment_id, "runs": [ { "run_id": run_id, "metrics": [ { "key": key, "steps": [ { "step": step, "value": metric.value, "timestamp": metric.timestamp } for step, metric in key_metrics.items() ] } for key, key_metrics in run_data.items() ] } for run_id, run_data in exp_data.items() ] } for experiment_id, exp_data in self.metrics.items() ] return structured_data
[docs] def to_dataframe(self): """Convert logged metrics into a Pandas DataFrame for analysis.""" all_metrics = [{ "experiment_id": experiment_id, "run_id": run_id, "key": key, "step": step, "value": metric.value, "timestamp": metric.timestamp } for experiment_id, exp_data in self.metrics.items() for run_id, run_data in exp_data.items() for key, key_metrics in run_data.items() for step, metric in key_metrics.items() ] return pd.DataFrame(all_metrics)
[docs] def aggregate(self, experiment_id: str, run_id: str, key: str, step: int, method="mean"): """Aggregate multiple values at the same step for a specific experiment and run.""" values = [m.value for m in self.get_metrics(experiment_id, run_id, key, step)] if not values: return None if method == "mean": return sum(values) / len(values) elif method == "max": return max(values) elif method == "min": return min(values) else: raise ValueError(f"Unsupported aggregation method: {method}")
[docs] class LoggedParam: def __init__(self, key: str, value: float, run_id: str | None = None, experiment_id: str | None = None): self.key = key self.value = value self.run_id = mlflow.active_run().info.run_id self.experiment_id = mlflow.active_run().info.experiment_id def __repr__(self): return f"LoggedParam(key={self.key}, value={self.value}, run_id={self.run_id}, experiment_id={self.experiment_id})"
[docs] def to_dict(self): """Convert to dictionary for easy logging/exporting.""" return { "run_id": self.run_id, "key": self.key, "value": self.value, "experiment_id": self.experiment_id }
[docs] class LoggedParams: def __init__(self): # Hierarchical storage: {experiment_id -> {run_id -> {key -> LoggedParam}}} self.params = defaultdict(lambda: defaultdict(dict))
[docs] def add_param(self, param: LoggedParam): """Store a new parameter.""" self.params[param.experiment_id][param.run_id][param.key] = param
[docs] def get_params(self, experiment_id: str | None = None, run_id: str | None = None, key: str | None = None): """Retrieve stored parameters for a given experiment_id, run_id, and/or key.""" if experiment_id and experiment_id in self.params: if run_id and run_id in self.params[experiment_id]: if key and key in self.params[experiment_id][run_id]: return self.params[experiment_id][run_id][key] return self.params[experiment_id][run_id] return self.params[experiment_id] return self.params # Return all parameters if no filters applied
[docs] def to_dict(self): """Export logged parameters to a JSON file while preserving hierarchy.""" structured_data = [ { "experiment_id": experiment_id, "runs": [ { "run_id": run_id, "parameters": [ {"key": key, "value": param.value} for key, param in run_data.items() ] } for run_id, run_data in exp_data.items() ] } for experiment_id, exp_data in self.params.items() ] return structured_data
[docs] def to_dataframe(self): """Convert logged parameters into a Pandas DataFrame for analysis.""" all_params = [ param.to_dict() for exp in self.params.values() for run in exp.values() for param in run.values() ] return pd.DataFrame(all_params)