Model Code¶
Model code in ExpOps projects defines the ML pipeline using decorators and functions.
File Location¶
Location: models/<model_name>.py
Required Imports¶
Always import from expops.core:
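Based on the names used in the examples on this page (the exact export list is an assumption), the import typically looks like:

```python
# Assumed exports - process, step, log_metric, and SerializableData are
# the expops.core names referenced throughout this page
from expops.core import process, step, log_metric, SerializableData
```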
Key Components¶
Process Definitions¶
Functions decorated with @process() define pipeline processes. Process functions have strict requirements:
Required Function Signature¶
Every process function MUST:

1. Declare explicit parameters for the upstream output keys and hyperparameter keys it needs
2. Return a dictionary (required; non-dict returns raise an error)
3. Return only serializable data (dictionaries, lists, primitives, not complex objects)
4. Ensure output keys are unique across upstream dependencies (collisions raise errors)
```python
@process()
def define_my_process(cleaned_df, learning_rate: float = 0.001):
    # cleaned_df is injected from an upstream process return key
    result = perform_work(cleaned_df, learning_rate)
    # MUST return a dictionary with serializable values
    return {
        'scores': result  # Must be serializable (dict, list, primitive types)
    }
```
Step Functions¶
Functions decorated with @step() perform specific operations within a process:
```python
import numpy as np
import pandas as pd

from expops.core import step, SerializableData

@step()
def load_data():
    # Data loading logic
    df = pd.read_csv("data.csv")
    return {'df': df.to_dict(orient='list')}

@step()
def preprocess(raw: SerializableData):
    """
    Steps can accept the SerializableData type hint for type checking.
    SerializableData is a type alias for Dict[str, Any].
    """
    df = pd.DataFrame(raw['df'])
    # Preprocessing logic
    processed = clean_data(df)
    return {'processed_df': processed.to_dict(orient='list')}

@step()
def train(prep_data: SerializableData):
    # Training logic
    X = np.array(prep_data['processed_df'])
    model = train_model(X)
    return {'model': model}
```
Step Notes:
- Steps are defined inside process functions (see the sketch after this list)
- Steps can accept explicit parameters passed from the process
- Steps execute sequentially within their parent process
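Here is a minimal sketch of that nesting, assuming steps can be invoked like plain functions from the process body (the invocation contract and the fit_model helper are assumptions, not confirmed ExpOps API):

```python
from expops.core import process, step, SerializableData

@process()
def define_training_process(cleaned_df, learning_rate: float = 0.001):
    # Steps are defined inside the process body
    @step()
    def train(prep: SerializableData):
        # fit_model is a hypothetical helper returning serializable scores
        scores = fit_model(prep['processed_df'], learning_rate)
        return {'scores': scores}

    # Assumption: the process calls its steps in order, like ordinary
    # functions, passing explicit parameters
    return train({'processed_df': cleaned_df})
```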
Metrics Logging¶
Use log_metric() for experiment tracking:
```python
from expops.core import log_metric

# Log scalar metrics (step omitted - auto-increments)
log_metric("accuracy", 0.95)
log_metric("loss", 0.05)
```
Step Parameter¶
The step parameter is optional and controls how metrics are tracked over time:
When step is omitted (default behavior):

- The system automatically increments from the largest existing step for that metric
- If no previous metrics exist for that metric name, it starts at step 1
- Each subsequent call without step increments by 1
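For example, with no prior history for a metric name:

```python
log_metric("val_auc", 0.81)  # no previous steps -> recorded at step 1
log_metric("val_auc", 0.83)  # auto-increments -> step 2
log_metric("val_auc", 0.84)  # -> step 3
```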
When step is explicitly provided:
- You control the step number yourself
- Useful for training loops, epochs, iterations, or any custom progression
- Step values can be any positive integer
```python
# Training loop with explicit step numbers
for epoch in range(100):
    loss = train_one_epoch()
    # Use epoch number as step
    log_metric("train_loss", loss, step=epoch + 1)
```
Data Flow Between Processes¶
Processes receive data from all upstream processes by matching parameter names to upstream return keys and hyperparameter keys:
```python
@process()
def define_downstream_process(model, X_test):
    # model and X_test are injected from upstream outputs
    predictions = model.predict(X_test)
    return {'predictions': predictions.tolist()}
```
Important: Inputs are injected by name. If two upstream processes return the same key (or a key collides with a hyperparameter), the runtime raises an error.
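For instance, the downstream process above would receive its inputs from an upstream process shaped like this (the process name and the split_and_fit helper are illustrative, not part of the templates):

```python
@process()
def define_upstream_process(raw_df):
    # The return keys 'model' and 'X_test' match the downstream process's
    # parameter names exactly, so they are injected by name
    model, X_test = split_and_fit(raw_df)  # hypothetical helper
    return {'model': model, 'X_test': X_test}
```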
Example¶
See template projects for complete examples:
- sklearn-basic: Simple sklearn pipeline
- premier-league: Complex pipeline with multiple steps