From 192bcfc606aeb535e7dfcbaf1fe42ba96208d0b3 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 4 Feb 2026 19:41:43 +0530 Subject: [PATCH 1/7] handled update query part --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 6 ++- .../zeppelin/jdbc/ValidationRequest.java | 6 ++- .../zeppelin/jdbc/ValidationResponse.java | 45 +++++++++++++++++++ 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 2acd4846b72..22d1105e7f5 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -875,7 +875,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, .replace("\n", " ") .replace("\r", " ") .replace("\t", " "); - ValidationRequest request = new ValidationRequest(sqlToValidate, userName, interpreterName); + ValidationRequest request = new ValidationRequest(sqlToValidate, userName, interpreterName, sqlToExecute); try { ValidationResponse response = sendValidationRequest(request); if (response.isPreSubmitFail()) { @@ -917,7 +917,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, finalOutput.append("Use: ").append(jsonObject.getString(table)).append(" in place of ").append(table).append("\n"); } } - }else if (outputMessage.contains("UnAuthorized Query")) { + } else if (outputMessage.contains("UnAuthorized Query")) { context.out.write("Query Error: UnAuthorized Query\n"); finalOutput.append("You are not authorized to execute this query.\n"); } @@ -939,6 +939,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, context.out.write("%text " + message + "\n\n"); context.out.flush(); } + } if (response.isQueryUpdated()) { + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : response.getQueryText(); } } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 23cf6f67a60..a9a2aecb532 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -4,15 +4,17 @@ public class ValidationRequest { private String queryText; private String user; private String interpreterName; + private String rawQueryText; - public ValidationRequest(String queryText, String user, String interpreterName) { + public ValidationRequest(String queryText, String user, String interpreterName, String rawQueryText) { this.queryText = queryText; this.user = user; this.interpreterName = interpreterName; + this.rawQueryText = rawQueryText; } public String toJson() { - return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\",\"interpreter_name\":\"" + interpreterName + "\"}"; + return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\",\"interpreter_name\":\"" + interpreterName + "\",\"raw_query_text\":\"" + rawQueryText + "\"}"; } } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 759e6522883..a46111e7163 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -11,6 +11,9 @@ public class ValidationResponse { private String errorHeader; private String message; private String version; + private boolean isQueryUpdated; + private String queryText; + private String newQueryText; // Getters and Setters public boolean isPreSubmitFail() { @@ -61,6 +64,30 @@ public void setMessage(String message) { this.message = message; } + public boolean isQueryUpdated() { + return isQueryUpdated; + } + + public void setQueryUpdated(boolean isQueryUpdated) { + this.isQueryUpdated = isQueryUpdated; + } + + public String getQueryText() { + return queryText; + } + + public void setQueryText(String queryText) { + this.queryText = queryText; + } + + public String getNewQueryText() { + return newQueryText; + } + + public void setNewQueryText(String newQueryText) { + this.newQueryText = newQueryText; + } + public static ValidationResponse fromJson(String jsonResponse) { Gson gson = new Gson(); ValidationResponse response = new ValidationResponse(); @@ -94,6 +121,21 @@ public static ValidationResponse fromJson(String jsonResponse) { } else { response.setVersion("v1"); } + if (jsonObject.has("is_query_updated") && !jsonObject.get("is_query_updated").isJsonNull()) { + response.setQueryUpdated(jsonObject.get("is_query_updated").getAsBoolean()); + } else { + response.setQueryUpdated(false); + } + if (jsonObject.has("query_text") && !jsonObject.get("query_text").isJsonNull()) { + response.setQueryText(jsonObject.get("query_text").getAsString()); + } else { + response.setQueryText(""); + } + if (jsonObject.has("new_query_text") && !jsonObject.get("new_query_text").isJsonNull()) { + response.setNewQueryText(jsonObject.get("new_query_text").getAsString()); + } else { + response.setNewQueryText(null); + } } else { response.setPreSubmitFail(false); response.setFailFast(false); @@ -101,6 +143,9 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setErrorHeader(""); // Default error header response.setMessage(""); // Default message response.setVersion("v1"); // Default version + response.setQueryUpdated(false); + response.setQueryText(""); + response.setNewQueryText(null); } return response; } From 0b1e5079a9f90429c15441123b08bb3b8081a105 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Thu, 5 Feb 2026 17:29:22 +0530 Subject: [PATCH 2/7] removed unused code --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 3 +-- .../apache/zeppelin/jdbc/ValidationResponse.java | 15 --------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 22d1105e7f5..39c8304f36b 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -939,9 +939,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, context.out.write("%text " + message + "\n\n"); context.out.flush(); } - } if (response.isQueryUpdated()) { - sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : response.getQueryText(); } + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : response.getQueryText(); } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); String mess = e.getLocalizedMessage(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index a46111e7163..9e012c13e4a 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -11,7 +11,6 @@ public class ValidationResponse { private String errorHeader; private String message; private String version; - private boolean isQueryUpdated; private String queryText; private String newQueryText; @@ -63,14 +62,6 @@ public String getMessage() { public void setMessage(String message) { this.message = message; } - - public boolean isQueryUpdated() { - return isQueryUpdated; - } - - public void setQueryUpdated(boolean isQueryUpdated) { - this.isQueryUpdated = isQueryUpdated; - } public String getQueryText() { return queryText; @@ -121,11 +112,6 @@ public static ValidationResponse fromJson(String jsonResponse) { } else { response.setVersion("v1"); } - if (jsonObject.has("is_query_updated") && !jsonObject.get("is_query_updated").isJsonNull()) { - response.setQueryUpdated(jsonObject.get("is_query_updated").getAsBoolean()); - } else { - response.setQueryUpdated(false); - } if (jsonObject.has("query_text") && !jsonObject.get("query_text").isJsonNull()) { response.setQueryText(jsonObject.get("query_text").getAsString()); } else { @@ -143,7 +129,6 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setErrorHeader(""); // Default error header response.setMessage(""); // Default message response.setVersion("v1"); // Default version - response.setQueryUpdated(false); response.setQueryText(""); response.setNewQueryText(null); } From 7552bcb5a93a1e3f3434c89b9cb19628cc529089 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Thu, 5 Feb 2026 19:27:35 +0530 Subject: [PATCH 3/7] fixed issue --- .../src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 39c8304f36b..a1145f3b9c2 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -940,7 +940,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, context.out.flush(); } } - sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : response.getQueryText(); + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); String mess = e.getLocalizedMessage(); From ec30ba900923a7db04925319eb150ce5c206d9d6 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Fri, 6 Feb 2026 11:11:23 +0530 Subject: [PATCH 4/7] bug fix --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index a1145f3b9c2..3962cb31e42 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -929,6 +929,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, String detailedMessage = response.getMessage(); context.getLocalProperties().put(CANCEL_REASON, detailedMessage); } + + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; cancel(context); return new InterpreterResult(Code.ERROR); @@ -939,8 +941,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, context.out.write("%text " + message + "\n\n"); context.out.flush(); } + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; } - sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); String mess = e.getLocalizedMessage(); From 06676096ee7e017292e407b63d6d2fb0b88a3297 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Sun, 8 Feb 2026 19:45:03 +0530 Subject: [PATCH 5/7] updated variable name --- .../zeppelin/jdbc/ValidationResponse.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 9e012c13e4a..428cbeaed22 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -11,7 +11,7 @@ public class ValidationResponse { private String errorHeader; private String message; private String version; - private String queryText; + private String rawQueryText; private String newQueryText; // Getters and Setters @@ -63,12 +63,12 @@ public void setMessage(String message) { this.message = message; } - public String getQueryText() { - return queryText; + public String getRawQueryText() { + return rawQueryText; } - public void setQueryText(String queryText) { - this.queryText = queryText; + public void setRawQueryText(String rawQueryText) { + this.rawQueryText = rawQueryText; } public String getNewQueryText() { @@ -112,10 +112,10 @@ public static ValidationResponse fromJson(String jsonResponse) { } else { response.setVersion("v1"); } - if (jsonObject.has("query_text") && !jsonObject.get("query_text").isJsonNull()) { - response.setQueryText(jsonObject.get("query_text").getAsString()); + if (jsonObject.has("raw_query_text") && !jsonObject.get("raw_query_text").isJsonNull()) { + response.setRawQueryText(jsonObject.get("raw_query_text").getAsString()); } else { - response.setQueryText(""); + response.setRawQueryText(""); } if (jsonObject.has("new_query_text") && !jsonObject.get("new_query_text").isJsonNull()) { response.setNewQueryText(jsonObject.get("new_query_text").getAsString()); @@ -129,7 +129,7 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setErrorHeader(""); // Default error header response.setMessage(""); // Default message response.setVersion("v1"); // Default version - response.setQueryText(""); + response.setRawQueryText(""); response.setNewQueryText(null); } return response; From 11a248f41aa42b567daa997287777233d8c881d7 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Sun, 8 Feb 2026 20:44:33 +0530 Subject: [PATCH 6/7] bug fix --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 2 -- .../org/apache/zeppelin/jdbc/ValidationRequest.java | 13 ++++++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 3962cb31e42..92a0a4ebdec 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -929,8 +929,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql, String detailedMessage = response.getMessage(); context.getLocalProperties().put(CANCEL_REASON, detailedMessage); } - - sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; cancel(context); return new InterpreterResult(Code.ERROR); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index a9a2aecb532..3410ba4f9cb 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -1,9 +1,19 @@ package org.apache.zeppelin.jdbc; +import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; + public class ValidationRequest { + @SerializedName("query_text") private String queryText; + + @SerializedName("user") private String user; + + @SerializedName("interpreter_name") private String interpreterName; + + @SerializedName("raw_query_text") private String rawQueryText; public ValidationRequest(String queryText, String user, String interpreterName, String rawQueryText) { @@ -14,7 +24,8 @@ public ValidationRequest(String queryText, String user, String interpreterName, } public String toJson() { - return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\",\"interpreter_name\":\"" + interpreterName + "\",\"raw_query_text\":\"" + rawQueryText + "\"}"; + Gson gson = new Gson(); + return gson.toJson(this); } } From b4c749463a2281e401a8a4c9fc1910ced8a48ca9 Mon Sep 17 00:00:00 2001 From: rajesh-meesho Date: Wed, 11 Feb 2026 22:35:50 +0530 Subject: [PATCH 7/7] rca-cluster redirection --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 59 ++++++++++++++++--- .../zeppelin/jdbc/ValidationResponse.java | 15 +++++ 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 92a0a4ebdec..c4420741d13 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -804,6 +804,56 @@ private InterpreterResult executeSql(String dbPrefix, String sql, ResultSet resultSet = null; String paragraphId = context.getParagraphId(); String user = getUser(context); + + String interpreterName = getInterpreterGroup().getId(); + String userName = getUser(context); + String sqlToValidate = sql + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " "); + + ValidationRequest request = new ValidationRequest(sqlToValidate, userName, + interpreterName, sql); + ValidationResponse response = null; + // String effectiveDbPrefix = dbPrefix; + + try { + response = sendValidationRequest(request); + + context.out.write("base properties map: " + basePropertiesMap.toString() + "\n"); + context.out.write("dbPrefix: " + dbPrefix + "\n"); + context.out.flush(); + + if (response.getTargetCluster() != null && + !response.getTargetCluster().isEmpty()) { + String targetClusterPrefix = response.getTargetCluster(); + + if (!targetClusterPrefix.equals(dbPrefix)) { + // Verify the target cluster exists in configuration + if (basePropertiesMap.containsKey(targetClusterPrefix)) { + LOGGER.info("Long-range query detected. Routing from '{}' to '{}' cluster", + dbPrefix, targetClusterPrefix); + + try { + context.out.write("%text Long-range query detected. " + + "Automatically routing to optimized cluster: " + + targetClusterPrefix + "\n\n"); + context.out.flush(); + } catch (IOException e) { + LOGGER.warn("Failed to write redirection notification", e); + } + + dbPrefix = targetClusterPrefix; + } else { + LOGGER.warn("Target cluster '{}' not found in configuration. " + + "Using requested cluster '{}'", + targetClusterPrefix, dbPrefix); + } + } + } + } catch (Exception e) { + LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e); + } try { connection = getConnection(dbPrefix, context); @@ -839,8 +889,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql, LOGGER.info("Execute sql: " + sqlToExecute); statement = connection.createStatement(); - String interpreterName = getInterpreterGroup().getId(); - if (interpreterName != null && interpreterName.startsWith("spark_rca_")) { statement.setQueryTimeout(10800); // 10800 seconds = 3 hours } @@ -870,14 +918,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); } - String userName = getUser(context); - String sqlToValidate = sqlToExecute - .replace("\n", " ") - .replace("\r", " ") - .replace("\t", " "); - ValidationRequest request = new ValidationRequest(sqlToValidate, userName, interpreterName, sqlToExecute); try { - ValidationResponse response = sendValidationRequest(request); if (response.isPreSubmitFail()) { if(response.getVersion() == "v1") { String outputMessage = response.getMessage(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 428cbeaed22..39d9fd50a5a 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -13,6 +13,7 @@ public class ValidationResponse { private String version; private String rawQueryText; private String newQueryText; + private String targetCluster; // Getters and Setters public boolean isPreSubmitFail() { @@ -79,6 +80,14 @@ public void setNewQueryText(String newQueryText) { this.newQueryText = newQueryText; } + public String getTargetCluster() { + return targetCluster; + } + + public void setTargetCluster(String targetCluster) { + this.targetCluster = targetCluster; + } + public static ValidationResponse fromJson(String jsonResponse) { Gson gson = new Gson(); ValidationResponse response = new ValidationResponse(); @@ -122,6 +131,11 @@ public static ValidationResponse fromJson(String jsonResponse) { } else { response.setNewQueryText(null); } + if (jsonObject.has("target_cluster") && !jsonObject.get("target_cluster").isJsonNull()) { + response.setTargetCluster(jsonObject.get("target_cluster").getAsString()); + } else { + response.setTargetCluster(null); + } } else { response.setPreSubmitFail(false); response.setFailFast(false); @@ -131,6 +145,7 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setVersion("v1"); // Default version response.setRawQueryText(""); response.setNewQueryText(null); + response.setTargetCluster(null); } return response; }