From e8dc29cca47a23d4a54f0e7ed2ec97cf780b4bfb Mon Sep 17 00:00:00 2001 From: Taishi Ikai Date: Tue, 6 Oct 2015 11:07:13 +0900 Subject: [PATCH 1/2] [WIP] Add feature send to Elasticsearch From 10231d6c75e1b0629ce683099540fd8c85fdc785 Mon Sep 17 00:00:00 2001 From: Taishi Ikai Date: Tue, 6 Oct 2015 13:29:38 +0900 Subject: [PATCH 2/2] Refactor sending log part --- src/LogCollector.js | 25 ++++++++++++++ src/aws/Elasticsearch.js | 17 ++++++++++ src/gcp/BigQuery.js | 70 ++++++++++++++++++---------------------- src/main.js | 17 +++------- src/parser.js | 35 ++++++++++++++++++-- 5 files changed, 111 insertions(+), 53 deletions(-) create mode 100644 src/LogCollector.js create mode 100644 src/aws/Elasticsearch.js diff --git a/src/LogCollector.js b/src/LogCollector.js new file mode 100644 index 0000000..7abf7ff --- /dev/null +++ b/src/LogCollector.js @@ -0,0 +1,25 @@ +import "core-js"; +import config from "./config.js"; + + +export default class LogCollector { + + /** + * @returns {LogCollector} + */ + constructor(bucket, key) { + this.bucket = bucket; + this.key = key; + return this; + } + + /** + * Insert data async + * + * @param parser Parser + * @returns {Promise} + */ + insertAsync(parser) { + return parser.parseAsync(); + } +} diff --git a/src/aws/Elasticsearch.js b/src/aws/Elasticsearch.js new file mode 100644 index 0000000..8f1055d --- /dev/null +++ b/src/aws/Elasticsearch.js @@ -0,0 +1,17 @@ +import "core-js"; +import AWS from "./index.js"; + + +export default class Elasticsearch extends LogCollector { + constructor(bucket, key) { + super(bucket, key); + } + + insertAsync(parser) { + let self = this; + return parser.parseAsync().then(parsedParser => { + let jsonString = parsedParser.toJSON(); + + }); + } +} \ No newline at end of file diff --git a/src/gcp/BigQuery.js b/src/gcp/BigQuery.js index 8bbc256..cce2bce 100644 --- a/src/gcp/BigQuery.js +++ b/src/gcp/BigQuery.js @@ -2,10 +2,23 @@ import "core-js"; import Google from "googleapis"; import GcpOAuth2 from "./OAuth2.js"; import config from "../config.js"; +import LogCollector from "../LogCollector.js"; -export default class BigQuery { - constructor(tableId=config.gcp.bigQuery.tableId, datasetId=config.gcp.bigQuery.datasetId, projectId=config.gcp.projectId) { +export default class BigQuery extends LogCollector { + + /** + * @param bucket + * @param key + * @param tableId + * @param datasetId + * @param projectId + */ + constructor(bucket, key, + tableId=config.gcp.bigQuery.tableId, + datasetId=config.gcp.bigQuery.datasetId, + projectId=config.gcp.projectId) { + super(bucket, key); this.auth = new GcpOAuth2().auth; this.tableId = tableId; this.datasetId = datasetId; @@ -14,55 +27,36 @@ export default class BigQuery { version: "v2", auth: this.auth }); + + if (config.gcp.bigQuery.decideTableIdFromBucketName) { + this.tableId = bucket.replace(/\./g, "_"); + } } - insert(jsonObject, callback=()=>{}) { - console.log(jsonObject); + /** + * Insert json string given to BigQuery. + * + * @param parser Parser + * @returns {Promise} + */ + insertAsync(parser) { let self = this; - this.bigquery.tabledata.insertAll({ - "projectId": self.projectId, - "datasetId": self.datasetId, - "tableId": self.tableId, - "resource": { - "kind": "bigquery#tableDataInsertAllRequest", - "rows": [ - { - "json": jsonObject - } - ] - } - }, (error, result) => { - if (error) { - return console.error(error); - } - console.log(result); - callback(result); - }); - } + return parser.parseAsync().then(parsedParser => { + let jsonString = parsedParser.toJSON(); - insertAsync(jsonObject) { - return new Promise((resolve, reject) => { - console.log(`[insert] BigQuery`); - console.log(jsonObject); - let self = this; - this.bigquery.tabledata.insertAll({ + return new Promise((resolve, reject) => self.bigquery.tabledata.insertAll({ "projectId": self.projectId, "datasetId": self.datasetId, - "tableId": self.tableId, + "tableId": self.tableId, "resource": { "kind": "bigquery#tableDataInsertAllRequest", "rows": [ { - "json": jsonObject + "json": jsonString } ] } - }, (error, result) => { - if (error) { - return reject(error); - } - return resolve(result); - }) + }, (error, result) => error ? reject(error) : resolve(result))); }); } } diff --git a/src/main.js b/src/main.js index 3198271..6d1a5ca 100644 --- a/src/main.js +++ b/src/main.js @@ -1,5 +1,5 @@ import "core-js"; -import Parser from "./parser.js"; +import Parser from "./Parser.js"; import BigQuery from "./gcp/BigQuery.js"; import S3 from "./aws/S3.js"; @@ -10,19 +10,12 @@ export function handler(event, context) { let bucket = event.Records[0].s3.bucket.name; let key = event.Records[0].s3.object.key; - let [s3, bq] = [new S3(), new BigQuery()]; + let [s3, bq] = [new S3(), new BigQuery(bucket, key)]; - if (config.gcp.bigQuery.decideTableIdFromBucketName) { - bq.tableId = bucket.replace(/\./g, "_"); - } - s3.get(key, bucket).then(data => { - return Promise.all(data.toString().split(/\r\n|\r|\n/).slice(0, -1).map(line => { - console.log("splitted!", line); - return new Parser(line).parseAsync().then(data => - bq.insertAsync(data)); - })) - }) + s3.get(key, bucket).then(data => + Promise.all(data.toString().split(/\r\n|\r|\n/).slice(0, -1).map(line => + bq.insertAsync(new Parser(line))))) .catch(error => context.fail(error)) .then(msg => diff --git a/src/parser.js b/src/parser.js index 5375106..8b66bf8 100644 --- a/src/parser.js +++ b/src/parser.js @@ -9,7 +9,14 @@ export default class Parser { this.line = line; } - _object(parts) { + /** + * Mapping parts + * + * @param parts + * @returns {{}} + * @private + */ + _mapping(parts) { let data = {}; data.BucketOwner = parts[0]; data.Bucket = parts[1]; @@ -32,6 +39,13 @@ export default class Parser { return data; } + /** + * Set Time to unixtime + * + * @param data + * @returns {*} + * @private + */ _format(data) { data.Time = Moment(data.Time, "DD/MMM/YYYY:HH:mm:ss Z").utc().unix(); @@ -39,6 +53,11 @@ export default class Parser { return data; } + /** + * Parse async + * + * @returns {Promise}, Parser(self) + */ parseAsync() { let self = this; return new Promise(resolve => { @@ -46,6 +65,11 @@ export default class Parser { }); } + /** + * Parse and return self + * + * @returns {Parser} + */ parse() { let parts = []; let restString = this.line; @@ -72,10 +96,15 @@ export default class Parser { } } - this.data = this._format(this._object(parts)); - return this.data; + this.data = this._format(this._mapping(parts)); + return this; } + /** + * Get json string + * + * @returns string + */ toJSON() { return JSON.stringify(this.data); }