-
Notifications
You must be signed in to change notification settings - Fork 2
Rca cluster redirect #31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
192bcfc
0b1e507
7552bcb
ec30ba9
0667609
11a248f
b4c7494
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -804,6 +804,56 @@ private InterpreterResult executeSql(String dbPrefix, String sql, | |||||||||||||||||||||||
| ResultSet resultSet = null; | ||||||||||||||||||||||||
| String paragraphId = context.getParagraphId(); | ||||||||||||||||||||||||
| String user = getUser(context); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| String interpreterName = getInterpreterGroup().getId(); | ||||||||||||||||||||||||
| String userName = getUser(context); | ||||||||||||||||||||||||
| String sqlToValidate = sql | ||||||||||||||||||||||||
| .replace("\n", " ") | ||||||||||||||||||||||||
| .replace("\r", " ") | ||||||||||||||||||||||||
| .replace("\t", " "); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| ValidationRequest request = new ValidationRequest(sqlToValidate, userName, | ||||||||||||||||||||||||
| interpreterName, sql); | ||||||||||||||||||||||||
| ValidationResponse response = null; | ||||||||||||||||||||||||
| // String effectiveDbPrefix = dbPrefix; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||
| response = sendValidationRequest(request); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| context.out.write("base properties map: " + basePropertiesMap.toString() + "\n"); | ||||||||||||||||||||||||
| context.out.write("dbPrefix: " + dbPrefix + "\n"); | ||||||||||||||||||||||||
| context.out.flush(); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if (response.getTargetCluster() != null && | ||||||||||||||||||||||||
| !response.getTargetCluster().isEmpty()) { | ||||||||||||||||||||||||
| String targetClusterPrefix = response.getTargetCluster(); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if (!targetClusterPrefix.equals(dbPrefix)) { | ||||||||||||||||||||||||
| // Verify the target cluster exists in configuration | ||||||||||||||||||||||||
| if (basePropertiesMap.containsKey(targetClusterPrefix)) { | ||||||||||||||||||||||||
| LOGGER.info("Long-range query detected. Routing from '{}' to '{}' cluster", | ||||||||||||||||||||||||
| dbPrefix, targetClusterPrefix); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||
| context.out.write("%text Long-range query detected. " + | ||||||||||||||||||||||||
| "Automatically routing to optimized cluster: " + | ||||||||||||||||||||||||
| targetClusterPrefix + "\n\n"); | ||||||||||||||||||||||||
| context.out.flush(); | ||||||||||||||||||||||||
| } catch (IOException e) { | ||||||||||||||||||||||||
| LOGGER.warn("Failed to write redirection notification", e); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| dbPrefix = targetClusterPrefix; | ||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||
| LOGGER.warn("Target cluster '{}' not found in configuration. " + | ||||||||||||||||||||||||
| "Using requested cluster '{}'", | ||||||||||||||||||||||||
| targetClusterPrefix, dbPrefix); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||||||||
| LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
Comment on lines
+817
to
+856
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: If 🐛 Proposed fix — initialize a safe default response on failure } catch (Exception e) {
LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e);
+ response = new ValidationResponse(); // safe defaults: preSubmitFail=false, newQueryText=null
}🤖 Prompt for AI Agents |
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||
| connection = getConnection(dbPrefix, context); | ||||||||||||||||||||||||
|
|
@@ -839,8 +889,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql, | |||||||||||||||||||||||
| LOGGER.info("Execute sql: " + sqlToExecute); | ||||||||||||||||||||||||
| statement = connection.createStatement(); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| String interpreterName = getInterpreterGroup().getId(); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if (interpreterName != null && interpreterName.startsWith("spark_rca_")) { | ||||||||||||||||||||||||
| statement.setQueryTimeout(10800); // 10800 seconds = 3 hours | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
@@ -870,14 +918,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, | |||||||||||||||||||||||
| Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| String userName = getUser(context); | ||||||||||||||||||||||||
| String sqlToValidate = sqlToExecute | ||||||||||||||||||||||||
| .replace("\n", " ") | ||||||||||||||||||||||||
| .replace("\r", " ") | ||||||||||||||||||||||||
| .replace("\t", " "); | ||||||||||||||||||||||||
| ValidationRequest request = new ValidationRequest(sqlToValidate, userName, interpreterName); | ||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||
| ValidationResponse response = sendValidationRequest(request); | ||||||||||||||||||||||||
| if (response.isPreSubmitFail()) { | ||||||||||||||||||||||||
| if(response.getVersion() == "v1") { | ||||||||||||||||||||||||
| String outputMessage = response.getMessage(); | ||||||||||||||||||||||||
|
|
@@ -917,7 +958,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, | |||||||||||||||||||||||
| finalOutput.append("Use: ").append(jsonObject.getString(table)).append(" in place of ").append(table).append("\n"); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| }else if (outputMessage.contains("UnAuthorized Query")) { | ||||||||||||||||||||||||
| } else if (outputMessage.contains("UnAuthorized Query")) { | ||||||||||||||||||||||||
| context.out.write("Query Error: UnAuthorized Query\n"); | ||||||||||||||||||||||||
| finalOutput.append("You are not authorized to execute this query.\n"); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
@@ -939,6 +980,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, | |||||||||||||||||||||||
| context.out.write("%text " + message + "\n\n"); | ||||||||||||||||||||||||
| context.out.flush(); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; | ||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Consider applying the replacement before the split, or only when there's a single statement. 🔧 Proposed fix — apply replacement before splitting+ // Apply query text replacement from validation before splitting
+ if (response != null && response.getNewQueryText() != null) {
+ sql = response.getNewQueryText();
+ }
+
try {
List<String> sqlArray = sqlSplitter.splitSql(sql);
for (String sqlToExecute : sqlArray) {And remove the in-loop replacement at line 983: - sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||||||||
| String error = "Error occurred while sending request " + e.getMessage(); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,18 +1,31 @@ | ||
| package org.apache.zeppelin.jdbc; | ||
|
|
||
| import com.google.gson.Gson; | ||
| import com.google.gson.annotations.SerializedName; | ||
|
|
||
| public class ValidationRequest { | ||
| @SerializedName("query_text") | ||
| private String queryText; | ||
|
|
||
| @SerializedName("user") | ||
| private String user; | ||
|
|
||
| @SerializedName("interpreter_name") | ||
| private String interpreterName; | ||
|
|
||
| @SerializedName("raw_query_text") | ||
| private String rawQueryText; | ||
|
|
||
| public ValidationRequest(String queryText, String user, String interpreterName) { | ||
| public ValidationRequest(String queryText, String user, String interpreterName, String rawQueryText) { | ||
| this.queryText = queryText; | ||
| this.user = user; | ||
| this.interpreterName = interpreterName; | ||
| this.rawQueryText = rawQueryText; | ||
| } | ||
|
|
||
| public String toJson() { | ||
| return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\",\"interpreter_name\":\"" + interpreterName + "\"}"; | ||
| Gson gson = new Gson(); | ||
| return gson.toJson(this); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sensitive internal configuration is being written to user-facing output.
basePropertiesMap.toString()likely includes JDBC URLs, usernames, and potentially passwords. Writing this tocontext.outexposes it directly to end users. This appears to be debug code that should not be in production.🔒 Proposed fix — remove debug output
📝 Committable suggestion
🤖 Prompt for AI Agents