1 #include "exclusive_file.h"
2 #include "singleton.h"
3 #include "log.h"
4 
5 #include <sys/stat.h>
6 #include <fcntl.h>
7 #include <string.h>
8 #include <errno.h>
9 
10 #include <map>
11 using std::map;
12 
13 #ifndef EXCL_BUFFER_SIZE
14 #define EXCL_BUFFER_SIZE 1024*1024 /* 1 MB */
15 #endif
16 
17 class _excl_file_reg
18 {
19   struct excl_file_entry
20   {
21     exclusive_file* excl_fp;
22     unsigned int    ref_cnt;
23 
24     excl_file_entry()
25       : excl_fp(NULL),
26         ref_cnt(0)
27     {}
28   };
29 
30   map<string,excl_file_entry> files;
31   AmMutex                     files_mut;
32 
33 public:
34   exclusive_file* get(const string& name, bool& is_new) {
35     AmLock l(files_mut);
36     map<string,excl_file_entry>::iterator it = files.find(name);
37     if(it != files.end()) {
38       excl_file_entry& fe = it->second;
39       if(!fe.ref_cnt) {
40         ERROR("trying to re-open a file not yet closed");
41         return NULL;
42       }
43 
44       fe.ref_cnt++;
45       is_new = false;
46       return fe.excl_fp;
47     }
48     else {
49       exclusive_file* fp = new exclusive_file(name);
50       if(fp->open(is_new) < 0) {
51         ERROR("could not open '%s': %s",name.c_str(),strerror(errno));
52         delete fp;
53         return NULL;
54       }
55 
56       files[name].excl_fp = fp;
57       files[name].ref_cnt++;
58 
59       if(is_new) fp->lock();
60       return fp;
61     }
62   }
63 
64   void deref(const string& name) {
65     AmLock l(files_mut);
66     map<string,excl_file_entry>::iterator it = files.find(name);
67     if(it != files.end()) {
68       excl_file_entry& fe = it->second;
69       if(!(--fe.ref_cnt)) {
70         // async delete
71         //  - call close()
72         //  - wait for notification of close before deleting
73         fe.excl_fp->close();
74       }
75     }
76   }
77 
78   bool delete_on_flushed(const string& name) {
79     AmLock l(files_mut);
80     map<string,excl_file_entry>::iterator it = files.find(name);
81     if(it != files.end()) {
82       excl_file_entry& fe = it->second;
83       if(!fe.ref_cnt) {
84         delete fe.excl_fp;
85         fe.excl_fp = NULL;
86         files.erase(it);
87         return true;
88       }
89     }
90     return false;
91   }
92 };
93 
94 typedef singleton<_excl_file_reg> excl_file_reg;
95 
96 exclusive_file::exclusive_file(const string& name)
97   : async_file(EXCL_BUFFER_SIZE),
98     name(name),fd(-1)
99 {}
100 
101 exclusive_file::~exclusive_file()
102 {
103   if(fd >= 0)
104     ::close(fd);
105 
106   DBG("just closed %s",name.c_str());
107 }
108 
109 //
110 // TODO: add a close() method that closes the underlying async_file
111 //
112 
113 int exclusive_file::open(bool& is_new)
114 {
115   if(fd != -1) {
116     ERROR("file already open\n");
117     return -1;
118   }
119 
120   fd = ::open(name.c_str(),O_WRONLY | O_CREAT | O_APPEND,
121 	      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
122   if(fd < 0) {
123     ERROR("could not open file '%s': %s", name.c_str(), strerror(errno));
124     return -1;
125   }
126 
127   if(lseek(fd,0,SEEK_END) > 0) {
128     is_new = false;
129   }
130   else {
131     is_new = true;
132   }
133 
134   return 0;
135 }
136 
137 int exclusive_file::write_to_file(const void* buf, unsigned int len)
138 {
139   int res = 0;
140   int retries = 0;
141 
142   do {
143     res = ::write(fd, buf, len);
144 
145   } while((res < 0)
146           && (errno == EINTR)
147           && (++retries < 10));
148 
149   if (res != (int)len) {
150     ERROR("writing to file '%s': %s\n",name.c_str(),strerror(errno));
151   }
152   //else {
153   //DBG("%i bytes written to %s",res,name.c_str());
154   //}
155 
156   return res;
157 }
158 
159 void exclusive_file::on_flushed()
160 {
161   excl_file_reg::instance()->delete_on_flushed(name);
162 }
163 
164 int exclusive_file::open(const char* filename,
165                          exclusive_file*& excl_fp,
166                          bool& is_new)
167 {
168   excl_fp = excl_file_reg::instance()->get(filename,is_new);
169   if(!excl_fp) return -1;
170   return 0;
171 }
172 
173 void exclusive_file::close(const exclusive_file* excl_fp)
174 {
175   if (excl_fp) {
176     excl_file_reg::instance()->deref(excl_fp->name);
177   }
178 }
179 
180 int exclusive_file::write(const void *buf, int len)
181 {
182   DBG("async writting %i bytes to %s",len,name.c_str());
183   return (int)async_file::write(buf,len);
184 }
185 
186 int exclusive_file::writev(const struct iovec* iov, int iovcnt)
187 {
188   // int len=0;
189   // for(int i=0; i<iovcnt; i++)
190   //   len += iov[i].iov_len;
191   //DBG("async writting (iov) %i bytes to %s",len,name.c_str());
192 
193   return (int)async_file::writev(iov,iovcnt);
194 }
195 
196