diff --git a/pom.xml b/pom.xml
index 277b2a2..f9bad7a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,18 +1,18 @@
- 4.0.0
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ 4.0.0
- com.tayo
- KinesisEncryption
- 0.0.1-SNAPSHOT
- jar
+ com.tayo
+ KinesisEncryption
+ 0.0.1-SNAPSHOT
+ jar
- KinesisEncryption
- http://maven.apache.org
+ KinesisEncryption
+ http://maven.apache.org
-
- UTF-8
-
+
+ UTF-8
+
@@ -45,42 +45,42 @@
-
-
- junit
- junit
- 3.8.1
- test
-
-
- com.amazonaws
- aws-java-sdk-core
- 1.11.46
-
-
- com.amazonaws
- aws-java-sdk
- 1.11.46
-
-
- org.bouncycastle
- bcprov-ext-jdk15on
- 1.54
-
-
- com.amazonaws
- aws-encryption-sdk-java
- 0.0.1
-
-
+
+
+ junit
+ junit
+ 3.8.1
+ test
+
+
+ com.amazonaws
+ aws-java-sdk-core
+ 1.11.714
+
+
+
+ org.bouncycastle
+ bcprov-ext-jdk15on
+ 1.54
+
+
+ com.amazonaws
+ aws-encryption-sdk-java
+ 0.0.1
+
+
com.amazonaws
amazon-kinesis-client
- 1.7.2
+ 1.13.2
com.amazonaws
amazon-kinesis-producer
- 0.12.3
+ 0.14.0
org.slf4j
@@ -88,21 +88,21 @@
1.7.13
-
-
- commons-io
- commons-io
- 2.5
-
-
- org.json
- json
- 20160212
-
-
- com.googlecode.json-simple
- json-simple
- 1.1.1
-
-
+
+
+ commons-io
+ commons-io
+ 2.5
+
+
+ org.json
+ json
+ 20160212
+
+
+ com.googlecode.json-simple
+ json-simple
+ 1.1.1
+
+
diff --git a/src/main/java/kinesisencryption/dao/TickerSalesObject.java b/src/main/java/kinesisencryption/dao/TickerSalesObject.java
index 63d95f5..03f15e2 100644
--- a/src/main/java/kinesisencryption/dao/TickerSalesObject.java
+++ b/src/main/java/kinesisencryption/dao/TickerSalesObject.java
@@ -31,4 +31,41 @@ public String toString()
", timeStamp='" + timeStamp + '\'' +
'}';
}
+
+ public TickerSalesObject()
+ {
+
+ }
+
+ public String getTickerSymbol() {
+ return tickerSymbol;
+ }
+
+ public void setTickerSymbol(String tickerSymbol) {
+ this.tickerSymbol = tickerSymbol;
+ }
+
+ public String getSalesPrice() {
+ return salesPrice;
+ }
+
+ public void setSalesPrice(String salesPrice) {
+ this.salesPrice = salesPrice;
+ }
+
+ public String getOrderId() {
+ return orderId;
+ }
+
+ public void setOrderId(String orderId) {
+ this.orderId = orderId;
+ }
+
+ public String getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(String timeStamp) {
+ this.timeStamp = timeStamp;
+ }
}
diff --git a/src/main/java/kinesisencryption/kcl2/DownstreamThread.java b/src/main/java/kinesisencryption/kcl2/DownstreamThread.java
new file mode 100644
index 0000000..6f8c461
--- /dev/null
+++ b/src/main/java/kinesisencryption/kcl2/DownstreamThread.java
@@ -0,0 +1,70 @@
+package kinesisencryption.kcl2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.util.*;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import kinesisencryption.dao.TickerSalesObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.Record;
+
+/**
+ * Thread used for decrypting and printing consumed records to the logs
+ *
+ */
+public class DownstreamThread implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(kinesisencryption.kcl.RecordPrinterThread.class);
+ private List recordList;
+ private AmazonKinesisClient kinesis;
+
+ public AmazonKinesisClient getKinesis() {
+ return kinesis;
+ }
+
+
+ public List getRecordList() {
+ return recordList;
+ }
+
+ public DownstreamThread(List recordList) {
+ this.recordList = recordList;
+ }
+
+
+
+ @Override
+ public void run() {
+ Map resultMap = new HashMap();
+ for (Record record : this.getRecordList()) {
+ try {
+ ByteBuffer buffer = record.getData();
+
+ String result = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
+ log.info("Cipher Blob :" + record.getData().toString() + " : " + "Decrypted Text is :"
+ + result);
+ TickerSalesObject salesObject = new TickerSalesObject();
+ ObjectMapper mapper = new ObjectMapper();
+ salesObject = mapper.readValue(result, TickerSalesObject.class);
+ resultMap.put(UUID.randomUUID().toString(), salesObject);
+ } catch (CharacterCodingException e) {
+ log.error("Unable to decode result for " + record.getData().toString() + "with equence number "
+ + record.getSequenceNumber());
+ } catch (JsonParseException e) {
+ e.printStackTrace();
+ } catch (JsonMappingException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
+
diff --git a/src/main/java/kinesisencryption/kcl2/KCLConsumer.java b/src/main/java/kinesisencryption/kcl2/KCLConsumer.java
new file mode 100644
index 0000000..1ac90f6
--- /dev/null
+++ b/src/main/java/kinesisencryption/kcl2/KCLConsumer.java
@@ -0,0 +1,64 @@
+package kinesisencryption.kcl2;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
+import kinesisencryption.utils.KinesisEncryptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * A consumer application that consumes encrypted records from kinesis streams.
+ */
+public class KCLConsumer {
+ private static final InitialPositionInStream INITIAL_POSITION_IN_STREAM = InitialPositionInStream.TRIM_HORIZON;
+ private static AWSCredentialsProvider credentialsProvider;
+ private static final Logger log = LoggerFactory.getLogger(kinesisencryption.kcl.EncryptedConsumerWithKCL.class);
+
+ private static void initialize() {
+ java.security.Security.setProperty("networkaddress.cache.ttl", "60");
+ credentialsProvider = new DefaultAWSCredentialsProviderChain();
+
+ try {
+ credentialsProvider.getCredentials();
+
+ } catch (Exception e) {
+ throw new AmazonClientException("Cannot find credentials");
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ initialize();
+
+ String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
+ String streamName = KinesisEncryptionUtils.getProperties().getProperty("stream_name");
+ String appName = KinesisEncryptionUtils.getProperties().getProperty("kcl_name");
+ String ddbRegion = KinesisEncryptionUtils.getProperties().getProperty("ddb_region_4_kcl");
+ KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName,
+ streamName, credentialsProvider, workerId);
+ kinesisClientLibConfiguration.withInitialPositionInStream(INITIAL_POSITION_IN_STREAM).withRegionName(ddbRegion);
+
+ IRecordProcessorFactory recordProcessorFactory = new KCLRecordProcessorFactory();
+ Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
+
+ log.info("Started KCL Worker process for Stream " + streamName + " " + "with workerId " + workerId);
+
+ int exitCode = 0;
+ try {
+ worker.run();
+ } catch (Throwable t) {
+ System.err.println("Caught throwable while processing data.");
+ t.printStackTrace();
+ exitCode = 1;
+ }
+ System.exit(exitCode);
+ }
+}
diff --git a/src/main/java/kinesisencryption/kcl2/KCLRecordProcessor.java b/src/main/java/kinesisencryption/kcl2/KCLRecordProcessor.java
new file mode 100644
index 0000000..39223b9
--- /dev/null
+++ b/src/main/java/kinesisencryption/kcl2/KCLRecordProcessor.java
@@ -0,0 +1,95 @@
+package kinesisencryption.kcl2;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.model.Record;
+
+/**
+ * A KCL RecordProcessor that processes server-side encrypted Kinesis records
+ */
+public class KCLRecordProcessor implements IRecordProcessor {
+ private static final Logger log = LoggerFactory.getLogger(kinesisencryption.kcl.EncryptedKCLRecordProcessor.class);
+ private String shardId, keyArn, encryptionContext;
+
+ // Backoff and retry settings
+ private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
+ private static final int NUM_RETRIES = 10;
+
+ // Checkpoint about once a minute
+ private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
+ private long nextCheckpointTimeInMillis;
+
+ @Override
+ public void initialize(String shardId) {
+ log.info("Initializing record processor for shard :" + shardId);
+ this.shardId = shardId;
+ }
+
+ @Override
+ public void processRecords(List recordList, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
+ if (recordList.size() > 0) {
+ log.info("Received record size is : " + recordList.size());
+ S3ArchiverThread printer = new S3ArchiverThread(recordList);
+ Thread thread = new Thread(printer);
+ thread.start();
+ }
+
+ if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
+ checkpoint(iRecordProcessorCheckpointer);
+ nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
+ }
+ }
+
+ @Override
+ public void shutdown(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, ShutdownReason shutdownReason) {
+ log.info("Shutting down record processor for shard: " + shardId);
+ // Important to checkpoint after reaching end of shard, so we can start
+ // processing data from child shards.
+ if (shutdownReason == ShutdownReason.TERMINATE) {
+ checkpoint(iRecordProcessorCheckpointer);
+ }
+ }
+
+ private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
+ log.info("Checkpointing shard " + shardId);
+ for (int i = 0; i < NUM_RETRIES; i++) {
+ try {
+ checkpointer.checkpoint();
+ break;
+ } catch (ShutdownException se) {
+ // Ignore checkpoint if the processor instance has been shutdown
+ // (fail over).
+ log.info("Caught shutdown exception, skipping checkpoint.", se);
+ break;
+ } catch (ThrottlingException e) {
+ // Backoff and re-attempt checkpoint upon transient failures
+ if (i >= (NUM_RETRIES - 1)) {
+ log.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
+ break;
+ } else {
+ log.info("Transient issue when checkpointing - attempt " + (i + 1) + " of " + NUM_RETRIES, e);
+ }
+ } catch (InvalidStateException e) {
+ // This indicates an issue with the DynamoDB table (check for
+ // table, provisioned IOPS).
+ log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
+ break;
+ }
+
+ try {
+ Thread.sleep(BACKOFF_TIME_IN_MILLIS);
+ } catch (InterruptedException e) {
+ log.debug("Interrupted sleep", e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/kinesisencryption/kcl2/KCLRecordProcessorFactory.java b/src/main/java/kinesisencryption/kcl2/KCLRecordProcessorFactory.java
new file mode 100644
index 0000000..0fc1e3a
--- /dev/null
+++ b/src/main/java/kinesisencryption/kcl2/KCLRecordProcessorFactory.java
@@ -0,0 +1,13 @@
+package kinesisencryption.kcl2;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
+
+/**
+ *
+ */
+public class KCLRecordProcessorFactory implements IRecordProcessorFactory {
+ public IRecordProcessor createProcessor() {
+ return new KCLRecordProcessor();
+ }
+}
diff --git a/src/main/java/kinesisencryption/kcl2/S3ArchiverThread.java b/src/main/java/kinesisencryption/kcl2/S3ArchiverThread.java
new file mode 100644
index 0000000..ffc2b3d
--- /dev/null
+++ b/src/main/java/kinesisencryption/kcl2/S3ArchiverThread.java
@@ -0,0 +1,154 @@
+package kinesisencryption.kcl2;
+
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+
+import kinesisencryption.utils.KinesisEncryptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+/**
+ * Created by temitayo on 11/30/16.
+ */
+public class S3ArchiverThread implements Runnable
+{
+ private static String BUCKET_NAME = "12615eucentral";
+ private static final String fileName = "centos_consumer";
+ private static final Logger log = LoggerFactory.getLogger(S3ArchiverThread.class);
+ private List recordList;
+ private static final CharsetDecoder DECODER = Charset.forName("UTF-8").newDecoder();
+ AmazonS3Client s3Client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain()).withRegion(Region.getRegion(Regions.US_WEST_2));
+
+ public S3ArchiverThread(List recordList)
+ {
+ this.recordList = recordList;
+ }
+
+ public List getRecordList()
+ {
+ return recordList;
+ }
+
+
+
+ public String writeRecordsToFile(List recordList)
+ {
+
+ //File file = new File(fileName+System.currentTimeMillis());
+ Path path = Paths.get(fileName+System.currentTimeMillis());
+ try(BufferedWriter writer = Files.newBufferedWriter(path))
+ {
+
+ for(Record record: recordList)
+ {
+ writer.write(DECODER.decode(record.getData()).toString());
+ }
+
+ }
+ catch(Exception e)
+ {
+ log.error("error on writing to file for " + path.toString() + "Error : " + e.toString());
+ }
+
+
+ return path.toString();
+ }
+
+ //This is a test
+ public static void main(String[] args)
+ {
+
+ //File file = new File(fileName+System.currentTimeMillis());
+ Path path = Paths.get("/Users/temitayo/Desktop/archive"+System.currentTimeMillis());
+ try(BufferedWriter writer = Files.newBufferedWriter(path))
+ {
+
+ System.out.println("Writing now...");
+ writer.write("This is a CodeGuru , Gives Clues ");
+ writer.write("Java\n");
+ writer.write("Python\n");
+ writer.write("Clojure\n");
+ writer.write("Scala\n");
+ writer.write("JavaScript\n");
+
+ }
+ catch(Exception e)
+ {
+ System.out.println(e.toString());
+ }
+
+ AmazonS3Client s3Client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain());
+ // s3Client.setEndpoint("s3.eu-central-1.amazonaws.com");
+ try
+ {
+ System.out.println("Uploading a new object to S3 from a file\n");
+ File file = new File(path.toString());
+ System.out.println("Filename is " + path.toString());
+ s3Client.putObject(new PutObjectRequest(BUCKET_NAME, file.getName(), file));
+
+ }
+ catch(AmazonServiceException ase)
+ {
+ System.out.println("Uploading a new object to S3 from a file\n");
+ }
+
+ }
+
+ private void uploadS3File(String path)
+ {
+
+ try
+ {
+ s3Client.setEndpoint(KinesisEncryptionUtils.getProperties().getProperty("s3endpoint"));
+ String bucketName = KinesisEncryptionUtils.getProperties().getProperty("s3bucket");
+ log.info("Uploading a new object to S3 bucket " + bucketName);
+ File file = new File(path);
+ log.info("Filename is " + path);
+ s3Client.putObject(new PutObjectRequest(bucketName, file.getName(), file));
+ log.info("File : " + file + " uploaded successfully");
+ log.info("Now removing the file .... file : " + file);
+ //removing objects that are already uploaded
+ if(file.delete())
+ {
+ log.info("File : " + file + " deleted successfully");
+ }
+ else
+ {
+ log.error("Unable to delete file " + file);
+ }
+
+ }
+ catch(AmazonServiceException ase)
+ {
+ log.error("Failed uploading file to S3 : " + ase.toString());
+ }
+ catch(IOException ioe)
+ {
+ log.error("Failed uploading file to S3 : " + ioe.toString());
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ String path = this.writeRecordsToFile(this.getRecordList());
+ uploadS3File(path);
+
+ }
+}
diff --git a/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java b/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java
index 1f2ec80..4885ab3 100644
--- a/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java
+++ b/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java
@@ -111,7 +111,7 @@ public void onSuccess(UserRecordResult result) {
// Adding the encrypted record to stream
ListenableFuture f = producer.addUserRecord(streamName, randomPartitionKey(),
data);
- Futures.addCallback(f, callback);
+ //Futures.addCallback(f, callback);
log.info("Encrypted record " + data.toString() + " " + "added successfully");
}
tickerObjectList = encryptedProducerWithKPLProducer.getTickerSymbolList();
diff --git a/src/main/java/kinesisencryption/streams/ServerSideEncryptedProducer.java b/src/main/java/kinesisencryption/streams/ServerSideEncryptedProducer.java
new file mode 100644
index 0000000..62c133e
--- /dev/null
+++ b/src/main/java/kinesisencryption/streams/ServerSideEncryptedProducer.java
@@ -0,0 +1,88 @@
+package kinesisencryption.streams;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.EncryptionType;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.StartStreamEncryptionRequest;
+import kinesisencryption.dao.TickerSalesObject;
+import kinesisencryption.utils.KinesisEncryptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static kinesisencryption.streams.EncryptedProducerWithStreams.randomPartitionKey;
+
+public class ServerSideEncryptedProducer
+{
+ private static final Logger log = LoggerFactory.getLogger(EncryptedProducerWithStreams.class);
+ private List tickerSymbolList;
+
+ public List getTickerSymbolList() {
+ return tickerSymbolList;
+ }
+
+ public void setTickerSymbolList(List tickerSymbolList) {
+ this.tickerSymbolList = tickerSymbolList;
+ }
+
+ public static void main(String[] args) {
+ AmazonKinesisClient kinesis = new AmazonKinesisClient(
+ new DefaultAWSCredentialsProviderChain().getCredentials());
+
+
+ /*
+ * Simulating the appearance of a steady flow of data by continuously
+ * loading data from file
+ */
+ try {
+
+ String streamName = KinesisEncryptionUtils.getProperties().getProperty("stream_name");
+ log.info("Successfully retrieved stream name property " + streamName);
+ String keyId = KinesisEncryptionUtils.getProperties().getProperty("key_id");
+ log.info("Successfully retrieved key id property " + keyId);
+ String kinesisEndpoint = KinesisEncryptionUtils.getProperties().getProperty("kinesis_endpoint");
+ log.info("Successfully retrieved kinesis endpoint property " + kinesisEndpoint);
+ kinesis.setEndpoint(kinesisEndpoint);
+ String fileLocation = KinesisEncryptionUtils.getProperties().getProperty("file_path");
+ log.info("Successfully retrieved file location property " + fileLocation);
+
+ List tickerSymbolsList = KinesisEncryptionUtils.getDataObjects(fileLocation);
+ ServerSideEncryptedProducer producer = new ServerSideEncryptedProducer();
+ producer.setTickerSymbolList(tickerSymbolsList);
+ StartStreamEncryptionRequest startStreamEncryptionRequest = new StartStreamEncryptionRequest();
+ startStreamEncryptionRequest.setEncryptionType(EncryptionType.KMS);
+ startStreamEncryptionRequest.setKeyId(keyId);
+ startStreamEncryptionRequest.setStreamName(streamName);
+
+
+ while (true) {
+
+ for (TickerSalesObject ticker : tickerSymbolsList) {
+ PutRecordRequest putRecordRequest = new PutRecordRequest();
+ putRecordRequest.setStreamName(streamName);
+ log.info("Before encryption size of String Object is "
+ + KinesisEncryptionUtils.calculateSizeOfObject(ticker.toString()));
+ // UTF-8 encoding of encryptyed record
+ putRecordRequest.setData(ByteBuffer.wrap(String.format(ticker.toString()).getBytes("UTF-8")));
+ putRecordRequest.setPartitionKey(randomPartitionKey());
+ // putting the record into the stream
+ kinesis.putRecord(putRecordRequest);
+ log.info("Ticker added :" + ticker.toString() + "Ticker :" + ticker.toString());
+
+ }
+ tickerSymbolsList = producer.getTickerSymbolList();
+ Thread.sleep(100);
+ }
+ } catch (IOException ioe) {
+ log.error(ioe.toString());
+ } catch (InterruptedException ie) {
+ log.error(ie.toString());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/resources/app.properties b/src/main/resources/app.properties
index 88a81ca..84e5cee 100644
--- a/src/main/resources/app.properties
+++ b/src/main/resources/app.properties
@@ -7,4 +7,6 @@ kms_endpoint=kms.us-east-1.amazonaws.com
sharditerator_type=TRIM_HORIZON
kcl_name=EncryptedKCL
ddb_region_4_kcl=us-west-2
-encryption_context=cars
\ No newline at end of file
+encryption_context=cars
+s3endpoint=
+s3bucket=
\ No newline at end of file