1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 1998-2016. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
21%% Read wrap files with internal format
22
23-module(wrap_log_reader).
24
25%%-define(debug, true).
26-ifdef(debug).
27-define(FORMAT(P, A), io:format(P, A)).
28-else.
29-define(FORMAT(P, A), ok).
30-endif.
31
32-export([open/1, open/2, chunk/1, chunk/2, close/1]).
33
34-export_type([continuation/0]).
35
36-include("disk_log.hrl").
37
38-record(wrap_reader,
39	{fd       :: file:fd(),
40	 cont     :: dlog_cont(), 	% disk_log's continuation record
41	 file     :: file:filename(),	% file name without extension
42	 file_no  :: non_neg_integer(),	% current file number
43	 mod_time :: file:date_time(),	% modification time of current file
44	 first_no :: non_neg_integer() | 'one' % first read file number
45	}).
46
47-opaque continuation() :: #wrap_reader{}.
48
49%%
50%%  Exported functions
51%%
52
53%% A special case to be handled when appropriate: if current file
54%% number is one greater than number of files then the max file number
55%% is not yet reached, we are on the first 'round' of filling the wrap
56%% files.
57
58-type open_ret() :: {'ok', Continuation :: continuation()}
59                  | {'error', Reason :: tuple()}.
60
61-spec open(Filename) -> open_ret() when
62      Filename :: string() | atom().
63
64open(File) when is_atom(File) ->
65    open(atom_to_list(File));
66open(File) when is_list(File) ->
67    case read_index_file(File) of
68	%% The special case described above.
69	{ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}}
70	             when CurFileNo =:= NoOfFiles + 1 ->
71	    FileNo = 1,
72	    ?FORMAT("open from ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
73		    [FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
74	    open_int(File, FileNo, FileNo);
75	{ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}} ->
76	    FileNo = case (CurFileNo + 1) rem NoOfFiles of
77			 0 -> NoOfFiles;
78			 No -> No
79		     end,
80	    ?FORMAT("open from ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
81		    [FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
82	    open_int(File, FileNo, FileNo);
83	Error ->
84	    Error
85    end.
86
87-spec open(Filename, N) -> open_ret() when
88      Filename :: string() | atom(),
89      N :: integer().
90
91open(File, FileNo) when is_atom(File), is_integer(FileNo) ->
92    open(atom_to_list(File), FileNo);
93open(File, FileNo) when is_list(File), is_integer(FileNo) ->
94    case read_index_file(File) of
95	{ok, {_CurFileNo, _CurFileSz, _TotSz, NoOfFiles}}
96	  when NoOfFiles >= FileNo ->
97	    ?FORMAT("open file ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
98		    [FileNo, _CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
99	    open_int(File, FileNo, one);
100	%% The special case described above.
101	{ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}}
102	  when CurFileNo =:= FileNo, CurFileNo =:= NoOfFiles +1 ->
103	    ?FORMAT("open file ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
104		    [FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
105	    open_int(File, FileNo, one);
106	{ok, {_CurFileNo, _CurFileSz, _TotSz, _NoOfFiles}} ->
107	    {error, {file_not_found, add_ext(File, FileNo)}};
108	Error ->
109	    Error
110    end.
111
112-spec close(Continuation) -> 'ok' | {'error', Reason} when
113      Continuation :: continuation(),
114      Reason :: file:posix().
115
116close(#wrap_reader{fd = FD}) ->
117    file:close(FD).
118
119-type chunk_ret() :: {Continuation2, Terms :: [term()]}
120                   | {Continuation2,
121                      Terms :: [term()],
122                      Badbytes :: non_neg_integer()}
123                   | {Continuation2, 'eof'}
124                   | {'error', Reason :: term()}.
125
126-spec chunk(Continuation) -> chunk_ret() when
127      Continuation :: continuation().
128
129chunk(WR = #wrap_reader{}) ->
130    chunk(WR, ?MAX_CHUNK_SIZE, 0).
131
132-spec chunk(Continuation, N) -> chunk_ret() when
133      Continuation :: continuation(),
134      N :: infinity | pos_integer().
135
136chunk(WR = #wrap_reader{}, infinity) ->
137    chunk(WR, ?MAX_CHUNK_SIZE, 0);
138chunk(WR = #wrap_reader{}, N) when is_integer(N), N > 0 ->
139    chunk(WR, N, 0).
140
141%%
142%%  Local functions
143%%
144
145open_int(File, FileNo, FirstFileNo) ->
146    FName = add_ext(File, FileNo),
147    case file:open(FName, [raw, binary, read]) of
148	{ok, Fd} ->  %% File exists
149	    case file:read(Fd, ?HEADSZ) of
150		{ok, Head} ->
151		    case disk_log_1:is_head(Head) of
152			no ->
153			    _ = file:close(Fd),
154			    {error, {not_a_log_file, FName}};
155			_ -> % yes or yes_not_closed
156			    case last_mod_time(FName) of
157				{ok, ModTime} ->
158				    WR = #wrap_reader{fd = Fd, cont = start,
159						      file = File,
160						      file_no = FileNo,
161						      mod_time = ModTime,
162						      first_no = FirstFileNo},
163				    {ok, WR};
164				{error, E} ->
165				    _ = file:close(Fd),
166				    {error, {file_error, FName, E}}
167			    end
168		    end;
169		_Other ->
170		    _ = file:close(Fd),
171		    {error, {not_a_log_file, FName}}
172	    end;
173	_Other ->
174	    {error, {not_a_log_file, FName}}
175    end.
176
177chunk(WR, N, Bad) ->
178    #wrap_reader{fd = Fd, cont = Continue, file = File, file_no = CurFileNo,
179		 first_no = FirstFileNo} = WR,
180    case read_a_chunk(Fd, N, Continue, add_ext(File, CurFileNo)) of
181	eof ->
182	    case FirstFileNo of
183		one ->
184		    {WR, eof};
185		_Else ->
186		    chunk_at_eof(WR, N, Bad)
187	    end;
188	{ContOut, [], BadBytes} ->
189	    ?FORMAT("chunk: empty chunk read, ~p bad bytes~n", [BadBytes]),
190	    chunk(WR#wrap_reader{cont = ContOut}, N, Bad + BadBytes);
191	{ContOut, Chunk, BadBytes} when Bad + BadBytes =:= 0 ->
192	    {WR#wrap_reader{cont = ContOut}, Chunk};
193	{ContOut, Chunk, BadBytes} ->
194	    ?FORMAT("chunk: total of ~p bad bytes~n", [BadBytes]),
195	    {WR#wrap_reader{cont = ContOut}, Chunk, Bad + BadBytes};
196	Error ->
197	    Error
198    end.
199
200read_a_chunk(Fd, N, start, FileName) ->
201    read_a_chunk(Fd, FileName, 0, [], N);
202read_a_chunk(Fd, N, More, FileName) ->
203    Pos = More#continuation.pos,
204    B = More#continuation.b,
205    read_a_chunk(Fd, FileName, Pos, B, N).
206
207read_a_chunk(Fd, FileName, Pos, B, N) ->
208    R = disk_log_1:chunk_read_only(Fd, FileName, Pos, B, N),
209    %% Create terms from the binaries returned from chunk_read_only/5.
210    %% 'foo' will do here since Log is not used in read-only mode.
211    Log = foo,
212    case disk_log:ichunk_end(R, Log) of
213	{C = #continuation{}, S} ->
214	    {C, S, 0};
215	Else ->
216	    Else
217    end.
218
219chunk_at_eof(WR, N, Bad) ->
220    #wrap_reader{file = File, file_no = CurFileNo,
221		 first_no = FirstFileNo} = WR,
222    case read_index_file(File) of
223	{ok, IndexFile} ->
224	    {_, _, _, NoOfFiles} = IndexFile,
225	    NewFileNo = case (CurFileNo + 1) rem NoOfFiles of
226			    %% The special case described above.
227			    _ when CurFileNo > NoOfFiles -> 1;
228			    0 when NoOfFiles > 1 -> NoOfFiles;
229			    No when CurFileNo =:= NoOfFiles ->
230				FileName = add_ext(File, CurFileNo+1),
231				case file:read_file_info(FileName) of
232				    {ok, _} -> CurFileNo + 1;
233				    _ -> No
234				end;
235			    No -> No
236			end,
237	    ?FORMAT("chunk: at eof, index file: ~p, FirstFileNo: ~p, "
238		    "CurFileNo: ~p, NoOfFiles: ~p, NewFileNo: ~p~n",
239		    [IndexFile, FirstFileNo, CurFileNo,
240		     NoOfFiles, NewFileNo]),
241	    case {FirstFileNo, NewFileNo} of
242		{_, 0} -> {WR, eof};
243		{_, FirstFileNo} -> {WR, eof};
244		_ -> read_next_file(WR, N, NewFileNo, Bad)
245	    end;
246	Error ->
247	    Error
248    end.
249
250%% Read the index file for the File
251%% -> {ok, {CurFileNo, CurFileSz, TotSz, NoOfFiles}} | {error, Reason}
252read_index_file(File) ->
253    case catch disk_log_1:read_index_file(File) of
254	{1, 0, 0, 0} ->
255	    {error, {index_file_not_found, File}};
256	{error, _Reason} ->
257	    {error, {index_file_not_found, File}};
258	FileData ->
259	    {ok, FileData}
260    end.
261
262%% When reading all the index files, this function closes the previous
263%% index file and opens the next one.
264read_next_file(WR, N, NewFileNo, Bad) ->
265    #wrap_reader{file = File, file_no = CurFileNo,
266		 mod_time = ModTime, first_no = FirstFileNo} = WR,
267    %% If current file were closed here, then WR would be in a strange
268    %% state should an error occur below.
269    case last_mod_time(add_ext(File, NewFileNo)) of
270	{ok, NewModTime} ->
271	    OldMT = calendar:datetime_to_gregorian_seconds(ModTime),
272	    NewMT = calendar:datetime_to_gregorian_seconds(NewModTime),
273	    Diff = NewMT - OldMT,
274	    ?FORMAT("next: now = ~p~n      last mtime = ~p~n"
275		    "      mtime = ~p~n      diff = ~p~n",
276		    [calendar:local_time(), ModTime, NewModTime, Diff]),
277	    if
278		Diff < 0 ->
279		    %% The file to be read is older than the one just finished.
280		    {error, {is_wrapped, add_ext(File, CurFileNo)}};
281		true ->
282		    case open_int(File, NewFileNo, FirstFileNo) of
283			{ok, NWR} ->
284			    _ = close(WR), %% Now we can safely close the old file.
285			    chunk(NWR, N, Bad);
286			Error ->
287			    Error
288		    end
289	    end;
290	{error, EN} ->
291	    {error, {file_error, add_ext(File, NewFileNo), EN}}
292    end.
293
294%% Get the last modification time of a file
295last_mod_time(File) ->
296    case file:read_file_info(File) of
297	{ok, FileInfo} ->
298	    {ok, FileInfo#file_info.mtime};
299	E ->
300	    {error, E}
301    end.
302
303add_ext(File, Ext) ->
304    lists:concat([File, ".", Ext]).
305