diff --git a/src/guide/user/record_linkage.md b/src/guide/user/record_linkage.md
index e91896d4..d6b17d09 100644
--- a/src/guide/user/record_linkage.md
+++ b/src/guide/user/record_linkage.md
@@ -13,7 +13,7 @@ Download the full reference script: record_linkage.py) defines:
-1. `RLAnalyzer` (runs on each analyzer node)
- - **`__init__()`**: Initializes the analyzer node.
- - **`analysis_method()`**: Executes the main analysis workflow, consisting of multiple steps:
- * **Initialize PostgreSQL** – Prerequisite for Mainzelliste (using `run_as_postgres()`, `init_db()`, `start_postgres()`, `create_user_and_db()`).
- * **Start Mainzelliste** – Provides pseudonymization services and performs Bloom filter calculations.
- - `wait_for_mainzelliste()`: Polls the Mainzelliste health check, attempting to establish a connection until the timeout is reached.
- * **Process patients from CSV** – Iterates over all patients in the project’s S3 datastore.
- - `add_patients()`: Adds patients to Mainzelliste from CSV (using `create_session_and_token()` to start a session and `get_new_pseudonyms()` to obtain pseudonyms).
- - `get_bloomfilters()`: Retrieves Bloom filters from Mainzelliste for record linkage.
- * **Cleanup** – Cleanly shuts down services.
- - `cleanup()`: Stops PostgreSQL (via `stop_postgres()`).
- * **Multi-stage processing** – Analysis is performed in up to three iterations (stages 0–2).
-
+1. `RLAnalyzer` (runs on each analyzer node)
+ - **`__init__()`**: Initializes the analyzer node.
+ - **`analysis_method()`**: Executes the main analysis workflow, consisting of multiple steps:
+ * **Initialize PostgreSQL** – Prerequisite for Mainzelliste (using `run_as_postgres()`, `init_db()`, `start_postgres()`, `create_user_and_db()`).
+ * **Start Mainzelliste** – Provides pseudonymization services and performs Bloom filter calculations.
+ - `wait_for_mainzelliste()`: Polls the Mainzelliste health check, attempting to establish a connection until the timeout is reached (a polling sketch is shown after this list).
+ * **Process patients from CSV** – Iterates over all patients in the project’s S3 datastore.
+ - `add_patients()`: Adds patients to Mainzelliste from CSV (using `create_session_and_token()` to start a session and `get_new_pseudonyms()` to obtain pseudonyms).
+ - `get_bloomfilters()`: Retrieves Bloom filters from Mainzelliste for record linkage.
+ * **Cleanup** – Cleanly shuts down services.
+ - `cleanup()`: Stops PostgreSQL (via `stop_postgres()`).
+ * **Multi-stage processing** – Analysis is performed in up to three iterations (stages 0–2).
+
2. `RLAggregator` (runs on the aggregator node)
- **`__init__()`**: Initializes the aggregator node, initializes PostgreSQL, and starts Mainzelliste.
* **Generate configurations** – Creates the configurations for the analyzer and aggregator nodes (using `create_config_nodes()`, `create_config_aggregator()`).
- * **Initialize PostgreSQL** – Prerequisite for Mainzelliste (using `run_as_postgres()`, `init_db()`, `start_postgres()`, `create_user_and_db()`).
- * **Start Mainzelliste** – Provides pseudonymization services and performs Bloom filter calculations.
- - **`aggregation_method()`**: Executes matching logic, collects, and distributes results, consisting of multiple steps:
+ * **Initialize PostgreSQL** – Prerequisite for Mainzelliste (using `run_as_postgres()`, `init_db()`, `start_postgres()`, `create_user_and_db()`).
+ * **Start Mainzelliste** – Provides pseudonymization services and performs Bloom filter calculations.
+ - **`aggregation_method()`**: Executes the matching logic, then collects and distributes the results; it consists of multiple steps:
* Return configuration to analyzer nodes.
* Gather Bloom filter results from all analyzers.
- * `wait_for_mainzelliste()`: Polls the Mainzelliste health check, attempting to establish a connection until the timeout is reached.
- * **Identify duplicates** – Perform record linkage with Bloom filters: iterates over all Bloom filters of all analyzer nodes and adds them to Mainzelliste.
+ * `wait_for_mainzelliste()`: Polls the Mainzelliste health check, attempting to establish a connection until the timeout is reached.
+ * **Identify duplicates** – Performs record linkage with Bloom filters: iterates over the Bloom filters from all analyzer nodes and adds them to Mainzelliste.
- Matching patients receive the same pseudonym.
* **Compute intersections** – Computes global and pairwise duplicates: calculates the intersection across all nodes based on pseudonyms (using `all_nodes_intersect()`).
* Return results to analyzers and hub.
- * `stop_postgres()`: Stops PostgreSQL.
+ * `stop_postgres()`: Stops PostgreSQL.
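
Both node types block on the Mainzelliste health endpoint before they continue. The sketch below illustrates what such a polling helper can look like; the endpoint URL and timeout mirror the reference script, while the retry interval and the exact success check are assumptions for illustration.

```python
import time
import requests

def wait_for_mainzelliste(url="http://localhost:7887/health", timeout=60, interval=2):
    """Poll the Mainzelliste health endpoint until it answers or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if requests.get(url).status_code == 200:  # assumed success condition
                return True
        except requests.exceptions.ConnectionError:
            pass  # service not reachable yet, keep waiting
        print("Waiting for Mainzelliste...")
        time.sleep(interval)
    return False  # timed out without a successful health check
```
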
### TOML Configuration File
@@ -76,7 +76,7 @@ Therefore, the analyst must be familiar with the project’s data and assess whe
If no such file is provided, the script falls back to a built-in default configuration, which may not be compatible with the project’s data fields and could therefore cause the analysis to fail.
-The `config.toml` file below indicates the fields that the analyst should customize for the project.
+The `config.toml` file below indicates the fields that the analyst should customize for the project.
```toml
[patient_settings]
@@ -113,7 +113,7 @@ nachname = "^[A-Za-zäÄöÖüÜßáÁéÉèÈ\\.\\- ]*[A-Za-zäÄöÖüÜßáÁ
[validate_date]
fields = ["geburtstag", "geburtsmonat", "geburtsjahr"]
-[thresholds]
+[thresholds]
is_match = 0.95
non_match = 0.95
@@ -131,16 +131,16 @@ The `config.toml` file supports the following settings:
| **validate_fields** | A regular expression applied to specific string fields to verify their validity. For example, it can ensure that names do not contain numbers |
| **validate_date** | A list of fields that together define a complete date |
| **thresholds** | A value in [0, 1] indicating the minimum weight for a record pair to count as a definitive match. To prevent failures in FLAME, both `is_match` and `non_match` must be set identically |
-| **exchange_groups** | A comma-separated list of field names that are treated as interchangeable. For the built-in matchers (`EpilinkMatcher`, `ThreadedEpilinkMatcher`), all possible permutations of the fields across two records are compared, and the permutation with the highest similarity contributes to the overall score
+| **exchange_groups** | A comma-separated list of field names that are treated as interchangeable. For the built-in matchers (`EpilinkMatcher`, `ThreadedEpilinkMatcher`), all possible permutations of the fields across two records are compared, and the permutation with the highest similarity contributes to the overall score |
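
How these settings reach the analysis is not shown in the excerpts below, so here is a minimal sketch of one way the script could load a project-specific `config.toml` from its own directory and fall back to built-in defaults when the file is missing. The use of `tomllib` and the default values shown are illustrative assumptions, not the reference implementation.

```python
import os
import tomllib  # Python 3.11+; assumption — the reference script's TOML reader is not shown in the excerpts

# Illustrative built-in defaults used when no project-specific config.toml is provided
DEFAULT_CONFIG = {
    "thresholds": {"is_match": 0.95, "non_match": 0.95},
    "validate_date": {"fields": ["geburtstag", "geburtsmonat", "geburtsjahr"]},
}

def load_user_config():
    """Load config.toml next to the script, or fall back to the built-in defaults."""
    config_path = os.path.join(os.path.dirname(__file__), "config.toml")
    if not os.path.exists(config_path):
        return DEFAULT_CONFIG
    with open(config_path, "rb") as f:
        return tomllib.load(f)

user_config = load_user_config()
thresholds = user_config.get("thresholds", {})
exchange_groups = user_config.get("exchange_groups", {})
```
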
## Reference Code (Excerpt)
The following excerpts highlight the key distributed computing patterns in record_linkage.py. The full file contains additional logic.
When executing the record linkage analysis in the FLAME platform, multiple iterations between the aggregator and analyzer nodes are required:
-- **Initialization:** The RLAnalyzer and RLAggregator are started.
+- **Initialization:** The RLAnalyzer and RLAggregator are started.
- **First iteration:** The analyzer’s `analysis_method()` is called. Since configuration files from the aggregator are required before Bloom filter generation can begin, the `analysis_method()` returns `None`. The aggregator detects this (because `aggregator_results` are `None`) and responds with the configuration files in a dictionary under the key `"config"`.
-- **Second iteration:** As `aggregator_results` are no longer `None` and contain the `"config"` key, the analyzer recognizes the second iteration, generates the Bloom filters, and returns them. The aggregator then checks the analysis results again. Because they are not `None`, it concludes that this is no longer the first iteration. Once the iterations are complete, the aggregator receives the string `"finished"`. At this point, since the `analysis_results` are neither `None` nor equal to `"finished"`, it moves into the `else` branch and performs the record linkage. It then sends the duplicate patient results back to the respective nodes.
+- **Second iteration:** As `aggregator_results` are no longer `None` and contain the `"config"` key, the analyzer recognizes the second iteration, generates the Bloom filters, and returns them. The aggregator checks the analysis results again: because they are neither `None` nor equal to `"finished"`, it moves into the `else` branch, performs the record linkage, and sends the duplicate patient results back to the respective nodes.
- **Final step:** The analyzer nodes detect the final iteration because the `aggregator_results` are not `None` and do not contain the `"config"` key. They therefore move into their own `else` branch and store the information about duplicates for future use. Once the analysis is complete, the aggregator receives the string `"finished"` and finally returns the computed intersections to the hub, marking the end of the analysis.
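
The aggregator-side skeleton is excerpted below; for comparison, a condensed sketch of the analyzer-side branching described above could look as follows (the ellipses stand in for the PostgreSQL/Mainzelliste setup, patient import, and teardown logic in the full script).

```python
class RLAnalyzer(StarAnalyzer):
    def analysis_method(self, data, aggregator_results):
        if not aggregator_results or aggregator_results[0] is None:  # 0 iteration
            return None                          # nothing to do until the aggregator sends the config
        elif "config" in aggregator_results[0]:  # 1 iteration
            ...                                  # start PostgreSQL and Mainzelliste, add patients from CSV
            return bloomfilters                  # hand the Bloom filters to the aggregator
        else:                                    # final iteration
            ...                                  # store the duplicate information returned by the aggregator
            return "finished"
```
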
@@ -216,17 +216,17 @@ The aggregator combines results from all analyzer nodes into a federated summary
class RLAggregator(StarAggregator):
def aggregation_method(self, analysis_results: List[Dict[str, Any]]) -> str:
if analysis_results[0] is None: # 0 iteration
- self.flame.flame_log("0 iteration")
+ self.flame.flame_log("0 iteration")
return {"config": self.analyzer_config_dict}
-
+
elif analysis_results[0] == "finished": # 2 iteration
self.flame.flame_log("aggregator finishes")
return self.hub_results
-
+
else: # 1 iteration
...
return node_results
-
+
def has_converged(self, result, last_result, num_iterations):
if num_iterations >= 2: # iterative federation
return True
@@ -236,22 +236,22 @@ class RLAggregator(StarAggregator):
The aggregator receives structured data from each node and produces federation-wide statistics without accessing raw data files.
## Output Structure
-The final JSON result can be downloaded from the Hub.
+The final JSON result can be downloaded from the Hub.
The structure summarizes the number of matching records across nodes. For documentation purposes, placeholders are shown instead of the actual (longer) node IDs.
-- **Example with two nodes**
+- **Example with two nodes**
```json
{
"node1-id:node2-id": 5,
"total": 5
}
```
-- **Example with three nodes**
+- **Example with three nodes**
```json
{
- "node1-id:node2-id": 5,
- "node1-id:node3-id": 5,
- "node2-id:node3-id": 4,
+ "node1-id:node2-id": 5,
+ "node1-id:node3-id": 5,
+ "node2-id:node3-id": 4,
"total": 3
}
```
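
The pairwise keys and the `total` entry come from the aggregator's intersection step (`all_nodes_intersect()`), which intersects the pseudonym sets of all nodes and of each node pair. A simplified, self-contained sketch of that counting logic is shown below; the real method additionally builds the per-node results sent back to the analyzers, and the pseudonyms here are placeholders chosen so the counts line up with the three-node example above.

```python
from itertools import combinations

def pairwise_and_total(pseudonyms_per_node: dict[str, set[str]]) -> dict[str, int]:
    """Count pseudonyms shared by each node pair and by all nodes together ("total")."""
    counts = {}
    for node1, node2 in combinations(pseudonyms_per_node, 2):
        counts[f"{node1}:{node2}"] = len(pseudonyms_per_node[node1] & pseudonyms_per_node[node2])
    all_sets = list(pseudonyms_per_node.values())
    counts["total"] = len(set.intersection(*all_sets)) if all_sets else 0
    return counts

# Placeholder pseudonym sets reproducing the three-node example above
print(pairwise_and_total({
    "node1-id": {"p1", "p2", "p3", "p4", "p5", "a", "b"},
    "node2-id": {"p1", "p2", "p3", "p4", "p5", "c"},
    "node3-id": {"p1", "p2", "p3", "a", "b", "c"},
}))
# {'node1-id:node2-id': 5, 'node1-id:node3-id': 5, 'node2-id:node3-id': 4, 'total': 3}
```
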
diff --git a/src/public/files/record_linkage_intersection.py b/src/public/files/record_linkage_intersection.py
index 81ee8a45..65db1534 100644
--- a/src/public/files/record_linkage_intersection.py
+++ b/src/public/files/record_linkage_intersection.py
@@ -26,7 +26,7 @@ def __init__(self, flame):
super().__init__(flame) # Connects this analyzer to the FLAME components
self.result = None
-
+
flame.flame_log("Init of analyzer finished ...")
# Start up Postgres DB
self.PG_BIN_DIR = "/usr/lib/postgresql/14/bin"
@@ -54,7 +54,7 @@ def wait_for_mainzelliste(self, url="http://localhost:7887/health", timeout=60,
except requests.exceptions.ConnectionError:
# Service not reachable yet, wait
pass
-
+
self.flame.flame_log("Waiting for Mainzelliste...")
time.sleep(interval)
@@ -112,7 +112,7 @@ def stop_postgres(self):
"""Exits PostgreSQL cleanly."""
self.flame.flame_log("Stop PostgreSQL...")
subprocess.run(
- [f"{self.PG_BIN_DIR}/pg_ctl", "stop", "-D", self.PG_DATA_DIR, "-m", "fast"],
+ [f"{self.PG_BIN_DIR}/pg_ctl", "stop", "-D", self.PG_DATA_DIR, "-m", "fast"],
# -m fast = terminates connections, saves immediately
check=True,
preexec_fn=self.run_as_postgres,
@@ -123,16 +123,16 @@ def stop_postgres(self):
def get_new_pseudonyms(self, keep_keys, data, addpatienturl, headers):
pseudonyms = {}
for file_bytes in data[0].values():
- decoded = file_bytes.decode("utf-8")
- csv_reader = csv.DictReader(io.StringIO(decoded), delimiter=";")
+ decoded = file_bytes.decode("utf-8")
+ csv_reader = csv.DictReader(io.StringIO(decoded), delimiter=";")
for index, row in enumerate(csv_reader):
self.flame.flame_log("row: ")
- self.flame.flame_log(str(row))
+ self.flame.flame_log(str(row))
filtered_payload = {key: row[key] for key in keep_keys}
add_row = requests.post(addpatienturl, json = filtered_payload, headers = headers)
if add_row.status_code == 201: # TODO: what about a possible match (i.e. 409)?
self.flame.flame_log("Adding patient to Mainzelliste succeeded")
- resp_data = add_row.json()
+ resp_data = add_row.json()
if resp_data and isinstance(resp_data, list):
pseudonym = resp_data[0].get("idString", "Not found")
pseudonyms[pseudonym] = index # TODO: the other way around? otherwise identical pseudonyms overwrite each other
@@ -182,14 +182,16 @@ def get_bloomfilters(self, pseudonyms, url, content_header, api_key):
"data": {
"searchIds": search_ids,
"resultFields": [
- "vorname_bigram_bloom", "nachname_bigram_bloom",
- "geburtstag_bigram_bloom", "geburtsmonat_bigram_bloom",
- "geburtsjahr_bigram_bloom", "plz_bigram_bloom",
- "ort_bigram_bloom"
+ "vorname", "nachname",
+ "geburtstag", "geburtsmonat",
+ "geburtsjahr", "plz",
+ "ort"
],
"resultIds": ["pid"]
}
}
+ self.flame.flame_log("payload read: ")
+ self.flame.flame_log(str(payload_read))
token_id = self.create_session_and_token(url, payload_read, content_header, api_key)
readpatienturl = f"{url}/patients?tokenId={token_id}"
@@ -207,7 +209,7 @@ def get_bloomfilters(self, pseudonyms, url, content_header, api_key):
patients_with_index.append((index, patient["fields"]))
return sorted(patients_with_index, key=lambda x: x[0])
-
+
def cleanup(self, mainzelliste_proc):
"""Shut down Mainzelliste and PostgreSQL."""
self.flame.flame_log("End Mainzelliste...")
@@ -232,7 +234,7 @@ def analysis_method(self, data, aggregator_results):
- Contains the result from the aggregator's aggregation_method in subsequent iterations.
:return: Any result of your analysis on one node (e.g. patient count).
"""
- if aggregator_results == None: # 0 iteration
+ if not aggregator_results or aggregator_results[0] is None: # 0 iteration
self.flame.flame_log("0. Iteration")
return None
elif "config" in aggregator_results[0]: # 1 iteration
@@ -247,13 +249,12 @@ def analysis_method(self, data, aggregator_results):
with open(config_path, "wb") as f:
tomli_w.dump(config, f)
- self.flame.flame_log("Final analyzer configuration created under:", config_path)
self.init_db()
self.start_postgres()
self.create_user_and_db()
- # Start Mainzelliste
+ # Start Mainzelliste
binary_path = "/usr/local/bin/mainzelliste"
env = os.environ.copy()
env["PG_HOST"] = "localhost"
@@ -269,7 +270,7 @@ def analysis_method(self, data, aggregator_results):
try:
self.result = subprocess.Popen(
- [binary_path, "--config", config_path],
+ [binary_path, "--config", config_path],
env=env,
cwd=os.path.dirname(config_path),
stdout=subprocess.PIPE,
@@ -317,7 +318,7 @@ def analysis_method(self, data, aggregator_results):
self.cleanup(self.result) # End Mainzelliste + Postgres
return "finished"
-
+
class RLAggregator(StarAggregator):
@@ -330,7 +331,7 @@ def __init__(self, flame):
flame.flame_log("Init of aggregator started ...")
super().__init__(flame) # Connects this aggregator to the FLAME components
# Generate config + salt here to send to data nodes?
- self.hub_results = {}
+ self.hub_results = {}
flame.flame_log("Generate configs ...")
aggregator_config_path = self.create_config_aggregator()
self.flame.flame_log("Creating config for Analyzer nodes...")
@@ -338,7 +339,7 @@ def __init__(self, flame):
flame.flame_log("Start DB postgres ...")
- # Start postgres db
+ # Start postgres db
self.PG_BIN_DIR = "/usr/lib/postgresql/14/bin"
self.PG_DATA_DIR = "/var/lib/postgresql/data"
@@ -352,7 +353,7 @@ def __init__(self, flame):
self.init_db()
self.start_postgres()
self.create_user_and_db()
-
+
flame.flame_log("Start Aggregator Mainzelliste with Config ...")
# Start Aggregator ML
binary_path = "/usr/local/bin/mainzelliste"
@@ -397,7 +398,7 @@ def wait_for_mainzelliste(self, url="http://localhost:7887/health", timeout=60,
return True
except requests.exceptions.ConnectionError:
pass
-
+
self.flame.flame_log("Waiting for Mainzelliste...")
time.sleep(interval)
@@ -455,7 +456,7 @@ def stop_postgres(self):
"""Shuts down PostgreSQL"""
self.flame.flame_log("Stop PostgreSQL...")
subprocess.run(
- [f"{self.PG_BIN_DIR}/pg_ctl", "stop", "-D", self.PG_DATA_DIR, "-m", "fast"],
+ [f"{self.PG_BIN_DIR}/pg_ctl", "stop", "-D", self.PG_DATA_DIR, "-m", "fast"],
check=True,
preexec_fn=self.run_as_postgres,
env=os.environ
@@ -468,8 +469,8 @@ def all_nodes_intersect(self, all_matches: dict):
hub_res = set.intersection(*node_values) if node_values else set()
# Collect pairwise duplicates
- section_res = {node: {} for node in all_matches}
- pairwise_counts = {}
+ section_res = {node: {} for node in all_matches}
+ pairwise_counts = {}
for node1, node2 in combinations(all_matches.keys(), 2):
matches1 = all_matches[node1]
@@ -493,7 +494,7 @@ def all_nodes_intersect(self, all_matches: dict):
pairwise_counts["total"] = len(hub_res)
return pairwise_counts, section_res
-
+
def create_config_nodes(self):
salt_hex = get_random_bytes(64).hex()
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
@@ -515,6 +516,13 @@ def create_config_nodes(self):
exchange_groups = user_config.get("exchange_groups", {})
config_content = {
"salt" : f"{salt_hex}",
+ "oidc": {
+ "issuer": "http://localhost:9000/application/o//",
+ "client_id": "",
+ "client_secret": "",
+ "user_group": "",
+ "admin_group": ""
+ },
"ids": {"internal_id": "pid"},
"id_generator": {
"pid": {"generator": "PIDGenerator", "k1": 1, "k2": 2, "k3": 3}
@@ -594,7 +602,7 @@ def create_config_nodes(self):
"exchange_group_1": ["geburtstag","geburtsjahr","geburtsmonat"]
}
}
-
+
if patient_settings:
config_content["patient_settings"] = patient_settings
if matcher_frequency:
@@ -617,8 +625,8 @@ def create_config_nodes(self):
config_content["matcher_comparators"] = matcher_comparators
- return config_content
-
+ return config_content
+
# Generate Config
def create_config_aggregator(self):
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
@@ -640,6 +648,13 @@ def create_config_aggregator(self):
exchange_groups = user_config.get("exchange_groups", {})
aggregator_config = {
+ "oidc": {
+ "issuer": "http://localhost:9000/application/o//",
+ "client_id": "",
+ "client_secret": "",
+ "user_group": "",
+ "admin_group": ""
+ },
"ids": {"internal_id": "pid"},
"id_generator": {
"pid": {"generator": "PIDGenerator", "k1": 1, "k2": 2, "k3": 3}
@@ -663,16 +678,18 @@ def create_config_aggregator(self):
}
],
# === DEFAULT PATIENT SETTINGS ===
+
"patient_settings": {
- "vorname": "String",
- "nachname": "String",
- "geburtsname": "String",
- "geburtstag": "Integer",
- "geburtsmonat": "Integer",
- "geburtsjahr": "Integer",
- "ort": "String",
- "plz": "Integer"
+ "vorname": "Bloom",
+ "nachname": "Bloom",
+ "geburtsname": "Bloom",
+ "geburtstag": "Bloom",
+ "geburtsmonat": "Bloom",
+ "geburtsjahr": "Bloom",
+ "ort": "Bloom",
+ "plz": "Bloom"
},
+
"matcher_frequency": {
"vorname": 0.000235,
"nachname": 0.0000271,
@@ -736,12 +753,16 @@ def create_config_aggregator(self):
aggregator_config["thresholds"] = thresholds
if exchange_groups:
aggregator_config["exchange_groups"] = exchange_groups
-
+
matcher_comparators = {
field: "BloomFilterComparator" for field in aggregator_config["patient_settings"].keys()
}
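+ # Switch every patient field's type to "Bloom", since the aggregator receives Bloom-filter-encoded fields from the analyzers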
+ patient_settings_new = {
+ field: "Bloom" for field in aggregator_config["patient_settings"].keys()
+ }
aggregator_config["matcher_comparators"] = matcher_comparators
+ aggregator_config["patient_settings"] = patient_settings_new
temp_dir = tempfile.mkdtemp()
final_config_path = os.path.join(temp_dir, "config.toml")
@@ -762,22 +783,22 @@ def aggregation_method(self, analysis_results):
self.flame.flame_log("analysis results:") # 1 iteration
self.flame.flame_log(str(analysis_results))
if analysis_results[0] is None: # 0 iteration
- self.flame.flame_log("0 iteration")
+ self.flame.flame_log("0 iteration")
return {"config": self.analyzer_config_dict}
-
+
elif analysis_results[0] == "finished":
self.flame.flame_log("aggregator finishes")
return self.hub_results
-
+
else:
- self.flame.flame_log("1 iteration")
+ self.flame.flame_log("1 iteration")
self.flame.flame_log("analysis results 1st iteration:") # 1 iteration
self.flame.flame_log(str(analysis_results))
# Health check
url = "http://localhost:7887"
if self.wait_for_mainzelliste():
- self.flame.flame_log("Connected to Mainzelliste...")
+ self.flame.flame_log("Connected to Mainzelliste...")
# Perform Matching on Bloomfilters
allowed_uses = 0
@@ -812,13 +833,13 @@ def aggregation_method(self, analysis_results):
continue
matches = {}
- for patient in bloom_data:
+ for patient in bloom_data:
getId = requests.post(addpatienturl, json = patient[1], headers = headers)
- if getId.status_code == 201:
- data = getId.json()
+ if getId.status_code == 201:
+ data = getId.json()
if data and isinstance(data, list):
pseudonym = data[0].get("idString", "Not found")
- matches[f"{patient[0]}"] = pseudonym
+ matches[f"{patient[0]}"] = pseudonym
elif getId.status_code == 409:
self.flame.flame_log("Possible Match occured - counted as Non Match")
@@ -828,15 +849,15 @@ def aggregation_method(self, analysis_results):
self.hub_results, node_results = self.all_nodes_intersect(matches_all_nodes)
self.flame.flame_log("node_results:")
self.flame.flame_log(str(node_results))
-
+
self.flame.flame_log("Mainzelliste is terminated...")
- self.mainzelliste.terminate()
+ self.mainzelliste.terminate()
try:
self.mainzelliste.wait(timeout=10)
except subprocess.TimeoutExpired:
self.flame.flame_log("Mainzelliste not responding. Force kill.")
self.mainzelliste.kill()
-
+
self.stop_postgres()
return node_results
else:
@@ -857,7 +878,7 @@ def has_converged(self, result, last_result, num_iterations):
if num_iterations >= 2:
return True
return False
-
+
def main():
"""
@@ -879,6 +900,6 @@ def main():
aggregator_kwargs=None # Additional keyword arguments for the custom aggregator constructor (i.e. MyAggregator)
)
-
+
if __name__ == "__main__":
main()