1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 1997-2020. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20-module(disk_log).
21
22%% Efficient file based log - process part
23
24-export([start/0, istart_link/1,
25	 log/2, log_terms/2, blog/2, blog_terms/2,
26	 alog/2, alog_terms/2, balog/2, balog_terms/2,
27	 close/1, lclose/1, lclose/2, sync/1, open/1,
28	 truncate/1, truncate/2, btruncate/2,
29	 reopen/2, reopen/3, breopen/3, inc_wrap_file/1, change_size/2,
30	 change_notify/3, change_header/2,
31	 chunk/2, chunk/3, bchunk/2, bchunk/3, chunk_step/3, chunk_info/1,
32	 block/1, block/2, unblock/1, info/1, format_error/1,
33	 accessible_logs/0, all/0]).
34
35%% Internal exports
36-export([init/2, internal_open/2,
37	 system_continue/3, system_terminate/4, system_code_change/4]).
38
39%% To be used by disk_log_h.erl (not (yet) in Erlang/OTP) only.
40-export([ll_open/1, ll_close/1, do_log/2, do_sync/1, do_info/2]).
41
42%% To be used by wrap_log_reader only.
43-export([ichunk_end/2]).
44
45%% To be used for debugging only:
46-export([pid2name/1]).
47
48-export_type([continuation/0]).
49
50-deprecated([{accessible_logs, 0, "use disk_log:all/0 instead"},
51             {lclose, 1, "use disk_log:close/1 instead"},
52             {lclose, 2, "use disk_log:close/1 instead"}]).
53
54-type dlog_state_error() :: 'ok' | {'error', term()}.
55
56-record(state, {queue = [],
57		messages = [],
58		parent,
59		server,
60		cnt = 0           :: non_neg_integer(),
61		args,
62		error_status = ok :: dlog_state_error(),
63		cache_error = ok     %% cache write error after timeout
64	       }).
65
66-include("disk_log.hrl").
67
68-define(failure(Error, Function, Arg),
69	{{failed, Error}, [{?MODULE, Function, Arg}]}).
70
71%%-define(PROFILE(C), C).
72-define(PROFILE(C), void).
73
74-compile({inline,[{log_loop,6},{log_end_sync,2},{replies,2},{rflat,1}]}).
75
76%%%----------------------------------------------------------------------
77%%% Contract type specifications
78%%%----------------------------------------------------------------------
79
80-opaque continuation() :: #continuation{}.
81
82-type file_error()     :: term().  % XXX: refine
83-type invalid_header() :: term().  % XXX: refine
84
85%%%----------------------------------------------------------------------
86%%% API
87%%%----------------------------------------------------------------------
88
89%%-----------------------------------------------------------------
90%% This module implements the API, and the processes for each log.
91%% There is one process per log.
92%%-----------------------------------------------------------------
93
94-type open_error_rsn() :: 'no_such_log'
95                        | {'badarg', term()}
96                        | {'size_mismatch', CurrentSize :: dlog_size(),
97                           NewSize :: dlog_size()}
98                        | {'arg_mismatch', OptionName :: dlog_optattr(),
99                           CurrentValue :: term(), Value :: term()}
100                        | {'name_already_open', Log :: log()}
101                        | {'open_read_write', Log :: log()}
102                        | {'open_read_only', Log :: log()}
103                        | {'need_repair', Log :: log()}
104                        | {'not_a_log_file', FileName :: file:filename()}
105                        | {'invalid_index_file', FileName :: file:filename()}
106                        | {'invalid_header', invalid_header()}
107                        | {'file_error', file:filename(), file_error()}
108                        | {'node_already_open', Log :: log()}.
109-type open_ret()       :: {'ok', Log :: log()}
110                        | {'repaired', Log :: log(),
111                           {'recovered', Rec :: non_neg_integer()},
112                           {'badbytes', Bad :: non_neg_integer()}}
113                        | {'error', open_error_rsn()}.
114
%% Runs the option list through check_arg/2, starting from an #arg{}
%% record that remembers the raw options, and passes the outcome to
%% disk_log_server:open/1.
-spec open(ArgL) -> open_ret() when
      ArgL :: dlog_options().
open(Options) ->
    Checked = check_arg(Options, #arg{options = Options}),
    disk_log_server:open(Checked).
119
120-type log_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()}
121                       | {'format_external', log()} | {'blocked_log', log()}
122                       | {'full', log()} | {'invalid_header', invalid_header()}
123                       | {'file_error', file:filename(), file_error()}.
124
%% Encodes Term with term_to_binary/1 and issues a
%% {log, internal, [Binary]} request through req/2.
-spec log(Log, Term) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      Term :: term().
log(Log, Term) ->
    Bin = term_to_binary(Term),
    req(Log, {log, internal, [Bin]}).
130
%% Converts Bytes with ensure_binary/1 and issues a
%% {log, external, [Binary]} request through req/2.
-spec blog(Log, Bytes) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      Bytes :: iodata().
blog(Log, Bytes) ->
    Bin = ensure_binary(Bytes),
    req(Log, {log, external, [Bin]}).
136
%% Converts the whole term list with terms2bins/1 and forwards the
%% result in one {log, internal, Binaries} request through req/2.
-spec log_terms(Log, TermList) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      TermList :: [term()].
log_terms(Log, Terms) ->
    req(Log, {log, internal, terms2bins(Terms)}).
143
%% Converts the list with ensure_binary_list/1 and forwards the result
%% in one {log, external, Binaries} request through req/2.
-spec blog_terms(Log, BytesList) ->
                        ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      BytesList :: [iodata()].
blog_terms(Log, Bytess) ->
    req(Log, {log, external, ensure_binary_list(Bytess)}).
151
152-type notify_ret() :: 'ok' | {'error', 'no_such_log'}.
153
%% Encodes Term with term_to_binary/1 and hands an
%% {alog, internal, [Binary]} message to notify/2.
-spec alog(Log, Term) -> notify_ret() when
      Log :: log(),
      Term :: term().
alog(Log, Term) ->
    Bin = term_to_binary(Term),
    notify(Log, {alog, internal, [Bin]}).
159
%% Converts the term list with terms2bins/1 and hands an
%% {alog, internal, Binaries} message to notify/2.
-spec alog_terms(Log, TermList) -> notify_ret() when
      Log :: log(),
      TermList :: [term()].
alog_terms(Log, Terms) ->
    notify(Log, {alog, internal, terms2bins(Terms)}).
166
%% Converts Bytes with ensure_binary/1 and hands an
%% {alog, external, [Binary]} message to notify/2.
-spec balog(Log, Bytes) -> notify_ret() when
      Log :: log(),
      Bytes :: iodata().
balog(Log, Bytes) ->
    Bin = ensure_binary(Bytes),
    notify(Log, {alog, external, [Bin]}).
172
%% Converts the list with ensure_binary_list/1 and hands an
%% {alog, external, Binaries} message to notify/2.
-spec balog_terms(Log, ByteList) -> notify_ret() when
      Log :: log(),
      ByteList :: [iodata()].
balog_terms(Log, Bytess) ->
    notify(Log, {alog, external, ensure_binary_list(Bytess)}).
179
180-type close_error_rsn() ::'no_such_log' | 'nonode'
181                         | {'file_error', file:filename(), file_error()}.
182
%% Issues a 'close' request for Log through req/2.
-spec close(Log) -> 'ok' | {'error', close_error_rsn()} when
      Log :: log().
close(Log) ->
    Request = close,
    req(Log, Request).
187
188-type lclose_error_rsn() :: 'no_such_log'
189                          | {'file_error', file:filename(), file_error()}.
190
%% Deprecated (see the -deprecated attribute): same as lclose/2 with the
%% local node.
-spec lclose(Log) -> 'ok' | {'error', lclose_error_rsn()} when
      Log :: log().
lclose(Log) ->
    LocalNode = node(),
    lclose(Log, LocalNode).
195
%% Deprecated (see the -deprecated attribute).  Issues a 'close' request
%% when Node is the local node; for any other node the log is reported
%% as non-existent.
-spec lclose(Log, Node) -> 'ok' | {'error', lclose_error_rsn()} when
      Log :: log(),
      Node :: node().
lclose(Log, Node) ->
    case node() of
        Node ->
            req(Log, close);
        _Other ->
            {error, no_such_log}
    end.
203
204-type trunc_error_rsn() :: 'no_such_log' | 'nonode'
205                         | {'read_only_mode', log()}
206                         | {'blocked_log', log()}
207                         | {'invalid_header', invalid_header()}
208                         | {'file_error', file:filename(), file_error()}.
209
%% Issues a truncate request without a new head ('none').  The trailing
%% 'truncate', 1 pair names this API function and its arity; the server
%% passes them to ?failure when truncation fails.
-spec truncate(Log) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log().
truncate(Log) ->
    Request = {truncate, none, truncate, 1},
    req(Log, Request).
214
%% Issues a truncate request with Head, encoded by term_to_binary/1, as
%% the new head.  The trailing 'truncate', 2 pair names this API
%% function and its arity for failure reports.
-spec truncate(Log, Head) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log(),
      Head :: term().
truncate(Log, Head) ->
    HeadBin = term_to_binary(Head),
    req(Log, {truncate, {ok, HeadBin}, truncate, 2}).
220
%% Issues a truncate request with Head, converted by ensure_binary/1, as
%% the new head.  The trailing 'btruncate', 2 pair names this API
%% function and its arity for failure reports.
-spec btruncate(Log, BHead) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log(),
      BHead :: iodata().
btruncate(Log, Head) ->
    HeadBin = ensure_binary(Head),
    req(Log, {truncate, {ok, HeadBin}, btruncate, 2}).
226
227-type reopen_error_rsn() :: no_such_log
228                          | nonode
229                          | {read_only_mode, log()}
230                          | {blocked_log, log()}
231                          | {same_file_name, log()} |
232                            {invalid_index_file, file:filename()}
233                          | {invalid_header, invalid_header()}
234                          | {'file_error', file:filename(), file_error()}.
235
%% Issues a reopen request naming NewFile and no new head ('none').
%% NOTE(review): the trailing 'reopen', 2 pair presumably identifies the
%% calling function, as in truncate requests -- confirm in the handler.
-spec reopen(Log, File) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename().
reopen(Log, NewFile) ->
    Request = {reopen, NewFile, none, reopen, 2},
    req(Log, Request).
241
%% Issues a reopen request naming NewFile, with NewHead encoded by
%% term_to_binary/1 as the new head.
-spec reopen(Log, File, Head) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename(),
      Head :: term().
reopen(Log, NewFile, NewHead) ->
    HeadBin = term_to_binary(NewHead),
    req(Log, {reopen, NewFile, {ok, HeadBin}, reopen, 3}).
248
%% Issues a reopen request naming NewFile, with NewHead converted by
%% ensure_binary/1 as the new head.
-spec breopen(Log, File, BHead) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename(),
      BHead :: iodata().
breopen(Log, NewFile, NewHead) ->
    HeadBin = ensure_binary(NewHead),
    req(Log, {reopen, NewFile, {ok, HeadBin}, breopen, 3}).
255
256-type inc_wrap_error_rsn() :: 'no_such_log' | 'nonode'
257                            | {'read_only_mode', log()}
258                            | {'blocked_log', log()} | {'halt_log', log()}
259                            | {'invalid_header', invalid_header()}
260                            | {'file_error', file:filename(), file_error()}.
261
%% Issues an 'inc_wrap_file' request for Log through req/2.
-spec inc_wrap_file(Log) -> 'ok' | {'error', inc_wrap_error_rsn()} when
      Log :: log().
inc_wrap_file(Log) ->
    Request = inc_wrap_file,
    req(Log, Request).
266
%% Issues a {change_size, NewSize} request for Log through req/2.
-spec change_size(Log, Size) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Size :: dlog_size(),
      Reason :: no_such_log | nonode | {read_only_mode, Log}
              | {blocked_log, Log}
              | {new_size_too_small, Log, CurrentSize :: pos_integer()}
              | {badarg, size}
              | {file_error, file:filename(), file_error()}.
change_size(Log, NewSize) ->
    Request = {change_size, NewSize},
    req(Log, Request).
277
%% Issues a {change_notify, Pid, NewNotify} request for Log through
%% req/2.
-spec change_notify(Log, Owner, Notify) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Owner :: pid(),
      Notify :: boolean(),
      Reason :: no_such_log | nonode | {blocked_log, Log}
              | {badarg, notify} | {not_owner, Owner}.
change_notify(Log, Pid, NewNotify) ->
    Request = {change_notify, Pid, NewNotify},
    req(Log, Request).
286
%% Issues a {change_header, NewHead} request for Log through req/2.
-spec change_header(Log, Header) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Header :: {head, dlog_head_opt()}
              | {head_func, MFA :: {atom(), atom(), list()}},
      Reason :: no_such_log | nonode | {read_only_mode, Log}
              | {blocked_log, Log} | {badarg, head}.
change_header(Log, NewHead) ->
    Request = {change_header, NewHead},
    req(Log, Request).
295
296-type sync_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()}
297                        | {'blocked_log', log()}
298                        | {'file_error', file:filename(), file_error()}.
299
%% Issues a 'sync' request for Log through req/2.
-spec sync(Log) -> 'ok' | {'error', sync_error_rsn()} when
      Log :: log().
sync(Log) ->
    Request = sync,
    req(Log, Request).
304
305-type block_error_rsn() :: 'no_such_log' | 'nonode' | {'blocked_log', log()}.
306
%% Same as block/2 with QueueLogRecords set to true.
-spec block(Log) -> 'ok' | {'error', block_error_rsn()} when
      Log :: log().
block(Log) ->
    QueueLogRecords = true,
    block(Log, QueueLogRecords).
311
%% Issues a {block, QueueLogRecords} request for Log through req/2.
-spec block(Log, QueueLogRecords) -> 'ok' | {'error', block_error_rsn()} when
      Log :: log(),
      QueueLogRecords :: boolean().
block(Log, QueueLogRecords) ->
    Request = {block, QueueLogRecords},
    req(Log, Request).
317
318-type unblock_error_rsn() :: 'no_such_log' | 'nonode'
319                           | {'not_blocked', log()}
320                           | {'not_blocked_by_pid', log()}.
321
%% Issues an 'unblock' request for Log through req/2.
-spec unblock(Log) -> 'ok' | {'error', unblock_error_rsn()} when
      Log :: log().
unblock(Log) ->
    Request = unblock,
    req(Log, Request).
326
%% Public entry point for error formatting; all the work is done by
%% do_format_error/1.
-spec format_error(Error) -> io_lib:chars() when
      Error :: term().
format_error(Error) ->
    Text = do_format_error(Error),
    Text.
331
%% One element of the list returned by info/1.
%% Note: the 'full' item carries a boolean() value; the type must be
%% written with parentheses, otherwise it denotes the atom 'boolean'.
-type dlog_info() :: {name, Log :: log()}
                   | {file, File :: file:filename()}
                   | {type, Type :: dlog_type()}
                   | {format, Format :: dlog_format()}
                   | {size, Size :: dlog_size()}
                   | {mode, Mode :: dlog_mode()}
                   | {owners, [{pid(), Notify :: boolean()}]}
                   | {users, Users :: non_neg_integer()}
                   | {status, Status ::
                        ok | {blocked, QueueLogRecords :: boolean()}}
                   | {node, Node :: node()}
                   | {head, Head :: none
                                  | {head, binary()}
                                  | (MFA :: {atom(), atom(), list()})}
                   | {no_written_items, NoWrittenItems :: non_neg_integer()}
                   | {full, Full :: boolean()}
                   | {no_current_bytes, non_neg_integer()}
                   | {no_current_items, non_neg_integer()}
                   | {no_items, non_neg_integer()}
                   | {current_file, pos_integer()}
                   | {no_overflows, {SinceLogWasOpened :: non_neg_integer(),
                                     SinceLastInfo :: non_neg_integer()}}.
%% Issues an 'info' request for Log through req/2.
-spec info(Log) -> InfoList | {'error', no_such_log} when
      Log :: log(),
      InfoList :: [dlog_info()].
info(Log) ->
    Request = info,
    req(Log, Request).
359
%% Debugging aid: looks Pid up in ?DISK_LOG_PID_TABLE after calling
%% disk_log_server:start/0.  Returns {ok, Log} for a single matching
%% entry and 'undefined' when the table has no entry for Pid.
-spec pid2name(Pid) -> {'ok', Log} | 'undefined' when
      Pid :: pid(),
      Log :: log().
pid2name(Pid) ->
    disk_log_server:start(),
    Entries = ets:lookup(?DISK_LOG_PID_TABLE, Pid),
    case Entries of
        [{_Pid, Log}] ->
            {ok, Log};
        [] ->
            undefined
    end.
369
%% chunk/3 takes three arguments: a Log, a Continuation and N.
%% It returns {Continuation2, Terms} | {Continuation2, Terms, Badbytes}
%% | eof | {error, Reason}.
%% The initial continuation is the atom 'start'.
373
374-type chunk_error_rsn() :: no_such_log
375                         | {format_external, log()}
376                         | {blocked_log, log()}
377                         | {badarg, continuation}
378                         | {not_internal_wrap, log()}
379                         | {corrupt_log_file, FileName :: file:filename()}
380                         | {file_error, file:filename(), file_error()}.
381
382-type chunk_ret() :: {Continuation2 :: continuation(), Terms :: [term()]}
383                   | {Continuation2 :: continuation(),
384                      Terms :: [term()],
385                      Badbytes :: non_neg_integer()}
386                   | eof
387                   | {error, Reason :: chunk_error_rsn()}.
388
%% Same as chunk/3 with N set to 'infinity'.
-spec chunk(Log, Continuation) -> chunk_ret() when
      Log :: log(),
      Continuation :: start | continuation().
chunk(Log, Cont) ->
    N = infinity,
    chunk(Log, Cont, N).
394
%% Reads a chunk of terms through ichunk/3.  'infinity' is replaced by
%% ?MAX_CHUNK_SIZE; any other N must be a positive integer.
-spec chunk(Log, Continuation, N) -> chunk_ret() when
      Log :: log(),
      Continuation :: start | continuation(),
      N :: pos_integer() | infinity.
chunk(Log, Cont, infinity) ->
    %% A chunk never holds more than ?MAX_CHUNK_SIZE terms.
    MaxTerms = ?MAX_CHUNK_SIZE,
    ichunk(Log, Cont, MaxTerms);
chunk(Log, Cont, N) when is_integer(N) andalso N > 0 ->
    ichunk(Log, Cont, N).
404
%% Sends the chunk request and post-processes the reply with
%% ichunk_end/2.  From 'start' the request goes to the log by name via
%% req/2 with position 0 and no buffered data; from a continuation it
%% goes via req2/2 to the pid stored in the continuation, using its
%% position and buffer.  Anything else is a bad continuation.
ichunk(Log, start, N) ->
    Reply = req(Log, {chunk, 0, [], N}),
    ichunk_end(Reply, Log);
ichunk(Log, #continuation{pid = Pid, pos = Pos, b = Buf}, N) ->
    Reply = req2(Pid, {chunk, Pos, Buf, N}),
    ichunk_end(Reply, Log);
ichunk(_Log, _BadCont, _N) ->
    {error, {badarg, continuation}}.
414
%% Post-processes a chunk reply.  A two-element reply headed by a
%% continuation is decoded in read_write mode with zero bad bytes; a
%% three-element reply is decoded in read_only mode with the reported
%% bad byte count.  Any other reply is returned as is.
ichunk_end({#continuation{} = Cont, Bins}, Log) ->
    ichunk_end(Bins, read_write, Log, Cont, 0);
ichunk_end({#continuation{} = Cont, Bins, BadBytes}, Log) ->
    ichunk_end(Bins, read_only, Log, Cont, BadBytes);
ichunk_end(Other, _Log) ->
    Other.
421
%% The terms are built here, on the client's heap, rather than on the
%% server's.  The binaries arrive in reverse order; bins2terms/2
%% restores the order while decoding.  If decoding fails, the list is
%% put in order and ichunk_bad_end/6 examines the binaries one by one.
ichunk_end(Bins, Mode, Log, Cont, Bad) ->
    case catch bins2terms(Bins, []) of
        {'EXIT', _} ->
            Ordered = lists:reverse(Bins),
            ichunk_bad_end(Ordered, Mode, Log, Cont, Bad, []);
        Terms when Bad =:= 0 ->
            {Cont, Terms};
        Terms when Bad > 0 ->
            {Cont, Terms, Bad}
    end.
434
%% Decodes every binary with binary_to_term/1, pushing each term onto
%% the accumulator L; the decoded terms therefore come out in the
%% reverse of the input order, in front of the initial accumulator.
bins2terms(Bins, L) ->
    lists:foldl(fun(Bin, Acc) -> [binary_to_term(Bin) | Acc] end, L, Bins).
439
%% Decodes the binaries of a chunk one at a time, after the bulk decode
%% in ichunk_end/5 has failed, to find the first binary that
%% binary_to_term/1 rejects.  Terms decoded so far are collected in A
%% (in reverse order).
%%
%% At the first bad binary:
%%   - read_write mode: returns {error, {corrupt_log_file, File}}.  File
%%     is the log's file name when the continuation position is an
%%     integer (halt log), or the name built by add_ext/2 when the
%%     position is {FileNo, _} (wrap log).
%%   - read_only mode: returns {Continuation, Terms, NewBad}.  The
%%     continuation position is moved back by the size, plus ?HEADERSZ
%%     each, of the binaries following the bad one, its buffer is
%%     emptied, and the size of the bad binary is added to Bad.
%%
%% try ... of is used instead of `catch' so that a correctly decoded
%% term of the shape {'EXIT', _} is not mistaken for a decode failure.
ichunk_bad_end([B | Bs], Mode, Log, C, Bad, A) ->
    try binary_to_term(B) of
        T ->
            %% Not in the protected part of the try: still a tail call.
            ichunk_bad_end(Bs, Mode, Log, C, Bad, [T | A])
    catch
        error:_ when Mode =:= read_write ->
            {file, FileName} = lists:keyfind(file, 1, info(Log)),
            File = case C#continuation.pos of
                       Pos when is_integer(Pos) -> FileName; % halt log
                       {FileNo, _} -> add_ext(FileName, FileNo) % wrap log
                   end,
            {error, {corrupt_log_file, File}};
        error:_ when Mode =:= read_only ->
            Reread = lists:foldl(fun(Bin, Sz) ->
                                         Sz + byte_size(Bin) + ?HEADERSZ
                                 end, 0, Bs),
            NewPos = case C#continuation.pos of
                         Pos when is_integer(Pos) -> Pos - Reread;
                         {FileNo, Pos} -> {FileNo, Pos - Reread}
                     end,
            NewBad = Bad + byte_size(B),
            {C#continuation{pos = NewPos, b = []}, lists:reverse(A), NewBad}
    end.
463
464-type bchunk_ret() :: {Continuation2 :: continuation(),
465                       Binaries :: [binary()]}
466                    | {Continuation2 :: continuation(),
467                       Binaries :: [binary()],
468                       Badbytes :: non_neg_integer()}
469                    | eof
470                    | {error, Reason :: chunk_error_rsn()}.
471
%% Same as bchunk/3 with N set to 'infinity'.
-spec bchunk(Log, Continuation) -> bchunk_ret() when
      Log :: log(),
      Continuation :: start | continuation().
bchunk(Log, Cont) ->
    N = infinity,
    bchunk(Log, Cont, N).
477
%% Reads a chunk of binaries through bichunk/3.  'infinity' is replaced
%% by ?MAX_CHUNK_SIZE; any other N must be a positive integer.
-spec bchunk(Log, Continuation, N) -> bchunk_ret() when
      Log :: log(),
      Continuation :: start | continuation(),
      N :: pos_integer() | infinity.
bchunk(Log, Cont, infinity) ->
    %% A chunk never holds more than ?MAX_CHUNK_SIZE terms.
    MaxTerms = ?MAX_CHUNK_SIZE,
    bichunk(Log, Cont, MaxTerms);
bchunk(Log, Cont, N) when is_integer(N) andalso N > 0 ->
    bichunk(Log, Cont, N).
487
%% Binary counterpart of ichunk/3: sends the same chunk request but
%% post-processes the reply with bichunk_end/1, which does not decode
%% the binaries.
bichunk(Log, start, N) ->
    Reply = req(Log, {chunk, 0, [], N}),
    bichunk_end(Reply);
bichunk(_Log, More, N) when is_record(More, continuation) ->
    Request = {chunk, More#continuation.pos, More#continuation.b, N},
    Reply = req2(More#continuation.pid, Request),
    bichunk_end(Reply);
bichunk(_Log, _BadCont, _N) ->
    {error, {badarg, continuation}}.
496
%% Post-processes a chunk reply for bchunk: when the reply is headed by
%% a continuation, the list of binaries is reversed (with the bad byte
%% count kept, if present).  Any other reply is returned as is.
bichunk_end({Cont, Bins}) when is_record(Cont, continuation) ->
    {Cont, lists:reverse(Bins)};
bichunk_end({Cont, Bins, BadBytes}) when is_record(Cont, continuation) ->
    {Cont, lists:reverse(Bins), BadBytes};
bichunk_end(Other) ->
    Other.
503
%% Checks that the step is an integer and delegates to ichunk_step/3.
-spec chunk_step(Log, Continuation, Step) ->
                        {'ok', any()} | {'error', Reason} when
      Log :: log(),
      Continuation :: start | continuation(),
      Step :: integer(),
      Reason :: no_such_log | end_of_log | {format_external, Log}
              | {blocked_log, Log}  | {badarg, continuation}
              | {file_error, file:filename(), file_error()}.
chunk_step(Log, Cont, Step) when is_integer(Step) ->
    ichunk_step(Log, Cont, Step).
514
%% Sends a chunk_step request.  From 'start' it goes to the log by name
%% via req/2 with position 0; from a continuation it goes via req2/2 to
%% the pid stored in the continuation, using its position.  Anything
%% else is a bad continuation.
ichunk_step(Log, start, Step) ->
    req(Log, {chunk_step, 0, Step});
ichunk_step(_Log, #continuation{pid = Pid, pos = Pos}, Step) ->
    req2(Pid, {chunk_step, Pos, Step});
ichunk_step(_Log, _BadCont, _Step) ->
    {error, {badarg, continuation}}.
521
%% Returns [{node, Node}] where Node is the node of the pid stored in
%% the continuation; any other argument yields a no_continuation error.
-spec chunk_info(Continuation) -> InfoList | {error, Reason} when
      Continuation :: continuation(),
      InfoList :: [{node, Node :: node()}, ...],
      Reason :: {no_continuation, Continuation}.
chunk_info(#continuation{pid = Pid}) ->
    Node = node(Pid),
    [{node, Node}];
chunk_info(NotACont) ->
    {error, {no_continuation, NotACont}}.
530
%% Deprecated (see the -deprecated attribute).  Returns the result of
%% disk_log_server:all/0 paired with an empty list.
-spec accessible_logs() -> {[Log], []} when
      Log :: log().
accessible_logs() ->
    Logs = disk_log_server:all(),
    {Logs, []}.
535
%% Returns whatever disk_log_server:all/0 reports.
-spec all() -> [Log] when
      Log :: log().
all() ->
    Logs = disk_log_server:all(),
    Logs.
540
%% Spawns a linked log process running init/2 in this module, with the
%% calling process as Parent, and returns {ok, Pid}.
istart_link(Server) ->
    Parent = self(),
    Pid = proc_lib:spawn_link(?MODULE, init, [Parent, Server]),
    {ok, Pid}.
543
%% Kept only for backwards compatibility; could probably be removed.
%% Delegates to disk_log_server:start/0.
-spec start() -> 'ok'.
start() ->
    Result = disk_log_server:start(),
    Result.
548
%% Sends an {internal_open, A} request directly to the log process Pid
%% through req2/2.
internal_open(Pid, A) ->
    Request = {internal_open, A},
    req2(Pid, Request).
551
552%%% ll_open() and ll_close() are used by disk_log_h.erl, a module not
553%%% (yet) in Erlang/OTP.
554
%% -spec ll_open(dlog_options()) -> {'ok', Res :: _, #log{}, Cnt :: _} | Error.
%% Checks the options with check_arg/2; on success the checked #arg{}
%% is passed to do_open/1, otherwise the check result is returned as is.
ll_open(Options) ->
    case check_arg(Options, #arg{options = Options}) of
        {ok, Checked} ->
            do_open(Checked);
        NotOk ->
            NotOk
    end.
561
%% -> closed | throw(Error)
%% Thin wrapper around close_disk_log2/1.
ll_close(Log) ->
    Result = close_disk_log2(Log),
    Result.
565
%% Folds the option list into an #arg{} record.
%%
%% Each recognised {Key, Value} option with an acceptable value updates
%% the matching record field.  The first element that is not recognised
%% stops processing with {error, {badarg, Remaining}}, where Remaining
%% is the rest of the option list starting at that element.
%%
%% When the list is exhausted the head is validated with check_head/2
%% (unless it is 'none') and the combination of options is checked:
%%   - the name must have been given (the value 0 is rejected);
%%   - a missing file name is derived as Name ++ ".LOG" and the final
%%     check is run again;
%%   - {repair, truncate} cannot be combined with read_only mode;
%%   - a halt log cannot have a tuple size;
%%   - for wrap logs the size is reconciled with the stored size
%%     information by check_wrap_arg/3.
check_arg([], Args) ->
    HeadChecked =
        case Args#arg.head of
            none ->
                {ok, Args};
            RawHead ->
                case check_head(RawHead, Args#arg.format) of
                    {ok, Head} ->
                        {ok, Args#arg{head = Head}};
                    HeadError ->
                        HeadError
                end
        end,

    if
        Args#arg.name =:= 0 ->
            {error, {badarg, name}};
        Args#arg.file =:= none ->
            case catch lists:concat([Args#arg.name, ".LOG"]) of
                {'EXIT', _} ->
                    {error, {badarg, file}};
                DefaultFile ->
                    check_arg([], Args#arg{file = DefaultFile})
            end;
        Args#arg.repair =:= truncate, Args#arg.mode =:= read_only ->
            {error, {badarg, repair_read_only}};
        Args#arg.type =:= halt, is_tuple(Args#arg.size) ->
            {error, {badarg, size}};
        Args#arg.type =:= wrap ->
            {StoredSize, Vsn} =
                disk_log_1:read_size_file_version(Args#arg.file),
            check_wrap_arg(HeadChecked, StoredSize, Vsn);
        true ->
            HeadChecked
    end;
check_arg([{file, File} | Rest], Args) when is_list(File); is_atom(File) ->
    check_arg(Rest, Args#arg{file = File});
check_arg([{linkto, LinkTo} | Rest], Args) when is_pid(LinkTo);
                                                LinkTo =:= none ->
    check_arg(Rest, Args#arg{linkto = LinkTo});
check_arg([{name, Name} | Rest], Args) ->
    check_arg(Rest, Args#arg{name = Name});
check_arg([{repair, Repair} | Rest], Args) when Repair =:= true;
                                                Repair =:= false;
                                                Repair =:= truncate ->
    check_arg(Rest, Args#arg{repair = Repair});
check_arg([{size, Size} | Rest], Args) when is_integer(Size), Size > 0;
                                            Size =:= infinity ->
    check_arg(Rest, Args#arg{size = Size});
check_arg([{size, {MaxB, MaxF}} | Rest], Args) when is_integer(MaxB),
                                                    is_integer(MaxF),
                                                    MaxB > 0,
                                                    MaxB =< ?MAX_BYTES,
                                                    MaxF > 0,
                                                    MaxF < ?MAX_FILES ->
    check_arg(Rest, Args#arg{size = {MaxB, MaxF}});
check_arg([{type, Type} | Rest], Args) when Type =:= wrap; Type =:= halt ->
    check_arg(Rest, Args#arg{type = Type});
check_arg([{format, Format} | Rest], Args) when Format =:= internal;
                                                Format =:= external ->
    check_arg(Rest, Args#arg{format = Format});
check_arg([{notify, Notify} | Rest], Args) when is_boolean(Notify) ->
    check_arg(Rest, Args#arg{notify = Notify});
check_arg([{head_func, HeadFunc} | Rest], Args) ->
    check_arg(Rest, Args#arg{head = {head_func, HeadFunc}});
check_arg([{head, Term} | Rest], Args) ->
    check_arg(Rest, Args#arg{head = {head, Term}});
check_arg([{mode, Mode} | Rest], Args) when Mode =:= read_only;
                                            Mode =:= read_write ->
    check_arg(Rest, Args#arg{mode = Mode});
check_arg([{quiet, Quiet} | Rest], Args) when is_boolean(Quiet) ->
    check_arg(Rest, Args#arg{quiet = Quiet});
check_arg(Remaining, _Args) ->
    {error, {badarg, Remaining}}.
647
%% Reconciles the requested size of a wrap log with the size obtained
%% from disk_log_1:read_size_file_version/1 (OldSize), and records the
%% version and old size in the #arg{} record.  Clause order matters.
%% NOTE(review): an OldSize of {0,0} presumably means that no stored
%% size was found -- confirm in disk_log_1.
%%
%% A first argument that is not {ok, _} is an earlier error and is
%% returned unchanged.

%% Size 'infinity' and nothing stored: no size to fall back on.
check_wrap_arg({ok, #arg{size = infinity}}, {0,0}, _Vsn) ->
    {error, {badarg, size}};
%% Size 'infinity': adopt the stored size.
check_wrap_arg({ok, #arg{size = infinity} = Arg}, OldSize, Vsn) ->
    {ok,  Arg#arg{version = Vsn, old_size = OldSize, size = OldSize}};
%% Nothing stored: the requested size is also the old size.
check_wrap_arg({ok, Arg}, {0,0}, Vsn) ->
    {ok, Arg#arg{version = Vsn, old_size = Arg#arg.size}};
%% Requested size equals the stored size.
check_wrap_arg({ok, #arg{size = Size} = Arg}, OldSize, Vsn)
  when OldSize =:= Size ->
    {ok, Arg#arg{version = Vsn, old_size = OldSize}};
%% Sizes differ, but the log is to be truncated.
check_wrap_arg({ok, #arg{repair = truncate, size = Size} = Arg}, OldSize, Vsn)
  when is_tuple(Size) ->
    {ok, Arg#arg{version = Vsn, old_size = OldSize}};
%% Sizes differ and the log is opened read-only.
check_wrap_arg({ok, #arg{mode = read_only, size = Size}}, OldSize, _Vsn)
  when Size =/= OldSize ->
    {error, {size_mismatch, OldSize, Size}};
%% Sizes differ; a tuple size is accepted.
check_wrap_arg({ok, #arg{size = Size} = Arg}, OldSize, Vsn)
  when is_tuple(Size) ->
    {ok, Arg#arg{version = Vsn, old_size = OldSize}};
%% Any other size is invalid for a wrap log.
check_wrap_arg({ok, _Arg}, _OldSize, _Vsn) ->
    {error, {badarg, size}};
check_wrap_arg(NotOk, _OldSize, _Vsn) ->
    NotOk.
668
669%%%-----------------------------------------------------------------
670%%% Server functions
671%%%-----------------------------------------------------------------
%% Entry point of a log process (spawned by istart_link/1).  Turns on
%% exit trapping and enters loop/1 with a fresh #state{} holding Parent
%% and Server.
init(Parent, Server) ->
    ?PROFILE(ep:do()),
    process_flag(trap_exit, true),
    InitialState = #state{parent = Parent, server = Server},
    loop(InitialState).
676
%% Main loop of the log process.  Messages saved in the state's
%% 'messages' list are handled first, one per iteration; only when that
%% list is empty is the next message taken from the mailbox.
loop(#state{messages = [Next | Remaining]} = State) ->
    handle(Next, State#state{messages = Remaining});
loop(#state{messages = []} = State) ->
    receive
        Incoming ->
            handle(Incoming, State)
    end.
684
685handle({From, write_cache}, S) when From =:= self() ->
686    case catch do_write_cache(get(log)) of
687        ok ->
688            loop(S);
689        Error ->
690	    loop(S#state{cache_error = Error})
691    end;
692handle({From, {log, Format, B}}=Message, S) ->
693    case get(log) of
694	#log{mode = read_only}=L ->
695	    reply(From, {error, {read_only_mode, L#log.name}}, S);
696	#log{status = ok, format=external}=L when Format =:= internal ->
697	    reply(From, {error, {format_external, L#log.name}}, S);
698	#log{status = ok, format=LogFormat} ->
699	    log_loop(S, From, [B], [], iolist_size(B), LogFormat);
700	#log{status = {blocked, false}}=L ->
701	    reply(From, {error, {blocked_log, L#log.name}}, S);
702	#log{blocked_by = From}=L ->
703	    reply(From, {error, {blocked_log, L#log.name}}, S);
704	_ ->
705	    enqueue(Message, S)
706    end;
707handle({alog, Format, B}=Message, S) ->
708    case get(log) of
709	#log{mode = read_only} ->
710	    notify_owners({read_only,B}),
711	    loop(S);
712	#log{status = ok, format = external} when Format =:= internal ->
713	    notify_owners({format_external, B}),
714	    loop(S);
715	#log{status = ok, format=LogFormat} ->
716	    log_loop(S, [], [B], [], iolist_size(B), LogFormat);
717	#log{status = {blocked, false}} ->
718	    notify_owners({blocked_log, B}),
719	    loop(S);
720	_ ->
721	    enqueue(Message, S)
722    end;
723handle({From, {block, QueueLogRecs}}=Message, S) ->
724    case get(log) of
725	#log{status = ok}=L ->
726	    do_block(From, QueueLogRecs, L),
727	    reply(From, ok, S);
728	#log{status = {blocked, false}}=L ->
729	    reply(From, {error, {blocked_log, L#log.name}}, S);
730	#log{blocked_by = From}=L ->
731	    reply(From, {error, {blocked_log, L#log.name}}, S);
732	_ ->
733	    enqueue(Message, S)
734    end;
735handle({From, unblock}, S) ->
736    case get(log) of
737	#log{status = ok}=L ->
738	    reply(From, {error, {not_blocked, L#log.name}}, S);
739	#log{blocked_by = From}=L ->
740	    S2 = do_unblock(L, S),
741	    reply(From, ok, S2);
742	L ->
743	    reply(From, {error, {not_blocked_by_pid, L#log.name}}, S)
744    end;
745handle({From, sync}=Message, S) ->
746    case get(log) of
747	#log{mode = read_only}=L ->
748	    reply(From, {error, {read_only_mode, L#log.name}}, S);
749	#log{status = ok, format=LogFormat} ->
750            log_loop(S, [], [], [From], 0, LogFormat);
751	#log{status = {blocked, false}}=L ->
752	    reply(From, {error, {blocked_log, L#log.name}}, S);
753	#log{blocked_by = From}=L ->
754	    reply(From, {error, {blocked_log, L#log.name}}, S);
755	_ ->
756	    enqueue(Message, S)
757    end;
758handle({From, {truncate, Head, F, A}}=Message, S) ->
759    case get(log) of
760	#log{mode = read_only}=L ->
761	    reply(From, {error, {read_only_mode, L#log.name}}, S);
762	#log{status = ok} when S#state.cache_error =/= ok ->
763	    loop(cache_error(S, [From]));
764	#log{status = ok}=L ->
765	    H = merge_head(Head, L#log.head),
766	    case catch do_trunc(L, H) of
767		ok ->
768		    erase(is_full),
769		    notify_owners({truncated, S#state.cnt}),
770		    N = if H =:= none -> 0; true -> 1 end,
771		    reply(From, ok, (state_ok(S))#state{cnt = N});
772		Error ->
773		    do_exit(S, From, Error, ?failure(Error, F, A))
774	    end;
775	#log{status = {blocked, false}}=L ->
776	    reply(From, {error, {blocked_log, L#log.name}}, S);
777	#log{blocked_by = From}=L ->
778	    reply(From, {error, {blocked_log, L#log.name}}, S);
779	_ ->
780	    enqueue(Message, S)
781    end;
782handle({From, {chunk, Pos, B, N}}=Message,  S) ->
783    case get(log) of
784	#log{status = ok} when S#state.cache_error =/= ok ->
785	    loop(cache_error(S, [From]));
786	#log{status = ok}=L ->
787	    R = do_chunk(L, Pos, B, N),
788	    reply(From, R, S);
789	#log{blocked_by = From}=L ->
790	    R = do_chunk(L, Pos, B, N),
791	    reply(From, R, S);
792	#log{status = {blocked, false}}=L ->
793	    reply(From, {error, {blocked_log, L#log.name}}, S);
794	_L ->
795	    enqueue(Message, S)
796    end;
797handle({From, {chunk_step, Pos, N}}=Message,  S) ->
798    case get(log) of
799	#log{status = ok} when S#state.cache_error =/= ok ->
800	    loop(cache_error(S, [From]));
801	#log{status = ok}=L ->
802	    R = do_chunk_step(L, Pos, N),
803	    reply(From, R, S);
804	#log{blocked_by = From}=L ->
805	    R = do_chunk_step(L, Pos, N),
806	    reply(From, R, S);
807	#log{status = {blocked, false}}=L ->
808	    reply(From, {error, {blocked_log, L#log.name}}, S);
809	_ ->
810	    enqueue(Message, S)
811    end;
812handle({From, {change_notify, Pid, NewNotify}}=Message, S) ->
813    case get(log) of
814	#log{status = ok}=L ->
815	    case do_change_notify(L, Pid, NewNotify) of
816		{ok, L1} ->
817		    put(log, L1),
818		    reply(From, ok, S);
819		Error ->
820		    reply(From, Error, S)
821	    end;
822	#log{status = {blocked, false}}=L ->
823	    reply(From, {error, {blocked_log, L#log.name}}, S);
824	#log{blocked_by = From}=L ->
825	    reply(From, {error, {blocked_log, L#log.name}}, S);
826	_ ->
827	    enqueue(Message, S)
828    end;
829handle({From, {change_header, NewHead}}=Message, S) ->
830    case get(log) of
831	#log{mode = read_only}=L ->
832	    reply(From, {error, {read_only_mode, L#log.name}}, S);
833	#log{status = ok, format = Format}=L ->
834	    case check_head(NewHead, Format) of
835		{ok, Head} ->
836		    put(log, L#log{head = mk_head(Head, Format)}),
837		    reply(From, ok, S);
838		Error ->
839		    reply(From, Error, S)
840	    end;
841	#log{status = {blocked, false}}=L ->
842	    reply(From, {error, {blocked_log, L#log.name}}, S);
843	#log{blocked_by = From}=L ->
844	    reply(From, {error, {blocked_log, L#log.name}}, S);
845	_ ->
846	    enqueue(Message, S)
847    end;
848handle({From, {change_size, NewSize}}=Message, S) ->
849    case get(log) of
850	#log{mode = read_only}=L ->
851	    reply(From, {error, {read_only_mode, L#log.name}}, S);
852	#log{status = ok}=L ->
853	    case check_size(L#log.type, NewSize) of
854		ok ->
855		    case catch do_change_size(L, NewSize) of % does the put
856			ok ->
857			    reply(From, ok, S);
858			{big, CurSize} ->
859			    reply(From,
860				  {error,
861				   {new_size_too_small, L#log.name, CurSize}},
862				  S);
863			Else ->
864			    reply(From, Else, state_err(S, Else))
865		    end;
866		not_ok ->
867		    reply(From, {error, {badarg, size}}, S)
868	    end;
869	#log{status = {blocked, false}}=L ->
870	    reply(From, {error, {blocked_log, L#log.name}}, S);
871	#log{blocked_by = From}=L ->
872	    reply(From, {error, {blocked_log, L#log.name}}, S);
873	_ ->
874	    enqueue(Message, S)
875    end;
876handle({From, inc_wrap_file}=Message, S) ->
877    case get(log) of
878	#log{mode = read_only}=L ->
879	    reply(From, {error, {read_only_mode, L#log.name}}, S);
880	#log{type = halt}=L ->
881	    reply(From, {error, {halt_log, L#log.name}}, S);
882	#log{status = ok} when S#state.cache_error =/= ok ->
883	    loop(cache_error(S, [From]));
884	#log{status = ok}=L ->
885	    case catch do_inc_wrap_file(L) of
886		{ok, L2, Lost} ->
887		    put(log, L2),
888		    notify_owners({wrap, Lost}),
889		    reply(From, ok, S#state{cnt = S#state.cnt-Lost});
890		{error, Error, L2} ->
891		    put(log, L2),
892		    reply(From, Error, state_err(S, Error))
893	    end;
894	#log{status = {blocked, false}}=L ->
895	    reply(From, {error, {blocked_log, L#log.name}}, S);
896	#log{blocked_by = From}=L ->
897	    reply(From, {error, {blocked_log, L#log.name}}, S);
898	_ ->
899	    enqueue(Message, S)
900    end;
901handle({From, {reopen, NewFile, Head, F, A}}, S) ->
902    case get(log) of
903	#log{mode = read_only}=L ->
904	    reply(From, {error, {read_only_mode, L#log.name}}, S);
905	#log{status = ok} when S#state.cache_error =/= ok ->
906	    loop(cache_error(S, [From]));
907	#log{status = ok, filename = NewFile}=L ->
908	    reply(From, {error, {same_file_name, L#log.name}}, S);
909	#log{status = ok}=L ->
910	    case catch close_disk_log2(L) of
911		closed ->
912		    File = L#log.filename,
913		    case catch rename_file(File, NewFile, L#log.type) of
914			ok ->
915			    H = merge_head(Head, L#log.head),
916			    case do_open((S#state.args)#arg{name = L#log.name,
917							    repair = truncate,
918							    head = H,
919							    file = File}) of
920				{ok, Res, L2, Cnt} ->
921				    put(log, L2#log{owners = L#log.owners,
922						    head = L#log.head,
923						    users = L#log.users}),
924				    notify_owners({truncated, S#state.cnt}),
925				    erase(is_full),
926				    case Res of
927					{error, _} ->
928					    do_exit(S, From, Res,
929						    ?failure(Res, F, A));
930					_ ->
931					    reply(From, ok, S#state{cnt = Cnt})
932				    end;
933				Res ->
934				    do_exit(S, From, Res, ?failure(Res, F, A))
935			    end;
936			Error ->
937			    do_exit(S, From, Error, ?failure(Error, reopen, 2))
938		    end;
939		Error ->
940		    do_exit(S, From, Error, ?failure(Error, F, A))
941	    end;
942	L ->
943	    reply(From, {error, {blocked_log, L#log.name}}, S)
944    end;
945handle({Server, {internal_open, A}}, S) ->
946    case get(log) of
947	undefined ->
948	    case do_open(A) of % does the put
949		{ok, Res, L, Cnt} ->
950		    put(log, opening_pid(A#arg.linkto, A#arg.notify, L)),
951                    #arg{size = Size, old_size = OldSize} = A,
952                    S1 = S#state{args=A, cnt=Cnt},
953                    case Size =:= OldSize of
954                        true ->
955                            reply(Server, Res, S1);
956                        false ->
957                            case catch do_change_size(get(log), Size) of % does the put
958                                ok ->
959                                    reply(Server, Res, S1);
960                                {big, _CurSize} ->
961                                    %% Note: halt logs are opened even if the
962                                    %% given size is too small.
963                                    reply(Server, Res, S1);
964                                Else ->
965                                    reply(Server, Else, S1)
966                            end
967                    end;
968		Res ->
969		    do_fast_exit(S, Server, Res)
970	    end;
971	L ->
972	    TestH = mk_head(A#arg.head, A#arg.format),
973	    case compare_arg(A#arg.options, S#state.args, TestH, L#log.head) of
974		ok ->
975		    case add_pid(A#arg.linkto, A#arg.notify, L) of
976			{ok, L1} ->
977			    put(log, L1),
978			    reply(Server, {ok, L#log.name}, S);
979			Error ->
980			    reply(Server, Error, S)
981		    end;
982		Error ->
983		    reply(Server, Error, S)
984	    end
985    end;
986handle({From, close}, S) ->
987    case do_close(From, S) of
988	{stop, S1} ->
989	    do_exit(S1, From, ok, normal);
990	{continue, S1} ->
991	    reply(From, ok, S1)
992    end;
993handle({From, info}, S) ->
994    reply(From, do_info(get(log), S#state.cnt), S);
995handle({'EXIT', From, Reason}, #state{parent=From}=S) ->
996    %% Parent orders shutdown.
997    _ = do_stop(S),
998    exit(Reason);
999handle({'EXIT', From, Reason}, #state{server=From}=S) ->
1000    %% The server is gone.
1001    _ = do_stop(S),
1002    exit(Reason);
1003handle({'EXIT', From, _Reason}, S) ->
1004    L = get(log),
1005    case is_owner(From, L) of
1006	{true, _Notify} ->
1007	    case close_owner(From, L, S) of
1008		{stop, S1} ->
1009		    _ = do_stop(S1),
1010		    exit(normal);
1011		{continue, S1} ->
1012		    loop(S1)
1013	    end;
1014	false ->
1015	    %% 'users' is not decremented.
1016	    S1 = do_unblock(From, get(log), S),
1017	    loop(S1)
1018    end;
1019handle({system, From, Req}, S) ->
1020    sys:handle_system_msg(Req, From, S#state.parent, ?MODULE, [], S);
1021handle(_, S) ->
1022    loop(S).
1023
%% Postpones Message: it is pushed onto the front of the 'queue' field
%% (newest request first) and the main loop is resumed.
enqueue(Message, S) ->
    Pending = S#state.queue,
    loop(S#state{queue = [Message | Pending]}).
1026
1027%% Collect further log and sync requests already in the mailbox or queued
1028
1029-define(MAX_LOOK_AHEAD, 64*1024).
1030
%% Inlined.
%% Decides how to continue collecting log/sync requests. The checks are
%% made in this order:
%%   1. a pending cache error aborts the batch (cache_error/2 gets Pids);
%%   2. a batch of more than ?MAX_LOOK_AHEAD bytes is written at once;
%%   3. with an empty 'messages' list the mailbox is polled (timeout 0),
%%      and the batch is written if the mailbox is empty;
%%   4. otherwise the head of 'messages' is the next request.
log_loop(#state{cache_error = CacheErr, messages = Stored}=S,
         Pids, Bins, Sync, Sz, F) ->
    if
        CacheErr =/= ok ->
            loop(cache_error(S, Pids));
        Sz > ?MAX_LOOK_AHEAD ->
            loop(log_end(S, Pids, Bins, Sync, Sz));
        Stored =:= [] ->
            receive
                Message ->
                    log_loop(Message, Pids, Bins, Sync, Sz, F, S)
            after 0 ->
                    loop(log_end(S, Pids, Bins, Sync, Sz))
            end;
        true ->
            [Next | Rest] = Stored,
            log_loop(Next, Pids, Bins, Sync, Sz, F, S#state{messages = Rest})
    end.
1046
%% Folds one message into the batch being collected.
%% Log requests add their data to Bins (and, for synchronous requests,
%% the sender to Pids); sync requests add the sender to Sync. Any other
%% message ends the batch: it is written by log_end/5 and the message is
%% then passed to handle/2.
%% Items logged after the last sync request found are sync:ed as well.
log_loop(Message, Pids, Bins, Sync, Sz, F, S) ->
    case Message of
        {alog, internal, B} when F =:= internal ->
            %% alog of terms allowed for the internal format only
            log_loop(S, Pids, [B | Bins], Sync, Sz + iolist_size(B), F);
        {alog, binary, B} ->
            log_loop(S, Pids, [B | Bins], Sync, Sz + iolist_size(B), F);
        {From, {log, internal, B}} when F =:= internal ->
            %% log of terms allowed for the internal format only
            log_loop(S, [From | Pids], [B | Bins], Sync,
                     Sz + iolist_size(B), F);
        {From, {log, binary, B}} ->
            log_loop(S, [From | Pids], [B | Bins], Sync,
                     Sz + iolist_size(B), F);
        {From, sync} ->
            log_loop(S, Pids, Bins, [From | Sync], Sz, F);
        _Other ->
            Flushed = log_end(S, Pids, Bins, Sync, Sz),
            handle(Message, Flushed)
    end.
1063
%% log_end(State, Pids, Bins, Sync, Sz) -> State1
%% Writes the collected batch Bins (Sz is passed on to do_log/3), sends
%% the outcome to the processes in Pids, and then serves the sync
%% requests in Sync. The 'cnt' field is advanced by the number of items
%% reported by do_log/3.
log_end(S, [], [], Sync, _Sz) ->
    %% Nothing to write, only sync requests (if any).
    log_end_sync(S, Sync);
log_end(#state{cnt = Cnt}=S, Pids, Bins, Sync, Sz) ->
    case do_log(get(log), rflat(Bins), Sz) of
        N when is_integer(N) ->
            ok = replies(Pids, ok),
            S1 = (state_ok(S))#state{cnt = Cnt + N},
            log_end_sync(S1, Sync);
        {error, {error, {full, _Name}}, N} when Pids =:= [] ->
            %% Nobody is waiting for the result of the write, so the
            %% 'full' error is not recorded in the state.
            log_end_sync(state_ok(S#state{cnt = Cnt + N}), Sync);
        {error, Error, N} ->
            ok = replies(Pids, Error),
            S1 = state_err(S#state{cnt = Cnt + N}, Error),
            %% The sync requests collected in the same batch must be
            %% served as well; otherwise their senders never get a
            %% reply. With Sync =:= [] this returns S1 unchanged.
            log_end_sync(S1, Sync)
    end.
1078
%% Inlined.
%% Without sync requests the state is returned as is. Otherwise the log
%% is synced once, every process in Sync gets the result, and the result
%% is recorded with state_err/2.
log_end_sync(S, Sync) ->
    case Sync of
        [] ->
            S;
        _ ->
            SyncResult = do_sync(get(log)),
            ok = replies(Sync, SyncResult),
            state_err(S, SyncResult)
    end.
1086
%% Inlined.
%% rflat(Lists) concatenates a list of lists taken in reverse order:
%% rflat([L3, L2, L1]) is L1 ++ L2 ++ L3. A single element is returned
%% untouched.
rflat([Only]) ->
    Only;
rflat(Lists) ->
    rflat(Lists, []).

%% rflat(Lists, Tail): as rflat/1, with Tail appended at the end.
rflat(Lists, Tail) ->
    lists:foldl(fun(Part, Acc) -> Part ++ Acc end, Tail, Lists).
1094
%% -> {ok, Log} | {error, Error}
%% Changes the notify flag of the owner Pid. Nothing is changed if the
%% flag already has the requested value; a value other than true/false
%% is rejected. The updated owner entry is put first in the owners list.
do_change_notify(L, Pid, Notify) ->
    case is_owner(Pid, L) of
        false ->
            {error, {not_owner, Pid}};
        {true, Notify} ->
            {ok, L};
        {true, _Previous} when is_boolean(Notify) ->
            Others = lists:keydelete(Pid, 1, L#log.owners),
            {ok, L#log{owners = [{Pid, Notify} | Others]}};
        {true, _Previous} ->
            {error, {badarg, notify}}
    end.
1109
%% -> {stop, S} | {continue, S}
%% Closes the log on behalf of Pid: as an owner if Pid is in the owners
%% list, otherwise as an anonymous user.
do_close(Pid, S) ->
    Log = get(log),
    Closer = case is_owner(Pid, Log) of
                 {true, _Notify} -> fun close_owner/3;
                 false -> fun close_user/3
             end,
    Closer(Pid, Log, S).
1119
%% -> {stop, S} | {continue, S}
%% Removes Pid from the owners, releases a block held by Pid (if any),
%% unlinks Pid and decides whether the log process should stop.
close_owner(Pid, L, S) ->
    Remaining = lists:keydelete(Pid, 1, L#log.owners),
    L1 = L#log{owners = Remaining},
    put(log, L1),
    %% L1 is exactly what get(log) returns at this point.
    S2 = do_unblock(Pid, L1, S),
    unlink(Pid),
    do_close2(L1, S2).
1127
%% -> {stop, S} | {continue, S}
%% Decrements the 'users' counter (when it is greater than zero),
%% releases a block held by Pid and decides whether the log process
%% should stop. With no users left to remove nothing happens.
close_user(Pid, L, S) ->
    case L of
        #log{users = Users} when Users > 0 ->
            L1 = L#log{users = Users - 1},
            put(log, L1),
            %% L1 is exactly what get(log) returns at this point.
            S2 = do_unblock(Pid, L1, S),
            do_close2(L1, S2);
        _ ->
            {continue, S}
    end.
1136
%% -> {stop, S} | {continue, S}
%% The log process stops only when there are neither users nor owners.
do_close2(L, S) ->
    case L of
        #log{users = 0, owners = []} -> {stop, S};
        _ -> {continue, S}
    end.
1141
1142%%-----------------------------------------------------------------
1143%% Callback functions for system messages handling.
1144%%-----------------------------------------------------------------
%% Called when system message handling is done: resume the main loop
%% with the state that was handed to sys:handle_system_msg/6.
system_continue(_Parent, _Debug, S) ->
    loop(S).
1147
%% Called when the process is ordered to terminate: stop the log
%% (the result of do_stop/1 is ignored) and exit with Reason.
-spec system_terminate(_, _, _, #state{}) -> no_return().
system_terminate(Reason, _Parent, _Debug, S) ->
    _ = do_stop(S),
    exit(Reason).
1152
1153%%-----------------------------------------------------------------
%% Temporary code for upgrade.
1155%%-----------------------------------------------------------------
%% Code change callback: the state is kept unchanged.
system_code_change(S, _Module, _OldVsn, _Extra) ->
    {ok, S}.
1158
1159
1160%%%----------------------------------------------------------------------
1161%%% Internal functions
1162%%%----------------------------------------------------------------------
%% Stops the log, sends a final reply to From and exits with Reason.
%% The reply is chosen as follows:
%%   - a pending cache error takes precedence over everything else;
%%   - if the log was closed without problems, Message0 is sent;
%%   - if closing failed and Message0 is 'ok', the close error is sent;
%%   - otherwise Message0 is sent.
-spec do_exit(#state{}, pid(), _, _) -> no_return().
do_exit(S, From, Message0, Reason) ->
    StopResult = do_stop(S),
    CacheError = S#state.cache_error,
    Message =
        if
            CacheError =/= ok -> CacheError;
            StopResult =:= closed -> Message0;
            Message0 =:= ok -> StopResult;
            true -> Message0
        end,
    _ = disk_log_server:close(self()),
    ok = replies(From, Message),
    ?PROFILE(ep:done()),
    exit(Reason).
1176
%% Stops the log (the result is ignored), sends Message to Server and
%% exits with reason 'normal'.
-spec do_fast_exit(#state{}, pid(), _) -> no_return().
do_fast_exit(S, Server, Message) ->
    _ = do_stop(S),
    erlang:send(Server, {disk_log, self(), Message}),
    exit(normal).
1182
%% -> closed | Error
%% Tells every process with a pending request (in 'queue' or 'messages')
%% that the log has stopped, then closes the log.
do_stop(#state{queue = Queued, messages = Messages}) ->
    proc_q(Queued ++ Messages),
    close_disk_log(get(log)).
1187
%% -> ok
%% Sends {error, disk_log_stopped} to the sender of every pending
%% request of the form {Pid, Request}. Other entries (asynchronous
%% requests) are skipped.
proc_q(Requests) ->
    Stopped = {disk_log, self(), {error, disk_log_stopped}},
    Notify = fun({Client, _Request}) when is_pid(Client) ->
                     Client ! Stopped;
                (_Async) ->
                     ok
             end,
    lists:foreach(Notify, Requests).
1195
%% -> log()
%% Registers the opening process with add_pid/3. The match asserts that
%% registration cannot fail here.
opening_pid(Pid, Notify, L) ->
    {ok, Updated} = add_pid(Pid, Notify, L),
    Updated.
1200
%% -> {ok, log()} | Error
%% Registers a process with the log.
%%   - Something that is not a pid counts as an anonymous user: only the
%%     'users' counter is incremented.
%%   - A new owner is linked and added to the owners list.
%%   - An already registered owner is accepted again provided the notify
%%     flag is the same; a different flag is an arg_mismatch error.
add_pid(NotAPid, _Notify, L) when not is_pid(NotAPid) ->
    {ok, L#log{users = L#log.users + 1}};
add_pid(Pid, Notify, L) ->
    case is_owner(Pid, L) of
        {true, Notify} ->
            {ok, L};
        {true, CurNotify} ->
            {error, {arg_mismatch, notify, CurNotify, Notify}};
        false ->
            link(Pid),
            {ok, L#log{owners = [{Pid, Notify} | L#log.owners]}}
    end.
1215
%% Removes the link to the blocking process, unless that process is an
%% owner (owners stay linked). Nothing is done if the log is not blocked.
unblock_pid(L) ->
    case L#log.blocked_by of
        none ->
            ok;
        Blocker ->
            case is_owner(Blocker, L) of
                false -> unlink(Blocker);
                {true, _Notify} -> ok
            end
    end.
1225
%% -> {true, Notify} | false
%% Looks up Pid in the owners list of the log. For an owner the notify
%% flag stored with the pid is returned as well.
is_owner(Pid, L) ->
    case lists:keyfind(Pid, 1, L#log.owners) of
        {_Pid, Notify} ->
            {true, Notify};
        false ->
            false
    end.
1234
%% -> ok | {error, {file_error, FileName, Reason}}
%% Renames the file(s) of a log. A halt log is a single file; for a wrap
%% log one file per extension returned by wrap_file_extensions/1 is
%% renamed.
%% NOTE(review): the old header said "throw(Error)", but the code visible
%% here returns the error tuple; confirm that the helpers do not throw.
rename_file(File, NewFile, Type) ->
    case Type of
        halt ->
            case file:rename(File, NewFile) of
                ok -> ok;
                Failure -> file_error(NewFile, Failure)
            end;
        wrap ->
            rename_file(wrap_file_extensions(File), File, NewFile, ok)
    end.
1245
%% rename_file(Exts, File, NewFile0, Res) -> Res | {error, ...}
%% Renames File.Ext to NewFile0.Ext for every extension in Exts. All
%% extensions are tried even after a failure; the result is the error of
%% the last failing rename, or Res if every rename succeeded.
rename_file(Exts, File, NewFile0, Res0) ->
    RenameOne =
        fun(Ext, Res) ->
                NewFile = add_ext(NewFile0, Ext),
                case file:rename(add_ext(File, Ext), NewFile) of
                    ok -> Res;
                    Failure -> file_error(NewFile, Failure)
                end
        end,
    lists:foldl(RenameOne, Res0, Exts).
1256
%% Wraps the reason of a failed file operation together with the name of
%% the file: {error, Reason} -> {error, {file_error, FileName, Reason}}.
file_error(FileName, {error, Reason}) ->
    {error, {file_error, FileName, Reason}}.
1259
%% "Old" error messages have been kept, arg_mismatch has been added.
%% compare_arg(Options, Arg, Head, OrigHead) -> ok | {error, Reason}
%% Checks the options of a new open request against the arguments the
%% log was opened with. The options are checked one by one with
%% compare_arg/3; the first mismatch is returned. When all options agree
%% the head is compared: 'none' means that no header option was given.
compare_arg([], _A, Head, OrigHead) ->
    if
        Head =:= none ->
            ok;
        Head =/= OrigHead ->
            {error, {arg_mismatch, head, OrigHead, Head}};
        true ->
            ok
    end;
compare_arg([{Attr, Val} | Tail], A, Head, OrigHead) ->
    case compare_arg(Attr, Val, A) of
        ok ->
            compare_arg(Tail, A, Head, OrigHead);
        {not_ok, OrigVal} ->
            {error, {arg_mismatch, Attr, OrigVal, Val}};
        Error ->
            Error
    end.
1278
%% Compares one option value with the corresponding field of the #arg{}
%% record. Options that are not listed, and options that agree, give ok.
-spec compare_arg(atom(), _, #arg{}) ->
	     'ok' | {'not_ok', _} | {'error', {atom(), _}}.
compare_arg(Attr, Val, A) ->
    case Attr of
        file when Val =/= A#arg.file ->
            {error, {name_already_open, A#arg.name}};
        mode when Val =:= read_only, A#arg.mode =:= read_write ->
            {error, {open_read_write, A#arg.name}};
        mode when Val =:= read_write, A#arg.mode =:= read_only ->
            {error, {open_read_only, A#arg.name}};
        type when Val =/= A#arg.type ->
            {not_ok, A#arg.type};
        format when Val =/= A#arg.format ->
            {not_ok, A#arg.format};
        repair when Val =/= A#arg.repair ->
            %% not used, but check it anyway...
            {not_ok, A#arg.repair};
        size when Val =/= A#arg.size, A#arg.type =:= wrap ->
            %% Note: if the given size does not match the size of an open
            %% halt log, the given size is ignored.
            {error, {size_mismatch, A#arg.size, Val}};
        _ ->
            ok
    end.
1300
%% -> {ok, Res, log(), Cnt} | Error
%% Opens the log file(s) described by the #arg{} record and builds the
%% #log{} record for it. Note that the file is opened with the 'old_size'
%% field, while the #log{} record stores the 'size' field.
do_open(#arg{type = Type, format = Format, name = Name, file = FName,
             mode = Mode, version = V} = A) ->
    disk_log_1:set_quiet(A#arg.quiet),
    Head = mk_head(A#arg.head, Format),
    Opened = do_open2(Type, Format, Name, FName, A#arg.repair,
                      A#arg.old_size, Mode, Head, V),
    case Opened of
        {ok, Ret, Extra, FormatType, NoItems} ->
            {ok, Ret,
             #log{name = Name, type = Type, format = Format,
                  filename = FName, size = A#arg.size,
                  format_type = FormatType, head = Head, mode = Mode,
                  version = V, extra = Extra},
             NoItems};
        Error ->
            Error
    end.
1318
%% Converts a {head, H} option to {ok, Binary}: terms are encoded with
%% term_to_binary/1 for the internal format, bytes are turned into a
%% binary for the external format. Any other head is returned as is.
mk_head(Head, Format) ->
    case {Head, Format} of
        {{head, Term}, internal} -> {ok, term_to_binary(Term)};
        {{head, Bytes}, external} -> {ok, ensure_binary(Bytes)};
        _ -> Head
    end.
1322
%% Encodes every term of the list with term_to_binary/1, keeping order.
terms2bins(Terms) ->
    [term_to_binary(Term) || Term <- Terms].
1327
%% Returns the list itself if every element already is a binary;
%% otherwise a new list is built with make_binary_list/1.
ensure_binary_list(Bs) ->
    ensure_binary_list(Bs, Bs).

%% Rest is the part of All that remains to be inspected.
ensure_binary_list(Rest, All) ->
    case Rest of
        [] ->
            All;
        [B | More] when is_binary(B) ->
            ensure_binary_list(More, All);
        _ ->
            make_binary_list(All)
    end.
1337
%% Applies ensure_binary/1 to every element of the list, keeping order.
make_binary_list(Items) ->
    [ensure_binary(Item) || Item <- Items].
1342
%% Turns an iolist or a binary into one binary (see iolist_to_binary/1).
ensure_binary(Bytes) ->
    erlang:iolist_to_binary(Bytes).
1345
1346%%-----------------------------------------------------------------
1347%% Change size of the logs in runtime.
1348%%-----------------------------------------------------------------
%% -> ok | {big, CurSize} | throw(Error)
%% Halt log: the new size is accepted if it is 'infinity' or not smaller
%% than the current number of bytes; otherwise {big, CurB} is returned
%% and nothing is changed.
%% Wrap log: the size change is delegated to disk_log_1.
%% On success the 'is_full' mark is erased and the updated log is put.
do_change_size(#log{type = halt, extra = Halt}=L, NewSize) ->
    CurB = Halt#halt.curB,
    if
        NewSize =:= infinity; CurB =< NewSize ->
            erase(is_full),
            put(log, L#log{extra = Halt#halt{size = NewSize}}),
            ok;
        true ->
            {big, CurB}
    end;
do_change_size(#log{type = wrap, extra = Extra, version = Version}=L,
               NewSize) ->
    {ok, Handle} = disk_log_1:change_size_wrap(Extra, NewSize, Version),
    erase(is_full),
    put(log, L#log{extra = Handle}),
    ok.
1372
%% -> {ok, Head} | Error; Head = none | {head, H} | {M,F,A}
%% Validates a head option.
%%   {head, none}           -> no head;
%%   {head_func, {M, F, A}} -> accepted when M and F are atoms and A is
%%                             a list;
%%   {head, H}, external    -> H must be convertible to a binary;
%%   {head, H}, internal    -> any term is accepted.
%% Everything else is {error, {badarg, head}}.
check_head({head, none}, _Format) ->
    {ok, none};
check_head({head_func, {M, F, A}}, _Format) when is_atom(M),
                                                 is_atom(F),
                                                 is_list(A) ->
    {ok, {M, F, A}};
check_head({head, Head}, external) ->
    %% Only the conversion itself is protected; the converted value is
    %% not needed here, the original Head is returned.
    try ensure_binary(Head) of
        _Binary ->
            {ok, {head, Head}}
    catch
        error:_ ->
            {error, {badarg, head}}
    end;
check_head({head, Term}, internal) ->
    {ok, {head, Term}};
check_head(_Head, _Format) ->
    {error, {badarg, head}}.
1391
%% -> ok | not_ok
%% Validates a new size for a log of the given type.
%%   wrap: {MaxBytes, MaxFiles}, both positive integers, with
%%         MaxBytes =< ?MAX_BYTES and MaxFiles < ?MAX_FILES;
%%   halt: a positive integer or 'infinity'.
check_size(Type, Size) ->
    case {Type, Size} of
        {wrap, {NewMaxB, NewMaxF}} when is_integer(NewMaxB),
                                        is_integer(NewMaxF),
                                        NewMaxB > 0,
                                        NewMaxB =< ?MAX_BYTES,
                                        NewMaxF > 0,
                                        NewMaxF < ?MAX_FILES ->
            ok;
        {halt, infinity} ->
            ok;
        {halt, NewSize} when is_integer(NewSize), NewSize > 0 ->
            ok;
        _ ->
            not_ok
    end.
1402
1403%%-----------------------------------------------------------------
1404%% Increment a wrap log.
1405%%-----------------------------------------------------------------
%% -> {ok, log(), Lost} | {error, Error, log()}
%% Switches a wrap log to its next file. The format of the log selects
%% the disk_log_1 function to call; in both outcomes the handle returned
%% by disk_log_1 is stored in the returned log record.
do_inc_wrap_file(#log{format = Format, extra = Handle, head = Head}=L) ->
    IncResult = case Format of
                    internal -> disk_log_1:mf_int_inc(Handle, Head);
                    external -> disk_log_1:mf_ext_inc(Handle, Head)
                end,
    case IncResult of
        {ok, Handle2, Lost} ->
            {ok, L#log{extra = Handle2}, Lost};
        {error, Error, Handle2} ->
            {error, Error, L#log{extra = Handle2}}
    end.
1425
1426
1427%%-----------------------------------------------------------------
1428%% Open a log file.
1429%%-----------------------------------------------------------------
%% -> {ok, Reply, Extra, FormatType, Cnt} | Error
%% One clause per combination of log type (halt | wrap) and format
%% (internal | external). Extra is a #halt{} record for halt logs and the
%% handle returned by disk_log_1 for wrap logs. Reply is {ok, Name}, or
%% a 'repaired' tuple when an internally formatted log was repaired.
%% Anything else returned (or thrown) by disk_log_1 is passed on as is.
%% Note: the header is always written, even if the log size is too small.
do_open2(halt, internal, Name, FName, Repair, Size, Mode, Head, _V) ->
    case catch disk_log_1:int_open(FName, Repair, Mode, Head) of
        {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} ->
            {ok, {ok, Name},
             #halt{fdc = FdC, curB = FileSize, size = Size},
             halt_int, NoItems};
        {repaired, FdC, Recovered, BadBytes, FileSize} ->
            {ok,
             {repaired, Name, {recovered, Recovered}, {badbytes, BadBytes}},
             #halt{fdc = FdC, curB = FileSize, size = Size},
             halt_int, Recovered};
        Other ->
            Other
    end;
do_open2(wrap, internal, Name, FName, Repair, Size, Mode, Head, V) ->
    {MaxB, MaxF} = Size,
    Opened = (catch disk_log_1:mf_int_open(FName, MaxB, MaxF, Repair,
                                           Mode, Head, V)),
    case Opened of
        {ok, Handle, Cnt} ->
            {ok, {ok, Name}, Handle, wrap_int, Cnt};
        {repaired, Handle, Recovered, BadBytes, Cnt} ->
            {ok,
             {repaired, Name, {recovered, Recovered}, {badbytes, BadBytes}},
             Handle, wrap_int, Cnt};
        Other ->
            Other
    end;
do_open2(halt, external, Name, FName, Repair, Size, Mode, Head, _V) ->
    case catch disk_log_1:ext_open(FName, Repair, Mode, Head) of
        {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} ->
            {ok, {ok, Name},
             #halt{fdc = FdC, curB = FileSize, size = Size},
             halt_ext, NoItems};
        Other ->
            Other
    end;
do_open2(wrap, external, Name, FName, Repair, Size, Mode, Head, V) ->
    {MaxB, MaxF} = Size,
    Opened = (catch disk_log_1:mf_ext_open(FName, MaxB, MaxF, Repair,
                                           Mode, Head, V)),
    case Opened of
        {ok, Handle, Cnt} ->
            {ok, {ok, Name}, Handle, wrap_ext, Cnt};
        Other ->
            Other
    end.
1473
%% -> closed | Error
%% Closes the log stored in the process dictionary, if there is one:
%% the blocking process and all owners are unlinked, the files are
%% closed and the 'log' entry is erased. Whatever closing the files
%% returns or throws is the result.
close_disk_log(undefined) ->
    closed;
close_disk_log(L) ->
    unblock_pid(L),
    lists:foreach(fun({Owner, _Notify}) -> unlink(Owner) end,
                  L#log.owners),
    Result = (catch close_disk_log2(L)),
    erase(log),
    Result.
1486
%% Closes the file(s) of the log with the disk_log_1 function that
%% corresponds to the format type of the log. The value returned by
%% disk_log_1 is discarded.
-spec close_disk_log2(#log{}) -> 'closed'. % | throw(Error)

close_disk_log2(#log{format_type = FormatType, mode = Mode,
                     extra = Extra, filename = FName}) ->
    case FormatType of
        halt_int ->
            disk_log_1:close(Extra#halt.fdc, FName, Mode);
        wrap_int ->
            disk_log_1:mf_int_close(Extra, Mode);
        halt_ext ->
            disk_log_1:fclose(Extra#halt.fdc, FName);
        wrap_ext ->
            disk_log_1:mf_ext_close(Extra, Mode)
    end,
    closed.
1501
%% Builds a descriptive text (character data as returned by
%% io_lib:format/2) for an error term.
%%   {error, Module, Error} is formatted by Module:format_error/1;
%%   {error, Reason} is unwrapped;
%%   {Node, {error, Reason}} is prefixed with the node;
%%   terms that are not recognized are printed as they are.
%% The order of the clauses matters: the three generic shapes above are
%% tried before the specific reasons.
do_format_error(E) ->
    case E of
        {error, Module, Error} ->
            Module:format_error(Error);
        {error, Reason} ->
            do_format_error(Reason);
        {Node, Error = {error, _Reason}} ->
            lists:append(io_lib:format("~p: ", [Node]),
                         do_format_error(Error));
        {badarg, Arg} ->
            io_lib:format("The argument ~p is missing, not recognized or "
                          "not wellformed~n", [Arg]);
        {size_mismatch, OldSize, ArgSize} ->
            io_lib:format("The given size ~p does not match the size ~p found on "
                          "the disk log size file~n", [ArgSize, OldSize]);
        {read_only_mode, Log} ->
            io_lib:format("The disk log ~tp has been opened read-only, but the "
                          "requested operation needs read-write access~n", [Log]);
        {format_external, Log} ->
            io_lib:format("The requested operation can only be applied on internally "
                          "formatted disk logs, but ~tp is externally formatted~n",
                          [Log]);
        {blocked_log, Log} ->
            io_lib:format("The blocked disk log ~tp does not queue requests, or "
                          "the log has been blocked by the calling process~n", [Log]);
        {full, Log} ->
            io_lib:format("The halt log ~tp is full~n", [Log]);
        {not_blocked, Log} ->
            io_lib:format("The disk log ~tp is not blocked~n", [Log]);
        {not_owner, Pid} ->
            io_lib:format("The pid ~tp is not an owner of the disk log~n", [Pid]);
        {not_blocked_by_pid, Log} ->
            io_lib:format("The disk log ~tp is blocked, but only the blocking pid "
                          "can unblock a disk log~n", [Log]);
        {new_size_too_small, Log, CurrentSize} ->
            io_lib:format("The current size ~p of the halt log ~tp is greater than the "
                          "requested new size~n", [CurrentSize, Log]);
        {halt_log, Log} ->
            io_lib:format("The halt log ~tp cannot be wrapped~n", [Log]);
        {same_file_name, Log} ->
            io_lib:format("Current and new file name of the disk log ~tp "
                          "are the same~n", [Log]);
        {arg_mismatch, Option, FirstValue, ArgValue} ->
            io_lib:format("The value ~tp of the disk log option ~p does not match "
                          "the current value ~tp~n", [ArgValue, Option, FirstValue]);
        {name_already_open, Log} ->
            io_lib:format("The disk log ~tp has already opened another file~n", [Log]);
        {open_read_write, Log} ->
            io_lib:format("The disk log ~tp has already been opened read-write~n",
                          [Log]);
        {open_read_only, Log} ->
            io_lib:format("The disk log ~tp has already been opened read-only~n",
                          [Log]);
        {not_internal_wrap, Log} ->
            io_lib:format("The requested operation cannot be applied since ~tp is not "
                          "an internally formatted disk log~n", [Log]);
        no_such_log ->
            io_lib:format("There is no disk log with the given name~n", []);
        nonode ->
            io_lib:format("There seems to be no node up that can handle "
                          "the request~n", []);
        nodedown ->
            io_lib:format("There seems to be no node up that can handle "
                          "the request~n", []);
        {corrupt_log_file, FileName} ->
            io_lib:format("The disk log file \"~ts\" contains corrupt data~n",
                          [FileName]);
        {need_repair, FileName} ->
            io_lib:format("The disk log file \"~ts\" has not been closed properly and "
                          "needs repair~n", [FileName]);
        {not_a_log_file, FileName} ->
            io_lib:format("The file \"~ts\" is not a wrap log file~n", [FileName]);
        {invalid_header, InvalidHeader} ->
            io_lib:format("The disk log header is not wellformed: ~p~n",
                          [InvalidHeader]);
        end_of_log ->
            io_lib:format("An attempt was made to step outside a not yet "
                          "full wrap log~n", []);
        {invalid_index_file, FileName} ->
            io_lib:format("The wrap log index file \"~ts\" cannot be used~n",
                          [FileName]);
        {no_continuation, BadCont} ->
            io_lib:format("The term ~p is not a chunk continuation~n", [BadCont]);
        {file_error, FileName, Reason} ->
            io_lib:format("\"~ts\": ~tp~n", [FileName, file:format_error(Reason)]);
        _ ->
            io_lib:format("~tp~n", [E])
    end.
1586
%% do_info(Log, Cnt) -> [{Tag, Value}]
%% Builds the information list for the log; Cnt is reported as both
%% 'items' and (for writable logs) as an item count.
%% The list has three parts: entries common to all logs, the head (only
%% for read_write logs), and entries that depend on the type of a
%% read_write log.
%% Side effect: for a read_write wrap log the 'noFull' counter of the
%% handle is added to 'accFull' and reset to 0, and the updated log is
%% put in the process dictionary.
do_info(L, Cnt) ->
    #log{name = Name, type = Type, mode = Mode, filename = File,
         extra = Extra, status = Status, owners = Owners, users = Users,
         format = Format, head = Head} = L,
    Size = case Type of
               wrap -> disk_log_1:get_wrap_size(Extra);
               halt -> Extra#halt.size
           end,
    TypeInfo =
        case {Type, Mode} of
            {wrap, read_write} ->
                #handle{curB = CurB, curF = CurF,
                        cur_cnt = CurCnt, acc_cnt = AccCnt,
                        noFull = NoFull, accFull = AccFull} = Extra,
                NewAccFull = AccFull + NoFull,
                put(log, L#log{extra = Extra#handle{noFull = 0,
                                                    accFull = NewAccFull}}),
                [{no_current_bytes, CurB},
                 {no_current_items, CurCnt},
                 {no_items, Cnt},
                 {no_written_items, CurCnt + AccCnt},
                 {current_file, CurF},
                 {no_overflows, {NewAccFull, NoFull}}];
            {halt, read_write} ->
                [{full, get(is_full) =/= undefined},
                 {no_written_items, Cnt}];
            {_, read_only} ->
                []
        end,
    HeadInfo =
        case Mode of
            read_only ->
                [];
            read_write ->
                Shown = case Head of
                            {ok, H} -> H;
                            none -> Head;
                            {_M, _F, _A} -> Head
                        end,
                [{head, Shown}]
        end,
    lists:append([[{name, Name},
                   {file, File},
                   {type, Type},
                   {format, Format},
                   {size, Size},
                   %% kept for "backward compatibility" (undocumented)
                   {items, Cnt},
                   {owners, Owners},
                   {users, Users}],
                  HeadInfo,
                  [{mode, Mode},
                   {status, Status},
                   {node, node()}],
                  TypeInfo]).
1647
%% Marks the log as blocked by Pid and stores it. QueueLogRecs becomes
%% part of the status: {blocked, QueueLogRecs}. A blocking process that
%% is not an owner is linked.
do_block(Pid, QueueLogRecs, L) ->
    Blocked = L#log{status = {blocked, QueueLogRecs}, blocked_by = Pid},
    put(log, Blocked),
    case is_owner(Pid, Blocked) of
        false -> link(Pid);
        {true, _Notify} -> ok
    end.
1657
%% do_unblock(Pid, Log, State) -> State1
%% Unblocks the log if, and only if, it is blocked by Pid; otherwise the
%% state is returned unchanged.
do_unblock(Pid, L, S) ->
    case L of
        #log{blocked_by = Pid} -> do_unblock(L, S);
        _ -> S
    end.
1662
%% do_unblock(Log, State) -> State1
%% Unblocks the log unconditionally: the blocking process is unlinked
%% (unless it is an owner), the log status is reset to ok, and the
%% queued requests are moved, oldest first, to 'messages'.
do_unblock(L, S) ->
    unblock_pid(L),
    put(log, L#log{blocked_by = none, status = ok}),
    %% Since the block request is synchronous, and the blocking
    %% process is the only process that can unblock, all requests in
    %% 'messages' will have been put in 'queue' before the unblock
    %% request is granted.
    [] = S#state.messages, % assertion
    Oldest1st = lists:reverse(S#state.queue),
    S#state{queue = [], messages = Oldest1st}.
1673
%% Logs the items B; the size passed on to do_log/3 is computed here
%% with iolist_size/1.
-spec do_log(#log{}, [binary()]) -> integer() | {'error', _, integer()}.

do_log(L, B) ->
    Sz = iolist_size(B),
    do_log(L, B, Sz).
1678
%% do_log(Log, Items, Size) -> NoLogged | {error, Error, NoLogged}
%% Halt log: nothing is written to a log marked as full; if everything
%% fits (or the size is infinity) all items are written at once,
%% otherwise the items are written one by one until the log is full.
%% Wrap log: the write is delegated to disk_log_1 (Size is not used) and
%% the outcome is processed by wrap_log_result/2.
do_log(#log{type = halt, format = Format, extra = Halt}=L, B, BSz) ->
    #halt{curB = CurSize, size = MaxSize} = Halt,
    {Bs, BSize} = logl(B, Format, BSz),
    case get(is_full) of
        undefined when MaxSize =:= infinity; CurSize + BSize =< MaxSize ->
            halt_write(Halt, L, B, Bs, BSize);
        undefined ->
            halt_write_full(L, B, Format, 0);
        true ->
            {error, {error, {full, L#log.name}}, 0}
    end;
do_log(#log{format_type = wrap_int}=L, B, _BSz) ->
    wrap_log_result(L, disk_log_1:mf_int_log(L#log.extra, B, L#log.head));
do_log(#log{format_type = wrap_ext}=L, B, _BSz) ->
    wrap_log_result(L, disk_log_1:mf_ext_log(L#log.extra, B, L#log.head)).

%% Stores the handle returned by a wrap log write and turns the result
%% into the return value of do_log/3. Owners are notified when the log
%% has wrapped; lost items are subtracted from the number logged.
wrap_log_result(L, Result) ->
    case Result of
        {ok, Handle, Logged, Lost, Wraps} ->
            notify_owners_wrap(Wraps),
            put(log, L#log{extra = Handle}),
            Logged - Lost;
        {ok, Handle, Logged} ->
            put(log, L#log{extra = Handle}),
            Logged;
        {error, Error, Handle, Logged, Lost} ->
            put(log, L#log{extra = Handle}),
            {error, Error, Logged - Lost}
    end.
1717
%% Returns {Data, Size} for the items in B. For the internal format
%% the work is done by disk_log_1:logl/1. For the external format B is
%% returned as is, with its size computed only when the caller did not
%% supply one (Sz =:= undefined).
logl(B, internal, _Sz) ->
    disk_log_1:logl(B);
logl(B, external, Sz) ->
    case Sz of
        undefined ->
            {B, iolist_size(B)};
        _Known ->
            {B, Sz}
    end.
1724
%% Writes the items one at a time for as long as they fit in the halt
%% log. N counts the items written so far. When an item does not fit,
%% or when the items run out, the log is flagged as full, the owners
%% are notified, and a 'full' error carrying N is returned. An error
%% from halt_write/5 is returned as is.
halt_write_full(L, [Bin | Rest], Format, N) ->
    One = [Bin],
    {Data, DataSize} = logl(One, Format, undefined),
    #halt{curB = CurSize, size = MaxSize} = Halt = L#log.extra,
    case CurSize + DataSize =< MaxSize of
        true ->
            case halt_write(Halt, L, One, Data, DataSize) of
                Written when is_integer(Written) ->
                    %% halt_write/5 has stored an updated log record.
                    halt_write_full(get(log), Rest, Format, N + Written);
                Error ->
                    Error
            end;
        false ->
            halt_write_full(L, [], Format, N)
    end;
halt_write_full(L, _Bs, _Format, N) ->
    put(is_full, true),
    notify_owners(full),
    {error, {error, {full, L#log.name}}, N}.
1745
%% Writes Bs (BSize bytes) to the file of a halt log. In both outcomes
%% the file descriptor cache returned by disk_log_1:fwrite/4 is stored
%% in the log record kept in the process dictionary. On success the
%% current size grows by BSize and the number of items in B is
%% returned; otherwise {error, Error, 0} is returned.
halt_write(Halt, L, B, Bs, BSize) ->
    case disk_log_1:fwrite(Halt#halt.fdc, L#log.filename, Bs, BSize) of
        {ok, FdC} ->
            Grown = Halt#halt{fdc = FdC, curB = Halt#halt.curB + BSize},
            put(log, L#log{extra = Grown}),
            length(B);
        {Error, FdC} ->
            put(log, L#log{extra = Halt#halt{fdc = FdC}}),
            {error, Error, 0}
    end.
1757
%% -> ok | Error
%% Asks disk_log_1 to write its cache for the log and stores the
%% file descriptor cache (halt log) or handle (wrap log) that comes
%% back in the log record kept in the process dictionary.
do_write_cache(#log{type = halt} = Log) ->
    Halt = Log#log.extra,
    {Reply, FdC} = disk_log_1:write_cache(Halt#halt.fdc, Log#log.filename),
    put(log, Log#log{extra = Halt#halt{fdc = FdC}}),
    Reply;
do_write_cache(#log{type = wrap} = Log) ->
    {Reply, Handle} = disk_log_1:mf_write_cache(Log#log.extra),
    put(log, Log#log{extra = Handle}),
    Reply.
1767
%% -> ok | Error
%% Asks disk_log_1 to sync the log and stores the file descriptor
%% cache (halt log) or handle (wrap log) that comes back in the log
%% record kept in the process dictionary.
do_sync(#log{type = halt} = Log) ->
    Halt = Log#log.extra,
    {Reply, FdC} = disk_log_1:sync(Halt#halt.fdc, Log#log.filename),
    put(log, Log#log{extra = Halt#halt{fdc = FdC}}),
    Reply;
do_sync(#log{type = wrap} = Log) ->
    {Reply, Handle} = disk_log_1:mf_sync(Log#log.extra),
    put(log, Log#log{extra = Handle}),
    Reply.
1777
%% -> ok | Error | throw(Error)
%% Truncates the log, writing Head first in the truncated log.
%% Halt logs: the file is truncated, then the current position is
%% read; it becomes the new current size only when the truncation
%% itself succeeded.
%% Wrap logs: the log is temporarily resized to a single file and
%% wrapped twice with Head as the head, after which the original size
%% and the original head are restored.
do_trunc(#log{type = halt, filename = FName, extra = Halt} = L, Head) ->
    {TruncReply, FdC1} =
        trunc_halt_file(L#log.format, Halt#halt.fdc, FName, Head),
    {Reply, NewHalt} =
        case disk_log_1:position(FdC1, FName, cur) of
            {ok, FdC2, FileSize} when TruncReply =:= ok ->
                {ok, Halt#halt{fdc = FdC2, curB = FileSize}};
            {ok, FdC2, _FileSize} ->
                %% Positioning worked but the truncation did not.
                {TruncReply, Halt#halt{fdc = FdC2}};
            {PosError, FdC2} ->
                {PosError, Halt#halt{fdc = FdC2}}
        end,
    put(log, L#log{extra = NewHalt}),
    Reply;
do_trunc(#log{type = wrap, extra = Handle, head = SavedHead} = L, Head) ->
    {MaxB, MaxF} = disk_log_1:get_wrap_size(Handle),
    ok = do_change_size(L, {MaxB, 1}),
    Resized = get(log),
    Wrapped1 = trunc_wrap(Resized#log{head = Head}),
    %% Just to remove all files with suffix > 1:
    Wrapped2 = trunc_wrap(Wrapped1),
    Reset = (Wrapped2#log.extra)#handle{noFull = 0, accFull = 0},
    Restored = Wrapped2#log{extra = Reset, head = SavedHead},
    do_change_size(Restored, {MaxB, MaxF}).

%% Truncates the file of a halt log and returns {Reply, FdC}.
%% For the external format the file is cut at the beginning and the
%% head, unless it is 'none', is written afterwards.
trunc_halt_file(Format, FdC, FName, Head) ->
    case Format of
        internal ->
            disk_log_1:truncate(FdC, FName, Head);
        external ->
            case disk_log_1:truncate_at(FdC, FName, bof) of
                {ok, NFdC} when Head =:= none ->
                    {ok, NFdC};
                {ok, NFdC} ->
                    {ok, H} = Head,
                    disk_log_1:fwrite(NFdC, FName, H, byte_size(H));
                Other ->
                    Other
            end
    end.
1819
%% Moves on to the next wrap file and returns the updated log record.
%% An error from do_inc_wrap_file/1 is thrown.
trunc_wrap(L) ->
    case do_inc_wrap_file(L) of
        {error, Error, _L2} ->
            throw(Error);
        {ok, Updated, _Lost} ->
            Updated
    end.
1827
%% Reads a chunk from a log in the internal format. The reading is
%% done by disk_log_1, choosing the read-only variant when the log was
%% opened in read_only mode. The file descriptor cache or handle that
%% comes back is stored in the log record kept in the process
%% dictionary. Logs of any other format type yield a format_external
%% error.
do_chunk(#log{format_type = halt_int} = L, Pos, B, N) ->
    #log{extra = Halt, filename = FName} = L,
    FdC = Halt#halt.fdc,
    {NewFdC, Reply} =
        case L#log.mode of
            read_only ->
                disk_log_1:chunk_read_only(FdC, FName, Pos, B, N);
            read_write ->
                disk_log_1:chunk(FdC, FName, Pos, B, N)
        end,
    put(log, L#log{extra = Halt#halt{fdc = NewFdC}}),
    Reply;
do_chunk(#log{format_type = wrap_int, extra = Handle} = Log, Pos, B, N) ->
    {NewHandle, Reply} =
        case Log#log.mode of
            read_only ->
                disk_log_1:mf_int_chunk_read_only(Handle, Pos, B, N);
            _OtherMode ->
                disk_log_1:mf_int_chunk(Handle, Pos, B, N)
        end,
    put(log, Log#log{extra = NewHandle}),
    Reply;
do_chunk(Log, _Pos, _B, _) ->
    {error, {format_external, Log#log.name}}.
1850
%% Steps a chunk continuation N files in an internally formatted wrap
%% log; every other kind of log yields a not_internal_wrap error.
do_chunk_step(Log, Pos, N) ->
    case Log of
        #log{format_type = wrap_int, extra = Handle} ->
            disk_log_1:mf_int_chunk_step(Handle, Pos, N);
        _NotInternalWrap ->
            {error, {not_internal_wrap, Log#log.name}}
    end.
1855
%% Inlined.
%% Sends {disk_log, self(), Reply} to Pids, which is either a single
%% pid or a list of pids.
replies(Pids, Reply) ->
    send_reply(Pids, {disk_log, self(), Reply}).
1860
%% Sends the message M to a single pid or to every pid of a list.
%% Always returns ok.
send_reply([], _M) ->
    ok;
send_reply([First | Others], M) ->
    First ! M,
    send_reply(Others, M);
send_reply(Single, M) when is_pid(Single) ->
    Single ! M,
    ok.
1869
%% Sends {disk_log, self(), Reply} to To and continues with loop/1.
reply(To, Reply, S) ->
    Answer = {disk_log, self(), Reply},
    To ! Answer,
    loop(S).
1873
%% Looks up the process of the log Log and sends it the request R,
%% returning the reply. {error, no_such_log} is returned when
%% disk_log_server has no pid for the log.
req(Log, R) ->
    case disk_log_server:get_log_pid(Log) of
        Pid when Pid =/= undefined ->
            monitor_request(Pid, R);
        undefined ->
            {error, no_such_log}
    end.
1881
%% Sends {self(), Req} to Pid and waits for {disk_log, Pid, Reply}
%% while monitoring Pid. When Pid goes down before a reply arrives,
%% {error, no_such_log} is returned. Tuple replies whose second
%% element is disk_log_stopped are not accepted by the receive.
monitor_request(Pid, Req) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {self(), Req},
    receive
        {disk_log, Pid, Reply} when not is_tuple(Reply) orelse
                                    element(2, Reply) =/= disk_log_stopped ->
            %% Drop the monitor and any 'DOWN' message already delivered.
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, process, Pid, _Info} ->
            {error, no_such_log}
    end.
1893
%% Same as req/2 for callers that already hold the pid of the log
%% process.
req2(Pid, R) ->
    Reply = monitor_request(Pid, R),
    Reply.
1896
%% Returns the first argument unless it is 'none', in which case the
%% second argument is returned.
merge_head(Head, Fallback) ->
    case Head of
        none ->
            Fallback;
        _ ->
            Head
    end.
1901
%% -> List of extensions of existing files (no dot included) | throw(FileError)
%% The candidates are "idx", "siz" and the numbers 1..NoOfFiles, where
%% NoOfFiles is taken from the index file. A candidate is kept when
%% file:read_file_info/1 succeeds for File with that extension.
wrap_file_extensions(File) ->
    {_CurF, _CurFSz, _TotSz, NoOfFiles} =
        disk_log_1:read_index_file(File),
    Numbered = if
                   NoOfFiles >= 1 ->
                       lists:seq(1, NoOfFiles);
                   NoOfFiles =:= 0 ->
                       []
               end,
    Exists = fun(Ext) ->
                     case file:read_file_info(add_ext(File, Ext)) of
                         {ok, _Info} -> true;
                         _Else -> false
                     end
             end,
    [Ext || Ext <- ["idx", "siz" | Numbered], Exists(Ext)].
1921
%% Returns File with a dot and Ext appended, as a string.
add_ext(File, Ext) ->
    Parts = [File, ".", Ext],
    lists:concat(Parts).
1924
%% Sends R to the process of the log Log without waiting for a reply.
%% {error, no_such_log} is returned when disk_log_server has no pid
%% for the log.
notify(Log, R) ->
    case disk_log_server:get_log_pid(Log) of
        Pid when Pid =/= undefined ->
            Pid ! R,
            ok;
        undefined ->
            {error, no_such_log}
    end.
1933
%% Sends one {wrap, N} notification to the owners for every element N
%% of the list, in list order.
notify_owners_wrap(Wraps) ->
    lists:foreach(fun(N) -> notify_owners({wrap, N}) end, Wraps).
1939
%% Sends {disk_log, node(), Name, Note} to every entry {Pid, true} in
%% the owners list of the log record kept in the process dictionary.
%% All other entries are skipped.
notify_owners(Note) ->
    #log{name = Name, owners = Owners} = L = get(log),
    Msg = {disk_log, node(), L#log.name, Note},
    _ = [Pid ! Msg || {Pid, true} <- Owners],
    _ = Name,
    ok.
1946
%% Replies to Pids with the cache error stored in the state, resets
%% the stored cache error to ok, and updates the error status via
%% state_err/2.
cache_error(#state{} = S, Pids) ->
    Error = S#state.cache_error,
    ok = replies(Pids, Error),
    Cleared = S#state{cache_error = ok},
    state_err(Cleared, Error).
1950
%% Sets the error status of the state to ok (see state_err/2).
state_ok(S) ->
    NoError = ok,
    state_err(S, NoError).
1953
-spec state_err(#state{}, dlog_state_error()) -> #state{}.

%% Stores Err as the error status of the state. The owners get an
%% {error_status, Err} notification, but only when the status changes.
state_err(#state{error_status = Err} = S, Err) ->
    S;
state_err(S, Err) ->
    notify_owners({error_status, Err}),
    S#state{error_status = Err}.
1960