// subscriber.ts
import redis from "redis";
import * as Minio from "minio";
import { rmdir, readdir, readFile } from "node:fs/promises";
import { join } from "node:path";
/**
 * Creates a new MinIO client pointed at localhost:9000 without TLS,
 * using the fixed access/secret key pair below.
 *
 * A fresh client is constructed on every call.
 */
const getMinioClient = () =>
  new Minio.Client({
    endPoint: "localhost",
    port: 9000,
    useSSL: false,
    accessKey: "minio",
    secretKey: "minio123",
  });
/**
 * Creates a new (not yet connected) Redis client for redis://localhost:6379
 * with the fixed password below.
 *
 * Callers are responsible for calling `connect()` on the returned client
 * (or on a duplicate of it).
 */
const getRedisClient = () =>
  redis.createClient({
    url: "redis://localhost:6379",
    password: "redis",
  });
/**
 * Entry point: connects a Redis subscriber and listens on the keyspace
 * notification channel for the `temp-video-encoder` key in database 0.
 * Every notification whose payload is "hset" triggers `processMessage`.
 */
(async () => {
  const subscriber = getRedisClient().duplicate();

  // Without an "error" listener, a socket-level failure is raised as an
  // unhandled EventEmitter error and takes the whole process down.
  subscriber.on("error", (error: unknown) => {
    console.error("Redis subscriber error:", error);
  });

  await subscriber.connect();
  await subscriber.subscribe(
    "__keyspace@0__:temp-video-encoder",
    async (message: string) => {
      if (message !== "hset") {
        return;
      }
      try {
        await processMessage(message);
      } catch (error) {
        // Nothing awaits this listener's promise, so a rejection here would
        // otherwise be unhandled. Log it and keep the subscription alive.
        console.error("Failed to process temp-video-encoder update:", error);
      }
    }
  );
})();
/**
 * Processes the current contents of the `temp-video-encoder` Redis hash.
 *
 * Steps, in order:
 *  1. Read every field of the hash; the second "/"-separated segment of each
 *     field name is used as the object name in the `video-encoder-temp` bucket.
 *  2. Download all of those objects into `temp/` and wait for every download
 *     to finish.
 *  3. Run `./script.sh` synchronously via bash.
 *  4. If the script succeeded: delete the hash field
 *     `video-encoder-temp/<file>` for each file found in `temp/`, upload the
 *     `final/` folder, then remove both `temp/` and `final/`.
 *
 * The Redis connection opened here is always closed, even when a step throws.
 *
 * @param message - The keyspace notification payload that triggered this run.
 *   It is not read by the body; the caller only invokes this for "hset".
 * @throws Propagates errors from the Redis read, the downloads, the upload and
 *   the directory removal. Errors from individual `hDel` calls are only logged.
 */
async function processMessage(message: string) {
  const key = "temp-video-encoder";
  const minioClient = getMinioClient();
  const subscriber = getRedisClient().duplicate();
  await subscriber.connect();

  try {
    const value = await subscriber.hGetAll(key);

    // Skip fields that have no "/<name>" part rather than asking MinIO for an
    // object called "undefined".
    const videoNames = Object.keys(value)
      .map((field) => field.split("/")[1])
      .filter(
        (name): name is string => typeof name === "string" && name !== ""
      );

    // The downloads must complete before the script runs. `forEach` with an
    // async callback would start them and move on without waiting.
    await Promise.all(
      videoNames.map((videoName) =>
        minioClient.fGetObject(
          "video-encoder-temp",
          videoName,
          `temp/${videoName}`
        )
      )
    );

    const proc = Bun.spawnSync(["bash", "./script.sh"]);
    if (!proc.success) {
      // Hash fields and local files are left untouched on failure.
      console.error("./script.sh did not exit successfully; skipping upload");
      return;
    }

    const files = await getFiles("temp");
    for (const file of files) {
      const fileName = file.split("/")[1];
      try {
        await subscriber.hDel(key, `video-encoder-temp/${fileName}`);
      } catch (error) {
        // Best effort: a failed delete must not block the upload.
        console.error(error);
      }
    }

    await uploadFolder("final");
    await rmdir("temp", { recursive: true });
    await rmdir("final", { recursive: true });
  } finally {
    // Always release the connection, including on the error paths above.
    await subscriber.disconnect();
  }
}
/**
 * Lists the entries of a directory as paths joined onto `directoryPath`.
 *
 * @param directoryPath - Directory to list (not recursive).
 * @returns One joined path per directory entry, or an empty array if the
 *   directory could not be read (the error is logged, not thrown).
 */
async function getFiles(directoryPath: string) {
  let entries: string[];
  try {
    entries = await readdir(directoryPath);
  } catch (err) {
    console.error(err);
    return [];
  }
  return entries.map((entry) => join(directoryPath, entry));
}
/**
 * Recursively uploads the contents of a local directory to the
 * `video-encoder-final` bucket.
 *
 * Each file is read fully into memory and stored under an object name made of
 * `prefix` plus the file's name, with "/" between nested directory names.
 * Every visited local path and every completed upload is logged.
 *
 * @param currentPath - Local directory to walk.
 * @param prefix - Object-name prefix for entries of `currentPath`; empty at
 *   the top level, so top-level files are stored under their bare name.
 */
const uploadFolder = async (currentPath: string, prefix: string = "") => {
  const bucketName = "video-encoder-final";
  const minioClient = getMinioClient();
  const entries = await readdir(currentPath, { withFileTypes: true });

  for (const entry of entries) {
    const localPath = join(currentPath, entry.name);
    console.log(localPath);

    let minioPath = entry.name;
    if (prefix) {
      minioPath = `${prefix}/${entry.name}`;
    }

    if (entry.isDirectory()) {
      // Descend, carrying the accumulated object-name prefix along.
      await uploadFolder(localPath, minioPath);
      continue;
    }

    const fileContent = await readFile(localPath);
    await minioClient.putObject(bucketName, minioPath, fileContent);
    console.log(`Uploaded ${minioPath} to ${bucketName}`);
  }
};