4 changes: 2 additions & 2 deletions pom.xml
@@ -236,7 +236,7 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
<version>2.0.9</version>
<version>2.33</version>
</dependency>

<dependency>
@@ -762,7 +762,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<hyracks.fullstack.version>0.3.1</hyracks.fullstack.version>
<hyracks.version>0.3.0</hyracks.version>
<hyracks.version>0.3.3</hyracks.version>
<lucene.version>5.5.1</lucene.version>
<hadoop.version>2.7.0</hadoop.version>
<apache-rat-plugin.version>0.11</apache-rat-plugin.version>
@@ -16,12 +16,12 @@
*/
package org.apache.vxquery.compiler.algebricks;

import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;

import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;

public class VXQueryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider {
@Override
@@ -30,6 +30,12 @@ public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean
return new BinaryComparatorFactory(type, ascending);
}

@Override
public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending, boolean ignoreCase)
throws AlgebricksException {
throw new NotImplementedException();
}

private static class BinaryComparatorFactory implements IBinaryComparatorFactory {
private static final long serialVersionUID = 1L;

@@ -17,12 +17,10 @@
package org.apache.vxquery.compiler.rewriter.rules;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.mutable.Mutable;
import org.apache.vxquery.functions.BuiltinFunctions;
import org.apache.vxquery.functions.BuiltinOperators;

import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -37,6 +35,8 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.vxquery.functions.BuiltinFunctions;
import org.apache.vxquery.functions.BuiltinOperators;

/**
* The rule searches for aggregate operators with an aggregate function
@@ -91,28 +91,37 @@ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext
if (op.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
return false;
}
AggregateOperator aggregate = (AggregateOperator) op;
if (aggregate.getExpressions().size() == 0) {
final AggregateOperator aggregate = (AggregateOperator) op;
AggregateFunctionCallExpression aggregateFunctionCall = getAggregateFunctionCall(aggregate);
if (aggregateFunctionCall == null || aggregateFunctionCall.isTwoStep()) {
return false;
}
Mutable<ILogicalExpression> mutableLogicalExpression = aggregate.getExpressions().get(0);
ILogicalExpression logicalExpression = mutableLogicalExpression.getValue();
// Replace single step aggregate function with two step function call
final IFunctionInfo functionInfo = aggregateFunctionCall.getFunctionInfo();
final List<Mutable<ILogicalExpression>> arguments = aggregateFunctionCall.getArguments();
AggregateFunctionCallExpression twoStepCall =
new AggregateFunctionCallExpression(functionInfo, true, arguments);
final Pair<IFunctionInfo, IFunctionInfo> functionInfoPair =
AGGREGATE_MAP.get(aggregateFunctionCall.getFunctionIdentifier());
twoStepCall.setStepOneAggregate(functionInfoPair.first);
twoStepCall.setStepTwoAggregate(functionInfoPair.second);
aggregate.getExpressions().get(0).setValue(twoStepCall);
return true;
}

private AggregateFunctionCallExpression getAggregateFunctionCall(AggregateOperator aggregate) {
if (aggregate.getExpressions().size() == 0) {
return null;
}
ILogicalExpression logicalExpression = aggregate.getExpressions().get(0).getValue();
if (logicalExpression.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
return false;
return null;
}
AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression;

if (AGGREGATE_MAP.containsKey(functionCall.getFunctionIdentifier())) {
AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
if (aggregateFunctionCall.isTwoStep()) {
return false;
}
aggregateFunctionCall.setTwoStep(true);
aggregateFunctionCall.setStepOneAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).first);
aggregateFunctionCall.setStepTwoAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).second);
return true;
return (AggregateFunctionCallExpression) functionCall;
}
return false;
return null;
}

@Override
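As a side note on the rewritten rule above: it marks an eligible aggregate call as two-step, so that step one can run locally on each partition and step two combines the partial results. A minimal plain-Java sketch of that idea for a count-style aggregate (not the VXQuery/Hyracks API; all names here are made up):

import java.util.List;

// Conceptual illustration only; class and method names are hypothetical.
public final class TwoStepCountSketch {

    // Step one: runs on every partition and produces a partial count.
    static long stepOneLocalCount(List<Object> partitionItems) {
        return partitionItems.size();
    }

    // Step two: runs once and combines the partial counts into the final value.
    static long stepTwoCombine(List<Long> partialCounts) {
        long total = 0;
        for (long partial : partialCounts) {
            total += partial;
        }
        return total;
    }
}

In the rule itself, the AGGREGATE_MAP pair supplies the concrete step-one and step-two functions through setStepOneAggregate and setStepTwoAggregate.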
@@ -23,6 +23,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -92,7 +93,7 @@ public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, Abs
totalDataSources = (short) ds.getTotalDataSources();
childSeq = ds.getChildSeq();
valueSeq = ds.getValueSeq();
recordDescriptors[0] = rDesc;
outRecDescs[0] = rDesc;
this.tag = ds.getTag();
this.hdfsConf = hdfsConf;
this.nodeControllerInfos = nodeControllerInfos;
@@ -108,7 +109,7 @@ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
final IFrameFieldAppender appender = new FrameFixedFieldTupleAppender(fieldOutputCount);
final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources);
final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();
final DynamicContext dCtx = (DynamicContext) ctx.getJobletContext().getGlobalJobData();
final ArrayBackedValueStorage jsonAbvs = new ArrayBackedValueStorage();
final String collectionName = collectionPartitions[partition % collectionPartitions.length];
@@ -157,7 +158,7 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
try {
hdfs.scheduleSplits();
ArrayList<Integer> schedule = hdfs
.getScheduleForNode(InetAddress.getLocalHost().getHostAddress());
.getScheduleForNode(Inet4Address.getLoopbackAddress().getHostAddress());
List<InputSplit> splits = hdfs.getSplits();
List<FileSplit> fileSplits = new ArrayList<>();
for (int i : schedule) {
@@ -69,7 +69,7 @@ public VXQueryIndexingOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQue
collectionPartitions = ds.getPartitions();
dataSourceId = (short) ds.getDataSourceId();
totalDataSources = (short) ds.getTotalDataSources();
recordDescriptors[0] = rDesc;
outRecDescs[0] = rDesc;
childSeq = ds.getChildSeq();
indexChildSeq = ds.getIndexChildSeq();
indexAttsSeq = ds.getIndexAttsSeq();
@@ -86,11 +86,11 @@ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
final IFrameFieldAppender appender = new FrameFixedFieldTupleAppender(fieldOutputCount);
final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources);
final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();
final String collectionName = collectionPartitions[partition % collectionPartitions.length];
final String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil(
ctx.getIOManager().getIODevices().get(0).getMount());
ctx.getIoManager().getIODevices().get(0).getMount());
indexCentralizerUtil.readIndexDirectory();
final IPointable result = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();

@@ -52,7 +52,7 @@ protected void evaluate(TaggedValuePointable[] args, IPointable result) throws S
abvs.reset();
sb.reset(abvs);
IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil(
ctx.getIOManager().getIODevices().get(0).getMount());
ctx.getIoManager().getIODevices().get(0).getMount());
indexCentralizerUtil.readIndexDirectory();
indexCentralizerUtil.getAllCollections(sb);
sb.finish();
@@ -57,7 +57,7 @@ protected IScalarEvaluator createEvaluator(IHyracksTaskContext ctx, IScalarEvalu
final DataInputStream di = new DataInputStream(bbis);
final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition);
final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();

return new AbstractTaggedValueArgumentScalarEvaluator(args) {
@Override
@@ -57,7 +57,7 @@ protected IScalarEvaluator createEvaluator(IHyracksTaskContext ctx, IScalarEvalu
final DataInputStream di = new DataInputStream(bbis);
final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition);
final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();

return new AbstractTaggedValueArgumentScalarEvaluator(args) {
@Override
@@ -28,9 +28,13 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.vxquery.exceptions.VXQueryRuntimeException;
import org.apache.vxquery.rest.RestServer;
import org.apache.vxquery.rest.service.VXQueryConfig;
@@ -44,15 +48,20 @@
*
* @author Erandi Ganepola
*/
public class VXQueryApplication implements ICCApplicationEntryPoint {
public class VXQueryApplication implements ICCApplication {

private static final Logger LOGGER = Logger.getLogger(VXQueryApplication.class.getName());

private VXQueryService vxQueryService;
private RestServer restServer;
private ICCServiceContext ccAppCtx;

public void init(IServiceContext serviceCtx) throws Exception {
ccAppCtx = (ICCServiceContext)serviceCtx;
}

@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
public void start(String[] args) throws Exception {
AppArgs appArgs = new AppArgs();
if (args != null) {
CmdLineParser parser = new CmdLineParser(appArgs);
@@ -98,6 +107,22 @@ public void startupCompleted() throws Exception {
}
}


@Override
public Object getApplicationContext() {
return ccAppCtx;
}

@Override
public void registerConfig(IConfigManager configManager) {
throw new UnsupportedOperationException();
}

@Override
public IJobCapacityController getJobCapacityController() {
return DefaultJobCapacityController.INSTANCE;
}

/**
* Loads properties from
*
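The start(String[]) method above still parses its arguments with args4j (the dependency bumped to 2.33 in pom.xml) through new CmdLineParser(appArgs). A minimal standalone sketch of that pattern, using a hypothetical bean in place of the real AppArgs class:

import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;

public class ArgsParsingSketch {

    // Hypothetical bean standing in for AppArgs; only a -restPort option is shown.
    static class RestArgs {
        @Option(name = "-restPort", usage = "port on which the REST server listens")
        int restPort = 8085;
    }

    public static void main(String[] args) throws CmdLineException {
        RestArgs parsed = new RestArgs();
        new CmdLineParser(parsed).parseArgument(args); // e.g. -restPort 8085
        System.out.println("REST port: " + parsed.restPort);
    }
}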
@@ -24,7 +24,7 @@
import static org.apache.vxquery.rest.Constants.Properties.MAXIMUM_DATA_SIZE;

import java.io.IOException;
import java.net.InetAddress;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.util.Arrays;
@@ -59,8 +59,6 @@ public class LocalClusterUtil {

private ClusterControllerService clusterControllerService;
private NodeControllerService nodeControllerSerivce;
private IHyracksClientConnection hcc;
private IHyracksDataset hds;
private VXQueryService vxQueryService;

public void init(VXQueryConfig config) throws Exception {
@@ -77,55 +75,45 @@ public void init(VXQueryConfig config) throws Exception {
clusterControllerService = new ClusterControllerService(ccConfig);
clusterControllerService.start();

hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
hds = new HyracksDataset(hcc, config.getFrameSize(), config.getAvailableProcessors());

// Node controller
NCConfig ncConfig = createNCConfig();
nodeControllerSerivce = new NodeControllerService(ncConfig);
nodeControllerSerivce.start();

hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);

// REST controller
config.setHyracksClientIp(ccConfig.clientNetIpAddress);
config.setHyracksClientPort(ccConfig.clientNetPort);
config.setHyracksClientIp(ccConfig.getClientListenAddress());
config.setHyracksClientPort(ccConfig.getClientListenPort());
vxQueryService = new VXQueryService(config);
vxQueryService.start();
}

protected CCConfig createCCConfig() throws IOException {
String localAddress = getIpAddress();
CCConfig ccConfig = new CCConfig();
ccConfig.clientNetIpAddress = localAddress;
ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
ccConfig.clusterNetIpAddress = localAddress;
ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
ccConfig.httpPort = DEFAULT_HYRACKS_CC_HTTP_PORT;
ccConfig.profileDumpPeriod = 10000;
ccConfig.appCCMainClass = VXQueryApplication.class.getName();
ccConfig.appArgs = Arrays.asList("-restPort", String.valueOf(DEFAULT_VXQUERY_REST_PORT));

ccConfig.setClientListenAddress(localAddress);
ccConfig.setClientListenPort(DEFAULT_HYRACKS_CC_CLIENT_PORT);
ccConfig.setClusterListenAddress(localAddress);
ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
ccConfig.setConsoleListenPort(DEFAULT_HYRACKS_CC_HTTP_PORT);
ccConfig.setProfileDumpPeriod(10000);
ccConfig.setAppClass(VXQueryApplication.class.getName());
ccConfig.getAppArgs().addAll(Arrays.asList("-restPort", String.valueOf(DEFAULT_VXQUERY_REST_PORT)));
return ccConfig;
}

protected NCConfig createNCConfig() throws IOException {
String localAddress = getIpAddress();
NCConfig ncConfig = new NCConfig();
ncConfig.ccHost = "localhost";
ncConfig.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
ncConfig.clusterNetIPAddress = localAddress;
ncConfig.dataIPAddress = localAddress;
ncConfig.resultIPAddress = localAddress;
ncConfig.nodeId = "test_node";
ncConfig.ioDevices = Files.createTempDirectory(ncConfig.nodeId).toString();
String nodeId = "test_node";
NCConfig ncConfig = new NCConfig(nodeId);
ncConfig.setClusterAddress("localhost");
ncConfig.setClusterPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
ncConfig.setClusterListenAddress(localAddress);
ncConfig.setDataListenAddress(localAddress);
ncConfig.setResultListenAddress(localAddress);
ncConfig.setIODevices(new String[] { Files.createTempDirectory(nodeId).toString() });
ncConfig.setVirtualNC();
return ncConfig;
}

public IHyracksClientConnection getHyracksClientConnection() {
return hcc;
}

public VXQueryService getVxQueryService() {
return vxQueryService;
}
@@ -166,21 +154,10 @@ public void run() {
}

public String getIpAddress() throws UnknownHostException {
return InetAddress.getLocalHost().getHostAddress();
return Inet4Address.getLoopbackAddress().getHostAddress();
}

public int getRestPort() {
return DEFAULT_VXQUERY_REST_PORT;
}

@Deprecated
public IHyracksClientConnection getConnection() {
return hcc;
}

@Deprecated
public IHyracksDataset getDataset() {
return hds;
}

}
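Both this file's getIpAddress() and the collection operator earlier in the patch swap InetAddress.getLocalHost() for Inet4Address.getLoopbackAddress(). The practical difference: getLocalHost() resolves the machine's host name, so it may throw UnknownHostException or return an external interface address, whereas getLoopbackAddress() always returns 127.0.0.1 with no name lookup. A standalone sketch:

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class AddressLookupSketch {
    public static void main(String[] args) throws UnknownHostException {
        // May require a working host-name lookup and can return a non-loopback address.
        System.out.println(InetAddress.getLocalHost().getHostAddress());
        // Always 127.0.0.1, no name resolution involved.
        System.out.println(Inet4Address.getLoopbackAddress().getHostAddress());
    }
}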
@@ -140,7 +140,7 @@ public void setShowMetrics(boolean showMetrics) {
}

public String toString() {
return String.format("{ statement : %s }", statement);
return String.format("{ statement : \"%s\" }", statement);
}

public String getRequestId() {