-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvariable_reader_engine.py
More file actions
220 lines (182 loc) · 8.72 KB
/
variable_reader_engine.py
File metadata and controls
220 lines (182 loc) · 8.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import io
import json
import os
import shlex
import subprocess
import time
import warnings
from collections import defaultdict

import pandas as pd
import paramiko
import psycopg2
# Suppress FutureWarning from Pandas
warnings.simplefilter(action='ignore', category=FutureWarning)
# --- CONFIGURATION (MASKED) ---
DB_NAME = "YOUR_DATABASE_NAME"
DB_HOST = "YOUR_DB_HOST_ADDRESS"
DB_PORT = "5432"
def _parse_variables(variable_input):
"""Splits comma-separated variable string into a clean list."""
if not variable_input: return []
return list(set([v.strip() for v in variable_input.split(',') if v.strip()]))
def _format_results(found_values, searched_vars):
"""Formats the search results into a readable report."""
if not found_values:
return f"No values found for: {', '.join(searched_vars)}"
grouped = defaultdict(list)
for item in found_values:
grouped[item['variable']].append(item)
report_lines = []
sorted_vars = sorted(list(set(searched_vars)))
for var in sorted_vars:
if var not in grouped:
report_lines.append(f"Variable: {var}\n - Not found.\n")
continue
items = grouped[var]
report_lines.append(f"Variable: {var}")
unique_items = []
seen = set()
def sort_key(x):
return (x.get('date', '0000-00-00'), x.get('event', 'N/A'))
for item in sorted(items, key=sort_key):
sig = (item['variable'], item['value'], item.get('event'), item.get('source_file'))
if sig not in seen:
seen.add(sig)
unique_items.append(item)
for item in unique_items:
value = item['value']
details = []
if 'event' in item and item['event'] != 'N/A':
details.append(f"Event: {item['event']}")
if 'date' in item and item['date'] != 'N/A':
details.append(f"Date: {item['date']}")
if 'source_file' in item:
details.append(f"File: {item['source_file']}")
report_lines.append(f" - {value} ({', '.join(details)})")
report_lines.append("")
return "\n".join(report_lines).strip()
def _process_stream_data(chunk_iter, subject_id, target_vars, filename, found_values):
"""Processes CSV data chunks to find matching variables for a specific subject."""
# These column names might need adjustment based on your data schema
event_column = 'redcap_event_name'
date_column = 'interview_date'
for df in chunk_iter:
subject_col = None
for c in ['subject_id', 'src_subject_id', 'subjectkey']:
if c in df.columns:
subject_col = c
break
if subject_col:
clean_subject_id = str(subject_id).strip()
mask = df[subject_col].astype(str).str.strip() == clean_subject_id
subject_df = df[mask].copy()
else:
continue
if subject_df.empty: continue
for var_name in target_vars:
if var_name not in subject_df.columns: continue
for _, row in subject_df.iterrows():
val = str(row[var_name]).strip()
if val == "" or val.lower() == "nan": continue
event = row.get(event_column, 'N/A')
date_str = 'N/A'
if date_column in row and pd.notna(row[date_column]):
try:
date_str = pd.to_datetime(row[date_column]).strftime('%Y-%m-%d')
except: pass
found_values.append({
"variable": var_name,
"value": val,
"event": event,
"date": date_str,
"source_file": filename
})
# --- SSH SEARCH LOGIC ---
def _read_remote_csv_paramiko(ssh_client, remote_path, subject_id, target_vars, found_values):
    """Stream a remote CSV over SSH and collect the subject's variable values.

    Reads the file via ``cat`` on the remote host, parses it in 10k-row
    chunks, and appends hits to ``found_values`` (see _process_stream_data).
    Errors are printed and otherwise swallowed — this is a best-effort read
    so one bad file does not abort the whole search.
    """
    filename = os.path.basename(remote_path)
    try:
        stdin, stdout, stderr = ssh_client.exec_command(f"cat \"{remote_path}\"", get_pty=False)
        chunk_iter = pd.read_csv(
            stdout, chunksize=10000, dtype=str,
            keep_default_na=False, on_bad_lines='skip',
        )
        _process_stream_data(chunk_iter, subject_id, target_vars, filename, found_values)
    except Exception as e:
        # Bug fix: report which file failed — the original printed the
        # literal "(unknown)" even though `filename` was already computed.
        print(f" [Error] Failed to read {filename}: {e}")
def _search_combined_csv(ssh_client, base_path, file_pattern, subject_id, variable_input):
    """Search remote combined CSV files for a subject's variable values.

    Pre-filters with a remote ``grep -l`` so only files mentioning the
    subject id are downloaded, then streams each candidate file
    (see _read_remote_csv_paramiko).

    Returns:
        Formatted report string, or "Error: ..." on failure.
    """
    target_vars = _parse_variables(variable_input)
    found_values = []
    search_glob = f"{base_path.rstrip('/')}/{file_pattern}"
    # Security fix: shlex.quote prevents shell injection/breakage when
    # subject_id contains quotes or metacharacters. The glob must stay
    # unquoted so the remote shell can expand it.
    cmd = f"grep -l {shlex.quote(str(subject_id))} {search_glob} 2>/dev/null"
    try:
        stdin, stdout, stderr = ssh_client.exec_command(cmd, timeout=30)
        listing = stdout.read().decode('utf-8')
        target_files = [f.strip() for f in listing.split('\n') if f.strip()]
        for filepath in target_files:
            _read_remote_csv_paramiko(ssh_client, filepath, subject_id, target_vars, found_values)
    except Exception as e:
        return f"Error: {e}"
    return _format_results(found_values, target_vars)
def _read_surveys_ssh(ssh_client, path, subject_id, variable_input):
    """Search a subject's remote survey directory (.json and .csv files).

    JSON files are loaded whole and matched by key; CSV files are streamed
    row-by-row (see _read_remote_csv_paramiko).

    Returns:
        Formatted report string, or "Error: ..." on failure.
    """
    target_vars = _parse_variables(variable_input)
    found_values = []
    try:
        # Quote the directory part against shell metacharacters; the
        # *.json / *.csv globs must stay outside the quotes so the remote
        # shell expands them ('dir'*.json concatenates in sh).
        quoted = shlex.quote(path)
        cmd = f"ls {quoted}*.json {quoted}*.csv 2>/dev/null"
        stdin, stdout, stderr = ssh_client.exec_command(cmd, timeout=30)
        files = [f.strip() for f in stdout.read().decode('utf-8').split('\n') if f.strip()]
        for filepath in files:
            filename = os.path.basename(filepath)
            if filename.endswith('.json'):
                stdin, stdout, stderr = ssh_client.exec_command(f"cat \"{filepath}\"")
                data = json.loads(stdout.read().decode('utf-8'))
                entries = data if isinstance(data, list) else [data]
                for entry in entries:
                    for var in target_vars:
                        val = entry.get(var)
                        # Bug fix: the original `if val:` silently dropped
                        # legitimate falsy values (0, False), inconsistent
                        # with the CSV path which keeps "0". Skip only
                        # missing/blank values.
                        if val is not None and str(val).strip() != "":
                            found_values.append({
                                "variable": var,
                                "value": str(val),
                                "source_file": filename,
                            })
            elif filename.endswith('.csv'):
                _read_remote_csv_paramiko(ssh_client, filepath, subject_id, target_vars, found_values)
    except Exception as e:
        return f"Error: {e}"
    return _format_results(found_values, target_vars)
# --- DISPATCHER ---
def read_from_ssh_server(source, host, user, password, site, subject_id, variable_input, search_type):
    """Dispatch a variable search to the appropriate remote data source.

    Args:
        source: "NetworkA" (key-based auth) or "Internal" (password auth).
        host, user, password: SSH connection credentials.
        site: Site code used to build remote paths.
        subject_id: Subject identifier to search for.
        variable_input: Comma-separated variable names.
        search_type: For "Internal" only — "Surveys" selects per-subject
            survey files; anything else searches the combined CSVs.

    Returns:
        Formatted report string, "Connection Error: ..." on failure, or
        None for an unrecognized ``source`` (preserves original behavior).
    """
    ssh = None
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        if source == "NetworkA":
            # MASKED PATH
            path = f"/path/to/network_a/data/{site}/raw/{subject_id}/surveys/"
            ssh.connect(host, username=user, timeout=15)
            return _read_surveys_ssh(ssh, path, subject_id, variable_input)
        elif source == "Internal":
            ssh.connect(host, username=user, password=password, timeout=15)
            if search_type == "Surveys":
                # MASKED PATH
                p = f"/path/to/internal/surveys/{site}/{subject_id}/"
                return _read_surveys_ssh(ssh, p, subject_id, variable_input)
            # MASKED PATH
            base = "/path/to/internal/combined_data/"
            pat = "*.csv"
            return _search_combined_csv(ssh, base, pat, subject_id, variable_input)
        return None  # unknown source: original fell through returning None
    except Exception as e:
        return f"Connection Error: {e}"
    finally:
        # Bug fix: the original only closed the connection on the success
        # path, leaking it whenever any step raised.
        if ssh is not None:
            ssh.close()
def read_from_database(db_user, db_password, subject_id, variable_input):
    """Query the Postgres data table for a subject's variable values.

    Expects ``data_table`` rows of (form_data, event_name) where form_data
    deserializes to a dict (e.g. a JSONB column).

    Returns:
        Formatted report string, or "DB Error: ..." on failure.
    """
    target_vars = _parse_variables(variable_input)
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=DB_NAME, user=db_user, password=db_password,
            host=DB_HOST, port=DB_PORT,
        )
        # Parameterized query — subject_id is never interpolated into SQL.
        # Ensure 'forms' schema and 'subject_id' column match your DB.
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT form_data, event_name FROM data_table WHERE subject_id = %s",
                (subject_id,),
            )
            results = cursor.fetchall()
        found_values = []
        for form_data, event_name in results:
            # Non-dict form_data (e.g. NULL) is skipped, as in the original.
            if not isinstance(form_data, dict):
                continue
            for var in target_vars:
                val = form_data.get(var)
                if val is not None:
                    found_values.append({"variable": var, "value": str(val), "event": event_name})
        return _format_results(found_values, target_vars)
    except Exception as e:
        return f"DB Error: {e}"
    finally:
        # Bug fix: close the connection even on error — the original leaked
        # it whenever anything raised after connect().
        if conn is not None:
            conn.close()