%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1998-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%% Read wrap files with internal format

-module(wrap_log_reader).

%% Debug tracing. Uncomment the define below to make ?FORMAT/2 print
%% through io:format/2; when 'debug' is not defined ?FORMAT/2 expands
%% to the atom 'ok' and its arguments are discarded.
%%-define(debug, true).
-ifdef(debug).
-define(FORMAT(P, A), io:format(P, A)).
-else.
-define(FORMAT(P, A), ok).
-endif.

-export([open/1, open/2, chunk/1, chunk/2, close/1]).

-export_type([continuation/0]).

%% NOTE(review): #continuation{}, #file_info{}, dlog_cont(), ?HEADSZ and
%% ?MAX_CHUNK_SIZE are used in this module but not defined in it, so they
%% presumably come in through this header - confirm.
-include("disk_log.hrl").

%% Reader state. It is returned to callers as the opaque continuation()
%% type and passed back in on every chunk/1,2 and close/1 call.
-record(wrap_reader,
        {fd       :: file:fd(),               % handle of the open wrap file
         cont     :: dlog_cont(),             % disk_log's continuation record
         file     :: file:filename(),         % file name without extension
         file_no  :: non_neg_integer(),       % current file number
         mod_time :: file:date_time(),        % modification time of current file
         first_no :: non_neg_integer() | 'one' % first read file number
        }).

-opaque continuation() :: #wrap_reader{}.

%%
%%  Exported functions
%%

%% A special case to be handled when appropriate: if the current file
%% number is one greater than the number of files, then the max file
%% number has not yet been reached; we are still on the first 'round'
%% of filling the wrap files.

-type open_ret() :: {'ok', Continuation :: continuation()}
                  | {'error', Reason :: tuple()}.

-spec open(Filename) -> open_ret() when
      Filename :: string() | atom().

%% open(File) -> {ok, Continuation} | {error, Reason}
%%
%% Reads the index file belonging to File, picks the wrap file to start
%% reading from and opens it. The chosen number is also stored as the
%% reader's first_no. An atom file name is converted to a string first.
%% Whatever read_index_file/1 returns that is not {ok, FourTuple} is
%% handed back unchanged.
open(File) when is_atom(File) ->
    open(atom_to_list(File));
open(File) when is_list(File) ->
    case read_index_file(File) of
        {ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}} ->
            FileNo = first_file_no(CurFileNo, NoOfFiles),
            ?FORMAT("open from ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
                    [FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
            open_int(File, FileNo, FileNo);
        Error ->
            Error
    end.

%% Number of the wrap file open/1 starts with.
first_file_no(CurFileNo, NoOfFiles) when CurFileNo =:= NoOfFiles + 1 ->
    %% The special case described above.
    1;
first_file_no(CurFileNo, NoOfFiles) ->
    %% A remainder of 0 stands for the last file, NoOfFiles.
    case (CurFileNo + 1) rem NoOfFiles of
        0 -> NoOfFiles;
        No -> No
    end.

-spec open(Filename, N) -> open_ret() when
      Filename :: string() | atom(),
      N :: integer().

%% open(File, FileNo) -> {ok, Continuation} | {error, Reason}
%%
%% Opens wrap file number FileNo only; the reader's first_no is set to
%% 'one'. FileNo is accepted when it does not exceed the number of files
%% recorded in the index file, or when it is the current file in the
%% special case described above. Any other FileNo yields
%% {error, {file_not_found, FileName}}.
open(File, FileNo) when is_atom(File), is_integer(FileNo) ->
    open(atom_to_list(File), FileNo);
open(File, FileNo) when is_list(File), is_integer(FileNo) ->
    case read_index_file(File) of
        {ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}}
          when NoOfFiles >= FileNo;
               %% The special case described above.
               CurFileNo =:= FileNo, CurFileNo =:= NoOfFiles + 1 ->
            ?FORMAT("open file ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
                    [FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
            open_int(File, FileNo, one);
        {ok, {_, _, _, _}} ->
            {error, {file_not_found, add_ext(File, FileNo)}};
        Error ->
            Error
    end.

-spec close(Continuation) -> 'ok' | {'error', Reason} when
      Continuation :: continuation(),
      Reason :: file:posix().

%% Closes the file handle held by the reader and returns the result of
%% file:close/1.
close(#wrap_reader{} = WR) ->
    file:close(WR#wrap_reader.fd).

-type chunk_ret() :: {Continuation2, Terms :: [term()]}
                   | {Continuation2,
                      Terms :: [term()],
                      Badbytes :: non_neg_integer()}
                   | {Continuation2, 'eof'}
                   | {'error', Reason :: term()}.

-spec chunk(Continuation) -> chunk_ret() when
      Continuation :: continuation().

%% Reads the next chunk with the default size; same as chunk(WR, infinity).
chunk(#wrap_reader{} = WR) ->
    chunk(WR, infinity).

-spec chunk(Continuation, N) -> chunk_ret() when
      Continuation :: continuation(),
      N :: infinity | pos_integer().

%% Reads the next chunk, passing N as the size argument to the
%% underlying reader. 'infinity' stands for ?MAX_CHUNK_SIZE. The
%% bad-byte counter starts at 0.
chunk(#wrap_reader{} = WR, infinity) ->
    chunk(WR, ?MAX_CHUNK_SIZE, 0);
chunk(#wrap_reader{} = WR, N) when is_integer(N), N > 0 ->
    chunk(WR, N, 0).

%%
%%  Local functions
%%

%% open_int(File, FileNo, FirstFileNo) -> {ok, #wrap_reader{}} | {error, Reason}
%%
%% Opens wrap file File.FileNo and wraps it in a fresh reader positioned
%% at the start of the file. The handle is closed again on every failure
%% that happens after the file was opened. A file that cannot be opened
%% at all is reported as {not_a_log_file, FName}.
open_int(File, FileNo, FirstFileNo) ->
    FName = add_ext(File, FileNo),
    case file:open(FName, [raw, binary, read]) of
        {ok, Fd} ->
            %% File exists
            case new_reader(Fd, FName, File, FileNo, FirstFileNo) of
                {ok, _Reader} = Opened ->
                    Opened;
                {error, _Reason} = Failed ->
                    _ = file:close(Fd),
                    Failed
            end;
        _OpenFailed ->
            {error, {not_a_log_file, FName}}
    end.

%% Builds the reader record for an already opened file, provided the
%% file starts with a log head and its modification time can be read.
%% Never closes Fd; that is left to open_int/3.
new_reader(Fd, FName, File, FileNo, FirstFileNo) ->
    case has_log_head(Fd) of
        true ->
            case last_mod_time(FName) of
                {ok, ModTime} ->
                    {ok, #wrap_reader{fd = Fd,
                                      cont = start,
                                      file = File,
                                      file_no = FileNo,
                                      mod_time = ModTime,
                                      first_no = FirstFileNo}};
                {error, E} ->
                    {error, {file_error, FName, E}}
            end;
        false ->
            {error, {not_a_log_file, FName}}
    end.

%% true if ?HEADSZ bytes could be read from Fd and disk_log_1:is_head/1
%% answers anything but 'no' for them (i.e. yes or yes_not_closed).
has_log_head(Fd) ->
    case file:read(Fd, ?HEADSZ) of
        {ok, Head} -> disk_log_1:is_head(Head) =/= no;
        _Other -> false
    end.

%% chunk(WR, N, Bad) -> {WR2, Terms} | {WR2, Terms, BadBytes}
%%                    | {WR2, eof} | {error, Reason}
%%
%% Reads one chunk from the current wrap file. Bad accumulates the bad
%% bytes reported by reads that produced no terms, so that they are
%% included in the count returned with the next non-empty chunk.
%% At end of file a reader whose first_no is 'one' stops with eof; any
%% other reader continues in chunk_at_eof/3.
chunk(WR, N, Bad) ->
    #wrap_reader{fd = Fd, cont = Continue, file = File, file_no = CurFileNo,
                 first_no = FirstFileNo} = WR,
    case read_a_chunk(Fd, N, Continue, add_ext(File, CurFileNo)) of
        eof ->
            case FirstFileNo of
                one ->
                    {WR, eof};
                _Else ->
                    chunk_at_eof(WR, N, Bad)
            end;
        {ContOut, [], BadBytes} ->
            %% Nothing but bad bytes were read: remember them and try again.
            ?FORMAT("chunk: empty chunk read, ~p bad bytes~n", [BadBytes]),
            chunk(WR#wrap_reader{cont = ContOut}, N, Bad + BadBytes);
        {ContOut, Chunk, BadBytes} when Bad + BadBytes =:= 0 ->
            {WR#wrap_reader{cont = ContOut}, Chunk};
        {ContOut, Chunk, BadBytes} ->
            %% Trace the accumulated total, which is what is returned.
            ?FORMAT("chunk: total of ~p bad bytes~n", [Bad + BadBytes]),
            {WR#wrap_reader{cont = ContOut}, Chunk, Bad + BadBytes};
        Error ->
            Error
    end.

%% read_a_chunk(Fd, N, Continuation, FileName)
%%
%% Unpacks the continuation into a position and a buffer for
%% read_a_chunk/5. 'start' means position 0 with an empty buffer.
read_a_chunk(Fd, N, start, FileName) ->
    read_a_chunk(Fd, FileName, 0, [], N);
read_a_chunk(Fd, N, More, FileName) ->
    Pos = More#continuation.pos,
    B = More#continuation.b,
    read_a_chunk(Fd, FileName, Pos, B, N).

%% read_a_chunk(Fd, FileName, Pos, B, N)
%%
%% Reads through disk_log_1:chunk_read_only/5 and normalises a
%% two-element {Continuation, Terms} result to
%% {Continuation, Terms, 0}. Every other result is returned as it is.
read_a_chunk(Fd, FileName, Pos, B, N) ->
    R = disk_log_1:chunk_read_only(Fd, FileName, Pos, B, N),
    %% Create terms from the binaries returned from chunk_read_only/5.
    %% 'foo' will do here since Log is not used in read-only mode.
    Log = foo,
    case disk_log:ichunk_end(R, Log) of
        {C = #continuation{}, S} ->
            {C, S, 0};
        Else ->
            Else
    end.

%% chunk_at_eof(WR, N, Bad)
%%
%% Called when the current wrap file is exhausted. Re-reads the index
%% file, works out which file number comes next and either reports eof
%% or continues reading in that file.
chunk_at_eof(WR, N, Bad) ->
    #wrap_reader{file = File, file_no = CurFileNo,
                 first_no = FirstFileNo} = WR,
    case read_index_file(File) of
        {ok, IndexFile} ->
            {_, _, _, NoOfFiles} = IndexFile,
            NewFileNo = case (CurFileNo + 1) rem NoOfFiles of
                            %% The special case described above.
                            _ when CurFileNo > NoOfFiles -> 1;
                            0 when NoOfFiles > 1 -> NoOfFiles;
                            No when CurFileNo =:= NoOfFiles ->
                                %% We are on the last file listed in the
                                %% index; if a file with the next number
                                %% nevertheless exists, read it next.
                                FileName = add_ext(File, CurFileNo+1),
                                case file:read_file_info(FileName) of
                                    {ok, _} -> CurFileNo + 1;
                                    _ -> No
                                end;
                            No -> No
                        end,
            ?FORMAT("chunk: at eof, index file: ~p, FirstFileNo: ~p, "
                    "CurFileNo: ~p, NoOfFiles: ~p, NewFileNo: ~p~n",
                    [IndexFile, FirstFileNo, CurFileNo,
                     NoOfFiles, NewFileNo]),
            case {FirstFileNo, NewFileNo} of
                %% No next file, or we are back at the file we started
                %% with: everything has been read.
                {_, 0} -> {WR, eof};
                {_, FirstFileNo} -> {WR, eof};
                _ -> read_next_file(WR, N, NewFileNo, Bad)
            end;
        Error ->
            Error
    end.

%% Read the index file for the File
%% -> {ok, {CurFileNo, CurFileSz, TotSz, NoOfFiles}} | {error, Reason}
%%
%% Only a four-element tuple counts as success. {1, 0, 0, 0} and any
%% {error, _} - returned or thrown - are reported as
%% {error, {index_file_not_found, File}}. Errors and exits raised by
%% disk_log_1:read_index_file/1 are deliberately not caught, so that a
%% crash there is never handed to the caller as {ok, {'EXIT', ...}}.
read_index_file(File) ->
    try disk_log_1:read_index_file(File) of
        {1, 0, 0, 0} ->
            {error, {index_file_not_found, File}};
        {error, _Reason} ->
            {error, {index_file_not_found, File}};
        {_CurFileNo, _CurFileSz, _TotSz, _NoOfFiles} = FileData ->
            {ok, FileData}
    catch
        throw:{error, _Thrown} ->
            {error, {index_file_not_found, File}}
    end.

%% When reading all the wrap files, this function opens the next wrap
%% file and, once that has succeeded, closes the previous one.
%%
%% Returns {error, {is_wrapped, FileName}} if the next file was modified
%% before the one just read, and {error, {file_error, FileName, E}} if
%% its modification time cannot be read.
read_next_file(WR, N, NewFileNo, Bad) ->
    #wrap_reader{file = File, file_no = CurFileNo,
                 mod_time = ModTime, first_no = FirstFileNo} = WR,
    %% If current file were closed here, then WR would be in a strange
    %% state should an error occur below.
    case last_mod_time(add_ext(File, NewFileNo)) of
        {ok, NewModTime} ->
            OldMT = calendar:datetime_to_gregorian_seconds(ModTime),
            NewMT = calendar:datetime_to_gregorian_seconds(NewModTime),
            Diff = NewMT - OldMT,
            ?FORMAT("next: now = ~p~n  last mtime = ~p~n"
                    "  mtime = ~p~n  diff = ~p~n",
                    [calendar:local_time(), ModTime, NewModTime, Diff]),
            if
                Diff < 0 ->
                    %% The file to be read is older than the one just finished.
                    {error, {is_wrapped, add_ext(File, CurFileNo)}};
                true ->
                    case open_int(File, NewFileNo, FirstFileNo) of
                        {ok, NWR} ->
                            _ = close(WR), %% Now we can safely close the old file.
                            chunk(NWR, N, Bad);
                        Error ->
                            Error
                    end
            end;
        {error, EN} ->
            {error, {file_error, add_ext(File, NewFileNo), EN}}
    end.

%% Get the last modification time of a file
%% -> {ok, MTime} | {error, E}
%%
%% Note that E is the complete non-ok result of file:read_file_info/1,
%% not just its reason, so a failure looks like {error, {error, Reason}}.
%% Callers embed E unchanged in their file_error tuples; the shape is
%% kept for compatibility.
last_mod_time(File) ->
    case file:read_file_info(File) of
        {ok, FileInfo} ->
            {ok, FileInfo#file_info.mtime};
        E ->
            {error, E}
    end.

%% Name of a wrap file: File, a dot and Ext concatenated with
%% lists:concat/1.
add_ext(File, Ext) ->
    lists:concat([File, ".", Ext]).