From 192bcfc606aeb535e7dfcbaf1fe42ba96208d0b3 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 4 Feb 2026 19:41:43 +0530 Subject: [PATCH 01/21] handled update query part --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 6 ++- .../zeppelin/jdbc/ValidationRequest.java | 6 ++- .../zeppelin/jdbc/ValidationResponse.java | 45 +++++++++++++++++++ 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 2acd4846b72..22d1105e7f5 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -875,7 +875,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, .replace("\n", " ") .replace("\r", " ") .replace("\t", " "); - ValidationRequest request = new ValidationRequest(sqlToValidate, userName, interpreterName); + ValidationRequest request = new ValidationRequest(sqlToValidate, userName, interpreterName, sqlToExecute); try { ValidationResponse response = sendValidationRequest(request); if (response.isPreSubmitFail()) { @@ -917,7 +917,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, finalOutput.append("Use: ").append(jsonObject.getString(table)).append(" in place of ").append(table).append("\n"); } } - }else if (outputMessage.contains("UnAuthorized Query")) { + } else if (outputMessage.contains("UnAuthorized Query")) { context.out.write("Query Error: UnAuthorized Query\n"); finalOutput.append("You are not authorized to execute this query.\n"); } @@ -939,6 +939,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, context.out.write("%text " + message + "\n\n"); context.out.flush(); } + } if (response.isQueryUpdated()) { + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : response.getQueryText(); } } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 23cf6f67a60..a9a2aecb532 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -4,15 +4,17 @@ public class ValidationRequest { private String queryText; private String user; private String interpreterName; + private String rawQueryText; - public ValidationRequest(String queryText, String user, String interpreterName) { + public ValidationRequest(String queryText, String user, String interpreterName, String rawQueryText) { this.queryText = queryText; this.user = user; this.interpreterName = interpreterName; + this.rawQueryText = rawQueryText; } public String toJson() { - return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\",\"interpreter_name\":\"" + interpreterName + "\"}"; + return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\",\"interpreter_name\":\"" + interpreterName + "\",\"raw_query_text\":\"" + rawQueryText + "\"}"; } } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 759e6522883..a46111e7163 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -11,6 +11,9 @@ public class ValidationResponse { private String errorHeader; private String message; private String version; + private boolean isQueryUpdated; + private String queryText; + private String newQueryText; // Getters and Setters public boolean isPreSubmitFail() { @@ -61,6 +64,30 @@ public void setMessage(String message) { this.message = message; } + public boolean isQueryUpdated() { + return isQueryUpdated; + } + + public void setQueryUpdated(boolean isQueryUpdated) { + this.isQueryUpdated = isQueryUpdated; + } + + public String getQueryText() { + return queryText; + } + + public void setQueryText(String queryText) { + this.queryText = queryText; + } + + public String getNewQueryText() { + return newQueryText; + } + + public void setNewQueryText(String newQueryText) { + this.newQueryText = newQueryText; + } + public static ValidationResponse fromJson(String jsonResponse) { Gson gson = new Gson(); ValidationResponse response = new ValidationResponse(); @@ -94,6 +121,21 @@ public static ValidationResponse fromJson(String jsonResponse) { } else { response.setVersion("v1"); } + if (jsonObject.has("is_query_updated") && !jsonObject.get("is_query_updated").isJsonNull()) { + response.setQueryUpdated(jsonObject.get("is_query_updated").getAsBoolean()); + } else { + response.setQueryUpdated(false); + } + if (jsonObject.has("query_text") && !jsonObject.get("query_text").isJsonNull()) { + response.setQueryText(jsonObject.get("query_text").getAsString()); + } else { + response.setQueryText(""); + } + if (jsonObject.has("new_query_text") && !jsonObject.get("new_query_text").isJsonNull()) { + response.setNewQueryText(jsonObject.get("new_query_text").getAsString()); + } else { + response.setNewQueryText(null); + } } else { response.setPreSubmitFail(false); response.setFailFast(false); @@ -101,6 +143,9 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setErrorHeader(""); // Default error header response.setMessage(""); // Default message response.setVersion("v1"); // Default version + response.setQueryUpdated(false); + response.setQueryText(""); + response.setNewQueryText(null); } return response; } From 0b1e5079a9f90429c15441123b08bb3b8081a105 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Thu, 5 Feb 2026 17:29:22 +0530 Subject: [PATCH 02/21] removed unused code --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 3 +-- .../apache/zeppelin/jdbc/ValidationResponse.java | 15 --------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 22d1105e7f5..39c8304f36b 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -939,9 +939,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, context.out.write("%text " + message + "\n\n"); context.out.flush(); } - } if (response.isQueryUpdated()) { - sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : response.getQueryText(); } + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : response.getQueryText(); } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); String mess = e.getLocalizedMessage(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index a46111e7163..9e012c13e4a 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -11,7 +11,6 @@ public class ValidationResponse { private String errorHeader; private String message; private String version; - private boolean isQueryUpdated; private String queryText; private String newQueryText; @@ -63,14 +62,6 @@ public String getMessage() { public void setMessage(String message) { this.message = message; } - - public boolean isQueryUpdated() { - return isQueryUpdated; - } - - public void setQueryUpdated(boolean isQueryUpdated) { - this.isQueryUpdated = isQueryUpdated; - } public String getQueryText() { return queryText; @@ -121,11 +112,6 @@ public static ValidationResponse fromJson(String jsonResponse) { } else { response.setVersion("v1"); } - if (jsonObject.has("is_query_updated") && !jsonObject.get("is_query_updated").isJsonNull()) { - response.setQueryUpdated(jsonObject.get("is_query_updated").getAsBoolean()); - } else { - response.setQueryUpdated(false); - } if (jsonObject.has("query_text") && !jsonObject.get("query_text").isJsonNull()) { response.setQueryText(jsonObject.get("query_text").getAsString()); } else { @@ -143,7 +129,6 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setErrorHeader(""); // Default error header response.setMessage(""); // Default message response.setVersion("v1"); // Default version - response.setQueryUpdated(false); response.setQueryText(""); response.setNewQueryText(null); } From 7552bcb5a93a1e3f3434c89b9cb19628cc529089 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Thu, 5 Feb 2026 19:27:35 +0530 Subject: [PATCH 03/21] fixed issue --- .../src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 39c8304f36b..a1145f3b9c2 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -940,7 +940,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, context.out.flush(); } } - sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : response.getQueryText(); + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); String mess = e.getLocalizedMessage(); From ec30ba900923a7db04925319eb150ce5c206d9d6 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Fri, 6 Feb 2026 11:11:23 +0530 Subject: [PATCH 04/21] bug fix --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index a1145f3b9c2..3962cb31e42 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -929,6 +929,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, String detailedMessage = response.getMessage(); context.getLocalProperties().put(CANCEL_REASON, detailedMessage); } + + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; cancel(context); return new InterpreterResult(Code.ERROR); @@ -939,8 +941,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, context.out.write("%text " + message + "\n\n"); context.out.flush(); } + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; } - sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); String mess = e.getLocalizedMessage(); From 06676096ee7e017292e407b63d6d2fb0b88a3297 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Sun, 8 Feb 2026 19:45:03 +0530 Subject: [PATCH 05/21] updated variable name --- .../zeppelin/jdbc/ValidationResponse.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 9e012c13e4a..428cbeaed22 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -11,7 +11,7 @@ public class ValidationResponse { private String errorHeader; private String message; private String version; - private String queryText; + private String rawQueryText; private String newQueryText; // Getters and Setters @@ -63,12 +63,12 @@ public void setMessage(String message) { this.message = message; } - public String getQueryText() { - return queryText; + public String getRawQueryText() { + return rawQueryText; } - public void setQueryText(String queryText) { - this.queryText = queryText; + public void setRawQueryText(String rawQueryText) { + this.rawQueryText = rawQueryText; } public String getNewQueryText() { @@ -112,10 +112,10 @@ public static ValidationResponse fromJson(String jsonResponse) { } else { response.setVersion("v1"); } - if (jsonObject.has("query_text") && !jsonObject.get("query_text").isJsonNull()) { - response.setQueryText(jsonObject.get("query_text").getAsString()); + if (jsonObject.has("raw_query_text") && !jsonObject.get("raw_query_text").isJsonNull()) { + response.setRawQueryText(jsonObject.get("raw_query_text").getAsString()); } else { - response.setQueryText(""); + response.setRawQueryText(""); } if (jsonObject.has("new_query_text") && !jsonObject.get("new_query_text").isJsonNull()) { response.setNewQueryText(jsonObject.get("new_query_text").getAsString()); @@ -129,7 +129,7 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setErrorHeader(""); // Default error header response.setMessage(""); // Default message response.setVersion("v1"); // Default version - response.setQueryText(""); + response.setRawQueryText(""); response.setNewQueryText(null); } return response; From 11a248f41aa42b567daa997287777233d8c881d7 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Sun, 8 Feb 2026 20:44:33 +0530 Subject: [PATCH 06/21] bug fix --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 2 -- .../org/apache/zeppelin/jdbc/ValidationRequest.java | 13 ++++++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 3962cb31e42..92a0a4ebdec 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -929,8 +929,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql, String detailedMessage = response.getMessage(); context.getLocalProperties().put(CANCEL_REASON, detailedMessage); } - - sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; cancel(context); return new InterpreterResult(Code.ERROR); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index a9a2aecb532..3410ba4f9cb 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -1,9 +1,19 @@ package org.apache.zeppelin.jdbc; +import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; + public class ValidationRequest { + @SerializedName("query_text") private String queryText; + + @SerializedName("user") private String user; + + @SerializedName("interpreter_name") private String interpreterName; + + @SerializedName("raw_query_text") private String rawQueryText; public ValidationRequest(String queryText, String user, String interpreterName, String rawQueryText) { @@ -14,7 +24,8 @@ public ValidationRequest(String queryText, String user, String interpreterName, } public String toJson() { - return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\",\"interpreter_name\":\"" + interpreterName + "\",\"raw_query_text\":\"" + rawQueryText + "\"}"; + Gson gson = new Gson(); + return gson.toJson(this); } } From 7935112ffac857b82cec7fec8857ab6531572f37 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Tue, 17 Feb 2026 14:39:46 +0530 Subject: [PATCH 07/21] added logging --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index c7dde1b1853..cea1eb9e931 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -949,6 +949,11 @@ private InterpreterResult executeSql(String sql, String detailedMessage = response.getMessage(); context.getLocalProperties().put(CANCEL_REASON, detailedMessage); } + + context.out.write("Interpreter context: " + context.toString()); + context.out.write("basePropertiesMap: " + basePropertiesMap.toString()); + context.out.write("jdbcUserConfigurationsMap: " + jdbcUserConfigurationsMap.toString()); + context.out.write("jdbcUserConfigurations.getProperty(): " + getJDBCConfiguration(user).getProperty().toString()); cancel(context); return new InterpreterResult(Code.ERROR); From a979a9cfb22dc08c0e823d7ae7682f8dc4471885 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Tue, 17 Feb 2026 20:42:39 +0530 Subject: [PATCH 08/21] updated to handle new jdbc url --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 61 ++++++++++++++----- .../zeppelin/jdbc/ValidationRequest.java | 6 +- .../zeppelin/jdbc/ValidationResponse.java | 30 +++++++++ 3 files changed, 80 insertions(+), 17 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index cea1eb9e931..619ecbf8f4e 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -580,6 +580,16 @@ private Connection getConnectionFromPool(String url, String user, public Connection getConnection(InterpreterContext context) throws ClassNotFoundException, SQLException, InterpreterException, IOException { + return getConnection(context, null); + } + + /** + * Get connection with optional URL override + * @param context Interpreter context + * @param overrideUrl URL to use instead of default (pass null or empty string to use default) + */ + public Connection getConnection(InterpreterContext context, String overrideUrl) + throws ClassNotFoundException, SQLException, InterpreterException, IOException { if (basePropertiesMap.get(DEFAULT_KEY) == null) { LOGGER.warn("No default config"); @@ -592,7 +602,16 @@ public Connection getConnection(InterpreterContext context) setUserProperty(context); final Properties properties = jdbcUserConfigurations.getProperty(); - String url = properties.getProperty(URL_KEY); + + // Use override URL if provided, otherwise use default + String url = (overrideUrl != null && !overrideUrl.isEmpty()) + ? overrideUrl + : properties.getProperty(URL_KEY); + + if (overrideUrl != null && !overrideUrl.isEmpty()) { + LOGGER.info("Using override URL: {}", overrideUrl); + } + url = appendProxyUserToURL(url, user); String connectionUrl = appendTagsToURL(url, context); validateConnectionUrl(connectionUrl); @@ -819,8 +838,32 @@ private InterpreterResult executeSql(String sql, String paragraphId = context.getParagraphId(); String user = getUser(context); + String interpreterName = getInterpreterGroup().getId(); + + String sqlToValidate = sql + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " "); + + String targetJdbcUrl = getJDBCConfiguration(user).getProperty().getProperty(URL_KEY); + + ValidationRequest request = new ValidationRequest(sqlToValidate, userName, + interpreterName, sql, targetJdbcUrl); + ValidationResponse response = null; + + try { + response = sendValidationRequest(request); + + if (response.getRawJdbcUrl() != null && + !response.getRawJdbcUrl().isEmpty()) { + targetJdbcUrl = response.getRawJdbcUrl(); + } + } catch (Exception e) { + LOGGER.warn("Failed to call validation API: {}", e); + } + try { - connection = getConnection(context); + connection = getConnection(context, targetJdbcUrl); } catch (IllegalArgumentException e) { LOGGER.error("Cannot run " + sql, e); return new InterpreterResult(Code.ERROR, "Connection URL contains improper configuration"); @@ -856,8 +899,6 @@ private InterpreterResult executeSql(String sql, LOGGER.info("Execute sql: " + sqlToExecute); statement = connection.createStatement(); - String interpreterName = getInterpreterGroup().getId(); - if (interpreterName != null && interpreterName.startsWith("spark_rca_")) { statement.setQueryTimeout(10800); // 10800 seconds = 3 hours } @@ -890,14 +931,7 @@ private InterpreterResult executeSql(String sql, Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); } - String userName = getUser(context); - String sqlToValidate = sqlToExecute - .replace("\n", " ") - .replace("\r", " ") - .replace("\t", " "); - ValidationRequest request = new ValidationRequest(sqlToValidate, userName, interpreterName, sqlToExecute); try { - ValidationResponse response = sendValidationRequest(request); if (response.isPreSubmitFail()) { if(response.getVersion() == "v1") { String outputMessage = response.getMessage(); @@ -949,11 +983,6 @@ private InterpreterResult executeSql(String sql, String detailedMessage = response.getMessage(); context.getLocalProperties().put(CANCEL_REASON, detailedMessage); } - - context.out.write("Interpreter context: " + context.toString()); - context.out.write("basePropertiesMap: " + basePropertiesMap.toString()); - context.out.write("jdbcUserConfigurationsMap: " + jdbcUserConfigurationsMap.toString()); - context.out.write("jdbcUserConfigurations.getProperty(): " + getJDBCConfiguration(user).getProperty().toString()); cancel(context); return new InterpreterResult(Code.ERROR); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 3410ba4f9cb..dd5ced66eb4 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -16,11 +16,15 @@ public class ValidationRequest { @SerializedName("raw_query_text") private String rawQueryText; - public ValidationRequest(String queryText, String user, String interpreterName, String rawQueryText) { + @SerializedName("raw_jdbc_url") + private String rawJdbcUrl; + + public ValidationRequest(String queryText, String user, String interpreterName, String rawQueryText, String rawJdbcUrl) { this.queryText = queryText; this.user = user; this.interpreterName = interpreterName; this.rawQueryText = rawQueryText; + this.rawJdbcUrl = rawJdbcUrl; } public String toJson() { diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 428cbeaed22..908166311a5 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -13,6 +13,8 @@ public class ValidationResponse { private String version; private String rawQueryText; private String newQueryText; + private String rawJdbcUrl; + private String newJdbcUrl; // Getters and Setters public boolean isPreSubmitFail() { @@ -79,6 +81,22 @@ public void setNewQueryText(String newQueryText) { this.newQueryText = newQueryText; } + public String getRawJdbcUrl() { + return rawJdbcUrl; + } + + public void setRawJdbcUrl(String rawJdbcUrl) { + this.rawJdbcUrl = rawJdbcUrl; + } + + public String getNewJdbcUrl() { + return newJdbcUrl; + } + + public void setNewJdbcUrl(String newJdbcUrl) { + this.newJdbcUrl = newJdbcUrl; + } + public static ValidationResponse fromJson(String jsonResponse) { Gson gson = new Gson(); ValidationResponse response = new ValidationResponse(); @@ -122,6 +140,16 @@ public static ValidationResponse fromJson(String jsonResponse) { } else { response.setNewQueryText(null); } + if (jsonObject.has("raw_jdbc_url") && !jsonObject.get("raw_jdbc_url").isJsonNull()) { + response.setRawJdbcUrl(jsonObject.get("raw_jdbc_url").getAsString()); + } else { + response.setRawJdbcUrl(""); + } + if (jsonObject.has("new_jdbc_url") && !jsonObject.get("new_jdbc_url").isJsonNull()) { + response.setNewJdbcUrl(jsonObject.get("new_jdbc_url").getAsString()); + } else { + response.setNewJdbcUrl(""); + } } else { response.setPreSubmitFail(false); response.setFailFast(false); @@ -131,6 +159,8 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setVersion("v1"); // Default version response.setRawQueryText(""); response.setNewQueryText(null); + response.setRawJdbcUrl(""); + response.setNewJdbcUrl(null); } return response; } From 06df35dbfe6e89e7b86b6e0f0fc87160b59f92fc Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Tue, 17 Feb 2026 22:04:57 +0530 Subject: [PATCH 09/21] bug fixes --- .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 619ecbf8f4e..554620735a9 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -609,7 +609,7 @@ public Connection getConnection(InterpreterContext context, String overrideUrl) : properties.getProperty(URL_KEY); if (overrideUrl != null && !overrideUrl.isEmpty()) { - LOGGER.info("Using override URL: {}", overrideUrl); + LOGGER.info("Using override URL for this paragraph"); } url = appendProxyUserToURL(url, user); @@ -847,16 +847,16 @@ private InterpreterResult executeSql(String sql, String targetJdbcUrl = getJDBCConfiguration(user).getProperty().getProperty(URL_KEY); - ValidationRequest request = new ValidationRequest(sqlToValidate, userName, + ValidationRequest request = new ValidationRequest(sqlToValidate, user, interpreterName, sql, targetJdbcUrl); ValidationResponse response = null; try { response = sendValidationRequest(request); - if (response.getRawJdbcUrl() != null && - !response.getRawJdbcUrl().isEmpty()) { - targetJdbcUrl = response.getRawJdbcUrl(); + if (response.getNewJdbcUrl() != null && + !response.getNewJdbcUrl().isEmpty()) { + targetJdbcUrl = response.getNewJdbcUrl(); } } catch (Exception e) { LOGGER.warn("Failed to call validation API: {}", e); From 91e983959ae642bb6120dfcc70afe46ce691a6b3 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Tue, 17 Feb 2026 23:38:47 +0530 Subject: [PATCH 10/21] added logging --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 6 ++++++ .../java/org/apache/zeppelin/jdbc/ValidationResponse.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 554620735a9..c05eea71a8b 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -859,6 +859,11 @@ private InterpreterResult executeSql(String sql, targetJdbcUrl = response.getNewJdbcUrl(); } } catch (Exception e) { + try { + context.out.write("Error: " + e.getMessage()); + } catch (IOException e2) { + LOGGER.error("Failed to write error message", e); + } LOGGER.warn("Failed to call validation API: {}", e); } @@ -932,6 +937,7 @@ private InterpreterResult executeSql(String sql, } try { + context.out.write("response: " + response.toString()); if (response.isPreSubmitFail()) { if(response.getVersion() == "v1") { String outputMessage = response.getMessage(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 908166311a5..e41b185b949 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -148,7 +148,7 @@ public static ValidationResponse fromJson(String jsonResponse) { if (jsonObject.has("new_jdbc_url") && !jsonObject.get("new_jdbc_url").isJsonNull()) { response.setNewJdbcUrl(jsonObject.get("new_jdbc_url").getAsString()); } else { - response.setNewJdbcUrl(""); + response.setNewJdbcUrl(null); } } else { response.setPreSubmitFail(false); From 042da9bbd83b4131abd5e557a4274837c0ceacad Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 18 Feb 2026 00:24:03 +0530 Subject: [PATCH 11/21] added more logging --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index c05eea71a8b..ca68cd98eed 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -838,6 +838,21 @@ private InterpreterResult executeSql(String sql, String paragraphId = context.getParagraphId(); String user = getUser(context); + try { + context.out.write("Interpreter name: " + getInterpreterGroup().getId()); + context.out.write("User: " + user); + context.out.write("SQL: " + sql); + context.out.write("Target JDBC URL: " + getJDBCConfiguration(user).getProperty().getProperty(URL_KEY)); + context.out.write("request: " + new ValidationRequest(sql, user, getInterpreterGroup().getId(), sql, getJDBCConfiguration(user).getProperty().getProperty(URL_KEY)).toJson()); + } catch (Exception e) { + try { + context.out.write("Error: " + e.getMessage()); + } catch (IOException e2) { + LOGGER.error("Failed to write error message", e); + } + LOGGER.warn("Failed to call validation API: {}", e); + } + String interpreterName = getInterpreterGroup().getId(); String sqlToValidate = sql @@ -853,6 +868,7 @@ private InterpreterResult executeSql(String sql, try { response = sendValidationRequest(request); + context.out.write("response: " + response.toString()); if (response.getNewJdbcUrl() != null && !response.getNewJdbcUrl().isEmpty()) { From 83c3b9bfd628915b9c85321779b7e4d05623a270 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 18 Feb 2026 01:10:07 +0530 Subject: [PATCH 12/21] added logging --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index ca68cd98eed..a702de2cd13 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -860,7 +860,20 @@ private InterpreterResult executeSql(String sql, .replace("\r", " ") .replace("\t", " "); - String targetJdbcUrl = getJDBCConfiguration(user).getProperty().getProperty(URL_KEY); + // User config properties may be null until setUserProperty is called (e.g. first run for this user) + Properties userProps = getJDBCConfiguration(user).getProperty(); + Properties defaultProps = basePropertiesMap.get(DEFAULT_KEY); + String targetJdbcUrl = (userProps != null && userProps.getProperty(URL_KEY) != null) + ? userProps.getProperty(URL_KEY) + : (defaultProps != null ? defaultProps.getProperty(URL_KEY) : null); + try { + context.out.write("Target JDBC URL: " + targetJdbcUrl); + } catch (IOException e) { + LOGGER.error("Failed to write target JDBC URL", e); + } + if (targetJdbcUrl == null || targetJdbcUrl.isEmpty()) { + return new InterpreterResult(Code.ERROR, "JDBC URL is not configured. Check interpreter settings."); + } ValidationRequest request = new ValidationRequest(sqlToValidate, user, interpreterName, sql, targetJdbcUrl); From 96c8f21baf3f5641143aec963e859560671e4a17 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 18 Feb 2026 01:56:33 +0530 Subject: [PATCH 13/21] updating logic --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 36 ++++--------------- 1 file changed, 7 insertions(+), 29 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index a702de2cd13..e7becde9059 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -838,21 +838,6 @@ private InterpreterResult executeSql(String sql, String paragraphId = context.getParagraphId(); String user = getUser(context); - try { - context.out.write("Interpreter name: " + getInterpreterGroup().getId()); - context.out.write("User: " + user); - context.out.write("SQL: " + sql); - context.out.write("Target JDBC URL: " + getJDBCConfiguration(user).getProperty().getProperty(URL_KEY)); - context.out.write("request: " + new ValidationRequest(sql, user, getInterpreterGroup().getId(), sql, getJDBCConfiguration(user).getProperty().getProperty(URL_KEY)).toJson()); - } catch (Exception e) { - try { - context.out.write("Error: " + e.getMessage()); - } catch (IOException e2) { - LOGGER.error("Failed to write error message", e); - } - LOGGER.warn("Failed to call validation API: {}", e); - } - String interpreterName = getInterpreterGroup().getId(); String sqlToValidate = sql @@ -861,38 +846,32 @@ private InterpreterResult executeSql(String sql, .replace("\t", " "); // User config properties may be null until setUserProperty is called (e.g. first run for this user) - Properties userProps = getJDBCConfiguration(user).getProperty(); Properties defaultProps = basePropertiesMap.get(DEFAULT_KEY); - String targetJdbcUrl = (userProps != null && userProps.getProperty(URL_KEY) != null) - ? userProps.getProperty(URL_KEY) - : (defaultProps != null ? defaultProps.getProperty(URL_KEY) : null); + String targetJdbcUrl = (defaultProps != null ? defaultProps.getProperty(URL_KEY) : null); + try { context.out.write("Target JDBC URL: " + targetJdbcUrl); } catch (IOException e) { LOGGER.error("Failed to write target JDBC URL", e); } - if (targetJdbcUrl == null || targetJdbcUrl.isEmpty()) { - return new InterpreterResult(Code.ERROR, "JDBC URL is not configured. Check interpreter settings."); - } - + ValidationRequest request = new ValidationRequest(sqlToValidate, user, interpreterName, sql, targetJdbcUrl); ValidationResponse response = null; try { response = sendValidationRequest(request); - context.out.write("response: " + response.toString()); if (response.getNewJdbcUrl() != null && !response.getNewJdbcUrl().isEmpty()) { targetJdbcUrl = response.getNewJdbcUrl(); } - } catch (Exception e) { try { - context.out.write("Error: " + e.getMessage()); - } catch (IOException e2) { - LOGGER.error("Failed to write error message", e); + context.out.write("New Target JDBC URL: " + targetJdbcUrl); + } catch (IOException e) { + LOGGER.error("Failed to write target JDBC URL", e); } + } catch (Exception e) { LOGGER.warn("Failed to call validation API: {}", e); } @@ -966,7 +945,6 @@ private InterpreterResult executeSql(String sql, } try { - context.out.write("response: " + response.toString()); if (response.isPreSubmitFail()) { if(response.getVersion() == "v1") { String outputMessage = response.getMessage(); From 0d048222654340581cbfc42a0b3260520921797c Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 18 Feb 2026 03:09:04 +0530 Subject: [PATCH 14/21] added logging --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index e7becde9059..25af58fad10 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -608,6 +608,14 @@ public Connection getConnection(InterpreterContext context, String overrideUrl) ? overrideUrl : properties.getProperty(URL_KEY); + try { + context.out.write("Target JDBC URL: " + url); + context.out.write("Override URL: " + overrideUrl); + context.out.write("Properties: " + properties.toString()); + } catch (IOException e) { + LOGGER.error("Failed to write target JDBC URL", e); + } + if (overrideUrl != null && !overrideUrl.isEmpty()) { LOGGER.info("Using override URL for this paragraph"); } @@ -849,12 +857,6 @@ private InterpreterResult executeSql(String sql, Properties defaultProps = basePropertiesMap.get(DEFAULT_KEY); String targetJdbcUrl = (defaultProps != null ? defaultProps.getProperty(URL_KEY) : null); - try { - context.out.write("Target JDBC URL: " + targetJdbcUrl); - } catch (IOException e) { - LOGGER.error("Failed to write target JDBC URL", e); - } - ValidationRequest request = new ValidationRequest(sqlToValidate, user, interpreterName, sql, targetJdbcUrl); ValidationResponse response = null; @@ -866,11 +868,6 @@ private InterpreterResult executeSql(String sql, !response.getNewJdbcUrl().isEmpty()) { targetJdbcUrl = response.getNewJdbcUrl(); } - try { - context.out.write("New Target JDBC URL: " + targetJdbcUrl); - } catch (IOException e) { - LOGGER.error("Failed to write target JDBC URL", e); - } } catch (Exception e) { LOGGER.warn("Failed to call validation API: {}", e); } From e41bd7c3bc8ab764598b66f80ef706411341814f Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 18 Feb 2026 03:26:32 +0530 Subject: [PATCH 15/21] added more logs --- .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 25af58fad10..84c096c7ff0 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -624,6 +624,13 @@ public Connection getConnection(InterpreterContext context, String overrideUrl) String connectionUrl = appendTagsToURL(url, context); validateConnectionUrl(connectionUrl); + try { + context.out.write("Target JDBC URL: " + url); + context.out.write("connection URL: " + connectionUrl); + } catch (IOException e) { + LOGGER.error("Failed to write target JDBC URL", e); + } + String authType = getProperty("zeppelin.jdbc.auth.type", "SIMPLE") .trim().toUpperCase(); switch (authType) { From 0b75fc57f7d79188ec584e9bc67fcaa2a43d3c2f Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 18 Feb 2026 04:46:49 +0530 Subject: [PATCH 16/21] added pool management --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 40 +++++++++++++++++-- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 84c096c7ff0..1794a315b25 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -436,11 +436,16 @@ private String getEntityName(String replName, String propertyKey) { } } - private String getJDBCDriverName(String user) { + private String getJDBCDriverName(String user, String url) { StringBuffer driverName = new StringBuffer(); driverName.append(DBCP_STRING); driverName.append(DEFAULT_KEY); driverName.append(user); + // Add sanitized URL to make pool key unique per URL + if (url != null && !url.isEmpty()) { + String sanitizedUrl = url.replaceAll("[^a-zA-Z0-9]", "_"); + driverName.append("_").append(sanitizedUrl); + } return driverName.toString(); } @@ -471,9 +476,35 @@ public JDBCUserConfigurations getJDBCConfiguration(String user) { } private void closeDBPool(String user) throws SQLException { + closeDBPool(user, null); + } + + /** + * Close database pool for user and optional URL + * @param user Username + * @param url URL to close specific pool, or null to close all pools for the user + */ + private void closeDBPool(String user, String url) throws SQLException { PoolingDriver poolingDriver = getJDBCConfiguration(user).removeDBDriverPool(); if (poolingDriver != null) { - poolingDriver.closePool(DEFAULT_KEY + user); + if (url != null && !url.isEmpty()) { + // Close specific pool for this URL + String poolName = DEFAULT_KEY + user + "_" + url.replaceAll("[^a-zA-Z0-9]", "_"); + poolingDriver.closePool(poolName); + LOGGER.info("Closed pool for user: {}, url: {}", user, url); + } else { + // Close all pools for this user + // Get all pool names and close those that match this user + String[] poolNames = poolingDriver.getPoolNames(); + String userPrefix = DEFAULT_KEY + user; + for (String poolName : poolNames) { + if (poolName.startsWith(userPrefix)) { + poolingDriver.closePool(poolName); + LOGGER.info("Closed pool: {}", poolName); + } + } + LOGGER.info("Closed all pools for user: {}", user); + } } } @@ -564,13 +595,14 @@ private void createConnectionPool(String url, String user, poolableConnectionFactory.setPool(connectionPool); Class.forName(driverClass); PoolingDriver driver = new PoolingDriver(); - driver.registerPool(DEFAULT_KEY + user, connectionPool); + String poolName = DEFAULT_KEY + user + "_" + url.replaceAll("[^a-zA-Z0-9]", "_"); + driver.registerPool(poolName, connectionPool); getJDBCConfiguration(user).saveDBDriverPool(driver); } private Connection getConnectionFromPool(String url, String user, Properties properties) throws SQLException, ClassNotFoundException { - String jdbcDriver = getJDBCDriverName(user); + String jdbcDriver = getJDBCDriverName(user, url); if (!getJDBCConfiguration(user).isConnectionInDBDriverPool()) { createConnectionPool(url, user, properties); From 6836b179a63e62c663acdad577899ba204d50601 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 18 Feb 2026 14:17:25 +0530 Subject: [PATCH 17/21] multiple pool management --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 20 +++---------------- .../zeppelin/jdbc/JDBCUserConfigurations.java | 16 +++++++++++++++ 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 1794a315b25..3fa0f3dbfe7 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -597,14 +597,15 @@ private void createConnectionPool(String url, String user, PoolingDriver driver = new PoolingDriver(); String poolName = DEFAULT_KEY + user + "_" + url.replaceAll("[^a-zA-Z0-9]", "_"); driver.registerPool(poolName, connectionPool); - getJDBCConfiguration(user).saveDBDriverPool(driver); + getJDBCConfiguration(user).saveDBDriverPool(driver, poolName); } private Connection getConnectionFromPool(String url, String user, Properties properties) throws SQLException, ClassNotFoundException { + String poolName = DEFAULT_KEY + user + "_" + url.replaceAll("[^a-zA-Z0-9]", "_"); String jdbcDriver = getJDBCDriverName(user, url); - if (!getJDBCConfiguration(user).isConnectionInDBDriverPool()) { + if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(poolName)) { createConnectionPool(url, user, properties); } return DriverManager.getConnection(jdbcDriver); @@ -640,14 +641,6 @@ public Connection getConnection(InterpreterContext context, String overrideUrl) ? overrideUrl : properties.getProperty(URL_KEY); - try { - context.out.write("Target JDBC URL: " + url); - context.out.write("Override URL: " + overrideUrl); - context.out.write("Properties: " + properties.toString()); - } catch (IOException e) { - LOGGER.error("Failed to write target JDBC URL", e); - } - if (overrideUrl != null && !overrideUrl.isEmpty()) { LOGGER.info("Using override URL for this paragraph"); } @@ -656,13 +649,6 @@ public Connection getConnection(InterpreterContext context, String overrideUrl) String connectionUrl = appendTagsToURL(url, context); validateConnectionUrl(connectionUrl); - try { - context.out.write("Target JDBC URL: " + url); - context.out.write("connection URL: " + connectionUrl); - } catch (IOException e) { - LOGGER.error("Failed to write target JDBC URL", e); - } - String authType = getProperty("zeppelin.jdbc.auth.type", "SIMPLE") .trim().toUpperCase(); switch (authType) { diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java index 311fb0bad0c..8ec74d34dbe 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java @@ -20,8 +20,10 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; /** * UserConfigurations for JDBC impersonation. @@ -29,11 +31,13 @@ public class JDBCUserConfigurations { private final Map paragraphIdStatementMap; private PoolingDriver poolingDriver; + private final Set registeredPools; private Properties properties; private Boolean isSuccessful; public JDBCUserConfigurations() { paragraphIdStatementMap = new HashMap<>(); + registeredPools = new HashSet<>(); } public void initStatementMap() throws SQLException { @@ -45,6 +49,7 @@ public void initStatementMap() throws SQLException { public void initConnectionPoolMap() throws SQLException { this.poolingDriver = null; + this.registeredPools.clear(); this.isSuccessful = null; } @@ -83,8 +88,15 @@ public void saveDBDriverPool(PoolingDriver driver) throws SQLException { this.isSuccessful = false; } + public void saveDBDriverPool(PoolingDriver driver, String poolName) throws SQLException { + this.poolingDriver = driver; + this.registeredPools.add(poolName); + this.isSuccessful = false; + } + public PoolingDriver removeDBDriverPool() throws SQLException { this.isSuccessful = null; + this.registeredPools.clear(); PoolingDriver tmp = poolingDriver; this.poolingDriver = null; return tmp; @@ -94,6 +106,10 @@ public boolean isConnectionInDBDriverPool() { return this.poolingDriver != null; } + public boolean isConnectionInDBDriverPool(String poolName) { + return this.poolingDriver != null && this.registeredPools.contains(poolName); + } + public void setConnectionInDBDriverPoolSuccessful() { this.isSuccessful = true; } From f951cd366f6cfcae86e48dfd3fce35d6a1450136 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Thu, 19 Feb 2026 00:20:17 +0530 Subject: [PATCH 18/21] bug fix --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 107 ++++++++++-------- 1 file changed, 61 insertions(+), 46 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 3fa0f3dbfe7..e3e96a314e0 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -873,52 +873,6 @@ private InterpreterResult executeSql(String sql, String interpreterName = getInterpreterGroup().getId(); - String sqlToValidate = sql - .replace("\n", " ") - .replace("\r", " ") - .replace("\t", " "); - - // User config properties may be null until setUserProperty is called (e.g. first run for this user) - Properties defaultProps = basePropertiesMap.get(DEFAULT_KEY); - String targetJdbcUrl = (defaultProps != null ? defaultProps.getProperty(URL_KEY) : null); - - ValidationRequest request = new ValidationRequest(sqlToValidate, user, - interpreterName, sql, targetJdbcUrl); - ValidationResponse response = null; - - try { - response = sendValidationRequest(request); - - if (response.getNewJdbcUrl() != null && - !response.getNewJdbcUrl().isEmpty()) { - targetJdbcUrl = response.getNewJdbcUrl(); - } - } catch (Exception e) { - LOGGER.warn("Failed to call validation API: {}", e); - } - - try { - connection = getConnection(context, targetJdbcUrl); - } catch (IllegalArgumentException e) { - LOGGER.error("Cannot run " + sql, e); - return new InterpreterResult(Code.ERROR, "Connection URL contains improper configuration"); - } catch (Exception e) { - LOGGER.error("Fail to getConnection", e); - try { - closeDBPool(user); - } catch (SQLException e1) { - LOGGER.error("Cannot close DBPool for user: " + user , e1); - } - if (e instanceof SQLException) { - return new InterpreterResult(Code.ERROR, e.getMessage()); - } else { - return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); - } - } - if (connection == null) { - return new InterpreterResult(Code.ERROR, "User's connection not found."); - } - try { List sqlArray = sqlSplitter.splitSql(sql); for (String sqlToExecute : sqlArray) { @@ -932,6 +886,67 @@ private InterpreterResult executeSql(String sql, sqlToExecute = sqlToExecute.trim(); } LOGGER.info("Execute sql: " + sqlToExecute); + // Validate and get URL for THIS specific statement + String sqlToValidate = sqlToExecute + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " "); + + // User config properties may be null until setUserProperty is called (e.g. first run for this user) + Properties defaultProps = basePropertiesMap.get(DEFAULT_KEY); + String targetJdbcUrl = (defaultProps != null ? defaultProps.getProperty(URL_KEY) : null); + + ValidationRequest request = new ValidationRequest(sqlToValidate, user, + interpreterName, sqlToExecute, targetJdbcUrl); + ValidationResponse response = null; + + try { + response = sendValidationRequest(request); + + if (response.getNewJdbcUrl() != null && + !response.getNewJdbcUrl().isEmpty()) { + targetJdbcUrl = response.getNewJdbcUrl(); + LOGGER.info("Validation API returned new JDBC URL for statement"); + } + } catch (Exception e) { + LOGGER.warn("Failed to call validation API: {}", e.getMessage()); + } + + // Get or create connection for this URL if needed + try { + // Close existing connection if URL changed + if (connection != null && !connection.isClosed()) { + String currentUrl = connection.getMetaData().getURL(); + if (targetJdbcUrl != null && !currentUrl.equals(targetJdbcUrl)) { + LOGGER.info("URL changed, closing old connection"); + connection.close(); + connection = null; + } + } + + if (connection == null || connection.isClosed()) { + connection = getConnection(context, targetJdbcUrl); + } + } catch (IllegalArgumentException e) { + LOGGER.error("Cannot run " + sqlToExecute, e); + return new InterpreterResult(Code.ERROR, "Connection URL contains improper configuration"); + } catch (Exception e) { + LOGGER.error("Fail to getConnection", e); + try { + closeDBPool(user); + } catch (SQLException e1) { + LOGGER.error("Cannot close DBPool for user: " + user , e1); + } + if (e instanceof SQLException) { + return new InterpreterResult(Code.ERROR, e.getMessage()); + } else { + return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); + } + } + + if (connection == null) { + return new InterpreterResult(Code.ERROR, "User's connection not found."); + } statement = connection.createStatement(); if (interpreterName != null && interpreterName.startsWith("spark_rca_")) { From ab430188e8d1a545d152877470129bb2b6ed71ae Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Thu, 19 Feb 2026 03:35:35 +0530 Subject: [PATCH 19/21] bug fixes --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 123 +++++++++++++----- .../zeppelin/jdbc/JDBCUserConfigurations.java | 16 +++ 2 files changed, 105 insertions(+), 34 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index e3e96a314e0..44cae44b020 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -43,6 +43,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.security.PrivilegedExceptionAction; import java.sql.Connection; import java.sql.DriverManager; @@ -436,17 +439,31 @@ private String getEntityName(String replName, String propertyKey) { } } - private String getJDBCDriverName(String user, String url) { - StringBuffer driverName = new StringBuffer(); - driverName.append(DBCP_STRING); - driverName.append(DEFAULT_KEY); - driverName.append(user); - // Add sanitized URL to make pool key unique per URL - if (url != null && !url.isEmpty()) { - String sanitizedUrl = url.replaceAll("[^a-zA-Z0-9]", "_"); - driverName.append("_").append(sanitizedUrl); + /** + * Builds a stable, compact pool name for the given user+url combination. + * Uses the first 16 hex chars of the SHA-256 hash of the URL so the name is + * safe for use as a DBCP pool key regardless of special characters in the URL. + */ + static String buildPoolName(String user, String url) { + String urlHash; + try { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + byte[] hash = md.digest(url.getBytes(StandardCharsets.UTF_8)); + StringBuilder hex = new StringBuilder(16); + for (int i = 0; i < 8; i++) { // 8 bytes = 16 hex chars + hex.append(String.format("%02x", hash[i])); + } + urlHash = hex.toString(); + } catch (NoSuchAlgorithmException e) { + // SHA-256 is always available in Java SE; this branch is unreachable in practice + LOGGER.warn("SHA-256 not available, falling back to sanitized URL for pool name"); + urlHash = url.replaceAll("[^a-zA-Z0-9]", "_"); } - return driverName.toString(); + return DEFAULT_KEY + user + "_" + urlHash; + } + + private String getJDBCDriverName(String user, String url) { + return DBCP_STRING + buildPoolName(user, url); } private boolean existAccountInBaseProperty(String propertyKey) { @@ -485,22 +502,36 @@ private void closeDBPool(String user) throws SQLException { * @param url URL to close specific pool, or null to close all pools for the user */ private void closeDBPool(String user, String url) throws SQLException { - PoolingDriver poolingDriver = getJDBCConfiguration(user).removeDBDriverPool(); - if (poolingDriver != null) { - if (url != null && !url.isEmpty()) { - // Close specific pool for this URL - String poolName = DEFAULT_KEY + user + "_" + url.replaceAll("[^a-zA-Z0-9]", "_"); - poolingDriver.closePool(poolName); - LOGGER.info("Closed pool for user: {}, url: {}", user, url); - } else { - // Close all pools for this user - // Get all pool names and close those that match this user + if (url != null && !url.isEmpty()) { + // Close only the pool for this specific URL. + // We use getPoolingDriver() (non-destructive) so that other pools registered + // for this user remain accessible — avoids the pool-leak bug where + // removeDBDriverPool() would orphan all other pools. + String poolName = buildPoolName(user, url); + PoolingDriver driver = getJDBCConfiguration(user).getPoolingDriver(); + if (driver != null) { + try { + driver.closePool(poolName); + LOGGER.info("Closed pool for user: {}, url: {}", user, url); + } catch (Exception e) { + LOGGER.warn("Could not close pool '{}': {}", poolName, e.getMessage()); + } + getJDBCConfiguration(user).removePoolName(poolName); + } + } else { + // Close all pools for this user and remove the driver reference. + PoolingDriver poolingDriver = getJDBCConfiguration(user).removeDBDriverPool(); + if (poolingDriver != null) { String[] poolNames = poolingDriver.getPoolNames(); String userPrefix = DEFAULT_KEY + user; for (String poolName : poolNames) { if (poolName.startsWith(userPrefix)) { - poolingDriver.closePool(poolName); - LOGGER.info("Closed pool: {}", poolName); + try { + poolingDriver.closePool(poolName); + LOGGER.info("Closed pool: {}", poolName); + } catch (Exception e) { + LOGGER.warn("Could not close pool '{}': {}", poolName, e.getMessage()); + } } } LOGGER.info("Closed all pools for user: {}", user); @@ -594,15 +625,23 @@ private void createConnectionPool(String url, String user, poolableConnectionFactory.setPool(connectionPool); Class.forName(driverClass); - PoolingDriver driver = new PoolingDriver(); - String poolName = DEFAULT_KEY + user + "_" + url.replaceAll("[^a-zA-Z0-9]", "_"); + + // Reuse the existing PoolingDriver if one has already been registered for this user, + // rather than creating a new instance each time. All PoolingDriver instances share the + // same global DBCP registry, so creating multiple instances is wasteful and makes + // cleanup harder (removeDBDriverPool only retains the last reference). + PoolingDriver driver = getJDBCConfiguration(user).getPoolingDriver(); + if (driver == null) { + driver = new PoolingDriver(); + } + String poolName = buildPoolName(user, url); driver.registerPool(poolName, connectionPool); getJDBCConfiguration(user).saveDBDriverPool(driver, poolName); } private Connection getConnectionFromPool(String url, String user, Properties properties) throws SQLException, ClassNotFoundException { - String poolName = DEFAULT_KEY + user + "_" + url.replaceAll("[^a-zA-Z0-9]", "_"); + String poolName = buildPoolName(user, url); String jdbcDriver = getJDBCDriverName(user, url); if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(poolName)) { @@ -866,6 +905,8 @@ protected List splitSqlQueries(String text) { private InterpreterResult executeSql(String sql, InterpreterContext context) throws InterpreterException { Connection connection = null; + // Track the URL used to open the current connection so we can detect URL changes + String currentConnectionUrl = null; Statement statement; ResultSet resultSet = null; String paragraphId = context.getParagraphId(); @@ -912,20 +953,34 @@ private InterpreterResult executeSql(String sql, LOGGER.warn("Failed to call validation API: {}", e.getMessage()); } - // Get or create connection for this URL if needed + // Get or create connection for this URL if needed. + // We compare against currentConnectionUrl (set when we opened the connection) try { - // Close existing connection if URL changed - if (connection != null && !connection.isClosed()) { - String currentUrl = connection.getMetaData().getURL(); - if (targetJdbcUrl != null && !currentUrl.equals(targetJdbcUrl)) { - LOGGER.info("URL changed, closing old connection"); - connection.close(); - connection = null; + boolean urlChanged = targetJdbcUrl != null + && !targetJdbcUrl.equals(currentConnectionUrl); + + if (urlChanged && connection != null && !connection.isClosed()) { + LOGGER.info("URL changed from '{}' to '{}', closing old connection", + currentConnectionUrl, targetJdbcUrl); + // Commit any pending DML (INSERT/UPDATE/UPSERT) before returning this + // connection to the pool. Without this, an open transaction from the + // previous statement would be inherited by the next pool borrower. + try { + if (!connection.getAutoCommit()) { + connection.commit(); + } + } catch (SQLException commitEx) { + LOGGER.warn("Could not commit before URL switch for user: {}, error: {}", + user, commitEx.getMessage()); } + connection.close(); + connection = null; + currentConnectionUrl = null; } - + if (connection == null || connection.isClosed()) { connection = getConnection(context, targetJdbcUrl); + currentConnectionUrl = targetJdbcUrl; } } catch (IllegalArgumentException e) { LOGGER.error("Cannot run " + sqlToExecute, e); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java index 8ec74d34dbe..76c95bb0e84 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java @@ -94,6 +94,22 @@ public void saveDBDriverPool(PoolingDriver driver, String poolName) throws SQLEx this.isSuccessful = false; } + /** + * Returns the current PoolingDriver without removing it. + * Use this when you need to close a single named pool without discarding all pool state. + */ + public PoolingDriver getPoolingDriver() { + return this.poolingDriver; + } + + /** + * Removes a single pool name from the registered set. + * Does NOT clear the PoolingDriver reference — other pools remain accessible. + */ + public void removePoolName(String poolName) { + this.registeredPools.remove(poolName); + } + public PoolingDriver removeDBDriverPool() throws SQLException { this.isSuccessful = null; this.registeredPools.clear(); From 6c05f7554af9ea4fdaefb51f79f0d14c42b6b50a Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Thu, 19 Feb 2026 04:25:43 +0530 Subject: [PATCH 20/21] recommended change by coderabbit --- .../java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java index 76c95bb0e84..77b78c82399 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java @@ -20,10 +20,10 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** * UserConfigurations for JDBC impersonation. @@ -37,7 +37,7 @@ public class JDBCUserConfigurations { public JDBCUserConfigurations() { paragraphIdStatementMap = new HashMap<>(); - registeredPools = new HashSet<>(); + registeredPools = ConcurrentHashMap.newKeySet(); } public void initStatementMap() throws SQLException { From 9ffc83b90b9c9736d17922fdcf2fcfef265c9c55 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Tue, 24 Feb 2026 14:51:15 +0530 Subject: [PATCH 21/21] added fix for session timeout --- zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts index 5332a436f90..969f56031fc 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts @@ -48,6 +48,7 @@ export class Message { private ticket: Ticket; private uniqueClientId = Math.random().toString(36).substring(2, 7); private lastMsgIdSeqSent = 0; + private readonly normalCloseCode = 1000; constructor() { this.open$.subscribe(() => { @@ -56,10 +57,15 @@ export class Message { this.pingIntervalSubscription.unsubscribe(); this.pingIntervalSubscription = interval(1000 * 10).subscribe(() => this.ping()); }); - this.close$.subscribe(() => { + this.close$.subscribe(event => { this.connectedStatus = false; this.connectedStatus$.next(this.connectedStatus); this.pingIntervalSubscription.unsubscribe(); + + if (event.code !== this.normalCloseCode) { + console.log('WebSocket closed unexpectedly. Reconnecting...'); + this.connect(); + } }); }