diff --git a/components/__init__.py b/components/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/components/assistant.py b/components/assistant.py new file mode 100644 index 0000000..b50aac9 --- /dev/null +++ b/components/assistant.py @@ -0,0 +1,97 @@ +"""Shared assistant helpers for Streamlit pages.""" +from __future__ import annotations + +from typing import Any, Dict, List + +import streamlit as st + +from services.ai_core import call_llm, load_page_config + + +def _offline_assistant_reply( + question: str, + context: Dict[str, Any], + config: Dict[str, Any], + history: List[Dict[str, str]], +) -> str: + context_lines = [f"- {key}: {value}" for key, value in context.items()] if context else [] + description = config.get( + "description", + "Review the provided context details and adjust your simulation inputs accordingly.", + ) + history_lines: List[str] = [] + if history: + history_lines.append("Recent conversation:") + for turn in history[-4:]: + speaker = "You" if turn["role"] == "user" else "Assistant" + history_lines.append(f"{speaker}: {turn['content']}") + message_parts = [ + "Assistant response (offline mode)", + description, + ] + if context_lines: + message_parts.append("Context snapshot:") + message_parts.extend(context_lines) + if history_lines: + message_parts.extend(history_lines) + message_parts.append(f"Echoing your question for reference: {question}") + message_parts.append("Consult the NeqSim documentation or in-app tooltips for deeper guidance.") + return "\n".join(message_parts) + + +def render_ai_helper(page_id: str, context: Dict[str, str]) -> None: + """Render a consistent assistant helper based on page configuration.""" + + config = load_page_config(page_id) + if not config: + return + + title = config.get("title", "AI helper") + description = config.get( + "description", + "Ask the assistant for guidance on configuring the simulation.", + ) + history_key = f"{page_id}_assistant_history" + if history_key not in st.session_state: + st.session_state[history_key] = [] + history: List[Dict[str, str]] = st.session_state[history_key] + + with st.expander(title, expanded=False): + st.write(description) + if context: + st.json(context) + if history: + st.markdown("### Conversation history") + for turn in history[-6:]: + speaker = "You" if turn["role"] == "user" else "Assistant" + st.markdown(f"**{speaker}:** {turn['content']}") + question = st.text_area("Ask a question", key=f"{page_id}_assistant") + if st.button("Get assistant reply", key=f"{page_id}_assistant_button") and question: + history.append({"role": "user", "content": question}) + conversation_lines = [ + f"{('User' if turn['role'] == 'user' else 'Assistant')}: {turn['content']}" + for turn in history[-6:] + ] + prompt_parts = [ + "Use the JSON context to assist with NeqSim simulations.", + f"Context: {context}", + ] + if conversation_lines: + prompt_parts.append("Prior conversation:\n" + "\n".join(conversation_lines)) + prompt_parts.append(f"Current question: {question}") + prompt = "\n\n".join(prompt_parts) + system_prompt = config.get( + "system_prompt", "Provide helpful tips based on the context JSON and conversation history." + ) + + response = call_llm( + prompt, + system_prompt=system_prompt, + offline_fallback=lambda *_: _offline_assistant_reply( + question, context, config, history + ), + ) + history.append({"role": "assistant", "content": response.text}) + st.session_state[history_key] = history + st.session_state[f"{page_id}_assistant"] = "" + st.markdown(response.text) diff --git a/components/chat.py b/components/chat.py new file mode 100644 index 0000000..5271a54 --- /dev/null +++ b/components/chat.py @@ -0,0 +1,83 @@ +"""Reusable chat widget for conversational what-if analysis.""" +from __future__ import annotations + +import re +from typing import Callable, Tuple + +import pandas as pd +import streamlit as st + +from services.ai import summarize_flash_results + +_TEMPERATURE_DIRECTIVE = re.compile(r"(-?\d+(?:\.\d+)?)\s*(?:c|°c|degc)", re.IGNORECASE) +_PRESSURE_DIRECTIVE = re.compile(r"(-?\d+(?:\.\d+)?)\s*(?:bar|bara)", re.IGNORECASE) + + +def _apply_adjustments(schedule: pd.DataFrame, message: str) -> pd.DataFrame: + updated = schedule.copy(deep=True) + temp_matches = list(_TEMPERATURE_DIRECTIVE.finditer(message)) + pres_matches = list(_PRESSURE_DIRECTIVE.finditer(message)) + if temp_matches: + target_temp = float(temp_matches[-1].group(1)) + updated['Temperature (C)'] = target_temp + if pres_matches: + target_pres = float(pres_matches[-1].group(1)) + updated['Pressure (bara)'] = target_pres + return updated + + +def render_what_if_chat( + fluid_df: pd.DataFrame, + schedule_df: pd.DataFrame, + is_plus_fluid: bool, + run_flash: Callable[[pd.DataFrame, pd.DataFrame, bool], Tuple[pd.DataFrame, float, float]], +) -> None: + """Render a sidebar chat widget that can iterate on TP flash scenarios.""" + + chat_key = "what_if_chat_history" + if chat_key not in st.session_state: + st.session_state[chat_key] = [] + history = st.session_state[chat_key] + if history and isinstance(history[0], tuple): + st.session_state[chat_key] = [ + {"role": entry[0], "content": entry[1]} for entry in history if len(entry) >= 2 + ] + history = st.session_state[chat_key] + + with st.sidebar.expander("What-if assistant", expanded=False): + st.markdown("Enter adjustments like 'try 40 °C at 15 bar' to explore new flashes.") + user_message = st.text_input("Ask the assistant", key="what_if_prompt") + if st.button("Send", key="what_if_send") and user_message: + history.append({"role": "user", "content": user_message}) + updated_schedule = _apply_adjustments(schedule_df, user_message) + results_df, last_temp, last_pres = run_flash(fluid_df, updated_schedule, is_plus_fluid) + prior_turns = [ + f"{('User' if turn['role'] == 'user' else 'Assistant')}: {turn['content']}" + for turn in history[-6:] + ] + scenario_context = { + "temperature": f"{last_temp:.2f} °C" if not pd.isna(last_temp) else "n/a", + "pressure": f"{last_pres:.2f} bara" if not pd.isna(last_pres) else "n/a", + "latest_request": user_message, + } + if prior_turns: + scenario_context["recent_conversation"] = "\n".join(prior_turns) + summary = summarize_flash_results( + results_df, + scenario_context, + ) + history.append( + { + "role": "assistant", + "content": summary, + "temperature": last_temp, + "pressure": last_pres, + } + ) + st.session_state[chat_key] = history + st.session_state["what_if_prompt"] = "" + st.experimental_rerun() + + for turn in history[-8:]: + speaker = "You" if turn["role"] == "user" else "Assistant" + st.markdown(f"**{speaker}:** {turn['content']}") diff --git a/configs/ai_pages.json b/configs/ai_pages.json new file mode 100644 index 0000000..3f3f857 --- /dev/null +++ b/configs/ai_pages.json @@ -0,0 +1,22 @@ +{ + "tp_flash": { + "title": "TP Flash assistant", + "description": "Discuss how to prepare feed compositions and interpret TP flash outputs.", + "system_prompt": "You are an experienced thermodynamics engineer helping colleagues run NeqSim simulations." + }, + "gas_hydrate": { + "title": "Hydrate assistant", + "description": "Ask about hydrate inhibitors, composition tips, or interpreting the hydrate curve.", + "system_prompt": "Provide practical gas hydrate modelling advice for offshore process engineers." + }, + "lng_ageing": { + "title": "LNG ageing assistant", + "description": "Get tips on simulating LNG weathering scenarios and selecting compositions.", + "system_prompt": "Guide the user through LNG ageing simulations with a focus on safety considerations." + }, + "property_generator": { + "title": "Property helper", + "description": "Ask how to compute or interpret physical properties in the generator tool.", + "system_prompt": "Support users by describing property correlations and required inputs." + } +} diff --git a/docs/ai.md b/docs/ai.md new file mode 100644 index 0000000..ed5de57 --- /dev/null +++ b/docs/ai.md @@ -0,0 +1,28 @@ +# AI-assisted workflows + +This project now ships with lightweight AI scaffolding to help configure and interpret +NeqSim simulations. The helpers now include deterministic “offline mode” templates so the +Streamlit UI remains fully functional without API credentials. When a valid +`st.make_request` integration is available the helpers automatically forward prompts to the +configured LLM provider. + +## Available assistants + +* **TP Flash** – supports natural-language scenario planning, validation guidance, result + summaries, knowledge lookups, and a conversational what-if exploration widget. +* **Gas Hydrate, LNG Ageing, Property Generator** – each page exposes a contextual + assistant that reuses the shared helper framework defined in `components/assistant.py`. + +Both the shared helper and the what-if widget maintain per-session conversation history so +follow-up prompts automatically include recent context, whether the app is online or using +offline fallbacks. + +## Extending the system + +1. Add metadata for the new page to `configs/ai_pages.json`. +2. Import and call `render_ai_helper` with any runtime context you want to surface. +3. If the feature requires model interaction, build a helper in `services/ai.py` or + `services/retrieval.py` and keep API access logic centralized in `services/ai_core.py`. + +This structure keeps prompts, retrieval logic, and UI wiring modular so additional +Streamlit pages can opt in with only a few lines of code. diff --git a/pages/0_TP_flash.py b/pages/0_TP_flash.py index eb4d494..d8d20a5 100644 --- a/pages/0_TP_flash.py +++ b/pages/0_TP_flash.py @@ -1,32 +1,40 @@ -import streamlit as st +import math +from typing import Dict, Tuple + import pandas as pd -import time +import streamlit as st import neqsim from neqsim.thermo.thermoTools import fluidcreator, fluid_df, TPflash, dataFrame + from fluids import default_fluid +from services.ai import ( + create_issue_messages, + explain_validation_issue, + plan_scenario, + summarize_flash_results, +) +from services.retrieval import query_equilibrium_knowledge +from components.chat import render_what_if_chat +from components.assistant import render_ai_helper + st.title('TP flash') +render_ai_helper( + "tp_flash", + { + "default_components": len(default_fluid['ComponentName']), + "default_schedule_rows": 2, + }, +) + """ The NeqSim flash model will select the best thermodynamic model based on the fluid composition. For fluids containing polar components it will use the CPA-EoS. For non-polar fluids it will use the SRK/PR-EoS. The flash will calculate the phase equilibrium for given composition at the specified temperatures and pressures. You can select components from a predifined component list. Alterative component names ([see available components](https://github.com/equinor/neqsim/blob/master/src/main/resources/data/COMP.csv)) can be used by manually editing the table. """ -st.divider() -st.text("Set fluid composition:") - -hidecomponents = st.checkbox('Show active components') -if hidecomponents: - st.edited_df['MolarComposition[-]'] = st.edited_df['MolarComposition[-]'] - st.session_state.activefluid_df = st.edited_df[st.edited_df['MolarComposition[-]'] > 0] -if 'uploaded_file' in st.session_state and hidecomponents == False: - try: - st.session_state.activefluid_df = pd.read_csv(st.session_state.uploaded_file) - numeric_columns = ['MolarComposition[-]', 'MolarMass[kg/mol]', 'RelativeDensity[-]'] - st.session_state.activefluid_df[numeric_columns] = st.session_state.activefluid_df[numeric_columns].astype(float) - except: - st.session_state.activefluid_df = pd.DataFrame(default_fluid) +st.divider() if 'activefluid_df' not in st.session_state or st.session_state.get('activefluid_name') != 'default_fluid': st.session_state.activefluid_df = pd.DataFrame(default_fluid) @@ -34,92 +42,208 @@ if 'tp_flash_data' not in st.session_state: st.session_state['tp_flash_data'] = pd.DataFrame({ - 'Temperature (C)': [20.0, 25.0], # Default example temperature - 'Pressure (bara)': [1.0, 10.0] # Default example pressure + 'Temperature (C)': [20.0, 25.0], + 'Pressure (bara)': [1.0, 10.0] }) -st.edited_df = st.data_editor( - st.session_state.activefluid_df, +st.subheader('Natural-language scenario setup') +scenario_prompt = st.text_area( + 'Describe the fluid and conditions (e.g. "simulate 90% methane and 10% ethane at 40 °C and 30 bar")', + key='scenario_prompt', +) +if st.button('Apply scenario plan') and scenario_prompt.strip(): + scenario = plan_scenario(scenario_prompt) + st.session_state.activefluid_df = scenario['fluid'] + st.session_state.tp_flash_data = scenario['schedule'] + st.success('Scenario applied to the editors below.') + +st.divider() +st.text('Set fluid composition:') + +uploaded_file = st.sidebar.file_uploader( + 'Import Fluid', + key='uploaded_file', + help='Fluids can be saved by hovering over the fluid window and clicking the "Download as CSV" button in the upper-right corner.', +) +if uploaded_file is not None: + try: + imported_df = pd.read_csv(uploaded_file) + numeric_columns = ['MolarComposition[-]', 'MolarMass[kg/mol]', 'RelativeDensity[-]'] + imported_df[numeric_columns] = imported_df[numeric_columns].astype(float) + st.session_state.activefluid_df = imported_df + except Exception as exc: # pragma: no cover - user facing guard + st.warning(f'Could not import the provided file ({exc}). Falling back to defaults.') + st.session_state.activefluid_df = pd.DataFrame(default_fluid) + + +hidecomponents = st.checkbox('Show active components only') +if hidecomponents: + editor_df = st.session_state.activefluid_df[st.session_state.activefluid_df['MolarComposition[-]'] > 0].copy() + editor_df = editor_df.reset_index(drop=True) +else: + editor_df = st.session_state.activefluid_df + +edited_df = st.data_editor( + editor_df, column_config={ - "ComponentName": "Component Name", - "MolarComposition[-]": st.column_config.NumberColumn("Molar Composition [-]", min_value=0, max_value=10000, format="%f"), - "MolarMass[kg/mol]": st.column_config.NumberColumn( - "Molar Mass [kg/mol]", min_value=0, max_value=10000, format="%f kg/mol" + + 'ComponentName': 'Component Name', + 'MolarComposition[-]': st.column_config.NumberColumn( + 'Molar Composition [-]', min_value=0.0, max_value=10000.0, format='%f' ), - "RelativeDensity[-]": st.column_config.NumberColumn( - "Density [gr/cm3]", min_value=1e-10, max_value=10.0, format="%f gr/cm3" + 'MolarMass[kg/mol]': st.column_config.NumberColumn( + 'Molar Mass [kg/mol]', min_value=0.0, max_value=10000.0, format='%f kg/mol' ), + 'RelativeDensity[-]': st.column_config.NumberColumn( + 'Density [gr/cm3]', min_value=1e-10, max_value=10.0, format='%f gr/cm3' + ), + }, -num_rows='dynamic') + num_rows='dynamic', +) +if hidecomponents: + base_df = st.session_state.activefluid_df.copy() + active_mask = base_df['MolarComposition[-]'] > 0 + base_df.loc[active_mask, :] = edited_df.values + st.session_state.activefluid_df = base_df +else: + st.session_state.activefluid_df = edited_df + isplusfluid = st.checkbox('Plus Fluid') -st.text("Fluid composition will be normalized before simulation") +st.text('Fluid composition will be normalized before simulation') st.divider() -# Use st.data_editor for inputting temperature and pressure -st.text("Input Pressures and Temperatures") -st.edited_dfTP = st.data_editor( +st.text('Input Pressures and Temperatures') + +st.session_state.tp_flash_data = st.data_editor( st.session_state.tp_flash_data.dropna().reset_index(drop=True), - num_rows='dynamic', # Allows dynamic number of rows + num_rows='dynamic', column_config={ 'Temperature (C)': st.column_config.NumberColumn( - label="Temperature (C)", - min_value=-273.15, # Minimum temperature in Celsius - max_value=1000, # Maximum temperature in Celsius - format='%f', # Decimal format - help='Enter the temperature in degrees Celsius.' # Help text for guidance + label='Temperature (C)', + min_value=-273.15, + max_value=1000.0, + format='%f', + help='Enter the temperature in degrees Celsius.' ), 'Pressure (bara)': st.column_config.NumberColumn( - label="Pressure (bara)", - min_value=0.0, # Minimum pressure - max_value=1000, # Maximum pressure - format='%f', # Decimal format - help='Enter the pressure in bar absolute.' # Help text for guidance + label='Pressure (bara)', + min_value=0.0, + max_value=1000.0, + format='%f', + help='Enter the pressure in bar absolute.' ), - } + }, ) -if st.button('Run TP Flash Calculations'): - if st.edited_df['MolarComposition[-]'].sum() > 0: - # Check if the dataframe is empty - if st.session_state.tp_flash_data.empty: - st.error('No data to perform calculations. Please input temperature and pressure values.') + +def _normalize_fluid(df: pd.DataFrame) -> pd.DataFrame: + total = df['MolarComposition[-]'].sum() + if total > 0: + normalized = df.copy() + normalized['MolarComposition[-]'] = normalized['MolarComposition[-]'] / total + return normalized + return df + + +def validate_inputs(fluid_frame: pd.DataFrame, schedule_frame: pd.DataFrame) -> Tuple[Dict[str, str], ...]: + issues = create_issue_messages(fluid_frame, schedule_frame) + return tuple(issues) + + +def run_flash_simulation( + fluid_frame: pd.DataFrame, + schedule_frame: pd.DataFrame, + is_plus: bool, +) -> Tuple[pd.DataFrame, float, float]: + if schedule_frame.empty: + return pd.DataFrame(), math.nan, math.nan + + neqsim_fluid = fluid_df(fluid_frame, lastIsPlusFraction=is_plus, add_all_components=False).autoSelectModel() + results_list = [] + last_temp = math.nan + last_pres = math.nan + + for _, row in schedule_frame.dropna().iterrows(): + temp = float(row['Temperature (C)']) + pressure = float(row['Pressure (bara)']) + neqsim_fluid.setPressure(pressure, 'bara') + neqsim_fluid.setTemperature(temp, 'C') + TPflash(neqsim_fluid) + results_list.append(dataFrame(neqsim_fluid)) + last_temp = temp + last_pres = pressure + + combined_results = pd.concat(results_list, ignore_index=True) if results_list else pd.DataFrame() + return combined_results, last_temp, last_pres + + +issues = validate_inputs(st.session_state.activefluid_df, st.session_state.tp_flash_data) +if issues: + for issue in issues: + if issue['level'] == 'error': + st.error(issue['message']) else: - # Initialize a list to store results - results_list = [] - neqsim_fluid = fluid_df(st.edited_df, lastIsPlusFraction=isplusfluid, add_all_components=False).autoSelectModel() - - # Iterate over each row and perform calculations - for idx, row in st.edited_dfTP.dropna().iterrows(): - temp = row['Temperature (C)'] - pressure = row['Pressure (bara)'] - neqsim_fluid.setPressure(pressure, 'bara') - neqsim_fluid.setTemperature(temp, 'C') - TPflash(neqsim_fluid) - #results_df = st.data_editor(dataFrame(neqsim_fluid)) - results_list.append(dataFrame(neqsim_fluid)) - + st.warning(issue['message']) + explanation = explain_validation_issue(issues, st.session_state.activefluid_df, st.session_state.tp_flash_data) + st.info(explanation) + if any(issue['code'] == 'composition_normalize' for issue in issues): + if st.button('Normalize compositions automatically'): + st.session_state.activefluid_df = _normalize_fluid(st.session_state.activefluid_df) + st.experimental_rerun() + +run_results = None +last_temp = math.nan +last_pres = math.nan + +if st.button('Run TP Flash Calculations'): + if any(issue['level'] == 'error' for issue in issues): + st.error('Resolve input errors before running the simulation.') + else: + normalized_fluid = _normalize_fluid(st.session_state.activefluid_df) + run_results, last_temp, last_pres = run_flash_simulation( + normalized_fluid, + st.session_state.tp_flash_data, + isplusfluid, + ) + if run_results is not None and not run_results.empty: st.success('Flash calculations finished successfully!') - st.subheader("Results:") - # Combine all results into a single dataframe - combined_results = pd.concat(results_list, ignore_index=True) - - # Display the combined results - #st.subheader('Combined TP Flash Results') - #st.dataframe(combined_results) - results_df = st.data_editor(combined_results) + st.subheader('Results:') + results_df = st.data_editor(run_results) + st.session_state['tp_flash_last_results'] = run_results + component_names = normalized_fluid['ComponentName'][normalized_fluid['MolarComposition[-]'] > 0] + knowledge_results = query_equilibrium_knowledge( + component_names.tolist(), + last_temp if not math.isnan(last_temp) else 0.0, + last_pres if not math.isnan(last_pres) else 0.0, + ) st.divider() - list1 = neqsim_fluid.getComponentNames() - l1 = list(list1) - string_list = [str(element) for element in l1] - delimiter = ", " - result_string = delimiter.join(string_list) - try: - input = "What scientific experimental equilibrium data are available for mixtures of " + result_string + " at temperature around " + str(temp) + " Celsius and pressure around " + str(pressure) + " bar." - openapitext = st.make_request(input) - st.write(openapitext) - except: - st.write('OpenAI key needed for data analysis') - else: - st.error('The sum of Molar Composition must be greater than 0. Please adjust your inputs.') - -st.sidebar.file_uploader("Import Fluid", key='uploaded_file', help='Fluids can be saved by hovering over the fluid window and clicking the "Download as CSV" button in the upper-right corner.') + st.subheader('Knowledge base insights') + for entry in knowledge_results: + st.markdown(f"**{entry['title']}** (confidence {entry['confidence']:.0%})") + st.markdown(entry['summary']) + st.caption(f"Source: {entry['source']}") + + st.divider() + if st.checkbox('Summarize results with AI'): + context = { + 'temperature': f'{last_temp:.2f} °C' if not math.isnan(last_temp) else 'n/a', + 'pressure': f'{last_pres:.2f} bara' if not math.isnan(last_pres) else 'n/a', + } + summary = summarize_flash_results(run_results, context) + st.markdown(summary) + st.download_button( + 'Download summary as Markdown', + data=summary, + file_name='tp_flash_summary.md', + mime='text/markdown', + ) + else: + st.error('No results were produced. Check your inputs and try again.') + +render_what_if_chat( + _normalize_fluid(st.session_state.activefluid_df), + st.session_state.tp_flash_data, + isplusfluid, + run_flash_simulation, +) diff --git a/pages/10_Gas_Hydrate.py b/pages/10_Gas_Hydrate.py index 33607fe..17a0850 100644 --- a/pages/10_Gas_Hydrate.py +++ b/pages/10_Gas_Hydrate.py @@ -6,7 +6,16 @@ from fluids import default_fluid import matplotlib.pyplot as plt +from components.assistant import render_ai_helper + st.title('Gas Hydrate Calculation') +render_ai_helper( + 'gas_hydrate', + { + 'requires_water': True, + 'default_pressure_points': 4, + }, +) """ Gas hydrate calculations are done using the CPA-EoS combined with a model for the solid hydrate phase. """ diff --git a/pages/40_LNGageing.py b/pages/40_LNGageing.py index b32d7b4..cb7c18e 100644 --- a/pages/40_LNGageing.py +++ b/pages/40_LNGageing.py @@ -4,6 +4,8 @@ import pandas as pd from fluids import lng_fluid from neqsim.thermo.thermoTools import fluid_df + +from components.assistant import render_ai_helper from io import BytesIO col1, col2 = st.columns([30,70]) @@ -11,6 +13,13 @@ with col2: # Streamlit page configuration st.title('LNG Ageing Simulation') + render_ai_helper( + 'lng_ageing', + { + 'default_samples': 5, + 'supports_weathering': True, + }, + ) with col1: st.image('images/LNGship.jpg') diff --git a/pages/50_Property Generator.py b/pages/50_Property Generator.py index 79edbab..6489f1a 100644 --- a/pages/50_Property Generator.py +++ b/pages/50_Property Generator.py @@ -1,4 +1,6 @@ import streamlit as st + +from components.assistant import render_ai_helper import pandas as pd import numpy as np import matplotlib.pyplot as plt @@ -215,6 +217,12 @@ def get_phase_number(fluid, p_name): def main(): st.title("Property Generator") + render_ai_helper( + 'property_generator', + { + 'supports_components': True, + }, + ) st.write(""" This application allows you to define a fluid composition, set up a grid of temperatures and pressures, diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/ai.py b/services/ai.py new file mode 100644 index 0000000..3e4b6aa --- /dev/null +++ b/services/ai.py @@ -0,0 +1,207 @@ +"""High level AI helpers reused by multiple Streamlit pages.""" +from __future__ import annotations + +import math +import re +from typing import Dict, Iterable, List, Optional, Tuple + +import pandas as pd + +from fluids import default_fluid + +from .ai_core import call_llm + +_COMPONENT_PATTERN = re.compile( + r"(?P[-+]?\d+(?:\.\d+)?)\s*(?:%|percent|mole\s*%|mol\s*%)\s*(?:of\s+)?(?P[A-Za-z0-9\-\s]+)", + flags=re.IGNORECASE, +) +_PRESSURE_PATTERN = re.compile(r"(\-?\d+(?:\.\d+)?)\s*(?:bar|bara|barg)", flags=re.IGNORECASE) +_TEMPERATURE_PATTERN = re.compile(r"(\-?\d+(?:\.\d+)?)\s*(?:c|°c|degc|deg\s*c|celsius)", flags=re.IGNORECASE) + + +def _empty_fluid_frame() -> pd.DataFrame: + return pd.DataFrame(default_fluid) + + +def _match_component(name: str, candidates: Iterable[str]) -> str: + target = name.strip().lower() + for candidate in candidates: + if candidate.lower() == target: + return candidate + # fallback: partial match + for candidate in candidates: + if target in candidate.lower() or candidate.lower() in target: + return candidate + return name.strip() + + +def plan_scenario(prompt: str) -> Dict[str, pd.DataFrame]: + """Translate a natural-language description into fluid and TP schedules.""" + + base_df = _empty_fluid_frame() + fluid_df = base_df.copy(deep=True) + + matches = list(_COMPONENT_PATTERN.finditer(prompt)) + if matches: + fluid_df['MolarComposition[-]'] = 0.0 + component_names = fluid_df['ComponentName'].tolist() + for match in matches: + amount = float(match.group('amount')) + name = _match_component(match.group('name'), component_names) + fluid_df.loc[fluid_df['ComponentName'] == name, 'MolarComposition[-]'] = amount + total = fluid_df['MolarComposition[-]'].sum() + if total > 0: + fluid_df['MolarComposition[-]'] = fluid_df['MolarComposition[-]'] / total + # If no explicit matches were found, return the defaults so the UI remains usable + + pressures = [float(match.group(1)) for match in _PRESSURE_PATTERN.finditer(prompt)] + temperatures = [float(match.group(1)) for match in _TEMPERATURE_PATTERN.finditer(prompt)] + schedule_rows: List[Tuple[float, float]] = [] + max_len = max(len(pressures), len(temperatures), 1) + for idx in range(max_len): + temp = temperatures[idx] if idx < len(temperatures) else temperatures[0] if temperatures else 20.0 + pres = pressures[idx] if idx < len(pressures) else pressures[0] if pressures else 1.0 + schedule_rows.append((temp, pres)) + schedule_df = pd.DataFrame(schedule_rows, columns=['Temperature (C)', 'Pressure (bara)']) + return {"fluid": fluid_df, "schedule": schedule_df} + + +def _compose_validation_guidance(issues: List[Dict[str, str]]) -> str: + if not issues: + return "Inputs look good—no validation issues detected." + + bullet_lines = [f"• {issue['message']}" for issue in issues] + suggestions: List[str] = [] + for issue in issues: + code = issue.get("code") + if code == "composition_zero": + suggestions.append( + "Assign a positive molar composition to at least one component before running the flash." + ) + elif code == "composition_normalize": + suggestions.append( + "Use the normalization helper or scale the composition values so they sum to 1.0." + ) + elif code == "schedule_empty": + suggestions.append( + "Add at least one temperature/pressure pair to the schedule table." + ) + elif code == "temperature_range": + suggestions.append( + "Review extreme temperatures and confirm they are intentional for the model setup." + ) + elif code == "pressure_positive": + suggestions.append("Ensure all pressures are positive values in bara.") + + message_parts = ["Validation checks highlighted the following issues:"] + message_parts.extend(bullet_lines) + if suggestions: + message_parts.append("Suggested fixes:") + message_parts.extend(f"- {tip}" for tip in suggestions) + return "\n".join(message_parts) + + +def explain_validation_issue(issues: List[Dict[str, str]], fluid: pd.DataFrame, schedule: pd.DataFrame) -> str: + """Compose an AI explanation for validation warnings.""" + + issue_lines = "\n".join(f"- {issue['message']}" for issue in issues) + prompt = ( + "You are assisting with thermodynamic simulations. The user provided " + "the following validation feedback:\n" + f"{issue_lines}\n\nSummarize the problems and propose concrete fixes." + ) + fallback_message = _compose_validation_guidance(issues) + response = call_llm(prompt, offline_fallback=lambda *_: fallback_message) + return response.text + + +def summarize_flash_results(results_df: pd.DataFrame, scenario_context: Dict[str, str]) -> str: + """Create a high-level summary of TP flash outputs.""" + + if results_df.empty: + return "No results available to summarize." + phase_counts = results_df['phase'].value_counts() if 'phase' in results_df.columns else pd.Series() + compositions = [] + for component_col in [col for col in results_df.columns if col.lower().startswith('x[') or col.lower().startswith('y[')]: + compositions.append(f"{component_col}: mean={results_df[component_col].mean():.3f}") + summary_parts = [ + "### Flash overview", + f"Simulations evaluated {len(results_df)} state points.", + ] + if scenario_context: + summary_parts.append( + "Key scenario inputs: " + ", ".join(f"{k}={v}" for k, v in scenario_context.items()) + ) + if not phase_counts.empty: + phase_text = ", ".join(f"{phase}: {count}" for phase, count in phase_counts.items()) + summary_parts.append(f"Phase occurrences: {phase_text}.") + if compositions: + summary_parts.append("Representative compositions:\n" + "\n".join(f"* {line}" for line in compositions[:6])) + prompt = "\n\n".join(summary_parts) + + def _fallback_summary(_: str, __: Optional[str]) -> str: + lines = summary_parts.copy() + if not results_df.empty: + numeric_cols = [col for col in results_df.columns if pd.api.types.is_numeric_dtype(results_df[col])] + if numeric_cols: + lines.append("Key numeric ranges:") + for column in numeric_cols[:6]: + series = results_df[column] + lines.append( + f"- {column}: min={series.min():.3g}, mean={series.mean():.3g}, max={series.max():.3g}" + ) + return "\n".join(lines) + + ai_response = call_llm( + prompt, + system_prompt=( + "Provide an approachable explanation for process engineers based on the " + "structured summary of TP flash simulation results." + ), + offline_fallback=_fallback_summary, + ) + return ai_response.text + + +def create_issue_messages(fluid: pd.DataFrame, schedule: pd.DataFrame) -> List[Dict[str, str]]: + issues: List[Dict[str, str]] = [] + total = fluid['MolarComposition[-]'].sum() + if total <= 0: + issues.append({ + "level": "error", + "code": "composition_zero", + "message": "Total molar composition must be greater than zero.", + }) + elif not math.isclose(total, 1.0, rel_tol=0.05): + issues.append({ + "level": "warning", + "code": "composition_normalize", + "message": ( + f"Molar composition sums to {total:.3f}, consider normalizing to 1.0." + ), + }) + if schedule.empty: + issues.append({ + "level": "error", + "code": "schedule_empty", + "message": "At least one temperature/pressure pair is required.", + }) + else: + for idx, row in schedule.iterrows(): + if row['Temperature (C)'] < -271 or row['Temperature (C)'] > 1000: + issues.append({ + "level": "warning", + "code": "temperature_range", + "message": ( + f"Row {idx + 1} temperature {row['Temperature (C)']}°C is outside recommended limits." + ), + }) + if row['Pressure (bara)'] <= 0: + issues.append({ + "level": "error", + "code": "pressure_positive", + "message": ( + f"Row {idx + 1} pressure {row['Pressure (bara)']} bara must be positive." + ), + }) + return issues diff --git a/services/ai_core.py b/services/ai_core.py new file mode 100644 index 0000000..dc8166c --- /dev/null +++ b/services/ai_core.py @@ -0,0 +1,106 @@ +"""Core utilities for interacting with AI backends used across the app.""" +from __future__ import annotations + +import json +import pathlib +from dataclasses import dataclass +from functools import lru_cache +from typing import Any, Callable, Dict, Optional, Union + +try: + import streamlit as st +except ImportError: # pragma: no cover - Streamlit not available in some environments + st = None # type: ignore + + +CONFIG_PATH = pathlib.Path(__file__).resolve().parent.parent / "configs" / "ai_pages.json" + + +@dataclass +class AIResponse: + """Container for AI responses to keep structure consistent.""" + + text: str + raw: Optional[Dict[str, Any]] = None + + +def _compose_prompt(prompt: str, system_prompt: Optional[str]) -> str: + if not system_prompt: + return prompt + return f"System:\n{system_prompt.strip()}\n\nUser:\n{prompt.strip()}" + + +OfflineFallback = Union[str, Callable[[str, Optional[str]], str]] + + +def _resolve_offline_fallback( + fallback: OfflineFallback, + prompt: str, + system_prompt: Optional[str], +) -> str: + if callable(fallback): + try: + return fallback(prompt, system_prompt) + except Exception: # pragma: no cover - guardrail for fallback utilities + pass + if isinstance(fallback, str): + return fallback + return ( + "(Simulated AI response) Set up a real API key via Streamlit secrets to replace this placeholder." + ) + + +def call_llm( + prompt: str, + *, + system_prompt: Optional[str] = None, + temperature: float = 0.2, + offline_fallback: Optional[OfflineFallback] = None, +) -> AIResponse: + """Attempt to call the configured LLM via Streamlit's ``make_request`` hook. + + Falls back to a simulated response when the hook is unavailable so that the + rest of the UI remains functional in offline development environments. + """ + + request_payload = _compose_prompt(prompt, system_prompt) + if st is not None and hasattr(st, "make_request"): + try: + response = st.make_request(request_payload) + text = response if isinstance(response, str) else json.dumps(response) + return AIResponse(text=text, raw={"response": response, "temperature": temperature}) + except Exception as exc: # pragma: no cover - runtime guard + if offline_fallback is not None: + fallback_text = _resolve_offline_fallback(offline_fallback, prompt, system_prompt) + return AIResponse( + text=fallback_text, + raw={"error": str(exc), "temperature": temperature, "offline": True}, + ) + fallback_text = ( + "AI request failed with error " + f"{exc}. Returning a diagnostic message so the UI can continue." + ) + return AIResponse(text=fallback_text) + if offline_fallback is not None: + fallback_text = _resolve_offline_fallback(offline_fallback, prompt, system_prompt) + return AIResponse(text=fallback_text, raw={"offline": True, "temperature": temperature}) + simulated = ( + "(Simulated AI response) " + "Set up a real API key via Streamlit secrets to replace this placeholder." + ) + return AIResponse(text=simulated, raw={"offline": True, "temperature": temperature}) + + +@lru_cache(maxsize=None) +def load_page_config(page_id: str) -> Dict[str, Any]: + """Load assistant configuration for a given page. + + The configuration file lives in ``configs/ai_pages.json`` and provides + metadata about inputs/outputs that the assistant can use to tailor prompts. + """ + + if not CONFIG_PATH.exists(): + return {} + with CONFIG_PATH.open("r", encoding="utf-8") as handle: + config = json.load(handle) + return config.get(page_id, {}) diff --git a/services/retrieval.py b/services/retrieval.py new file mode 100644 index 0000000..59d57b6 --- /dev/null +++ b/services/retrieval.py @@ -0,0 +1,46 @@ +"""Lightweight retrieval helper that simulates knowledge-base lookups.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, List, Optional + +from .ai_core import call_llm + + +@dataclass +class RetrievalResult: + title: str + summary: str + source: str + confidence: float + + +def query_equilibrium_knowledge(components: List[str], temperature: float, pressure: float) -> List[Dict[str, object]]: + """Return stubbed knowledge results filtered by component keywords.""" + + unique_components = sorted({component.strip() for component in components if component.strip()}) + component_list = ", ".join(unique_components) + prompt = ( + "Provide a short literature-style summary for equilibrium data that " + "matches the following mixture and operating conditions." + f"\nComponents: {component_list or 'default mixture'}\nTemperature: {temperature} °C\nPressure: {pressure} bara." + ) + + def _offline_summary(_: str, __: Optional[str]) -> str: + summary_lines = [ + "Knowledge base lookup (offline mode)", + f"Mixture components: {component_list or 'default composition'}.", + f"Operating window ≈ {temperature:.1f} °C and {pressure:.1f} bara.", + "Consult internal lab reports or literature tables for phase-equilibrium references at comparable conditions.", + "Use these insights as qualitative guidance until the live retrieval service is configured.", + ] + return "\n".join(summary_lines) + + response = call_llm(prompt, offline_fallback=_offline_summary) + simulated_result = RetrievalResult( + title="Thermodynamic insights", + summary=response.text, + source="internal-knowledge-base", + confidence=0.4, + ) + return [simulated_result.__dict__]