diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index b0acbb12a4..ec139c2ceb 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -25,6 +25,7 @@ import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface; import org.apache.tajo.catalog.CatalogProtocol.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; @@ -957,4 +958,65 @@ public void updateTableStats(final UpdateTableStatsProto updateTableStatsProto) throw new RuntimeException(e); } } + + @Override + public void addDirectOutputCommitHistory(DirectOutputCommitHistoryProto history) throws DuplicateQueryIdException + , InsufficientPrivilegeException{ + try { + final BlockingInterface stub = getStub(); + final ReturnState state = stub.addDirectOutputCommitHistory(null, history); + + throwsIfThisError(state, DuplicateQueryIdException.class); + throwsIfThisError(state, InsufficientPrivilegeException.class); + ensureOk(state); + } catch (ServiceException e) { + throw new RuntimeException(e); + } + } + + @Override + public void updateDirectOutputCommitHistoryProto(UpdateDirectOutputCommitHistoryProto history) + throws UndefinedQueryIdException, InsufficientPrivilegeException { + try { + final BlockingInterface stub = getStub(); + final ReturnState state = stub.updateDirectOutputCommitHistory(null, history); + + throwsIfThisError(state, UndefinedQueryIdException.class); + throwsIfThisError(state, InsufficientPrivilegeException.class); + ensureOk(state); + } catch (ServiceException e) { + throw new RuntimeException(e); + } + } + + @Override + public List getAllDirectOutputCommitHistories() { + try { + final BlockingInterface stub = getStub(); + final DirectOutputCommitHistoryListResponse response = stub.getAllDirectOutputCommitHistories(null, + ProtoUtil.NULL_PROTO); + + ensureOk(response.getState()); + return response.getCommitHistoryList(); + + } catch (ServiceException e) { + throw new RuntimeException(e); + } + } + + @Override + public List getIncompleteDirectOutputCommitHistories() { + try { + final BlockingInterface stub = getStub(); + final DirectOutputCommitHistoryListResponse response = stub.getIncompleteDirectOutputCommitHistories(null, + ProtoUtil.NULL_PROTO); + + ensureOk(response.getState()); + return response.getCommitHistoryList(); + + } catch (ServiceException e) { + throw new RuntimeException(e); + } + } + } diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto index 33fe91e1a3..f5f1dacca7 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto +++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto @@ -148,4 +148,10 @@ service CatalogProtocolService { rpc getFunctions(NullProto) returns (GetFunctionsResponse); rpc getFunctionMeta(GetFunctionMetaRequest) returns (FunctionResponse); rpc containFunction(ContainFunctionRequest) returns (ReturnState); + + rpc addDirectOutputCommitHistory(DirectOutputCommitHistoryProto) returns (ReturnState); + rpc updateDirectOutputCommitHistory(UpdateDirectOutputCommitHistoryProto) returns (ReturnState); + rpc getAllDirectOutputCommitHistories(NullProto) returns (DirectOutputCommitHistoryListResponse); + rpc getIncompleteDirectOutputCommitHistories(NullProto) returns (DirectOutputCommitHistoryListResponse); + } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java index f2acf983db..a26643ac7c 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java @@ -49,6 +49,7 @@ public class CatalogConstants { public static final String TB_PARTITION_METHODS = "PARTITION_METHODS"; public static final String TB_PARTTIONS = "PARTITIONS"; public static final String TB_PARTTION_KEYS = "PARTITION_KEYS"; + public static final String TB_DIRECT_OUTPUT_COMMIT_HISTORIES = "DIRECT_OUTPUT_COMMIT_HISTORIES"; public static final String COL_TABLESPACE_PK = "SPACE_ID"; public static final String COL_DATABASES_PK = "DB_ID"; diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java index 54d22e7878..b700312939 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java @@ -257,4 +257,17 @@ void alterTable(AlterTableDesc desc) NotImplementedException; void updateTableStats(UpdateTableStatsProto stats) throws UndefinedTableException, InsufficientPrivilegeException; + + + /************************** DirectOutputCommitter *****************************/ + + void addDirectOutputCommitHistory(DirectOutputCommitHistoryProto history) throws DuplicateQueryIdException + , InsufficientPrivilegeException; + + void updateDirectOutputCommitHistoryProto(UpdateDirectOutputCommitHistoryProto history) + throws UndefinedQueryIdException, InsufficientPrivilegeException; + + List getAllDirectOutputCommitHistories(); + + List getIncompleteDirectOutputCommitHistories(); } diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index d064c62586..984db05c65 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -429,3 +429,26 @@ message PartitionListResponse { required ReturnState state = 1; repeated PartitionDescProto partition = 2; } + + +//////////////////////////////////////////////// +// DirectOutputCommitter +//////////////////////////////////////////////// + +message DirectOutputCommitHistoryProto { + required string query_id = 1; + required string path = 2; + required int64 start_time = 3; + optional int64 end_time = 4; + required string query_state = 5; +} + +message DirectOutputCommitHistoryListResponse { + required ReturnState state = 1; + repeated DirectOutputCommitHistoryProto commitHistory = 2; +} + +message UpdateDirectOutputCommitHistoryProto { + required string query_id = 1; + required string query_state = 2; +} diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index 6aa058561e..772ef4973f 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -1294,4 +1294,24 @@ public List getAllTables() { public List getTablespaces() { return Lists.newArrayList(getTablespace(TajoConstants.DEFAULT_TABLESPACE_NAME)); } + + @Override + public void addDirectOutputCommitHistory(DirectOutputCommitHistoryProto history) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public void updateDirectOutputCommitHistoryProto(UpdateDirectOutputCommitHistoryProto history) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public List getAllDirectOutputCommitHistories() { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public List getIncompleteDirectOutputCommitHistories() { + throw new TajoRuntimeException(new UnsupportedException()); + } } diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index df40b9b751..59943a5457 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -45,6 +45,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.rpc.BlockingRpcServer; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListResponse; @@ -1613,6 +1614,84 @@ public ReturnState containFunction(RpcController controller, ContainFunctionRequ return returnError(t); } } + + @Override + public ReturnState addDirectOutputCommitHistory(RpcController controller, DirectOutputCommitHistoryProto request) + throws ServiceException { + wlock.lock(); + try { + store.addDirectOutputCommitHistory(request); + return OK; + + } catch (Throwable t) { + printStackTraceIfError(LOG, t); + return returnError(t); + + } finally { + wlock.unlock(); + } + } + + @Override + public ReturnState updateDirectOutputCommitHistory(RpcController controller, + UpdateDirectOutputCommitHistoryProto request) throws ServiceException { + wlock.lock(); + try { + store.updateDirectOutputCommitHistoryProto(request); + return OK; + + } catch (Throwable t) { + printStackTraceIfError(LOG, t); + return returnError(t); + + } finally { + wlock.unlock(); + } + } + + @Override + public DirectOutputCommitHistoryListResponse getAllDirectOutputCommitHistories(RpcController controller, + NullProto request) throws ServiceException { + rlock.lock(); + try { + return DirectOutputCommitHistoryListResponse.newBuilder() + .setState(OK) + .addAllCommitHistory(store.getAllDirectOutputCommitHistories()) + .build(); + + } catch (Throwable t) { + printStackTraceIfError(LOG, t); + + return DirectOutputCommitHistoryListResponse.newBuilder() + .setState(returnError(t)) + .build(); + + } finally { + rlock.unlock(); + } + } + + @Override + public DirectOutputCommitHistoryListResponse getIncompleteDirectOutputCommitHistories(RpcController controller, + NullProto request) throws ServiceException { + rlock.lock(); + try { + return DirectOutputCommitHistoryListResponse.newBuilder() + .setState(OK) + .addAllCommitHistory(store.getIncompleteDirectOutputCommitHistories()) + .build(); + + } catch (Throwable t) { + printStackTraceIfError(LOG, t); + + return DirectOutputCommitHistoryListResponse.newBuilder() + .setState(returnError(t)) + .build(); + + } finally { + rlock.unlock(); + } + } } private static class FunctionSignature { diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index 88fabe2b8a..4eda0b42b2 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -3024,6 +3024,166 @@ private boolean existColumn(final int tableId, final String columnName) { return exist; } + private boolean existQueryIdFromDirectOutputCommitHistories(String queryId) { + ResultSet res = null; + boolean exist = false; + + String sql = "SELECT PATH FROM " + TB_DIRECT_OUTPUT_COMMIT_HISTORIES + " WHERE QUERY_ID = ? "; + + if (LOG.isDebugEnabled()) { + LOG.debug(sql); + } + + try (PreparedStatement pstmt = getConnection().prepareStatement(sql)) { + pstmt.setString(1, queryId); + res = pstmt.executeQuery(); + exist = res.next(); + } catch (SQLException se) { + throw new TajoInternalError(se); + } finally { + CatalogUtil.closeQuietly(res); + } + + return exist; + } + @Override + public void addDirectOutputCommitHistory(DirectOutputCommitHistoryProto history) throws DuplicateQueryIdException { + + if (existQueryIdFromDirectOutputCommitHistories(history.getQueryId())) { + throw new DuplicateQueryIdException(history.getQueryId()); + } + + Connection conn = null; + PreparedStatement pstmt = null; + + try { + conn = getConnection(); + conn.setAutoCommit(false); + + final String insertHistorySql = "INSERT INTO " + TB_DIRECT_OUTPUT_COMMIT_HISTORIES + + " (QUERY_ID, PATH, START_TIME, QUERY_STATE) VALUES (?, ?, ?, ?) "; + + if (LOG.isDebugEnabled()) { + LOG.debug(insertHistorySql); + } + + pstmt = conn.prepareStatement(insertHistorySql); + pstmt.setString(1, history.getQueryId()); + pstmt.setString(2, history.getPath()); + pstmt.setLong(3, history.getStartTime()); + pstmt.setString(4, history.getQueryState()); + pstmt.executeUpdate(); + conn.commit(); + } catch (SQLException se) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e) { + LOG.error(e, e); + } + } + throw new TajoInternalError(se); + } finally { + CatalogUtil.closeQuietly(pstmt); + } + } + + @Override + public void updateDirectOutputCommitHistoryProto(UpdateDirectOutputCommitHistoryProto history) + throws UndefinedQueryIdException { + + if (!existQueryIdFromDirectOutputCommitHistories(history.getQueryId())) { + throw new UndefinedQueryIdException(history.getQueryId()); + } + + Connection conn = null; + PreparedStatement pstmt = null; + + try { + conn = getConnection(); + conn.setAutoCommit(false); + + final String updateHistorySql = "UPDATE " + TB_DIRECT_OUTPUT_COMMIT_HISTORIES + + " SET END_TIME = ?, QUERY_STATE = ? WHERE QUERY_ID = ? "; + + if (LOG.isDebugEnabled()) { + LOG.debug(updateHistorySql); + } + + pstmt = conn.prepareStatement(updateHistorySql); + pstmt.setLong(1, System.currentTimeMillis()); + pstmt.setString(2, history.getQueryState()); + pstmt.setString(3, history.getQueryId()); + pstmt.executeUpdate(); + conn.commit(); + } catch (SQLException se) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e) { + LOG.error(e, e); + } + } + throw new TajoInternalError(se); + } finally { + CatalogUtil.closeQuietly(pstmt); + } + } + + @Override + public List getAllDirectOutputCommitHistories() { + List histories = new ArrayList<>(); + + String sql = "SELECT * FROM " + TB_DIRECT_OUTPUT_COMMIT_HISTORIES; + + try (Statement stmt = getConnection().createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + while (rs.next()) { + DirectOutputCommitHistoryProto.Builder builder = DirectOutputCommitHistoryProto.newBuilder(); + + builder.setQueryId(rs.getString("QUERY_ID")); + builder.setPath(rs.getString("PATH")); + builder.setStartTime(rs.getLong("START_TIME")); + builder.setEndTime(rs.getLong("END_TIME")); + builder.setQueryState(rs.getString("QUERY_STATE")); + histories.add(builder.build()); + } + + } catch (SQLException e) { + throw new TajoInternalError(e); + } + + return histories; + } + + @Override + public List getIncompleteDirectOutputCommitHistories() { + List histories = new ArrayList<>(); + + String sql = "SELECT * FROM " + TB_DIRECT_OUTPUT_COMMIT_HISTORIES + + " WHERE END_TIME IS NULL " + + " OR QUERY_STATE NOT IN ('QUERY_SUCCEEDED', 'QUERY_FAILED', 'QUERY_KILLED', 'QUERY_ERROR')"; + + try (Statement stmt = getConnection().createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + while (rs.next()) { + DirectOutputCommitHistoryProto.Builder builder = DirectOutputCommitHistoryProto.newBuilder(); + + builder.setQueryId(rs.getString("QUERY_ID")); + builder.setPath(rs.getString("PATH")); + builder.setStartTime(rs.getLong("START_TIME")); + builder.setEndTime(rs.getLong("END_TIME")); + builder.setQueryState(rs.getString("QUERY_STATE")); + histories.add(builder.build()); + } + + } catch (SQLException e) { + throw new TajoInternalError(e); + } + + return histories; + } + class PartitionFilterSet { private String columnName; private List> parameters; diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java index 52889791c3..f8e203dbd3 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java @@ -226,4 +226,15 @@ boolean existIndexesByTable(String databaseName, String tableName) void existFunction(FunctionDesc func); List getAllFunctionNames(); + + /************************** DirectOutputCommitter *****************************/ + + void addDirectOutputCommitHistory(DirectOutputCommitHistoryProto history) throws DuplicateQueryIdException; + + void updateDirectOutputCommitHistoryProto(UpdateDirectOutputCommitHistoryProto history) + throws UndefinedQueryIdException; + + List getAllDirectOutputCommitHistories(); + + List getIncompleteDirectOutputCommitHistories(); } diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml index 96100e8430..316abd53a3 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml @@ -19,6 +19,7 @@ - + @@ -176,6 +177,19 @@ )]]> + + + + diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml index c0dadaa3da..166b3c9f6e 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml @@ -19,6 +19,7 @@