From 84ce2195b2e3417ad438b67bcc6f8db49f27891e Mon Sep 17 00:00:00 2001
From: "alan578.zhao" <956322745@qq.com>
Date: Fri, 11 Oct 2024 19:55:50 +0800
Subject: [PATCH 01/12] =?UTF-8?q?hstore=20bulkload=20=E4=BB=A3=E7=A0=81?=
=?UTF-8?q?=E5=87=86=E5=A4=87?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
hugegraph-client/pom.xml | 5 +
.../AbstractGraphElementSerializer.java | 20 ++
.../serializer/GraphElementSerializer.java | 11 +
.../serializer/SerializerFactory.java | 37 +++
.../serializer/config/SerializerConfig.java | 45 ++++
.../serializer/direct/HBaseSerializer.java | 21 +-
.../serializer/direct/HStoreSerializer.java | 230 +++++++++++++++++
.../serializer/direct/reuse/BytesDemo.java | 4 +-
.../serializer/direct/util/Bits.java | 65 +++++
.../apache/hugegraph/util/PartitionUtils.java | 47 ++++
.../assembly/static/bin/get-params.sh | 3 +-
.../loader/direct/loader/DirectLoader.java | 13 +
.../direct/loader/HBaseDirectLoader.java | 15 +-
.../direct/loader/HStoreDirectLoader.java | 181 ++++++++++++++
.../direct/loaders/AbstractDirectLoader.java | 131 ++++++++++
.../loader/direct/loaders/DirectLoader.java | 29 +++
.../direct/loaders/HBaseDirectLoader.java | 228 +++++++++++++++++
.../direct/loaders/HStoreDirectLoader.java | 232 ++++++++++++++++++
.../outputformat/SSTFileOutputFormat.java | 79 ++++++
.../direct/partitioner/HstorePartitioner.java | 35 +++
.../loader/executor/LoadContext.java | 40 +++
.../loader/executor/LoadOptions.java | 13 +
.../metrics/DistributedLoadMetrics.java | 227 +++++++++++++++++
.../hugegraph/loader/metrics/LoadReport.java | 22 ++
.../hugegraph/loader/metrics/LoadSummary.java | 18 ++
.../loader/spark/HugeGraphSparkLoader.java | 194 ++++++++++++---
pom.xml | 6 +
27 files changed, 1899 insertions(+), 52 deletions(-)
create mode 100644 hugegraph-client/src/main/java/org/apache/hugegraph/serializer/AbstractGraphElementSerializer.java
create mode 100644 hugegraph-client/src/main/java/org/apache/hugegraph/serializer/GraphElementSerializer.java
create mode 100644 hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java
create mode 100644 hugegraph-client/src/main/java/org/apache/hugegraph/serializer/config/SerializerConfig.java
create mode 100644 hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java
create mode 100644 hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/util/Bits.java
create mode 100644 hugegraph-client/src/main/java/org/apache/hugegraph/util/PartitionUtils.java
create mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java
create mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/AbstractDirectLoader.java
create mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/DirectLoader.java
create mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HBaseDirectLoader.java
create mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HStoreDirectLoader.java
create mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/outputformat/SSTFileOutputFormat.java
create mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/partitioner/HstorePartitioner.java
create mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/DistributedLoadMetrics.java
diff --git a/hugegraph-client/pom.xml b/hugegraph-client/pom.xml
index 08b8d3ebe..ee26f2372 100644
--- a/hugegraph-client/pom.xml
+++ b/hugegraph-client/pom.xml
@@ -56,6 +56,11 @@
             <version>${lombok.version}</version>
             <optional>true</optional>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/AbstractGraphElementSerializer.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/AbstractGraphElementSerializer.java
new file mode 100644
index 000000000..a5fed4bc7
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/AbstractGraphElementSerializer.java
@@ -0,0 +1,20 @@
+package org.apache.hugegraph.serializer;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.serializer.direct.util.GraphSchema;
+
+public abstract class AbstractGraphElementSerializer implements GraphElementSerializer {
+ protected HugeClient client;
+ protected GraphSchema graphSchema;
+
+ public AbstractGraphElementSerializer(HugeClient client) {
+ this.client = client;
+ this.graphSchema = new GraphSchema(client);
+ }
+
+ public void close() {
+ this.client.close();
+ }
+
+}
+
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/GraphElementSerializer.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/GraphElementSerializer.java
new file mode 100644
index 000000000..7031efc68
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/GraphElementSerializer.java
@@ -0,0 +1,11 @@
+package org.apache.hugegraph.serializer;
+
+import org.apache.hugegraph.structure.GraphElement;
+import scala.Tuple2;
+
+public interface GraphElementSerializer {
+
+    Tuple2<byte[], Integer> getKeyBytes(GraphElement e);
+ byte[] getValueBytes(GraphElement e);
+
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java
new file mode 100644
index 000000000..85695c7f8
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.serializer;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.serializer.config.SerializerConfig;
+import org.apache.hugegraph.serializer.direct.HBaseSerializer;
+import org.apache.hugegraph.serializer.direct.HStoreSerializer;
+
+public class SerializerFactory {
+
+ public static GraphElementSerializer getSerializer(HugeClient client, SerializerConfig config) {
+ switch (config.getBackendStoreType()) {
+ case "hstore":
+ return new HStoreSerializer(client, config.getVertexPartitions(),config.getGraphName(),config.getPdAddress(),config.getPdRestPort());
+ case "hbase":
+ return new HBaseSerializer(client, config.getVertexPartitions(),config.getEdgePartitions());
+ default:
+ throw new IllegalArgumentException("Unsupported serializer backend type: " + config.getBackendStoreType());
+ }
+ }
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/config/SerializerConfig.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/config/SerializerConfig.java
new file mode 100644
index 000000000..21c70ab41
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/config/SerializerConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.serializer.config;
+
+
+import lombok.Data;
+
+@Data
+public class SerializerConfig {
+ private int vertexPartitions;
+ private int edgePartitions;
+ private String pdAddress;
+ private String pdRestPort;
+ private String graphName;
+
+
+ private String backendStoreType;
+
+ public SerializerConfig(int vertexPartitions, int edgePartitions, String pdAddress, String pdRestPort, String graphName) {
+ this.vertexPartitions = vertexPartitions;
+ this.edgePartitions = edgePartitions;
+ this.pdAddress = pdAddress;
+ this.pdRestPort = pdRestPort;
+ this.graphName = graphName;
+ }
+
+ public SerializerConfig() {
+ }
+
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HBaseSerializer.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HBaseSerializer.java
index 18cb87af0..2c94c77e9 100644
--- a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HBaseSerializer.java
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HBaseSerializer.java
@@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.serializer.AbstractGraphElementSerializer;
import org.apache.hugegraph.serializer.direct.struct.HugeType;
import org.apache.hugegraph.serializer.direct.util.BytesBuffer;
import org.apache.hugegraph.serializer.direct.util.GraphSchema;
@@ -29,43 +30,43 @@
import org.apache.hugegraph.structure.GraphElement;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.schema.PropertyKey;
+import scala.Tuple2;
/**
* TODO: review later
*/
-public class HBaseSerializer {
+public class HBaseSerializer extends AbstractGraphElementSerializer {
private int edgeLogicPartitions;
private int vertexLogicPartitions;
- private HugeClient client;
- private GraphSchema graphSchema;
+
public HBaseSerializer(HugeClient client, int vertexPartitions, int edgePartitions) {
- this.client = client;
- this.graphSchema = new GraphSchema(client);
+ super(client);
this.edgeLogicPartitions = edgePartitions;
this.vertexLogicPartitions = vertexPartitions;
}
- public byte[] getKeyBytes(GraphElement e) {
+ @Override
+    public Tuple2<byte[], Integer> getKeyBytes(GraphElement e) {
byte[] array = null;
if (e.type() == "vertex" && e.id() != null) {
BytesBuffer buffer = BytesBuffer.allocate(2 + 1 + e.id().toString().length());
- buffer.writeShort(getPartition(HugeType.VERTEX, IdGenerator.of(e.id())));
+ buffer.writeShort(getPartition(HugeType.VERTEX, IdGenerator.of(e.id())));
buffer.writeId(IdGenerator.of(e.id()));
array = buffer.bytes();
} else if (e.type() == "edge") {
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
- Edge edge = (Edge)e;
+ Edge edge = (Edge) e;
buffer.writeShort(getPartition(HugeType.EDGE, IdGenerator.of(edge.sourceId())));
buffer.writeId(IdGenerator.of(edge.sourceId()));
buffer.write(HugeType.EDGE_OUT.code());
- buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id())); //出现错误
+ buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id()));
buffer.writeStringWithEnding("");
buffer.writeId(IdGenerator.of(edge.targetId()));
array = buffer.bytes();
}
- return array;
+ return new Tuple2<>(array, 0);
}
public byte[] getValueBytes(GraphElement e) {
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java
new file mode 100644
index 000000000..7e03869b1
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.serializer.direct;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Range;
+import com.google.common.collect.TreeRangeMap;
+import org.apache.hugegraph.client.RestClient;
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.rest.RestClientConfig;
+import org.apache.hugegraph.rest.RestResult;
+import org.apache.hugegraph.serializer.AbstractGraphElementSerializer;
+import org.apache.hugegraph.serializer.direct.struct.HugeType;
+
+import org.apache.hugegraph.serializer.direct.util.*;
+import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.util.PartitionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 序列化器,需要明确的是点边的序列化中是不带有分区信息的?
+ */
+public class HStoreSerializer extends AbstractGraphElementSerializer {
+
+
+
+
+ private static final Logger log = LoggerFactory.getLogger(HStoreSerializer.class);
+ // 准备好treeRangeMap
+
+ // 准备好partGraphIdMap
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+
+    private Map<Long, Long> partGraphIdMap;
+    private TreeRangeMap<Long, Integer> rangeMap;
+
+ public HStoreSerializer(HugeClient client, int numPartitions,String graphName,String pdAddress,String pdRestPort) {
+ super(client);
+ rangeMap = TreeRangeMap.create();
+ int partitionSize = PartitionUtils.MAX_VALUE / numPartitions;
+ if (PartitionUtils.MAX_VALUE % numPartitions != 0) {
+ // 有余数,分区除不尽
+ partitionSize++;
+ }
+
+ for (int i = 0; i < numPartitions; i++) {
+ long startKey = (long) partitionSize * i;
+ long endKey = (long) partitionSize * (i + 1);
+ rangeMap.put(Range.closedOpen(startKey, endKey), i);
+ }
+ log.info("rangeMap:{}", rangeMap);
+ partGraphIdMap=getGraphId(graphName,processAddresses(pdAddress,pdRestPort));
+ log.info("partGraphIdMap:{}", partGraphIdMap);
+
+ }
+ public static String[] processAddresses(String addresses, String newPort) {
+ // 使用逗号分割字符串
+ String[] addressArray = addresses.split(",");
+
+ // 创建一个新的数组来存储处理后的地址
+ String[] processedArray = new String[addressArray.length];
+
+ // 遍历数组并替换端口
+ for (int i = 0; i < addressArray.length; i++) {
+ String address = addressArray[i];
+ // 找到冒号的位置
+ int colonIndex = address.indexOf(":");
+ if (colonIndex != -1) {
+ // 替换端口部分
+ String newAddress = "http://"+address.substring(0, colonIndex + 1) + newPort;
+ processedArray[i] = newAddress;
+ } else {
+ // 如果没有冒号,直接使用原地址
+ processedArray[i] = address;
+ }
+ }
+
+ return processedArray;
+ }
+
+ public static void main(String[] args) {
+// Map graphId = getGraphId("hugegraph", new String[]{"http://10.150.17.39:8620"});
+// System.out.println(graphId);
+ }
+
+    private Map<Long, Long> getGraphId(String graphName, String[] urls) {
+ RestClientConfig config = RestClientConfig.builder()
+ .connectTimeout(5*1000) // 连接超时时间 5s
+// .readTimeout(60*60 * 1000) // 读取超时时间 1h
+ .maxConns(10) // 最大连接数
+ .build();
+
+
+ for (String url : urls) {
+ log.info("getGraphId from {}, graphName:{}", url, graphName);
+ RestClient client = null;
+ try {
+ // 创建RestClient对象
+ client = new RestClient(url, config);
+ RestResult restResult = client.get("v1/partitionsAndGraphId" , Collections.singletonMap("graphName", graphName));
+ // 获取响应状态码
+ String content = restResult.content();
+                Map<Long, Long> resMap = MAPPER.readValue(content, new TypeReference<Map<Long, Long>>() {
+ });
+ log.info("Response :{} ", resMap);
+ // 如果成功,退出循环
+ return resMap;
+ } catch (Exception e) {
+ log.error("Failed to get graphId", e);
+ System.out.println(e);
+ break;
+ } finally {
+ // 确保RestClient被关闭
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ log.error("Failed to close RestClient", e);
+ }
+ }
+ }
+ }
+ return Collections.emptyMap();
+ }
+
+    public Tuple2<byte[], Integer> getKeyBytes(GraphElement e) {
+ byte[] array = null;
+ if (e.type() == "vertex" && e.id() != null) {
+
+ BytesBuffer buffer = BytesBuffer.allocate( 2+1 + e.id().toString().length());
+ Id id = IdGenerator.of(e.id());
+ buffer.writeId(id);
+ array = buffer.bytes();
+ int code = PartitionUtils.calcHashcode(id.asBytes());
+ log.info("code:{}", code);
+ byte[] buf = new byte[Short.BYTES + array.length + Short.BYTES];
+ // 基于code先拿到partId,然后再基于partId 获取到graphId
+ Integer partId = rangeMap.get((long) code);
+ log.info("partId:{}", partId);
+ Long partGraphId = partGraphIdMap.get((long) partId);
+ // 此处需要加入一个graphId 先默认给 0
+ Bits.putShort(buf, 0, Math.toIntExact(partGraphId));
+ Bits.put(buf, Short.BYTES, array);
+ // code是基于key计算的一个hash值? code
+ Bits.putShort(buf, array.length + Short.BYTES, code);
+ return new Tuple2<>(buf, partId);
+ } else if (e.type() == "edge") {
+ BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
+ Edge edge = (Edge)e;
+// buffer.writeShort();
+ buffer.writeId(IdGenerator.of(edge.sourceId()));
+ buffer.write(HugeType.EDGE_OUT.code());
+ buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id())); //出现错误
+ buffer.writeStringWithEnding("");
+ buffer.writeId(IdGenerator.of(edge.targetId()));
+ array = buffer.bytes();
+
+ // 基于code先拿到partId,然后再基于partId 获取到graphId
+ int code = PartitionUtils.calcHashcode(IdGenerator.of(edge.sourceId()).asBytes());
+ Integer partId = rangeMap.get((long) code);
+ Long partGraphId = partGraphIdMap.get((long) partId);
+ byte[] buf = new byte[Short.BYTES + array.length + Short.BYTES];
+ // 此处需要加入一个graphId 先默认给 0
+ Bits.putShort(buf, 0, Math.toIntExact(partGraphId));
+ Bits.put(buf, Short.BYTES, array);
+ // code是基于key计算的一个hash值? code
+ Bits.putShort(buf, array.length + Short.BYTES, code);
+ return new Tuple2<>(buf, code);
+ }
+ return new Tuple2<>(array, 0);
+ }
+
+
+ public byte[] getValueBytes(GraphElement e) {
+ byte[] array = null;
+ if (e.type() == "vertex") {
+ int propsCount = e.properties().size(); //vertex.sizeOfProperties();
+ BytesBuffer buffer = BytesBuffer.allocate(8 + 16 * propsCount);
+ buffer.writeId(IdGenerator.of(graphSchema.getVertexLabel(e.label()).id()));
+ buffer.writeVInt(propsCount);
+            for (Map.Entry<String, Object> entry : e.properties().entrySet()) {
+ PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
+ buffer.writeVInt(propertyKey.id().intValue());
+ buffer.writeProperty(propertyKey.dataType(),entry.getValue());
+ }
+ array = buffer.bytes();
+ } else if (e.type() == "edge") {
+ int propsCount = e.properties().size();
+ BytesBuffer buffer = BytesBuffer.allocate(4 + 16 * propsCount);
+ buffer.writeVInt(propsCount);
+            for (Map.Entry<String, Object> entry : e.properties().entrySet()) {
+ PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
+ buffer.writeVInt(propertyKey.id().intValue());
+ buffer.writeProperty(propertyKey.dataType(),entry.getValue());
+ }
+ array = buffer.bytes();
+ }
+
+ return array;
+ }
+
+
+
+
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java
index ea7bbbd9a..328e06ff8 100644
--- a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java
@@ -146,13 +146,13 @@ private void writeGraphElements() {
* */
     void writeDirectly(List<Vertex> vertices, List<Edge> edges) {
for (Vertex vertex : vertices) {
- byte[] rowkey = hBaseSer.getKeyBytes(vertex);
+ byte[] rowkey = hBaseSer.getKeyBytes(vertex)._1;
byte[] values = hBaseSer.getValueBytes(vertex);
sendRpcToHBase("vertex", rowkey, values);
}
for (Edge edge : edges) {
- byte[] rowkey = hBaseSer.getKeyBytes(edge);
+ byte[] rowkey = hBaseSer.getKeyBytes(edge)._1;
byte[] values = hBaseSer.getValueBytes(edge);
sendRpcToHBase("edge", rowkey, values);
}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/util/Bits.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/util/Bits.java
new file mode 100644
index 000000000..30d7d24c1
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/util/Bits.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.serializer.direct.util;
+
+import java.nio.ByteBuffer;
+
+public class Bits {
+ /**
+ * 大头字节序写入short
+ */
+ public static void putShort(byte[] buf, int offSet, int x) {
+ buf[offSet] = (byte) (x >> 8);
+ buf[offSet + 1] = (byte) (x);
+ }
+
+ public static void putInt(byte[] buf, int offSet, int x) {
+ buf[offSet] = (byte) (x >> 24);
+ buf[offSet + 1] = (byte) (x >> 16);
+ buf[offSet + 2] = (byte) (x >> 8);
+ buf[offSet + 3] = (byte) (x);
+ }
+
+ /**
+ * 大头字节序读取short
+ */
+ public static int getShort(byte[] buf, int offSet) {
+ int x = buf[offSet] & 0xff;
+ x = (x << 8) + (buf[offSet + 1] & 0xff);
+ return x;
+ }
+
+ public static int getInt(byte[] buf, int offSet) {
+ int x = (buf[offSet] << 24)
+ + ((buf[offSet + 1] & 0xff) << 16)
+ + ((buf[offSet + 2] & 0xff) << 8)
+ + (buf[offSet + 3] & 0xff);
+ return x;
+ }
+
+ public static void put(byte[] buf, int offSet, byte[] srcBuf) {
+ System.arraycopy(srcBuf, 0, buf, offSet, srcBuf.length);
+ }
+
+ public static int toInt(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ buffer.put(bytes);
+ buffer.flip();//need flip
+ return buffer.getInt();
+ }
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/util/PartitionUtils.java b/hugegraph-client/src/main/java/org/apache/hugegraph/util/PartitionUtils.java
new file mode 100644
index 000000000..d38ee0b88
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/util/PartitionUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.util;
+
+public class PartitionUtils {
+
+ public static final int MAX_VALUE = 0xffff;
+
+ /**
+ * 计算key的hashcode
+ *
+ * @param key
+ * @return hashcode
+ */
+ public static int calcHashcode(byte[] key) {
+ final int p = 16777619;
+ int hash = (int) 2166136261L;
+ for (byte element : key) {
+ hash = (hash ^ element) * p;
+ }
+ hash += hash << 13;
+ hash ^= hash >> 7;
+ hash += hash << 3;
+ hash ^= hash >> 17;
+ hash += hash << 5;
+ hash = hash & PartitionUtils.MAX_VALUE;
+ if (hash == PartitionUtils.MAX_VALUE) {
+ hash = PartitionUtils.MAX_VALUE - 1;
+ }
+ return hash;
+ }
+}
diff --git a/hugegraph-loader/assembly/static/bin/get-params.sh b/hugegraph-loader/assembly/static/bin/get-params.sh
index dde3f0c7c..860e37d77 100644
--- a/hugegraph-loader/assembly/static/bin/get-params.sh
+++ b/hugegraph-loader/assembly/static/bin/get-params.sh
@@ -27,7 +27,8 @@ function get_params() {
--incremental-mode | --failure-mode | --batch-insert-threads | --single-insert-threads | \
--max-conn | --max-conn-per-route | --batch-size | --max-parse-errors | --max-insert-errors | \
--timeout | --shutdown-timeout | --retry-times | --retry-interval | --check-vertex | \
- --print-progress | --dry-run | --sink-type | --vertex-partitions | --edge-partitions | --help )
+ --print-progress | --dry-run | --sink-type | --vertex-partitions | --edge-partitions | --backend \
+ | --help )
HUGEGRAPH_PARAMS="$HUGEGRAPH_PARAMS $1 $2"
shift 2
;;
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java
index f99e4a497..cf15f9569 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java
@@ -64,6 +64,19 @@ protected List getElementBuilders() {
return buildersForGraphElement;
}
+    protected List<ElementBuilder> getElementBuilders(LoadContext context) {
+        context.schemaCache().updateAll();
+        List<ElementBuilder> buildersForGraphElement = new LinkedList<>();
+ for (VertexMapping vertexMapping : struct.vertices()) {
+ buildersForGraphElement.add(new VertexBuilder(context, struct, vertexMapping));
+ }
+ for (EdgeMapping edgeMapping : struct.edges()) {
+ buildersForGraphElement.add(new EdgeBuilder(context, struct, edgeMapping));
+ }
+ context.close();// 关闭了hugeclient
+ return buildersForGraphElement;
+ }
+
abstract JavaPairRDD buildVertexAndEdge(Dataset ds);
abstract String generateFiles(JavaPairRDD buildAndSerRdd);
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
index dfc9fd998..0ba32fe37 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
@@ -41,6 +41,7 @@
import org.apache.hugegraph.loader.executor.LoadOptions;
import org.apache.hugegraph.loader.mapping.ElementMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
import org.apache.hugegraph.loader.util.HugeClientHolder;
import org.apache.hugegraph.serializer.direct.HBaseSerializer;
@@ -60,7 +61,7 @@
public class HBaseDirectLoader extends DirectLoader {
private SinkToHBase sinkToHBase;
- private LoadDistributeMetrics loadDistributeMetrics;
+ private DistributedLoadMetrics loadDistributeMetrics;
private static final int RANDOM_VALUE1;
private static final short RANDOM_VALUE2;
@@ -127,7 +128,7 @@ public static String fileID() {
public HBaseDirectLoader(LoadOptions loadOptions,
InputStruct struct,
- LoadDistributeMetrics loadDistributeMetrics) {
+ DistributedLoadMetrics loadDistributeMetrics) {
super(loadOptions, struct);
this.loadDistributeMetrics = loadDistributeMetrics;
this.sinkToHBase = new SinkToHBase(loadOptions);
@@ -270,7 +271,8 @@ List> buildAndSer(HBaseSerializer seria
LOG.debug("vertex already build done {} ", vertex.toString());
Tuple2 tuple2 =
vertexSerialize(serializer, vertex);
- loadDistributeMetrics.increaseDisVertexInsertSuccess(builder.mapping());
+ loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
+ loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
result.add(tuple2);
}
} else {
@@ -278,7 +280,8 @@ List> buildAndSer(HBaseSerializer seria
LOG.debug("edge already build done {}", edge.toString());
Tuple2 tuple2 =
edgeSerialize(serializer, edge);
- loadDistributeMetrics.increaseDisEdgeInsertSuccess(builder.mapping());
+ loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
+ loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
result.add(tuple2);
}
@@ -290,7 +293,7 @@ List> buildAndSer(HBaseSerializer seria
     private Tuple2<ImmutableBytesWritable, KeyValue> edgeSerialize(HBaseSerializer serializer,
Edge edge) {
LOG.debug("edge start serialize {}", edge.toString());
- byte[] rowkey = serializer.getKeyBytes(edge);
+ byte[] rowkey = serializer.getKeyBytes(edge)._1;
byte[] values = serializer.getValueBytes(edge);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
rowKey.set(rowkey);
@@ -302,7 +305,7 @@ private Tuple2 edgeSerialize(HBaseSerializer s
     private Tuple2<ImmutableBytesWritable, KeyValue> vertexSerialize(HBaseSerializer serializer,
Vertex vertex) {
LOG.debug("vertex start serialize {}", vertex.toString());
- byte[] rowkey = serializer.getKeyBytes(vertex);
+ byte[] rowkey = serializer.getKeyBytes(vertex)._1;
byte[] values = serializer.getValueBytes(vertex);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
rowKey.set(rowkey);
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java
new file mode 100644
index 000000000..d8cd209f5
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.direct.loader;
+
+import org.apache.hugegraph.loader.builder.ElementBuilder;
+import org.apache.hugegraph.loader.executor.LoadContext;
+import org.apache.hugegraph.loader.executor.LoadOptions;
+import org.apache.hugegraph.loader.mapping.ElementMapping;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
+import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.util.Log;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HStoreDirectLoader extends DirectLoader,byte[]> {
+
+ private DistributedLoadMetrics loadDistributeMetrics;
+
+
+ public static final Logger LOG = Log.logger(HStoreDirectLoader.class);
+
+
+ public HStoreDirectLoader(LoadOptions loadOptions,
+ InputStruct struct,
+ DistributedLoadMetrics loadDistributeMetrics) {
+ super(loadOptions, struct);
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ }
+
+
+ @Override
+ public JavaPairRDD, byte[]> buildVertexAndEdge(Dataset ds) {
+
+ LOG.info("Start build vertexes and edges");
+ JavaPairRDD, byte[]> tuple2JavaPairRDD = ds.toJavaRDD().mapPartitionsToPair(
+ (PairFlatMapFunction, Tuple2, byte[]>) rowIter -> {
+ // Prepares the schema data and creates each partition via the PD client
+ LoadContext loaderContext = new LoadContext(super.loadOptions);
+ loaderContext.init(struct);// prepares the element builders and refreshes the schema data
+ List buildersForGraphElement = getElementBuilders(loaderContext);
+ List, byte[]>> result = new LinkedList<>();
+ while (rowIter.hasNext()) {
+ Row row = rowIter.next();
+ List, byte[]>> serList;
+ serList = buildAndSer(loaderContext.getSerializer(), row, buildersForGraphElement);
+ result.addAll(serList);
+ }
+ return result.iterator();
+ }
+ );
+
+ return tuple2JavaPairRDD;
+ }
+
+ @Override
+ protected String generateFiles(JavaPairRDD, byte[]> buildAndSerRdd) {
+ return "";
+ }
+
+
+
+
+
+ @Override
+ protected void loadFiles(String path) {
+
+ }
+
+
+ List, byte[]>> buildAndSer(GraphElementSerializer serializer, Row row,
+ List builders) {
+ List elementsElement;
+ List, byte[]>> result = new LinkedList<>();
+
+ for (ElementBuilder builder : builders) {
+ ElementMapping elementMapping = builder.mapping();
+ if (elementMapping.skip()) {
+ continue;
+ }
+ if ("".equals(row.mkString())) {
+ break;
+ }
+ switch (struct.input().type()) {
+ case FILE:
+ case HDFS:
+ elementsElement = builder.build(row);
+ break;
+ default:
+ throw new AssertionError(String.format("Unsupported input source '%s'",
+ struct.input().type()));
+ }
+
+ boolean isVertex = builder.mapping().type().isVertex();
+ if (isVertex) {
+ for (Vertex vertex : (List) (Object) elementsElement) {
+ LOG.debug("vertex already build done {} ", vertex.toString());
+ Tuple2, byte[]> tuple2 =vertexSerialize(serializer, vertex);
+ //mapping.label();
+ String label = builder.mapping().label();
+ loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
+ loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
+
+ result.add(tuple2);
+ }
+ } else {
+ for (Edge edge : (List) (Object) elementsElement) {
+ LOG.debug("edge already build done {}", edge.toString());
+ Tuple2, byte[]> tuple2 =edgeSerialize(serializer, edge);
+ loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
+ loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
+ result.add(tuple2);
+
+ }
+ }
+ }
+ return result;
+ }
+
+ private Tuple2, byte[]> edgeSerialize(GraphElementSerializer serializer, Edge edge) {
+ LOG.debug("edge start serialize {}", edge.toString());
+ Tuple2 keyBytes = serializer.getKeyBytes(edge);
+ byte[] values = serializer.getValueBytes(edge);
+
+ return new Tuple2<>(keyBytes, values);
+ }
+
+ private Tuple2, byte[]> vertexSerialize(GraphElementSerializer serializer,
+ Vertex vertex) {
+ LOG.debug("vertex start serialize {}", vertex.toString());
+ Tuple2 keyBytes = serializer.getKeyBytes(vertex);
+ byte[] values = serializer.getValueBytes(vertex);
+ return new Tuple2<>(keyBytes, values);
+ }
+
+
+ static class TupleComparator implements Comparator>, Serializable {
+ @Override
+ public int compare(Tuple2 a, Tuple2 b) {
+ return compareByteArrays(a._1, b._1);
+ }
+
+ private int compareByteArrays(byte[] a, byte[] b) {
+ for (int i = 0, j = 0; i < a.length && j < b.length; i++, j++) {
+ int cmp = Byte.compare(a[i], b[j]);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ return Integer.compare(a.length, b.length);
+ }
+ }
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/AbstractDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/AbstractDirectLoader.java
new file mode 100644
index 000000000..ec0532f1a
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/AbstractDirectLoader.java
@@ -0,0 +1,131 @@
+package org.apache.hugegraph.loader.direct.loaders;
+
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hugegraph.loader.builder.EdgeBuilder;
+import org.apache.hugegraph.loader.builder.ElementBuilder;
+import org.apache.hugegraph.loader.builder.VertexBuilder;
+import org.apache.hugegraph.loader.executor.LoadContext;
+import org.apache.hugegraph.loader.executor.LoadOptions;
+import org.apache.hugegraph.loader.mapping.EdgeMapping;
+import org.apache.hugegraph.loader.mapping.ElementMapping;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.mapping.VertexMapping;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
+import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.util.Log;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+
+public abstract class AbstractDirectLoader implements DirectLoader, Serializable {
+
+ protected LoadOptions loadOptions;
+ protected InputStruct struct;
+ protected DistributedLoadMetrics loadDistributeMetrics;
+
+ public static final Logger LOG = Log.logger(AbstractDirectLoader.class);
+
+ public AbstractDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
+ this.loadOptions = loadOptions;
+ this.struct = struct;
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ }
+ public AbstractDirectLoader(LoadOptions loadOptions,InputStruct struct) {
+ this.loadOptions = loadOptions;
+ this.struct = struct;
+ }
+
+ public void flushPermission(Configuration conf, String path){
+ FsShell shell = new FsShell(conf);
+ try {
+ LOG.info("shell start execute");
+ shell.run(new String[]{"-chmod", "-R", "777", path});
+ shell.close();
+ } catch (Exception e) {
+ LOG.error("Couldnt change the file permissions " + e
+ + " Please run command:"
+ + "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles "
+ + path + " '"
+ + "test" + "'\n"
+ + " to load generated HFiles into HBase table");
+ }
+ }
+
+ @Override
+ public void bulkload(Dataset ds) {
+ JavaPairRDD javaPairRDD = buildVertexAndEdge(ds);
+ String path = generateFiles(javaPairRDD);
+ loadFiles(path);
+ }
+
+ protected List getElementBuilders(LoadContext context) {
+ context.schemaCache().updateAll();
+ List buildersForGraphElement = new LinkedList<>();
+ for (VertexMapping vertexMapping : struct.vertices()) {
+ buildersForGraphElement.add(new VertexBuilder(context, struct, vertexMapping));
+ }
+ for (EdgeMapping edgeMapping : struct.edges()) {
+ buildersForGraphElement.add(new EdgeBuilder(context, struct, edgeMapping));
+ }
+ return buildersForGraphElement;
+ }
+
+ protected List> buildAndSer(GraphElementSerializer serializer, Row row, List builders) {
+ List elementsElement;
+ List> result = new LinkedList<>();
+
+ for (ElementBuilder builder : builders) {
+ ElementMapping elementMapping = builder.mapping();
+ if (elementMapping.skip()) {
+ continue;
+ }
+ if ("".equals(row.mkString())) {
+ break;
+ }
+ switch (struct.input().type()) {
+ case FILE:
+ case HDFS:
+ elementsElement = builder.build(row);
+ break;
+ default:
+ throw new AssertionError(String.format("Unsupported input source '%s'", struct.input().type()));
+ }
+
+ boolean isVertex = builder.mapping().type().isVertex();
+ if (isVertex) {
+ for (Vertex vertex : (List) (Object) elementsElement) {
+ LOG.debug("vertex already build done {} ", vertex.toString());
+ Tuple2 tuple2 = vertexSerialize(serializer, vertex);
+ loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
+ loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
+ result.add(tuple2);
+ }
+ } else {
+ for (Edge edge : (List) (Object) elementsElement) {
+ LOG.debug("edge already build done {}", edge.toString());
+ Tuple2 tuple2 = edgeSerialize(serializer, edge);
+ loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
+ loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
+ result.add(tuple2);
+ }
+ }
+ }
+ return result;
+ }
+
+ protected abstract Tuple2 vertexSerialize(GraphElementSerializer serializer, Vertex vertex);
+
+ protected abstract Tuple2 edgeSerialize(GraphElementSerializer serializer, Edge edge);
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/DirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/DirectLoader.java
new file mode 100644
index 000000000..df33ae230
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/DirectLoader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.direct.loaders;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+public interface DirectLoader {
+ JavaPairRDD buildVertexAndEdge(Dataset ds);
+ String generateFiles(JavaPairRDD buildAndSerRdd);
+ void loadFiles(String path);
+ void bulkload(Dataset ds);
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HBaseDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HBaseDirectLoader.java
new file mode 100644
index 000000000..625a4c3c6
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HBaseDirectLoader.java
@@ -0,0 +1,228 @@
+package org.apache.hugegraph.loader.direct.loaders;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hugegraph.loader.builder.ElementBuilder;
+import org.apache.hugegraph.loader.constant.Constants;
+import org.apache.hugegraph.loader.direct.util.SinkToHBase;
+import org.apache.hugegraph.loader.executor.LoadContext;
+import org.apache.hugegraph.loader.executor.LoadOptions;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
+import org.apache.hugegraph.loader.util.HugeClientHolder;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
+import org.apache.hugegraph.serializer.direct.HBaseSerializer;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.util.Log;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class HBaseDirectLoader extends AbstractDirectLoader {
+
+ private SinkToHBase sinkToHBase;
+
+ private static final int RANDOM_VALUE1;
+ private static final short RANDOM_VALUE2;
+ private static final AtomicInteger NEXT_COUNTER;
+
+ public static final Logger LOG = Log.logger(org.apache.hugegraph.loader.direct.loader.HBaseDirectLoader.class);
+
+ static {
+ try {
+ SecureRandom secureRandom = new SecureRandom();
+ RANDOM_VALUE1 = secureRandom.nextInt(0x01000000);
+ RANDOM_VALUE2 = (short) secureRandom.nextInt(0x00008000);
+ NEXT_COUNTER = new AtomicInteger(new SecureRandom().nextInt());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static byte int3(final int x) {
+ return (byte) (x >> 24);
+ }
+
+ private static byte int2(final int x) {
+ return (byte) (x >> 16);
+ }
+
+ private static byte int1(final int x) {
+ return (byte) (x >> 8);
+ }
+
+ private static byte int0(final int x) {
+ return (byte) (x);
+ }
+
+ private static byte short1(final short x) {
+ return (byte) (x >> 8);
+ }
+
+ private static byte short0(final short x) {
+ return (byte) (x);
+ }
+
+ public static String fileID() {
+ long timeStamp = System.currentTimeMillis() / 1000;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(12);
+
+ byteBuffer.put(int3((int) timeStamp));
+ byteBuffer.put(int2((int) timeStamp));
+ byteBuffer.put(int1((int) timeStamp));
+ byteBuffer.put(int0((int) timeStamp));
+
+ byteBuffer.put(int2(RANDOM_VALUE1));
+ byteBuffer.put(int1(RANDOM_VALUE1));
+ byteBuffer.put(int0(RANDOM_VALUE1));
+ byteBuffer.put(short1(RANDOM_VALUE2));
+ byteBuffer.put(short0(RANDOM_VALUE2));
+
+ byteBuffer.put(int2(NEXT_COUNTER.incrementAndGet()));
+ byteBuffer.put(int1(NEXT_COUNTER.incrementAndGet()));
+ byteBuffer.put(int0(NEXT_COUNTER.incrementAndGet()));
+
+ return Bytes.toHex(byteBuffer.array());
+ }
+
+ public HBaseDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
+ super(loadOptions, struct, loadDistributeMetrics);
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+ }
+
+ public HBaseDirectLoader(LoadOptions loadOptions,InputStruct struct) {
+ super(loadOptions,struct);
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+ }
+
+ @Override
+ public JavaPairRDD buildVertexAndEdge(Dataset ds) {
+ LOG.info("Start build vertexes and edges");
+ return ds.toJavaRDD().mapPartitionsToPair(
+ (PairFlatMapFunction, ImmutableBytesWritable, KeyValue>) rowIter -> {
+ HBaseSerializer ser = new HBaseSerializer(HugeClientHolder.create(loadOptions), loadOptions.vertexPartitions, loadOptions.edgePartitions);
+ LoadContext loaderContext = new LoadContext(super.loadOptions);
+ loaderContext.init(struct);// prepares the element builders and refreshes the schema data
+ List buildersForGraphElement = getElementBuilders(loaderContext);
+ List> result = new LinkedList<>();
+ while (rowIter.hasNext()) {
+ Row row = rowIter.next();
+ List> serList;
+ serList = buildAndSer(ser, row, buildersForGraphElement);
+ result.addAll(serList);
+ }
+ ser.close();
+ return result.iterator();
+ }
+ );
+ }
+
+ @Override
+ public String generateFiles(JavaPairRDD buildAndSerRdd) {
+ LOG.info("Start to generate hfile");
+ try {
+ Tuple2 tuple = sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Partitioner partitioner = tuple._1;
+ TableDescriptor tableDescriptor = tuple._2;
+
+ JavaPairRDD repartitionedRdd = buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+ Job job = Job.getInstance(conf);
+ HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name", tableDescriptor.getTableName().getNameAsString());
+ String path = getHFilePath(job.getConfiguration());
+ repartitionedRdd.saveAsNewAPIHadoopFile(path, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);
+ LOG.info("Saved HFiles to: '{}'", path);
+ flushPermission(conf, path);
+ return path;
+ } catch (IOException e) {
+ LOG.error("Failed to generate files", e);
+ }
+ return Constants.EMPTY_STR;
+ }
+
+ @Override
+ public void loadFiles(String path) {
+ try {
+ sinkToHBase.loadHfiles(path, getTableName());
+ } catch (Exception e) {
+ LOG.error("Failed to load hfiles", e);
+ }
+ }
+
+ private String getTableName() {
+ return struct.edges().size() > 0 ? loadOptions.edgeTableName : loadOptions.vertexTableName;
+ }
+
+ private Integer getTablePartitions() {
+ return struct.edges().size() > 0 ? loadOptions.edgePartitions : loadOptions.vertexPartitions;
+ }
+
+ private String getHFilePath(Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ String fileID = fileID();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + fileID + "/";
+ Path hfileGenPath = new Path(pathStr);
+ if (fs.exists(hfileGenPath)) {
+ LOG.info("\n Delete the path where the hfile is generated,path {} ", pathStr);
+ fs.delete(hfileGenPath, true);
+ }
+ return pathStr;
+ }
+
+// private void flushPermission(Configuration conf, String path) {
+// org.apache.hadoop.fs.FsShell shell = new org.apache.hadoop.fs.FsShell(conf);
+// try {
+// LOG.info("Chmod hfile directory permission");
+// shell.run(new String[]{"-chmod", "-R", "777", path});
+// shell.close();
+// } catch (Exception e) {
+// LOG.error("Couldn't change the file permissions " + e + " Please run command:" +
+// "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles " + path +
+// " '" + "test" + "'\n" + " to load generated HFiles into HBase table");
+// }
+// }
+
+ @Override
+ protected Tuple2 vertexSerialize(GraphElementSerializer serializer, Vertex vertex) {
+ LOG.debug("vertex start serialize {}", vertex.toString());
+ byte[] rowkey = serializer.getKeyBytes(vertex)._1;
+ byte[] values = serializer.getValueBytes(vertex);
+ ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
+ rowKey.set(rowkey);
+ KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY), Bytes.toBytes(Constants.EMPTY_STR), values);
+ return new Tuple2<>(rowKey, keyValue);
+ }
+
+ @Override
+ protected Tuple2 edgeSerialize(GraphElementSerializer serializer, Edge edge) {
+ LOG.debug("edge start serialize {}", edge.toString());
+ byte[] rowkey = serializer.getKeyBytes(edge)._1;
+ byte[] values = serializer.getValueBytes(edge);
+ ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
+ rowKey.set(rowkey);
+ KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY), Bytes.toBytes(Constants.EMPTY_STR), values);
+ return new Tuple2<>(rowKey, keyValue);
+ }
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HStoreDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HStoreDirectLoader.java
new file mode 100644
index 000000000..480162ba7
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HStoreDirectLoader.java
@@ -0,0 +1,232 @@
+package org.apache.hugegraph.loader.direct.loaders;
+
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hugegraph.client.RestClient;
+import org.apache.hugegraph.loader.builder.ElementBuilder;
+import org.apache.hugegraph.loader.direct.outputformat.SSTFileOutputFormat;
+import org.apache.hugegraph.loader.direct.partitioner.HstorePartitioner;
+import org.apache.hugegraph.loader.executor.LoadContext;
+import org.apache.hugegraph.loader.executor.LoadOptions;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
+import org.apache.hugegraph.rest.RestClientConfig;
+import org.apache.hugegraph.rest.RestResult;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import static org.apache.hugegraph.serializer.direct.HStoreSerializer.processAddresses;
+
+public class HStoreDirectLoader extends AbstractDirectLoader, byte[]> {
+
+ public HStoreDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
+ super(loadOptions, struct, loadDistributeMetrics);
+ }
+ public HStoreDirectLoader(LoadOptions loadOptions, InputStruct struct) {
+ super(loadOptions, struct);
+ }
+
+ @Override
+ public JavaPairRDD, byte[]> buildVertexAndEdge(Dataset ds) {
+ LOG.info("Start build vertexes and edges");
+ return ds.toJavaRDD().mapPartitionsToPair(
+ (PairFlatMapFunction, Tuple2, byte[]>) rowIter -> {
+ LoadContext loaderContext = new LoadContext(super.loadOptions);
+ loaderContext.init(struct);
+ List buildersForGraphElement = getElementBuilders(loaderContext);
+ List, byte[]>> result = new LinkedList<>();
+ while (rowIter.hasNext()) {
+ Row row = rowIter.next();
+ List, byte[]>> serList;
+ serList = buildAndSer(loaderContext.getSerializer(), row, buildersForGraphElement);
+ result.addAll(serList);
+ }
+ return result.iterator();
+ }
+ );
+ }
+
+ @Override
+ public String generateFiles(JavaPairRDD, byte[]> buildAndSerRdd) {
+ LOG.info("bulkload start execute>>>");
+ try {
+ // Repartition with the custom HStore partitioner
+ JavaPairRDD, byte[]> tuple2JavaPairRDD = buildAndSerRdd.partitionBy(new HstorePartitioner(loadOptions.vertexPartitions));
+ // Drop the partition id, keeping only the raw key bytes
+ JavaPairRDD javaPairRDD = tuple2JavaPairRDD.mapToPair(tuple2 -> new Tuple2<>(tuple2._1._1, tuple2._2));
+ JavaPairRDD sortedRdd = javaPairRDD.mapPartitionsToPair(iterator -> {
+ List> partitionData = new ArrayList<>();
+ iterator.forEachRemaining(partitionData::add);
+ Collections.sort(partitionData, new HStoreDirectLoader.TupleComparator());
+ return partitionData.iterator();
+ });
+
+
+ Configuration hadoopConf = new Configuration();
+ String sstFilePath = getSSTFilePath(hadoopConf);
+ LOG.info("SSTFile生成的hdfs路径:{}", sstFilePath);
+ sortedRdd.saveAsNewAPIHadoopFile(
+ sstFilePath,
+ byte[].class,
+ byte[].class,
+ SSTFileOutputFormat.class,
+ hadoopConf
+ );
+ flushPermission(hadoopConf,sstFilePath);
+ return sstFilePath;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+
+ return null;
+ }
+
+ @Override
+ public void loadFiles(String sstFilePath) {
+ RestClientConfig config = RestClientConfig.builder()
+ .connectTimeout(5*1000) // connect timeout: 5s
+ .readTimeout(60*60 * 1000) // read timeout: 1h
+ .maxConns(10) // maximum number of connections
+ .build();
+
+ BulkloadInfo bulkloadInfo = new BulkloadInfo(loadOptions.graph, sstFilePath.replace("TXY-HDP11","txy-hn1-bigdata-hdp11-nn-prd-02.myhll.cn:8020"),getBulkloadType());
+ String[] urls = processAddresses(loadOptions.pdAddress, loadOptions.pdRestPort);
+
+ for (String url : urls) {
+ LOG.info("submit bulkload task to {}, bulkloadInfo:{}", url, bulkloadInfo);
+ RestClient client = null;
+ try {
+ // Create the RestClient
+ client = new RestClient(url, config);
+ // Submit the bulkload task and read the response
+ RestResult restResult = client.post("v1/task/bulkload", bulkloadInfo);
+ Map resMap = restResult.readObject(Map.class);
+ LOG.info("Response :{} ", resMap);
+ // On success, exit the loop
+ break;
+ } catch (Exception e) {
+ LOG.error("Failed to submit bulkload task", e);
+ break;
+ } finally {
+ // Ensure the RestClient is closed
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close RestClient", e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ protected Tuple2, byte[]> vertexSerialize(GraphElementSerializer serializer, Vertex vertex) {
+ LOG.debug("vertex start serialize {}", vertex.toString());
+ Tuple2 keyBytes = serializer.getKeyBytes(vertex);
+ byte[] values = serializer.getValueBytes(vertex);
+ return new Tuple2<>(keyBytes, values);
+ }
+
+ @Override
+ protected Tuple2, byte[]> edgeSerialize(GraphElementSerializer serializer, Edge edge) {
+ LOG.debug("edge start serialize {}", edge.toString());
+ Tuple2 keyBytes = serializer.getKeyBytes(edge);
+ byte[] values = serializer.getValueBytes(edge);
+ return new Tuple2<>(keyBytes, values);
+ }
+
+ private BulkloadInfo.LoadType getBulkloadType() {
+ return struct.edges().size() > 0 ? BulkloadInfo.LoadType.EDGE : BulkloadInfo.LoadType.VERTEX;
+ }
+
+ private String getSSTFilePath(Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ long timeStr = System.currentTimeMillis();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hg-1_5/gen-sstfile" + "/" + timeStr+ "/";// SST file output path
+ org.apache.hadoop.fs.Path hfileGenPath = new Path(pathStr);
+ if(fs.exists(hfileGenPath)){
+ LOG.info("\n delete sstFile path \n");
+ fs.delete(hfileGenPath,true);
+ }
+// fs.close();
+ return pathStr;
+ }
+
+ static class TupleComparator implements Comparator>, Serializable {
+ @Override
+ public int compare(Tuple2 a, Tuple2 b) {
+ return compareByteArrays(a._1, b._1);
+ }
+
+ private int compareByteArrays(byte[] a, byte[] b) {
+ for (int i = 0, j = 0; i < a.length && j < b.length; i++, j++) {
+ int cmp = Byte.compare(a[i], b[j]);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ return Integer.compare(a.length, b.length);
+ }
+ }
+
+ static class BulkloadInfo {
+ String graphName;
+ String tableName;
+ String hdfsPath;
+
+ public BulkloadInfo(String graphName, String path, LoadType loadType) {
+ this.graphName = processGraphName(graphName);
+ this.tableName = processTableName(graphName, loadType);
+ this.hdfsPath = path;
+ }
+
+ private String processGraphName(String graphName) {
+ return graphName + "/g";
+ }
+
+ private String processTableName(String graphName, LoadType loadType) {
+ if (loadType == LoadType.VERTEX) {
+ return "g+v";
+ } else if (loadType == LoadType.EDGE) {
+ return "g+oe";
+ } else {
+ throw new IllegalArgumentException("Invalid loadType: " + loadType);
+ }
+ }
+
+
+
+
+
+ @Override
+ public String toString() {
+ return "BulkloadInfo{" +
+ "graphName='" + graphName + '\'' +
+ ", tableName='" + tableName + '\'' +
+ ", hdfsPath='" + hdfsPath + '\'' +
+ '}';
+ }
+
+ enum LoadType {
+ VERTEX,
+ EDGE
+ }
+
+ }
+
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/outputformat/SSTFileOutputFormat.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/outputformat/SSTFileOutputFormat.java
new file mode 100644
index 000000000..c88eb4361
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/outputformat/SSTFileOutputFormat.java
@@ -0,0 +1,79 @@
+package org.apache.hugegraph.loader.direct.outputformat;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileWriter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SSTFileOutputFormat extends FileOutputFormat {
+
+ @Override
+ public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException {
+ Path outputPath = getDefaultWorkFile(job, ".sst");
+ FileSystem fs = outputPath.getFileSystem(job.getConfiguration());
+ FSDataOutputStream fileOut = fs.create(outputPath, false);
+ return new RocksDBSSTFileRecordWriter(fileOut, outputPath, fs);
+ }
+
+ public static class RocksDBSSTFileRecordWriter extends RecordWriter {
+ private final FSDataOutputStream out;
+ private final SstFileWriter sstFileWriter;
+ private final Path outputPath;
+ private final FileSystem fs;
+ private final File localSSTFile;
+
+ public RocksDBSSTFileRecordWriter(FSDataOutputStream out, Path outputPath, FileSystem fs) throws IOException {
+ this.out = out;
+ this.outputPath = outputPath;
+ this.fs = fs;
+ Options options = new Options();
+ options.setCreateIfMissing(true);
+ this.localSSTFile = File.createTempFile("sstfile", ".sst");
+ this.sstFileWriter = new SstFileWriter(new org.rocksdb.EnvOptions(), options);
+ try {
+ this.sstFileWriter.open(localSSTFile.getAbsolutePath());
+ } catch (RocksDBException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void write(byte[] key, byte[] value) throws IOException {
+ try {
+ sstFileWriter.put(key, value);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException {
+ try {
+ sstFileWriter.finish();
+ try (InputStream in = new FileInputStream(localSSTFile)) {
+ byte[] buffer = new byte[4096];
+ int bytesRead;
+ while ((bytesRead = in.read(buffer)) != -1) {
+ out.write(buffer, 0, bytesRead);
+ }
+ }
+ out.close();
+ localSSTFile.delete();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
+
+
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/partitioner/HstorePartitioner.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/partitioner/HstorePartitioner.java
new file mode 100644
index 000000000..e8e34b511
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/partitioner/HstorePartitioner.java
@@ -0,0 +1,35 @@
+package org.apache.hugegraph.loader.direct.partitioner;
+
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/**
+ * Spark partitioner that routes each record to the partition id that was
+ * pre-computed into its key: keys are expected to be {@link Tuple2} instances
+ * whose second element is the target partition id.
+ */
+public class HstorePartitioner extends Partitioner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HstorePartitioner.class);
+
+    private final int numPartitions;
+
+    public HstorePartitioner(int numPartitions) {
+        this.numPartitions = numPartitions;
+    }
+
+    @Override
+    public int numPartitions() {
+        return numPartitions;
+    }
+
+    @Override
+    public int getPartition(Object key) {
+        try {
+            // The second tuple element carries the pre-computed partition id.
+            // Was `((Tuple2) key)._2`, a raw-type access yielding Object that
+            // cannot be returned as int; unwrap explicitly instead.
+            return ((Number) ((Tuple2<?, ?>) key)._2()).intValue();
+        } catch (Exception e) {
+            // Pass the throwable as the last argument so SLF4J logs its
+            // stack trace instead of formatting it into the message.
+            LOG.error("When trying to get partitionID, key = {}", key, e);
+            // Fall back to partition 0 rather than failing the task.
+            return 0;
+        }
+    }
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadContext.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadContext.java
index 0be364bb8..aa2f47552 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadContext.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadContext.java
@@ -19,13 +19,24 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hugegraph.loader.builder.EdgeBuilder;
+import org.apache.hugegraph.loader.builder.ElementBuilder;
+import org.apache.hugegraph.loader.builder.VertexBuilder;
import org.apache.hugegraph.loader.exception.LoadException;
+import org.apache.hugegraph.loader.mapping.EdgeMapping;
+import org.apache.hugegraph.loader.mapping.VertexMapping;
import org.apache.hugegraph.loader.progress.LoadProgress;
import org.apache.hugegraph.loader.util.DateUtil;
import org.apache.hugegraph.loader.util.HugeClientHolder;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
+import org.apache.hugegraph.serializer.SerializerFactory;
+import org.apache.hugegraph.serializer.config.SerializerConfig;
+import org.apache.hugegraph.structure.GraphElement;
import org.slf4j.Logger;
import org.apache.hugegraph.driver.HugeClient;
@@ -57,6 +68,12 @@ public final class LoadContext implements Serializable {
private final HugeClient client;
private final SchemaCache schemaCache;
+ private Map> builders;
+
+
+
+ private GraphElementSerializer serializer;
+
public LoadContext(LoadOptions options) {
this.timestamp = DateUtil.now("yyyyMMdd-HHmmss");
@@ -70,6 +87,9 @@ public LoadContext(LoadOptions options) {
this.loggers = new ConcurrentHashMap<>();
this.client = HugeClientHolder.create(options);
this.schemaCache = new SchemaCache(this.client);
+ SerializerConfig config = new SerializerConfig();
+ initSerializerConfig(config);
+ this.serializer= SerializerFactory.getSerializer(client,config);
}
public LoadContext(ComputerLoadOptions options) {
@@ -86,6 +106,15 @@ public LoadContext(ComputerLoadOptions options) {
this.schemaCache = options.schemaCache();
}
+    /**
+     * Copies the backend/serializer related load options into the given
+     * serializer config.
+     */
+    public void initSerializerConfig(SerializerConfig config) {
+        config.setBackendStoreType(options.backendStoreType);
+        config.setGraphName(options.graph);
+        config.setEdgePartitions(options.edgePartitions);
+        config.setPdAddress(options.pdAddress);
+        config.setPdRestPort(options.pdRestPort);
+        // Was a duplicated setEdgePartitions() call, which left the vertex
+        // partition count unset even though SerializerFactory reads
+        // getVertexPartitions() — TODO confirm option/setter names.
+        config.setVertexPartitions(options.vertexPartitions);
+    }
+
public String timestamp() {
return this.timestamp;
}
@@ -132,6 +161,9 @@ public FailLogger failureLogger(InputStruct struct) {
return new FailLogger(this, struct);
});
}
+ public GraphElementSerializer getSerializer() {
+ return serializer;
+ }
public HugeClient client() {
return this.client;
@@ -194,4 +226,12 @@ public void close() {
LOG.info("Close HugeClient successfully");
this.closed = true;
}
+
+    /**
+     * Prepares element builders for every vertex/edge mapping of the given
+     * input struct and refreshes the schema cache.
+     */
+    public void init(InputStruct struct) {
+        // Lazily create the map: the `builders` field is declared but never
+        // initialized, so this method would otherwise throw NPE.
+        if (this.builders == null) {
+            this.builders = new java.util.HashMap<>();
+        }
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            this.builders.put(new VertexBuilder(this, struct, vertexMapping), new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            this.builders.put(new EdgeBuilder(this, struct, edgeMapping), new ArrayList<>());
+        }
+        this.updateSchemaCache();
+    }
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java
index d2a8a4546..dbec11811 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java
@@ -239,6 +239,19 @@ public class LoadOptions implements Serializable {
description = "HBase zookeeper parent")
public String hbaseZKParent;
+ @Parameter(names = {"--backend"}, arity = 1,
+ description = "backend store type")
+ public String backendStoreType = "hstore";
+
+ @Parameter(names = {"--pd-address"}, arity = 1,
+ description = "pd-address")
+ public String pdAddress ;
+
+
+ @Parameter(names = {"--pd-rest-port"}, arity = 1,
+ description = "pd-rest-port")
+ public String pdRestPort="8620";
+
public String workModeString() {
if (this.incrementalMode) {
return "INCREMENTAL MODE";
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/DistributedLoadMetrics.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/DistributedLoadMetrics.java
new file mode 100644
index 000000000..82d502345
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/DistributedLoadMetrics.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.hugegraph.loader.metrics;
+
+import org.apache.hugegraph.loader.mapping.EdgeMapping;
+import org.apache.hugegraph.loader.mapping.ElementMapping;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Per-input-struct load metrics backed by Spark accumulators, so counts
+ * recorded on executors are aggregated on the driver.
+ */
+public final class DistributedLoadMetrics implements Serializable {
+
+    private final InputStruct struct;
+
+    // Plain longs: read counters are only updated on the driver side.
+    private long readSuccess;
+
+    private long readFailure;
+
+    private boolean inFlight;
+
+    private final LongAdder flightingNums;
+
+    // label -> accumulator-backed metrics for each vertex/edge mapping
+    private final Map<String, DistributedMetrics> vertexMetrics;
+
+    private final Map<String, DistributedMetrics> edgeMetrics;
+
+    public DistributedLoadMetrics(InputStruct struct, SparkContext sc) {
+        this.struct = struct;
+        this.readSuccess = 0L;
+        this.readFailure = 0L;
+        this.inFlight = false;
+        this.flightingNums = new LongAdder();
+        this.vertexMetrics = new HashMap<>();
+        this.edgeMetrics = new HashMap<>();
+        for (VertexMapping mapping : struct.vertices()) {
+            DistributedMetrics distributedMetrics = new DistributedMetrics();
+            distributedMetrics.init(sc, mapping.toString());
+            this.vertexMetrics.put(mapping.label(), distributedMetrics);
+        }
+        for (EdgeMapping mapping : struct.edges()) {
+            DistributedMetrics distributedMetrics = new DistributedMetrics();
+            distributedMetrics.init(sc, mapping.toString());
+            this.edgeMetrics.put(mapping.label(), distributedMetrics);
+        }
+    }
+
+    public long readSuccess() {
+        return this.readSuccess;
+    }
+
+    public void readSuccess(long count) {
+        this.readSuccess = count;
+    }
+
+    public void increaseReadSuccess() {
+        this.readSuccess++;
+    }
+
+    public void plusReadSuccess(long count) {
+        this.readSuccess += count;
+    }
+
+    public long readFailure() {
+        return this.readFailure;
+    }
+
+    public void readFailure(long count) {
+        this.readFailure = count;
+    }
+
+    public void increaseReadFailure() {
+        this.readFailure++;
+    }
+
+    public void startInFlight() {
+        this.inFlight = true;
+    }
+
+    public void stopInFlight() {
+        this.inFlight = false;
+    }
+
+    public void plusFlighting(int num) {
+        this.flightingNums.add(num);
+    }
+
+    public void minusFlighting(int num) {
+        this.flightingNums.add(-num);
+    }
+
+    public long parseSuccess(ElementMapping mapping) {
+        return metrics(mapping).parseSuccess.value().longValue();
+    }
+
+    public void plusParseSuccess(ElementMapping mapping, long count) {
+        metrics(mapping).parseSuccess.add(count);
+    }
+
+    public long parseFailure(ElementMapping mapping) {
+        return metrics(mapping).parseFailure.value().longValue();
+    }
+
+    public void increaseParseFailure(ElementMapping mapping) {
+        metrics(mapping).parseFailure.add(1L);
+    }
+
+    public long insertSuccess(ElementMapping mapping) {
+        return metrics(mapping).insertSuccess.value().longValue();
+    }
+
+    public void plusInsertSuccess(ElementMapping mapping, long count) {
+        metrics(mapping).insertSuccess.add(count);
+    }
+
+    public long insertFailure(ElementMapping mapping) {
+        return metrics(mapping).insertFailure.value().longValue();
+    }
+
+    public void increaseInsertFailure(ElementMapping mapping) {
+        metrics(mapping).insertFailure.add(1L);
+    }
+
+    public Map<String, DistributedMetrics> vertexMetrics() {
+        return this.vertexMetrics;
+    }
+
+    public Map<String, DistributedMetrics> edgeMetrics() {
+        return this.edgeMetrics;
+    }
+
+    public long totalParseFailures() {
+        long total = 0L;
+        for (DistributedMetrics counter : this.vertexMetrics.values()) {
+            total += counter.parseFailure.value().longValue();
+        }
+        for (DistributedMetrics counter : this.edgeMetrics.values()) {
+            total += counter.parseFailure.value().longValue();
+        }
+        return total;
+    }
+
+    public long totalInsertFailures() {
+        long total = 0L;
+        for (DistributedMetrics counter : this.vertexMetrics.values()) {
+            total += counter.insertFailure.value().longValue();
+        }
+        for (DistributedMetrics counter : this.edgeMetrics.values()) {
+            total += counter.insertFailure.value().longValue();
+        }
+        return total;
+    }
+
+    private DistributedMetrics metrics(ElementMapping mapping) {
+        if (mapping.type().isVertex()) {
+            return this.vertexMetrics.get(mapping.label());
+        }
+        return this.edgeMetrics.get(mapping.label());
+    }
+
+    /** Accumulator quadruple for one mapping label. */
+    public static class DistributedMetrics implements Serializable {
+        private LongAccumulator parseSuccess;
+
+        private LongAccumulator parseFailure;
+
+        private LongAccumulator insertSuccess;
+
+        private LongAccumulator insertFailure;
+
+        public void init(SparkContext sc, String label) {
+            this.parseSuccess = sc.longAccumulator(label + "_" + "parseSuccess");
+            this.parseFailure = sc.longAccumulator(label + "_" + "parseFailure");
+            this.insertSuccess = sc.longAccumulator(label + "_" + "insertSuccess");
+            // Bug fix: was registered under "parseFailure", mislabeling the
+            // insert-failure accumulator in the Spark UI.
+            this.insertFailure = sc.longAccumulator(label + "_" + "insertFailure");
+        }
+
+        public void plusDisInsertSuccess(Long count) {
+            this.insertSuccess.add(count);
+        }
+
+        public void plusDisParseSuccess(Long count) {
+            this.parseSuccess.add(count);
+        }
+
+        public long parseSuccess() {
+            return this.parseSuccess.value().longValue();
+        }
+
+        public long parseFailure() {
+            return this.parseFailure.value().longValue();
+        }
+
+        public long insertSuccess() {
+            return this.insertSuccess.value().longValue();
+        }
+
+        public long insertFailure() {
+            return this.insertFailure.value().longValue();
+        }
+
+        @Override
+        public String toString() {
+            return
+                "parseSuccess=" + parseSuccess.value() +
+                ", parseFailure=" + parseFailure.value() +
+                ", insertSuccess=" + insertSuccess.value() +
+                ", insertFailure=" + insertFailure.value()
+                ;
+        }
+    }
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadReport.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadReport.java
index d0e7dcedd..db23c6019 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadReport.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadReport.java
@@ -99,4 +99,26 @@ public static LoadReport collect(LoadSummary summary) {
}
return report;
}
+
+ /**
+  * Aggregates the accumulator-backed distributed metrics of every input
+  * struct in the summary into a single LoadReport.
+  */
+ public static LoadReport collectDistributed(LoadSummary summary) {
+ LoadReport report = new LoadReport();
+ report.totalTime = summary.totalTime();
+ for (DistributedLoadMetrics metrics : summary.inputDistributedMetricsMap().values()) {
+ report.readSuccess += metrics.readSuccess();
+ report.readFailure += metrics.readFailure();
+ // Sum per-label vertex counters across all vertex mappings.
+ for (DistributedLoadMetrics.DistributedMetrics labelMetrics : metrics.vertexMetrics().values()) {
+ report.vertexParseSuccess += labelMetrics.parseSuccess();
+ report.vertexParseFailure += labelMetrics.parseFailure();
+ report.vertexInsertSuccess += labelMetrics.insertSuccess();
+ report.vertexInsertFailure += labelMetrics.insertFailure();
+ }
+ // Sum per-label edge counters across all edge mappings.
+ for (DistributedLoadMetrics.DistributedMetrics labelMetrics : metrics.edgeMetrics().values()) {
+ report.edgeParseSuccess += labelMetrics.parseSuccess();
+ report.edgeParseFailure += labelMetrics.parseFailure();
+ report.edgeInsertSuccess += labelMetrics.insertSuccess();
+ report.edgeInsertFailure += labelMetrics.insertFailure();
+ }
+ }
+ return report;
+ }
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadSummary.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadSummary.java
index dda1dd099..b671d7132 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadSummary.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadSummary.java
@@ -29,6 +29,7 @@
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.mapping.LoadMapping;
import org.apache.hugegraph.util.InsertionOrderUtil;
+import org.apache.spark.SparkContext;
public final class LoadSummary {
@@ -43,6 +44,8 @@ public final class LoadSummary {
private final RangesTimer loadRangesTimer;
// Every input struct has a metric
private final Map inputMetricsMap;
+ private final Map inputDistributedMetricsMap ;
+
public LoadSummary() {
this.vertexLoaded = new LongAdder();
@@ -55,6 +58,7 @@ public LoadSummary() {
this.edgeRangesTimer = new RangesTimer(Constants.TIME_RANGE_CAPACITY);
this.loadRangesTimer = new RangesTimer(Constants.TIME_RANGE_CAPACITY);
this.inputMetricsMap = InsertionOrderUtil.newMap();
+ this.inputDistributedMetricsMap = InsertionOrderUtil.newMap();
}
public void initMetrics(LoadMapping mapping) {
@@ -63,6 +67,11 @@ public void initMetrics(LoadMapping mapping) {
}
}
+ public void initMetrics(LoadMapping mapping, SparkContext sc) {
+ for (InputStruct struct : mapping.structs())
+ this.inputDistributedMetricsMap.put(struct.id(), new DistributedLoadMetrics(struct, sc));
+ }
+
public Map inputMetricsMap() {
return this.inputMetricsMap;
}
@@ -176,4 +185,13 @@ public long loadRate(ElemType type) {
long success = isVertex ? this.vertexLoaded() : this.edgeLoaded();
return success * 1000 / totalTime;
}
+
+ /** Returns the distributed metrics registered for the given input struct. */
+ public DistributedLoadMetrics distributedLoadMetrics(InputStruct struct) {
+ return this.inputDistributedMetricsMap.get(struct.id());
+ }
+
+ /** Returns the struct-id to distributed-metrics map (insertion ordered). */
+ public Map inputDistributedMetricsMap() {
+ return this.inputDistributedMetricsMap;
+ }
+
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
index 61cf3136c..559299711 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -17,17 +17,23 @@
package org.apache.hugegraph.loader.spark;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hugegraph.driver.GraphManager;
import org.apache.hugegraph.loader.builder.EdgeBuilder;
import org.apache.hugegraph.loader.builder.ElementBuilder;
import org.apache.hugegraph.loader.builder.VertexBuilder;
-import org.apache.hugegraph.loader.direct.loader.HBaseDirectLoader;
+
+import org.apache.hugegraph.loader.direct.loaders.AbstractDirectLoader;
+import org.apache.hugegraph.loader.direct.loaders.HBaseDirectLoader;
+import org.apache.hugegraph.loader.direct.loaders.HStoreDirectLoader;
import org.apache.hugegraph.loader.exception.LoadException;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
-import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
+import org.apache.hugegraph.loader.metrics.LoadReport;
+import org.apache.hugegraph.loader.metrics.LoadSummary;
import org.apache.hugegraph.loader.source.InputSource;
import org.apache.hugegraph.loader.source.jdbc.JDBCSource;
import org.apache.hugegraph.loader.util.Printer;
@@ -49,8 +55,10 @@
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.util.Log;
+import org.apache.hugegraph.util.TimeUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -72,6 +80,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import scala.Tuple2;
import scala.collection.JavaConverters;
public class HugeGraphSparkLoader implements Serializable {
@@ -82,6 +91,8 @@ public class HugeGraphSparkLoader implements Serializable {
private final Map> builders;
private final transient ExecutorService executor;
+ private static final String DIVIDE_LINE = StringUtils.repeat('-', 50);
+
public static void main(String[] args) {
HugeGraphSparkLoader loader;
@@ -138,15 +149,28 @@ public void load() throws ExecutionException, InterruptedException {
SparkSession session = SparkSession.builder().config(conf).getOrCreate();
SparkContext sc = session.sparkContext();
- LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
+ LoadSummary summary = new LoadSummary();
+ summary.initMetrics(mapping, sc);
+ summary.startTotalTimer();
+
+
List> futures = new ArrayList<>(structs.size());
- for (InputStruct struct : structs) {
+
+ Object[] dstArray = new JavaPairRDD[structs.size()];
+
+
+
+
+ for (int i = 0; i < structs.size(); i++) {
+ InputStruct struct = (InputStruct)structs.get(i);
+ DistributedLoadMetrics distributedLoadMetrics = summary.distributedLoadMetrics(struct);
+ int finalI = i;
+
Future> future = this.executor.submit(() -> {
LOG.info("\n Initializes the accumulator corresponding to the {} ",
struct.input().asFileSource().path());
- LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
- loadDistributeMetrics.init(sc);
+
LOG.info("\n Start to load data, data info is: \t {} ",
struct.input().asFileSource().path());
Dataset ds = read(session, struct);
@@ -155,21 +179,15 @@ public void load() throws ExecutionException, InterruptedException {
ds.foreachPartition((Iterator p) -> {
LoadContext context = initPartition(this.loadOptions, struct);
p.forEachRemaining((Row row) -> {
- loadRow(struct, row, p, context);
+ loadRow(struct, row, p, context, distributedLoadMetrics);
});
context.close();
});
} else {
LOG.info("\n Start to load data using spark bulkload \n");
- // gen-hfile
- HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
- loadDistributeMetrics);
- directLoader.bulkload(ds);
-
+ processDirectLoader(struct, distributedLoadMetrics, ds, dstArray, finalI);
}
- collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
- LOG.info("\n Finished load {} data ", struct.input().asFileSource().path());
});
futures.add(future);
}
@@ -177,22 +195,107 @@ public void load() throws ExecutionException, InterruptedException {
future.get();
}
- Long totalInsertSuccessCnt = totalInsertSuccess.value();
- LOG.info("\n ------------The data load task is complete-------------------\n" +
- "\n insertSuccessCnt:\t {} \n ---------------------------------------------\n",
- totalInsertSuccessCnt);
+ JavaPairRDD unionRDD = null;
+ String path = null;
+ switch (loadOptions.backendStoreType) {
+ case "hbase":
+ unionRDD = ((JavaPairRDD[]) dstArray)[0];
+ for (int i = 1; i < dstArray.length; i++) {
+ unionRDD = unionRDD.union(((JavaPairRDD[]) dstArray)[i]);
+ }
+ HBaseDirectLoader hbaseDirectLoader = new HBaseDirectLoader(this.loadOptions,structs.get(0));
+ path = hbaseDirectLoader.generateFiles(unionRDD);
+ hbaseDirectLoader.loadFiles(path);
+
+
+ break;
+ case "hstore":
+ unionRDD = ((JavaPairRDD, byte[]>[]) dstArray)[0];
+ for (int i = 1; i < dstArray.length; i++) {
+ unionRDD = unionRDD.union(((JavaPairRDD, byte[]>[]) dstArray)[i]);
+ }
+ HStoreDirectLoader hstoreDirectLoader = new HStoreDirectLoader(this.loadOptions,structs.get(0));
+ path = hstoreDirectLoader.generateFiles(unionRDD);
+ hstoreDirectLoader.loadFiles(path);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported backend store type: " + loadOptions.backendStoreType);
+ }
+
+
+
+ summary.stopTotalTimer();
+ printDistributedSummary(summary);
sc.stop();
session.close();
session.stop();
}
- private void collectLoadMetrics(LoadDistributeMetrics loadMetrics,
- LongAccumulator totalInsertSuccess) {
- Long edgeInsertSuccess = loadMetrics.readEdgeInsertSuccess();
- Long vertexInsertSuccess = loadMetrics.readVertexInsertSuccess();
- totalInsertSuccess.add(edgeInsertSuccess);
- totalInsertSuccess.add(vertexInsertSuccess);
+ /** Writes one summary line to the loader log. */
+ private static void log(String message) {
+ LOG.info(message);
+ }
+    /**
+     * Prints per-label detail metrics, then aggregated count metrics and
+     * meter (rate) metrics for the whole distributed load.
+     */
+    public static void printDistributedSummary(LoadSummary summary) {
+        log(DIVIDE_LINE);
+        log("detail metrics");
+        summary.inputDistributedMetricsMap().forEach((id, metrics) -> {
+            log("");
+            log(String.format("input-struct '%s'", id));
+            metrics.vertexMetrics().forEach((label, labelMetrics) -> {
+                log(String.format("vertex '%s'", label));
+                log(labelMetrics.toString());
+            });
+            metrics.edgeMetrics().forEach((label, labelMetrics) -> {
+                log(String.format("edge '%s'", label));
+                log(labelMetrics.toString());
+            });
+        });
+        log(DIVIDE_LINE);
+        LoadReport report = LoadReport.collectDistributed(summary);
+        printCountReport(report);
+        log(DIVIDE_LINE);
+        printMeterReport(summary, report);
+    }
+
+ /** Prints elapsed times and insert rates derived from the load report. */
+ private static void printMeterReport(LoadSummary summary, LoadReport report) {
+ long totalTime = summary.totalTime();
+ long vertexTime = summary.vertexTime();
+ long edgeTime = summary.edgeTime();
+ // NOTE(review): loadTime is defined as totalTime, so readTime below is
+ // always 0 — confirm whether read time should be tracked separately.
+ long loadTime = totalTime;
+ long readTime = totalTime - loadTime;
+ log("meter metrics");
+ log("total time", TimeUtil.readableTime(totalTime));
+ log("read time", TimeUtil.readableTime(readTime));
+ log("load time", TimeUtil.readableTime(loadTime));
+ log("vertex load time", TimeUtil.readableTime(vertexTime));
+ log("vertex load rate(vertices/s)",
+ (report.vertexInsertSuccess() == 0L) ? "0.0" : String.format("%.2f", new Object[] { Double.valueOf(report.vertexInsertSuccess() * 1000.0D / totalTime) }));
+ log("edge load time", TimeUtil.readableTime(edgeTime));
+ log("edge load rate(edges/s)",
+ (report.edgeInsertSuccess() == 0L) ? "0.0" : String.format("%.2f", new Object[] { Double.valueOf(report.edgeInsertSuccess() * 1000.0D / totalTime) }));
+ }
+
+    /** Logs one aligned "key: value" summary line. */
+    private static void log(String key, String value) {
+        log(String.format(" %-30s: %-20s", key, value));
+    }
+ /** Prints the aggregated success/failure counters of the whole load. */
+ private static void printCountReport(LoadReport report) {
+ log("count metrics");
+ log("input read success", report.readSuccess());
+ log("input read failure", report.readFailure());
+ log("vertex parse success", report.vertexParseSuccess());
+ log("vertex parse failure", report.vertexParseFailure());
+ log("vertex insert success", report.vertexInsertSuccess());
+ log("vertex insert failure", report.vertexInsertFailure());
+ log("edge parse success", report.edgeParseSuccess());
+ log("edge parse failure", report.edgeParseFailure());
+ log("edge insert success", report.edgeInsertSuccess());
+ log("edge insert failure", report.edgeInsertFailure());
+ log("total insert success", report.vertexInsertSuccess() + report.edgeInsertSuccess());
+ log("total insert failure", report.vertexInsertFailure() + report.edgeInsertFailure());
+ }
+
+    /** Logs one aligned "key: value" summary line for a numeric value. */
+    private static void log(String key, long value) {
+        LOG.info(String.format(" %-30s: %-20d", key, value));
+    }
private LoadContext initPartition(
@@ -211,7 +314,7 @@ private LoadContext initPartition(
}
private void loadRow(InputStruct struct, Row row, Iterator p,
- LoadContext context) {
+ LoadContext context,DistributedLoadMetrics distributedLoadMetrics) {
for (Map.Entry> builderMap :
this.builders.entrySet()) {
ElementMapping elementMapping = builderMap.getKey().mapping();
@@ -219,13 +322,13 @@ private void loadRow(InputStruct struct, Row row, Iterator p,
if (elementMapping.skip()) {
continue;
}
- parse(row, builderMap, struct);
+ parse(row, builderMap, struct,distributedLoadMetrics);
// Insert
List graphElements = builderMap.getValue();
if (graphElements.size() >= elementMapping.batchSize() ||
(!p.hasNext() && graphElements.size() > 0)) {
- flush(builderMap, context.client().graph(), this.loadOptions.checkVertex);
+ flush(builderMap, context.client().graph(), this.loadOptions.checkVertex,distributedLoadMetrics);
}
}
}
@@ -282,7 +385,7 @@ private Dataset read(SparkSession ss, InputStruct struct) {
}
private void parse(Row row, Map.Entry> builderMap,
- InputStruct struct) {
+ InputStruct struct,DistributedLoadMetrics loadDistributeMetrics) {
ElementBuilder builder = builderMap.getKey();
List graphElements = builderMap.getValue();
if ("".equals(row.mkString())) {
@@ -300,6 +403,7 @@ private void parse(Row row, Map.Entry> builde
} else {
elements = builder.build(row);
}
+ loadDistributeMetrics.plusParseSuccess(builder.mapping(),elements.size());
break;
case JDBC:
Object[] structFields = JavaConverters.asJavaCollection(row.schema().toList())
@@ -312,6 +416,7 @@ private void parse(Row row, Map.Entry> builde
values[i] = row.get(i);
}
elements = builder.build(headers, values);
+ loadDistributeMetrics.plusParseSuccess(builder.mapping(),elements.size());
break;
default:
throw new AssertionError(String.format("Unsupported input source '%s'",
@@ -321,7 +426,7 @@ private void parse(Row row, Map.Entry> builde
}
private void flush(Map.Entry> builderMap,
- GraphManager g, boolean isCheckVertex) {
+ GraphManager g, boolean isCheckVertex,DistributedLoadMetrics loadDistributeMetrics) {
ElementBuilder builder = builderMap.getKey();
ElementMapping elementMapping = builder.mapping();
List graphElements = builderMap.getValue();
@@ -329,9 +434,11 @@ private void flush(Map.Entry> builderMap,
Map updateStrategyMap = elementMapping.updateStrategies();
if (updateStrategyMap.isEmpty()) {
if (isVertex) {
- g.addVertices((List) (Object) graphElements);
+ List vertices = g.addVertices((List) (Object) graphElements);
+ loadDistributeMetrics.plusInsertSuccess(elementMapping, vertices.size());
} else {
- g.addEdges((List) (Object) graphElements);
+ List edges = g.addEdges((List) (Object) graphElements);
+ loadDistributeMetrics.plusInsertSuccess(elementMapping, edges.size());
}
} else {
// CreateIfNotExist doesn't support false now
@@ -341,16 +448,37 @@ private void flush(Map.Entry> builderMap,
req.vertices((List) (Object) graphElements)
.updatingStrategies(updateStrategyMap)
.createIfNotExist(true);
- g.updateVertices(req.build());
+ List vertices = g.updateVertices(req.build());
+ loadDistributeMetrics.plusInsertSuccess(elementMapping, vertices.size());
} else {
BatchEdgeRequest.Builder req = new BatchEdgeRequest.Builder();
req.edges((List) (Object) graphElements)
.updatingStrategies(updateStrategyMap)
.checkVertex(isCheckVertex)
.createIfNotExist(true);
- g.updateEdges(req.build());
+ List edges = g.updateEdges(req.build());
+ loadDistributeMetrics.plusInsertSuccess(elementMapping, edges.size());
}
}
graphElements.clear();
}
+
+
+ private void processDirectLoader(InputStruct struct, DistributedLoadMetrics distributedLoadMetrics, Dataset ds, Object[] dstArray, int index) {
+ AbstractDirectLoader directLoader;
+ switch (loadOptions.backendStoreType) {
+ case "hbase":
+ directLoader = new HBaseDirectLoader(this.loadOptions, struct, distributedLoadMetrics);
+ ((JavaPairRDD[]) dstArray)[index] = directLoader.buildVertexAndEdge(ds);
+ break;
+ case "hstore":
+ directLoader = new HStoreDirectLoader(this.loadOptions, struct, distributedLoadMetrics);
+ ((JavaPairRDD, byte[]>[]) dstArray)[index] = directLoader.buildVertexAndEdge(ds);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported backend store type: " + loadOptions.backendStoreType);
+ }
+ }
+
+
}
diff --git a/pom.xml b/pom.xml
index eb0ae13cb..3beea0313 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,7 @@
provided
provided
2.12
+ 2.12.11
3.0.0
true
true
@@ -299,6 +300,11 @@
gson
${gson.version}
+
+ org.scala-lang
+ scala-library
+ ${scala-lang.version}
+
From 35e2a19c79a0757b56c2a99332b89de484dd5873 Mon Sep 17 00:00:00 2001
From: "alan578.zhao" <956322745@qq.com>
Date: Sat, 12 Oct 2024 18:03:03 +0800
Subject: [PATCH 02/12] =?UTF-8?q?hstore=20bulkload=20=E4=BB=A3=E7=A0=81bug?=
=?UTF-8?q?fix?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../serializer/SerializerFactory.java | 6 +-
.../serializer/direct/HStoreSerializer.java | 28 +-
hugegraph-loader/pom.xml | 4 +-
.../AbstractDirectLoader.java | 11 +-
.../loader/direct/loader/DirectLoader.java | 63 +----
.../direct/loader/HBaseDirectLoader.java | 215 ++++-----------
.../direct/loader/HStoreDirectLoader.java | 252 +++++++++++-------
.../loader/direct/loaders/DirectLoader.java | 29 --
.../direct/loaders/HBaseDirectLoader.java | 228 ----------------
.../direct/loaders/HStoreDirectLoader.java | 232 ----------------
.../loader/executor/LoadContext.java | 13 +-
.../hugegraph/loader/metrics/LoadReport.java | 88 +++---
.../hugegraph/loader/metrics/LoadSummary.java | 22 +-
.../loader/spark/HugeGraphSparkLoader.java | 131 +++++----
14 files changed, 369 insertions(+), 953 deletions(-)
rename hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/{loaders => loader}/AbstractDirectLoader.java (96%)
delete mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/DirectLoader.java
delete mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HBaseDirectLoader.java
delete mode 100644 hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HStoreDirectLoader.java
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java
index 85695c7f8..e0f9375e4 100644
--- a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java
@@ -24,12 +24,12 @@
public class SerializerFactory {
- public static GraphElementSerializer getSerializer(HugeClient client, SerializerConfig config) {
+ public static AbstractGraphElementSerializer getSerializer(HugeClient client, SerializerConfig config) {
switch (config.getBackendStoreType()) {
case "hstore":
- return new HStoreSerializer(client, config.getVertexPartitions(),config.getGraphName(),config.getPdAddress(),config.getPdRestPort());
+ return new HStoreSerializer(client, config.getVertexPartitions(), config.getGraphName(), config.getPdAddress(), config.getPdRestPort());
case "hbase":
- return new HBaseSerializer(client, config.getVertexPartitions(),config.getEdgePartitions());
+ return new HBaseSerializer(client, config.getVertexPartitions(), config.getEdgePartitions());
default:
throw new IllegalArgumentException("Unsupported serializer backend type: " + config.getBackendStoreType());
}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java
index 7e03869b1..52e36af9b 100644
--- a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java
@@ -47,19 +47,15 @@
public class HStoreSerializer extends AbstractGraphElementSerializer {
-
-
private static final Logger log = LoggerFactory.getLogger(HStoreSerializer.class);
- // 准备好treeRangeMap
- // 准备好partGraphIdMap
private static final ObjectMapper MAPPER = new ObjectMapper();
- private Map partGraphIdMap;
+ private Map partGraphIdMap;
private TreeRangeMap rangeMap;
- public HStoreSerializer(HugeClient client, int numPartitions,String graphName,String pdAddress,String pdRestPort) {
+ public HStoreSerializer(HugeClient client, int numPartitions, String graphName, String pdAddress, String pdRestPort) {
super(client);
rangeMap = TreeRangeMap.create();
int partitionSize = PartitionUtils.MAX_VALUE / numPartitions;
@@ -108,9 +104,9 @@ public static void main(String[] args) {
// System.out.println(graphId);
}
- private Map getGraphId(String graphName,String[] urls) {
+ private Map getGraphId(String graphName, String[] urls) {
RestClientConfig config = RestClientConfig.builder()
- .connectTimeout(5*1000) // 连接超时时间 5s
+ .connectTimeout(5 * 1000) // 连接超时时间 5s
// .readTimeout(60*60 * 1000) // 读取超时时间 1h
.maxConns(10) // 最大连接数
.build();
@@ -122,7 +118,7 @@ private Map getGraphId(String graphName,String[] urls) {
try {
// 创建RestClient对象
client = new RestClient(url, config);
- RestResult restResult = client.get("v1/partitionsAndGraphId" , Collections.singletonMap("graphName", graphName));
+ RestResult restResult = client.get("v1/partitionsAndGraphId", Collections.singletonMap("graphName", graphName));
// 获取响应状态码
String content = restResult.content();
Map resMap = MAPPER.readValue(content, new TypeReference>() {
@@ -152,7 +148,7 @@ public Tuple2 getKeyBytes(GraphElement e) {
byte[] array = null;
if (e.type() == "vertex" && e.id() != null) {
- BytesBuffer buffer = BytesBuffer.allocate( 2+1 + e.id().toString().length());
+ BytesBuffer buffer = BytesBuffer.allocate(2 + 1 + e.id().toString().length());
Id id = IdGenerator.of(e.id());
buffer.writeId(id);
array = buffer.bytes();
@@ -171,7 +167,7 @@ public Tuple2 getKeyBytes(GraphElement e) {
return new Tuple2<>(buf, partId);
} else if (e.type() == "edge") {
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
- Edge edge = (Edge)e;
+ Edge edge = (Edge) e;
// buffer.writeShort();
buffer.writeId(IdGenerator.of(edge.sourceId()));
buffer.write(HugeType.EDGE_OUT.code());
@@ -206,25 +202,21 @@ public byte[] getValueBytes(GraphElement e) {
for (Map.Entry entry : e.properties().entrySet()) {
PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
buffer.writeVInt(propertyKey.id().intValue());
- buffer.writeProperty(propertyKey.dataType(),entry.getValue());
+ buffer.writeProperty(propertyKey.dataType(), entry.getValue());
}
array = buffer.bytes();
} else if (e.type() == "edge") {
- int propsCount = e.properties().size();
+ int propsCount = e.properties().size();
BytesBuffer buffer = BytesBuffer.allocate(4 + 16 * propsCount);
buffer.writeVInt(propsCount);
for (Map.Entry entry : e.properties().entrySet()) {
PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
buffer.writeVInt(propertyKey.id().intValue());
- buffer.writeProperty(propertyKey.dataType(),entry.getValue());
+ buffer.writeProperty(propertyKey.dataType(), entry.getValue());
}
array = buffer.bytes();
}
return array;
}
-
-
-
-
}
diff --git a/hugegraph-loader/pom.xml b/hugegraph-loader/pom.xml
index 339312e30..833f66611 100644
--- a/hugegraph-loader/pom.xml
+++ b/hugegraph-loader/pom.xml
@@ -15,8 +15,8 @@
License for the specific language governing permissions and limitations
under the License.
-->
-
4.0.0
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/AbstractDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/AbstractDirectLoader.java
similarity index 96%
rename from hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/AbstractDirectLoader.java
rename to hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/AbstractDirectLoader.java
index ec0532f1a..fbc6d912b 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/AbstractDirectLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/AbstractDirectLoader.java
@@ -1,5 +1,4 @@
-package org.apache.hugegraph.loader.direct.loaders;
-
+package org.apache.hugegraph.loader.direct.loader;
import org.apache.hadoop.conf.Configuration;
@@ -31,23 +30,23 @@
public abstract class AbstractDirectLoader implements DirectLoader, Serializable {
+ public static final Logger LOG = Log.logger(AbstractDirectLoader.class);
protected LoadOptions loadOptions;
protected InputStruct struct;
protected DistributedLoadMetrics loadDistributeMetrics;
- public static final Logger LOG = Log.logger(AbstractDirectLoader.class);
-
public AbstractDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
this.loadOptions = loadOptions;
this.struct = struct;
this.loadDistributeMetrics = loadDistributeMetrics;
}
- public AbstractDirectLoader(LoadOptions loadOptions,InputStruct struct) {
+
+ public AbstractDirectLoader(LoadOptions loadOptions, InputStruct struct) {
this.loadOptions = loadOptions;
this.struct = struct;
}
- public void flushPermission(Configuration conf, String path){
+ public void flushPermission(Configuration conf, String path) {
FsShell shell = new FsShell(conf);
try {
LOG.info("shell start execute");
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java
index cf15f9569..bb05d6c5e 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java
@@ -17,69 +17,16 @@
package org.apache.hugegraph.loader.direct.loader;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hugegraph.loader.builder.EdgeBuilder;
-import org.apache.hugegraph.loader.builder.ElementBuilder;
-import org.apache.hugegraph.loader.builder.VertexBuilder;
-import org.apache.hugegraph.loader.executor.LoadContext;
-import org.apache.hugegraph.loader.executor.LoadOptions;
-import org.apache.hugegraph.loader.mapping.EdgeMapping;
-import org.apache.hugegraph.loader.mapping.InputStruct;
-import org.apache.hugegraph.loader.mapping.VertexMapping;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-public abstract class DirectLoader implements Serializable {
-
- LoadOptions loadOptions;
- InputStruct struct;
-
- public DirectLoader(LoadOptions loadOptions,
- InputStruct struct) {
- this.loadOptions = loadOptions;
- this.struct = struct;
- }
-
- public final void bulkload(Dataset ds) {
- JavaPairRDD javaPairRDD = buildVertexAndEdge(ds);
- String path = generateFiles(javaPairRDD);
- loadFiles(path);
- }
-
- protected List getElementBuilders() {
- LoadContext context = new LoadContext(loadOptions);
- context.schemaCache().updateAll();
- List buildersForGraphElement = new LinkedList<>();
- for (VertexMapping vertexMapping : struct.vertices()) {
- buildersForGraphElement.add(new VertexBuilder(context, struct, vertexMapping));
- }
- for (EdgeMapping edgeMapping : struct.edges()) {
- buildersForGraphElement.add(new EdgeBuilder(context, struct, edgeMapping));
- }
- context.close();
- return buildersForGraphElement;
- }
-
- protected List getElementBuilders(LoadContext context) {
- context.schemaCache().updateAll();
- List buildersForGraphElement = new LinkedList<>();
- for (VertexMapping vertexMapping : struct.vertices()) {
- buildersForGraphElement.add(new VertexBuilder(context, struct, vertexMapping));
- }
- for (EdgeMapping edgeMapping : struct.edges()) {
- buildersForGraphElement.add(new EdgeBuilder(context, struct, edgeMapping));
- }
- context.close();// 关闭了hugeclient
- return buildersForGraphElement;
- }
+public interface DirectLoader {
+ JavaPairRDD buildVertexAndEdge(Dataset ds);
- abstract JavaPairRDD buildVertexAndEdge(Dataset ds);
+ String generateFiles(JavaPairRDD buildAndSerRdd);
- abstract String generateFiles(JavaPairRDD buildAndSerRdd);
+ void loadFiles(String path);
- abstract void loadFiles(String path);
+ void bulkload(Dataset ds);
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
index 0ba32fe37..7d9ade08f 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
@@ -1,33 +1,8 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
package org.apache.hugegraph.loader.direct.loader;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.SecureRandom;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -38,14 +13,13 @@
import org.apache.hugegraph.loader.builder.ElementBuilder;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.direct.util.SinkToHBase;
+import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
-import org.apache.hugegraph.loader.mapping.ElementMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
-import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
import org.apache.hugegraph.loader.util.HugeClientHolder;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
import org.apache.hugegraph.serializer.direct.HBaseSerializer;
-import org.apache.hugegraph.structure.GraphElement;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.util.Log;
@@ -55,14 +29,18 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
-
import scala.Tuple2;
-public class HBaseDirectLoader extends DirectLoader {
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+public class HBaseDirectLoader extends AbstractDirectLoader {
private SinkToHBase sinkToHBase;
- private DistributedLoadMetrics loadDistributeMetrics;
-
private static final int RANDOM_VALUE1;
private static final short RANDOM_VALUE2;
private static final AtomicInteger NEXT_COUNTER;
@@ -126,44 +104,15 @@ public static String fileID() {
return Bytes.toHex(byteBuffer.array());
}
- public HBaseDirectLoader(LoadOptions loadOptions,
- InputStruct struct,
- DistributedLoadMetrics loadDistributeMetrics) {
- super(loadOptions, struct);
- this.loadDistributeMetrics = loadDistributeMetrics;
- this.sinkToHBase = new SinkToHBase(loadOptions);
-
- }
-
- public String getTableName() {
-
- String tableName = null;
- if (struct.edges().size() > 0) {
- tableName = this.loadOptions.edgeTableName;
-
- } else if (struct.vertices().size() > 0) {
- tableName = this.loadOptions.vertexTableName;
-
- }
- return tableName;
- }
-
- public Integer getTablePartitions() {
- return struct.edges().size() > 0 ?
- loadOptions.edgePartitions : loadOptions.vertexPartitions;
- }
-
@Override
public JavaPairRDD buildVertexAndEdge(Dataset ds) {
LOG.info("Start build vertexes and edges");
- JavaPairRDD tuple2KeyValueJavaPairRDD;
- tuple2KeyValueJavaPairRDD = ds.toJavaRDD().mapPartitionsToPair(
+ return ds.toJavaRDD().mapPartitionsToPair(
(PairFlatMapFunction, ImmutableBytesWritable, KeyValue>) rowIter -> {
- HBaseSerializer ser;
- ser = new HBaseSerializer(HugeClientHolder.create(loadOptions),
- loadOptions.vertexPartitions,
- loadOptions.edgePartitions);
- List buildersForGraphElement = getElementBuilders();
+ HBaseSerializer ser = new HBaseSerializer(HugeClientHolder.create(loadOptions), loadOptions.vertexPartitions, loadOptions.edgePartitions);
+ LoadContext loaderContext = new LoadContext(super.loadOptions);
+ loaderContext.init(struct);// 准备好elementBuilder以及schema数据的更新
+ List buildersForGraphElement = getElementBuilders(loaderContext);
List> result = new LinkedList<>();
while (rowIter.hasNext()) {
Row row = rowIter.next();
@@ -175,29 +124,23 @@ public JavaPairRDD buildVertexAndEdge(Dataset<
return result.iterator();
}
);
- return tuple2KeyValueJavaPairRDD;
}
@Override
- String generateFiles(JavaPairRDD buildAndSerRdd) {
+ public String generateFiles(JavaPairRDD buildAndSerRdd) {
LOG.info("Start to generate hfile");
try {
- Tuple2 tuple =
- sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Tuple2 tuple = sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
Partitioner partitioner = tuple._1;
TableDescriptor tableDescriptor = tuple._2;
- JavaPairRDD repartitionedRdd =
- buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ JavaPairRDD repartitionedRdd = buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
Configuration conf = sinkToHBase.getHBaseConfiguration().get();
Job job = Job.getInstance(conf);
HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
- conf.set("hbase.mapreduce.hfileoutputformat.table.name",
- tableDescriptor.getTableName().getNameAsString());
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name", tableDescriptor.getTableName().getNameAsString());
String path = getHFilePath(job.getConfiguration());
- repartitionedRdd.saveAsNewAPIHadoopFile(path, ImmutableBytesWritable.class,
- KeyValue.class, HFileOutputFormat2.class,
- conf);
+ repartitionedRdd.saveAsNewAPIHadoopFile(path, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);
LOG.info("Saved HFiles to: '{}'", path);
flushPermission(conf, path);
return path;
@@ -207,110 +150,64 @@ String generateFiles(JavaPairRDD buildAndSerRd
return Constants.EMPTY_STR;
}
- public String getHFilePath(Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- String fileID = fileID();
- String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + fileID + "/";
- Path hfileGenPath = new Path(pathStr);
- if (fs.exists(hfileGenPath)) {
- LOG.info("\n Delete the path where the hfile is generated,path {} ", pathStr);
- fs.delete(hfileGenPath, true);
- }
- return pathStr;
- }
-
@Override
public void loadFiles(String path) {
try {
- // BulkLoad HFile to HBase
sinkToHBase.loadHfiles(path, getTableName());
} catch (Exception e) {
- LOG.error(" Failed to load hfiles", e);
+ LOG.error("Failed to load hfiles", e);
}
}
- private void flushPermission(Configuration conf, String path) {
- FsShell shell = new FsShell(conf);
- try {
- LOG.info("Chmod hfile directory permission");
- shell.run(new String[]{"-chmod", "-R", "777", path});
- shell.close();
- } catch (Exception e) {
- LOG.error("Couldn't change the file permissions " + e + " Please run command:" +
- "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles " + path +
- " '" + "test" + "'\n" + " to load generated HFiles into HBase table");
- }
+ private String getTableName() {
+ return struct.edges().size() > 0 ? loadOptions.edgeTableName : loadOptions.vertexTableName;
}
- List> buildAndSer(HBaseSerializer serializer, Row row,
- List builders) {
- List elementsElement;
- List> result = new LinkedList<>();
+ private Integer getTablePartitions() {
+ return struct.edges().size() > 0 ? loadOptions.edgePartitions : loadOptions.vertexPartitions;
+ }
- for (ElementBuilder builder : builders) {
- ElementMapping elementMapping = builder.mapping();
- if (elementMapping.skip()) {
- continue;
- }
- if ("".equals(row.mkString())) {
- break;
- }
- switch (struct.input().type()) {
- case FILE:
- case HDFS:
- elementsElement = builder.build(row);
- break;
- default:
- throw new AssertionError(String.format("Unsupported input source '%s'",
- struct.input().type()));
- }
+ private String getHFilePath(Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ String fileID = fileID();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + fileID + "/";
+ Path hfileGenPath = new Path(pathStr);
+ if (fs.exists(hfileGenPath)) {
+ LOG.info("\n Delete the path where the hfile is generated,path {} ", pathStr);
+ fs.delete(hfileGenPath, true);
+ }
+ return pathStr;
+ }
- boolean isVertex = builder.mapping().type().isVertex();
- if (isVertex) {
- for (Vertex vertex : (List) (Object) elementsElement) {
- LOG.debug("vertex already build done {} ", vertex.toString());
- Tuple2 tuple2 =
- vertexSerialize(serializer, vertex);
- loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
- loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
- result.add(tuple2);
- }
- } else {
- for (Edge edge : (List) (Object) elementsElement) {
- LOG.debug("edge already build done {}", edge.toString());
- Tuple2 tuple2 =
- edgeSerialize(serializer, edge);
- loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
- loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
- result.add(tuple2);
+ public HBaseDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
+ super(loadOptions, struct, loadDistributeMetrics);
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+ }
- }
- }
- }
- return result;
+ public HBaseDirectLoader(LoadOptions loadOptions, InputStruct struct) {
+ super(loadOptions, struct);
+ this.sinkToHBase = new SinkToHBase(loadOptions);
}
- private Tuple2 edgeSerialize(HBaseSerializer serializer,
- Edge edge) {
- LOG.debug("edge start serialize {}", edge.toString());
- byte[] rowkey = serializer.getKeyBytes(edge)._1;
- byte[] values = serializer.getValueBytes(edge);
+ @Override
+ protected Tuple2 vertexSerialize(GraphElementSerializer serializer, Vertex vertex) {
+ LOG.debug("vertex start serialize {}", vertex.toString());
+ byte[] rowkey = serializer.getKeyBytes(vertex)._1;
+ byte[] values = serializer.getValueBytes(vertex);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
rowKey.set(rowkey);
- KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY),
- Bytes.toBytes(Constants.EMPTY_STR), values);
+ KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY), Bytes.toBytes(Constants.EMPTY_STR), values);
return new Tuple2<>(rowKey, keyValue);
}
- private Tuple2 vertexSerialize(HBaseSerializer serializer,
- Vertex vertex) {
- LOG.debug("vertex start serialize {}", vertex.toString());
- byte[] rowkey = serializer.getKeyBytes(vertex)._1;
- byte[] values = serializer.getValueBytes(vertex);
+ @Override
+ protected Tuple2 edgeSerialize(GraphElementSerializer serializer, Edge edge) {
+ LOG.debug("edge start serialize {}", edge.toString());
+ byte[] rowkey = serializer.getKeyBytes(edge)._1;
+ byte[] values = serializer.getValueBytes(edge);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
rowKey.set(rowkey);
- KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY),
- Bytes.toBytes(Constants.EMPTY_STR), values);
+ KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY), Bytes.toBytes(Constants.EMPTY_STR), values);
return new Tuple2<>(rowKey, keyValue);
}
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java
index d8cd209f5..af1217b89 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java
@@ -1,71 +1,51 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
package org.apache.hugegraph.loader.direct.loader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hugegraph.client.RestClient;
import org.apache.hugegraph.loader.builder.ElementBuilder;
+import org.apache.hugegraph.loader.direct.outputformat.SSTFileOutputFormat;
+import org.apache.hugegraph.loader.direct.partitioner.HstorePartitioner;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
-import org.apache.hugegraph.loader.mapping.ElementMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
+import org.apache.hugegraph.rest.RestClientConfig;
+import org.apache.hugegraph.rest.RestResult;
import org.apache.hugegraph.serializer.GraphElementSerializer;
-import org.apache.hugegraph.structure.GraphElement;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.graph.Vertex;
-import org.apache.hugegraph.util.Log;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.slf4j.Logger;
import scala.Tuple2;
+import java.io.IOException;
import java.io.Serializable;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-public class HStoreDirectLoader extends DirectLoader,byte[]> {
-
- private DistributedLoadMetrics loadDistributeMetrics;
+import java.util.*;
+import static org.apache.hugegraph.serializer.direct.HStoreSerializer.processAddresses;
- public static final Logger LOG = Log.logger(HStoreDirectLoader.class);
+public class HStoreDirectLoader extends AbstractDirectLoader, byte[]> {
+ public HStoreDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
+ super(loadOptions, struct, loadDistributeMetrics);
+ }
- public HStoreDirectLoader(LoadOptions loadOptions,
- InputStruct struct,
- DistributedLoadMetrics loadDistributeMetrics) {
+ public HStoreDirectLoader(LoadOptions loadOptions, InputStruct struct) {
super(loadOptions, struct);
- this.loadDistributeMetrics = loadDistributeMetrics;
}
-
@Override
public JavaPairRDD, byte[]> buildVertexAndEdge(Dataset ds) {
-
LOG.info("Start build vertexes and edges");
- JavaPairRDD, byte[]> tuple2JavaPairRDD = ds.toJavaRDD().mapPartitionsToPair(
+ return ds.toJavaRDD().mapPartitionsToPair(
(PairFlatMapFunction, Tuple2, byte[]>) rowIter -> {
- // 完成了schema数据的准备,以及pdclient的每个分区的创建
LoadContext loaderContext = new LoadContext(super.loadOptions);
- loaderContext.init(struct);// 准备好elementBuilder以及schema数据的更新
+ loaderContext.init(struct);
List buildersForGraphElement = getElementBuilders(loaderContext);
List, byte[]>> result = new LinkedList<>();
while (rowIter.hasNext()) {
@@ -77,90 +57,115 @@ public JavaPairRDD, byte[]> buildVertexAndEdge(Dataset, byte[]> buildAndSerRdd) {
- return "";
- }
-
-
-
-
+ public String generateFiles(JavaPairRDD, byte[]> buildAndSerRdd) {
+ LOG.info("bulkload start execute>>>");
+ try {
+ // 自定义分区,
+ JavaPairRDD, byte[]> tuple2JavaPairRDD = buildAndSerRdd.partitionBy(new HstorePartitioner(loadOptions.vertexPartitions));
+ // 丢弃partId
+ JavaPairRDD javaPairRDD = tuple2JavaPairRDD.mapToPair(tuple2 -> new Tuple2<>(tuple2._1._1, tuple2._2));
+ JavaPairRDD sortedRdd = javaPairRDD.mapPartitionsToPair(iterator -> {
+ List> partitionData = new ArrayList<>();
+ iterator.forEachRemaining(partitionData::add);
+ Collections.sort(partitionData, new HStoreDirectLoader.TupleComparator());
+ return partitionData.iterator();
+ });
+
+
+ Configuration hadoopConf = new Configuration();
+ String sstFilePath = getSSTFilePath(hadoopConf);
+ LOG.info("SSTFile生成的hdfs路径:{}", sstFilePath);
+ sortedRdd.saveAsNewAPIHadoopFile(
+ sstFilePath,
+ byte[].class,
+ byte[].class,
+ SSTFileOutputFormat.class,
+ hadoopConf
+ );
+ flushPermission(hadoopConf, sstFilePath);
+ return sstFilePath;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
- @Override
- protected void loadFiles(String path) {
+ return null;
}
-
- List, byte[]>> buildAndSer(GraphElementSerializer serializer, Row row,
- List builders) {
- List elementsElement;
- List, byte[]>> result = new LinkedList<>();
-
- for (ElementBuilder builder : builders) {
- ElementMapping elementMapping = builder.mapping();
- if (elementMapping.skip()) {
- continue;
- }
- if ("".equals(row.mkString())) {
+ @Override
+ public void loadFiles(String sstFilePath) {
+ RestClientConfig config = RestClientConfig.builder()
+ .connectTimeout(5 * 1000) // 连接超时时间 5s
+ .readTimeout(60 * 60 * 1000) // 读取超时时间 1h
+ .maxConns(10) // 最大连接数
+ .build();
+
+ BulkloadInfo bulkloadInfo = new BulkloadInfo(loadOptions.graph, sstFilePath.replace("TXY-HDP11", "txy-hn1-bigdata-hdp11-nn-prd-02.myhll.cn:8020"), getBulkloadType());
+ String[] urls = processAddresses(loadOptions.pdAddress, loadOptions.pdRestPort);
+
+ for (String url : urls) {
+ LOG.info("submit bulkload task to {}, bulkloadInfo:{}", url, bulkloadInfo);
+ RestClient client = null;
+ try {
+ // 创建RestClient对象
+ client = new RestClient(url, config);
+ // 获取响应状态码
+ RestResult restResult = client.post("v1/task/bulkload", bulkloadInfo);
+ Map resMap = restResult.readObject(Map.class);
+ LOG.info("Response :{} ", resMap);
+ // 如果成功,退出循环
break;
- }
- switch (struct.input().type()) {
- case FILE:
- case HDFS:
- elementsElement = builder.build(row);
- break;
- default:
- throw new AssertionError(String.format("Unsupported input source '%s'",
- struct.input().type()));
- }
-
- boolean isVertex = builder.mapping().type().isVertex();
- if (isVertex) {
- for (Vertex vertex : (List) (Object) elementsElement) {
- LOG.debug("vertex already build done {} ", vertex.toString());
- Tuple2, byte[]> tuple2 =vertexSerialize(serializer, vertex);
- //mapping.label();
- String label = builder.mapping().label();
- loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
- loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
-
- result.add(tuple2);
- }
- } else {
- for (Edge edge : (List) (Object) elementsElement) {
- LOG.debug("edge already build done {}", edge.toString());
- Tuple2, byte[]> tuple2 =edgeSerialize(serializer, edge);
- loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
- loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
- result.add(tuple2);
-
+ } catch (Exception e) {
+ LOG.error("Failed to submit bulkload task", e);
+ break;
+ } finally {
+ // 确保RestClient被关闭
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close RestClient", e);
+ }
}
}
}
- return result;
}
- private Tuple2, byte[]> edgeSerialize(GraphElementSerializer serializer, Edge edge) {
+ @Override
+ protected Tuple2, byte[]> vertexSerialize(GraphElementSerializer serializer, Vertex vertex) {
+ LOG.debug("vertex start serialize {}", vertex.toString());
+ Tuple2 keyBytes = serializer.getKeyBytes(vertex);
+ byte[] values = serializer.getValueBytes(vertex);
+ return new Tuple2<>(keyBytes, values);
+ }
+
+ @Override
+ protected Tuple2, byte[]> edgeSerialize(GraphElementSerializer serializer, Edge edge) {
LOG.debug("edge start serialize {}", edge.toString());
Tuple2 keyBytes = serializer.getKeyBytes(edge);
byte[] values = serializer.getValueBytes(edge);
-
return new Tuple2<>(keyBytes, values);
}
- private Tuple2, byte[]> vertexSerialize(GraphElementSerializer serializer,
- Vertex vertex) {
- LOG.debug("vertex start serialize {}", vertex.toString());
- Tuple2 keyBytes = serializer.getKeyBytes(vertex);
- byte[] values = serializer.getValueBytes(vertex);
- return new Tuple2<>(keyBytes, values);
+ private BulkloadInfo.LoadType getBulkloadType() {
+ return struct.edges().size() > 0 ? BulkloadInfo.LoadType.EDGE : BulkloadInfo.LoadType.VERTEX;
}
+ private String getSSTFilePath(Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ long timeStr = System.currentTimeMillis();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hg-1_5/gen-sstfile" + "/" + timeStr + "/";//sstFile 存储路径
+ org.apache.hadoop.fs.Path hfileGenPath = new Path(pathStr);
+ if (fs.exists(hfileGenPath)) {
+ LOG.info("\n delete sstFile path \n");
+ fs.delete(hfileGenPath, true);
+ }
+// fs.close();
+ return pathStr;
+ }
static class TupleComparator implements Comparator>, Serializable {
@Override
@@ -178,4 +183,47 @@ private int compareByteArrays(byte[] a, byte[] b) {
return Integer.compare(a.length, b.length);
}
}
+
+ static class BulkloadInfo {
+ String graphName;
+ String tableName;
+ String hdfsPath;
+
+ public BulkloadInfo(String graphName, String path, LoadType loadType) {
+ this.graphName = processGraphName(graphName);
+ this.tableName = processTableName(graphName, loadType);
+ this.hdfsPath = path;
+ }
+
+ private String processGraphName(String graphName) {
+ return graphName + "/g";
+ }
+
+ private String processTableName(String graphName, LoadType loadType) {
+ if (loadType == LoadType.VERTEX) {
+ return "g+v";
+ } else if (loadType == LoadType.EDGE) {
+ return "g+oe";
+ } else {
+ throw new IllegalArgumentException("Invalid loadType: " + loadType);
+ }
+ }
+
+
+ @Override
+ public String toString() {
+ return "BulkloadInfo{" +
+ "graphName='" + graphName + '\'' +
+ ", tableName='" + tableName + '\'' +
+ ", hdfsPath='" + hdfsPath + '\'' +
+ '}';
+ }
+
+ enum LoadType {
+ VERTEX,
+ EDGE
+ }
+
+ }
+
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/DirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/DirectLoader.java
deleted file mode 100644
index df33ae230..000000000
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/DirectLoader.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hugegraph.loader.direct.loaders;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
-public interface DirectLoader<T, R> {
- JavaPairRDD<T, R> buildVertexAndEdge(Dataset<Row> ds);
- String generateFiles(JavaPairRDD<T, R> buildAndSerRdd);
- void loadFiles(String path);
- void bulkload(Dataset<Row> ds);
-}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HBaseDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HBaseDirectLoader.java
deleted file mode 100644
index 625a4c3c6..000000000
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loaders/HBaseDirectLoader.java
+++ /dev/null
@@ -1,228 +0,0 @@
-package org.apache.hugegraph.loader.direct.loaders;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hugegraph.loader.builder.ElementBuilder;
-import org.apache.hugegraph.loader.constant.Constants;
-import org.apache.hugegraph.loader.direct.util.SinkToHBase;
-import org.apache.hugegraph.loader.executor.LoadContext;
-import org.apache.hugegraph.loader.executor.LoadOptions;
-import org.apache.hugegraph.loader.mapping.InputStruct;
-import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
-import org.apache.hugegraph.loader.util.HugeClientHolder;
-import org.apache.hugegraph.serializer.GraphElementSerializer;
-import org.apache.hugegraph.serializer.direct.HBaseSerializer;
-import org.apache.hugegraph.structure.graph.Edge;
-import org.apache.hugegraph.structure.graph.Vertex;
-import org.apache.hugegraph.util.Log;
-import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.slf4j.Logger;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.SecureRandom;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class HBaseDirectLoader extends AbstractDirectLoader<ImmutableBytesWritable, KeyValue> {
-
- private SinkToHBase sinkToHBase;
-
- private static final int RANDOM_VALUE1;
- private static final short RANDOM_VALUE2;
- private static final AtomicInteger NEXT_COUNTER;
-
- public static final Logger LOG = Log.logger(org.apache.hugegraph.loader.direct.loader.HBaseDirectLoader.class);
-
- static {
- try {
- SecureRandom secureRandom = new SecureRandom();
- RANDOM_VALUE1 = secureRandom.nextInt(0x01000000);
- RANDOM_VALUE2 = (short) secureRandom.nextInt(0x00008000);
- NEXT_COUNTER = new AtomicInteger(new SecureRandom().nextInt());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private static byte int3(final int x) {
- return (byte) (x >> 24);
- }
-
- private static byte int2(final int x) {
- return (byte) (x >> 16);
- }
-
- private static byte int1(final int x) {
- return (byte) (x >> 8);
- }
-
- private static byte int0(final int x) {
- return (byte) (x);
- }
-
- private static byte short1(final short x) {
- return (byte) (x >> 8);
- }
-
- private static byte short0(final short x) {
- return (byte) (x);
- }
-
- public static String fileID() {
- long timeStamp = System.currentTimeMillis() / 1000;
- ByteBuffer byteBuffer = ByteBuffer.allocate(12);
-
- byteBuffer.put(int3((int) timeStamp));
- byteBuffer.put(int2((int) timeStamp));
- byteBuffer.put(int1((int) timeStamp));
- byteBuffer.put(int0((int) timeStamp));
-
- byteBuffer.put(int2(RANDOM_VALUE1));
- byteBuffer.put(int1(RANDOM_VALUE1));
- byteBuffer.put(int0(RANDOM_VALUE1));
- byteBuffer.put(short1(RANDOM_VALUE2));
- byteBuffer.put(short0(RANDOM_VALUE2));
-
- byteBuffer.put(int2(NEXT_COUNTER.incrementAndGet()));
- byteBuffer.put(int1(NEXT_COUNTER.incrementAndGet()));
- byteBuffer.put(int0(NEXT_COUNTER.incrementAndGet()));
-
- return Bytes.toHex(byteBuffer.array());
- }
-
- public HBaseDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
- super(loadOptions, struct, loadDistributeMetrics);
- this.sinkToHBase = new SinkToHBase(loadOptions);
- }
-
- public HBaseDirectLoader(LoadOptions loadOptions,InputStruct struct) {
- super(loadOptions,struct);
- this.sinkToHBase = new SinkToHBase(loadOptions);
- }
-
- @Override
- public JavaPairRDD buildVertexAndEdge(Dataset