diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStoreClientPool.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStoreClientPool.java index 8be3a73448..7180694270 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStoreClientPool.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStoreClientPool.java @@ -48,14 +48,11 @@ public class HiveCatalogStoreClient { private HiveCatalogStoreClient(HiveConf hiveConf) { try { - HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() { - @Override - public HiveMetaHook getHook(Table table) throws MetaException { - /* metadata hook implementation, or null if this - * storage handler does not need any metadata notifications - */ - return null; - } + HiveMetaHookLoader hookLoader = table -> { + /* metadata hook implementation, or null if this + * storage handler does not need any metadata notifications + */ + return null; }; this.hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, hookLoader, HiveMetaStoreClient.class.getName()); diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index 6583d4eb01..d413ddf5ca 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -299,15 +299,10 @@ public GetTablespaceListResponse getAllTablespaces(RpcController controller, Nul // retrieves tablespaces from linked meta data tableSpaces.addAll(Collections2.transform(linkedMetadataManager.getTablespaces(), - new Function, TablespaceProto>() { - @Override - public TablespaceProto apply(Pair input) { - return TablespaceProto.newBuilder() - .setSpaceName(input.getFirst()) - .setUri(input.getSecond().toString()) - .build(); - } - })); + input -> TablespaceProto.newBuilder() + .setSpaceName(input.getFirst()) + .setUri(input.getSecond().toString()) + .build())); return GetTablespaceListResponse.newBuilder() .setState(OK) diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LinkedMetadataManager.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LinkedMetadataManager.java index 1ffe550fab..759c001d54 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LinkedMetadataManager.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LinkedMetadataManager.java @@ -77,12 +77,7 @@ public Collection getTablespaceNames() { */ public Optional> getTablespace(final String spaceName) { Collection filtered = filter(providerMap.values(), - new Predicate() { - @Override - public boolean apply(@Nullable MetadataProvider input) { - return input.getTablespaceName().equals(spaceName); - } - }); + input -> input.getTablespaceName().equals(spaceName)); if (filtered.isEmpty()) { return Optional.empty(); diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java index ea15c079ea..1f0c6393d5 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java @@ -447,14 +447,8 @@ protected String[] listJarResources(URL dirURL, FilenameFilter filter) protected String[] listResources() throws IOException, URISyntaxException { String[] files = new String[0]; URL dirURL = ClassLoader.getSystemResource(schemaPath); - FilenameFilter fileFilter = new FilenameFilter() { - - @Override - public boolean accept(File dir, String name) { - return ((name.lastIndexOf('.') > -1) && - (".xml".equalsIgnoreCase(name.substring(name.lastIndexOf('.'))))); - } - }; + FilenameFilter fileFilter = (dir, name) -> ((name.lastIndexOf('.') > -1) && + (".xml".equalsIgnoreCase(name.substring(name.lastIndexOf('.'))))); if (dirURL == null) { throw new FileNotFoundException(schemaPath); diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestLinkedMetadataManager.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestLinkedMetadataManager.java index dece380ea8..53e0afa82f 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestLinkedMetadataManager.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestLinkedMetadataManager.java @@ -177,12 +177,7 @@ public void testGetTablespace() throws Exception { @Test public void testGetTablespaces() throws Exception { Collection names = Collections2.transform(catalog.getAllTablespaces(), - new Function() { - @Override - public String apply(@Nullable CatalogProtos.TablespaceProto input) { - return input.getSpaceName(); - } - }); + input -> input.getSpaceName()); assertEquals(Sets.newHashSet("space1", "space2", "default"), Sets.newHashSet(names)); } diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index e6e8410ccf..6bf6ee1cd1 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -440,15 +440,12 @@ private Collection getKeywords() { } private void addShutdownHook() { - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override - public void run() { - try { - history.flush(); - } catch (IOException e) { - } - client.close(); + ShutdownHookManager.get().addShutdownHook(() -> { + try { + history.flush(); + } catch (IOException e) { } + client.close(); }, SHUTDOWN_HOOK_PRIORITY); } diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 315f954e0a..77e6feb683 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -902,12 +902,7 @@ private Collection getPositiveQueryFiles() throws IOException { throw new IOException("Cannot find " + positiveQueryDir); } - return Collections2.transform(Lists.newArrayList(fs.listStatus(positiveQueryDir)), new Function(){ - @Override - public Path apply(@Nullable FileStatus fileStatus) { - return fileStatus.getPath(); - } - }); + return Collections2.transform(Lists.newArrayList(fs.listStatus(positiveQueryDir)), fileStatus -> fileStatus.getPath()); } private Collection getNegativeQueryFiles() throws IOException { @@ -918,12 +913,7 @@ private Collection getNegativeQueryFiles() throws IOException { throw new IOException("Cannot find " + positiveQueryDir); } - return Collections2.transform(Lists.newArrayList(fs.listStatus(positiveQueryDir)),new Function(){ - @Override - public Path apply(@Nullable FileStatus fileStatus) { - return fileStatus.getPath(); - } - }); + return Collections2.transform(Lists.newArrayList(fs.listStatus(positiveQueryDir)), fileStatus -> fileStatus.getPath()); } private Path getQueryFilePath(String fileName) throws IOException { diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java index 828f60c7b7..a39b990f62 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -770,12 +770,7 @@ public void testGetQueryInfoAndHistory() throws Exception { List taskHistories = new ArrayList<>(queryHistory.getStageHistoriesList()); - Collections.sort(taskHistories, new Comparator() { - @Override - public int compare(ClientProtos.StageHistoryProto o1, StageHistoryProto o2) { - return o1.getExecutionBlockId().compareTo(o2.getExecutionBlockId()); - } - }); + Collections.sort(taskHistories, (o1, o2) -> o1.getExecutionBlockId().compareTo(o2.getExecutionBlockId())); assertEquals(8, taskHistories.get(0).getTotalReadRows()); assertEquals(1, taskHistories.get(0).getTotalWriteRows()); assertEquals(1, taskHistories.get(1).getTotalReadRows()); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/client/v2/TestTajoClientV2.java b/tajo-core-tests/src/test/java/org/apache/tajo/client/v2/TestTajoClientV2.java index f6b4f88e02..1d3385cf8c 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/client/v2/TestTajoClientV2.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/client/v2/TestTajoClientV2.java @@ -157,30 +157,27 @@ public void testExecuteQueryAsyncWithListener() throws TajoException, IOExceptio final AtomicBoolean success = new AtomicBoolean(false); final List resultContainer = Lists.newArrayList(); - future.addListener(new FutureListener() { - @Override - public void processingCompleted(QueryFuture future) { - try { - ResultSet result = future.get(); - resultContainer.add(result); // for better error handling, it should be verified outside this future. - - assertTrue(future.isDone()); - assertEquals(QueryState.COMPLETED, future.state()); - assertTrue(future.isSuccessful()); - assertFalse(future.isFailed()); - assertFalse(future.isKilled()); - assertTrue(1.0f == future.progress()); - assertEquals("default", future.queue()); - - assertTrue(future.submitTime() > 0); - assertTrue(future.startTime() > 0); - assertTrue(future.finishTime() > 0); - - success.set(true); - - } catch (Throwable t) { - throw new RuntimeException(t); - } + future.addListener(queryFuture -> { + try { + ResultSet result = queryFuture.get(); + resultContainer.add(result); // for better error handling, it should be verified outside this future. + + assertTrue(queryFuture.isDone()); + assertEquals(QueryState.COMPLETED, queryFuture.state()); + assertTrue(queryFuture.isSuccessful()); + assertFalse(queryFuture.isFailed()); + assertFalse(queryFuture.isKilled()); + assertTrue(1.0f == queryFuture.progress()); + assertEquals("default", queryFuture.queue()); + + assertTrue(queryFuture.submitTime() > 0); + assertTrue(queryFuture.startTime() > 0); + assertTrue(queryFuture.finishTime() > 0); + + success.set(true); + + } catch (Throwable t) { + throw new RuntimeException(t); } }); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 142b2c3836..4089bc16a3 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -1355,12 +1355,7 @@ public void testSelectFromSelfDescTable() throws Exception { // projection column test List targets = projectionNode.getTargets(); - Collections.sort(targets, new Comparator() { - @Override - public int compare(Target o1, Target o2) { - return o1.getCanonicalName().compareTo(o2.getCanonicalName()); - } - }); + Collections.sort(targets, (o1, o2) -> o1.getCanonicalName().compareTo(o2.getCanonicalName())); assertEquals(3, targets.size()); assertEquals("default.self_desc_table1.dept", targets.get(0).getCanonicalName()); assertEquals("default.self_desc_table1.id", targets.get(1).getCanonicalName()); @@ -1370,12 +1365,7 @@ public int compare(Target o1, Target o2) { assertEquals(NodeType.SCAN, projectionNode.getChild().getType()); ScanNode scanNode = projectionNode.getChild(); targets = scanNode.getTargets(); - Collections.sort(targets, new Comparator() { - @Override - public int compare(Target o1, Target o2) { - return o1.getCanonicalName().compareTo(o2.getCanonicalName()); - } - }); + Collections.sort(targets, (o1, o2) -> o1.getCanonicalName().compareTo(o2.getCanonicalName())); assertEquals(3, targets.size()); assertEquals("default.self_desc_table1.dept", targets.get(0).getCanonicalName()); assertEquals("default.self_desc_table1.id", targets.get(1).getCanonicalName()); @@ -1409,12 +1399,7 @@ public void testSelectWhereFromSelfDescTable() throws Exception { // projection column test List targets = projectionNode.getTargets(); - Collections.sort(targets, new Comparator() { - @Override - public int compare(Target o1, Target o2) { - return o1.getCanonicalName().compareTo(o2.getCanonicalName()); - } - }); + Collections.sort(targets, (o1, o2) -> o1.getCanonicalName().compareTo(o2.getCanonicalName())); assertEquals(2, targets.size()); assertEquals("default.self_desc_table1.dept", targets.get(0).getCanonicalName()); assertEquals("default.self_desc_table1.name", targets.get(1).getCanonicalName()); @@ -1428,12 +1413,7 @@ public int compare(Target o1, Target o2) { assertEquals(NodeType.SCAN, selectionNode.getChild().getType()); ScanNode scanNode = selectionNode.getChild(); targets = scanNode.getTargets(); - Collections.sort(targets, new Comparator() { - @Override - public int compare(Target o1, Target o2) { - return o1.getCanonicalName().compareTo(o2.getCanonicalName()); - } - }); + Collections.sort(targets, (o1, o2) -> o1.getCanonicalName().compareTo(o2.getCanonicalName())); assertEquals(4, targets.size()); assertEquals("?greaterthan", targets.get(0).getCanonicalName()); assertEquals("default.self_desc_table1.dept", targets.get(1).getCanonicalName()); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java index 125a2ec677..d33fb7c666 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java @@ -193,16 +193,21 @@ protected static void createCommonTables() throws Exception { // According to node type(leaf or non-leaf) Broadcast join is determined differently by Repartitioner. // testMultipleBroadcastDataFileWithZeroLength testcase is for the leaf node - createMultiFile("nation", 2, new TupleCreator() { - public Tuple createTuple(String[] columnDatas) { - return new VTuple(new Datum[]{ - columnDatas[0].equals("") ? NullDatum.get() : new Int4Datum(Integer.parseInt(columnDatas[0])), - columnDatas[1].equals("") ? NullDatum.get() : new TextDatum(columnDatas[1]), - columnDatas[2].equals("") ? NullDatum.get() : new Int4Datum(Integer.parseInt(columnDatas[2])), - columnDatas[3].equals("") ? NullDatum.get() : new TextDatum(columnDatas[3]) - }); - } - }); +//<<<<<<< 1c44272bff0fc0022a1c8ce060b70d11a30c59e0 + createMultiFile("nation", 2, columnDatas -> new VTuple(new Datum[]{ + columnDatas[0].equals("") ? NullDatum.get() : new Int4Datum(Integer.parseInt(columnDatas[0])), + columnDatas[1].equals("") ? NullDatum.get() : new TextDatum(columnDatas[1]), + columnDatas[2].equals("") ? NullDatum.get() : new Int4Datum(Integer.parseInt(columnDatas[2])), + columnDatas[3].equals("") ? NullDatum.get() : new TextDatum(columnDatas[3]) + })); +//======= +// createMultiFile("nation", 2, columnDatas -> new VTuple(new Datum[]{ +// new Int4Datum(Integer.parseInt(columnDatas[0])), +// new TextDatum(columnDatas[1]), +// new Int4Datum(Integer.parseInt(columnDatas[2])), +// new TextDatum(columnDatas[3]) +// })); +//>>>>>>> initial commit addEmptyDataFile("nation_multifile", false); } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTableCache.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTableCache.java index 2161074640..650ca12df5 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTableCache.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTableCache.java @@ -67,43 +67,40 @@ public void testBroadcastTableCache() throws Exception { } private Callable> createTask(final TableCacheKey key, final ExecutionBlockSharedResource resource) { - return new Callable>() { - @Override - public CacheHolder call() throws Exception { - CacheHolder result; - synchronized (resource.getLock()) { - if (!TableCache.getInstance().hasCache(key)) { - final long nanoTime = System.nanoTime(); - final TableStats tableStats = new TableStats(); - tableStats.setNumRows(100); - tableStats.setNumBytes(1000); - - final CacheHolder cacheHolder = new CacheHolder() { - - @Override - public Long getData() { - return nanoTime; - } - - @Override - public TableStats getTableStats() { - return tableStats; - } - - @Override - public void release() { - - } - }; - - resource.addBroadcastCache(key, cacheHolder); - } + return () -> { + CacheHolder result; + synchronized (resource.getLock()) { + if (!TableCache.getInstance().hasCache(key)) { + final long nanoTime = System.nanoTime(); + final TableStats tableStats = new TableStats(); + tableStats.setNumRows(100); + tableStats.setNumBytes(1000); + + final CacheHolder cacheHolder = new CacheHolder() { + + @Override + public Long getData() { + return nanoTime; + } + + @Override + public TableStats getTableStats() { + return tableStats; + } + + @Override + public void release() { + + } + }; + + resource.addBroadcastCache(key, cacheHolder); } - - CacheHolder holder = resource.getBroadcastCache(key); - result = (CacheHolder) holder; - return result; } + + CacheHolder holder = resource.getBroadcastCache(key); + result = (CacheHolder) holder; + return result; }; } } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java index 41ebd67a2e..d36196ed68 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java @@ -267,13 +267,10 @@ public QueryContext getQueryContext() { @Override protected boolean startQuery(final QueryId queryId, final AllocationResourceProto allocation) { - executorService.schedule(new Runnable() { - @Override - public void run() { - barrier.release(); - qmAllocationMap.put(queryId, allocation); - rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE)); - } + executorService.schedule((Runnable) () -> { + barrier.release(); + qmAllocationMap.put(queryId, allocation); + rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE)); }, testDelay, TimeUnit.MILLISECONDS); return true; } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java b/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java index e2968a8664..5b6a7d1240 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java @@ -72,26 +72,18 @@ public Collection getResourceFiles(String subdir) throws URISyntaxExceptio // get only files Collection files = filter(Lists.newArrayList(fs.listStatus(positiveQueryDir)), - new Predicate() { - @Override - public boolean apply(@Nullable FileStatus input) { - // TODO: This should be removed at TAJO-1891 - if (input.getPath().getName().indexOf("add_partition") > -1) { - return false; - } else { - return input.isFile(); - } - } + input -> { + // TODO: This should be removed at TAJO-1891 + if (input.getPath().getName().indexOf("add_partition") > -1) { + return false; + } else { + return input.isFile(); } + } ); // transform FileStatus into File - return transform(files, new Function() { - @Override - public File apply(@Nullable FileStatus fileStatus) { - return new File(URI.create(fileStatus.getPath().toString())); - } - }); + return transform(files, fileStatus -> new File(URI.create(fileStatus.getPath().toString()))); } /** @@ -102,14 +94,11 @@ public File apply(@Nullable FileStatus fileStatus) { * @throws URISyntaxException */ public Collection> getFileContents(String subdir) throws IOException, URISyntaxException { - return transform(getResourceFiles(subdir), new Function>() { - @Override - public Pair apply(@Nullable File file) { - try { - return new Pair<>(file.getName(), FileUtil.readTextFile(file)); - } catch (IOException e) { - throw new RuntimeException(e); - } + return transform(getResourceFiles(subdir), file -> { + try { + return new Pair<>(file.getName(), FileUtil.readTextFile(file)); + } catch (IOException e) { + throw new RuntimeException(e); } }); } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java index baae7edb82..ccbf3ff90a 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java @@ -147,12 +147,7 @@ private void assertStatus(QueryId queryId, int numStages, List stages = queryHistory.getStageHistories(); assertEquals(numStages, stages.size()); - Collections.sort(stages, new Comparator() { - @Override - public int compare(StageHistory o1, StageHistory o2) { - return o1.getExecutionBlockId().compareTo(o2.getExecutionBlockId()); - } - }); + Collections.sort(stages, (o1, o2) -> o1.getExecutionBlockId().compareTo(o2.getExecutionBlockId())); int index = 0; StringBuilder expectedString = new StringBuilder(); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java index 3901cdb6f6..41683c6899 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java @@ -252,36 +252,33 @@ public void testParallelRequest() throws Exception { List futureList = Lists.newArrayList(); for (int i = 0; i < parallelCount; i++) { - futureList.add(executor.submit(new Runnable() { - @Override - public void run() { - int complete = 0; - while (true) { - TaskAllocationProto task = totalTasks.poll(); - if (task == null) break; - - - BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder(); - requestProto.addTaskRequest(task); - requestProto.setExecutionBlockId(ebId.getProto()); - - CallFuture callFuture = new CallFuture<>(); - dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); - try { - BatchAllocationResponse proto = callFuture.get(); - if (proto.getCancellationTaskCount() > 0) { - totalTasks.addAll(proto.getCancellationTaskList()); - totalCanceled.addAndGet(proto.getCancellationTaskCount()); - } else { - complete++; - } - } catch (Exception e) { - fail(e.getMessage()); - } - } - totalComplete.addAndGet(complete); + futureList.add(executor.submit((Runnable) () -> { + int complete = 0; + while (true) { + TaskAllocationProto task1 = totalTasks.poll(); + if (task1 == null) break; + + + BatchAllocationRequest.Builder requestProto1 = BatchAllocationRequest.newBuilder(); + requestProto1.addTaskRequest(task1); + requestProto1.setExecutionBlockId(ebId.getProto()); + + CallFuture callFuture1 = new CallFuture<>(); + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto1.build(), callFuture1)); + try { + BatchAllocationResponse proto = callFuture1.get(); + if (proto.getCancellationTaskCount() > 0) { + totalTasks.addAll(proto.getCancellationTaskList()); + totalCanceled.addAndGet(proto.getCancellationTaskCount()); + } else { + complete++; } - }) + } catch (Exception e) { + fail(e.getMessage()); + } + } + totalComplete.addAndGet(complete); + }) ); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java index 96ed0a6e74..6c4778d8dc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java @@ -172,11 +172,8 @@ protected Iterator rightFiltered(Iterable rightTuples) { if (rightJoinFilter == null) { return rightTuples.iterator(); } - return Iterators.filter(rightTuples.iterator(), new Predicate() { - @Override - public boolean apply(Tuple input) { - return rightJoinFilter.eval(input).isTrue(); - } + return Iterators.filter(rightTuples.iterator(), input -> { + return rightJoinFilter.eval(input).isTrue(); }); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java index 44649cd100..08c8509c54 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java @@ -63,16 +63,11 @@ public void swap(int i1, int i2) { @Override public Iterable sort() { new QuickSort().sort(this, 0, mappings.length); - return new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { - int index; - public boolean hasNext() { return index < mappings.length; } - public Tuple next() { return tuples[mappings[index++]]; } - public void remove() { throw new TajoRuntimeException(new UnsupportedException()); } - }; - } + return () -> new Iterator() { + int index; + public boolean hasNext() { return index < mappings.length; } + public Tuple next() { return tuples[mappings[index++]]; } + public void remove() { throw new TajoRuntimeException(new UnsupportedException()); } }; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index c088a8b4c6..b6642a4f60 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -154,11 +154,7 @@ private QueryContext createQueryContext(Session session) { // Set queryCache in session int queryCacheSize = context.getConf().getIntVar(TajoConf.ConfVars.QUERY_SESSION_QUERY_CACHE_SIZE); if (queryCacheSize > 0 && session.getQueryCache() == null) { - Weigher weighByLength = new Weigher() { - public int weigh(String key, Expr expr) { - return key.length(); - } - }; + Weigher weighByLength = (key, expr) -> key.length(); LoadingCache cache = CacheBuilder.newBuilder() .maximumWeight(queryCacheSize * 1024) .weigher(weighByLength) diff --git a/tajo-core/src/main/java/org/apache/tajo/metrics/ClusterResourceMetricSet.java b/tajo-core/src/main/java/org/apache/tajo/metrics/ClusterResourceMetricSet.java index 1798f22e88..a7d8878be0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/metrics/ClusterResourceMetricSet.java +++ b/tajo-core/src/main/java/org/apache/tajo/metrics/ClusterResourceMetricSet.java @@ -39,54 +39,19 @@ public ClusterResourceMetricSet(TajoMaster.MasterContext masterContext) { public Map getMetrics() { Map metricsMap = new HashMap<>(); - metricsMap.put(Cluster.TOTAL_NODES.name(), new Gauge() { - @Override - public Integer getValue() { - return masterContext.getResourceManager().getNodes().size(); - } - }); + metricsMap.put(Cluster.TOTAL_NODES.name(), (Gauge) () -> masterContext.getResourceManager().getNodes().size()); - metricsMap.put(Cluster.ACTIVE_NODES.name(), new Gauge() { - @Override - public Integer getValue() { - return getNumWorkers(NodeState.RUNNING); - } - }); + metricsMap.put(Cluster.ACTIVE_NODES.name(), (Gauge) () -> getNumWorkers(NodeState.RUNNING)); - metricsMap.put(Cluster.LOST_NODES.name(), new Gauge() { - @Override - public Integer getValue() { - return getNumWorkers(NodeState.LOST); - } - }); + metricsMap.put(Cluster.LOST_NODES.name(), (Gauge) () -> getNumWorkers(NodeState.LOST)); - metricsMap.put(Cluster.TOTAL_MEMORY.name(), new Gauge() { - @Override - public Integer getValue() { - return masterContext.getResourceManager().getScheduler().getMaximumResourceCapability().getMemory(); - } - }); + metricsMap.put(Cluster.TOTAL_MEMORY.name(), (Gauge) () -> masterContext.getResourceManager().getScheduler().getMaximumResourceCapability().getMemory()); - metricsMap.put(Cluster.FREE_MEMORY.name(), new Gauge() { - @Override - public Integer getValue() { - return masterContext.getResourceManager().getScheduler().getClusterResource().getMemory(); - } - }); + metricsMap.put(Cluster.FREE_MEMORY.name(), (Gauge) () -> masterContext.getResourceManager().getScheduler().getClusterResource().getMemory()); - metricsMap.put(Cluster.TOTAL_VCPU.name(), new Gauge() { - @Override - public Integer getValue() { - return masterContext.getResourceManager().getScheduler().getMaximumResourceCapability().getVirtualCores(); - } - }); + metricsMap.put(Cluster.TOTAL_VCPU.name(), (Gauge) () -> masterContext.getResourceManager().getScheduler().getMaximumResourceCapability().getVirtualCores()); - metricsMap.put(Cluster.FREE_VCPU.name(), new Gauge() { - @Override - public Integer getValue() { - return masterContext.getResourceManager().getScheduler().getClusterResource().getVirtualCores(); - } - }); + metricsMap.put(Cluster.FREE_VCPU.name(), (Gauge) () -> masterContext.getResourceManager().getScheduler().getClusterResource().getVirtualCores()); return metricsMap; } diff --git a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java index 6ffe65b059..bac9272e7b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java @@ -2035,11 +2035,6 @@ private static String buildIdentifier(IdentifierContext identifier) { } private static String buildIdentifierChain(final Collection identifierChains) { - return Joiner.on(".").join(Collections2.transform(identifierChains, new Function() { - @Override - public String apply(IdentifierContext identifierContext) { - return buildIdentifier(identifierContext); - } - })); + return Joiner.on(".").join(Collections2.transform(identifierChains, identifierContext -> buildIdentifier(identifierContext))); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 401ba0a3a6..7c81a29dfe 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -722,17 +722,14 @@ private TaskAttemptId allocateRackTask(String host) { if (remainingTasks.size() > 0) { synchronized (scheduledRequests) { //find largest remaining task of other host in rack - Collections.sort(remainingTasks, new Comparator() { - @Override - public int compare(HostVolumeMapping v1, HostVolumeMapping v2) { - // descending remaining tasks - if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) { - return 1; - } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) { - return 0; - } else { - return -1; - } + Collections.sort(remainingTasks, (v1, v2) -> { + // descending remaining tasks + if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) { + return 1; + } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) { + return 0; + } else { + return -1; } }); } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 8eaab3f417..1f05b80b0e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -281,12 +281,7 @@ protected void killTaskAttempt(int workerId, TaskAttemptId taskAttemptId) { private class LocalTaskEventHandler implements EventHandler { @Override public void handle(final LocalTaskEvent event) { - queryMasterContext.getEventExecutor().submit(new Runnable() { - @Override - public void run() { - killTaskAttempt(event.getWorkerId(), event.getTaskAttemptId()); - } - }); + queryMasterContext.getEventExecutor().submit((Runnable) () -> killTaskAttempt(event.getWorkerId(), event.getTaskAttemptId())); } } @@ -475,17 +470,14 @@ private void cleanupQuery(final QueryId queryId) { LOG.info("Cleanup resources of all workers. Query: " + queryId + ", workers: " + workers.size()); for (final InetSocketAddress worker : workers) { - queryMasterContext.getEventExecutor().submit(new Runnable() { - @Override - public void run() { - try { - AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true, - rpcParams); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); - tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), NullCallback.get()); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - } + queryMasterContext.getEventExecutor().submit((Runnable) () -> { + try { + AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true, + rpcParams); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); + tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), NullCallback.get()); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); } }); } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index c055d119e4..be5445f16f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -734,18 +734,15 @@ protected Map getAssignedWorkerMap() { private void sendStopExecutionBlockEvent(final StopExecutionBlockRequest requestProto) { for (final InetSocketAddress worker : getAssignedWorkerMap().values()) { - getContext().getQueryMasterContext().getEventExecutor().submit(new Runnable() { - @Override - public void run() { - try { - AsyncRpcClient tajoWorkerRpc = - RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true, rpcParams); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); - tajoWorkerRpcClient.stopExecutionBlock(null, - requestProto, NullCallback.get(PrimitiveProtos.BoolProto.class)); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - } + getContext().getQueryMasterContext().getEventExecutor().submit((Runnable) () -> { + try { + AsyncRpcClient tajoWorkerRpc = + RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true, rpcParams); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); + tajoWorkerRpcClient.stopExecutionBlock(null, + requestProto, NullCallback.get(PrimitiveProtos.BoolProto.class)); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); } }); } @@ -859,42 +856,37 @@ public StageState transition(final Stage stage, StageEvent stageEvent) { // TODO: verify changed shuffle plan initTaskScheduler(stage); // execute pre-processing asyncronously - stage.getContext().getQueryMasterContext().getSingleEventExecutor() - .submit(new Runnable() { - @Override - public void run() { - try { - schedule(stage); - stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum(); - LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled"); - - if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks - stage.eventHandler.handle( - new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); - } else { - if(stage.getSynchronizedState() == StageState.INITED) { - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START)); - stage.taskScheduler.start(); - } else { - /* all tasks are killed before stage are inited */ - if (stage.getTotalScheduledObjectsCount() == stage.getCompletedTaskCount()) { - stage.eventHandler.handle( - new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); - } else { - stage.eventHandler.handle( - new StageEvent(stage.getId(), StageEventType.SQ_KILL)); - } - } - } - } catch (Throwable e) { - LOG.error("Stage (" + stage.getId() + ") ERROR: ", e); - stage.setFinishTime(); - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage())); - stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR)); - } - } - } - ); + stage.getContext().getQueryMasterContext().getSingleEventExecutor().submit(() -> { + try { + schedule(stage); + stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum(); + LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled"); + + if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks + stage.eventHandler.handle( + new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); + } else { + if(stage.getSynchronizedState() == StageState.INITED) { + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START)); + stage.taskScheduler.start(); + } else { + /* all tasks are killed before stage are inited */ + if (stage.getTotalScheduledObjectsCount() == stage.getCompletedTaskCount()) { + stage.eventHandler.handle( + new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); + } else { + stage.eventHandler.handle( + new StageEvent(stage.getId(), StageEventType.SQ_KILL)); + } + } + } + } catch (Throwable e) { + LOG.error("Stage (" + stage.getId() + ") ERROR: ", e); + stage.setFinishTime(); + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage())); + stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR)); + } + }); state = StageState.INITED; } } catch (Throwable e) { diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index 671a365a4a..48e085605c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -87,15 +87,11 @@ public static List sortQueryMasterTask(Collection queryMasterTaskList = new ArrayList<>(queryMasterTasks); - Collections.sort(queryMasterTaskList, new Comparator() { - - @Override - public int compare(QueryMasterTask task1, QueryMasterTask task2) { - if(desc) { - return task2.getQueryId().toString().compareTo(task1.getQueryId().toString()); - } else { - return task1.getQueryId().toString().compareTo(task2.getQueryId().toString()); - } + Collections.sort(queryMasterTaskList, (task1, task2) -> { + if(desc) { + return task2.getQueryId().toString().compareTo(task1.getQueryId().toString()); + } else { + return task1.getQueryId().toString().compareTo(task2.getQueryId().toString()); } }); @@ -106,14 +102,11 @@ public static List sortQueryInProgress(Collection queryProgressList = new ArrayList<>(queryInProgresses); - Collections.sort(queryProgressList, new Comparator() { - @Override - public int compare(QueryInProgress query1, QueryInProgress query2) { - if(desc) { - return query2.getQueryId().toString().compareTo(query1.getQueryId().toString()); - } else { - return query1.getQueryId().toString().compareTo(query2.getQueryId().toString()); - } + Collections.sort(queryProgressList, (query1, query2) -> { + if(desc) { + return query2.getQueryId().toString().compareTo(query1.getQueryId().toString()); + } else { + return query1.getQueryId().toString().compareTo(query2.getQueryId().toString()); } }); @@ -122,21 +115,18 @@ public int compare(QueryInProgress query1, QueryInProgress query2) { public static List sortStages(Collection stages) { List stageList = new ArrayList<>(stages); - Collections.sort(stageList, new Comparator() { - @Override - public int compare(Stage stage1, Stage stage2) { - long q1StartTime = stage1.getStartTime(); - long q2StartTime = stage2.getStartTime(); - - q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime); - q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime); - - int result = compareLong(q1StartTime, q2StartTime); - if (result == 0) { - return stage1.getId().toString().compareTo(stage2.getId().toString()); - } else { - return result; - } + Collections.sort(stageList, (stage1, stage2) -> { + long q1StartTime = stage1.getStartTime(); + long q2StartTime = stage2.getStartTime(); + + q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime); + q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime); + + int result = compareLong(q1StartTime, q2StartTime); + if (result == 0) { + return stage1.getId().toString().compareTo(stage2.getId().toString()); + } else { + return result; } }); @@ -145,21 +135,18 @@ public int compare(Stage stage1, Stage stage2) { public static List sortStageHistories(Collection stages) { List stageList = new ArrayList<>(stages); - Collections.sort(stageList, new Comparator() { - @Override - public int compare(StageHistory stage1, StageHistory stage2) { - long q1StartTime = stage1.getStartTime(); - long q2StartTime = stage2.getStartTime(); - - q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime); - q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime); - - int result = compareLong(q1StartTime, q2StartTime); - if (result == 0) { - return stage1.getExecutionBlockId().compareTo(stage2.getExecutionBlockId()); - } else { - return result; - } + Collections.sort(stageList, (stage1, stage2) -> { + long q1StartTime = stage1.getStartTime(); + long q2StartTime = stage2.getStartTime(); + + q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime); + q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime); + + int result = compareLong(q1StartTime, q2StartTime); + if (result == 0) { + return stage1.getExecutionBlockId().compareTo(stage2.getExecutionBlockId()); + } else { + return result; } }); diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java index 10d17042ca..80c0e84781 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java @@ -31,33 +31,13 @@ public class LogEventGaugeSet implements MetricSet { public Map getMetrics() { final Map gauges = new HashMap<>(); - gauges.put("Fatal", new Gauge() { - @Override - public Long getValue() { - return TajoLogEventCounter.getFatal(); - } - }); + gauges.put("Fatal", (Gauge) () -> TajoLogEventCounter.getFatal()); - gauges.put("Error", new Gauge() { - @Override - public Long getValue() { - return TajoLogEventCounter.getError(); - } - }); + gauges.put("Error", (Gauge) () -> TajoLogEventCounter.getError()); - gauges.put("Warn", new Gauge() { - @Override - public Long getValue() { - return TajoLogEventCounter.getWarn(); - } - }); + gauges.put("Warn", (Gauge) () -> TajoLogEventCounter.getWarn()); - gauges.put("Info", new Gauge() { - @Override - public Long getValue() { - return TajoLogEventCounter.getInfo(); - } - }); + gauges.put("Info", (Gauge) () -> TajoLogEventCounter.getInfo()); return gauges; } diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java index 7e3d9bca6e..9594c7c07a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java @@ -136,14 +136,11 @@ public void start() { */ public void start(long period, TimeUnit unit) { this.period = period; - executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - report(); - } catch (Exception e) { - LOG.warn("Metric report error:" + e.getMessage(), e); - } + executor.scheduleAtFixedRate((Runnable) () -> { + try { + report(); + } catch (Exception e) { + LOG.warn("Metric report error:" + e.getMessage(), e); } }, period, period, unit); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index aaf64cd4a7..06d1ecd859 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -374,12 +374,7 @@ private void insertDistinctOperator(PlanContext context, ProjectionNode projecti List windowSpecReferencesList = new ArrayList<>(); - List targetsIds = normalize(context, projection, normalizedExprList, new Matcher() { - @Override - public boolean isMatch(Expr expr) { - return ExprFinder.finds(expr, OpType.WindowFunction).size() == 0; - } - }); + List targetsIds = normalize(context, projection, normalizedExprList, expr -> ExprFinder.finds(expr, OpType.WindowFunction).size() == 0); // Note: Why separate normalization and add(Named)Expr? // @@ -388,12 +383,7 @@ public boolean isMatch(Expr expr) { // the same logical node. It will cause impossible evaluation in physical executors. addNamedExprs(block, referenceNames, normalizedExprList, windowSpecReferencesList, projection, targetsIds); - targetsIds = normalize(context, projection, normalizedExprList, new Matcher() { - @Override - public boolean isMatch(Expr expr) { - return ExprFinder.finds(expr, OpType.WindowFunction).size() > 0; - } - }); + targetsIds = normalize(context, projection, normalizedExprList, expr -> ExprFinder.finds(expr, OpType.WindowFunction).size() > 0); addNamedExprs(block, referenceNames, normalizedExprList, windowSpecReferencesList, projection, targetsIds); return new Pair<>(referenceNames, diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java index 4d742c7186..2d027da373 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java @@ -56,13 +56,10 @@ public class EvalTreeOptimizer { rules.add(rule); } - Collections.sort(rules, new Comparator() { - @Override - public int compare(EvalTreeOptimizationRule o1, EvalTreeOptimizationRule o2) { - int priority1 = o1.getClass().getAnnotation(Prioritized.class).priority(); - int priority2 = o2.getClass().getAnnotation(Prioritized.class).priority(); - return priority1 - priority2; - } + Collections.sort(rules, (o1, o2) -> { + int priority1 = o1.getClass().getAnnotation(Prioritized.class).priority(); + int priority2 = o2.getClass().getAnnotation(Prioritized.class).priority(); + return priority1 - priority2; }); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java index a49c42389f..9243e98239 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java @@ -407,13 +407,8 @@ private Schema buildSchemaFromColumnSet(Set columns) throws TajoExceptio Set simpleColumns = new HashSet<>(); List columnList = new ArrayList<>(columns); - Collections.sort(columnList, new Comparator() { - @Override - public int compare(Column c1, Column c2) { - return c2.getSimpleName().split(NestedPathUtil.PATH_DELIMITER).length - - c1.getSimpleName().split(NestedPathUtil.PATH_DELIMITER).length; - } - }); + Collections.sort(columnList, (c1, c2) -> c2.getSimpleName().split(NestedPathUtil.PATH_DELIMITER).length - + c1.getSimpleName().split(NestedPathUtil.PATH_DELIMITER).length); for (Column eachColumn : columnList) { String simpleName = eachColumn.getSimpleName(); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java index 6b38feb450..5e1ae7b4e6 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java @@ -62,12 +62,7 @@ public static EvalNode deserialize(OverridableConf context, EvalContext evalCont // sort serialized eval nodes in an ascending order of their IDs. List nodeList = Lists.newArrayList(tree.getNodesList()); - Collections.sort(nodeList, new Comparator() { - @Override - public int compare(PlanProto.EvalNode o1, PlanProto.EvalNode o2) { - return o1.getId() - o2.getId(); - } - }); + Collections.sort(nodeList, (o1, o2) -> o1.getId() - o2.getId()); EvalNode current = null; diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index 4b47e4ac87..03d3b6c515 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -58,12 +58,7 @@ public static LogicalNode deserialize(OverridableConf context, @Nullable EvalCon // sort serialized logical nodes in an ascending order of their sids List nodeList = Lists.newArrayList(tree.getNodesList()); - Collections.sort(nodeList, new Comparator() { - @Override - public int compare(PlanProto.LogicalNode o1, PlanProto.LogicalNode o2) { - return o1.getVisitSeq() - o2.getVisitSeq(); - } - }); + Collections.sort(nodeList, (o1, o2) -> o1.getVisitSeq() - o2.getVisitSeq()); LogicalNode current = null; diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java index 15cfdf4cea..a29da18ab3 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java @@ -151,11 +151,9 @@ private ChannelFuture sendFile(ChannelHandlerContext ctx, file.startOffset(), file.length()); writeFuture = ctx.write(region); lastContentFuture = ctx.write(LastHttpContent.EMPTY_LAST_CONTENT); - writeFuture.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) { - if (region.refCnt() > 0) { - region.release(); - } + writeFuture.addListener(future -> { + if (region.refCnt() > 0) { + region.release(); } }); } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java index 88410bb2cc..8f83c3bd94 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java @@ -446,18 +446,8 @@ public static Iterable getAllTablespaces() { } public static Collection getMetadataProviders() { - Collection filteredSpace = Collections2.filter(TABLE_SPACES.values(), new Predicate() { - @Override - public boolean apply(@Nullable Tablespace space) { - return space.getProperty().isMetadataProvided(); - } - }); + Collection filteredSpace = Collections2.filter(TABLE_SPACES.values(), space -> space.getProperty().isMetadataProvided()); - return Collections2.transform(filteredSpace, new Function() { - @Override - public MetadataProvider apply(@Nullable Tablespace space) { - return space.getMetadataProvider(); - } - }); + return Collections2.transform(filteredSpace, space -> space.getMetadataProvider()); } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 17c413ebfa..1a28ffdf54 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -56,11 +56,9 @@ public class FileTablespace extends Tablespace { - public static final PathFilter hiddenFileFilter = new PathFilter() { - public boolean accept(Path p) { - String name = p.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } + public static final PathFilter hiddenFileFilter = p -> { + String name1 = p.getName(); + return !name1.startsWith("_") && !name1.startsWith("."); }; private static final Log LOG = LogFactory.getLog(FileTablespace.class); @@ -424,13 +422,7 @@ protected FileFragment makeNonSplit(String fragmentId, Path file, long start, lo } List> entries = new ArrayList<>(hostsBlockMap.entrySet()); - Collections.sort(entries, new Comparator>() { - - @Override - public int compare(Map.Entry v1, Map.Entry v2) { - return v1.getValue().compareTo(v2.getValue()); - } - }); + Collections.sort(entries, (v1, v2) -> v1.getValue().compareTo(v2.getValue())); String[] hosts = new String[blkLocations[0].getHosts().length]; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index 79203a2dc4..5b07cd8b6b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -206,16 +206,13 @@ public Future writePartitions(TableMeta meta, Schema schema, fin HashShuffleAppenderWrapper appender = getAppender(rowBlock, taskId.getTaskId().getExecutionBlockId(), partId, meta, schema); ExecutorService executor = executors.get(appender.getVolumeId()); - return executor.submit(new Callable() { - @Override - public MemoryRowBlock call() throws Exception { - appender.writeRowBlock(taskId, rowBlock); + return executor.submit(() -> { + appender.writeRowBlock(taskId, rowBlock); - if (release) rowBlock.release(); - else rowBlock.clear(); + if (release) rowBlock.release(); + else rowBlock.clear(); - return rowBlock; - } + return rowBlock; }); } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java index 5e200a0b2f..9cd7b505ee 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java @@ -249,13 +249,6 @@ public void write(DataOutput out) throws IOException { static { WritableFactories.setFactory(BytesRefArrayWritable.class, - new WritableFactory() { - - @Override - public Writable newInstance() { - return new BytesRefArrayWritable(); - } - - }); + () -> new BytesRefArrayWritable()); } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java index 158c740784..ac63501f7f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java @@ -228,14 +228,7 @@ public boolean equals(Object right_obj) { } static { - WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() { - - @Override - public Writable newInstance() { - return new BytesRefWritable(); - } - - }); + WritableFactories.setFactory(BytesRefWritable.class, () -> new BytesRefWritable()); } public int getLength() { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java index 5dedc2eae8..d0e8c6f7d4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java @@ -138,13 +138,7 @@ public WriterImpl(FileSystem fs, this.callback = opts.getCallback(); this.schema = opts.getSchema(); if (callback != null) { - callbackContext = new OrcFile.WriterContext(){ - - @Override - public Writer getWriter() { - return WriterImpl.this; - } - }; + callbackContext = () -> WriterImpl.this; } else { callbackContext = null; } diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java index fb095a40bf..bff96c84b1 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java @@ -204,20 +204,10 @@ public TableDesc getTableDesc(String schemaName, String tableName) throws Undefi } // sort columns in an order of ordinal position - Collections.sort(columns, new Comparator>() { - @Override - public int compare(Pair o1, Pair o2) { - return o1.getFirst() - o2.getFirst(); - } - }); + Collections.sort(columns, (o1, o2) -> o1.getFirst() - o2.getFirst()); // transform the pair list into collection for columns - final Schema schema = SchemaBuilder.builder().addAll(Collections2.transform(columns, new Function, Column>() { - @Override - public Column apply(@Nullable Pair columnPair) { - return columnPair.getSecond(); - } - })).build(); + final Schema schema = SchemaBuilder.builder().addAll(Collections2.transform(columns, Pair::getSecond)).build(); // fill the table stats diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java index 14d5eb8d33..d7872d36cf 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java @@ -53,11 +53,8 @@ public String build(String tableName, Column [] targets, @Nullable EvalNode filt StringBuilder selectClause = new StringBuilder("SELECT "); if (targets.length > 0) { - selectClause.append(StringUtils.join(targets, ",", new Function() { - @Override - public String apply(@Nullable Column input) { - return input.getSimpleName(); - } + selectClause.append(StringUtils.join(targets, ",", input -> { + return input.getSimpleName(); })); } else { selectClause.append("1"); @@ -174,15 +171,12 @@ public void visitScan(SQLBuilderContext ctx, ScanNode scan, Stack s } public String generateTargetList(List targets) { - return StringUtils.join(targets, ",", new Function() { - @Override - public String apply(@Nullable Target t) { - StringBuilder sb = new StringBuilder(sqlExprGen.generate(t.getEvalTree())); - if (t.hasAlias()) { - sb.append(" AS ").append(t.getAlias()); - } - return sb.toString(); + return StringUtils.join(targets, ",", t -> { + StringBuilder sb = new StringBuilder(sqlExprGen.generate(t.getEvalTree())); + if (t.hasAlias()) { + sb.append(" AS ").append(t.getAlias()); } + return sb.toString(); }); } } diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java index da97e32e92..c681be1d37 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java @@ -128,11 +128,8 @@ protected EvalNode visitConst(Context context, ConstEval constant, Stack stack) { StringBuilder sb = new StringBuilder("("); - sb.append(StringUtils.join(row.getValues(), ",", new Function() { - @Override - public String apply(Datum v) { - return convertDatumToSQLLiteral(v); - } + sb.append(StringUtils.join(row.getValues(), ",", v -> { + return convertDatumToSQLLiteral(v); })); sb.append(")"); context.append(sb.toString()); diff --git a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java index 6de1573d4e..40f1846fc1 100644 --- a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java +++ b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java @@ -73,14 +73,11 @@ private PgSQLTestServer() throws Exception { "tpch" ); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - try { - server.close(); - } catch (IOException e) { - e.printStackTrace(); - } + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + server.close(); + } catch (IOException e) { + e.printStackTrace(); } }));