diff --git a/pom.xml b/pom.xml index 277b2a2..f9bad7a 100644 --- a/pom.xml +++ b/pom.xml @@ -1,18 +1,18 @@ - 4.0.0 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - com.tayo - KinesisEncryption - 0.0.1-SNAPSHOT - jar + com.tayo + KinesisEncryption + 0.0.1-SNAPSHOT + jar - KinesisEncryption - http://maven.apache.org + KinesisEncryption + http://maven.apache.org - - UTF-8 - + + UTF-8 + @@ -45,42 +45,42 @@ - - - junit - junit - 3.8.1 - test - - - com.amazonaws - aws-java-sdk-core - 1.11.46 - - - com.amazonaws - aws-java-sdk - 1.11.46 - - - org.bouncycastle - bcprov-ext-jdk15on - 1.54 - - - com.amazonaws - aws-encryption-sdk-java - 0.0.1 - - + + + junit + junit + 3.8.1 + test + + + com.amazonaws + aws-java-sdk-core + 1.11.714 + + + + org.bouncycastle + bcprov-ext-jdk15on + 1.54 + + + com.amazonaws + aws-encryption-sdk-java + 0.0.1 + + com.amazonaws amazon-kinesis-client - 1.7.2 + 1.13.2 com.amazonaws amazon-kinesis-producer - 0.12.3 + 0.14.0 org.slf4j @@ -88,21 +88,21 @@ 1.7.13 - - - commons-io - commons-io - 2.5 - - - org.json - json - 20160212 - - - com.googlecode.json-simple - json-simple - 1.1.1 - - + + + commons-io + commons-io + 2.5 + + + org.json + json + 20160212 + + + com.googlecode.json-simple + json-simple + 1.1.1 + + diff --git a/src/main/java/kinesisencryption/dao/TickerSalesObject.java b/src/main/java/kinesisencryption/dao/TickerSalesObject.java index 63d95f5..03f15e2 100644 --- a/src/main/java/kinesisencryption/dao/TickerSalesObject.java +++ b/src/main/java/kinesisencryption/dao/TickerSalesObject.java @@ -31,4 +31,41 @@ public String toString() ", timeStamp='" + timeStamp + '\'' + '}'; } + + public TickerSalesObject() + { + + } + + public String getTickerSymbol() { + return tickerSymbol; + } + + public void setTickerSymbol(String tickerSymbol) { + this.tickerSymbol = tickerSymbol; + } + + public String getSalesPrice() { + return salesPrice; + } + + public void setSalesPrice(String salesPrice) { + this.salesPrice = salesPrice; + } + + public String getOrderId() { + return orderId; + } + + public void setOrderId(String orderId) { + this.orderId = orderId; + } + + public String getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(String timeStamp) { + this.timeStamp = timeStamp; + } } diff --git a/src/main/java/kinesisencryption/kcl2/DownstreamThread.java b/src/main/java/kinesisencryption/kcl2/DownstreamThread.java new file mode 100644 index 0000000..6f8c461 --- /dev/null +++ b/src/main/java/kinesisencryption/kcl2/DownstreamThread.java @@ -0,0 +1,70 @@ +package kinesisencryption.kcl2; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.util.*; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import kinesisencryption.dao.TickerSalesObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.Record; + +/** + * Thread used for decrypting and printing consumed records to the logs + * + */ +public class DownstreamThread implements Runnable { + private static final Logger log = LoggerFactory.getLogger(kinesisencryption.kcl.RecordPrinterThread.class); + private List recordList; + private AmazonKinesisClient kinesis; + + public AmazonKinesisClient getKinesis() { + return kinesis; + } + + + public List getRecordList() { + return recordList; + } + + public DownstreamThread(List recordList) { + this.recordList = recordList; + } + + + + @Override + public void run() { + Map resultMap = new HashMap(); + for (Record record : this.getRecordList()) { + try { + ByteBuffer buffer = record.getData(); + + String result = Charset.forName("UTF-8").newDecoder().decode(buffer).toString(); + log.info("Cipher Blob :" + record.getData().toString() + " : " + "Decrypted Text is :" + + result); + TickerSalesObject salesObject = new TickerSalesObject(); + ObjectMapper mapper = new ObjectMapper(); + salesObject = mapper.readValue(result, TickerSalesObject.class); + resultMap.put(UUID.randomUUID().toString(), salesObject); + } catch (CharacterCodingException e) { + log.error("Unable to decode result for " + record.getData().toString() + "with equence number " + + record.getSequenceNumber()); + } catch (JsonParseException e) { + e.printStackTrace(); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} + diff --git a/src/main/java/kinesisencryption/kcl2/KCLConsumer.java b/src/main/java/kinesisencryption/kcl2/KCLConsumer.java new file mode 100644 index 0000000..1ac90f6 --- /dev/null +++ b/src/main/java/kinesisencryption/kcl2/KCLConsumer.java @@ -0,0 +1,64 @@ +package kinesisencryption.kcl2; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; +import kinesisencryption.utils.KinesisEncryptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.UUID; + +/** + * A consumer application that consumes encrypted records from kinesis streams. + */ +public class KCLConsumer { + private static final InitialPositionInStream INITIAL_POSITION_IN_STREAM = InitialPositionInStream.TRIM_HORIZON; + private static AWSCredentialsProvider credentialsProvider; + private static final Logger log = LoggerFactory.getLogger(kinesisencryption.kcl.EncryptedConsumerWithKCL.class); + + private static void initialize() { + java.security.Security.setProperty("networkaddress.cache.ttl", "60"); + credentialsProvider = new DefaultAWSCredentialsProviderChain(); + + try { + credentialsProvider.getCredentials(); + + } catch (Exception e) { + throw new AmazonClientException("Cannot find credentials"); + } + + } + + public static void main(String[] args) throws Exception { + initialize(); + + String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); + String streamName = KinesisEncryptionUtils.getProperties().getProperty("stream_name"); + String appName = KinesisEncryptionUtils.getProperties().getProperty("kcl_name"); + String ddbRegion = KinesisEncryptionUtils.getProperties().getProperty("ddb_region_4_kcl"); + KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, + streamName, credentialsProvider, workerId); + kinesisClientLibConfiguration.withInitialPositionInStream(INITIAL_POSITION_IN_STREAM).withRegionName(ddbRegion); + + IRecordProcessorFactory recordProcessorFactory = new KCLRecordProcessorFactory(); + Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration); + + log.info("Started KCL Worker process for Stream " + streamName + " " + "with workerId " + workerId); + + int exitCode = 0; + try { + worker.run(); + } catch (Throwable t) { + System.err.println("Caught throwable while processing data."); + t.printStackTrace(); + exitCode = 1; + } + System.exit(exitCode); + } +} diff --git a/src/main/java/kinesisencryption/kcl2/KCLRecordProcessor.java b/src/main/java/kinesisencryption/kcl2/KCLRecordProcessor.java new file mode 100644 index 0000000..39223b9 --- /dev/null +++ b/src/main/java/kinesisencryption/kcl2/KCLRecordProcessor.java @@ -0,0 +1,95 @@ +package kinesisencryption.kcl2; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import com.amazonaws.services.kinesis.model.Record; + +/** + * A KCL RecordProcessor that processes server-side encrypted Kinesis records + */ +public class KCLRecordProcessor implements IRecordProcessor { + private static final Logger log = LoggerFactory.getLogger(kinesisencryption.kcl.EncryptedKCLRecordProcessor.class); + private String shardId, keyArn, encryptionContext; + + // Backoff and retry settings + private static final long BACKOFF_TIME_IN_MILLIS = 3000L; + private static final int NUM_RETRIES = 10; + + // Checkpoint about once a minute + private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; + private long nextCheckpointTimeInMillis; + + @Override + public void initialize(String shardId) { + log.info("Initializing record processor for shard :" + shardId); + this.shardId = shardId; + } + + @Override + public void processRecords(List recordList, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) { + if (recordList.size() > 0) { + log.info("Received record size is : " + recordList.size()); + S3ArchiverThread printer = new S3ArchiverThread(recordList); + Thread thread = new Thread(printer); + thread.start(); + } + + if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { + checkpoint(iRecordProcessorCheckpointer); + nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; + } + } + + @Override + public void shutdown(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, ShutdownReason shutdownReason) { + log.info("Shutting down record processor for shard: " + shardId); + // Important to checkpoint after reaching end of shard, so we can start + // processing data from child shards. + if (shutdownReason == ShutdownReason.TERMINATE) { + checkpoint(iRecordProcessorCheckpointer); + } + } + + private void checkpoint(IRecordProcessorCheckpointer checkpointer) { + log.info("Checkpointing shard " + shardId); + for (int i = 0; i < NUM_RETRIES; i++) { + try { + checkpointer.checkpoint(); + break; + } catch (ShutdownException se) { + // Ignore checkpoint if the processor instance has been shutdown + // (fail over). + log.info("Caught shutdown exception, skipping checkpoint.", se); + break; + } catch (ThrottlingException e) { + // Backoff and re-attempt checkpoint upon transient failures + if (i >= (NUM_RETRIES - 1)) { + log.error("Checkpoint failed after " + (i + 1) + "attempts.", e); + break; + } else { + log.info("Transient issue when checkpointing - attempt " + (i + 1) + " of " + NUM_RETRIES, e); + } + } catch (InvalidStateException e) { + // This indicates an issue with the DynamoDB table (check for + // table, provisioned IOPS). + log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); + break; + } + + try { + Thread.sleep(BACKOFF_TIME_IN_MILLIS); + } catch (InterruptedException e) { + log.debug("Interrupted sleep", e); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/kinesisencryption/kcl2/KCLRecordProcessorFactory.java b/src/main/java/kinesisencryption/kcl2/KCLRecordProcessorFactory.java new file mode 100644 index 0000000..0fc1e3a --- /dev/null +++ b/src/main/java/kinesisencryption/kcl2/KCLRecordProcessorFactory.java @@ -0,0 +1,13 @@ +package kinesisencryption.kcl2; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; + +/** + * + */ +public class KCLRecordProcessorFactory implements IRecordProcessorFactory { + public IRecordProcessor createProcessor() { + return new KCLRecordProcessor(); + } +} diff --git a/src/main/java/kinesisencryption/kcl2/S3ArchiverThread.java b/src/main/java/kinesisencryption/kcl2/S3ArchiverThread.java new file mode 100644 index 0000000..ffc2b3d --- /dev/null +++ b/src/main/java/kinesisencryption/kcl2/S3ArchiverThread.java @@ -0,0 +1,154 @@ +package kinesisencryption.kcl2; + + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.PutObjectRequest; + +import kinesisencryption.utils.KinesisEncryptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +/** + * Created by temitayo on 11/30/16. + */ +public class S3ArchiverThread implements Runnable +{ + private static String BUCKET_NAME = "12615eucentral"; + private static final String fileName = "centos_consumer"; + private static final Logger log = LoggerFactory.getLogger(S3ArchiverThread.class); + private List recordList; + private static final CharsetDecoder DECODER = Charset.forName("UTF-8").newDecoder(); + AmazonS3Client s3Client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain()).withRegion(Region.getRegion(Regions.US_WEST_2)); + + public S3ArchiverThread(List recordList) + { + this.recordList = recordList; + } + + public List getRecordList() + { + return recordList; + } + + + + public String writeRecordsToFile(List recordList) + { + + //File file = new File(fileName+System.currentTimeMillis()); + Path path = Paths.get(fileName+System.currentTimeMillis()); + try(BufferedWriter writer = Files.newBufferedWriter(path)) + { + + for(Record record: recordList) + { + writer.write(DECODER.decode(record.getData()).toString()); + } + + } + catch(Exception e) + { + log.error("error on writing to file for " + path.toString() + "Error : " + e.toString()); + } + + + return path.toString(); + } + + //This is a test + public static void main(String[] args) + { + + //File file = new File(fileName+System.currentTimeMillis()); + Path path = Paths.get("/Users/temitayo/Desktop/archive"+System.currentTimeMillis()); + try(BufferedWriter writer = Files.newBufferedWriter(path)) + { + + System.out.println("Writing now..."); + writer.write("This is a CodeGuru , Gives Clues "); + writer.write("Java\n"); + writer.write("Python\n"); + writer.write("Clojure\n"); + writer.write("Scala\n"); + writer.write("JavaScript\n"); + + } + catch(Exception e) + { + System.out.println(e.toString()); + } + + AmazonS3Client s3Client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain()); + // s3Client.setEndpoint("s3.eu-central-1.amazonaws.com"); + try + { + System.out.println("Uploading a new object to S3 from a file\n"); + File file = new File(path.toString()); + System.out.println("Filename is " + path.toString()); + s3Client.putObject(new PutObjectRequest(BUCKET_NAME, file.getName(), file)); + + } + catch(AmazonServiceException ase) + { + System.out.println("Uploading a new object to S3 from a file\n"); + } + + } + + private void uploadS3File(String path) + { + + try + { + s3Client.setEndpoint(KinesisEncryptionUtils.getProperties().getProperty("s3endpoint")); + String bucketName = KinesisEncryptionUtils.getProperties().getProperty("s3bucket"); + log.info("Uploading a new object to S3 bucket " + bucketName); + File file = new File(path); + log.info("Filename is " + path); + s3Client.putObject(new PutObjectRequest(bucketName, file.getName(), file)); + log.info("File : " + file + " uploaded successfully"); + log.info("Now removing the file .... file : " + file); + //removing objects that are already uploaded + if(file.delete()) + { + log.info("File : " + file + " deleted successfully"); + } + else + { + log.error("Unable to delete file " + file); + } + + } + catch(AmazonServiceException ase) + { + log.error("Failed uploading file to S3 : " + ase.toString()); + } + catch(IOException ioe) + { + log.error("Failed uploading file to S3 : " + ioe.toString()); + } + } + + @Override + public void run() + { + String path = this.writeRecordsToFile(this.getRecordList()); + uploadS3File(path); + + } +} diff --git a/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java b/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java index 1f2ec80..4885ab3 100644 --- a/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java +++ b/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java @@ -111,7 +111,7 @@ public void onSuccess(UserRecordResult result) { // Adding the encrypted record to stream ListenableFuture f = producer.addUserRecord(streamName, randomPartitionKey(), data); - Futures.addCallback(f, callback); + //Futures.addCallback(f, callback); log.info("Encrypted record " + data.toString() + " " + "added successfully"); } tickerObjectList = encryptedProducerWithKPLProducer.getTickerSymbolList(); diff --git a/src/main/java/kinesisencryption/streams/ServerSideEncryptedProducer.java b/src/main/java/kinesisencryption/streams/ServerSideEncryptedProducer.java new file mode 100644 index 0000000..62c133e --- /dev/null +++ b/src/main/java/kinesisencryption/streams/ServerSideEncryptedProducer.java @@ -0,0 +1,88 @@ +package kinesisencryption.streams; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.EncryptionType; +import com.amazonaws.services.kinesis.model.PutRecordRequest; +import com.amazonaws.services.kinesis.model.StartStreamEncryptionRequest; +import kinesisencryption.dao.TickerSalesObject; +import kinesisencryption.utils.KinesisEncryptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +import static kinesisencryption.streams.EncryptedProducerWithStreams.randomPartitionKey; + +public class ServerSideEncryptedProducer +{ + private static final Logger log = LoggerFactory.getLogger(EncryptedProducerWithStreams.class); + private List tickerSymbolList; + + public List getTickerSymbolList() { + return tickerSymbolList; + } + + public void setTickerSymbolList(List tickerSymbolList) { + this.tickerSymbolList = tickerSymbolList; + } + + public static void main(String[] args) { + AmazonKinesisClient kinesis = new AmazonKinesisClient( + new DefaultAWSCredentialsProviderChain().getCredentials()); + + + /* + * Simulating the appearance of a steady flow of data by continuously + * loading data from file + */ + try { + + String streamName = KinesisEncryptionUtils.getProperties().getProperty("stream_name"); + log.info("Successfully retrieved stream name property " + streamName); + String keyId = KinesisEncryptionUtils.getProperties().getProperty("key_id"); + log.info("Successfully retrieved key id property " + keyId); + String kinesisEndpoint = KinesisEncryptionUtils.getProperties().getProperty("kinesis_endpoint"); + log.info("Successfully retrieved kinesis endpoint property " + kinesisEndpoint); + kinesis.setEndpoint(kinesisEndpoint); + String fileLocation = KinesisEncryptionUtils.getProperties().getProperty("file_path"); + log.info("Successfully retrieved file location property " + fileLocation); + + List tickerSymbolsList = KinesisEncryptionUtils.getDataObjects(fileLocation); + ServerSideEncryptedProducer producer = new ServerSideEncryptedProducer(); + producer.setTickerSymbolList(tickerSymbolsList); + StartStreamEncryptionRequest startStreamEncryptionRequest = new StartStreamEncryptionRequest(); + startStreamEncryptionRequest.setEncryptionType(EncryptionType.KMS); + startStreamEncryptionRequest.setKeyId(keyId); + startStreamEncryptionRequest.setStreamName(streamName); + + + while (true) { + + for (TickerSalesObject ticker : tickerSymbolsList) { + PutRecordRequest putRecordRequest = new PutRecordRequest(); + putRecordRequest.setStreamName(streamName); + log.info("Before encryption size of String Object is " + + KinesisEncryptionUtils.calculateSizeOfObject(ticker.toString())); + // UTF-8 encoding of encryptyed record + putRecordRequest.setData(ByteBuffer.wrap(String.format(ticker.toString()).getBytes("UTF-8"))); + putRecordRequest.setPartitionKey(randomPartitionKey()); + // putting the record into the stream + kinesis.putRecord(putRecordRequest); + log.info("Ticker added :" + ticker.toString() + "Ticker :" + ticker.toString()); + + } + tickerSymbolsList = producer.getTickerSymbolList(); + Thread.sleep(100); + } + } catch (IOException ioe) { + log.error(ioe.toString()); + } catch (InterruptedException ie) { + log.error(ie.toString()); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/resources/app.properties b/src/main/resources/app.properties index 88a81ca..84e5cee 100644 --- a/src/main/resources/app.properties +++ b/src/main/resources/app.properties @@ -7,4 +7,6 @@ kms_endpoint=kms.us-east-1.amazonaws.com sharditerator_type=TRIM_HORIZON kcl_name=EncryptedKCL ddb_region_4_kcl=us-west-2 -encryption_context=cars \ No newline at end of file +encryption_context=cars +s3endpoint= +s3bucket= \ No newline at end of file