diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java index 6131b8d8fd..0b627baaab 100644 --- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java +++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java @@ -418,10 +418,7 @@ public Object clone() throws CloneNotSupportedException { } hash.quantity = quantity; if (specifiers != null) { - hash.specifiers = new ArrayList<>(); - for (PartitionSpecifier specifier : specifiers) { - hash.specifiers.add(specifier); - } + hash.specifiers = new ArrayList<>(specifiers); } return hash; } @@ -471,10 +468,7 @@ public Object clone() throws CloneNotSupportedException { } } if (specifiers != null) { - listPartition.specifiers = new ArrayList<>(); - for (ListPartitionSpecifier specifier : specifiers) { - listPartition.specifiers.add(specifier); - } + listPartition.specifiers = new ArrayList<>(specifiers); } return listPartition; } diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index 2c4d41e72b..10f59b60f4 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; import static org.apache.tajo.catalog.CatalogUtil.buildTableIdentifier; import static org.apache.tajo.error.Errors.ResultCode.*; @@ -522,9 +523,7 @@ public void addPartitions(String databaseName, String tableName, List getAllIndexesByTable(final String databaseNam final IndexListResponse response = stub.getAllIndexesByTable(null, proto); ensureOk(response.getState()); - List indexDescs = new ArrayList<>(); - for (IndexDescProto descProto : response.getIndexDescList()) { - indexDescs.add(new IndexDesc(descProto)); - } + List indexDescs = response.getIndexDescList().stream().map(IndexDesc::new).collect(Collectors.toList()); return indexDescs; } catch (ServiceException e) { throw new RuntimeException(e); diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index b855c77733..7198a9f175 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -65,6 +65,7 @@ import java.io.File; import java.util.*; +import java.util.stream.Collectors; public class HiveCatalogStore extends CatalogConstants implements CatalogStore { protected final Log LOG = LogFactory.getLog(getClass()); @@ -689,11 +690,10 @@ private void renameColumn(String databaseName, String tableName, CatalogProtos.A Table table = client.getHiveClient().getTable(databaseName, tableName); List columns = table.getSd().getCols(); - for (final FieldSchema currentColumn : columns) { - if (currentColumn.getName().equalsIgnoreCase(alterColumnProto.getOldColumnName())) { - currentColumn.setName(alterColumnProto.getNewColumnName()); - } - } + columns.stream().filter(currentColumn -> currentColumn.getName() + .equalsIgnoreCase(alterColumnProto.getOldColumnName())).forEach(currentColumn -> { + currentColumn.setName(alterColumnProto.getNewColumnName()); + }); client.getHiveClient().alter_table(databaseName, tableName, table); } catch (NoSuchObjectException nsoe) { @@ -744,10 +744,8 @@ private void addPartition(String databaseName, String tableName, CatalogProtos.P params.put(StatsSetupConst.TOTAL_SIZE, Long.toString(partitionDescProto.getNumBytes())); partition.setParameters(params); - List values = Lists.newArrayList(); - for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { - values.add(keyProto.getPartitionValue()); - } + List values = partitionDescProto.getPartitionKeysList().stream() + .map(PartitionKeyProto::getPartitionValue).collect(Collectors.toList()); partition.setValues(values); Table table = client.getHiveClient().getTable(databaseName, tableName); @@ -772,10 +770,8 @@ private void dropPartition(String databaseName, String tableName, CatalogProtos. client = clientPool.getClient(); - List values = Lists.newArrayList(); - for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { - values.add(keyProto.getPartitionValue()); - } + List values = partitionDescProto.getPartitionKeysList().stream() + .map(PartitionKeyProto::getPartitionValue).collect(Collectors.toList()); client.getHiveClient().dropPartition(databaseName, tableName, values, true); } catch (Exception e) { throw new TajoInternalError(e); @@ -1249,10 +1245,8 @@ public void addPartitions(String databaseName, String tableName, List values = Lists.newArrayList(); - for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { - values.add(keyProto.getPartitionValue()); - } + List values = partitionDescProto.getPartitionKeysList().stream() + .map(PartitionKeyProto::getPartitionValue).collect(Collectors.toList()); partition.setValues(values); Table table = client.getHiveClient().getTable(databaseName, tableName); diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index 6583d4eb01..854dd0db97 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -1032,9 +1032,7 @@ public GetPartitionsResponse getPartitionsByTableName(RpcController controller, List partitions = store.getPartitionsOfTable(dbName, tbName); GetPartitionsResponse.Builder builder = GetPartitionsResponse.newBuilder(); - for (PartitionDescProto partition : partitions) { - builder.addPartition(partition); - } + partitions.forEach(builder::addPartition); builder.setState(OK); return builder.build(); @@ -1414,12 +1412,8 @@ private FunctionDescProto findFunction(String signature, List candidates = Lists.newArrayList(); if (functions.containsKey(signature)) { - for (FunctionDescProto func : functions.get(signature)) { - if (func.getSignature().getParameterTypesList() != null && - func.getSignature().getParameterTypesList().equals(params)) { - candidates.add(func); - } - } + functions.get(signature).stream().filter(func -> func.getSignature().getParameterTypesList() != null && + func.getSignature().getParameterTypesList().equals(params)).forEach(candidates::add); } /* @@ -1432,12 +1426,8 @@ private FunctionDescProto findFunction(String signature, List func.getSignature().getParameterTypesList() != null && + CatalogUtil.isMatchedFunction(func.getSignature().getParameterTypesList(), params)).forEach(candidates::add); // if there are more than one function candidates, we choose the nearest matched function. if (candidates.size() > 0) { @@ -1456,19 +1446,11 @@ private FunctionDescProto findFunction(String signature, FunctionType type, List if (functions.containsKey(signature)) { if (strictTypeCheck) { - for (FunctionDescProto func : functions.get(signature)) { - if (func.getSignature().getType() == type && - func.getSignature().getParameterTypesList().equals(params)) { - candidates.add(func); - } - } + functions.get(signature).stream().filter(func -> func.getSignature().getType() == type && + func.getSignature().getParameterTypesList().equals(params)).forEach(candidates::add); } else { - for (FunctionDescProto func : functions.get(signature)) { - if (func.getSignature().getParameterTypesList() != null && - CatalogUtil.isMatchedFunction(func.getSignature().getParameterTypesList(), params)) { - candidates.add(func); - } - } + functions.get(signature).stream().filter(func -> func.getSignature().getParameterTypesList() != null && + CatalogUtil.isMatchedFunction(func.getSignature().getParameterTypesList(), params)).forEach(candidates::add); } } diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java index d9008aa1b7..86aebf4d17 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import org.apache.tajo.exception.UndefinedTableException; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -78,12 +79,8 @@ public String getSystemDatabaseName() { } public List getAllSystemTables() { - List systemTableNames = new ArrayList<>(); - - for (TableDescriptor descriptor: schemaInfoTableDescriptors) { - systemTableNames.add(descriptor.getTableNameString()); - } - + List systemTableNames = schemaInfoTableDescriptors.stream().map(TableDescriptor::getTableNameString).collect(Collectors.toList()); + return systemTableNames; } diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java index ea15c079ea..16d5c4f62c 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java @@ -43,6 +43,8 @@ import java.util.*; import java.util.jar.JarEntry; import java.util.jar.JarFile; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class XMLCatalogSchemaManager { protected final Log LOG = LogFactory.getLog(getClass()); @@ -302,16 +304,10 @@ public void upgradeBaseSchema(Connection conn, int currentVersion) { throw new TajoInternalError("Database schema files are not loaded."); } - final List candidatePatches = new ArrayList<>(); + final List candidatePatches = this.catalogStore.getPatches().stream() + .filter(patch -> currentVersion >= patch.getPriorVersion()).sorted().collect(Collectors.toList()); Statement stmt; - - for (SchemaPatch patch: this.catalogStore.getPatches()) { - if (currentVersion >= patch.getPriorVersion()) { - candidatePatches.add(patch); - } - } - - Collections.sort(candidatePatches); + try { stmt = conn.createStatement(); } catch (SQLException e) { @@ -602,18 +598,9 @@ protected List mergeDatabaseObjects(List objects unorderedObjects.add(object); } } - - for (DatabaseObject object: orderedObjects) { - if (object != null) { - mergedObjects.add(object); - } - } - - for (DatabaseObject object: unorderedObjects) { - if (object != null) { - mergedObjects.add(object); - } - } + + Stream.concat(orderedObjects.stream(), unorderedObjects.stream()) + .filter(object -> object != null).forEach(mergedObjects::add); return mergedObjects; } @@ -643,75 +630,52 @@ protected void validatePatch(List patches, SchemaPatch testPatch) { } protected void mergePatches(List patches) { - final List objects = new ArrayList<>(); - - Collections.sort(patches); - - for (SchemaPatch patch: patches) { + patches.stream().forEachOrdered(patch -> { validatePatch(patches, patch); - - objects.clear(); + List tempObjects = new ArrayList<>(); tempObjects.addAll(patch.getObjects()); patch.clearObjects(); - patch.addObjects(mergeDatabaseObjects(tempObjects)); - + patch.addObjects(mergeDatabaseObjects(tempObjects)); + targetStore.addPatch(patch); - } + }); } protected void validateSQLObject(List queries, SQLObject testQuery) { - int occurredCount = 0; - - for (SQLObject query: queries) { - if (query.getType() == testQuery.getType()) { - occurredCount++; - } - } + int occurredCount = (int) queries.stream().filter(query -> query.getType() == testQuery.getType()).count(); if (occurredCount > 1) { throw new TajoInternalError("Duplicate Query type (" + testQuery.getType() + ") has found."); } } - protected void mergeExistQueries(List queries) { - for (SQLObject query: queries) { - validateSQLObject(queries, query); - - targetStore.addExistQuery(query); - } - } - - protected void mergeDropStatements(List queries) { - for (SQLObject query: queries) { - validateSQLObject(queries, query); - - targetStore.addDropStatement(query); - } - } - public StoreObject merge() { boolean alreadySetDatabaseObject = false; // first pass - for (StoreObject store : this.storeObjects) { - copySchemaInfo(store); - } + this.storeObjects.forEach(this::copySchemaInfo); // second pass for (StoreObject store: this.storeObjects) { - if (store.getSchema().getVersion() == targetStore.getSchema().getVersion() && + if (store.getSchema().getVersion() == targetStore.getSchema().getVersion() && !alreadySetDatabaseObject) { BaseSchema targetSchema = targetStore.getSchema(); targetSchema.clearObjects(); targetSchema.addObjects(mergeDatabaseObjects(store.getSchema().getObjects())); - + alreadySetDatabaseObject = true; } - + mergePatches(store.getPatches()); - mergeExistQueries(store.getExistQueries()); - mergeDropStatements(store.getDropStatements()); + store.getExistQueries().forEach(query -> { + validateSQLObject(store.getExistQueries(), query); + targetStore.addExistQuery(query); + }); + store.getDropStatements().forEach(query -> { + validateSQLObject(store.getDropStatements(), query); + targetStore.addDropStatement(query); + }); } return this.targetStore; diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java index a4b0c1d640..c6b997fc0c 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java @@ -41,11 +41,9 @@ class CliClientParamsFactory { public static Properties get(@Nullable Properties connParam) { Properties copy = connParam == null ? new Properties() : (Properties) connParam.clone(); - for (Map.Entry entry : DEFAULT_PARAMS.entrySet()) { - if (!copy.contains(entry.getKey())) { - copy.setProperty(entry.getKey(), entry.getValue()); - } - } + DEFAULT_PARAMS.entrySet().stream().filter(entry -> !copy.contains(entry.getKey())).forEach(entry -> { + copy.setProperty(entry.getKey(), entry.getValue()); + }); return copy; } } diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescFunctionCommand.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescFunctionCommand.java index 944ad0fb5f..c20cb95e2e 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescFunctionCommand.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescFunctionCommand.java @@ -92,11 +92,10 @@ public void invoke(String[] cmd) throws Exception { Map functionMap = new HashMap<>(); - for (CatalogProtos.FunctionDescProto eachFunction: functions) { - if (!functionMap.containsKey(eachFunction.getSupplement().getShortDescription())) { - functionMap.put(eachFunction.getSupplement().getShortDescription(), eachFunction); - } - } + functions.stream().filter(eachFunction -> !functionMap.containsKey(eachFunction.getSupplement() + .getShortDescription())).forEach(eachFunction -> { + functionMap.put(eachFunction.getSupplement().getShortDescription(), eachFunction); + }); for (CatalogProtos.FunctionDescProto eachFunction: functionMap.values()) { String signature = eachFunction.getSignature().getReturnType().getType() + " " + diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 315f954e0a..70c99276cd 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -518,9 +518,8 @@ protected Collection getBatchQueries(Collection paths) throws IOEx List queries = Lists.newArrayList(); for (Path p : paths) { - for (ParsedResult statement: SimpleParser.parseScript(FileUtil.readTextFile(new File(p.toUri())))) { - queries.add(statement.getStatement()); - } + SimpleParser.parseScript(FileUtil.readTextFile(new File(p.toUri()))).stream() + .map(ParsedResult::getStatement).forEach(queries::add); } return queries; diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java index 4b89c8e274..54b8b51f26 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -471,9 +471,7 @@ public void shutdownMiniTajoCluster() { if(this.tajoMaster != null) { this.tajoMaster.stop(); } - for(TajoWorker eachWorker: tajoWorkers) { - eachWorker.stopWorkerForce(); - } + tajoWorkers.parallelStream().forEach(TajoWorker::stopWorkerForce); tajoWorkers.clear(); this.tajoMaster= null; } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java index 828f60c7b7..2fe9f5e014 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -175,10 +175,7 @@ public final void testSessionVariables() throws IOException, TajoException, Inte String prefixName = "key_"; String prefixValue = "val_"; - List unsetList = new ArrayList<>(); - for(Map.Entry entry: client.getAllSessionVariables().entrySet()) { - unsetList.add(entry.getKey()); - } + List unsetList = new ArrayList<>(client.getAllSessionVariables().keySet()); client.unsetSessionVariables(unsetList); for (int i = 0; i < 10; i++) { diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 142b2c3836..18c32c6389 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -526,12 +526,10 @@ public final void testJoinWithMultipleJoinQual1() throws IOException, TajoExcept } } - for (Map.Entry entry : qualMap.entrySet()) { - if (!entry.getValue()) { - Preconditions.checkArgument(false, - "JoinQual not found. -> required JoinQual:" + entry.getKey().toJson()); - } - } + qualMap.entrySet().parallelStream().filter(entry -> !entry.getValue()).forEach(entry -> { + Preconditions.checkArgument(false, + "JoinQual not found. -> required JoinQual:" + entry.getKey().toJson()); + }); } @Test @@ -566,12 +564,10 @@ public final void testJoinWithMultipleJoinQual2() throws IOException, TajoExcept } } - for (Map.Entry entry : qualMap.entrySet()) { - if (!entry.getValue()) { - Preconditions.checkArgument(false, - "SelectionQual not found. -> required JoinQual:" + entry.getKey().toJson()); - } - } + qualMap.entrySet().parallelStream().filter(entry -> !entry.getValue()).forEach(entry -> { + Preconditions.checkArgument(false, + "SelectionQual not found. -> required JoinQual:" + entry.getKey().toJson()); + }); } @Test @@ -611,12 +607,10 @@ public final void testJoinWithMultipleJoinQual3() throws IOException, TajoExcept } } - for (Map.Entry entry : qualMap.entrySet()) { - if (!entry.getValue()) { - Preconditions.checkArgument(false, - "ScanQual not found. -> required JoinQual:" + entry.getKey().toJson()); - } - } + qualMap.entrySet().parallelStream().filter(entry -> !entry.getValue()).forEach(entry -> { + Preconditions.checkArgument(false, + "ScanQual not found. -> required JoinQual:" + entry.getKey().toJson()); + }); } @@ -680,19 +674,15 @@ public final void testJoinWithMultipleJoinQual4() throws IOException, TajoExcept } - for (Map.Entry entry : joinQualMap.entrySet()) { - if (!entry.getValue()) { - Preconditions.checkArgument(false, - "JoinQual not found. -> required JoinQual:" + entry.getKey().toJson()); - } - } + joinQualMap.entrySet().parallelStream().filter(entry -> !entry.getValue()).forEach(entry -> { + Preconditions.checkArgument(false, + "JoinQual not found. -> required JoinQual:" + entry.getKey().toJson()); + }); - for (Map.Entry entry : scanMap.entrySet()) { - if (!entry.getValue()) { - Preconditions.checkArgument(false, - "ScanQual not found. -> required JoinQual:" + entry.getKey().toJson()); - } - } + scanMap.entrySet().parallelStream().filter(entry -> !entry.getValue()).forEach(entry -> { + Preconditions.checkArgument(false, + "ScanQual not found. -> required JoinQual:" + entry.getKey().toJson()); + }); } static void testQuery7(LogicalNode plan) { diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index 14f3e9803a..7cfa0b2d09 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -844,9 +844,7 @@ public final void testAggregationFunction() throws IOException, TajoException { // Set all aggregation functions to the first phase mode GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY); - for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) { - function.setFirstPhase(); - } + groupbyNode.getAggFunctions().parallelStream().forEach(AggregationFunctionCallEval::setFirstPhase); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); @@ -875,9 +873,7 @@ public final void testCountFunction() throws IOException, TajoException { // Set all aggregation functions to the first phase mode GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY); - for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) { - function.setFirstPhase(); - } + groupbyNode.getAggFunctions().parallelStream().forEach(AggregationFunctionCallEval::setFirstPhase); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java index abec6a0443..a4d2d7cd61 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -505,9 +505,7 @@ private static void assertFetchProto(FetchProto [] expected, Map resultURLs = Sets.newHashSet(); for (Map> e : result) { - for (List list : e.values()) { - resultURLs.addAll(list); - } + e.values().forEach(resultURLs::addAll); } assertEquals(expectedURLs.size(), resultURLs.size()); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java index 66a1791945..33b386b21f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java @@ -214,9 +214,7 @@ public List getProperties() { return proto.getPropertiesList(); } else { List list = new ArrayList<>(); - for (List propertyList : properties.values()) { - list.addAll(propertyList); - } + properties.values().forEach(list::addAll); return list; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java index 10f6e54722..0b45ba532f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java @@ -103,9 +103,7 @@ public String toString() { // Add all execution blocks in a depth first and postfix order private void buildDepthFirstOrder(ExecutionBlock current) { if (!masterPlan.isLeaf(current.getId())) { - for (ExecutionBlock execBlock : masterPlan.getChilds(current)) { - buildDepthFirstOrder(execBlock); - } + masterPlan.getChilds(current).forEach(this::buildDepthFirstOrder); } orderedBlocks.add(current); } @@ -136,9 +134,7 @@ In the case of the upper plan, buildDepthFirstOrder() makes the following order orderRequiredChildCountMap.get(eachItem.parentEB.getId()).decrementAndGet(); } else { if (eachItem.allSiblingsOrdered()) { - for (BuildOrderItem eachSiblingItem: notOrderedSiblingBlocks) { - orderedBlocks.add(eachSiblingItem.eb); - } + notOrderedSiblingBlocks.stream().map(eachSiblingItem -> eachSiblingItem.eb).forEach(orderedBlocks::add); orderedBlocks.add(eachItem.eb); notOrderedSiblingBlocks.clear(); } else { @@ -162,9 +158,7 @@ private void preExecutionOrder(BuildOrderItem current) { stack.push(item); } } - for (BuildOrderItem eachItem : stack) { - preExecutionOrder(eachItem); - } + stack.forEach(this::preExecutionOrder); } executionOrderedBlocks.add(current); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index fd34c46f7d..acffd96ea2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.util.*; +import java.util.stream.Collectors; import static org.apache.tajo.conf.TajoConf.ConfVars; import static org.apache.tajo.conf.TajoConf.ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS; @@ -547,13 +548,9 @@ private ExecutionBlock buildGroupByIncludingDistinctFunctionsMultiStage(GlobalPl } } - List firstStageTargets = new ArrayList<>(); - for (Column column : firstStageGroupingColumns) { - firstStageTargets.add(new Target(new FieldEval(column))); - } - for (Target target : firstPhaseEvalNodeTargets) { - firstStageTargets.add(target); - } + List firstStageTargets = firstStageGroupingColumns.stream() + .map(column -> new Target(new FieldEval(column))).collect(Collectors.toList()); + firstPhaseEvalNodeTargets.forEach(firstStageTargets::add); // Create the groupby node for the first stage and set all necessary descriptions GroupbyNode firstStageGroupby = new GroupbyNode(context.plan.getLogicalPlan().newPID()); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java index a7b03e73fb..bd71fc60b6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java @@ -36,6 +36,7 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class MasterPlan { private final QueryId queryId; @@ -236,10 +237,8 @@ public List getChilds(ExecutionBlock execBlock) { } public List getChilds(ExecutionBlockId id) { - List childBlocks = new ArrayList<>(); - for (ExecutionBlockId cid : execBlockGraph.getChilds(id)) { - childBlocks.add(execBlockMap.get(cid)); - } + List childBlocks = execBlockGraph.getChilds(id).stream() + .map(cid -> execBlockMap.get(cid)).collect(Collectors.toList()); return childBlocks; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java index d197bda7ef..ce10a17b60 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java @@ -76,11 +76,8 @@ public synchronized int size() { public synchronized ExecutionBlock[] first() { int max = Math.min(maximum, executable.size()); List result = new ArrayList<>(); - for (Deque queue : executable) { - if (result.size() < max && isExecutableNow(queue.peekLast())) { - result.add(queue.removeLast()); - } - } + executable.stream().filter(queue -> result.size() < max && isExecutableNow(queue.peekLast())) + .forEach(queue -> result.add(queue.removeLast())); LOG.info("Initial executable blocks " + result); return result.toArray(new ExecutionBlock[result.size()]); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java index 720c337eb4..94cdb74951 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java @@ -51,6 +51,7 @@ import org.apache.tajo.util.TUtil; import java.util.*; +import java.util.stream.Collectors; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.HASH_SHUFFLE; @@ -225,12 +226,7 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou distinctNodeBuildInfos.put(groupbyMapKey, buildInfo); // Grouping columns are GROUP BY clause's column + Distinct column. - List groupingColumns = new ArrayList<>(); - for (Column eachGroupingColumn: groupbyUniqColumns) { - if (!groupingColumns.contains(eachGroupingColumn)) { - groupingColumns.add(eachGroupingColumn); - } - } + List groupingColumns = groupbyUniqColumns.stream().distinct().collect(Collectors.toList()); distinctGroupbyNode.setGroupingColumns(groupingColumns.toArray(new Column[groupingColumns.size()])); } buildInfo.addAggFunction(aggFunction); @@ -243,9 +239,7 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou List baseGroupByTargets = new ArrayList<>(); baseGroupByTargets.add(new Target(new FieldEval(new Column("?distinctseq", Type.INT2)))); - for (Column column : originalGroupingColumns) { - baseGroupByTargets.add(new Target(new FieldEval(column))); - } + originalGroupingColumns.stream().map(column -> new Target(new FieldEval(column))).forEach(baseGroupByTargets::add); //Add child groupby node for each Distinct clause for (DistinctGroupbyNodeBuildInfo buildInfo: distinctNodeBuildInfos.values()) { @@ -391,11 +385,8 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou // Grouping columns are GROUP BY clause's column + Distinct column. List groupingColumns = new ArrayList<>(originalGroupingColumns); - for (Column eachGroupingColumn: groupbyUniqColumns) { - if (!groupingColumns.contains(eachGroupingColumn)) { - groupingColumns.add(eachGroupingColumn); - } - } + groupbyUniqColumns.stream().filter(eachGroupingColumn -> + !groupingColumns.contains(eachGroupingColumn)).forEach(groupingColumns::add); distinctGroupbyNode.setGroupingColumns(groupingColumns.toArray(new Column[groupingColumns.size()])); } buildInfo.addAggFunction(aggFunction); @@ -431,10 +422,8 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou // finally this aggregation output tuple's order is GROUP_BY_COL1, COL2, .... + AGG_VALUE, SUM_VALUE, ... GroupbyNode otherGroupbyNode = new GroupbyNode(context.getPlan().getLogicalPlan().newPID()); - List targets = new ArrayList<>(); - for (Column column : originalGroupingColumns) { - targets.add(new Target(new FieldEval(column))); - } + List targets = originalGroupingColumns.stream().map(column -> + new Target(new FieldEval(column))).collect(Collectors.toList()); targets.addAll(otherAggregationFunctionTargets); otherGroupbyNode.setTargets(targets); @@ -530,13 +519,9 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou List oldTargets = secondStageGroupbyNode.getTargets(); List secondGroupbyTargets = new ArrayList<>(); LinkedHashSet distinctColumns = EvalTreeUtil.findUniqueColumns(secondStageGroupbyNode.getAggFunctions().get(0)); - List uniqueDistinctColumn = new ArrayList<>(); // remove origin group by column from distinctColumns - for (Column eachColumn: distinctColumns) { - if (!originGroupColumns.contains(eachColumn)) { - uniqueDistinctColumn.add(eachColumn); - } - } + List uniqueDistinctColumn = distinctColumns.stream() + .filter(eachColumn -> !originGroupColumns.contains(eachColumn)).collect(Collectors.toList()); for (int i = 0; i < originGroupColumns.size(); i++) { secondGroupbyTargets.add(oldTargets.get(i)); if (grpIdx > 0) { @@ -652,9 +637,7 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou int index = 0; for(GroupbyNode eachNode: secondStageDistinctNode.getSubPlans()) { eachNode.setInSchema(firstStageDistinctNode.getOutSchema()); - for (Column column: eachNode.getOutSchema().getRootColumns()) { - secondStageInSchema.add(column); - } + eachNode.getOutSchema().getRootColumns().forEach(secondStageInSchema::add); } secondStageDistinctNode.setInSchema(secondStageInSchema.build()); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index d390740187..4da6701699 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -224,12 +224,10 @@ private void visitNonLeafNode(Context context, ExecutionBlock current) throws Ta } JoinType joinType = ((JoinNode)found).getJoinType(); - for (ExecutionBlock child : childs) { - if (!child.isPreservedRow()) { - updateBroadcastableRelForChildEb(context, child, joinType); - updateInputBasedOnChildEb(child, current); - } - } + childs.stream().filter(child -> !child.isPreservedRow()).forEach(child -> { + updateBroadcastableRelForChildEb(context, child, joinType); + updateInputBasedOnChildEb(child, current); + }); if (current.hasBroadcastRelation()) { // The current execution block and its every child are able to be merged. @@ -245,9 +243,7 @@ private void visitNonLeafNode(Context context, ExecutionBlock current) throws Ta } } else { List relations = new ArrayList<>(current.getBroadcastRelations()); - for (ScanNode eachRelation : relations) { - current.removeBroadcastRelation(eachRelation); - } + relations.forEach(current::removeBroadcastRelation); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java index b13cb0f1a8..566d417f64 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java @@ -36,9 +36,7 @@ public class GlobalPlanRewriteUtil { * @return */ public static ExecutionBlock mergeExecutionBlocks(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) { - for (ScanNode broadcastable : child.getBroadcastRelations()) { - parent.addBroadcastRelation(broadcastable); - } + child.getBroadcastRelations().forEach(parent::addBroadcastRelation); // connect parent and grand children List grandChilds = plan.getChilds(child); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index 7ab0943915..29fbb32bb5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -94,16 +94,12 @@ public BSTIndexScanExec(TaskAttemptContext context, IndexScanNode plan, private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, List targets, EvalNode qual) { Set qualAndTargets = new HashSet<>(); qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual)); - for (Target target : targets) { - qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree())); - } + targets.forEach(target -> qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()))); SchemaBuilder mergedSchema = SchemaBuilder.builder(); - for (Column column : originalSchema.getRootColumns()) { - if (subSchema.contains(column) || qualAndTargets.contains(column)) { - mergedSchema.add(column); - } - } + originalSchema.getRootColumns().stream() + .filter(column -> subSchema.contains(column) || qualAndTargets.contains(column)) + .forEach(mergedSchema::add); return mergedSchema.build(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java index 18c6bc969d..a0bf345c9c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java @@ -363,9 +363,7 @@ public void rescan() { } public void close() throws IOException { - for (TupleSet set : distinctAggrDatas.values()) { - set.clear(); - } + distinctAggrDatas.values().forEach(TupleSet::clear); distinctAggrDatas.clear(); distinctAggrDatas = null; currentGroupingTuples = null; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java index 6607416e09..05b58a8476 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java @@ -261,9 +261,7 @@ public Tuple next() throws IOException { listIndex++; } - for (TupleList eachList : tupleSlots) { - eachList.clear(); - } + tupleSlots.forEach(TupleList::clear); tupleSlots.clear(); currentAggregatedTupleIndex = 0; @@ -440,9 +438,7 @@ public TupleList aggregate(Map groupTuples) { } public void close() throws IOException { - for (TupleMap map : hashTable.values()) { - map.clear(); - } + hashTable.values().forEach(TupleMap::clear); hashTable.clear(); hashTable = null; iterator = null; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 0da1aa6e05..3cd9671ed9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -234,9 +234,7 @@ public void rescan() throws IOException { @Override public void close() throws IOException{ if (partitionMemoryMap.size() > 0) { - for (RowBlock rowBlock : partitionMemoryMap.values()) { - rowBlock.release(); - } + partitionMemoryMap.values().forEach(RowBlock::release); partitionMemoryMap.clear(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 52cb080edd..48d45f693d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -153,11 +153,7 @@ public static Schema getProjectSchema(Schema inSchema, Schema outSchema, columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree())); } - for (Column column : inSchema.getAllColumns()) { - if (columnSet.contains(column)) { - projected.add(column); - } - } + inSchema.getAllColumns().stream().filter(columnSet::contains).forEach(projected::add); return projected.build(); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java index 65c23f42ba..0bdeea8f5d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java @@ -216,49 +216,42 @@ public List getFetches() { return this.fetches; } - private void initFetches() { - if (this.fetches != null) { + private void initFetches() { + if (this.fetches != null) { return; } TaskRequestProtoOrBuilder p = viaProto ? proto : builder; - this.fetches = new ArrayList<>(); - for(FetchProto fetch : p.getFetchesList()) { - fetches.add(fetch); - } - } + this.fetches = new ArrayList<>(p.getFetchesList()); + } private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = TaskRequestProto.newBuilder(proto); - } - viaProto = true; - } + if (viaProto || builder == null) { + builder = TaskRequestProto.newBuilder(proto); + } + viaProto = true; + } - private void mergeLocalToBuilder() { - if (id != null) { - builder.setId(this.id.getProto()); - } - if (fragments != null) { - for (FragmentProto fragment : fragments) { - builder.addFragments(fragment); - } - } - if (this.outputTable != null) { - builder.setOutputTable(this.outputTable); - } - if (this.isUpdated) { - builder.setClusteredOutput(this.clusteredOutput); - } - if (this.plan != null) { - builder.setPlan(this.plan); - } - if (this.interQuery != null) { - builder.setInterQuery(this.interQuery); - } + private void mergeLocalToBuilder() { + if (id != null) { + builder.setId(this.id.getProto()); + } + if (fragments != null) { + fragments.forEach(builder::addFragments); + } + if (this.outputTable != null) { + builder.setOutputTable(this.outputTable); + } + if (this.isUpdated) { + builder.setClusteredOutput(this.clusteredOutput); + } + if (this.plan != null) { + builder.setPlan(this.plan); + } + if (this.interQuery != null) { + builder.setInterQuery(this.interQuery); + } if (this.fetches != null) { - for (FetchProto fetch : fetches) { - builder.addFetches(fetch); - } + fetches.forEach(builder::addFetches); } if (this.queryMasterHostAndPort != null) { builder.setQueryMasterHostAndPort(this.queryMasterHostAndPort); @@ -272,14 +265,14 @@ private void mergeLocalToBuilder() { if (this.enforcer != null) { builder.setEnforcer(enforcer.getProto()); } - } + } - private void mergeLocalToProto() { - if(viaProto) { - maybeInitBuilder(); - } - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } + private void mergeLocalToProto() { + if(viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java index f2a2217445..e461a07db8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * This is a simple TableCache which just added CacheHolder as needed. @@ -60,12 +61,8 @@ public synchronized void releaseCache(ExecutionBlockId ebId) { } public synchronized List getCacheKeyByExecutionBlockId(ExecutionBlockId ebId) { - List keys = Lists.newArrayList(); - for (TableCacheKey eachKey : cacheMap.keySet()) { - if (eachKey.ebId.equals(ebId.toString())) { - keys.add(eachKey); - } - } + List keys = cacheMap.keySet().stream().filter(eachKey -> eachKey.ebId.equals(ebId.toString())) + .collect(Collectors.toList()); return keys; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index b4f1d66ac7..8f80b825dd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -95,9 +95,7 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStop() throws Exception { - for(QueryInProgress eachQueryInProgress: runningQueries.values()) { - eachQueryInProgress.stopProgress(); - } + runningQueries.values().forEach(QueryInProgress::stopProgress); super.serviceStop(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java index 78cd0159c4..1ec99f5a1c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java @@ -47,13 +47,11 @@ public void prepareTest(MasterPlan plan) { for (ExecutionBlock block : cursor) { List outgoingChannels = plan.getOutgoingChannels(block.getId()); if (outgoingChannels != null) { - for (DataChannel channel : outgoingChannels) { - if (channel.hasShuffleKeys()) { - Column[] shuffleKeys = channel.getShuffleKeys(); - Arrays.sort(shuffleKeys, columnComparator); - channel.setShuffleKeys(shuffleKeys); - } - } + outgoingChannels.stream().filter(channel -> channel.hasShuffleKeys()).forEach(channel -> { + Column[] shuffleKeys = channel.getShuffleKeys(); + Arrays.sort(shuffleKeys, columnComparator); + channel.setShuffleKeys(shuffleKeys); + }); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index ac917ef006..e4acab7d66 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -70,6 +70,7 @@ import java.util.List; import java.util.Map; import java.util.Stack; +import java.util.stream.Collectors; public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner { @@ -517,24 +518,19 @@ private Tuple getWorkerTuple(Schema outSchema, NodeStatus aNodeStatus) { private List getClusterInfo(Schema outSchema) { Map workerMap = masterContext.getResourceManager().getNodes(); - List tuples; List queryMasterList = new ArrayList<>(); List nodeStatusList = new ArrayList<>(); - - for (NodeStatus aNodeStatus : workerMap.values()) { + + workerMap.values().forEach(aNodeStatus -> { queryMasterList.add(aNodeStatus); nodeStatusList.add(aNodeStatus); - } - - tuples = new ArrayList<>(queryMasterList.size() + nodeStatusList.size()); - for (NodeStatus queryMaster: queryMasterList) { - tuples.add(getQueryMasterTuple(outSchema, queryMaster)); - } - - for (NodeStatus nodeStatus : nodeStatusList) { - tuples.add(getWorkerTuple(outSchema, nodeStatus)); - } - + }); + + List tuples = queryMasterList.stream().map(queryMaster -> getQueryMasterTuple(outSchema, queryMaster)) + .collect(Collectors.toList()); + + nodeStatusList.stream().map(nodeStatus -> getWorkerTuple(outSchema, nodeStatus)).forEach(tuples::add); + return tuples; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 58f4ec013f..d7a08aec27 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -402,9 +402,7 @@ public static void startScriptExecutors(QueryContext queryContext, EvalContext e } public static void stopScriptExecutors(EvalContext evalContext) { - for (TajoScriptEngine executor : evalContext.getAllScriptEngines()) { - executor.shutdown(); - } + evalContext.getAllScriptEngines().forEach(TajoScriptEngine::shutdown); } /** diff --git a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java index 6ffe65b059..6a9fdcc958 100644 --- a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java @@ -1351,11 +1351,9 @@ public Expr visitCreate_table_statement(Create_table_statementContext ctx) { @Override public Expr visitTruncate_table_statement(@NotNull Truncate_table_statementContext ctx) { List tableNameContexts = ctx.table_name(); - List tableNames = new ArrayList<>(); - - for (Table_nameContext eachTableNameContext: tableNameContexts) { - tableNames.add(buildIdentifierChain(eachTableNameContext.identifier())); - } + List tableNames = tableNameContexts.stream() + .map(eachTableNameContext -> buildIdentifierChain(eachTableNameContext.identifier())) + .collect(Collectors.toList()); return new TruncateTable(tableNames); } @@ -1403,11 +1401,8 @@ public PartitionMethodDescExpr parseTablePartitioningClause(Table_partitioning_c visitNumeric_value_expression(hashPartitions.hash_partitions_by_quantity().quantity)); } else { // ( PARTITION part_name , ...) - List specifiers = Lists.newArrayList(); - for (Individual_hash_partitionContext partition : - hashPartitions.individual_hash_partitions().individual_hash_partition()) { - specifiers.add(new CreateTable.PartitionSpecifier(partition.partition_name().getText())); - } + List specifiers = hashPartitions.individual_hash_partitions().individual_hash_partition() + .stream().map(partition -> new PartitionSpecifier(partition.partition_name().getText())).collect(Collectors.toList()); return new HashPartition(buildColumnReferenceList(hashPartitions.column_reference_list()), specifiers); } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 401ba0a3a6..f700c9cee6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -63,6 +63,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.tajo.ResourceProtos.*; @@ -285,15 +286,10 @@ public void handle(TaskSchedulerEvent event) { } private Set getWorkerIds(Collection hosts){ - Set workerIds = Sets.newHashSet(); - if(hosts.isEmpty()) return workerIds; + if(hosts.isEmpty()) return new HashSet<>(); - for (WorkerConnectionInfo worker : stage.getContext().getWorkerMap().values()) { - if(hosts.contains(worker.getHost())){ - workerIds.add(worker.getId()); - } - } - return workerIds; + return stage.getContext().getWorkerMap().values().stream().filter(worker -> hosts.contains(worker.getHost())) + .map(WorkerConnectionInfo::getId).collect(Collectors.toSet()); } @@ -328,9 +324,9 @@ protected LinkedList createTaskRequest(final int incompleteTas masterClientService.reserveNodeResources(callBack.getController(), request.build(), callBack); NodeResourceResponse response = callBack.get(); - for (AllocationResourceProto resource : response.getResourceList()) { - taskRequestEvents.add(new TaskRequestEvent(resource.getWorkerId(), resource, context.getBlockId())); - } + response.getResourceList().stream() + .map(resource -> new TaskRequestEvent(resource.getWorkerId(), resource, context.getBlockId())) + .forEach(taskRequestEvents::add); return taskRequestEvents; } @@ -1012,9 +1008,7 @@ public void assignToNonLeafTasks(LinkedList taskRequests) thro for(Map.Entry> entry: task.getFetchMap().entrySet()) { Collection fetches = entry.getValue(); if (fetches != null) { - for (FetchProto fetch : fetches) { - taskAssign.addFetch(fetch); - } + fetches.forEach(taskAssign::addFetch); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 289d933bd1..9b1227ef2d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -62,6 +62,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; public class Query implements EventHandler { private static final Log LOG = LogFactory.getLog(Query.class); @@ -299,19 +300,10 @@ public void setFinishTime() { public QueryHistory getQueryHistory() { QueryHistory queryHistory = makeQueryHistory(); - queryHistory.setStageHistories(makeStageHistories()); + queryHistory.setStageHistories(getStages().stream().map(Stage::getStageHistory).collect(Collectors.toList())); return queryHistory; } - private List makeStageHistories() { - List stageHistories = new ArrayList<>(); - for(Stage eachStage : getStages()) { - stageHistories.add(eachStage.getStageHistory()); - } - - return stageHistories; - } - private QueryHistory makeQueryHistory() { QueryHistory queryHistory = new QueryHistory(); @@ -322,11 +314,11 @@ private QueryHistory makeQueryHistory() { queryHistory.setDistributedPlan(plan.toString()); List sessionVariables = new ArrayList<>(); - for(Map.Entry entry: plan.getContext().getAllKeyValus().entrySet()) { - if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) { - sessionVariables.add(new String[]{entry.getKey(), entry.getValue()}); - } - } + plan.getContext().getAllKeyValus().entrySet().stream() + .filter(entry -> SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) + .forEach(entry -> { + sessionVariables.add(new String[]{entry.getKey(), entry.getValue()}); + }); queryHistory.setSessionVariables(sessionVariables); return queryHistory; @@ -341,9 +333,7 @@ public List getPartitions() { } public void clearPartitions() { - for(Stage eachStage : getStages()) { - eachStage.clearPartitions(); - } + getStages().forEach(Stage::clearPartitions); } public SerializedException getFailureReason() { diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index adc7b089e8..ddefc19ab9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -447,20 +447,18 @@ public void run() { List tempTasks = new ArrayList<>(); tempTasks.addAll(queryMasterTasks.values()); - for(QueryMasterTask eachTask: tempTasks) { - if(!eachTask.isStopped()) { - try { - long lastHeartbeat = eachTask.getLastClientHeartbeat(); - long time = System.currentTimeMillis() - lastHeartbeat; - if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) { - LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query session timeout: " + time + " ms"); - eachTask.expireQuerySession(); - } - } catch (Exception e) { - LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e); + tempTasks.stream().filter(eachTask -> !eachTask.isStopped()).forEach(eachTask -> { + try { + long lastHeartbeat = eachTask.getLastClientHeartbeat(); + long time = System.currentTimeMillis() - lastHeartbeat; + if (lastHeartbeat > 0 && time > querySessionTimeout * 1000) { + LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query session timeout: " + time + " ms"); + eachTask.expireQuerySession(); } + } catch (Exception e) { + LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e); } - } + }); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index c264e3e75f..c75d2d1eff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -514,14 +514,13 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch private static void addJoinShuffle(Stage stage, int partitionId, Map> grouppedPartitions) { Map> fetches = new HashMap<>(); - for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) { - if (grouppedPartitions.containsKey(execBlock.getId())) { - String name = execBlock.getId().toString(); - List requests = mergeShuffleRequest(name, partitionId, HASH_SHUFFLE, - grouppedPartitions.get(execBlock.getId())); - fetches.put(name, requests); - } - } + stage.getMasterPlan().getChilds(stage.getId()).stream() + .filter(execBlock -> grouppedPartitions.containsKey(execBlock.getId())).forEach(execBlock -> { + String name = execBlock.getId().toString(); + List requests = mergeShuffleRequest(name, partitionId, HASH_SHUFFLE, + grouppedPartitions.get(execBlock.getId())); + fetches.put(name, requests); + }); if (fetches.isEmpty()) { LOG.info(stage.getId() + "'s " + partitionId + " partition has empty result."); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index c055d119e4..738f132370 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -78,6 +78,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; import static org.apache.tajo.ResourceProtos.*; import static org.apache.tajo.conf.TajoConf.ConfVars; @@ -493,9 +494,7 @@ private StageHistory makeStageHistory() { } Set partitions = Sets.newHashSet(); - for (IntermediateEntry entry : getHashShuffleIntermediateEntries()) { - partitions.add(entry.getPartId()); - } + getHashShuffleIntermediateEntries().forEach(entry -> partitions.add(entry.getPartId())); stageHistory.setTotalInputBytes(totalInputBytes); stageHistory.setTotalReadBytes(totalReadBytes); @@ -761,9 +760,7 @@ protected void stopExecutionBlock() { if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) { List childs = getMasterPlan().getChilds(getId()); - for (ExecutionBlock executionBlock : childs) { - ebIds.add(executionBlock.getId().getProto()); - } + childs.stream().map(executionBlock -> executionBlock.getId().getProto()).forEach(ebIds::add); } StopExecutionBlockRequest.Builder stopRequest = StopExecutionBlockRequest.newBuilder(); @@ -1326,9 +1323,7 @@ private void finalizeShuffleReport(StageShuffleReportEvent event, ShuffleType ty completedShuffleTasks.addAndGet(report.getSucceededTasks()); if (report.getIntermediateEntriesCount() > 0) { - for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) { - hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm)); - } + report.getIntermediateEntriesList().stream().map(IntermediateEntry::new).forEach(hashShuffleIntermediateEntries::add); } if (completedShuffleTasks.get() >= succeededObjectCount) { diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index f1ad931e47..6e499ab775 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -56,6 +56,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import static org.apache.tajo.ResourceProtos.*; import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; @@ -298,10 +299,7 @@ private TaskHistory makeTaskHistory() { taskHistory.setFetchs(fetchList.toArray(new String[][]{})); - List dataLocationList = new ArrayList<>(); - for(DataLocation eachLocation: getDataLocations()) { - dataLocationList.add(eachLocation.toString()); - } + List dataLocationList = getDataLocations().stream().map(DataLocation::toString).collect(Collectors.toList()); taskHistory.setDataLocations(dataLocationList.toArray(new String[dataLocationList.size()])); return taskHistory; @@ -388,9 +386,7 @@ public void setFetches(Map> fetches) { public Collection getAllFragments() { Set fragmentProtos = new HashSet<>(); - for (Set eachFragmentSet : fragMap.values()) { - fragmentProtos.addAll(eachFragmentSet); - } + fragMap.values().forEach(fragmentProtos::addAll); return fragmentProtos; } @@ -786,16 +782,13 @@ public IntermediateEntry(IntermediateEntryProto proto) { this.volume = proto.getVolume(); failureRowNums = new ArrayList<>(); - for (FailureIntermediateProto eachFailure: proto.getFailuresList()) { - - failureRowNums.add(new Pair(eachFailure.getPagePos(), - new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum()))); - } + proto.getFailuresList().stream().map(eachFailure -> + new Pair(eachFailure.getPagePos(), new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum()))) + .forEach(eachFailure -> failureRowNums.add(eachFailure)); pages = new ArrayList<>(); - for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) { - pages.add(new Pair(eachPage.getPos(), eachPage.getLength())); - } + proto.getPagesList().stream().map(eachPage -> new Pair(eachPage.getPos(), eachPage.getLength())) + .forEach(eachPage -> pages.add(eachPage)); } public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) { diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index 668940c4c0..bfb893b350 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -39,6 +39,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; /** * History directory structure @@ -268,13 +269,8 @@ public void run() { Calendar cal = Calendar.getInstance(); cal.add(Calendar.HOUR_OF_DAY, -2); String closeTargetTime = df.format(cal.getTime()); - List closingTargets = new ArrayList<>(); - - for (String eachWriterTime : taskWriters.keySet()) { - if (eachWriterTime.compareTo(closeTargetTime) <= 0) { - closingTargets.add(eachWriterTime); - } - } + List closingTargets = taskWriters.keySet().stream() + .filter(eachWriterTime -> eachWriterTime.compareTo(closeTargetTime) <= 0).collect(Collectors.toList()); for (String eachWriterTime : closingTargets) { WriterHolder writerHolder; diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java index bc65cae53e..f77cefccc9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java @@ -140,9 +140,7 @@ public QueryHistoryProto getProto() { List stageHistoryProtos = new ArrayList<>(); if (stageHistories != null) { - for (StageHistory eachStage: stageHistories) { - stageHistoryProtos.add((eachStage.getProto())); - } + stageHistories.stream().map(eachStage -> (eachStage.getProto())).forEach(stageHistoryProtos::add); } builder.addAllStageHistories(stageHistoryProtos); diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java index ef74a4e83e..7a6aa54f0a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java @@ -30,9 +30,7 @@ public class RegexpMetricsFilter implements MetricFilter { List filterPatterns = new ArrayList<>(); public RegexpMetricsFilter(Collection filterExpressions) { - for(String eachExpression: filterExpressions) { - filterPatterns.add(Pattern.compile(eachExpression)); - } + filterExpressions.stream().map(Pattern::compile).forEach(filterPatterns::add); } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java index 260d4147f7..c255f9a176 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java @@ -104,9 +104,7 @@ public void stop() { protected void stopAndClearReporter() { synchronized(metricsReporters) { - for(TajoMetricsScheduledReporter eachReporter: metricsReporters) { - eachReporter.close(); - } + metricsReporters.forEach(TajoMetricsScheduledReporter::close); metricsReporters.clear(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index 708c552207..2863a1d2e3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -210,9 +210,7 @@ public void service(HttpServletRequest request, } } else if("clearAllQueryRunner".equals(action)) { synchronized(queryRunners) { - for(QueryRunner eachQueryRunner: queryRunners.values()) { - eachQueryRunner.setStop(); - } + queryRunners.values().forEach(QueryRunner::setStop); queryRunners.clear(); } } else if("killQuery".equals(action)) { @@ -267,12 +265,10 @@ public void run() { List queryRunnerList; synchronized(queryRunners) { queryRunnerList = new ArrayList<>(queryRunners.values()); - for(QueryRunner eachQueryRunner: queryRunnerList) { - if(!eachQueryRunner.running.get() && - (System.currentTimeMillis() - eachQueryRunner.finishTime > 180 * 1000)) { - queryRunners.remove(eachQueryRunner.queryRunnerId); - } - } + queryRunnerList.stream().filter(eachQueryRunner -> !eachQueryRunner.running.get() && + (System.currentTimeMillis() - eachQueryRunner.finishTime > 180 * 1000)).forEach(eachQueryRunner -> { + queryRunners.remove(eachQueryRunner.queryRunnerId); + }); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 4ab6627e8c..6c001b4abf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -187,17 +187,14 @@ public void stop(){ } // If ExecutionBlock is stopped, all running or pending tasks will be marked as failed. - for (Task task : tasks.values()) { - if (task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_PENDING || - task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_RUNNING) { - - try{ - task.abort(); - } catch (Throwable e){ - LOG.error(e, e); - } + tasks.values().stream().filter(task -> task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_PENDING || + task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_RUNNING).forEach(task -> { + try { + task.abort(); + } catch (Throwable e) { + LOG.error(e, e); } - } + }); tasks.clear(); taskHistories.clear(); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index b49d449fb6..5f190f3605 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -28,14 +28,11 @@ import org.apache.tajo.querymaster.Task; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.TupleRange; -import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java index d1271f88fc..ff748f5542 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java @@ -102,9 +102,7 @@ protected void serviceStop() throws Exception { isStopped = true; threadPool.shutdown(); - for (ExecutorService fetcherThreadPool : fetcherThreadPoolList) { - fetcherThreadPool.shutdown(); - } + fetcherThreadPoolList.forEach(ExecutorService::shutdown); super.serviceStop(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 920dfe5205..66b8536b0e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -190,9 +190,7 @@ private void startScriptExecutors() throws IOException { } private void stopScriptExecutors() { - for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { - executor.shutdown(); - } + context.getEvalContext().getAllScriptEngines().forEach(TajoScriptEngine::shutdown); } @Override @@ -384,9 +382,7 @@ private void waitForFetch() throws InterruptedException, IOException { Set broadcastTableNames = new HashSet<>(); List broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); if (broadcasts != null) { - for (EnforceProperty eachBroadcast : broadcasts) { - broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); - } + broadcasts.stream().map(eachBroadcast -> eachBroadcast.getBroadcast().getTableName()).forEach(broadcastTableNames::add); } // localize the fetched data and skip the broadcast table @@ -613,15 +609,13 @@ public void run() { try { List fetched = fetcher.get(); if (fetcher.getState() == FetcherState.FETCH_DATA_FINISHED) { - for (FileChunk eachFetch : fetched) { - if (eachFetch.getFile() != null) { - if (!eachFetch.fromRemote()) { - localChunks.add(eachFetch); - } else { - remoteChunks.add(eachFetch); - } + fetched.stream().filter(eachFetch -> eachFetch.getFile() != null).forEach(eachFetch -> { + if (!eachFetch.fromRemote()) { + localChunks.add(eachFetch); + } else { + remoteChunks.add(eachFetch); } - } + }); break; } } catch (Throwable e) { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java index efdda9abf6..237ea71097 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java @@ -91,9 +91,7 @@ protected void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStop() throws Exception { - for(ExecutionBlockContext context: executionBlockContextMap.values()) { - context.stop(); - } + executionBlockContextMap.values().forEach(ExecutionBlockContext::stop); super.serviceStop(); } @@ -216,15 +214,14 @@ public void handle(TaskManagerEvent event) { QueryStopEvent queryStopEvent = TUtil.checkTypeAndGet(event, QueryStopEvent.class); //cleanup failure ExecutionBlock - for (ExecutionBlockId ebId : executionBlockContextMap.keySet()) { - if (ebId.getQueryId().equals(queryStopEvent.getQueryId())) { - try { - executionBlockContextMap.remove(ebId).stop(); - } catch (Exception e) { - LOG.fatal(e.getMessage(), e); - } + executionBlockContextMap.keySet().stream() + .filter(ebId -> ebId.getQueryId().equals(queryStopEvent.getQueryId())).forEach(ebId -> { + try { + executionBlockContextMap.remove(ebId).stop(); + } catch (Exception e) { + LOG.fatal(e.getMessage(), e); } - } + }); workerContext.cleanup(queryStopEvent.getQueryId().toString()); break; } diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java index 9311139c41..730cc55a9f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -100,12 +101,8 @@ public Response run(JerseyResourceDelegateContext context) { MasterContext masterContext = context.get(masterContextKey); Map workerMap = masterContext.getResourceManager().getNodes(); - List workerList = new ArrayList<>(); - - for (NodeStatus nodeStatus : workerMap.values()) { - workerList.add(new WorkerResponse(nodeStatus)); - } - + List workerList = workerMap.values().stream().map(WorkerResponse::new).collect(Collectors.toList()); + Map> workerResponseMap = new HashMap<>(); workerResponseMap.put(workersName, workerList); diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java index 871328d66d..a01c985082 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; @Path("/functions") public class FunctionsResource { @@ -89,11 +90,8 @@ public Response run(JerseyResourceDelegateContext context) { Collection functionDescriptors = masterContext.getCatalog().getFunctions(); if (functionDescriptors.size() > 0) { - List functionSignature = - new ArrayList<>(functionDescriptors.size()); - for (FunctionDesc functionDesc : functionDescriptors) { - functionSignature.add(functionDesc.getSignature()); - } + List functionSignature = functionDescriptors.stream().map(FunctionDesc::getSignature) + .collect(Collectors.toList()); return Response.ok(functionSignature).build(); } else { diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java index defb342ade..7aff6f744b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; @Path("/queries") public class QueryResource { @@ -172,15 +173,7 @@ public Response run(JerseyResourceDelegateContext context) { } private List selectQueriesInfoByState(List queriesInfo, TajoProtos.QueryState state) { - List resultQueriesInfo = new ArrayList<>(queriesInfo.size() / 2); - - for (QueryInfo queryInfo: queriesInfo) { - if (state.equals(queryInfo.getQueryState())) { - resultQueriesInfo.add(queryInfo); - } - } - - return resultQueriesInfo; + return queriesInfo.stream().filter(queryInfo -> state.equals(queryInfo.getQueryState())).collect(Collectors.toList()); } private List selectQueriesInfoByTime(List queriesInfo, long startTime, long endTime) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java index 65d175915e..f453eb9a51 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java @@ -160,13 +160,8 @@ private static LogicalNode handleRemainingFiltersIfNecessary(JoinGraphContext jo Collection markAsEvaluated = new HashSet<>(joinGraphContext.getEvaluatedJoinConditions()); markAsEvaluated.addAll(joinGraphContext.getEvaluatedJoinFilters()); Set remainingQuals = new HashSet<>(joinGraphContext.getCandidateJoinFilters()); - for (JoinEdge eachEdge : joinEdges) { - for (EvalNode eachQual : eachEdge.getJoinQual()) { - if (!markAsEvaluated.contains(eachQual)) { - remainingQuals.add(eachQual); - } - } - } + joinEdges.forEach(eachEdge -> + eachEdge.getJoinQual().stream().filter(eachQual -> !markAsEvaluated.contains(eachQual)).forEach(remainingQuals::add)); if (!remainingQuals.isEmpty()) { LogicalNode topParent = PlannerUtil.findTopParentNode(block.getRoot(), NodeType.JOIN); @@ -199,9 +194,7 @@ public LogicalNode visitJoin(Set ctx, LogicalPlan plan, LogicalPlan.Quer super.visitJoin(ctx, plan, block, node, stack); if (node.hasTargets()) { - for (Target target : node.getTargets()) { - ctx.add(target); - } + node.getTargets().forEach(ctx::add); } return node; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java index f321c76c83..7c8964beac 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java @@ -44,6 +44,7 @@ import java.lang.reflect.Constructor; import java.util.*; +import java.util.stream.Collectors; /** * This represents and keeps every information about a query plan for a query. @@ -266,13 +267,8 @@ public QueryBlock getBlock(LogicalNode node) { public void removeBlock(QueryBlock block) { queryBlocks.remove(block.getName()); - List tobeRemoved = new ArrayList<>(); - for (Map.Entry entry : queryBlockByPID.entrySet()) { - tobeRemoved.add(entry.getKey()); - } - for (Integer rn : tobeRemoved) { - queryBlockByPID.remove(rn); - } + List tobeRemoved = queryBlockByPID.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList()); + tobeRemoved.forEach(rn -> queryBlockByPID.remove(rn)); } public void disconnectBlocks(QueryBlock srcBlock, QueryBlock targetBlock) { @@ -288,11 +284,8 @@ public QueryBlock getParentBlock(QueryBlock block) { } public List getChildBlocks(QueryBlock block) { - List childBlocks = new ArrayList<>(); - for (String blockName : queryBlockGraph.getChilds(block.getName())) { - childBlocks.add(queryBlocks.get(blockName)); - } - return childBlocks; + return queryBlockGraph.getChilds(block.getName()).stream() + .map(blockName -> queryBlocks.get(blockName)).collect(Collectors.toList()); } public void mapExprToBlock(Expr expr, String blockName) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index aaf64cd4a7..0decd8cc26 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -735,9 +735,7 @@ private LogicalNode insertWindowAggNode(PlanContext context, LogicalNode child, } } } - for (String winFuncRef : winFuncRefs) { - targets.add(block.namedExprsMgr.getTarget(winFuncRef)); - } + winFuncRefs.stream().map(block.namedExprsMgr::getTarget).forEach(targets::add); windowAggNode.setTargets(targets); verifyProjectedFields(block, windowAggNode); @@ -1189,9 +1187,7 @@ public LogicalNode visitJoin(PlanContext context, Stack stack, Join join) List newlyEvaluatedExprs = getNewlyEvaluatedExprsForJoin(context, joinNode, isTopMostJoin); List targets = new ArrayList<>(PlannerUtil.schemaToTargets(merged)); - for (String newAddedExpr : newlyEvaluatedExprs) { - targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true)); - } + newlyEvaluatedExprs.stream().map(newAddedExpr -> block.namedExprsMgr.getTarget(newAddedExpr, true)).forEach(targets::add); joinNode.setTargets(targets); // Determine join conditions @@ -1285,9 +1281,7 @@ private LogicalNode createCartesianProduct(PlanContext context, LogicalNode left } List targets = new ArrayList<>(PlannerUtil.schemaToTargets(merged)); - for (String newAddedExpr : newlyEvaluatedExprs) { - targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true)); - } + newlyEvaluatedExprs.stream().map(newAddedExpr -> block.namedExprsMgr.getTarget(newAddedExpr, true)).forEach(targets::add); join.setTargets(targets); return join; } @@ -1425,9 +1419,7 @@ private void setTargetOfTableSubQuery (PlanContext context, QueryBlock block, Ta // Assume that each unique expr is evaluated once. LinkedHashSet targets = createFieldTargetsFromRelation(block, subQueryNode, newlyEvaluatedExprs); - for (String newAddedExpr : newlyEvaluatedExprs) { - targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true)); - } + newlyEvaluatedExprs.stream().map(newAddedExpr -> block.namedExprsMgr.getTarget(newAddedExpr, true)).forEach(targets::add); subQueryNode.setTargets(new ArrayList<>(targets)); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/NamedExprsManager.java b/tajo-plan/src/main/java/org/apache/tajo/plan/NamedExprsManager.java index ee47700d1b..b2dda5b96e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/NamedExprsManager.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/NamedExprsManager.java @@ -223,9 +223,9 @@ public String addNamedExpr(NamedExpr namedExpr) { public Collection getAllNamedExprs() { List namedExprList = Lists.newArrayList(); - for (Map.Entry entry: idToExprBiMap.entrySet()) { - namedExprList.add(new NamedExpr(entry.getValue(), idToNamesMap.get(entry.getKey()).get(0))); - } + idToExprBiMap.entrySet().stream() + .map(entry -> new NamedExpr(entry.getValue(), idToNamesMap.get(entry.getKey()).get(0))) + .forEach(namedExprList::add); return namedExprList; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java index c158f5744b..10a1adde67 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java @@ -32,6 +32,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * This is a greedy heuristic algorithm to find a bushy join tree. This algorithm finds @@ -46,10 +47,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm { public FoundJoinOrder findBestOrder(LogicalPlan plan, LogicalPlan.QueryBlock block, JoinGraphContext graphContext) throws TajoException { - Set vertexes = new HashSet<>(); - for (RelationNode relationNode : block.getRelations()) { - vertexes.add(new RelationVertex(relationNode)); - } + Set vertexes = block.getRelations().stream().map(RelationVertex::new).collect(Collectors.toSet()); // As illustrated at LogicalOptimizer.JoinGraphBuilder, the join graph initially forms a kind of tree. // This join graph can be updated by adding new join edges or removing existing join edges diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinGraphContext.java b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinGraphContext.java index b0a22dba93..26ed7c3392 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinGraphContext.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinGraphContext.java @@ -47,19 +47,13 @@ public JoinGraph getJoinGraph() { } public void addCandidateJoinConditions(Collection candidates) { - for (EvalNode eachCandidate : candidates) { - if (!evaluatedJoinConditions.contains(eachCandidate)) { - candidateJoinConditions.add(eachCandidate); - } - } + candidates.stream().filter(eachCandidate -> !evaluatedJoinConditions.contains(eachCandidate)) + .forEach(candidateJoinConditions::add); } public void addCandidateJoinFilters(Collection candidates) { - for (EvalNode eachCandidate : candidates) { - if (!evaluatedJoinFilters.contains(eachCandidate)) { - candidateJoinFilters.add(eachCandidate); - } - } + candidates.stream().filter(eachCandidate -> !evaluatedJoinFilters.contains(eachCandidate)) + .forEach(candidateJoinFilters::add); } public void removeCandidateJoinConditions(Collection willBeRemoved) { @@ -71,21 +65,17 @@ public void removeCandidateJoinFilters(Collection willBeRemoved) { } public void markAsEvaluatedJoinConditions(Collection willBeMarked) { - for (EvalNode eachEval : willBeMarked) { - if (candidateJoinConditions.contains(eachEval)) { - candidateJoinConditions.remove(eachEval); - evaluatedJoinConditions.add(eachEval); - } - } + willBeMarked.stream().filter(eachEval -> candidateJoinConditions.contains(eachEval)).forEach(eachEval -> { + candidateJoinConditions.remove(eachEval); + evaluatedJoinConditions.add(eachEval); + }); } public void markAsEvaluatedJoinFilters(Collection willBeMarked) { - for (EvalNode eachEval : willBeMarked) { - if (candidateJoinFilters.contains(eachEval)) { - candidateJoinFilters.remove(eachEval); - evaluatedJoinFilters.add(eachEval); - } - } + willBeMarked.stream().filter(eachEval -> candidateJoinFilters.contains(eachEval)).forEach(eachEval -> { + candidateJoinFilters.remove(eachEval); + evaluatedJoinFilters.add(eachEval); + }); } public Set getCandidateJoinConditions() { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinOrderingUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinOrderingUtil.java index b4191391cb..77cfe8c41c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinOrderingUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinOrderingUtil.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; public class JoinOrderingUtil { @@ -42,14 +43,10 @@ public class JoinOrderingUtil { */ public static Set findJoinConditionForJoinVertex(Set candidates, JoinEdge edge, boolean isOnPredicates) { - Set conditionsForThisJoin = new HashSet<>(); - for (EvalNode predicate : candidates) { - if (EvalTreeUtil.isJoinQual(predicate, false) - && checkIfEvaluatedAtEdge(predicate, edge, isOnPredicates)) { - conditionsForThisJoin.add(predicate); - } - } - return conditionsForThisJoin; + return candidates.stream() + .filter(predicate -> + EvalTreeUtil.isJoinQual(predicate, false) && checkIfEvaluatedAtEdge(predicate, edge, isOnPredicates)) + .collect(Collectors.toSet()); } /** diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java index c9d5373441..c5c44b6701 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java @@ -251,15 +251,14 @@ public Column[] getFirstStageShuffleKeyColumns() { } } } - for (GroupbyNode eachGroupbyNode: subGroupbyPlan) { - if (eachGroupbyNode.getGroupingColumns() != null && eachGroupbyNode.getGroupingColumns().length > 0) { - for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) { - if (!shuffleKeyColumns.contains(eachColumn)) { - shuffleKeyColumns.add(eachColumn); - } + subGroupbyPlan.stream().filter(eachGroupbyNode -> eachGroupbyNode.getGroupingColumns() != null + && eachGroupbyNode.getGroupingColumns().length > 0).forEach(eachGroupbyNode -> { + for (Column eachColumn : eachGroupbyNode.getGroupingColumns()) { + if (!shuffleKeyColumns.contains(eachColumn)) { + shuffleKeyColumns.add(eachColumn); } } - } + }); return shuffleKeyColumns.toArray(new Column[shuffleKeyColumns.size()]); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java index 95f8f98090..97937d1342 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java @@ -35,6 +35,7 @@ import org.apache.tajo.util.StringUtils; import java.util.*; +import java.util.stream.Collectors; /** * Column name resolution utility. A SQL statement can include many kinds of column names, @@ -125,15 +126,8 @@ public static RelationNode lookupTable(LogicalPlan.QueryBlock block, String tabl */ public static Collection lookupTableByColumns(LogicalPlan.QueryBlock block, String columnName) { - Set found = new HashSet<>(); - - for (RelationNode rel : block.getRelations()) { - if (rel.getLogicalSchema().contains(columnName)) { - found.add(rel); - } - } - - return found; + return block.getRelations().stream().filter(rel -> rel.getLogicalSchema().contains(columnName)) + .collect(Collectors.toSet()); } /** @@ -246,25 +240,19 @@ static Column lookupColumnFromAllRelsInBlock(LogicalPlan.QueryBlock block, List candidates = new ArrayList<>(); - for (RelationNode rel : block.getRelations()) { - if (rel.isNameResolveBase()) { - Column found = rel.getLogicalSchema().getColumn(columnName); - if (found != null) { - candidates.add(found); - } + block.getRelations().stream().filter(rel -> rel.isNameResolveBase()).forEach(rel -> { + Column found = rel.getLogicalSchema().getColumn(columnName); + if (found != null) { + candidates.add(found); } - } + }); if (!candidates.isEmpty()) { return ensureUniqueColumn(candidates); } else { if (includeSelfDescTable) { - List candidateRels = new ArrayList<>(); - for (RelationNode rel : block.getRelations()) { - if (describeSchemaByItself(rel)) { - candidateRels.add(rel); - } - } + List candidateRels = block.getRelations().stream(). + filter(rel -> describeSchemaByItself(rel)).collect(Collectors.toList()); if (candidateRels.size() == 1) { return guessColumn(IdentifierUtil.buildFQName(candidateRels.get(0).getCanonicalName(), columnName)); } else if (candidateRels.size() > 1) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java index a49c42389f..c46779ec00 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java @@ -457,9 +457,7 @@ public int compare(Column c1, Column c2) { } // Add simple columns - for (Column eachColumn : simpleColumns) { - schema.add(eachColumn); - } + simpleColumns.forEach(schema::add); return schema.build(); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java index 114704414c..4955799b71 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java @@ -181,11 +181,8 @@ private EvalNode rewrite(BinaryEval evalNode) { new HashSet<>(Arrays.asList(AlgebraicUtil.toConjunctiveNormalFormArray(rightChild))) : new HashSet<>(Arrays.asList(AlgebraicUtil.toDisjunctiveNormalFormArray(rightChild))); - for (EvalNode eachLeftChildSplit : leftChildSplits) { - if (rightChildSplits.contains(eachLeftChildSplit)) { - commonQuals.add(eachLeftChildSplit); - } - } + leftChildSplits.stream().filter(eachLeftChildSplit -> rightChildSplits.contains(eachLeftChildSplit)) + .forEach(commonQuals::add); if (leftChildSplits.size() == rightChildSplits.size() && commonQuals.size() == leftChildSplits.size()) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java index eb45af1eee..e23d5d725a 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java @@ -45,6 +45,7 @@ import org.apache.tajo.schema.IdentifierUtil; import java.util.*; +import java.util.stream.Collectors; /** * This rule tries to push down all filter conditions into logical nodes as lower as possible. @@ -150,12 +151,8 @@ public LogicalNode visitFilter(FilterPushDownContext context, LogicalPlan plan, } else { // if there remain search conditions // check if it can be evaluated here - Set matched = new HashSet<>(); - for (EvalNode eachEval : context.pushingDownFilters) { - if (LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)) { - matched.add(eachEval); - } - } + Set matched = context.pushingDownFilters.stream() + .filter(eachEval -> LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)).collect(Collectors.toSet()); // if there are search conditions which can be evaluated here, // push down them and remove them from context.pushingDownFilters. @@ -216,13 +213,11 @@ public LogicalNode visitJoin(FilterPushDownContext context, LogicalPlan plan, Lo // In this case, this join is the top most one within a query block. boolean isTopMostJoin = stack.isEmpty() ? true : stack.peek().getType() != NodeType.JOIN; - for (EvalNode evalNode : context.pushingDownFilters) { - // TODO: currently, non-equi theta join is not supported yet. - if (LogicalPlanner.isEvaluatableJoinQual(block, evalNode, joinNode, onPredicates.contains(evalNode), - isTopMostJoin)) { - matched.add(evalNode); - } - } + // TODO: currently, non-equi theta join is not supported yet. + context.pushingDownFilters.stream() + .filter(evalNode -> + LogicalPlanner.isEvaluatableJoinQual(block, evalNode, joinNode, onPredicates.contains(evalNode), isTopMostJoin)) + .forEach(matched::add); EvalNode qual = null; if (matched.size() > 1) { @@ -363,21 +358,13 @@ private static Set extractNonPushableOuterJoinQuals(final LogicalPlan } Set nonPushableQuals = new HashSet<>(); - for (EvalNode eachQual : onPredicates) { - for (String relName : preservedTableNameSet) { - if (isEvalNeedRelation(eachQual, relName)) { - nonPushableQuals.add(eachQual); - } - } - } + onPredicates.forEach(eachQual -> + preservedTableNameSet.stream().filter(relName -> isEvalNeedRelation(eachQual, relName)) + .map(relName -> eachQual).forEach(nonPushableQuals::add)); - for (EvalNode eachQual : wherePredicates) { - for (String relName : nullSupplyingTableNameSet) { - if (isEvalNeedRelation(eachQual, relName)) { - nonPushableQuals.add(eachQual); - } - } - } + wherePredicates.forEach(eachQual -> + nullSupplyingTableNameSet.stream().filter(relName -> isEvalNeedRelation(eachQual, relName)) + .map(relName -> eachQual).forEach(nonPushableQuals::add)); return nonPushableQuals; } @@ -414,12 +401,8 @@ private static boolean isNonEquiThetaJoinQual(final LogicalPlan.QueryBlock block private static List extractNonEquiThetaJoinQuals(final Set predicates, final LogicalPlan.QueryBlock block, final JoinNode joinNode) { - List nonEquiThetaJoinQuals = new ArrayList<>(); - for (EvalNode eachEval: predicates) { - if (isNonEquiThetaJoinQual(block, joinNode, eachEval)) { - nonEquiThetaJoinQuals.add(eachEval); - } - } + List nonEquiThetaJoinQuals = predicates.stream() + .filter(eachEval -> isNonEquiThetaJoinQual(block, joinNode, eachEval)).collect(Collectors.toList()); return nonEquiThetaJoinQuals; } @@ -473,11 +456,9 @@ private Map transformEvalsWidthByPassNode( } Set columns = EvalTreeUtil.findUniqueColumns(copy); - for (Column c : columns) { - if (c.hasQualifier()) { - EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), c.getSimpleName()); - } - } + columns.stream().filter(c -> c.hasQualifier()).forEach(c -> { + EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), c.getSimpleName()); + }); transformedMap.put(copy, eval); } @@ -608,9 +589,7 @@ public LogicalNode visitProjection(FilterPushDownContext context, stack.pop(); // find not matched after visiting child - for (EvalNode eval: context.pushingDownFilters) { - notMatched.add(transformedMap.get(eval)); - } + context.pushingDownFilters.stream().map(transformedMap::get).forEach(notMatched::add); EvalNode qual = null; if (notMatched.size() > 1) { @@ -647,11 +626,7 @@ public LogicalNode visitProjection(FilterPushDownContext context, } private Collection reverseTransform(BiMap map, Set remainFilters) { - Set reversed = new HashSet<>(); - for (EvalNode evalNode : remainFilters) { - reversed.add(map.get(evalNode)); - } - return reversed; + return remainFilters.stream().map(map::get).collect(Collectors.toSet()); } private BiMap findCanPushdownAndTransform( @@ -785,12 +760,8 @@ private List addHavingNode(FilterPushDownContext context, LogicalPlan GroupbyNode groupByNode) throws TajoException { // find aggregation column Set groupingColumns = new HashSet<>(Arrays.asList(groupByNode.getGroupingColumns())); - Set aggrFunctionOutColumns = new HashSet<>(); - for (Column column : groupByNode.getOutSchema().getRootColumns()) { - if (!groupingColumns.contains(column)) { - aggrFunctionOutColumns.add(column.getQualifiedName()); - } - } + Set aggrFunctionOutColumns = groupByNode.getOutSchema().getRootColumns().stream() + .filter(column -> !groupingColumns.contains(column)).map(Column::getQualifiedName).collect(Collectors.toSet()); List aggrEvalOrigins = new ArrayList<>(); List aggrEvals = new ArrayList<>(); @@ -949,11 +920,8 @@ public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan, Map transformed = findCanPushdownAndTransform(context, block, scanNode, null, notMatched, partitionColumns, 0); - for (EvalNode eval : transformed.keySet()) { - if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)) { - matched.add(eval); - } - } + transformed.keySet().stream() + .filter(eval -> LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)).forEach(matched::add); EvalNode qual = null; if (matched.size() > 1) { @@ -1005,9 +973,7 @@ public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan, } } - for (EvalNode matchedEval: matched) { - transformed.remove(matchedEval); - } + matched.forEach(transformed::remove); context.setToOrigin(transformed); context.addFiltersTobePushed(notMatched); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java index fa0fb4ff14..4ea5a8a59d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java @@ -42,6 +42,7 @@ import org.apache.tajo.util.TUtil; import java.util.*; +import java.util.stream.Collectors; /** * ProjectionPushDownRule deploys expressions in a selection list to proper @@ -366,11 +367,8 @@ class FilteredTargetIterator implements Iterator { Iterator iterator; public FilteredTargetIterator(Set required) { - for (String name : nameToIdBiMap.keySet()) { - if (required.contains(name)) { - filtered.add(getTarget(name)); - } - } + nameToIdBiMap.keySet().stream() + .filter(name -> required.contains(name)).map(TargetListManager.this::getTarget).forEach(filtered::add); iterator = filtered.iterator(); } @@ -432,9 +430,7 @@ public String addExpr(EvalNode evalNode) throws DuplicateColumnException { } private void addNecessaryReferences(EvalNode evalNode) { - for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) { - requiredSet.add(column.getQualifiedName()); - } + EvalTreeUtil.findUniqueColumns(evalNode).stream().map(Column::getQualifiedName).forEach(requiredSet::add); } @Override @@ -968,12 +964,7 @@ static class FilteredStringsIterator implements Iterator { Iterator iterator; FilteredStringsIterator(Collection targetNames, Collection required) { - List filtered = new ArrayList<>(); - for (String name : targetNames) { - if (required.contains(name)) { - filtered.add(name); - } - } + List filtered = targetNames.stream().filter(name -> required.contains(name)).collect(Collectors.toList()); iterator = filtered.iterator(); } @@ -1016,11 +1007,7 @@ static class FilteredIterator implements Iterator { } } - for (String name : requiredReferences) { - if (targetSet.containsKey(name)) { - filtered.add(targetSet.get(name)); - } - } + requiredReferences.stream().filter(name -> targetSet.containsKey(name)).map(targetSet::get).forEach(filtered::add); iterator = filtered.iterator(); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java index 2b197ff0ec..39d5900397 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java @@ -86,16 +86,13 @@ public Expr visitProjection(Context context, Stack stack, Projection expr) Set names = new HashSet<>(); - for (NamedExpr namedExpr : expr.getNamedExprs()) { - - if (namedExpr.hasAlias()) { - if (names.contains(namedExpr.getAlias())) { - context.state.addVerification(SyntaxErrorUtil.makeDuplicateAlias(namedExpr.getAlias())); - } else { - names.add(namedExpr.getAlias()); - } + expr.getNamedExprs().stream().filter(namedExpr -> namedExpr.hasAlias()).forEach(namedExpr -> { + if (names.contains(namedExpr.getAlias())) { + context.state.addVerification(SyntaxErrorUtil.makeDuplicateAlias(namedExpr.getAlias())); + } else { + names.add(namedExpr.getAlias()); } - } + }); return expr; } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java index 14db7c0bc3..96b9c9d5ff 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java @@ -95,9 +95,7 @@ protected synchronized static void clearCache() { */ public static void shutdown() throws IOException { synchronized(storageManagers) { - for (Tablespace eachTablespace : storageManagers.values()) { - eachTablespace.close(); - } + storageManagers.values().forEach(Tablespace::close); } clearCache(); } diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index e3f7c25e4b..4ac47289e1 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -372,9 +372,7 @@ public static HTableDescriptor parseHTableDescriptor(TableMeta tableMeta, Schema Collection columnFamilies = columnMapping.getColumnFamilyNames(); //If 'columns' attribute is empty, Tajo table columns are mapped to all HBase table column. if (columnFamilies.isEmpty()) { - for (Column eachColumn: schema.getRootColumns()) { - columnFamilies.add(eachColumn.getSimpleName()); - } + schema.getRootColumns().stream().map(Column::getSimpleName).forEach(columnFamilies::add); } for (String eachColumnFamily: columnFamilies) { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index 79203a2dc4..575c1e53d0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -220,9 +220,7 @@ public MemoryRowBlock call() throws Exception { } public void shutdown() { - for (ExecutorService service : executors.values()) { - service.shutdownNow(); - } + executors.values().forEach(ExecutorService::shutdownNow); } public static class HashShuffleIntermediate { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java index ccd528b1f7..756ba6ec05 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java @@ -167,9 +167,7 @@ public List> getPages() { public List>> getMergedTupleIndexes() { List>> merged = new ArrayList<>(); - for (List>> eachFailureIndex: taskTupleIndexes.values()) { - merged.addAll(eachFailureIndex); - } + taskTupleIndexes.values().forEach(merged::addAll); return merged; } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java index 5dedc2eae8..d5e2aa4713 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java @@ -1989,9 +1989,7 @@ private int writeFooter(long bodyLength) throws IOException { // serialize the types writeTypes(builder, schema); // add the stripe information - for(OrcProto.StripeInformation stripe: stripes) { - builder.addStripes(stripe); - } + stripes.forEach(builder::addStripes); // add the column statistics writeFileStatistics(builder, treeWriter); // add all of the user metadata