# Running the Matrix Pipeline
Now that you understand the different environments, know your way around Kedro, and have successfully run the pipeline end-to-end with test data, let's explore how to run specific parts of the pipeline in the base environment.
## Pipeline Structure
As we explained at the beginning of this section, the Matrix pipeline is composed of several sub-pipelines that can be run independently or combined. The main components are:
- **Data Engineering Pipeline** (`data_engineering`): Handles data ingestion and integration
- **Feature Pipeline** (`feature`): Performs filtering and generates embeddings
- **Modelling Pipeline** (`modelling_run`): Includes model training, matrix generation, evaluation and transformations
In this section we will break them down in the base environment and show you how to run each part of the pipeline on its own.
## Running Individual Pipeline Components
As explained earlier in the section, the data engineering pipeline is responsible for ingesting and integrating data from various biomedical knowledge graphs and datasets. It consists of two main stages:
**Ingestion Stage** (`ingestion`):

```shell
kedro run --pipeline=ingestion -e base  # equivalently: kedro run -p ingestion -e base
```
- Loads raw data from each source
- Performs initial data validation
- Converts data into a standardized format
- Stores intermediate results in Parquet format for efficient processing
**Integration Stage** (`integration`):

```shell
kedro run --pipeline=integration -e base
```
- Normalizes data from different sources to a common format (Biolink)
- Resolves entity synonyms to ensure consistent node IDs
- Unions and deduplicates nodes and edges
- Produces a unified knowledge graph
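The union-and-deduplication step above can be sketched in plain Python. Note that the real pipeline operates on Spark DataFrames; the rows and the first-wins dedup policy below are illustrative only:

```python
# Toy node tables from two sources, already normalized to a common
# (Biolink-style) schema. Real data lives in Spark DataFrames.
rtx_nodes = [
    {"id": "CHEBI:15365", "name": "aspirin", "source": "rtxkg2"},
    {"id": "MONDO:0004979", "name": "asthma", "source": "rtxkg2"},
]
robokop_nodes = [
    {"id": "CHEBI:15365", "name": "aspirin", "source": "robokop"},  # duplicate ID
    {"id": "MONDO:0005148", "name": "type 2 diabetes", "source": "robokop"},
]

def union_and_dedupe(*tables):
    """Union node tables and keep the first row seen per node ID."""
    seen = {}
    for table in tables:
        for row in table:
            seen.setdefault(row["id"], row)
    return list(seen.values())

unified = union_and_dedupe(rtx_nodes, robokop_nodes)
print(len(unified))  # 3 unique nodes out of 4 input rows
```

Because entity synonyms are resolved to consistent node IDs beforehand, deduplicating on the ID column is enough to merge the graphs.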
The pipeline is highly configurable, allowing you to enable or disable different data sources based on your needs. These sources are configured in `settings.py`:
```python
DYNAMIC_PIPELINES_MAPPING = lambda: disable_private_datasets(
    generate_dynamic_pipeline_mapping(
        {
            "cross_validation": {
                "n_cross_val_folds": 3,
            },
            "integration": [
                {"name": "rtx_kg2", "integrate_in_kg": True, "is_private": False},
                {"name": "robokop", "integrate_in_kg": True, "is_private": False},
                # ... other sources
                {"name": "drug_list", "integrate_in_kg": False, "has_edges": False},
                {"name": "disease_list", "integrate_in_kg": False, "has_edges": False},
            ],
            # ...
        }
    )
)
```
In the Matrix project, we use dynamic pipelines to handle multiple data sources, model variants, and evaluation types. The `DYNAMIC_PIPELINES_MAPPING` in `settings.py` defines which data sources to include, which models to train, and which evaluations to run. This allows us to easily enable or disable different components without modifying the core pipeline code. Note that dynamic pipelines are not natively supported by Kedro; this is our own custom implementation.
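A rough sketch of what such a mapping-driven expansion looks like. The helper names and the string "pipeline specs" below are made up for illustration and are not the actual Matrix implementation:

```python
# Illustrative settings mapping, mirroring the shape used in settings.py.
MAPPING = {
    "integration": [
        {"name": "rtx_kg2", "integrate_in_kg": True, "is_private": False},
        {"name": "robokop", "integrate_in_kg": True, "is_private": False},
        {"name": "drug_list", "integrate_in_kg": False, "has_edges": False},
    ]
}

def build_ingestion_pipelines(mapping):
    """One ingestion 'pipeline' (represented here by a name) per configured source."""
    return [f"ingestion.{src['name']}" for src in mapping["integration"]]

def build_integration_inputs(mapping):
    """Only sources flagged integrate_in_kg contribute to the unified KG."""
    return [s["name"] for s in mapping["integration"] if s["integrate_in_kg"]]

print(build_ingestion_pipelines(MAPPING))
print(build_integration_inputs(MAPPING))  # ['rtx_kg2', 'robokop']
```

Commenting a source out of the mapping therefore removes both its ingestion pipeline and its contribution to the unified graph, with no changes to pipeline code.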
Therefore, the settings above reflect the knowledge graphs (e.g. RTX-KG2, SPOKE, ROBOKOP, and Embiology) that get ingested, as well as other core datasets essential for Matrix (Drug List, Disease List, Ground Truth, Clinical Trials and Medical Team data). Thanks to `settings.py` and dynamic pipelines, they are not statically defined or hard-coded, but configured through settings.
Now let's get back to running the pipeline. Try running the following command first:
```shell
kedro run -p data_engineering -e base
```
You should see all the datasets listed in the `integration` section of the settings being processed. It should run to completion fairly quickly:
```
Total pipeline duration: 0:01:20.391892
```
Now comment out all of them except for RTX-KG2, like this:
```python
DYNAMIC_PIPELINES_MAPPING = lambda: disable_private_datasets(
    generate_dynamic_pipeline_mapping(
        {
            "cross_validation": {
                "n_cross_val_folds": 3,
            },
            "integration": [
                {"name": "rtx_kg2", "integrate_in_kg": True, "is_private": False},
                # {"name": "robokop", "integrate_in_kg": True, "is_private": False},
                # ... other sources
                # {"name": "drug_list", "integrate_in_kg": False, "has_edges": False},
                # {"name": "disease_list", "integrate_in_kg": False, "has_edges": False},
            ],
            # ...
        }
    )
)
```
Re-run the pipeline; with only RTX-KG2 enabled it completes noticeably faster:

```
Total pipeline duration: 0:00:24.715172
```
## Running Specific Nodes
By editing `settings.py` we control which datasets are processed; we can go a step further and execute only specific nodes. As we learnt in the kedro introductory video, nodes are the building blocks of the pipeline and can be executed on their own for debugging or for testing individual components.
For example, to run just the ingestion of the drug list:
```shell
kedro run --pipeline=ingestion --nodes=write_drug_list -e base
```
Or to run the node deduplication step in the integration pipeline:
```shell
kedro run --pipeline=integration --nodes=create_prm_unified_nodes -e base
```
The node names can be found in `integration/pipeline.py` and `integration/nodes.py`.
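Conceptually, `--nodes` filters the pipeline down to just the named node(s). A toy sketch of that selection, with a made-up two-node pipeline definition (Kedro's actual node objects carry functions and dataset names, not dicts):

```python
# Illustrative pipeline definition: each node has a name, inputs and outputs.
pipeline = [
    {"name": "write_drug_list", "inputs": ["raw_drug_list"], "outputs": ["drug_list"]},
    {"name": "write_disease_list", "inputs": ["raw_disease_list"], "outputs": ["disease_list"]},
]

def filter_nodes(nodes, names):
    """Keep only the nodes whose name was requested on the command line."""
    return [n for n in nodes if n["name"] in names]

selected = filter_nodes(pipeline, {"write_drug_list"})
print([n["name"] for n in selected])  # ['write_drug_list']
```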
## Modifying parameters in the feature pipeline
The exact details of the ingestion and integration pipelines can be found in the pipeline section. In short, however, you can run both stages with a single command:

```shell
# NOTE: the data engineering pipeline in the base environment will take at least an hour
# to run on a single machine, as it lacks parallelization
kedro run -p data_engineering -e base
```
The `feature` pipeline consists of:

- `filtering`: Applies custom filters to the knowledge graph
- `embeddings`: Generates topological embeddings for nodes
Both components have configurable parameters that can significantly impact the pipeline's behavior and runtime. We will explore them in more detail to give you a better understanding of the role of the config directory.
### Filtering Parameters
The filtering pipeline allows you to control which data sources and relationships are included in the knowledge graph. You can modify these parameters in `conf/base/filtering/parameters.yml`:
```yaml
filtering:
  node_filters:
    filter_sources:
      _object: matrix.pipelines.filtering.filters.KeepRowsContaining
      column: upstream_data_source
      keep_list:
        - rtxkg2
        # - robokop  # Uncomment to include ROBOKOP data
        # ...
  edge_filters:
    filter_sources:
      _object: matrix.pipelines.filtering.filters.KeepRowsContaining
      column: upstream_data_source
      keep_list:
        - rtxkg2
        # - robokop
```
**Important: Dependency Injection**
You might have noticed that the `_object` parameter occurs quite frequently across different environment and config files. This is a critical concept in our codebase: we leverage the dependency injection pattern to keep configuration clean and re-usable, and this design pattern is fundamental to how we structure our code and configuration. We will dig into the details of this custom Kedro extension in the kedro extension section.
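A minimal sketch of the idea behind `_object` resolution, using a standard-library class as a stand-in for a Matrix filter (this mirrors the concept, not the actual extension's code):

```python
import importlib

def instantiate(config: dict):
    """Resolve the `_object` dotted path and call it with the remaining keys."""
    params = dict(config)
    module_path, _, class_name = params.pop("_object").rpartition(".")
    cls = getattr(importlib.import_module(module_path), class_name)
    return cls(**params)

# Example: datetime.timedelta stands in for e.g. KeepRowsContaining.
delta = instantiate({"_object": "datetime.timedelta", "days": 2, "hours": 3})
print(delta)  # 2 days, 3:00:00
```

This is what makes the YAML above work: swapping a filter class or its parameters is purely a config change, with no edits to pipeline code.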
For example, to include ROBOKOP data in your pipeline:

1. Uncomment the `robokop` line in the `keep_list`
2. Run the filtering pipeline:
```shell
kedro run --pipeline=filtering -e base
```
This ensures that nodes and edges derived from ROBOKOP are also kept for feature generation. You can try running the pipeline with and without the `robokop` entry to see how it affects the runtime.
### Embeddings Parameters
The embeddings pipeline generates topological embeddings for nodes in the knowledge graph. The key parameters are defined in `conf/base/embeddings/parameters.yml`:
```yaml
embeddings.topological_estimator:
  _object: matrix.pipelines.embeddings.graph_algorithms.GDSNode2Vec
  concurrency: 4
  embedding_dim: 512
  random_seed: 42
  iterations: 10
  walk_length: 30
  walks_per_node: 10
  window_size: 10
```
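To build intuition for why these parameters drive runtime: the random-walk phase samples on the order of `nodes × walks_per_node × walk_length` steps. A back-of-the-envelope calculation with a hypothetical node count (the real KG's size will differ):

```python
# Rough sampling volume for the random-walk phase of Node2Vec.
# The node count below is an arbitrary example, not the real KG size.
n_nodes = 1_000_000
walks_per_node = 10
walk_length = 30

total_walk_steps = n_nodes * walks_per_node * walk_length
print(f"{total_walk_steps:,} sampled steps")  # 300,000,000 sampled steps
```

Halving `walks_per_node` or `walk_length` roughly halves the sampling work, at the cost of embedding quality; `embedding_dim` and `iterations` similarly trade quality for training time.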
**`OPENAI_API_KEY` needs to be set**
Before we run the embeddings pipeline, we need to make sure that the `OPENAI_API_KEY` is set in the `.env` file.
**Neo4J is set up and running**
The embeddings pipeline requires that Neo4J is running and configured correctly. If you already have Neo4J set up, simply add the host and credentials to your `.env` file:
```
NEO4J_HOST: xxx
NEO4J_USER: xxx
NEO4J_PASSWORD: xxx
```
If not, you can start Neo4J from the docker-compose file:
```shell
make compose_up
```
**Important: memory settings for Neo4J**
Please ensure that the machine you are running on has at least 128 GB of memory: the embeddings pipeline is intensive and requires Neo4J to have at least 20 GB of RAM. The command above assumes you have at least 80 GB of memory, so adjust accordingly. The following settings work:
```
NEO4J_server_memory_heap_initial__size=40g
NEO4J_server_memory_heap_max__size=40g
NEO4J_server_memory_pagecache_size=8g
```
You can add these to the Neo4J `environment` section of the docker-compose file (if you are running Neo4J through it), or, for a standalone installation, modify Neo4J's environment file or settings accordingly. This prevents the embeddings pipeline from being killed by an out-of-memory (OOM) error, since it is memory-intensive.
Once Neo4J is up, run the embeddings pipeline:

```shell
kedro run -p embeddings -e base
```
**Running into `UnsupportedAdministrationCommand` error**
If you run into the following error:

```
{code: Neo.ClientError.Statement.UnsupportedAdministrationCommand} {message: Unsupported administration command: CREATE OR REPLACE DATABASE `analytics`}
```

open `pipelines/matrix/conf/base/embeddings/catalog.yml` and remove the `mode: overwrite` entry.
Please note that with `mode: overwrite` removed, the data will not be deleted when the pipeline is re-run; you would have to delete it manually from the Neo4J data folder. Not deleting it might result in inaccurate or duplicated data.
## Finding Data Products while running the modelling pipeline
Once we have generated embeddings, we have everything we need to run the modelling pipeline and generate the MATRIX. This is as simple as running the following command:
```shell
kedro run --pipeline=modelling_run
```
This pipeline includes:
- `modelling`: Trains the ML models
- `matrix_generation`: Produces the drug-disease prediction matrix
- `evaluation`: Evaluates model performance
- `matrix_transformations`: Applies post-processing transformations
Just as we learnt earlier, we can run individual components of the pipeline (e.g. `kedro run -p modelling -e base`) or modify parameters (e.g. edit `conf/base/modelling/defaults.yml` to change the train-test splits). In the modelling pipeline we also have a choice of algorithms; these are selected in `settings.py`, just like we learnt in the first section:
```python
"modelling": {
    "model_name": "xg_ensemble",  # model_name suggestions: xg_baseline, xg_ensemble, rf, xg_synth
    "model_config": {"num_shards": 3},
},
```
The pipeline runs multiple types of evaluations, which are also configured in `settings.py`:
```python
"evaluation": [
    {"evaluation_name": "simple_classification"},         # Basic classification metrics
    {"evaluation_name": "disease_specific"},              # Disease-specific ranking
    {"evaluation_name": "full_matrix"},                   # Full matrix ranking
    {"evaluation_name": "full_matrix_negatives"},         # Negative pairs evaluation
    {"evaluation_name": "simple_classification_trials"},  # Clinical trials evaluation
    {"evaluation_name": "disease_specific_trials"},       # Disease-specific trials
    {"evaluation_name": "full_matrix_trials"},            # Full matrix trials
    {"evaluation_name": "disease_specific_off_label"},    # Off-label evaluation
    {"evaluation_name": "full_matrix_off_label"},         # Full matrix off-label
]
```
Each evaluation type produces different metrics; you can learn about them in detail here.
So once we run the pipeline:
```shell
kedro run -p modelling_run -e base
```
you can find the outputs via `conf/base/matrix_generation/catalog.yml`, where, following the data layer convention, you can see the output entry:
```yaml
"matrix_generation.fold_{fold}.model_output.sorted_matrix_predictions@spark":
  <<: *_spark_parquet
  filepath: ${globals:paths.matrix_generation}/model_output/fold_{fold}/matrix_predictions
```
Note the `{fold}` placeholder, which is expanded for each cross-validation fold, and `${globals:paths.matrix_generation}` from `globals.yml`, which specifies the global variable for the matrix generation directory. Similar variables can be found within `conf/base/evaluation/catalog.yml`, where many paths contain source variables; these correspond to the evaluation entries in `settings.py`, and you can comment some of them out to see how your matrix run changes.
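The `{fold}` templating amounts to simple string interpolation; a sketch with an illustrative path prefix standing in for the resolved `${globals:paths.matrix_generation}` value:

```python
# Illustrative expansion of a fold-templated catalog filepath.
template = "data/matrix_generation/model_output/fold_{fold}/matrix_predictions"
n_folds = 3  # matches n_cross_val_folds in settings.py

paths = [template.format(fold=fold) for fold in range(n_folds)]
print(paths[0])  # data/matrix_generation/model_output/fold_0/matrix_predictions
```

This is why one catalog entry is enough to describe the outputs of every cross-validation fold.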
## Running the pipeline with a (subset of) real data
Now that you understand how to run different parts of the pipeline, let's try running the pipeline with real data. The real KG is large and requires plenty of compute and time; however, we can run the pipeline with a subset of real data by following the next steps.
### Using Public Datasets
The Matrix pipeline now supports public datasets that are available through our public GCS bucket at `gs://data.dev.everycure.org/data/01_RAW/`. This includes public knowledge graphs like RTX-KG2 and Robokop, as well as supporting datasets like ground truth data that don't require special access permissions.
To run the pipeline with public data sources:

1. Enable public data sources in your `globals.yml` by ensuring the data sources you want are uncommented:

    ```yaml
    data_sources:
      rtx_kg2:
        version: v2.7.3
      robokop:
        version: v1.5.0
      gt:  # ground truth data
        version: v2.10.0_validated
    ```

2. Use the base environment, which is already configured to read from the public bucket for these data sources via the `raw_public` path.

3. Run the data engineering pipeline to process the public datasets:

    ```shell
    kedro run --pipeline=data_engineering --tags=rtx_kg2,robokop -e base
    ```
This approach allows you to work with real, production-quality data without needing access to private datasets or development buckets.
After approximately 20-30 minutes, the pipeline should have finished all stages. If that's the case, well done! You can now repeat the entire process with the full real data if you would like; however, note that it will take a very long time: without parallelization, you can expect runs of over 24 hours for KGs such as RTX-KG2. Smaller graphs will be faster.
**Info**
Remember that the pipeline is modular by design, allowing you to run components independently. It's very rare that we run the pipeline with real data end-to-end; we usually run the `data_engineering` pipeline first to examine the generated KG, then extract features, and only after that's complete do we start modelling.