1 #include "exclusive_file.h" 2 #include "singleton.h" 3 #include "log.h" 4 5 #include <sys/stat.h> 6 #include <fcntl.h> 7 #include <string.h> 8 #include <errno.h> 9 10 #include <map> 11 using std::map; 12 13 #ifndef EXCL_BUFFER_SIZE 14 #define EXCL_BUFFER_SIZE 1024*1024 /* 1 MB */ 15 #endif 16 17 class _excl_file_reg 18 { 19 struct excl_file_entry 20 { 21 exclusive_file* excl_fp; 22 unsigned int ref_cnt; 23 24 excl_file_entry() 25 : excl_fp(NULL), 26 ref_cnt(0) 27 {} 28 }; 29 30 map<string,excl_file_entry> files; 31 AmMutex files_mut; 32 33 public: 34 exclusive_file* get(const string& name, bool& is_new) { 35 AmLock l(files_mut); 36 map<string,excl_file_entry>::iterator it = files.find(name); 37 if(it != files.end()) { 38 excl_file_entry& fe = it->second; 39 if(!fe.ref_cnt) { 40 ERROR("trying to re-open a file not yet closed"); 41 return NULL; 42 } 43 44 fe.ref_cnt++; 45 is_new = false; 46 return fe.excl_fp; 47 } 48 else { 49 exclusive_file* fp = new exclusive_file(name); 50 if(fp->open(is_new) < 0) { 51 ERROR("could not open '%s': %s",name.c_str(),strerror(errno)); 52 delete fp; 53 return NULL; 54 } 55 56 files[name].excl_fp = fp; 57 files[name].ref_cnt++; 58 59 if(is_new) fp->lock(); 60 return fp; 61 } 62 } 63 64 void deref(const string& name) { 65 AmLock l(files_mut); 66 map<string,excl_file_entry>::iterator it = files.find(name); 67 if(it != files.end()) { 68 excl_file_entry& fe = it->second; 69 if(!(--fe.ref_cnt)) { 70 // async delete 71 // - call close() 72 // - wait for notification of close before deleting 73 fe.excl_fp->close(); 74 } 75 } 76 } 77 78 bool delete_on_flushed(const string& name) { 79 AmLock l(files_mut); 80 map<string,excl_file_entry>::iterator it = files.find(name); 81 if(it != files.end()) { 82 excl_file_entry& fe = it->second; 83 if(!fe.ref_cnt) { 84 delete fe.excl_fp; 85 fe.excl_fp = NULL; 86 files.erase(it); 87 return true; 88 } 89 } 90 return false; 91 } 92 }; 93 94 typedef singleton<_excl_file_reg> excl_file_reg; 95 96 exclusive_file::exclusive_file(const string& name) 97 : async_file(EXCL_BUFFER_SIZE), 98 name(name),fd(-1) 99 {} 100 101 exclusive_file::~exclusive_file() 102 { 103 if(fd >= 0) 104 ::close(fd); 105 106 DBG("just closed %s",name.c_str()); 107 } 108 109 // 110 // TODO: add a close() method that closes the underlying async_file 111 // 112 113 int exclusive_file::open(bool& is_new) 114 { 115 if(fd != -1) { 116 ERROR("file already open\n"); 117 return -1; 118 } 119 120 fd = ::open(name.c_str(),O_WRONLY | O_CREAT | O_APPEND, 121 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); 122 if(fd < 0) { 123 ERROR("could not open file '%s': %s", name.c_str(), strerror(errno)); 124 return -1; 125 } 126 127 if(lseek(fd,0,SEEK_END) > 0) { 128 is_new = false; 129 } 130 else { 131 is_new = true; 132 } 133 134 return 0; 135 } 136 137 int exclusive_file::write_to_file(const void* buf, unsigned int len) 138 { 139 int res = 0; 140 int retries = 0; 141 142 do { 143 res = ::write(fd, buf, len); 144 145 } while((res < 0) 146 && (errno == EINTR) 147 && (++retries < 10)); 148 149 if (res != (int)len) { 150 ERROR("writing to file '%s': %s\n",name.c_str(),strerror(errno)); 151 } 152 //else { 153 //DBG("%i bytes written to %s",res,name.c_str()); 154 //} 155 156 return res; 157 } 158 159 void exclusive_file::on_flushed() 160 { 161 excl_file_reg::instance()->delete_on_flushed(name); 162 } 163 164 int exclusive_file::open(const char* filename, 165 exclusive_file*& excl_fp, 166 bool& is_new) 167 { 168 excl_fp = excl_file_reg::instance()->get(filename,is_new); 169 if(!excl_fp) return -1; 170 return 0; 171 } 172 173 void exclusive_file::close(const exclusive_file* excl_fp) 174 { 175 if (excl_fp) { 176 excl_file_reg::instance()->deref(excl_fp->name); 177 } 178 } 179 180 int exclusive_file::write(const void *buf, int len) 181 { 182 DBG("async writting %i bytes to %s",len,name.c_str()); 183 return (int)async_file::write(buf,len); 184 } 185 186 int exclusive_file::writev(const struct iovec* iov, int iovcnt) 187 { 188 // int len=0; 189 // for(int i=0; i<iovcnt; i++) 190 // len += iov[i].iov_len; 191 //DBG("async writting (iov) %i bytes to %s",len,name.c_str()); 192 193 return (int)async_file::writev(iov,iovcnt); 194 } 195 196