diff --git a/.classpath b/.classpath
index 12d18c5..34d58ad 100644
--- a/.classpath
+++ b/.classpath
@@ -8,9 +8,9 @@
+
-
@@ -18,7 +18,7 @@
-
+
diff --git a/src/main/java/com/github/piedpiper/common/PiedPiperConstants.java b/src/main/java/com/github/piedpiper/common/PiedPiperConstants.java
index 438f325..38bc252 100644
--- a/src/main/java/com/github/piedpiper/common/PiedPiperConstants.java
+++ b/src/main/java/com/github/piedpiper/common/PiedPiperConstants.java
@@ -21,7 +21,9 @@ public class PiedPiperConstants {
public static final String ALMIGHTY_TABLE_RANGE_KEY = "rangeKey";
public static final String ALMIGHTY_TABLE_NAME = "AlmightyTable";
-
+
+ public static final String VERSION = "version";
+
public static final String VALUE = "value";
public static final String AS_JSON = "asJson";
@@ -63,6 +65,8 @@ public class PiedPiperConstants {
public static String REQUIRED_PARAM_ERROR_FORMAT = "parameter: %s required but not present";
public static final String HASH_KEY_QUERY = "hashKey = :hashKey";
+
+ public static final String VERSION_QUERY = "hashKey = :hashKey AND rangeKey > :rangeKey ";
public static final String JWT_VERIFY = "JWT_VERIFY";
diff --git a/src/main/java/com/github/piedpiper/guice/PiedPiperModule.java b/src/main/java/com/github/piedpiper/guice/PiedPiperModule.java
index 168de47..4b7b8fe 100644
--- a/src/main/java/com/github/piedpiper/guice/PiedPiperModule.java
+++ b/src/main/java/com/github/piedpiper/guice/PiedPiperModule.java
@@ -22,6 +22,8 @@
import com.github.piedpiper.node.aws.AWSLambdaFactory;
import com.github.piedpiper.node.aws.AWSLambdaNode;
import com.github.piedpiper.node.aws.ILambdaFactory;
+import com.github.piedpiper.storage.DynamoDBStorage;
+import com.github.piedpiper.storage.IGraphStorage;
import com.github.piedpiper.utils.ParameterUtils;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -50,6 +52,7 @@ protected void configure() {
bind(Function.class).annotatedWith(Names.named(NodeListQueryHandler.class.getName()))
.to(NodeListQueryHandler.class);
bind(ILambdaFactory.class).to(AWSLambdaFactory.class);
+ bind(IGraphStorage.class).to(DynamoDBStorage.class);
bind(AWSLambdaNode.class).annotatedWith(Names.named(PiedPiperConstants.SEARCH_GRAPH_LAMBDA_NODE_NAME))
.toInstance(new AWSLambdaNode());
bind(String.class).annotatedWith(Names.named(PiedPiperConstants.PROD_SEARCH_ENDPOINT))
diff --git a/src/main/java/com/github/piedpiper/storage/DynamoDBStorage.java b/src/main/java/com/github/piedpiper/storage/DynamoDBStorage.java
new file mode 100644
index 0000000..d5cbb52
--- /dev/null
+++ b/src/main/java/com/github/piedpiper/storage/DynamoDBStorage.java
@@ -0,0 +1,103 @@
+package com.github.piedpiper.storage;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import com.amazonaws.regions.Regions;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.github.commons.log.ILogger;
+import com.github.commons.utils.JsonUtils;
+import com.github.piedpiper.common.PiedPiperConstants;
+import com.github.piedpiper.node.NodeInput;
+import com.github.piedpiper.node.NodeOutput;
+import com.github.piedpiper.node.aws.AWSNode;
+import com.github.piedpiper.node.aws.dynamo.DynamoDBBaseNode;
+import com.github.piedpiper.node.aws.dynamo.DynamoDBReaderNode;
+import com.github.piedpiper.utils.ParameterUtils;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import com.google.inject.name.Named;
+import com.google.inject.name.Names;
+
+public class DynamoDBStorage implements IGraphStorage{
+
+ private Injector injector;
+ private ILogger logger;
+
+ @Override
+ public void setInjector(Injector injector) {
+ this.injector = injector;
+ }
+
+ @Override
+ public void setLogger(ILogger logger) {
+ this.logger = logger;
+ }
+
+
+ @Override
+ public NodeOutput getGraphs() throws InterruptedException, ExecutionException {
+ DynamoDBReaderNode readerNode = injector.getInstance(DynamoDBReaderNode.class);
+ readerNode.setILogger(this.logger);
+ readerNode.setInjector(this.injector);
+ NodeOutput dynamoRecordOutput = (NodeOutput) readerNode.apply(getSearchNodeInput());
+ return dynamoRecordOutput;
+ }
+
+
+ private NodeInput getSearchNodeInput() throws InterruptedException, ExecutionException{
+
+ ExecutorService executor = injector.getInstance(ExecutorService.class);
+ LoadingCache cacheLoader = injector
+ .getInstance(Key.get(new TypeLiteral>() {
+ }, Names.named(PiedPiperConstants.AWS_SSM_CACHE)));
+ java.util.concurrent.Future futureAccessKey = executor.submit(new Callable() {
+
+ @Override
+ public String call() throws Exception {
+ return cacheLoader.get(PiedPiperConstants.DEFAULT_ACCESS_KEY);
+ }
+
+ });
+ java.util.concurrent.Future futureSecretKey = executor.submit(new Callable() {
+
+ @Override
+ public String call() throws Exception {
+ return cacheLoader.get(PiedPiperConstants.DEFAULT_SECRET_KEY);
+ }
+
+ });
+
+ String accessKey = futureAccessKey.get();
+ String secretKey = futureSecretKey.get();
+
+
+ String tableName = "AlmightyTable";
+
+ ObjectNode queryNode = JsonUtils.mapper.createObjectNode();
+ queryNode.set(AWSNode.ACCESS_KEY.getParameterName(), ParameterUtils.createParamValueNode(accessKey));
+ queryNode.set(AWSNode.SECRET_KEY.getParameterName(), ParameterUtils.createParamValueNode(secretKey));
+ queryNode.set(DynamoDBBaseNode.TABLE_NAME.getParameterName(), ParameterUtils.createParamValueNode(tableName));
+ queryNode.set(AWSNode.REGION.getParameterName(),
+ ParameterUtils.createParamValueNode(Regions.US_EAST_1.getName()));
+
+ augmentQueryNodeWithFilterQuery(queryNode);
+
+ NodeInput nodeInput = new NodeInput();
+ nodeInput.setInput(queryNode);
+ return nodeInput;
+ }
+
+ private void augmentQueryNodeWithFilterQuery(ObjectNode queryNode) {
+ String keyQuery = PiedPiperConstants.HASH_KEY_QUERY;
+ queryNode.set(DynamoDBReaderNode.KEY_QUERY_EXPRESSION.getParameterName(),
+ ParameterUtils.createParamValueNode(keyQuery));
+ queryNode.set(ParameterUtils.getDynamoParamPlaceHolderName(PiedPiperConstants.ALMIGHTY_TABLE_HASH_KEY),
+ ParameterUtils.createParamValueNode(PiedPiperConstants.GRAPH));
+
+ }
+}
diff --git a/src/main/java/com/github/piedpiper/storage/IGraphStorage.java b/src/main/java/com/github/piedpiper/storage/IGraphStorage.java
new file mode 100644
index 0000000..9a6999a
--- /dev/null
+++ b/src/main/java/com/github/piedpiper/storage/IGraphStorage.java
@@ -0,0 +1,14 @@
+package com.github.piedpiper.storage;
+
+import java.util.concurrent.ExecutionException;
+
+import com.github.commons.log.ILogger;
+import com.github.piedpiper.node.NodeOutput;
+import com.google.inject.Injector;
+
+public interface IGraphStorage {
+
+ public void setInjector(Injector injector);
+ public NodeOutput getGraphs() throws InterruptedException, ExecutionException;
+ public void setLogger(ILogger logger);
+}
diff --git a/src/main/java/com/github/piedpiper/transformer/SaveGraphFunction.java b/src/main/java/com/github/piedpiper/transformer/SaveGraphFunction.java
index c69f8d2..9f8707e 100644
--- a/src/main/java/com/github/piedpiper/transformer/SaveGraphFunction.java
+++ b/src/main/java/com/github/piedpiper/transformer/SaveGraphFunction.java
@@ -1,5 +1,6 @@
package com.github.piedpiper.transformer;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
@@ -8,14 +9,17 @@
import com.amazonaws.regions.Regions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.commons.log.ILogger;
import com.github.commons.utils.JsonUtils;
import com.github.piedpiper.common.GraphUtils;
import com.github.piedpiper.common.PiedPiperConstants;
import com.github.piedpiper.node.NodeInput;
+import com.github.piedpiper.node.NodeOutput;
import com.github.piedpiper.node.aws.AWSNode;
import com.github.piedpiper.node.aws.dynamo.DynamoDBBaseNode;
+import com.github.piedpiper.node.aws.dynamo.DynamoDBReaderNode;
import com.github.piedpiper.node.aws.dynamo.DynamoDBWriterNode;
import com.github.piedpiper.utils.ParameterUtils;
import com.google.common.cache.LoadingCache;
@@ -40,21 +44,101 @@ public JsonNode apply(JsonNode inputJson) {
try {
DynamoDBWriterNode writerNode = injector.getInstance(DynamoDBWriterNode.class);
writerNode.setILogger(logger);
- return JsonUtils.mapper.valueToTree(writerNode.apply(getNodeInput(inputJson)));
+ String method = Optional.ofNullable(inputJson).map(methodInput->methodInput.get("method")).map(value->value.asText()).orElse("Save");
+ switch(method) {
+ case("PublishVersion"):
+ return JsonUtils.mapper.valueToTree(writerNode.apply(getPublishVersionNodeInput(inputJson)));
+ case("PublishVersionToAlias"):
+ return JsonUtils.mapper.valueToTree(writerNode.apply(getPublishVersionToAliasNodeInput(inputJson)));
+ case("Save"):
+ return JsonUtils.mapper.valueToTree(writerNode.apply(getSaveNodeInput(inputJson)));
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported Method = %s", method));
+ }
+
} catch (Exception e) {
logger.log(ExceptionUtils.getStackTrace(e));
throw new RuntimeException(e);
}
}
+
+ //no completed yet
+ private NodeInput getPublishVersionNodeInput(JsonNode inputJson) throws ExecutionException {
+ LoadingCache cacheLoader = injector.getInstance(
+ Key.get(new TypeLiteral> () {}, Names.named(PiedPiperConstants.AWS_SSM_CACHE)));
+ String accessKey = cacheLoader.get("AccessKey");
+ String secretKey = cacheLoader.get("SecretKey");
+ String hashKey = GraphUtils.getRangeKeyEquals(
+ inputJson.get(PiedPiperConstants.GRAPH).get(PiedPiperConstants.PROJECT_NAME).asText(),
+ inputJson.get(PiedPiperConstants.GRAPH).get(PiedPiperConstants.GRAPH_NAME).asText());
+ String tableName = inputJson.get(DynamoDBBaseNode.TABLE_NAME.getParameterName()).asText();
+
+ String keyQuery = PiedPiperConstants.VERSION_QUERY;
+
+ ObjectNode queryNode = JsonUtils.mapper.createObjectNode();
+ queryNode.set(AWSNode.ACCESS_KEY.getParameterName(), ParameterUtils.createParamValueNode(accessKey));
+ queryNode.set(AWSNode.SECRET_KEY.getParameterName(), ParameterUtils.createParamValueNode(secretKey));
+ queryNode.set(DynamoDBBaseNode.TABLE_NAME.getParameterName(), ParameterUtils.createParamValueNode(tableName));
+ queryNode.set(AWSNode.REGION.getParameterName(),
+ ParameterUtils.createParamValueNode(Regions.US_EAST_1.getName()));
+ queryNode.set(DynamoDBReaderNode.KEY_QUERY_EXPRESSION.getParameterName(),
+ ParameterUtils.createParamValueNode(keyQuery));
+ queryNode.set(ParameterUtils.getDynamoParamPlaceHolderName(PiedPiperConstants.ALMIGHTY_TABLE_HASH_KEY),
+ ParameterUtils.createParamValueNode(hashKey));
+ queryNode.set(ParameterUtils.getDynamoParamPlaceHolderName(PiedPiperConstants.ALMIGHTY_TABLE_RANGE_KEY),
+ ParameterUtils.createParamValueNode(0));
+
+ DynamoDBReaderNode readerNode = (DynamoDBReaderNode) injector.getInstance(DynamoDBReaderNode.class);
+ readerNode.setILogger(logger);
+ readerNode.setInjector(injector);
+ NodeOutput dynamoRecordOutput;
+ NodeInput nodeInput = new NodeInput();
+ nodeInput.setInput(queryNode);
+
+ dynamoRecordOutput = (NodeOutput) readerNode.apply(nodeInput);
+
+ ArrayNode dynamoRecord = (ArrayNode) dynamoRecordOutput.getOutput();
+ NodeInput Input = new NodeInput();
+ return Input;
+
+ }
+ private NodeInput getPublishVersionToAliasNodeInput(JsonNode inputJson) throws ExecutionException {
+ LoadingCache cacheLoader = injector.getInstance(
+ Key.get(new TypeLiteral> () {}, Names.named(PiedPiperConstants.AWS_SSM_CACHE)));
+ String accessKey = cacheLoader.get("AccessKey");
+ String secretKey = cacheLoader.get("SecretKey");
+
+ String tableName = inputJson.get(DynamoDBBaseNode.TABLE_NAME.getParameterName()).asText();
+ String rangeKey = inputJson.get("alias").asText();
+ String hashKey = inputJson.get("graphName").asText();
+ int version = inputJson.get(PiedPiperConstants.VERSION).asInt();
+
+ ObjectNode inputNodeJsonNode = JsonUtils.mapper.createObjectNode();
+ inputNodeJsonNode.set(AWSNode.ACCESS_KEY.getParameterName(), ParameterUtils.createParamValueNode(accessKey));
+ inputNodeJsonNode.set(AWSNode.SECRET_KEY.getParameterName(), ParameterUtils.createParamValueNode(secretKey));
+ inputNodeJsonNode.set(DynamoDBBaseNode.TABLE_NAME.getParameterName(), ParameterUtils.createParamValueNode(tableName));
+ inputNodeJsonNode.set(AWSNode.REGION.getParameterName(),
+ ParameterUtils.createParamValueNode(Regions.US_EAST_1.getName()));
+ inputNodeJsonNode.set(PiedPiperConstants.ALMIGHTY_TABLE_HASH_KEY, ParameterUtils.createParamValueNode(hashKey));
+ inputNodeJsonNode.set(PiedPiperConstants.ALMIGHTY_TABLE_RANGE_KEY, ParameterUtils.createParamValueNode(rangeKey));
+ inputNodeJsonNode.set(PiedPiperConstants.VERSION, ParameterUtils.createParamValueNode(version));
+ NodeInput nodeInput = new NodeInput();
+ nodeInput.setInput(inputNodeJsonNode);
+ return nodeInput;
- private NodeInput getNodeInput(JsonNode inputJson) throws JsonProcessingException, ExecutionException {
+
+ }
+
+
+
+ private NodeInput getSaveNodeInput(JsonNode inputJson) throws JsonProcessingException, ExecutionException {
LoadingCache cacheLoader = injector.getInstance(
Key.get(new TypeLiteral> () {}, Names.named(PiedPiperConstants.AWS_SSM_CACHE)));
String accessKey = cacheLoader.get("AccessKey");
String secretKey = cacheLoader.get("SecretKey");
String tableName = inputJson.get(DynamoDBBaseNode.TABLE_NAME.getParameterName()).asText();
- String hashKey = PiedPiperConstants.GRAPH;
- String rangeKey = GraphUtils.getRangeKeyEquals(
+ int rangeKey = 0;
+ String hashKey = GraphUtils.getRangeKeyEquals(
inputJson.get(PiedPiperConstants.GRAPH).get(PiedPiperConstants.PROJECT_NAME).asText(),
inputJson.get(PiedPiperConstants.GRAPH).get(PiedPiperConstants.GRAPH_NAME).asText());