-
Notifications
You must be signed in to change notification settings - Fork 10
/
filecache.cpp
308 lines (258 loc) · 7.61 KB
/
filecache.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
#include <string>
#include <map>
#include <queue>
#include <vector>
#include <syslog.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "common.h"
#include "fileinfo.h"
#include "attrcache.h"
#include "filecache.h"
#include "s3request.h"
class cmp_file {
public:
bool operator ()(Filecache *&a, Filecache *&b) {
// we want older stuff sorting as higher
return a->time_enqueued > b->time_enqueued;
}
};
// Every Filecache currently known to this file, keyed by its path.
// Filecache::get() looks entries up here and inserts newly created ones;
// Filecache::from_queue() removes an entry when it hands it out for flushing.
static std::map<std::string, Filecache *> open_files;
// Entries waiting to be written back, pushed by Filecache::release().
// cmp_file orders the heap so that top() is the entry with the oldest
// time_enqueued.
static std::priority_queue<Filecache *, std::vector<Filecache *>, cmp_file>
queue;
// Create a cache entry for `path`.
//
// path:   key under which the entry is tracked.
// exists: initial value of the `exists` flag; fsync() treats an entry with
//         exists == false as new and transmits it.
//
// The entry starts with no file info, no local descriptor, a zero open count
// and every state flag cleared.
Filecache::Filecache(std::string path, bool exists) {
#ifdef DEBUG_CACHE
    syslog(LOG_INFO, "Filecache::new[%s]", path.c_str());
#endif
    this->path = path;
    this->exists = exists;

    info = NULL;
    fd = -1;
    opencount = 0;

    deleted = false;
    dirty_data = false;
    dirty_metadata = false;

    // Queue bookkeeping. release() assigns resurrected and time_enqueued
    // before the entry is pushed onto the queue, but give them a defined
    // value here as well so no member is ever read while indeterminate.
    enqueued = false;
    resurrected = false;
    time_enqueued = 0;
}
// Tear down the entry: free the file info, close the local descriptor if one
// is open, and complain in the log if the entry is still marked as open.
Filecache::~Filecache() {
#ifdef DEBUG_CACHE
    syslog(LOG_INFO, "Filecache::delete[%s]", path.c_str());
#endif
    // delete on a null pointer is a no-op, so no guard is required
    delete info;
    info = NULL;

    if (fd >= 0) {
        // the result of close() is deliberately ignored: a destructor
        // is no place to throw from
        close(fd);
        fd = -1;
    }

    if (opencount != 0) {
        syslog(LOG_ERR, "Filecache deleted with non-zero open count");
    }
}
// Return the cache entry for `path`, creating and registering one in
// open_files when none is present yet.
//
// A newly created entry gets exists = !mode, so a non-zero mode produces an
// entry that is flagged as not existing yet. `mode` is ignored when the
// entry is already present.
//
// If the entry is currently sitting in the flush queue it is flagged as
// resurrected, which from_queue() uses to tell that it was touched after
// being enqueued.
Filecache *Filecache::get(std::string path, mode_t mode) {
    Filecache *entry;

    std::map<std::string, Filecache *>::iterator found = open_files.find(path);
    if (found == open_files.end()) {
        entry = new Filecache(path, !mode);
        open_files[path] = entry;
    } else {
        entry = found->second;
    }

    if (entry->enqueued) {
        entry->resurrected = true;
    }
    return entry;
}
// Schedule this entry for write-back. Does nothing while the entry is still
// open (opencount != 0) or is already in the queue.
void Filecache::release() {
    if (opencount || enqueued) {
        return;
    }

    // stamp the entry and hand it to the flush queue
    resurrected = false;
    time_enqueued = time(NULL);
    enqueued = true;
    queue.push(this);
}
// Pull the next entry that is due for write-back off the flush queue.
//
// Returns an entry whose time in the queue is at least writeback_delay; that
// entry has been removed from both the queue and open_files. Returns NULL
// when the queue is empty or the oldest remaining entry is not due yet.
//
// Entries that were touched (resurrected) after being enqueued are never
// returned on this pass: an open one is dropped from the queue, a closed one
// is re-queued with a fresh timestamp. Either way it stays in open_files.
Filecache *Filecache::from_queue() {
    const time_t now = time(NULL);

    while (queue.size() > 0) {
        Filecache *candidate = queue.top();
        queue.pop();
        open_files.erase(candidate->path);

        if (!candidate->resurrected) {
            if (now - candidate->time_enqueued >= writeback_delay) {
                // return it with the lock held
#ifdef DEBUG_CACHE
                syslog(LOG_INFO, "Filecache::from_queue expiring[%s]",
                       candidate->path.c_str());
#endif
                return candidate;
            }
            // the oldest entry is still too young, so nothing else can be
            // due either: put it back and stop looking
            queue.push(candidate);
            open_files[candidate->path] = candidate;
            return NULL;
        }

        // accessed after it was put in the queue
        candidate->resurrected = false;
        if (candidate->opencount) {
#ifdef DEBUG_CACHE
            syslog(LOG_INFO, "Filecache::from_queue resurrecting open file[%s]",
                   candidate->path.c_str());
#endif
            // still open: leave it out of the queue
            candidate->enqueued = false;
        } else {
#ifdef DEBUG_CACHE
            syslog(LOG_INFO, "Filecache::from_queue resurrecting file[%s]",
                   candidate->path.c_str());
#endif
            // closed again: back into the queue with a new timestamp
            candidate->time_enqueued = now;
            queue.push(candidate);
        }
        open_files[candidate->path] = candidate;
    }

    return NULL;
}
// Push this entry's pending state out through S3request and attrcache.
//
// Exactly one of these cases applies, checked in this order:
//   1. deleted and never transmitted (!exists): nothing to do
//   2. deleted:                    drop the cached attributes, remove remotely
//   3. new, or data changed:       upload contents and metadata
//   4. only metadata changed:      update metadata (the root path "/" only
//                                  refreshes the attribute cache)
//   5. otherwise:                  nothing to do
//
// Throws -errno (an int) if fstat() on the local descriptor fails.
void Filecache::fsync() {
#ifdef DEBUG_CACHE
    syslog(LOG_INFO, "Filecache::fsync[%s]", path.c_str());
#endif

    if (deleted) {
        if (!exists) {
            // created and deleted again before it was ever transmitted:
            // this is what caching is all about
#ifdef DEBUG_CACHE
            syslog(LOG_INFO, "fsync: !exists && deleted");
#endif
            return;
        }
#ifdef DEBUG_CACHE
        syslog(LOG_INFO, "fsync: deleted");
#endif
        attrcache->del(path);
        S3request::remove(path);
        exists = false;
        return;
    }

    if (!exists || dirty_data) {
#ifdef DEBUG_CACHE
        syslog(LOG_INFO, "fsync: !exists || dirty_data");
#endif
        if (fd < 0) {
            // no local copy, so there are no contents to send
            info->size = 0;
        } else {
            // take the size from the local copy
            struct stat st;
            if (fstat(fd, &st) < 0)
                throw -errno;
            info->size = st.st_size;
        }

        // drop the stale cache entry, transmit, then cache the new info
        attrcache->del(path);
        S3request::put_file(info, fd);
        attrcache->set(info);

        exists = true;
        dirty_data = false;
        dirty_metadata = false;
        return;
    }

    if (!dirty_metadata) {
        // not new, not deleted, not updated
        return;
    }

#ifdef DEBUG_CACHE
    syslog(LOG_INFO, "fsync: dirty_metadata");
#endif
    // both metadata paths start by dropping the stale cache entry
    attrcache->del(path);

    if (path == "/") {
        // the root only lives in the cache; note that dirty_metadata is
        // left set on this path
        attrcache->set(info);
        return;
    }

    S3request::set_fileinfo(path, info);
    attrcache->set(info);
    dirty_metadata = false;
}
void Filecache::sync() {
#ifdef DEBUG_CACHE
syslog(LOG_INFO, "Filecache::sync all");
#endif
for (std::map<std::string, Filecache *>::iterator
it = open_files.begin();
it != open_files.end();
it++)
{
it->second->fsync();
}
}
void Filecache::syncdir(std::string path) {
#ifdef DEBUG_CACHE
syslog(LOG_INFO, "Filecache::sync all in directory[%s]", path.c_str());
#endif
for (std::map<std::string, Filecache *>::iterator
it = open_files.begin();
it != open_files.end();
it++)
{
if (in_directory(it->first.c_str(), path.c_str()))
it->second->fsync();
}
}
// return true if any files are currently open with the given prefix
bool Filecache::openfiles(std::string prefix) {
#ifdef DEBUG_CACHE
syslog(LOG_INFO, "Filecache::openfiles checking for prefix[%s]",
prefix.c_str());
#endif
for (std::map<std::string, Filecache *>::iterator
it = open_files.begin();
it != open_files.end();
it++)
{
Filecache *file = it->second;
if (file->path.length() >= prefix.length() &&
file->path.compare(0, prefix.length(), prefix) == 0 &&
file->opencount > 0)
{
return true;
}
}
return false;
}
// the loop for the cache flushing thread
//
// Thread entry point. Repeatedly takes due entries from
// Filecache::from_queue(), flushes them with fsync() and deletes them, until
// flush_shutdown becomes true; then syncs every remaining entry once.
//
// param: unused.
// Returns NULL.
void *flush_loop(void *param) {
    (void) param;

    pthread_mutex_lock(&global_lock);
    // once we get the lock, we hold on to it until the queue is clear
    // this prevents other transactions from happening, which prevents
    // the backlog from getting too big. we only release the lock when
    // we are ready to sleep
    while (!flush_shutdown) {
        Filecache *file = Filecache::from_queue();
        if (file) {
            try {
                file->fsync();
            } catch (int) {
                // NOTE(review): the entry is deleted below even when the
                // flush failed, so its pending changes are dropped.
                syslog(LOG_ERR, "flush_loop: fsync failed[%s]",
                       file->path.c_str());
            }
            // NOTE(review): from_queue() returns entries with `enqueued`
            // still set, so this release() appears to be a no-op; if it ever
            // re-queued the entry, the delete below would leave a dangling
            // pointer in the queue. Confirm before changing either side.
            file->release();
            delete file;
        } else {
            pthread_mutex_unlock(&global_lock);
            sleep(1);
            pthread_mutex_lock(&global_lock);
        }
    }

    // sync everything when shutting down; fsync() reports failures by
    // throwing an int, which must not escape the thread entry point or
    // skip the unlock below
    try {
        Filecache::sync();
    } catch (int e) {
        syslog(LOG_ERR, "flush_loop: final sync failed[%d]", e);
    }
    pthread_mutex_unlock(&global_lock);
    return NULL;
}