-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.js
132 lines (112 loc) · 3.98 KB
/
main.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
const fs = require("node:fs");
const {checkFileExists, waitLine} = require("./utils");
const { exit } = require("node:process");
const axios = require('axios').default
// Configuration input: where the settings are stored on disk, plus the
// defaults offered by the interactive prompts in inputConf().
// Path of the JSON config file, relative to the working directory.
const ConfPath = './config.json';
// Default for the "parallelNum" prompt (used when the answer is empty).
const DefaultUpParrallelNum = 10;
// Default for the "step" prompt (used when the answer is empty).
const DefaultUpStep = 1;
/**
 * Interactively asks the user for the upload settings, stores them in the
 * module-level `conf` variable and writes them to `ConfPath` as JSON.
 *
 * Numeric answers that are empty, non-numeric or smaller than 1 fall back to
 * the defaults, because a stored `NaN`/`0` would later break the upload
 * loops. Collection names are trimmed and empty names are dropped.
 *
 * @returns {Promise<void>} Resolves once the configuration file is written.
 */
async function inputConf() {
    // Parse a prompt answer as a base-10 positive integer, else use the fallback.
    const toPositiveInt = (answer, fallback) => {
        const parsed = Number.parseInt(answer, 10);
        return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
    };

    const newConf = {
        "comment": [
            "// 注释:",
            "// Laf console上获取信息,注意信息安全。终端黑框中右键可以粘贴。",
            "// 注意只修改双引号里的字符!源码地址:https://github.com/sysucats/upload_laf_db"
        ],
    };
    // Keep the module-level variable in sync while the answers are collected.
    conf = newConf;
    // Show every hint except the last one (which only matters inside the file).
    console.log(newConf.comment.slice(0, -1).join("\n"));

    // Collect the answers
    newConf.url = await waitLine("Laf addRecords 函数地址:");
    const collectionsAnswer = await waitLine("Laf数据表名,用英文逗号分隔 collections, split by comma ',':");
    newConf.collections = collectionsAnswer
        .split(',')
        .map((name) => name.trim())
        .filter((name) => name !== "");
    const parallelAnswer = await waitLine(`并发数,parallel uploading number [回车默认 default "${DefaultUpParrallelNum}"]:`);
    newConf.parallelNum = toPositiveInt(parallelAnswer, DefaultUpParrallelNum);
    const stepAnswer = await waitLine(`单次上传条数,single upload count [回车默认 default "${DefaultUpStep}"]:`);
    newConf.step = toPositiveInt(stepAnswer, DefaultUpStep);

    await fs.promises.writeFile(ConfPath, JSON.stringify(newConf, null, 4));
}
// Active configuration; assigned by readConfig() (and by inputConf() while prompting).
let conf = null;

/**
 * Loads the configuration from `ConfPath` into the module-level `conf`,
 * running the interactive prompt first when the file does not exist yet.
 *
 * Failures are reported through the return value instead of an exception so
 * that the caller's "read config error" branch is actually reachable.
 *
 * @returns {Promise<boolean>} `true` when `conf` was loaded; `false` when the
 *   file could not be created, read or parsed, or lacks `url`/`collections`.
 */
async function readConfig() {
    const isPositiveInt = (value) => Number.isInteger(value) && value > 0;
    try {
        const confExists = await checkFileExists(ConfPath);
        if (!confExists) {
            await inputConf();
        }
        const loaded = JSON.parse(await fs.promises.readFile(ConfPath, 'utf-8'));
        const hasRequiredFields = loaded !== null
            && typeof loaded === 'object'
            && typeof loaded.url === 'string'
            && Array.isArray(loaded.collections);
        if (!hasRequiredFields) {
            console.log(`[ERROR] Config file "${ConfPath}" must contain a "url" string and a "collections" array.`);
            return false;
        }
        // A hand-edited zero, negative or non-numeric value would stall or
        // crash the upload loops, so fall back to the defaults.
        if (!isPositiveInt(loaded.parallelNum)) {
            loaded.parallelNum = DefaultUpParrallelNum;
        }
        if (!isPositiveInt(loaded.step)) {
            loaded.step = DefaultUpStep;
        }
        conf = loaded;
        return true;
    } catch (error) {
        console.log(`[ERROR] Failed to load config "${ConfPath}": ${error.message}`);
        return false;
    }
}
/**
 * Uploads the records assigned to one worker, in batches of `conf.step`.
 *
 * Each batch is POSTed to `conf.url` as `{ collection, data }`, where `data`
 * is the base64 encoding of the batch's JSON text. Every batch gets its own
 * budget of 5 attempts; a batch that still fails is reported as an error and
 * the remaining batches are attempted anyway (best effort).
 *
 * @param {Array} lines - Parsed records to upload.
 * @param {string} coll - Target collection name.
 * @param {number} threadID - Worker index, used only in log messages.
 * @returns {Promise<number>} Number of records whose batch was accepted.
 */
async function uploadLines(lines, coll, threadID) {
    const maxAttempts = 5;
    // Guard against a zero/invalid step, which would never advance the loop.
    const step = Number.isInteger(conf.step) && conf.step > 0 ? conf.step : DefaultUpStep;
    let count = 0;
    let uploaded = 0;

    for (let start = 0; start < lines.length; start += step) {
        const batch = lines.slice(start, start + step);
        count += batch.length;
        const pack = {
            collection: coll,
            data: Buffer.from(JSON.stringify(batch), "utf-8").toString('base64'),
        };

        // The attempt counter is per batch: one bad batch must not use up
        // the retries of all the batches that follow it.
        let sent = false;
        let lastError = null;
        for (let attempt = 1; attempt <= maxAttempts && !sent; attempt++) {
            try {
                await axios.post(conf.url, pack);
                sent = true;
            } catch (error) {
                lastError = error;
                if (attempt < maxAttempts) {
                    console.log(`[thread-${threadID}][coll-${coll}] ${count}/${lines.length} retry...`);
                }
            }
        }

        if (sent) {
            uploaded += batch.length;
            console.log(`[thread-${threadID}][coll-${coll}] ${count}/${lines.length} done.`);
        } else {
            const reason = lastError && lastError.message ? lastError.message : String(lastError);
            console.log(`[ERROR][thread-${threadID}][coll-${coll}] ${count}/${lines.length} failed after ${maxAttempts} attempts: ${reason}`);
        }
    }
    return uploaded;
}
/**
 * Uploads one collection from `./data/<coll>.json`, a file holding one JSON
 * document per line.
 *
 * Records are dealt round-robin to at most `conf.parallelNum` workers, which
 * upload concurrently through `uploadLines`. Blank lines are skipped, so an
 * empty file or a stray empty line does not abort the run.
 *
 * @param {string} coll - Collection name; also the data file's base name.
 * @returns {Promise<boolean>} `false` when the data file does not exist,
 *   `true` once all workers have finished.
 * @throws {SyntaxError} When a non-blank line is not valid JSON.
 */
async function uploadColl(coll) {
    console.log(`Uploading collection: ${coll}`);
    const collFilePath = `./data/${coll}.json`;
    if (!await checkFileExists(collFilePath)) {
        console.log(`[ERROR] Data file not exist! file path: "${collFilePath}"`);
        return false;
    }

    // A zero/invalid worker count would make the round-robin index NaN.
    const workerCount = Number.isInteger(conf.parallelNum) && conf.parallelNum > 0
        ? conf.parallelNum
        : DefaultUpParrallelNum;

    // Read the line-delimited JSON file
    const lines = (await fs.promises.readFile(collFilePath, 'utf-8')).split("\n");
    const poolLines = [];
    let recordCount = 0;
    for (let i = 0; i < lines.length; i++) {
        if (lines[i].trim() === "") {
            continue;
        }
        let record;
        try {
            record = JSON.parse(lines[i]);
        } catch (e) {
            // `i` is the zero-based index of the line within the file.
            console.error(`[ERROR] line ${i}: "${lines[i]}"`);
            throw e;
        }
        const workerIndex = recordCount % workerCount;
        if (poolLines.length <= workerIndex) {
            poolLines.push([]);
        }
        poolLines[workerIndex].push(record);
        recordCount++;
    }

    // Upload concurrently, one promise per worker
    await Promise.all(poolLines.map((records, workerIndex) => uploadLines(records, coll, workerIndex)));
    return true;
}
/**
 * Program flow: load the configuration, upload every configured collection
 * one after another, then wait for "Enter" and terminate the process.
 *
 * @returns {Promise<void>}
 */
async function main() {
    const configLoaded = await readConfig();
    if (!configLoaded) {
        await waitLine(`\n======== [ERROR] read config error! Press "Enter" to exit. ========`);
        exit();
    }

    // Handle the collections sequentially, in the configured order
    for (const rawName of conf.collections) {
        const collectionName = rawName.trim();
        await uploadColl(collectionName);
    }

    await waitLine(`\n======== Press "Enter" to exit. ========`);
    exit();
}
// Entry point. main() is async, so its failures arrive as a promise
// rejection; a synchronous try/catch around the call would never see them.
main().catch(async (error) => {
    console.log(error);
    // Keep the console window open until the user has read the error.
    await waitLine(`\n======== Press "Enter" to exit. ========`);
    exit(1);
});