Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 52 additions & 10 deletions jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,56 @@ private InterpreterResult executeSql(String dbPrefix, String sql,
ResultSet resultSet = null;
String paragraphId = context.getParagraphId();
String user = getUser(context);

String interpreterName = getInterpreterGroup().getId();
String userName = getUser(context);
String sqlToValidate = sql
.replace("\n", " ")
.replace("\r", " ")
.replace("\t", " ");

ValidationRequest request = new ValidationRequest(sqlToValidate, userName,
interpreterName, sql);
ValidationResponse response = null;
// String effectiveDbPrefix = dbPrefix;

try {
response = sendValidationRequest(request);

context.out.write("base properties map: " + basePropertiesMap.toString() + "\n");
context.out.write("dbPrefix: " + dbPrefix + "\n");
context.out.flush();
Comment on lines +820 to +825
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Sensitive internal configuration is being written to user-facing output.

basePropertiesMap.toString() likely includes JDBC URLs, usernames, and potentially passwords. Writing this to context.out exposes it directly to end users. This appears to be debug code that should not be in production.

🔒 Proposed fix — remove debug output
-      context.out.write("base properties map: " + basePropertiesMap.toString() + "\n");
-      context.out.write("dbPrefix: " + dbPrefix + "\n");
-      context.out.flush();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try {
response = sendValidationRequest(request);
context.out.write("base properties map: " + basePropertiesMap.toString() + "\n");
context.out.write("dbPrefix: " + dbPrefix + "\n");
context.out.flush();
try {
response = sendValidationRequest(request);
🤖 Prompt for AI Agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java` around
lines 820 - 825, Remove the debug lines that write internal config to user
output: delete the context.out.write(...) and context.out.flush() calls that
print basePropertiesMap and dbPrefix after sendValidationRequest; if you need
visibility for debugging, log only non-sensitive, sanitized values via a secure
logger (not context.out) or mask credentials when referencing basePropertiesMap
in any logs, and ensure sendValidationRequest remains unchanged.


if (response.getTargetCluster() != null &&
!response.getTargetCluster().isEmpty()) {
String targetClusterPrefix = response.getTargetCluster();

if (!targetClusterPrefix.equals(dbPrefix)) {
// Verify the target cluster exists in configuration
if (basePropertiesMap.containsKey(targetClusterPrefix)) {
LOGGER.info("Long-range query detected. Routing from '{}' to '{}' cluster",
dbPrefix, targetClusterPrefix);

try {
context.out.write("%text Long-range query detected. " +
"Automatically routing to optimized cluster: " +
targetClusterPrefix + "\n\n");
context.out.flush();
} catch (IOException e) {
LOGGER.warn("Failed to write redirection notification", e);
}

dbPrefix = targetClusterPrefix;
} else {
LOGGER.warn("Target cluster '{}' not found in configuration. " +
"Using requested cluster '{}'",
targetClusterPrefix, dbPrefix);
}
}
}
} catch (Exception e) {
LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e);
}
Comment on lines +817 to +856
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: response is null when validation API call fails, causing NPE at line 922.

If sendValidationRequest throws (line 854 catch block), response remains null (initialized on line 817). Later at line 922, response.isPreSubmitFail() dereferences it, throwing a NullPointerException. While the surrounding catch on line 985 prevents a crash, the user sees a misleading error message and the intended graceful fallback is broken.

🐛 Proposed fix — initialize a safe default response on failure
     } catch (Exception e) {
       LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e);
+      response = new ValidationResponse(); // safe defaults: preSubmitFail=false, newQueryText=null
     }
🤖 Prompt for AI Agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java` around
lines 817 - 856, The ValidationResponse variable `response` can remain null if
`sendValidationRequest(request)` throws, leading to an NPE later when code calls
methods like `response.isPreSubmitFail()`; modify the catch block around
`sendValidationRequest` to initialize `response` to a safe default (e.g., a new
ValidationResponse populated with conservative values such as
preSubmitFail=false and empty/neutral fields) so subsequent uses (in code
referencing `response`, `response.getTargetCluster()`,
`response.isPreSubmitFail()`, etc.) never dereference null; alternatively add a
defensive null-check before any later `response` method calls, but the preferred
fix is to assign a well-defined default ValidationResponse inside the existing
catch for `sendValidationRequest`.


try {
connection = getConnection(dbPrefix, context);
Expand Down Expand Up @@ -839,8 +889,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql,
LOGGER.info("Execute sql: " + sqlToExecute);
statement = connection.createStatement();

String interpreterName = getInterpreterGroup().getId();

if (interpreterName != null && interpreterName.startsWith("spark_rca_")) {
statement.setQueryTimeout(10800); // 10800 seconds = 3 hours
}
Expand Down Expand Up @@ -870,14 +918,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql,
Boolean.parseBoolean(getProperty("hive.log.display", "true")), this);
}

String userName = getUser(context);
String sqlToValidate = sqlToExecute
.replace("\n", " ")
.replace("\r", " ")
.replace("\t", " ");
ValidationRequest request = new ValidationRequest(sqlToValidate, userName, interpreterName);
try {
ValidationResponse response = sendValidationRequest(request);
if (response.isPreSubmitFail()) {
if(response.getVersion() == "v1") {
String outputMessage = response.getMessage();
Expand Down Expand Up @@ -917,7 +958,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql,
finalOutput.append("Use: ").append(jsonObject.getString(table)).append(" in place of ").append(table).append("\n");
}
}
}else if (outputMessage.contains("UnAuthorized Query")) {
} else if (outputMessage.contains("UnAuthorized Query")) {
context.out.write("Query Error: UnAuthorized Query\n");
finalOutput.append("You are not authorized to execute this query.\n");
}
Expand All @@ -939,6 +980,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql,
context.out.write("%text " + message + "\n\n");
context.out.flush();
}
sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

newQueryText replacement applies uniformly to every statement in a multi-statement batch.

sqlSplitter.splitSql(sql) on line 878 may produce multiple statements. Line 983 replaces each sqlToExecute with the same response.getNewQueryText(), discarding the original individual statements. If the validation API returns a rewritten query, it likely corresponds to the full SQL, not each split fragment.

Consider applying the replacement before the split, or only when there's a single statement.

🔧 Proposed fix — apply replacement before splitting
+    // Apply query text replacement from validation before splitting
+    if (response != null && response.getNewQueryText() != null) {
+      sql = response.getNewQueryText();
+    }
+
     try {
       List<String>  sqlArray = sqlSplitter.splitSql(sql);
       for (String sqlToExecute : sqlArray) {

And remove the in-loop replacement at line 983:

-              sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;
// Apply query text replacement from validation before splitting
if (response != null && response.getNewQueryText() != null) {
sql = response.getNewQueryText();
}
try {
List<String> sqlArray = sqlSplitter.splitSql(sql);
for (String sqlToExecute : sqlArray) {
// ... (loop content - the line below has been removed)
// Previous line removed: sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;
🤖 Prompt for AI Agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java` at line
983, The current logic applies response.getNewQueryText() inside the loop over
statements produced by sqlSplitter.splitSql(sql), causing the same rewritten SQL
to replace each split fragment; instead, detect if response.getNewQueryText() is
non-null before splitting (or only replace when sqlSplitter.splitSql(sql) yields
a single statement), set sql = response.getNewQueryText() (or skip per-fragment
replacement) and then call sqlSplitter.splitSql(sql) so each fragment remains
correct; update references in JDBCInterpreter to remove the in-loop assignment
sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() :
sqlToExecute and apply the replacement at the top-level using
response.getNewQueryText() or conditionally when there is exactly one split
statement.

}
} catch (Exception e) {
String error = "Error occurred while sending request " + e.getMessage();
Expand Down
17 changes: 15 additions & 2 deletions jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
package org.apache.zeppelin.jdbc;

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

public class ValidationRequest {
@SerializedName("query_text")
private String queryText;

@SerializedName("user")
private String user;

@SerializedName("interpreter_name")
private String interpreterName;

@SerializedName("raw_query_text")
private String rawQueryText;

public ValidationRequest(String queryText, String user, String interpreterName) {
public ValidationRequest(String queryText, String user, String interpreterName, String rawQueryText) {
this.queryText = queryText;
this.user = user;
this.interpreterName = interpreterName;
this.rawQueryText = rawQueryText;
}

public String toJson() {
return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\",\"interpreter_name\":\"" + interpreterName + "\"}";
Gson gson = new Gson();
return gson.toJson(this);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ public class ValidationResponse {
private String errorHeader;
private String message;
private String version;
private String rawQueryText;
private String newQueryText;
private String targetCluster;

// Getters and Setters
public boolean isPreSubmitFail() {
Expand Down Expand Up @@ -60,6 +63,30 @@ public String getMessage() {
public void setMessage(String message) {
this.message = message;
}

public String getRawQueryText() {
return rawQueryText;
}

public void setRawQueryText(String rawQueryText) {
this.rawQueryText = rawQueryText;
}

public String getNewQueryText() {
return newQueryText;
}

public void setNewQueryText(String newQueryText) {
this.newQueryText = newQueryText;
}

public String getTargetCluster() {
return targetCluster;
}

public void setTargetCluster(String targetCluster) {
this.targetCluster = targetCluster;
}

public static ValidationResponse fromJson(String jsonResponse) {
Gson gson = new Gson();
Expand Down Expand Up @@ -94,13 +121,31 @@ public static ValidationResponse fromJson(String jsonResponse) {
} else {
response.setVersion("v1");
}
if (jsonObject.has("raw_query_text") && !jsonObject.get("raw_query_text").isJsonNull()) {
response.setRawQueryText(jsonObject.get("raw_query_text").getAsString());
} else {
response.setRawQueryText("");
}
if (jsonObject.has("new_query_text") && !jsonObject.get("new_query_text").isJsonNull()) {
response.setNewQueryText(jsonObject.get("new_query_text").getAsString());
} else {
response.setNewQueryText(null);
}
if (jsonObject.has("target_cluster") && !jsonObject.get("target_cluster").isJsonNull()) {
response.setTargetCluster(jsonObject.get("target_cluster").getAsString());
} else {
response.setTargetCluster(null);
}
} else {
response.setPreSubmitFail(false);
response.setFailFast(false);
response.setFailedByDeprecatedTable(false);
response.setErrorHeader(""); // Default error header
response.setMessage(""); // Default message
response.setVersion("v1"); // Default version
response.setRawQueryText("");
response.setNewQueryText(null);
response.setTargetCluster(null);
}
return response;
}
Expand Down