-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpush.js
More file actions
executable file
·141 lines (115 loc) · 4.18 KB
/
push.js
File metadata and controls
executable file
·141 lines (115 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
const fs = require('fs');
const Config = require('./src/Config');
const JsonBuffer = require('./src/JsonBuffer');
const PushApi = require('./src/PushApi');
const StreamApi = require('./src/StreamApi');
function pushFile(config, file, dryRun = false) {
console.log(`Loading file: ${file}`);
if (dryRun) {
console.log('DRY-RUN, not pushing.');
return;
}
fs.readFile(file, async (err, data) => {
if (!err) {
try {
const payload = JSON.parse(data);
// quick validation of the payload
if (!payload || (!(payload instanceof Array) && !payload.AddOrUpdate && !payload.addOrUpdate)) {
console.warn(`\n\t !! Your payload seems to be in a wrong format !!\n\n\tMissing \x1b[33m\x1b[1m{ "AddOrUpdate": [] }\x1b[0m around your data?\n\n`);
}
if (config.useStreamApi) {
const streamHelper = new StreamApi(config);
await streamHelper.pushFile(payload);
} else {
console.log(`\nPushing one file to source: \x1b[33m\x1b[1m${config.source}\x1b[0m`);
const pushApiHelper = new PushApi(config);
await pushApiHelper.changeStatus('REBUILD');
await pushApiHelper.pushFile(payload);
await pushApiHelper.changeStatus('IDLE');
}
console.log(`\nDone\n`);
} catch (e) {
console.warn('Invalid payload.');
console.warn(e);
return;
}
} else {
console.log(`\nCouldn't read file "${file}": \n\t`, err);
}
});
}
function deleteBuffers() {
let buffers = fs.readdirSync('.').filter(fileName => fileName.startsWith('.pushapi.buffer.'));
buffers.forEach(fileName => {
console.log('deleting buffer: ', fileName);
fs.unlinkSync(fileName);
});
console.log('');
}
async function main(FILE_OR_FOLDER, argv = { deleteOlderThan: null }) {
try {
const dryRun = argv['dry-run'] ? true : false;
const config = new Config();
const pushApiHelper = new PushApi(config, dryRun);
if (argv.deleteOlderThan !== null) {
const orderingId = Date.now() - (argv.deleteOlderThan * 60 * 60 * 1000) - 1;
console.log(`Deleting items older than ${argv.deleteOlderThan} hours (${orderingId}).`);
await pushApiHelper.deleteOlderThan(orderingId);
}
if (dryRun) {
deleteBuffers();
}
let stats = fs.statSync(FILE_OR_FOLDER);
if (stats.isDirectory()) {
let _dir = process.cwd();
let folderName = FILE_OR_FOLDER;
// process every .json files in the folder as separate batch requests.
console.log(`Loading folder: ${_dir}/${folderName}`);
console.log('\nUpdate status for source: \x1b[33m \x1b[1m', config.source, '\x1b[0m');
if (!config.useStreamApi) await pushApiHelper.changeStatus('REBUILD');
let apiHelper = null;
try {
if (config.useStreamApi) {
apiHelper = new StreamApi(config);
await apiHelper.openStream();
}
else {
apiHelper = new PushApi(config);
}
}
catch (err) {
console.error(err.statusCode, err.statusMessage, (err.req && err.req.path || ''));
console.error(err.body || err);
}
let pushApiBuffer = new JsonBuffer(apiHelper, config, dryRun);
let files = fs.readdirSync(`${_dir}/${folderName}`);
// consider only .json files
files = files.filter(fileName => (/\.json$/.test(fileName)));
for (let fileName of files) {
await pushApiBuffer.addJsonFile(`${_dir}/${folderName}/${fileName}`);
}
await pushApiBuffer.sendBuffer();
if (config.useStreamApi) {
try {
await apiHelper.closeStream();
}
catch (err) {
console.error(err.statusCode, err.statusMessage, (err.req && err.req.path || ''));
console.error(err.body || err);
}
}
if (!config.useStreamApi) await pushApiHelper.changeStatus('IDLE');
console.log(`\nDone\n`);
} else if (stats.isFile()) {
pushFile(config, FILE_OR_FOLDER, argv['dry-run'] ? true : false);
} else if (argv.help) {
argv.help();
}
} catch (e) {
PushApi.throwError(e, 10);
}
}
exports.main = main;
exports.Config = Config;
exports.PushApi = PushApi;
exports.StreamApi = StreamApi;