1 /** @file
2  *
3  *  Persistence interface for HostDB RefCountCache.
4  *
5  *  @section license License
6  *
7  *  Licensed to the Apache Software Foundation (ASF) under one
8  *  or more contributor license agreements.  See the NOTICE file
9  *  distributed with this work for additional information
10  *  regarding copyright ownership.  The ASF licenses this file
11  *  to you under the Apache License, Version 2.0 (the
12  *  "License"); you may not use this file except in compliance
13  *  with the License.  You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  *  Unless required by applicable law or agreed to in writing, software
18  *  distributed under the License is distributed on an "AS IS" BASIS,
19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  *  See the License for the specific language governing permissions and
21  *  limitations under the License.
22  */
23 
24 #pragma once
25 
26 #include "P_RefCountCache.h"
27 
28 #include <utility>
29 #include <vector>
30 
31 // This continuation is responsible for persisting RefCountCache to disk
32 // To avoid locking the partitions for a long time we'll do the following per-partition:
33 //    - lock
34 //    - copy ptrs (bump refcount)
35 //    - unlock
36 //    - persist
37 //    - remove ptrs (drop refcount)
38 //
39 // This way we only have to hold the lock on the partition for the
40 // time it takes to get Ptr<>s to all items in the partition
41 template <class C> class RefCountCacheSerializer : public Continuation
42 {
43 public:
44   size_t partition;        // Current partition
45   RefCountCache<C> *cache; // Pointer to the entire cache
46   Continuation *cont;
47 
48   int copy_partition(int event, Event *e);
49   int write_partition(int event, Event *e);
50   int pause_event(int event, Event *e);
51 
52   // Create the tmp file on disk we'll be writing to
53   int initialize_storage(int event, Event *e);
54   // do the final mv and close of file handle
55   int finalize_sync();
56 
57   // helper method to spin on writes to disk
58   int write_to_disk(const void *, size_t);
59 
60   RefCountCacheSerializer(Continuation *acont, RefCountCache<C> *cc, int frequency, std::string dirname, std::string filename);
61   ~RefCountCacheSerializer() override;
62 
63 private:
64   std::vector<RefCountCacheHashEntry *> partition_items;
65 
66   int fd; // fd for the file we are writing to
67 
68   std::string dirname;
69   std::string filename;
70   std::string tmp_filename;
71 
72   ink_hrtime time_per_partition;
73   ink_hrtime start;
74 
75   int total_items;
76   int64_t total_size;
77 
78   RecRawStatBlock *rsb;
79 };
80 
81 template <class C>
RefCountCacheSerializer(Continuation * acont,RefCountCache<C> * cc,int frequency,std::string dirname,std::string filename)82 RefCountCacheSerializer<C>::RefCountCacheSerializer(Continuation *acont, RefCountCache<C> *cc, int frequency, std::string dirname,
83                                                     std::string filename)
84   : Continuation(nullptr),
85     partition(0),
86     cache(cc),
87     cont(acont),
88     fd(-1),
89     dirname(std::move(dirname)),
90     filename(std::move(filename)),
91     time_per_partition(HRTIME_SECONDS(frequency) / cc->partition_count()),
92     start(Thread::get_hrtime()),
93     total_items(0),
94     total_size(0),
95     rsb(cc->get_rsb())
96 {
97   this->tmp_filename = this->filename + ".syncing"; // TODO tmp file extension configurable?
98 
99   Debug("refcountcache", "started serializer %p", this);
100   SET_HANDLER(&RefCountCacheSerializer::initialize_storage);
101   eventProcessor.schedule_imm(this, ET_TASK);
102 }
103 
~RefCountCacheSerializer()104 template <class C> RefCountCacheSerializer<C>::~RefCountCacheSerializer()
105 {
106   // If we failed before finalizing the on-disk copy, close up and nuke the temporary sync file.
107   if (this->fd != -1) {
108     unlink(this->tmp_filename.c_str());
109     socketManager.close(fd);
110   }
111 
112   for (auto &entry : this->partition_items) {
113     RefCountCacheHashEntry::free<C>(entry);
114   }
115   this->partition_items.clear();
116 
117   Debug("refcountcache", "finished serializer %p", this);
118 
119   // Note that we have to do the unlink before we send the completion event, otherwise
120   // we could unlink the sync file out from under another serializer.
121 
122   // Schedule off the REFCOUNT event, so the continuation gets properly locked
123   this_ethread()->schedule_imm(cont, REFCOUNT_CACHE_EVENT_SYNC);
124 }
125 
126 template <class C>
127 int
copy_partition(int,Event * e)128 RefCountCacheSerializer<C>::copy_partition(int /* event */, Event *e)
129 {
130   if (partition >= cache->partition_count()) {
131     int error = this->finalize_sync();
132     if (error != 0) {
133       Warning("Unable to finalize sync of cache to disk %s: %s", this->filename.c_str(), strerror(-error));
134     }
135 
136     Debug("refcountcache", "RefCountCacheSync done");
137     delete this;
138     return EVENT_DONE;
139   }
140 
141   Debug("refcountcache", "sync partition=%ld/%ld", partition, cache->partition_count());
142   // copy the partition into our buffer, then we'll let `pauseEvent` write it out
143   this->partition_items.reserve(cache->get_partition(partition).count());
144   cache->get_partition(partition).copy(this->partition_items);
145   partition++;
146 
147   SET_HANDLER(&RefCountCacheSerializer::write_partition);
148   mutex = e->ethread->mutex;
149   e->schedule_imm(ET_TASK);
150 
151   return EVENT_CONT;
152 }
153 
154 template <class C>
155 int
write_partition(int,Event * e)156 RefCountCacheSerializer<C>::write_partition(int /* event */, Event *e)
157 {
158   int curr_time = Thread::get_hrtime() / HRTIME_SECOND;
159 
160   // write the partition to disk
161   // for item in this->partitionItems
162   // write to disk with headers per item
163 
164   for (unsigned int i = 0; i < this->partition_items.size(); i++) {
165     RefCountCacheHashEntry *entry = this->partition_items[i];
166 
167     // check if the item has expired, if so don't persist it to disk
168     if (entry->meta.expiry_time < curr_time) {
169       continue;
170     }
171 
172     // Write the RefCountCacheItemMeta (as our header)
173     int ret = this->write_to_disk((char *)&entry->meta, sizeof(entry->meta));
174     if (ret < 0) {
175       Warning("Error writing cache item header to %s: %s", this->tmp_filename.c_str(), strerror(-ret));
176       delete this;
177       return EVENT_DONE;
178     }
179 
180     // write the actual object now
181     ret = this->write_to_disk((char *)entry->item.get(), entry->meta.size);
182     if (ret < 0) {
183       Warning("Error writing cache item to %s: %s", this->tmp_filename.c_str(), strerror(-ret));
184       delete this;
185       return EVENT_DONE;
186     }
187 
188     this->total_items++;
189     this->total_size += entry->meta.size;
190   }
191 
192   // Clear the copied partition for the next round.
193   for (auto &entry : this->partition_items) {
194     RefCountCacheHashEntry::free<C>(entry);
195   }
196   this->partition_items.clear();
197 
198   SET_HANDLER(&RefCountCacheSerializer::pause_event);
199 
200   // Figure out how much time we spent
201   ink_hrtime elapsed          = Thread::get_hrtime() - this->start;
202   ink_hrtime expected_elapsed = (this->partition * this->time_per_partition);
203 
204   // If we were quicker than our pace-- lets reschedule in the future
205   if (elapsed < expected_elapsed) {
206     e->schedule_in(expected_elapsed - elapsed, ET_TASK);
207   } else { // Otherwise we were too slow-- and need to go now!
208     e->schedule_imm(ET_TASK);
209   }
210   return EVENT_CONT;
211 }
212 
213 template <class C>
214 int
pause_event(int,Event * e)215 RefCountCacheSerializer<C>::pause_event(int /* event */, Event *e)
216 {
217   // Schedule up the next partition
218   if (partition < cache->partition_count()) {
219     mutex = cache->get_partition(partition).lock.get();
220   } else {
221     mutex = cont->mutex;
222   }
223 
224   SET_HANDLER(&RefCountCacheSerializer::copy_partition);
225   e->schedule_imm(ET_TASK);
226   return EVENT_CONT;
227 }
228 
229 // Open the tmp file, etc.
230 template <class C>
231 int
initialize_storage(int,Event * e)232 RefCountCacheSerializer<C>::initialize_storage(int /* event */, Event *e)
233 {
234   this->fd = socketManager.open(this->tmp_filename.c_str(), O_TRUNC | O_RDWR | O_CREAT, 0644); // TODO: configurable perms
235   if (this->fd < 0) {
236     Warning("Unable to create temporary file %s, unable to persist hostdb: %s", this->tmp_filename.c_str(), strerror(errno));
237     delete this;
238     return EVENT_DONE;
239   }
240 
241   // Write out the header
242   int ret = this->write_to_disk((char *)&this->cache->get_header(), sizeof(RefCountCacheHeader));
243   if (ret < 0) {
244     Warning("Error writing cache header to %s: %s", this->tmp_filename.c_str(), strerror(-ret));
245     delete this;
246     return EVENT_DONE;
247   }
248 
249   SET_HANDLER(&RefCountCacheSerializer::pause_event);
250   e->schedule_imm(ET_TASK);
251   return EVENT_CONT;
252 }
253 
254 // Do the final mv and close of file handle. Only reset "fd" to -1 if we fully succeed.
255 // Returns 0 on success, -errno on failure.
256 template <class C>
257 int
finalize_sync()258 RefCountCacheSerializer<C>::finalize_sync()
259 {
260   int error; // Socket manager return 0 or -errno.
261   int dirfd = -1;
262 
263   // fsync the fd we have
264   if ((error = socketManager.fsync(this->fd))) {
265     return error;
266   }
267 
268 #ifdef O_DIRECTORY
269   dirfd = socketManager.open(this->dirname.c_str(), O_DIRECTORY);
270 #else
271   struct stat st;
272   stat(this->dirname.c_str(), &st);
273   if (!S_ISDIR(st.st_mode)) {
274     return -ENOTDIR;
275   }
276   dirfd = socketManager.open(this->dirname.c_str(), 0);
277 #endif
278   if (dirfd == -1) {
279     return -errno;
280   }
281 
282   // Rename from the temp name to the real name.
283   if (rename(this->tmp_filename.c_str(), this->filename.c_str()) != 0) {
284     error = -errno;
285     socketManager.close(dirfd);
286     return error;
287   }
288 
289   // Fsync the directory to persist the rename.
290   if ((error = socketManager.fsync(dirfd))) {
291     socketManager.close(dirfd);
292     return error;
293   }
294 
295   // Don't bother checking for errors on the close since there's nothing we can do about it at
296   // this point anyway.
297   socketManager.close(dirfd);
298   socketManager.close(this->fd);
299   this->fd = -1;
300 
301   if (this->rsb) {
302     RecSetRawStatCount(this->rsb, refcountcache_last_sync_time, Thread::get_hrtime() / HRTIME_SECOND);
303     RecSetRawStatCount(this->rsb, refcountcache_last_total_items, this->total_items);
304     RecSetRawStatCount(this->rsb, refcountcache_last_total_size, this->total_size);
305   }
306 
307   return 0;
308 }
309 
310 // Write *i to this->fd, if there is an error we'll just stop this continuation
311 // TODO: reschedule the continuation if the disk was busy?
312 template <class C>
313 int
write_to_disk(const void * ptr,size_t n_bytes)314 RefCountCacheSerializer<C>::write_to_disk(const void *ptr, size_t n_bytes)
315 {
316   size_t written = 0;
317   while (written < n_bytes) {
318     int ret = socketManager.write(this->fd, (char *)ptr + written, n_bytes - written);
319     if (ret <= 0) {
320       return ret;
321     } else {
322       written += ret;
323     }
324   }
325   return 0;
326 }
327