1 /** @file
2 *
3 * Persistence interface for HostDB RefCountCache.
4 *
5 * @section license License
6 *
7 * Licensed to the Apache Software Foundation (ASF) under one
8 * or more contributor license agreements. See the NOTICE file
9 * distributed with this work for additional information
10 * regarding copyright ownership. The ASF licenses this file
11 * to you under the Apache License, Version 2.0 (the
12 * "License"); you may not use this file except in compliance
13 * with the License. You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24 #pragma once
25
26 #include "P_RefCountCache.h"
27
28 #include <utility>
29 #include <vector>
30
31 // This continuation is responsible for persisting RefCountCache to disk
32 // To avoid locking the partitions for a long time we'll do the following per-partition:
33 // - lock
34 // - copy ptrs (bump refcount)
35 // - unlock
36 // - persist
37 // - remove ptrs (drop refcount)
38 //
39 // This way we only have to hold the lock on the partition for the
40 // time it takes to get Ptr<>s to all items in the partition
/// Continuation that writes the contents of a RefCountCache<C> to a file, one partition at a time.
///
/// The constructor schedules the object on ET_TASK; the handlers then run in the cycle
/// initialize_storage -> pause_event -> copy_partition -> write_partition -> pause_event -> ...
/// Once every partition has been visited, copy_partition() calls finalize_sync() and deletes
/// the serializer. The handlers also delete the serializer on an I/O error. The destructor
/// schedules REFCOUNT_CACHE_EVENT_SYNC on @c cont.
template <class C> class RefCountCacheSerializer : public Continuation
{
public:
  size_t partition;        // Index of the next partition to copy; incremented by copy_partition()
  RefCountCache<C> *cache; // Pointer to the entire cache
  Continuation *cont;      // Receives REFCOUNT_CACHE_EVENT_SYNC when this serializer is destroyed

  // Event handlers. Each one installs the next handler and reschedules the event on ET_TASK.
  int copy_partition(int event, Event *e);  // Copy the current partition's entries, or finish the sync
  int write_partition(int event, Event *e); // Write the copied entries to the temporary file
  int pause_event(int event, Event *e);     // Select the mutex for the next copy_partition() run

  // Create the tmp file on disk we'll be writing to
  int initialize_storage(int event, Event *e);
  // Do the final mv and close of file handle. Returns 0 on success, -errno on failure.
  int finalize_sync();

  // Helper method to spin on writes to disk until all bytes are written or a write fails
  int write_to_disk(const void *, size_t);

  // Note: the constructor schedules this object immediately (see its definition).
  RefCountCacheSerializer(Continuation *acont, RefCountCache<C> *cc, int frequency, std::string dirname, std::string filename);
  ~RefCountCacheSerializer() override;

private:
  // Entries copied out of the partition being written; released with
  // RefCountCacheHashEntry::free<C>() after the write (or in the destructor).
  std::vector<RefCountCacheHashEntry *> partition_items;

  int fd; // fd for the file we are writing to; -1 when no file is open

  std::string dirname;      // Directory that is opened and fsync'd in finalize_sync()
  std::string filename;     // Final name the temporary file is renamed to
  std::string tmp_filename; // filename + ".syncing"; the file actually written

  ink_hrtime time_per_partition; // HRTIME_SECONDS(frequency) / partition count; pacing budget per partition
  ink_hrtime start;              // Thread::get_hrtime() at construction; baseline for pacing

  int total_items;    // Number of items written so far
  int64_t total_size; // Sum of meta.size over the items written so far

  RecRawStatBlock *rsb; // Stat block from the cache (cc->get_rsb()); updated in finalize_sync() when non-null
};
80
81 template <class C>
RefCountCacheSerializer(Continuation * acont,RefCountCache<C> * cc,int frequency,std::string dirname,std::string filename)82 RefCountCacheSerializer<C>::RefCountCacheSerializer(Continuation *acont, RefCountCache<C> *cc, int frequency, std::string dirname,
83 std::string filename)
84 : Continuation(nullptr),
85 partition(0),
86 cache(cc),
87 cont(acont),
88 fd(-1),
89 dirname(std::move(dirname)),
90 filename(std::move(filename)),
91 time_per_partition(HRTIME_SECONDS(frequency) / cc->partition_count()),
92 start(Thread::get_hrtime()),
93 total_items(0),
94 total_size(0),
95 rsb(cc->get_rsb())
96 {
97 this->tmp_filename = this->filename + ".syncing"; // TODO tmp file extension configurable?
98
99 Debug("refcountcache", "started serializer %p", this);
100 SET_HANDLER(&RefCountCacheSerializer::initialize_storage);
101 eventProcessor.schedule_imm(this, ET_TASK);
102 }
103
~RefCountCacheSerializer()104 template <class C> RefCountCacheSerializer<C>::~RefCountCacheSerializer()
105 {
106 // If we failed before finalizing the on-disk copy, close up and nuke the temporary sync file.
107 if (this->fd != -1) {
108 unlink(this->tmp_filename.c_str());
109 socketManager.close(fd);
110 }
111
112 for (auto &entry : this->partition_items) {
113 RefCountCacheHashEntry::free<C>(entry);
114 }
115 this->partition_items.clear();
116
117 Debug("refcountcache", "finished serializer %p", this);
118
119 // Note that we have to do the unlink before we send the completion event, otherwise
120 // we could unlink the sync file out from under another serializer.
121
122 // Schedule off the REFCOUNT event, so the continuation gets properly locked
123 this_ethread()->schedule_imm(cont, REFCOUNT_CACHE_EVENT_SYNC);
124 }
125
126 template <class C>
127 int
copy_partition(int,Event * e)128 RefCountCacheSerializer<C>::copy_partition(int /* event */, Event *e)
129 {
130 if (partition >= cache->partition_count()) {
131 int error = this->finalize_sync();
132 if (error != 0) {
133 Warning("Unable to finalize sync of cache to disk %s: %s", this->filename.c_str(), strerror(-error));
134 }
135
136 Debug("refcountcache", "RefCountCacheSync done");
137 delete this;
138 return EVENT_DONE;
139 }
140
141 Debug("refcountcache", "sync partition=%ld/%ld", partition, cache->partition_count());
142 // copy the partition into our buffer, then we'll let `pauseEvent` write it out
143 this->partition_items.reserve(cache->get_partition(partition).count());
144 cache->get_partition(partition).copy(this->partition_items);
145 partition++;
146
147 SET_HANDLER(&RefCountCacheSerializer::write_partition);
148 mutex = e->ethread->mutex;
149 e->schedule_imm(ET_TASK);
150
151 return EVENT_CONT;
152 }
153
154 template <class C>
155 int
write_partition(int,Event * e)156 RefCountCacheSerializer<C>::write_partition(int /* event */, Event *e)
157 {
158 int curr_time = Thread::get_hrtime() / HRTIME_SECOND;
159
160 // write the partition to disk
161 // for item in this->partitionItems
162 // write to disk with headers per item
163
164 for (unsigned int i = 0; i < this->partition_items.size(); i++) {
165 RefCountCacheHashEntry *entry = this->partition_items[i];
166
167 // check if the item has expired, if so don't persist it to disk
168 if (entry->meta.expiry_time < curr_time) {
169 continue;
170 }
171
172 // Write the RefCountCacheItemMeta (as our header)
173 int ret = this->write_to_disk((char *)&entry->meta, sizeof(entry->meta));
174 if (ret < 0) {
175 Warning("Error writing cache item header to %s: %s", this->tmp_filename.c_str(), strerror(-ret));
176 delete this;
177 return EVENT_DONE;
178 }
179
180 // write the actual object now
181 ret = this->write_to_disk((char *)entry->item.get(), entry->meta.size);
182 if (ret < 0) {
183 Warning("Error writing cache item to %s: %s", this->tmp_filename.c_str(), strerror(-ret));
184 delete this;
185 return EVENT_DONE;
186 }
187
188 this->total_items++;
189 this->total_size += entry->meta.size;
190 }
191
192 // Clear the copied partition for the next round.
193 for (auto &entry : this->partition_items) {
194 RefCountCacheHashEntry::free<C>(entry);
195 }
196 this->partition_items.clear();
197
198 SET_HANDLER(&RefCountCacheSerializer::pause_event);
199
200 // Figure out how much time we spent
201 ink_hrtime elapsed = Thread::get_hrtime() - this->start;
202 ink_hrtime expected_elapsed = (this->partition * this->time_per_partition);
203
204 // If we were quicker than our pace-- lets reschedule in the future
205 if (elapsed < expected_elapsed) {
206 e->schedule_in(expected_elapsed - elapsed, ET_TASK);
207 } else { // Otherwise we were too slow-- and need to go now!
208 e->schedule_imm(ET_TASK);
209 }
210 return EVENT_CONT;
211 }
212
213 template <class C>
214 int
pause_event(int,Event * e)215 RefCountCacheSerializer<C>::pause_event(int /* event */, Event *e)
216 {
217 // Schedule up the next partition
218 if (partition < cache->partition_count()) {
219 mutex = cache->get_partition(partition).lock.get();
220 } else {
221 mutex = cont->mutex;
222 }
223
224 SET_HANDLER(&RefCountCacheSerializer::copy_partition);
225 e->schedule_imm(ET_TASK);
226 return EVENT_CONT;
227 }
228
229 // Open the tmp file, etc.
230 template <class C>
231 int
initialize_storage(int,Event * e)232 RefCountCacheSerializer<C>::initialize_storage(int /* event */, Event *e)
233 {
234 this->fd = socketManager.open(this->tmp_filename.c_str(), O_TRUNC | O_RDWR | O_CREAT, 0644); // TODO: configurable perms
235 if (this->fd < 0) {
236 Warning("Unable to create temporary file %s, unable to persist hostdb: %s", this->tmp_filename.c_str(), strerror(errno));
237 delete this;
238 return EVENT_DONE;
239 }
240
241 // Write out the header
242 int ret = this->write_to_disk((char *)&this->cache->get_header(), sizeof(RefCountCacheHeader));
243 if (ret < 0) {
244 Warning("Error writing cache header to %s: %s", this->tmp_filename.c_str(), strerror(-ret));
245 delete this;
246 return EVENT_DONE;
247 }
248
249 SET_HANDLER(&RefCountCacheSerializer::pause_event);
250 e->schedule_imm(ET_TASK);
251 return EVENT_CONT;
252 }
253
254 // Do the final mv and close of file handle. Only reset "fd" to -1 if we fully succeed.
255 // Returns 0 on success, -errno on failure.
256 template <class C>
257 int
finalize_sync()258 RefCountCacheSerializer<C>::finalize_sync()
259 {
260 int error; // Socket manager return 0 or -errno.
261 int dirfd = -1;
262
263 // fsync the fd we have
264 if ((error = socketManager.fsync(this->fd))) {
265 return error;
266 }
267
268 #ifdef O_DIRECTORY
269 dirfd = socketManager.open(this->dirname.c_str(), O_DIRECTORY);
270 #else
271 struct stat st;
272 stat(this->dirname.c_str(), &st);
273 if (!S_ISDIR(st.st_mode)) {
274 return -ENOTDIR;
275 }
276 dirfd = socketManager.open(this->dirname.c_str(), 0);
277 #endif
278 if (dirfd == -1) {
279 return -errno;
280 }
281
282 // Rename from the temp name to the real name.
283 if (rename(this->tmp_filename.c_str(), this->filename.c_str()) != 0) {
284 error = -errno;
285 socketManager.close(dirfd);
286 return error;
287 }
288
289 // Fsync the directory to persist the rename.
290 if ((error = socketManager.fsync(dirfd))) {
291 socketManager.close(dirfd);
292 return error;
293 }
294
295 // Don't bother checking for errors on the close since there's nothing we can do about it at
296 // this point anyway.
297 socketManager.close(dirfd);
298 socketManager.close(this->fd);
299 this->fd = -1;
300
301 if (this->rsb) {
302 RecSetRawStatCount(this->rsb, refcountcache_last_sync_time, Thread::get_hrtime() / HRTIME_SECOND);
303 RecSetRawStatCount(this->rsb, refcountcache_last_total_items, this->total_items);
304 RecSetRawStatCount(this->rsb, refcountcache_last_total_size, this->total_size);
305 }
306
307 return 0;
308 }
309
310 // Write *i to this->fd, if there is an error we'll just stop this continuation
311 // TODO: reschedule the continuation if the disk was busy?
312 template <class C>
313 int
write_to_disk(const void * ptr,size_t n_bytes)314 RefCountCacheSerializer<C>::write_to_disk(const void *ptr, size_t n_bytes)
315 {
316 size_t written = 0;
317 while (written < n_bytes) {
318 int ret = socketManager.write(this->fd, (char *)ptr + written, n_bytes - written);
319 if (ret <= 0) {
320 return ret;
321 } else {
322 written += ret;
323 }
324 }
325 return 0;
326 }
327