Download the full reference script: <a href="/files/record_linkage_intersection.py" download>record_linkage.py</a>

## Goal

This tutorial demonstrates how to perform a multi-centric record linkage analysis using Python, a user-adapted configuration TOML file, and a streamlined version of the Mainzelliste pseudonymization, identity management, and record linkage system.
The streamlined version of Mainzelliste, implemented in Rust, provides high performance, robustness, and a compact runtime footprint, making it suitable for distributed data processing across multiple institutions or nodes.

Record linkage is the process of identifying records that correspond to the same individual across different datasets. In multi-centric settings, this must be done without sharing sensitive personal data between sites. Mainzelliste achieves this through privacy-preserving algorithms: identifying information is encoded using Bloom filters, and original identifiers are replaced by pseudonyms, ensuring that sensitive data is never directly exchanged.
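
To make the Bloom filter idea concrete, here is a minimal, illustrative sketch — not Mainzelliste's actual encoding; the filter size, hash count, and hashing scheme are arbitrary choices for the example. An identifier is split into bigrams, each bigram sets a few bit positions, and two filters can then be compared with the Dice coefficient:

```python
import hashlib

def bloom_encode(value: str, size: int = 500, num_hashes: int = 15) -> set[int]:
    """Hash a string's bigrams into Bloom filter bit positions (illustrative only)."""
    bigrams = [value[i:i + 2] for i in range(len(value) - 1)]
    bits = set()
    for gram in bigrams:
        for seed in range(num_hashes):
            digest = hashlib.sha256(f"{seed}:{gram}".encode()).hexdigest()
            bits.add(int(digest, 16) % size)
    return bits

def dice_similarity(a: set[int], b: set[int]) -> float:
    """Dice coefficient of two bit sets; values near 1.0 suggest the same person."""
    return 2 * len(a & b) / (len(a) + len(b)) if (a or b) else 0.0

# Similar spellings produce overlapping bit patterns without exposing the names.
print(dice_similarity(bloom_encode("mueller"), bloom_encode("müller")))
```
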
The general workflow of the record linkage analysis is as follows:
### Python Analysis Script
The script (<a href="/files/record_linkage_intersection.py" download>record_linkage.py</a>) defines:

1. `RLAnalyzer` (runs on each analyzer node)
   - **`__init__()`**: Initializes the analyzer node.
   - **`analysis_method()`**: Executes the main analysis workflow, consisting of multiple steps:
     * **Initialize PostgreSQL** – Prerequisite for Mainzelliste (using `run_as_postgres()`, `init_db()`, `start_postgres()`, `create_user_and_db()`).
     * **Start Mainzelliste** – Provides pseudonymization services and performs Bloom filter calculations.
       - `wait_for_mainzelliste()`: Polls the Mainzelliste health check, attempting to establish a connection until the timeout is reached (see the polling sketch after this list).
     * **Process patients from CSV** – Iterates over all patients in the project’s S3 datastore.
       - `add_patients()`: Adds patients to Mainzelliste from CSV (using `create_session_and_token()` to start a session and `get_new_pseudonyms()` to obtain pseudonyms).
       - `get_bloomfilters()`: Retrieves Bloom filters from Mainzelliste for record linkage.
     * **Cleanup** – Cleanly shuts down services.
       - `cleanup()`: Stops PostgreSQL (via `stop_postgres()`).
     * **Multi-stage processing** – Analysis is performed in up to three iterations (stages 0–2).
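
The health-check polling mentioned above follows a common pattern. Here is a minimal sketch, assuming an HTTP health endpoint and the `requests` library; the URL, timeout, and exact signature in `record_linkage.py` may differ:

```python
import time

import requests  # assumed to be available in the analysis environment

def wait_for_mainzelliste(base_url: str = "http://localhost:8080", timeout: float = 60.0) -> None:
    """Poll Mainzelliste until it responds or the timeout is reached."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if requests.get(base_url, timeout=2).ok:
                return  # service is up
        except requests.exceptions.ConnectionError:
            pass  # not accepting connections yet – keep polling
        time.sleep(1)
    raise TimeoutError("Mainzelliste did not become available in time")
```
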


2. `RLAggregator` (runs on the aggregator node)
   - **`__init__()`**: Initializes the aggregator node, initializes PostgreSQL, and starts Mainzelliste.
     * **Generates configurations** – Generates configurations for the analyzers and the aggregator (using `create_config_nodes()`, `create_config_aggregator()`).
     * **Initialize PostgreSQL** – Prerequisite for Mainzelliste (using `run_as_postgres()`, `init_db()`, `start_postgres()`, `create_user_and_db()`).
     * **Start Mainzelliste** – Provides pseudonymization services and performs Bloom filter calculations.
   - **`aggregation_method()`**: Executes the matching logic, then collects and distributes results, consisting of multiple steps:
     * Return the configuration to the analyzer nodes.
     * Gather Bloom filter results from all analyzers.
     * `wait_for_mainzelliste()`: Polls the Mainzelliste health check, attempting to establish a connection until the timeout is reached.
     * **Identify duplicates** – Performs record linkage with the Bloom filters: iterates over all Bloom filters of all analyzer nodes and adds them to Mainzelliste.
       - Matching patients receive the same pseudonyms.
     * **Compute intersections** – Computes global and pairwise duplicates: calculates the intersection of all nodes based on pseudonyms (using `all_nodes_intersect()`; see the sketch after this list).
     * Return the results to the analyzers and the hub.
     * `stop_postgres()`: Stops PostgreSQL.
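
The intersection step can be illustrated with plain set operations. This is a minimal sketch of the idea, not the actual `all_nodes_intersect()` implementation: because matching patients carry the same pseudonym on every node, pairwise and global overlaps reduce to set intersections.

```python
from itertools import combinations
from typing import Dict, Set

def all_nodes_intersect(pseudonyms_per_node: Dict[str, Set[str]]) -> Dict[str, int]:
    """Count pairwise and global pseudonym overlaps between nodes (illustrative)."""
    results: Dict[str, int] = {}
    for (node_a, pseus_a), (node_b, pseus_b) in combinations(pseudonyms_per_node.items(), 2):
        results[f"{node_a}:{node_b}"] = len(pseus_a & pseus_b)
    # "total": records present on *all* nodes.
    results["total"] = len(set.intersection(*pseudonyms_per_node.values()))
    return results

print(all_nodes_intersect({
    "node1-id": {"p1", "p2", "p3"},
    "node2-id": {"p2", "p3", "p4"},
    "node3-id": {"p3", "p5"},
}))  # {'node1-id:node2-id': 2, 'node1-id:node3-id': 1, 'node2-id:node3-id': 1, 'total': 1}
```
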


### TOML Configuration File
The record linkage is configured through a `config.toml` file whose fields must match the project’s data. Therefore, the analyst must be familiar with the project’s data and assess whether the configured fields are compatible with it.

If no such file is provided, the script falls back to a built-in default configuration, which may not be compatible with the project’s data fields and could therefore cause the analysis to fail.

The `config.toml` file below indicates the fields that the analyst should customize for the project.

```toml
[patient_settings]
# …

nachname = "^[A-Za-zäÄöÖüÜßáÁéÉèÈ\\.\\- ]*[A-Za-zäÄöÖüÜßáÁ…"

[validate_date]
fields = ["geburtstag", "geburtsmonat", "geburtsjahr"]

[thresholds]
is_match = 0.95
non_match = 0.95

# …
```

The `config.toml` file supports the following settings:

| Setting | Description |
|---|---|
| **validate_fields** | A regular expression applied to specific string fields to verify their validity; for example, it can ensure that names do not contain numbers |
| **validate_date** | A list of fields that together define a complete date |
| **thresholds** | A value in [0,1] indicating the minimum weight for a record pair to count as a definitive match. To prevent failures in FLAME, both `is_match` and `non_match` must be set identically |
| **exchange_groups** | A comma-separated list of field names that are treated as interchangeable. For the built-in matchers (`EpilinkMatcher`, `ThreadedEpilinkMatcher`), all possible permutations of the fields across two records are compared, and the permutation with the highest similarity contributes to the overall score |
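
Because an incompatible configuration only surfaces at runtime, it can be worth sanity-checking `config.toml` locally before submitting the analysis. Below is a minimal sketch using Python's standard `tomllib` (assuming Python 3.11+ and the settings shown above):

```python
import tomllib  # standard library since Python 3.11

with open("config.toml", "rb") as f:
    config = tomllib.load(f)

# Both thresholds must be set identically to prevent failures in FLAME.
thresholds = config["thresholds"]
if thresholds["is_match"] != thresholds["non_match"]:
    raise ValueError("`is_match` and `non_match` must be set identically")

# The fields that together define a complete date:
print("date fields:", config["validate_date"]["fields"])
```
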


## Reference Code (Excerpt)
The following excerpts highlight the key distributed computing patterns in <a href="/files/record_linkage_intersection.py" download>record_linkage.py</a>. The full file contains additional logic.

When executing the record linkage analysis in the FLAME platform, multiple iterations between the aggregator and analyzer nodes are required:
- **Initialization:** The RLAnalyzer and RLAggregator are started.
- **First iteration:** The analyzer’s `analysis_method()` is called. Since configuration files from the aggregator are required before Bloom filter generation can begin, the `analysis_method()` returns `None`. The aggregator detects this (because the `analysis_results` it receives are `None`) and responds with the configuration files in a dictionary under the key `"config"`.
- **Second iteration:** As `aggregator_results` are no longer `None` and contain the `"config"` key, the analyzer recognizes the second iteration, generates the Bloom filters, and returns them. The aggregator then checks the analysis results again: because they are neither `None` nor equal to `"finished"`, it moves into the `else` branch and performs the record linkage. It then sends the duplicate patient results back to the respective nodes.
- **Final step:** The analyzer nodes detect the final iteration because the `aggregator_results` are not `None` and do not contain the `"config"` key. They therefore move into their own `else` branch and store the information about duplicates for future use. Once the analysis is complete, the aggregator receives the string `"finished"` and finally returns the computed intersections to the hub, marking the end of the analysis.


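The analyzer side of this handshake can be pictured as follows. This is a schematic sketch of the branching described above, not an excerpt from record_linkage.py; `StarAnalyzer` (assumed to be the SDK counterpart of `StarAggregator`), `build_bloomfilters()`, and `store_duplicates()` are placeholder names:

```python
class RLAnalyzer(StarAnalyzer):
    def analysis_method(self, data, aggregator_results):
        if aggregator_results is None:
            # First iteration: no configuration from the aggregator yet.
            return None

        if "config" in aggregator_results:
            # Second iteration: configuration received – generate Bloom filters.
            return self.build_bloomfilters(data, aggregator_results["config"])

        # Final step: the aggregator sent the duplicate information – store it.
        self.store_duplicates(aggregator_results)
        return "finished"
```
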
The aggregator combines results from all analyzer nodes into a federated summary:

```python
from typing import Any, Dict, List  # the full file contains further imports

class RLAggregator(StarAggregator):
    def aggregation_method(self, analysis_results: List[Dict[str, Any]]) -> str:
        if analysis_results[0] is None:  # 0 iteration
            self.flame.flame_log("0 iteration")
            return {"config": self.analyzer_config_dict}

        elif analysis_results[0] == "finished":  # 2 iteration
            self.flame.flame_log("aggregator finishes")
            return self.hub_results

        else:  # 1 iteration
            ...
            return node_results

    def has_converged(self, result, last_result, num_iterations):
        if num_iterations >= 2:  # iterative federation: stop after two rounds
            return True
        return False
```
The aggregator receives structured data from each node and produces federation-wide statistics without accessing raw data files.

## Output Structure
The final JSON result can be downloaded from the Hub.
The structure summarizes the number of matching records across nodes. For documentation purposes, placeholders are shown instead of the actual (longer) node IDs.

- **Example with two nodes**
```json
{
  "node1-id:node2-id": 5,
  "total": 5
}
```
- **Example with three nodes**
```json
{
  "node1-id:node2-id": 5,
  "node1-id:node3-id": 5,
  "node2-id:node3-id": 4,
  "total": 3
}
```