We created a notebook that makes revenue predictions per location using MLflow and PySpark. (Yes, we might switch to pandas later.)
The code looks something like the following; forgive me if it is not completely correct.
In the code you can see that for each location we run 14 iterations, feeding the predicted revenue back in to fine-tune the predictions. This process works to our liking.
When we run this process using a foreach loop, everything works fine.
What we want is to use ThreadPoolExecutor to process the predictions for the locations in parallel, creating one experiment per location to record the process. The problem we run into is that predictions are sometimes saved to experiments of other locations, and runs even end up nested inside runs of other locations. Does anyone know how to prevent this from happening?
import mlflow
from datetime import datetime
from typing import Optional
from pyspark.sql import DataFrame
from pyspark.ml.pipeline import PipelineModel
from concurrent.futures import ThreadPoolExecutor

class LocationPrediction:
    def __init__(self, location_name, pipeline_model):
        self.location_name = location_name
        self.pipeline_model = pipeline_model
        self.df_with_predictions: Optional[DataFrame] = None
        self.iteration = 0
        self.get_data_from_lakehouse()

    def get_data_from_lakehouse(self):
        # `spark` is the session provided by the Fabric notebook runtime
        self.initial_data = (
            spark.read.format("delta")
            .table("table_name")
            .filter(f"location = '{self.location_name}'")
        )

    def predict(self):
        # Start a child iteration run
        with mlflow.start_run(run_name=f"Iteration_{self.iteration}", nested=True):
            # After the first iteration, feed the previous predictions back in
            source_df = (
                self.df_with_predictions
                if self.df_with_predictions is not None
                else self.initial_data
            )
            predictions = self.pipeline_model.transform(source_df)
            mlflow.log_metric("row_count", predictions.count())
            # ...
            # Do some stuff with the dataframe result
            # ...
            self.df_with_predictions = predictions

    def write_to_lakehouse(self):
        self.df_with_predictions.write.format("delta").mode("append").saveAsTable("table_name")

    def do_iteration(self):
        # Use the new predictions to predict again, 14 times in total
        for i in range(14):
            self.predict()
            self.iteration += 1
        self.write_to_lakehouse()

def get_pipeline_model(location_name) -> PipelineModel:
    model_uri = f"models:/{location_name}/latest"
    model = mlflow.spark.load_model(model_uri)
    return model

def run_prediction_task(location_name):
    # Create or set the Fabric experiment and start the main run
    mlflow.set_experiment(location_name)
    run_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    mlflow.start_run(run_name=f"Prediction_{run_timestamp}")
    pipeline_model = get_pipeline_model(location_name)
    pipeline = LocationPrediction(location_name, pipeline_model)
    pipeline.do_iteration()
    mlflow.end_run()

if __name__ == "__main__":
    locations = ["location_1", "location_2", "location_3", "location_4", "location_5", "location_6"]
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(run_prediction_task, location) for location in locations]
        # Surface any exceptions raised inside the worker threads
        for future in futures:
            future.result()
Thanks for reaching out to MS Fabric community support.
Your code is mostly on track, but a few key changes are needed to ensure the MLflow experiments and runs stay correctly isolated when using ThreadPoolExecutor:
Set the experiment properly: Calling mlflow.set_experiment(location_name) inside run_prediction_task is the right idea for one experiment per location, but set_experiment updates the process-wide active experiment, which all threads share. To stop threads from logging into each other's experiments, capture the experiment object it returns and pass its experiment_id explicitly to every mlflow.start_run() (see the sketch after this list).
Unique run names: You're using run_name=f"Prediction_{run_timestamp}" to identify the main run. That's good; it makes each location's main run easy to find, though a name alone does not isolate the runs.
Manage iteration runs: nested=True attaches the child run to whichever run is active in the current context, and under parallel execution that context can get mixed up across threads, which matches the cross-location nesting you're seeing. If the iteration runs don't need to be children of the main run, drop nested=True for fully independent runs; if they do, anchor each child to its parent explicitly rather than relying on the active-run context.
DataFrame management: Ensure df_with_predictions isn't modified by multiple threads simultaneously. Since each thread constructs its own LocationPrediction instance, each thread already works on its own copy of the data.
MLflow context in threads: In general, each thread should create and manage its own experiment and run context rather than depending on global MLflow state.
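As a minimal sketch of that isolation (assuming a recent MLflow version where set_experiment returns the Experiment and start_run accepts experiment_id, and where newer releases also accept parent_run_id), run_prediction_task could pin the ids explicitly per thread:

import mlflow
from datetime import datetime

def run_prediction_task(location_name):
    # set_experiment mutates process-global state; capture the returned
    # Experiment and pass its id explicitly instead of relying on it.
    experiment = mlflow.set_experiment(location_name)
    run_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    with mlflow.start_run(
        run_name=f"Prediction_{run_timestamp}",
        experiment_id=experiment.experiment_id,
    ) as parent_run:
        pipeline_model = get_pipeline_model(location_name)
        task = LocationPrediction(location_name, pipeline_model)
        # Hand both ids to the worker so its iteration runs never depend
        # on whichever run happens to be active in another thread.
        task.experiment_id = experiment.experiment_id
        task.parent_run_id = parent_run.info.run_id
        task.do_iteration()

Inside LocationPrediction.predict, each iteration run would then anchor itself explicitly:

    with mlflow.start_run(
        run_name=f"Iteration_{self.iteration}",
        experiment_id=self.experiment_id,
        nested=True,
        parent_run_id=self.parent_run_id,  # assumption: available in recent MLflow releases
    ):
        ...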
You can find more information on managing MLflow experiments and runs in the official MLflow documentation.
Thanks,
Prashanth Are
MS Fabric community support
Thanks @v-prasare. We are looking further into using NotebookUtils.notebook.RunMultiple(), as it gives us better insight into what the process was for each location in the notebook instance by providing a link to the notebook snapshots afterwards.
@nielsvdc, Thank you for the update!
It sounds like using NotebookUtils.notebook.RunMultiple() is a great choice, especially since it provides better insights and visibility into the process for each location in the notebook instance. The ability to access notebook snapshots afterwards will definitely help with tracking and debugging.
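For reference, a minimal sketch of what that could look like with a DAG definition; the child notebook name, parameter name, and timeout values below are illustrative assumptions, not taken from your setup:

locations = ["location_1", "location_2", "location_3", "location_4", "location_5", "location_6"]

dag = {
    "activities": [
        {
            "name": f"predict_{loc}",               # unique name per activity
            "path": "LocationPredictionNotebook",   # hypothetical child notebook
            "timeoutPerCellInSeconds": 1800,        # illustrative limit
            "args": {"location_name": loc},         # read via a parameter cell
        }
        for loc in locations
    ],
    "concurrency": 3,  # mirrors max_workers in the ThreadPoolExecutor version
}

results = notebookutils.notebook.runMultiple(dag)

Each activity then runs as its own notebook instance with its own snapshot, which is what gives you the per-location link afterwards.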
If this post helps, please consider accepting it as the solution to help other members find it more quickly, and give kudos if it helped you resolve your query.
Thanks,
Prashanth Are
MS Fabric community support