diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 2acd4846b72..c4420741d13 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -804,6 +804,56 @@ private InterpreterResult executeSql(String dbPrefix, String sql, ResultSet resultSet = null; String paragraphId = context.getParagraphId(); String user = getUser(context); + + String interpreterName = getInterpreterGroup().getId(); + String userName = getUser(context); + String sqlToValidate = sql + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " "); + + ValidationRequest request = new ValidationRequest(sqlToValidate, userName, + interpreterName, sql); + ValidationResponse response = null; + // String effectiveDbPrefix = dbPrefix; + + try { + response = sendValidationRequest(request); + + context.out.write("base properties map: " + basePropertiesMap.toString() + "\n"); + context.out.write("dbPrefix: " + dbPrefix + "\n"); + context.out.flush(); + + if (response.getTargetCluster() != null && + !response.getTargetCluster().isEmpty()) { + String targetClusterPrefix = response.getTargetCluster(); + + if (!targetClusterPrefix.equals(dbPrefix)) { + // Verify the target cluster exists in configuration + if (basePropertiesMap.containsKey(targetClusterPrefix)) { + LOGGER.info("Long-range query detected. Routing from '{}' to '{}' cluster", + dbPrefix, targetClusterPrefix); + + try { + context.out.write("%text Long-range query detected. " + + "Automatically routing to optimized cluster: " + + targetClusterPrefix + "\n\n"); + context.out.flush(); + } catch (IOException e) { + LOGGER.warn("Failed to write redirection notification", e); + } + + dbPrefix = targetClusterPrefix; + } else { + LOGGER.warn("Target cluster '{}' not found in configuration. " + + "Using requested cluster '{}'", + targetClusterPrefix, dbPrefix); + } + } + } + } catch (Exception e) { + LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e); + } try { connection = getConnection(dbPrefix, context); @@ -839,8 +889,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql, LOGGER.info("Execute sql: " + sqlToExecute); statement = connection.createStatement(); - String interpreterName = getInterpreterGroup().getId(); - if (interpreterName != null && interpreterName.startsWith("spark_rca_")) { statement.setQueryTimeout(10800); // 10800 seconds = 3 hours } @@ -870,14 +918,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); } - String userName = getUser(context); - String sqlToValidate = sqlToExecute - .replace("\n", " ") - .replace("\r", " ") - .replace("\t", " "); - ValidationRequest request = new ValidationRequest(sqlToValidate, userName, interpreterName); try { - ValidationResponse response = sendValidationRequest(request); if (response.isPreSubmitFail()) { if(response.getVersion() == "v1") { String outputMessage = response.getMessage(); @@ -917,7 +958,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, finalOutput.append("Use: ").append(jsonObject.getString(table)).append(" in place of ").append(table).append("\n"); } } - }else if (outputMessage.contains("UnAuthorized Query")) { + } else if (outputMessage.contains("UnAuthorized Query")) { context.out.write("Query Error: UnAuthorized Query\n"); finalOutput.append("You are not authorized to execute this query.\n"); } @@ -939,6 +980,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, context.out.write("%text " + message + "\n\n"); context.out.flush(); } + sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; } } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 23cf6f67a60..3410ba4f9cb 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -1,18 +1,31 @@ package org.apache.zeppelin.jdbc; +import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; + public class ValidationRequest { + @SerializedName("query_text") private String queryText; + + @SerializedName("user") private String user; + + @SerializedName("interpreter_name") private String interpreterName; + + @SerializedName("raw_query_text") + private String rawQueryText; - public ValidationRequest(String queryText, String user, String interpreterName) { + public ValidationRequest(String queryText, String user, String interpreterName, String rawQueryText) { this.queryText = queryText; this.user = user; this.interpreterName = interpreterName; + this.rawQueryText = rawQueryText; } public String toJson() { - return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\",\"interpreter_name\":\"" + interpreterName + "\"}"; + Gson gson = new Gson(); + return gson.toJson(this); } } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 759e6522883..39d9fd50a5a 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -11,6 +11,9 @@ public class ValidationResponse { private String errorHeader; private String message; private String version; + private String rawQueryText; + private String newQueryText; + private String targetCluster; // Getters and Setters public boolean isPreSubmitFail() { @@ -60,6 +63,30 @@ public String getMessage() { public void setMessage(String message) { this.message = message; } + + public String getRawQueryText() { + return rawQueryText; + } + + public void setRawQueryText(String rawQueryText) { + this.rawQueryText = rawQueryText; + } + + public String getNewQueryText() { + return newQueryText; + } + + public void setNewQueryText(String newQueryText) { + this.newQueryText = newQueryText; + } + + public String getTargetCluster() { + return targetCluster; + } + + public void setTargetCluster(String targetCluster) { + this.targetCluster = targetCluster; + } public static ValidationResponse fromJson(String jsonResponse) { Gson gson = new Gson(); @@ -94,6 +121,21 @@ public static ValidationResponse fromJson(String jsonResponse) { } else { response.setVersion("v1"); } + if (jsonObject.has("raw_query_text") && !jsonObject.get("raw_query_text").isJsonNull()) { + response.setRawQueryText(jsonObject.get("raw_query_text").getAsString()); + } else { + response.setRawQueryText(""); + } + if (jsonObject.has("new_query_text") && !jsonObject.get("new_query_text").isJsonNull()) { + response.setNewQueryText(jsonObject.get("new_query_text").getAsString()); + } else { + response.setNewQueryText(null); + } + if (jsonObject.has("target_cluster") && !jsonObject.get("target_cluster").isJsonNull()) { + response.setTargetCluster(jsonObject.get("target_cluster").getAsString()); + } else { + response.setTargetCluster(null); + } } else { response.setPreSubmitFail(false); response.setFailFast(false); @@ -101,6 +143,9 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setErrorHeader(""); // Default error header response.setMessage(""); // Default message response.setVersion("v1"); // Default version + response.setRawQueryText(""); + response.setNewQueryText(null); + response.setTargetCluster(null); } return response; }