1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 1997-2018. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20-module(disk_log).
21
22%% Efficient file based log - process part
23
24-export([start/0, istart_link/1,
25	 log/2, log_terms/2, blog/2, blog_terms/2,
26	 alog/2, alog_terms/2, balog/2, balog_terms/2,
27	 close/1, lclose/1, lclose/2, sync/1, open/1,
28	 truncate/1, truncate/2, btruncate/2,
29	 reopen/2, reopen/3, breopen/3, inc_wrap_file/1, change_size/2,
30	 change_notify/3, change_header/2,
31	 chunk/2, chunk/3, bchunk/2, bchunk/3, chunk_step/3, chunk_info/1,
32	 block/1, block/2, unblock/1, info/1, format_error/1,
33	 accessible_logs/0]).
34
35%% Internal exports
36-export([init/2, internal_open/2,
37	 system_continue/3, system_terminate/4, system_code_change/4]).
38
39%% To be used by disk_log_h.erl (not (yet) in Erlang/OTP) only.
40-export([ll_open/1, ll_close/1, do_log/2, do_sync/1, do_info/2]).
41
42%% To be used by wrap_log_reader only.
43-export([ichunk_end/2]).
44
45%% To be used for debugging only:
46-export([pid2name/1]).
47
48-export_type([continuation/0]).
49
%% Value type of the error_status field of #state{}.
-type dlog_state_error() :: 'ok' | {'error', term()}.

%% Loop state of a log process; created in init/2 and threaded
%% through loop/1 and handle/2.
-record(state, {
          queue = [],
          messages = [],    %% loop/1 handles these before reading the mailbox
          parent,           %% first argument of init/2
          server,           %% second argument of init/2
          cnt = 0           :: non_neg_integer(),
          args,
          error_status = ok :: dlog_state_error(),
          cache_error = ok  %% cache write error after timeout
         }).
61
62-include("disk_log.hrl").
63
64-define(failure(Error, Function, Arg),
65	{{failed, Error}, [{?MODULE, Function, Arg}]}).
66
67%%-define(PROFILE(C), C).
68-define(PROFILE(C), void).
69
70-compile({inline,[{log_loop,6},{log_end_sync,2},{replies,2},{rflat,1}]}).
71
72%%%----------------------------------------------------------------------
73%%% Contract type specifications
74%%%----------------------------------------------------------------------
75
%% Opaque to callers; the #continuation{} record itself is declared
%% outside this file section (presumably in disk_log.hrl).
-opaque continuation() :: #continuation{}.

%% Deliberately wide for now.
-type file_error()     :: term().  % XXX: refine
-type invalid_header() :: term().  % XXX: refine
80
81%%%----------------------------------------------------------------------
82%%% API
83%%%----------------------------------------------------------------------
84
85%%-----------------------------------------------------------------
86%% This module implements the API, and the processes for each log.
87%% There is one process per log.
88%%-----------------------------------------------------------------
89
%% Error reasons that open/1 may report.
-type open_error_rsn() ::
        'no_such_log'
      | {'badarg', term()}
      | {'size_mismatch', CurrentSize :: dlog_size(),
         NewSize :: dlog_size()}
      | {'arg_mismatch', OptionName :: dlog_optattr(),
         CurrentValue :: term(), Value :: term()}
      | {'name_already_open', Log :: log()}
      | {'open_read_write', Log :: log()}
      | {'open_read_only', Log :: log()}
      | {'need_repair', Log :: log()}
      | {'not_a_log_file', FileName :: file:filename()}
      | {'invalid_index_file', FileName :: file:filename()}
      | {'invalid_header', invalid_header()}
      | {'file_error', file:filename(), file_error()}
      | {'node_already_open', Log :: log()}.

-type dist_error_rsn() :: 'nodedown' | open_error_rsn().

-type ret() ::
        {'ok', Log :: log()}
      | {'repaired', Log :: log(),
         {'recovered', Rec :: non_neg_integer()},
         {'badbytes', Bad :: non_neg_integer()}}.

-type open_ret() :: ret() | {'error', open_error_rsn()}.

-type dist_open_ret() ::
        {[{node(), ret()}],
         [{node(), {'error', dist_error_rsn()}}]}.

%% Validates the option list with check_arg/2 (starting from an #arg{}
%% that remembers the raw options) and hands the outcome of that
%% validation to disk_log_server:open/1.
-spec open(ArgL) -> open_ret() | dist_open_ret() when
      ArgL :: dlog_options().
open(A) ->
    Checked = check_arg(A, #arg{options = A}),
    disk_log_server:open(Checked).
118
%% Error reasons shared by the synchronous logging functions.
-type log_error_rsn() ::
        'no_such_log'
      | 'nonode'
      | {'read_only_mode', log()}
      | {'format_external', log()}
      | {'blocked_log', log()}
      | {'full', log()}
      | {'invalid_header', invalid_header()}
      | {'file_error', file:filename(), file_error()}.

%% Encodes Term with term_to_binary/1 and submits it through req/2 as
%% a one-element {log, internal, _} request.
-spec log(Log, Term) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      Term :: term().
log(Log, Term) ->
    Bin = term_to_binary(Term),
    req(Log, {log, internal, [Bin]}).
129
%% Passes Bytes through ensure_binary/1 and submits the result through
%% req/2 as a one-element {log, external, _} request.
-spec blog(Log, Bytes) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      Bytes :: iodata().
blog(Log, Bytes) ->
    Bin = ensure_binary(Bytes),
    req(Log, {log, external, [Bin]}).
135
%% Converts every term in the list with terms2bins/1 and submits the
%% resulting list through req/2 as a single {log, internal, _} request.
%% (The spec annotation used to be misspelled "Resaon"; it now reads
%% "Reason" like the sibling specs.)
-spec log_terms(Log, TermList) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      TermList :: [term()].
log_terms(Log, Terms) ->
    Bs = terms2bins(Terms),
    req(Log, {log, internal, Bs}).
142
%% Normalises the list with ensure_binary_list/1 and submits it through
%% req/2 as a single {log, external, _} request.
-spec blog_terms(Log, BytesList) ->
          ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      BytesList :: [iodata()].
blog_terms(Log, Bytess) ->
    Request = {log, external, ensure_binary_list(Bytess)},
    req(Log, Request).
150
%% Return type of the asynchronous logging functions.
-type notify_ret() :: 'ok' | {'error', 'no_such_log'}.

%% Asynchronous counterpart of log/2: the encoded term is handed to
%% notify/2 as a one-element {alog, internal, _} message.
-spec alog(Log, Term) -> notify_ret() when
      Log :: log(),
      Term :: term().
alog(Log, Term) ->
    Bin = term_to_binary(Term),
    notify(Log, {alog, internal, [Bin]}).
158
%% Asynchronous counterpart of log_terms/2: the terms are converted by
%% terms2bins/1 and handed to notify/2 in one {alog, internal, _} message.
-spec alog_terms(Log, TermList) -> notify_ret() when
      Log :: log(),
      TermList :: [term()].
alog_terms(Log, Terms) ->
    Message = {alog, internal, terms2bins(Terms)},
    notify(Log, Message).
165
%% Asynchronous counterpart of blog/2: Bytes goes through
%% ensure_binary/1 and is handed to notify/2 as a one-element
%% {alog, external, _} message.
-spec balog(Log, Bytes) -> notify_ret() when
      Log :: log(),
      Bytes :: iodata().
balog(Log, Bytes) ->
    Bin = ensure_binary(Bytes),
    notify(Log, {alog, external, [Bin]}).
171
%% Asynchronous counterpart of blog_terms/2: the list goes through
%% ensure_binary_list/1 and is handed to notify/2 in one
%% {alog, external, _} message.
-spec balog_terms(Log, ByteList) -> notify_ret() when
      Log :: log(),
      ByteList :: [iodata()].
balog_terms(Log, Bytess) ->
    Message = {alog, external, ensure_binary_list(Bytess)},
    notify(Log, Message).
178
%% Error reasons that close/1 may report.
-type close_error_rsn() ::
        'no_such_log'
      | 'nonode'
      | {'file_error', file:filename(), file_error()}.

%% Submits a 'close' request for Log through req/2.
-spec close(Log) -> 'ok' | {'error', close_error_rsn()} when
      Log :: log().
close(Log) ->
    Request = close,
    req(Log, Request).
186
%% Error reasons that lclose/1,2 may report.
-type lclose_error_rsn() ::
        'no_such_log'
      | {'file_error', file:filename(), file_error()}.

%% Same as lclose/2 with the calling node as Node.
-spec lclose(Log) -> 'ok' | {'error', lclose_error_rsn()} when
      Log :: log().
lclose(Log) ->
    ThisNode = node(),
    lclose(Log, ThisNode).
194
%% Submits a 'close' request for Log through lreq/3, passing Node along.
-spec lclose(Log, Node) -> 'ok' | {'error', lclose_error_rsn()} when
      Log :: log(),
      Node :: node().
lclose(Log, Node) ->
    Request = close,
    lreq(Log, Request, Node).
200
%% Error reasons that the truncate functions may report.
-type trunc_error_rsn() ::
        'no_such_log'
      | 'nonode'
      | {'read_only_mode', log()}
      | {'blocked_log', log()}
      | {'invalid_header', invalid_header()}
      | {'file_error', file:filename(), file_error()}.

%% Truncates Log without supplying a head ('none').  The trailing
%% 'truncate' and 1 are the function name and arity that the log
%% process puts into its failure term if truncation fails.
-spec truncate(Log) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log().
truncate(Log) ->
    Request = {truncate, none, truncate, 1},
    req(Log, Request).
211
%% Truncates Log and supplies Head, encoded with term_to_binary/1, as
%% the head.  'truncate' and 2 identify this function in the failure
%% term built by the log process.
-spec truncate(Log, Head) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log(),
      Head :: term().
truncate(Log, Head) ->
    HeadBin = term_to_binary(Head),
    req(Log, {truncate, {ok, HeadBin}, truncate, 2}).
217
%% Like truncate/2, but the head is iodata passed through
%% ensure_binary/1 instead of being term-encoded.  'btruncate' and 2
%% identify this function in the failure term built by the log process.
-spec btruncate(Log, BHead) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log(),
      BHead :: iodata().
btruncate(Log, Head) ->
    HeadBin = ensure_binary(Head),
    req(Log, {truncate, {ok, HeadBin}, btruncate, 2}).
223
%% Error reasons that the reopen functions may report.
-type reopen_error_rsn() ::
        no_such_log
      | nonode
      | {read_only_mode, log()}
      | {blocked_log, log()}
      | {same_file_name, log()}
      | {invalid_index_file, file:filename()}
      | {invalid_header, invalid_header()}
      | {'file_error', file:filename(), file_error()}.

%% Submits a reopen request naming NewFile, without a head ('none').
%% The trailing 'reopen' and 2 name this function and its arity.
-spec reopen(Log, File) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename().
reopen(Log, NewFile) ->
    Request = {reopen, NewFile, none, reopen, 2},
    req(Log, Request).
238
%% Submits a reopen request naming NewFile, with NewHead encoded by
%% term_to_binary/1 as the head.  The trailing 'reopen' and 3 name this
%% function and its arity.
-spec reopen(Log, File, Head) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename(),
      Head :: term().
reopen(Log, NewFile, NewHead) ->
    HeadBin = term_to_binary(NewHead),
    req(Log, {reopen, NewFile, {ok, HeadBin}, reopen, 3}).
245
%% Like reopen/3, but the head is iodata passed through
%% ensure_binary/1.  The trailing 'breopen' and 3 name this function
%% and its arity.
-spec breopen(Log, File, BHead) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename(),
      BHead :: iodata().
breopen(Log, NewFile, NewHead) ->
    HeadBin = ensure_binary(NewHead),
    req(Log, {reopen, NewFile, {ok, HeadBin}, breopen, 3}).
252
%% Error reasons that inc_wrap_file/1 may report.
-type inc_wrap_error_rsn() ::
        'no_such_log'
      | 'nonode'
      | {'read_only_mode', log()}
      | {'blocked_log', log()}
      | {'halt_log', log()}
      | {'invalid_header', invalid_header()}
      | {'file_error', file:filename(), file_error()}.

%% Submits an 'inc_wrap_file' request for Log through req/2.
-spec inc_wrap_file(Log) -> 'ok' | {'error', inc_wrap_error_rsn()} when
      Log :: log().
inc_wrap_file(Log) ->
    Request = inc_wrap_file,
    req(Log, Request).
263
%% Submits a {change_size, NewSize} request for Log through req/2.
-spec change_size(Log, Size) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Size :: dlog_size(),
      Reason :: no_such_log
              | nonode
              | {read_only_mode, Log}
              | {blocked_log, Log}
              | {new_size_too_small, Log, CurrentSize :: pos_integer()}
              | {badarg, size}
              | {file_error, file:filename(), file_error()}.
change_size(Log, NewSize) ->
    Request = {change_size, NewSize},
    req(Log, Request).
274
%% Submits a {change_notify, Pid, NewNotify} request for Log through
%% req/2.
-spec change_notify(Log, Owner, Notify) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Owner :: pid(),
      Notify :: boolean(),
      Reason :: no_such_log
              | nonode
              | {blocked_log, Log}
              | {badarg, notify}
              | {not_owner, Owner}.
change_notify(Log, Pid, NewNotify) ->
    Request = {change_notify, Pid, NewNotify},
    req(Log, Request).
283
%% Submits a {change_header, NewHead} request for Log through req/2.
-spec change_header(Log, Header) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Header :: {head, dlog_head_opt()}
              | {head_func, MFA :: {atom(), atom(), list()}},
      Reason :: no_such_log
              | nonode
              | {read_only_mode, Log}
              | {blocked_log, Log}
              | {badarg, head}.
change_header(Log, NewHead) ->
    Request = {change_header, NewHead},
    req(Log, Request).
292
%% Error reasons that sync/1 may report.
-type sync_error_rsn() ::
        'no_such_log'
      | 'nonode'
      | {'read_only_mode', log()}
      | {'blocked_log', log()}
      | {'file_error', file:filename(), file_error()}.

%% Submits a 'sync' request for Log through req/2.
-spec sync(Log) -> 'ok' | {'error', sync_error_rsn()} when
      Log :: log().
sync(Log) ->
    Request = sync,
    req(Log, Request).
301
%% Error reasons that block/1,2 may report.
-type block_error_rsn() ::
        'no_such_log'
      | 'nonode'
      | {'blocked_log', log()}.

%% Same as block/2 with QueueLogRecords set to true.
-spec block(Log) -> 'ok' | {'error', block_error_rsn()} when
      Log :: log().
block(Log) ->
    QueueLogRecords = true,
    block(Log, QueueLogRecords).
308
%% Submits a {block, QueueLogRecords} request for Log through req/2.
-spec block(Log, QueueLogRecords) -> 'ok' | {'error', block_error_rsn()} when
      Log :: log(),
      QueueLogRecords :: boolean().
block(Log, QueueLogRecords) ->
    Request = {block, QueueLogRecords},
    req(Log, Request).
314
%% Error reasons that unblock/1 may report.
-type unblock_error_rsn() ::
        'no_such_log'
      | 'nonode'
      | {'not_blocked', log()}
      | {'not_blocked_by_pid', log()}.

%% Submits an 'unblock' request for Log through req/2.
-spec unblock(Log) -> 'ok' | {'error', unblock_error_rsn()} when
      Log :: log().
unblock(Log) ->
    Request = unblock,
    req(Log, Request).
323
%% Public entry point for error formatting; all the work is done by
%% do_format_error/1.
-spec format_error(Error) -> io_lib:chars() when
      Error :: term().
format_error(Error) ->
    Chars = do_format_error(Error),
    Chars.
328
%% Elements that may appear in the list returned by info/1.
%% Note: the 'full' entry is typed boolean(); it previously named the
%% atom 'boolean', which matches neither true nor false.
-type dlog_info() :: {name, Log :: log()}
                   | {file, File :: file:filename()}
                   | {type, Type :: dlog_type()}
                   | {format, Format :: dlog_format()}
                   | {size, Size :: dlog_size()}
                   | {mode, Mode :: dlog_mode()}
                   | {owners, [{pid(), Notify :: boolean()}]}
                   | {users, Users :: non_neg_integer()}
                   | {status, Status ::
                        ok | {blocked, QueueLogRecords :: boolean()}}
                   | {node, Node :: node()}
                   | {distributed, Dist :: local | [node()]}
                   | {head, Head :: none
                                  | {head, term()}
                                  | (MFA :: {atom(), atom(), list()})}
                   | {no_written_items, NoWrittenItems :: non_neg_integer()}
                   | {full, Full :: boolean()}
                   | {no_current_bytes, non_neg_integer()}
                   | {no_current_items, non_neg_integer()}
                   | {no_items, non_neg_integer()}
                   | {current_file, pos_integer()}
                   | {no_overflows, {SinceLogWasOpened :: non_neg_integer(),
                                     SinceLastInfo :: non_neg_integer()}}.

%% Submits an 'info' request for Log through sreq/2 and returns its
%% answer unchanged.
-spec info(Log) -> InfoList | {'error', no_such_log} when
      Log :: log(),
      InfoList :: [dlog_info()].
info(Log) ->
    sreq(Log, info).
357
%% Debugging aid: looks Pid up in ?DISK_LOG_PID_TABLE after calling
%% disk_log_server:start/0.  Returns {ok, Log} when an entry exists and
%% 'undefined' otherwise.
-spec pid2name(Pid) -> {'ok', Log} | 'undefined' when
      Pid :: pid(),
      Log :: log().
pid2name(Pid) ->
    disk_log_server:start(),
    Entries = ets:lookup(?DISK_LOG_PID_TABLE, Pid),
    case Entries of
        [{_Pid, Log}] ->
            {ok, Log};
        [] ->
            undefined
    end.
367
%% This function takes 3 args: a Log, a Continuation and N.
%% It returns {Cont2, ObjList} | eof | {error, Reason}.
%% The initial continuation is the atom 'start'.
371
%% Error reasons that the chunk functions may report.
-type chunk_error_rsn() ::
        no_such_log
      | {format_external, log()}
      | {blocked_log, log()}
      | {badarg, continuation}
      | {not_internal_wrap, log()}
      | {corrupt_log_file, FileName :: file:filename()}
      | {file_error, file:filename(), file_error()}.

%% Possible results of chunk/2,3.
-type chunk_ret() ::
        {Continuation2 :: continuation(), Terms :: [term()]}
      | {Continuation2 :: continuation(),
         Terms :: [term()],
         Badbytes :: non_neg_integer()}
      | eof
      | {error, Reason :: chunk_error_rsn()}.

%% Same as chunk/3 with N set to 'infinity'.
-spec chunk(Log, Continuation) -> chunk_ret() when
      Log :: log(),
      Continuation :: start | continuation().
chunk(Log, Cont) ->
    N = infinity,
    chunk(Log, Cont, N).
392
%% Reads up to N terms starting at the given continuation.  'infinity'
%% is replaced by ?MAX_CHUNK_SIZE, since there cannot be more than
%% that many terms in a chunk.  Any other N fails with function_clause.
-spec chunk(Log, Continuation, N) -> chunk_ret() when
      Log :: log(),
      Continuation :: start | continuation(),
      N :: pos_integer() | infinity.
chunk(Log, Cont, N) when N =:= infinity; is_integer(N), N > 0 ->
    Limit = case N of
                infinity -> ?MAX_CHUNK_SIZE;
                _ -> N
            end,
    ichunk(Log, Cont, Limit).
402
%% Issues the chunk request and post-processes the answer with
%% ichunk_end/2.  'start' asks via sreq/2 from position 0 with no
%% buffered bytes; a continuation asks the process recorded in it via
%% req2/2.  Anything else is rejected as a bad continuation.
ichunk(Log, start, N) ->
    ichunk_end(sreq(Log, {chunk, 0, [], N}), Log);
ichunk(Log, #continuation{pid = Pid, pos = Pos, b = B}, N) ->
    ichunk_end(req2(Pid, {chunk, Pos, B, N}), Log);
ichunk(_Log, _, _) ->
    {error, {badarg, continuation}}.
412
%% Dispatches on the shape of a chunk reply.  A two-tuple starting
%% with a continuation is decoded in read_write mode with no bad bytes;
%% a three-tuple is decoded in read_only mode carrying its bad-byte
%% count.  Every other reply is returned untouched.
ichunk_end({#continuation{} = C, R}, Log) ->
    ichunk_end(R, read_write, Log, C, 0);
ichunk_end({#continuation{} = C, R, Bad}, Log) ->
    ichunk_end(R, read_only, Log, C, Bad);
ichunk_end(Other, _Log) ->
    Other.
419
%% The terms are built here, on the client's heap, rather than on the
%% server's.  R arrives in reverse order; bins2terms/2 restores the
%% order while decoding.  If decoding fails, ichunk_bad_end/6 takes
%% over with the binaries put back into their original order.
ichunk_end(R, Mode, Log, C, Bad) ->
    Decoded = (catch bins2terms(R, [])),
    case Decoded of
        {'EXIT', _} ->
            ichunk_bad_end(lists:reverse(R), Mode, Log, C, Bad, []);
        _ when Bad > 0 ->
            {C, Decoded, Bad};
        _ when Bad =:= 0 ->
            {C, Decoded}
    end.
432
%% Decodes each binary with binary_to_term/1 and pushes the result onto
%% L, so the decoded terms come out in the reverse order of the input.
bins2terms(Bins, L) ->
    lists:foldl(fun(B, Acc) -> [binary_to_term(B) | Acc] end, L, Bins).
437
%% Called when a chunk contains at least one binary that cannot be
%% decoded.  The binaries are decoded one by one, in order, collecting
%% the terms (reversed) in A until the first failure.
%%
%% read_write mode: returns {error, {corrupt_log_file, File}}, where
%%   File is the log's file name (halt log) or that name extended with
%%   the file number taken from the continuation (wrap log).
%% read_only mode: returns {Continuation, Terms, BadBytes}.  The
%%   continuation position is moved back by the size of the binaries
%%   that follow the bad one (each plus ?HEADERSZ) so they are read
%%   again, and the size of the bad binary is added to Bad.
%%
%% try ... of is used instead of 'catch' so that a validly logged term
%% of the shape {'EXIT', _} is not mistaken for a decode failure.
ichunk_bad_end([B | Bs], Mode, Log, C, Bad, A) ->
    try binary_to_term(B) of
        T ->
            ichunk_bad_end(Bs, Mode, Log, C, Bad, [T | A])
    catch
        _:_ when read_write =:= Mode ->
            InfoList = info(Log),
            {file, FileName} = lists:keyfind(file, 1, InfoList),
            File = case C#continuation.pos of
                       Pos when is_integer(Pos) -> FileName; % halt log
                       {FileNo, _} -> add_ext(FileName, FileNo) % wrap log
                   end,
            {error, {corrupt_log_file, File}};
        _:_ when read_only =:= Mode ->
            Reread = lists:foldl(fun(Bin, Sz) ->
                                         Sz + byte_size(Bin) + ?HEADERSZ
                                 end, 0, Bs),
            NewPos = case C#continuation.pos of
                         Pos when is_integer(Pos) -> Pos - Reread;
                         {FileNo, Pos} -> {FileNo, Pos - Reread}
                     end,
            NewBad = Bad + byte_size(B),
            {C#continuation{pos = NewPos, b = []}, lists:reverse(A), NewBad}
    end.
461
%% Possible results of bchunk/2,3.
-type bchunk_ret() ::
        {Continuation2 :: continuation(),
         Binaries :: [binary()]}
      | {Continuation2 :: continuation(),
         Binaries :: [binary()],
         Badbytes :: non_neg_integer()}
      | eof
      | {error, Reason :: chunk_error_rsn()}.

%% Same as bchunk/3 with N set to 'infinity'.
-spec bchunk(Log, Continuation) -> bchunk_ret() when
      Log :: log(),
      Continuation :: start | continuation().
bchunk(Log, Cont) ->
    N = infinity,
    bchunk(Log, Cont, N).
475
%% Binary counterpart of chunk/3.  'infinity' is replaced by
%% ?MAX_CHUNK_SIZE, since there cannot be more than that many terms in
%% a chunk.  Any other N fails with function_clause.
-spec bchunk(Log, Continuation, N) -> bchunk_ret() when
      Log :: log(),
      Continuation :: start | continuation(),
      N :: pos_integer() | infinity.
bchunk(Log, Cont, N) when N =:= infinity; is_integer(N), N > 0 ->
    Limit = case N of
                infinity -> ?MAX_CHUNK_SIZE;
                _ -> N
            end,
    bichunk(Log, Cont, Limit).
485
%% Issues the chunk request and post-processes the answer with
%% bichunk_end/1.  'start' asks via sreq/2 from position 0 with no
%% buffered bytes; a continuation asks the process recorded in it via
%% req2/2.  Anything else is rejected as a bad continuation.
bichunk(Log, start, N) ->
    bichunk_end(sreq(Log, {chunk, 0, [], N}));
bichunk(_Log, More, N) when is_record(More, continuation) ->
    Request = {chunk, More#continuation.pos, More#continuation.b, N},
    bichunk_end(req2(More#continuation.pid, Request));
bichunk(_Log, _, _) ->
    {error, {badarg, continuation}}.
494
%% Puts the binaries of a successful chunk reply back into order (the
%% reply carries them reversed) and leaves the continuation and the
%% bad-byte count untouched.  Every other reply is returned as is.
bichunk_end({C, R}) when is_record(C, continuation) ->
    Binaries = lists:reverse(R),
    {C, Binaries};
bichunk_end({C, R, Bad}) when is_record(C, continuation) ->
    Binaries = lists:reverse(R),
    {C, Binaries, Bad};
bichunk_end(Other) ->
    Other.
501
%% Accepts only an integer step and delegates to ichunk_step/3; a
%% non-integer step fails with function_clause.
-spec chunk_step(Log, Continuation, Step) ->
          {'ok', any()} | {'error', Reason} when
      Log :: log(),
      Continuation :: start | continuation(),
      Step :: integer(),
      Reason :: no_such_log
              | end_of_log
              | {format_external, Log}
              | {blocked_log, Log}
              | {badarg, continuation}
              | {file_error, file:filename(), file_error()}.
chunk_step(Log, Cont, Step) when is_integer(Step) ->
    ichunk_step(Log, Cont, Step).
512
%% 'start' issues the step request via sreq/2 from position 0; a
%% continuation issues it via req2/2 to the process recorded in it,
%% from the recorded position.  Anything else is a bad continuation.
ichunk_step(Log, start, N) ->
    Request = {chunk_step, 0, N},
    sreq(Log, Request);
ichunk_step(_Log, #continuation{pid = Pid, pos = Pos}, N) ->
    Request = {chunk_step, Pos, N},
    req2(Pid, Request);
ichunk_step(_Log, _, _) ->
    {error, {badarg, continuation}}.
519
%% Reports the node of the process recorded in a continuation.  Any
%% other argument yields {error, {no_continuation, Arg}}.
-spec chunk_info(Continuation) -> InfoList | {error, Reason} when
      Continuation :: continuation(),
      InfoList :: [{node, Node :: node()}, ...],
      Reason :: {no_continuation, Continuation}.
chunk_info(#continuation{pid = Pid}) ->
    [{node, node(Pid)}];
chunk_info(BadCont) ->
    {error, {no_continuation, BadCont}}.
528
%% Returns whatever disk_log_server:accessible_logs/0 reports.
-spec accessible_logs() -> {[LocalLog], [DistributedLog]} when
      LocalLog :: log(),
      DistributedLog :: log().
accessible_logs() ->
    Logs = disk_log_server:accessible_logs(),
    Logs.
534
%% Spawns a linked log process running init/2, with the caller as
%% parent and Server as second argument.
istart_link(Server) ->
    Parent = self(),
    Pid = proc_lib:spawn_link(?MODULE, init, [Parent, Server]),
    {ok, Pid}.
537
%% Kept for backwards compatibility only (could probably be removed);
%% simply calls disk_log_server:start/0.
-spec start() -> 'ok'.
start() ->
    Result = disk_log_server:start(),
    Result.
542
%% Sends an {internal_open, A} request to the log process Pid via
%% req2/2.
internal_open(Pid, A) ->
    Request = {internal_open, A},
    req2(Pid, Request).
545
%%% ll_open() and ll_close() exist for disk_log_h.erl, a module that is
%%% not (yet) part of Erlang/OTP.

%% Validates the options with check_arg/2 and, when they are accepted,
%% opens the log with do_open/1.  A rejected option list is returned
%% exactly as check_arg/2 reported it.
%% -spec ll_open(dlog_options()) -> {'ok', Res :: _, #log{}, Cnt :: _} | Error.
ll_open(A) ->
    Checked = check_arg(A, #arg{options = A}),
    case Checked of
        {ok, L} ->
            do_open(L);
        _ ->
            Checked
    end.
555
%% Closes Log via close_disk_log2/1.
%% -> closed | throw(Error)
ll_close(Log) ->
    Closed = close_disk_log2(Log),
    Closed.
559
%% check_arg(Options, Acc) walks the option list and stores every
%% recognised option in the #arg{} accumulator.  The first element
%% that is not a recognised option ends the walk with
%% {error, {badarg, RemainingOptions}}.
%%
%% Once the list is exhausted the record is checked as a whole:
%%  * the head (if any) is validated by check_head/2 first;
%%  * a name still equal to 0 is rejected;
%%  * a missing file name defaults to Name ++ ".LOG", after which the
%%    final checks are run again;
%%  * repair = truncate cannot be combined with read_only mode;
%%  * a halt log cannot have a tuple size;
%%  * for a wrap log, the size read from the existing size file is
%%    reconciled with the requested size by check_wrap_arg/3.
check_arg([], Res) ->
    HeadChecked =
        case Res#arg.head of
            none ->
                {ok, Res};
            Head0 ->
                case check_head(Head0, Res#arg.format) of
                    {ok, Head} ->
                        {ok, Res#arg{head = Head}};
                    HeadError ->
                        HeadError
                end
        end,
    case Res of
        #arg{name = 0} ->
            {error, {badarg, name}};
        #arg{file = none, name = Name} ->
            case catch lists:concat([Name, ".LOG"]) of
                {'EXIT', _} ->
                    {error, {badarg, file}};
                FName ->
                    check_arg([], Res#arg{file = FName})
            end;
        #arg{repair = truncate, mode = read_only} ->
            {error, {badarg, repair_read_only}};
        #arg{type = halt, size = HaltSize} when is_tuple(HaltSize) ->
            {error, {badarg, size}};
        #arg{type = wrap, file = File} ->
            {OldSize, Version} = disk_log_1:read_size_file_version(File),
            check_wrap_arg(HeadChecked, OldSize, Version);
        _ ->
            HeadChecked
    end;
check_arg([{file, F} | Tail], Res) when is_list(F); is_atom(F) ->
    check_arg(Tail, Res#arg{file = F});
check_arg([{linkto, Link} | Tail], Res) when is_pid(Link); Link =:= none ->
    check_arg(Tail, Res#arg{linkto = Link});
check_arg([{name, Name} | Tail], Res) ->
    check_arg(Tail, Res#arg{name = Name});
check_arg([{repair, Repair} | Tail], Res) when Repair =:= true;
                                               Repair =:= false;
                                               Repair =:= truncate ->
    check_arg(Tail, Res#arg{repair = Repair});
check_arg([{size, Size} | Tail], Res) when is_integer(Size), Size > 0;
                                           Size =:= infinity ->
    check_arg(Tail, Res#arg{size = Size});
check_arg([{size, {MaxB, MaxF} = Size} | Tail], Res)
  when is_integer(MaxB), is_integer(MaxF),
       MaxB > 0, MaxB =< ?MAX_BYTES,
       MaxF > 0, MaxF < ?MAX_FILES ->
    check_arg(Tail, Res#arg{size = Size});
check_arg([{type, Type} | Tail], Res) when Type =:= wrap; Type =:= halt ->
    check_arg(Tail, Res#arg{type = Type});
check_arg([{format, Format} | Tail], Res) when Format =:= internal;
                                               Format =:= external ->
    check_arg(Tail, Res#arg{format = Format});
check_arg([{distributed, []} | Tail], Res) ->
    %% An empty node list means "not distributed".
    check_arg(Tail, Res#arg{distributed = false});
check_arg([{distributed, Nodes} | Tail], Res) when is_list(Nodes) ->
    check_arg(Tail, Res#arg{distributed = {true, Nodes}});
check_arg([{notify, Notify} | Tail], Res) when is_boolean(Notify) ->
    check_arg(Tail, Res#arg{notify = Notify});
check_arg([{head_func, HeadFunc} | Tail], Res) ->
    check_arg(Tail, Res#arg{head = {head_func, HeadFunc}});
check_arg([{head, Term} | Tail], Res) ->
    check_arg(Tail, Res#arg{head = {head, Term}});
check_arg([{mode, Mode} | Tail], Res) when Mode =:= read_only;
                                           Mode =:= read_write ->
    check_arg(Tail, Res#arg{mode = Mode});
check_arg([{quiet, Quiet} | Tail], Res) when is_boolean(Quiet) ->
    check_arg(Tail, Res#arg{quiet = Quiet});
check_arg(Arg, _) ->
    {error, {badarg, Arg}}.
645
%% Reconciles the requested size of a wrap log with the size found in
%% an existing size file (OldSize; {0,0} is treated as "nothing to
%% compare against").  Clause by clause:
%%  1. size 'infinity' and no old size          -> badarg
%%  2. size 'infinity' and an old size          -> adopt the old size, retry
%%  3. no old size                              -> accept, record Version
%%  4. requested size equals the old size       -> accept, record Version
%%  5. sizes differ, repair = truncate, tuple   -> accept, record Version
%%  6. sizes differ, tuple size                 -> size_mismatch
%%  7. any other accepted #arg{}                -> badarg
%%  8. an earlier error                         -> passed through
check_wrap_arg({ok, #arg{size = infinity}}, {0,0}, _Version) ->
    {error, {badarg, size}};
check_wrap_arg({ok, #arg{size = infinity} = Res}, OldSize, Version) ->
    check_wrap_arg({ok, Res#arg{size = OldSize}}, OldSize, Version);
check_wrap_arg({ok, Res}, {0,0}, Version) ->
    {ok, Res#arg{version = Version}};
check_wrap_arg({ok, #arg{size = Size} = Res}, Size, Version) ->
    {ok, Res#arg{version = Version}};
check_wrap_arg({ok, #arg{repair = truncate, size = Size} = Res},
               _OldSize, Version) when is_tuple(Size) ->
    {ok, Res#arg{version = Version}};
check_wrap_arg({ok, #arg{size = Size}}, OldSize, _Version)
  when is_tuple(Size) ->
    {error, {size_mismatch, OldSize, Size}};
check_wrap_arg({ok, _Res}, _OldSize, _Version) ->
    {error, {badarg, size}};
check_wrap_arg(Ret, _OldSize, _Version) ->
    Ret.
664
665%%%-----------------------------------------------------------------
666%%% Server functions
667%%%-----------------------------------------------------------------
%% Entry point of the log process (spawned by istart_link/1).  Starts
%% trapping exits, then enters loop/1 with a fresh state that records
%% Parent and Server.
init(Parent, Server) ->
    ?PROFILE(ep:do()),
    process_flag(trap_exit, true),
    InitialState = #state{parent = Parent, server = Server},
    loop(InitialState).
672
%% Main loop.  Messages stored in the state are handled first, one at
%% a time; only when none are left does the process wait for a new
%% message from its mailbox.
loop(#state{messages = Pending} = State) ->
    case Pending of
        [] ->
            receive
                Message ->
                    handle(Message, State)
            end;
        [Next | Rest] ->
            handle(Next, State#state{messages = Rest})
    end.
680
681handle({From, write_cache}, S) when From =:= self() ->
682    case catch do_write_cache(get(log)) of
683        ok ->
684            loop(S);
685        Error ->
686	    loop(S#state{cache_error = Error})
687    end;
688handle({From, {log, Format, B}}=Message, S) ->
689    case get(log) of
690	#log{mode = read_only}=L ->
691	    reply(From, {error, {read_only_mode, L#log.name}}, S);
692	#log{status = ok, format=external}=L when Format =:= internal ->
693	    reply(From, {error, {format_external, L#log.name}}, S);
694	#log{status = ok, format=LogFormat} ->
695	    log_loop(S, From, [B], [], iolist_size(B), LogFormat);
696	#log{status = {blocked, false}}=L ->
697	    reply(From, {error, {blocked_log, L#log.name}}, S);
698	#log{blocked_by = From}=L ->
699	    reply(From, {error, {blocked_log, L#log.name}}, S);
700	_ ->
701	    enqueue(Message, S)
702    end;
703handle({alog, Format, B}=Message, S) ->
704    case get(log) of
705	#log{mode = read_only} ->
706	    notify_owners({read_only,B}),
707	    loop(S);
708	#log{status = ok, format = external} when Format =:= internal ->
709	    notify_owners({format_external, B}),
710	    loop(S);
711	#log{status = ok, format=LogFormat} ->
712	    log_loop(S, [], [B], [], iolist_size(B), LogFormat);
713	#log{status = {blocked, false}} ->
714	    notify_owners({blocked_log, B}),
715	    loop(S);
716	_ ->
717	    enqueue(Message, S)
718    end;
719handle({From, {block, QueueLogRecs}}=Message, S) ->
720    case get(log) of
721	#log{status = ok}=L ->
722	    do_block(From, QueueLogRecs, L),
723	    reply(From, ok, S);
724	#log{status = {blocked, false}}=L ->
725	    reply(From, {error, {blocked_log, L#log.name}}, S);
726	#log{blocked_by = From}=L ->
727	    reply(From, {error, {blocked_log, L#log.name}}, S);
728	_ ->
729	    enqueue(Message, S)
730    end;
731handle({From, unblock}, S) ->
732    case get(log) of
733	#log{status = ok}=L ->
734	    reply(From, {error, {not_blocked, L#log.name}}, S);
735	#log{blocked_by = From}=L ->
736	    S2 = do_unblock(L, S),
737	    reply(From, ok, S2);
738	L ->
739	    reply(From, {error, {not_blocked_by_pid, L#log.name}}, S)
740    end;
741handle({From, sync}=Message, S) ->
742    case get(log) of
743	#log{mode = read_only}=L ->
744	    reply(From, {error, {read_only_mode, L#log.name}}, S);
745	#log{status = ok, format=LogFormat} ->
746            log_loop(S, [], [], [From], 0, LogFormat);
747	#log{status = {blocked, false}}=L ->
748	    reply(From, {error, {blocked_log, L#log.name}}, S);
749	#log{blocked_by = From}=L ->
750	    reply(From, {error, {blocked_log, L#log.name}}, S);
751	_ ->
752	    enqueue(Message, S)
753    end;
754handle({From, {truncate, Head, F, A}}=Message, S) ->
755    case get(log) of
756	#log{mode = read_only}=L ->
757	    reply(From, {error, {read_only_mode, L#log.name}}, S);
758	#log{status = ok} when S#state.cache_error =/= ok ->
759	    loop(cache_error(S, [From]));
760	#log{status = ok}=L ->
761	    H = merge_head(Head, L#log.head),
762	    case catch do_trunc(L, H) of
763		ok ->
764		    erase(is_full),
765		    notify_owners({truncated, S#state.cnt}),
766		    N = if Head =:= none -> 0; true -> 1 end,
767		    reply(From, ok, (state_ok(S))#state{cnt = N});
768		Error ->
769		    do_exit(S, From, Error, ?failure(Error, F, A))
770	    end;
771	#log{status = {blocked, false}}=L ->
772	    reply(From, {error, {blocked_log, L#log.name}}, S);
773	#log{blocked_by = From}=L ->
774	    reply(From, {error, {blocked_log, L#log.name}}, S);
775	_ ->
776	    enqueue(Message, S)
777    end;
778handle({From, {chunk, Pos, B, N}}=Message,  S) ->
779    case get(log) of
780	#log{status = ok} when S#state.cache_error =/= ok ->
781	    loop(cache_error(S, [From]));
782	#log{status = ok}=L ->
783	    R = do_chunk(L, Pos, B, N),
784	    reply(From, R, S);
785	#log{blocked_by = From}=L ->
786	    R = do_chunk(L, Pos, B, N),
787	    reply(From, R, S);
788	#log{status = {blocked, false}}=L ->
789	    reply(From, {error, {blocked_log, L#log.name}}, S);
790	_L ->
791	    enqueue(Message, S)
792    end;
793handle({From, {chunk_step, Pos, N}}=Message,  S) ->
794    case get(log) of
795	#log{status = ok} when S#state.cache_error =/= ok ->
796	    loop(cache_error(S, [From]));
797	#log{status = ok}=L ->
798	    R = do_chunk_step(L, Pos, N),
799	    reply(From, R, S);
800	#log{blocked_by = From}=L ->
801	    R = do_chunk_step(L, Pos, N),
802	    reply(From, R, S);
803	#log{status = {blocked, false}}=L ->
804	    reply(From, {error, {blocked_log, L#log.name}}, S);
805	_ ->
806	    enqueue(Message, S)
807    end;
808handle({From, {change_notify, Pid, NewNotify}}=Message, S) ->
809    case get(log) of
810	#log{status = ok}=L ->
811	    case do_change_notify(L, Pid, NewNotify) of
812		{ok, L1} ->
813		    put(log, L1),
814		    reply(From, ok, S);
815		Error ->
816		    reply(From, Error, S)
817	    end;
818	#log{status = {blocked, false}}=L ->
819	    reply(From, {error, {blocked_log, L#log.name}}, S);
820	#log{blocked_by = From}=L ->
821	    reply(From, {error, {blocked_log, L#log.name}}, S);
822	_ ->
823	    enqueue(Message, S)
824    end;
825handle({From, {change_header, NewHead}}=Message, S) ->
826    case get(log) of
827	#log{mode = read_only}=L ->
828	    reply(From, {error, {read_only_mode, L#log.name}}, S);
829	#log{status = ok, format = Format}=L ->
830	    case check_head(NewHead, Format) of
831		{ok, Head} ->
832		    put(log, L#log{head = mk_head(Head, Format)}),
833		    reply(From, ok, S);
834		Error ->
835		    reply(From, Error, S)
836	    end;
837	#log{status = {blocked, false}}=L ->
838	    reply(From, {error, {blocked_log, L#log.name}}, S);
839	#log{blocked_by = From}=L ->
840	    reply(From, {error, {blocked_log, L#log.name}}, S);
841	_ ->
842	    enqueue(Message, S)
843    end;
844handle({From, {change_size, NewSize}}=Message, S) ->
845    case get(log) of
846	#log{mode = read_only}=L ->
847	    reply(From, {error, {read_only_mode, L#log.name}}, S);
848	#log{status = ok}=L ->
849	    case check_size(L#log.type, NewSize) of
850		ok ->
851		    case catch do_change_size(L, NewSize) of % does the put
852			ok ->
853			    reply(From, ok, S);
854			{big, CurSize} ->
855			    reply(From,
856				  {error,
857				   {new_size_too_small, L#log.name, CurSize}},
858				  S);
859			Else ->
860			    reply(From, Else, state_err(S, Else))
861		    end;
862		not_ok ->
863		    reply(From, {error, {badarg, size}}, S)
864	    end;
865	#log{status = {blocked, false}}=L ->
866	    reply(From, {error, {blocked_log, L#log.name}}, S);
867	#log{blocked_by = From}=L ->
868	    reply(From, {error, {blocked_log, L#log.name}}, S);
869	_ ->
870	    enqueue(Message, S)
871    end;
872handle({From, inc_wrap_file}=Message, S) ->
873    case get(log) of
874	#log{mode = read_only}=L ->
875	    reply(From, {error, {read_only_mode, L#log.name}}, S);
876	#log{type = halt}=L ->
877	    reply(From, {error, {halt_log, L#log.name}}, S);
878	#log{status = ok} when S#state.cache_error =/= ok ->
879	    loop(cache_error(S, [From]));
880	#log{status = ok}=L ->
881	    case catch do_inc_wrap_file(L) of
882		{ok, L2, Lost} ->
883		    put(log, L2),
884		    notify_owners({wrap, Lost}),
885		    reply(From, ok, S#state{cnt = S#state.cnt-Lost});
886		{error, Error, L2} ->
887		    put(log, L2),
888		    reply(From, Error, state_err(S, Error))
889	    end;
890	#log{status = {blocked, false}}=L ->
891	    reply(From, {error, {blocked_log, L#log.name}}, S);
892	#log{blocked_by = From}=L ->
893	    reply(From, {error, {blocked_log, L#log.name}}, S);
894	_ ->
895	    enqueue(Message, S)
896    end;
897handle({From, {reopen, NewFile, Head, F, A}}, S) ->
898    case get(log) of
899	#log{mode = read_only}=L ->
900	    reply(From, {error, {read_only_mode, L#log.name}}, S);
901	#log{status = ok} when S#state.cache_error =/= ok ->
902	    loop(cache_error(S, [From]));
903	#log{status = ok, filename = NewFile}=L ->
904	    reply(From, {error, {same_file_name, L#log.name}}, S);
905	#log{status = ok}=L ->
906	    case catch close_disk_log2(L) of
907		closed ->
908		    File = L#log.filename,
909		    case catch rename_file(File, NewFile, L#log.type) of
910			ok ->
911			    H = merge_head(Head, L#log.head),
912			    case do_open((S#state.args)#arg{name = L#log.name,
913							    repair = truncate,
914							    head = H,
915							    file = File}) of
916				{ok, Res, L2, Cnt} ->
917				    put(log, L2#log{owners = L#log.owners,
918						    head = L#log.head,
919						    users = L#log.users}),
920				    notify_owners({truncated, S#state.cnt}),
921				    erase(is_full),
922				    case Res of
923					{error, _} ->
924					    do_exit(S, From, Res,
925						    ?failure(Res, F, A));
926					_ ->
927					    reply(From, ok, S#state{cnt = Cnt})
928				    end;
929				Res ->
930				    do_exit(S, From, Res, ?failure(Res, F, A))
931			    end;
932			Error ->
933			    do_exit(S, From, Error, ?failure(Error, reopen, 2))
934		    end;
935		Error ->
936		    do_exit(S, From, Error, ?failure(Error, F, A))
937	    end;
938	L ->
939	    reply(From, {error, {blocked_log, L#log.name}}, S)
940    end;
941handle({Server, {internal_open, A}}, S) ->
942    case get(log) of
943	undefined ->
944	    case do_open(A) of % does the put
945		{ok, Res, L, Cnt} ->
946		    put(log, opening_pid(A#arg.linkto, A#arg.notify, L)),
947		    reply(Server, Res, S#state{args=A, cnt=Cnt});
948		Res ->
949		    do_fast_exit(S, Server, Res)
950	    end;
951	L ->
952	    TestH = mk_head(A#arg.head, A#arg.format),
953	    case compare_arg(A#arg.options, S#state.args, TestH, L#log.head) of
954		ok ->
955		    case add_pid(A#arg.linkto, A#arg.notify, L) of
956			{ok, L1} ->
957			    put(log, L1),
958			    reply(Server, {ok, L#log.name}, S);
959			Error ->
960			    reply(Server, Error, S)
961		    end;
962		Error ->
963		    reply(Server, Error, S)
964	    end
965    end;
966handle({From, close}, S) ->
967    case do_close(From, S) of
968	{stop, S1} ->
969	    do_exit(S1, From, ok, normal);
970	{continue, S1} ->
971	    reply(From, ok, S1)
972    end;
973handle({From, info}, S) ->
974    reply(From, do_info(get(log), S#state.cnt), S);
975handle({'EXIT', From, Reason}, #state{parent=From}=S) ->
976    %% Parent orders shutdown.
977    _ = do_stop(S),
978    exit(Reason);
979handle({'EXIT', From, Reason}, #state{server=From}=S) ->
980    %% The server is gone.
981    _ = do_stop(S),
982    exit(Reason);
983handle({'EXIT', From, _Reason}, S) ->
984    L = get(log),
985    case is_owner(From, L) of
986	{true, _Notify} ->
987	    case close_owner(From, L, S) of
988		{stop, S1} ->
989		    _ = do_stop(S1),
990		    exit(normal);
991		{continue, S1} ->
992		    loop(S1)
993	    end;
994	false ->
995	    %% 'users' is not decremented.
996	    S1 = do_unblock(From, get(log), S),
997	    loop(S1)
998    end;
999handle({system, From, Req}, S) ->
1000    sys:handle_system_msg(Req, From, S#state.parent, ?MODULE, [], S);
1001handle(_, S) ->
1002    loop(S).
1003
%% Defers Message: it is pushed onto the front of the 'queue' field of
%% the state (so the newest request comes first) and the server
%% continues in loop/1.
enqueue(Message, #state{}=S) ->
    Pending = S#state.queue,
    loop(S#state{queue = [Message | Pending]}).
1006
1007%% Collect further log and sync requests already in the mailbox or queued
1008
1009-define(MAX_LOOK_AHEAD, 64*1024).
1010
%% Inlined.
%% Picks the next message to examine while a batch of log and sync
%% requests is being collected. Clauses, in order:
%%  - a recorded cache error stops the collection; cache_error/2 is
%%    given the pids collected so far and the main loop is resumed;
%%  - once more than ?MAX_LOOK_AHEAD bytes have been collected the
%%    batch is finished with log_end/5;
%%  - with no saved messages the mailbox is polled without blocking,
%%    and an empty mailbox finishes the batch;
%%  - otherwise the first saved message in 'messages' is examined.
log_loop(#state{cache_error = CacheErr}=S, Pids, _Bins, _Sync, _Sz, _F)
  when CacheErr =/= ok ->
    loop(cache_error(S, Pids));
log_loop(#state{}=S, Pids, Bins, Sync, Sz, _F) when Sz > ?MAX_LOOK_AHEAD ->
    NS = log_end(S, Pids, Bins, Sync, Sz),
    loop(NS);
log_loop(#state{messages = []}=S, Pids, Bins, Sync, Sz, F) ->
    receive
        Msg ->
            log_loop(Msg, Pids, Bins, Sync, Sz, F, S)
    after 0 ->
        NS = log_end(S, Pids, Bins, Sync, Sz),
        loop(NS)
    end;
log_loop(#state{messages = [Next | Rest]}=S, Pids, Bins, Sync, Sz, F) ->
    log_loop(Next, Pids, Bins, Sync, Sz, F, S#state{messages = Rest}).
1026
%% Examines one message while a batch is being collected.
%% Log requests add their data to Bins (newest first) and their size to
%% Sz; synchronous ones also add the caller to Pids. Sync requests add
%% the caller to Sync. Items logged after the last sync request found
%% are sync:ed as well. Any other message ends the batch: the collected
%% data is written by log_end/5 and the message is passed to handle/2.
log_loop({alog, internal, Terms}, Pids, Bins, Sync, Sz, internal=F, S) ->
    %% alog of terms allowed for the internal format only
    NewSz = Sz + iolist_size(Terms),
    log_loop(S, Pids, [Terms | Bins], Sync, NewSz, F);
log_loop({alog, binary, Bytes}, Pids, Bins, Sync, Sz, F, S) ->
    NewSz = Sz + iolist_size(Bytes),
    log_loop(S, Pids, [Bytes | Bins], Sync, NewSz, F);
log_loop({From, {log, internal, Terms}}, Pids, Bins, Sync, Sz, internal=F, S) ->
    %% log of terms allowed for the internal format only
    NewSz = Sz + iolist_size(Terms),
    log_loop(S, [From | Pids], [Terms | Bins], Sync, NewSz, F);
log_loop({From, {log, binary, Bytes}}, Pids, Bins, Sync, Sz, F, S) ->
    NewSz = Sz + iolist_size(Bytes),
    log_loop(S, [From | Pids], [Bytes | Bins], Sync, NewSz, F);
log_loop({From, sync}, Pids, Bins, Sync, Sz, F, S) ->
    log_loop(S, Pids, Bins, [From | Sync], Sz, F);
log_loop(Other, Pids, Bins, Sync, Sz, _F, S) ->
    Flushed = log_end(S, Pids, Bins, Sync, Sz),
    handle(Other, Flushed).
1043
%% Writes a collected batch and answers the waiting processes.
%% -> #state{}
%% With nothing to log only the sync requests are served. Otherwise the
%% collected data is put back in arrival order with rflat/1 and handed
%% to do_log/3; the item counter 'cnt' is advanced by the number of
%% items reported as logged. A 'full' error with no synchronous caller
%% waiting goes through state_ok/1 instead of state_err/2, and pending
%% sync requests are still served. Any other error is sent to the
%% callers in Pids and recorded with state_err/2; in that case the
%% pending sync requests are not served here.
log_end(S, [], [], Sync, _Sz) ->
    log_end_sync(S, Sync);
log_end(#state{cnt = Cnt}=S, Pids, Bins, Sync, Sz) ->
    Items = rflat(Bins),
    case do_log(get(log), Items, Sz) of
        Logged when is_integer(Logged) ->
            ok = replies(Pids, ok),
            Cleared = state_ok(S),
            log_end_sync(Cleared#state{cnt = Cnt + Logged}, Sync);
        {error, {error, {full, _Name}}, Logged} when Pids =:= [] ->
            Counted = S#state{cnt = Cnt + Logged},
            log_end_sync(state_ok(Counted), Sync);
        {error, Reply, Logged} ->
            ok = replies(Pids, Reply),
            state_err(S#state{cnt = Cnt + Logged}, Reply)
    end.
1058
%% Inlined.
%% Serves the collected sync requests, if any: the log is synced once,
%% every process in Sync gets the result, and the result is passed on
%% to state_err/2. With no sync requests the state is returned as is.
log_end_sync(S, Sync) ->
    case Sync of
        [] ->
            S;
        _ ->
            SyncReply = do_sync(get(log)),
            ok = replies(Sync, SyncReply),
            state_err(S, SyncReply)
    end.
1066
%% Inlined.
%% Takes a list of lists collected newest-first and returns their
%% concatenation in oldest-first order. A single-element list returns
%% that element untouched.
rflat([Single]) -> Single;
rflat(Lists) -> rflat(Lists, []).

%% Prepends each list in turn to the accumulator, which reverses the
%% order of the sublists while keeping the order inside each of them.
rflat(Lists, Acc0) ->
    lists:foldl(fun(List, Acc) -> List ++ Acc end, Acc0, Lists).
1074
%% Changes the notify flag stored for the owner Pid.
%% -> {ok, Log} | {error, Error}
%% Pid must be an owner of the log. Asking for the value already stored
%% is accepted without checking it; any other value must be 'true' or
%% 'false'. On a change the owner entry is moved to the front of the
%% owners list.
do_change_notify(L, Pid, Notify) ->
    case is_owner(Pid, L) of
        false ->
            {error, {not_owner, Pid}};
        {true, Notify} ->
            {ok, L};
        {true, _Current} when is_boolean(Notify) ->
            Others = lists:keydelete(Pid, 1, L#log.owners),
            {ok, L#log{owners = [{Pid, Notify} | Others]}};
        {true, _Current} ->
            {error, {badarg, notify}}
    end.
1089
%% Handles a close request from Pid, which is treated as an owner if it
%% is found in the owners list and as an anonymous user otherwise.
%% -> {stop, S} | {continue, S}
do_close(Pid, S) ->
    Log = get(log),
    case is_owner(Pid, Log) of
        false ->
            close_user(Pid, Log, S);
        {true, _Notify} ->
            close_owner(Pid, Log, S)
    end.
1099
%% Removes the owner Pid from the log, lifts a block held by Pid (if
%% any) and unlinks from Pid.
%% -> {stop, S} | {continue, S}
close_owner(Pid, L, S) ->
    Remaining = lists:keydelete(Pid, 1, L#log.owners),
    Updated = L#log{owners = Remaining},
    put(log, Updated),
    %% The record just stored is the one do_unblock/3 has to look at.
    Unblocked = do_unblock(Pid, Updated, S),
    unlink(Pid),
    do_close2(Updated, Unblocked).
1107
%% Handles a close request from a process that is not an owner.
%% -> {stop, S} | {continue, S}
%% While the user counter is positive it is decremented and a block
%% held by Pid (if any) is lifted; otherwise nothing is changed.
close_user(Pid, #log{users = Users}=L, S) when Users > 0 ->
    Updated = L#log{users = Users - 1},
    put(log, Updated),
    %% The record just stored is the one do_unblock/3 has to look at.
    Unblocked = do_unblock(Pid, Updated, S),
    do_close2(Updated, Unblocked);
close_user(_Pid, _L, S) ->
    {continue, S}.
1116
%% Decides whether the log process should stop: it stops once there are
%% neither owners nor anonymous users left.
%% -> {stop, S} | {continue, S}
do_close2(L, S) ->
    case L of
        #log{users = 0, owners = []} ->
            {stop, S};
        _ ->
            {continue, S}
    end.
1121
1122%%-----------------------------------------------------------------
1123%% Callback functions for system messages handling.
1124%%-----------------------------------------------------------------
%% sys callback: resumes the server loop with the state that was handed
%% to sys:handle_system_msg/6.
system_continue(_Parent, _Debug, State) ->
    loop(State).
1127
%% sys callback: stops the log (the result of do_stop/1 is ignored) and
%% exits with the given reason.
-spec system_terminate(_, _, _, #state{}) -> no_return().
system_terminate(Reason, _Parent, _Debug, State) ->
    _ = do_stop(State),
    exit(Reason).
1132
1133%%-----------------------------------------------------------------
%% Temporary code for upgrade.
1135%%-----------------------------------------------------------------
%% sys callback: the state needs no conversion, so it is returned
%% unchanged.
system_code_change(State, _Module, _OldVsn, _Extra) -> {ok, State}.
1138
1139
1140%%%----------------------------------------------------------------------
1141%%% Internal functions
1142%%%----------------------------------------------------------------------
%% Stops the log, answers From and exits with Reason.
%% The reply is chosen as follows: a recorded cache error wins; else,
%% if the log was closed without problems, Message0 is used; else, if
%% Message0 is 'ok', the result of the failed stop is used; otherwise
%% Message0.
-spec do_exit(#state{}, pid(), _, _) -> no_return().
do_exit(S, From, Message0, Reason) ->
    StopRes = do_stop(S),
    CacheErr = S#state.cache_error,
    Message = if
                  CacheErr =/= ok -> CacheErr;
                  StopRes =:= closed -> Message0;
                  Message0 =:= ok -> StopRes;
                  true -> Message0
              end,
    _ = disk_log_server:close(self()),
    ok = replies(From, Message),
    ?PROFILE(ep:done()),
    exit(Reason).
1156
%% Stops the log (ignoring the result), sends Message directly to
%% Server tagged as a disk_log reply and exits normally.
-spec do_fast_exit(#state{}, pid(), _) -> no_return().
do_fast_exit(S, Server, Message) ->
    _ = do_stop(S),
    Reply = {disk_log, self(), Message},
    Server ! Reply,
    exit(normal).
1162
%% Rejects every request still waiting in 'queue' and 'messages' and
%% then closes the log kept in the process dictionary.
%% -> closed | Error
do_stop(S) ->
    Pending = S#state.queue ++ S#state.messages,
    proc_q(Pending),
    close_disk_log(get(log)).
1167
%% Tells the sender of every pending synchronous request (a 2-tuple
%% whose first element is a pid) that the log has stopped. Everything
%% else, such as asynchronous requests, is dropped silently.
%% -> ok
proc_q(Requests) ->
    Reject = fun({From, _Req}) when is_pid(From) ->
                     From ! {disk_log, self(), {error, disk_log_stopped}};
                (_Async) ->
                     ok
             end,
    lists:foreach(Reject, Requests).
1175
%% Registers the process that opens the log. add_pid/3 is required to
%% succeed here; anything else crashes with badmatch.
%% -> log()
opening_pid(Pid, Notify, L) ->
    {ok, WithPid} = add_pid(Pid, Notify, L),
    WithPid.
1180
%% Attaches a client to the log.
%% -> {ok, log()} | Error
%% A pid becomes an owner: it is linked to and stored together with its
%% notify flag. A pid that already is an owner is accepted as long as
%% the notify flag is the stored one. Anything that is not a pid only
%% increments the anonymous user counter.
add_pid(Pid, Notify, L) when is_pid(Pid) ->
    case is_owner(Pid, L) of
        {true, Notify} ->
            {ok, L};
        {true, Current} ->
            %% Not matched above, so the stored flag differs.
            {error, {arg_mismatch, notify, Current, Notify}};
        false ->
            link(Pid),
            Owners = [{Pid, Notify} | L#log.owners],
            {ok, L#log{owners = Owners}}
    end;
add_pid(_NotAPid, _Notify, L) ->
    Users = L#log.users + 1,
    {ok, L#log{users = Users}}.
1195
%% Drops the link to the blocking process unless that process is an
%% owner; the link to an owner is kept. Nothing is done when the log is
%% not blocked.
unblock_pid(#log{blocked_by = none}) ->
    ok;
unblock_pid(#log{blocked_by = Blocker}=L) ->
    case is_owner(Blocker, L) of
        false ->
            unlink(Blocker);
        {true, _Notify} ->
            ok
    end.
1205
%% Looks Pid up in the owners list of the log.
%% -> {true, Notify} | false
%% Notify is the value stored together with Pid in the owners list.
is_owner(Pid, L) ->
    case lists:keyfind(Pid, 1, L#log.owners) of
        {_Pid, Notify} ->
            {true, Notify};
        false ->
            false
    end.
1214
%% Renames the file(s) of a log.
%% -> ok | {error, {file_error, FileName, Reason}}
%% A halt log is a single file. For a wrap log every name produced by
%% wrap_file_extensions/1 is renamed in turn by rename_file/4.
%% NOTE(review): the original header said "ok | throw(Error)"; nothing
%% in this function throws, but the helpers it calls were not checked.
rename_file(File, NewFile, halt) ->
    case file:rename(File, NewFile) of
        ok ->
            ok;
        Failure ->
            file_error(NewFile, Failure)
    end;
rename_file(File, NewFile, wrap) ->
    Exts = wrap_file_extensions(File),
    rename_file(Exts, File, NewFile, ok).
1225
%% Renames File ++ Ext to NewFile0 ++ Ext (as built by add_ext/2) for
%% every extension. A failure does not stop the remaining renames; the
%% result is Res0 if all of them succeed and the last failure otherwise.
rename_file(Exts, File, NewFile0, Res0) ->
    RenameOne =
        fun(Ext, Res) ->
                Target = add_ext(NewFile0, Ext),
                case file:rename(add_ext(File, Ext), Target) of
                    ok ->
                        Res;
                    Failure ->
                        file_error(Target, Failure)
                end
        end,
    lists:foldl(RenameOne, Res0, Exts).
1236
%% Wraps the reason of a failed file operation together with the name
%% of the file it concerned.
file_error(FileName, {error, Reason}) ->
    {error, {file_error, FileName, Reason}}.
1239
%% Compares the options of a new open request with those the log was
%% opened with. "Old" error messages have been kept, arg_mismatch has
%% been added.
%% -> ok | {error, Reason}
%% Options are checked one at a time with compare_arg/3 and the first
%% problem found is returned. When all options pass, the head is
%% compared, unless no head was given (Head is 'none').
compare_arg([], _A, Head, OrigHead) ->
    if
        Head =:= none ->
            %% no header option given
            ok;
        Head =/= OrigHead ->
            {error, {arg_mismatch, head, OrigHead, Head}};
        true ->
            ok
    end;
compare_arg([{Attr, Val} | Rest], A, Head, OrigHead) ->
    case compare_arg(Attr, Val, A) of
        ok ->
            compare_arg(Rest, A, Head, OrigHead);
        {not_ok, OrigVal} ->
            {error, {arg_mismatch, Attr, OrigVal, Val}};
        Error ->
            Error
    end.
1258
%% Checks a single option against the arguments the log was opened
%% with. A differing file name or access mode gives a specific error;
%% a differing type, format or repair value gives {not_ok, OrigValue};
%% every other option is accepted.
-spec compare_arg(atom(), _, #arg{}) ->
             'ok' | {'not_ok', _} | {'error', {atom(), _}}.
compare_arg(file, File, #arg{file = Current, name = Name})
  when File =/= Current ->
    {error, {name_already_open, Name}};
compare_arg(mode, read_only, #arg{mode = read_write, name = Name}) ->
    {error, {open_read_write, Name}};
compare_arg(mode, read_write, #arg{mode = read_only, name = Name}) ->
    {error, {open_read_only, Name}};
compare_arg(type, Type, #arg{type = Current}) when Type =/= Current ->
    {not_ok, Current};
compare_arg(format, Format, #arg{format = Current}) when Format =/= Current ->
    {not_ok, Current};
compare_arg(repair, Repair, #arg{repair = Current}) when Repair =/= Current ->
    %% not used, but check it anyway...
    {not_ok, Current};
compare_arg(_Attr, _Val, _A) ->
    ok.
1276
%% Opens the log described by the #arg{} record and builds the #log{}
%% record for it.
%% -> {ok, Res, log(), Cnt} | Error
%% The quiet flag is passed to disk_log_1 first, the head is converted
%% with mk_head/2, and do_open2/9 does the actual opening. The record
%% is only returned here, not stored.
do_open(A) ->
    #arg{type = Type, format = Format, name = Name, head = Head0,
         file = FName, repair = Repair, size = Size, mode = Mode,
         quiet = Quiet, version = Version} = A,
    disk_log_1:set_quiet(Quiet),
    Head = mk_head(Head0, Format),
    Opened = do_open2(Type, Format, Name, FName, Repair, Size, Mode,
                      Head, Version),
    case Opened of
        {ok, Ret, Extra, FormatType, NoItems} ->
            Log = #log{name = Name, type = Type, format = Format,
                       filename = FName, size = Size,
                       format_type = FormatType, head = Head, mode = Mode,
                       version = Version, extra = Extra},
            {ok, Ret, Log, NoItems};
        Error ->
            Error
    end.
1294
%% Converts a {head, Data} option into {ok, Binary}: terms are encoded
%% with term_to_binary/1 for the internal format, and bytes are turned
%% into one binary for the external format. Any other head value is
%% returned unchanged.
mk_head(Head, Format) ->
    case {Head, Format} of
        {{head, Term}, internal} ->
            {ok, term_to_binary(Term)};
        {{head, Bytes}, external} ->
            {ok, ensure_binary(Bytes)};
        _ ->
            Head
    end.
1298
%% Encodes every term of the list with term_to_binary/1, keeping the
%% order.
terms2bins(Terms) ->
    lists:map(fun erlang:term_to_binary/1, Terms).
1303
%% Returns the list itself when every element already is a binary;
%% otherwise a new list is built by make_binary_list/1.
ensure_binary_list(Bs) ->
    ensure_binary_list(Bs, Bs).

%% Scans the list while keeping the original around, so that the
%% all-binaries case returns the argument without copying it.
ensure_binary_list([], All) ->
    All;
ensure_binary_list([Bin | Rest], All) when is_binary(Bin) ->
    ensure_binary_list(Rest, All);
ensure_binary_list(_NotAllBinaries, All) ->
    make_binary_list(All).
1313
%% Converts every element of the list with ensure_binary/1, keeping the
%% order.
make_binary_list(Items) ->
    lists:map(fun ensure_binary/1, Items).
1318
%% Turns an iolist or a binary into a single binary. Fails with badarg
%% when the argument is neither.
ensure_binary(Bytes) -> erlang:iolist_to_binary(Bytes).
1321
1322%%-----------------------------------------------------------------
1323%% Change size of the logs in runtime.
1324%%-----------------------------------------------------------------
%% -> ok | {big, CurSize} | throw(Error)
%% Stores the new size in the log kept in the process dictionary and
%% clears the is_full flag.
%% Halt log: the new size is refused with {big, CurB} when the bytes
%% already accounted for (curB) exceed it.
%% Wrap log: the change is delegated to disk_log_1:change_size_wrap/3,
%% which must return {ok, Handle}.
do_change_size(#log{type = halt}=L, NewSize) ->
    Halt = L#log.extra,
    CurB = Halt#halt.curB,
    case NewSize =:= infinity orelse CurB =< NewSize of
        true ->
            erase(is_full),
            put(log, L#log{extra = Halt#halt{size = NewSize}}),
            ok;
        false ->
            {big, CurB}
    end;
do_change_size(#log{type = wrap}=L, NewSize) ->
    #log{extra = Handle0, version = Version} = L,
    {ok, Handle} = disk_log_1:change_size_wrap(Handle0, NewSize, Version),
    erase(is_full),
    put(log, L#log{extra = Handle}),
    ok.
1348
%% Validates a new head option.
%% -> {ok, Head} | Error; Head = none | {head, H} | {M,F,A}
%% {head, none} removes the head. A head function must be given as
%% {M, F, A} with atoms M and F and a list A. For the external format
%% the head must be convertible to a binary; for the internal format
%% any term is accepted.
check_head({head, none}, _Format) ->
    {ok, none};
check_head({head_func, {M, F, A}=MFA}, _Format)
  when is_atom(M), is_atom(F), is_list(A) ->
    {ok, MFA};
check_head({head, Bytes}, external) ->
    %% Only the outcome of the conversion matters; the head is kept in
    %% the form it was given.
    case catch ensure_binary(Bytes) of
        {'EXIT', _} ->
            {error, {badarg, head}};
        _Binary ->
            {ok, {head, Bytes}}
    end;
check_head({head, Term}, internal) ->
    {ok, {head, Term}};
check_head(_Head, _Format) ->
    {error, {badarg, head}}.
1367
%% Validates a new size for the given log type.
%% -> ok | not_ok
%% Wrap logs take {MaxNoBytes, MaxNoFiles} with
%% 0 < MaxNoBytes =< ?MAX_BYTES and 0 < MaxNoFiles < ?MAX_FILES.
%% Halt logs take a positive integer or 'infinity'.
check_size(wrap, {NewMaxB, NewMaxF})
  when is_integer(NewMaxB), NewMaxB > 0, NewMaxB =< ?MAX_BYTES,
       is_integer(NewMaxF), NewMaxF > 0, NewMaxF < ?MAX_FILES ->
    ok;
check_size(halt, infinity) ->
    ok;
check_size(halt, NewSize) when is_integer(NewSize), NewSize > 0 ->
    ok;
check_size(_Type, _Size) ->
    not_ok.
1378
1379%%-----------------------------------------------------------------
1380%% Increment a wrap log.
1381%%-----------------------------------------------------------------
%% -> {ok, log(), Lost} | {error, Error, log()}
%% Asks disk_log_1 to increment the wrap log, using the function that
%% matches the log format, and returns the log record carrying the
%% handle that came back. The record is not stored here.
do_inc_wrap_file(L) ->
    #log{format = Format, extra = Handle} = L,
    Result = case Format of
                 internal ->
                     disk_log_1:mf_int_inc(Handle, L#log.head);
                 external ->
                     disk_log_1:mf_ext_inc(Handle, L#log.head)
             end,
    case Result of
        {ok, Handle2, Lost} ->
            {ok, L#log{extra = Handle2}, Lost};
        {error, Error, Handle2} ->
            {error, Error, L#log{extra = Handle2}}
    end.
1401
1402
1403%%-----------------------------------------------------------------
1404%% Open a log file.
1405%%-----------------------------------------------------------------
%% Opens the file(s) of a log through disk_log_1, one clause for each
%% combination of log type and format.
%% -> {ok, Reply, Extra, FormatType, Cnt} | Error
%% Extra is a #halt{} record for halt logs and the handle returned by
%% disk_log_1 for wrap logs. Reply is {ok, Name}, or, for a repaired
%% internally formatted log,
%% {repaired, Name, {recovered, Rec}, {badbytes, Bad}}.
%% Whatever else disk_log_1 returns or raises (the calls are wrapped in
%% catch) is passed on unchanged.
%% Note: the header is always written, even if the log size is too small.
do_open2(halt, internal, Name, FName, Repair, Size, Mode, Head, _V) ->
    case catch disk_log_1:int_open(FName, Repair, Mode, Head) of
        {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} ->
            Extra = #halt{fdc = FdC, curB = FileSize, size = Size},
            {ok, {ok, Name}, Extra, halt_int, NoItems};
        {repaired, FdC, Rec, Bad, FileSize} ->
            Extra = #halt{fdc = FdC, curB = FileSize, size = Size},
            Reply = {repaired, Name, {recovered, Rec}, {badbytes, Bad}},
            {ok, Reply, Extra, halt_int, Rec};
        Error ->
            Error
    end;
do_open2(wrap, internal, Name, FName, Repair, Size, Mode, Head, V) ->
    {MaxB, MaxF} = Size,
    case catch
      disk_log_1:mf_int_open(FName, MaxB, MaxF, Repair, Mode, Head, V) of
        {ok, Handle, Cnt} ->
            {ok, {ok, Name}, Handle, wrap_int, Cnt};
        {repaired, Handle, Rec, Bad, Cnt} ->
            Reply = {repaired, Name, {recovered, Rec}, {badbytes, Bad}},
            {ok, Reply, Handle, wrap_int, Cnt};
        Error ->
            Error
    end;
do_open2(halt, external, Name, FName, Repair, Size, Mode, Head, _V) ->
    case catch disk_log_1:ext_open(FName, Repair, Mode, Head) of
        {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} ->
            Extra = #halt{fdc = FdC, curB = FileSize, size = Size},
            {ok, {ok, Name}, Extra, halt_ext, NoItems};
        Error ->
            Error
    end;
do_open2(wrap, external, Name, FName, Repair, Size, Mode, Head, V) ->
    {MaxB, MaxF} = Size,
    case catch
      disk_log_1:mf_ext_open(FName, MaxB, MaxF, Repair, Mode, Head, V) of
        {ok, Handle, Cnt} ->
            {ok, {ok, Name}, Handle, wrap_ext, Cnt};
        Error ->
            Error
    end.
1449
%% Closes the log and forgets about it.
%% -> closed | Error
%% The link to a blocking non-owner and the links to all owners are
%% dropped, the files are closed, and the 'log' entry is erased from
%% the process dictionary whether or not closing succeeded. Nothing is
%% done when no log has been stored.
close_disk_log(undefined) ->
    closed;
close_disk_log(L) ->
    unblock_pid(L),
    lists:foreach(fun({Owner, _Notify}) -> unlink(Owner) end, L#log.owners),
    Result = (catch close_disk_log2(L)),
    erase(log),
    Result.
1462
-spec close_disk_log2(#log{}) -> 'closed'. % | throw(Error)

%% Closes the file(s) of the log with the disk_log_1 function that
%% matches its format type. The value returned by disk_log_1 is
%% ignored; 'closed' is returned unless the call raises.
close_disk_log2(L) ->
    case L of
        #log{format_type = halt_int, mode = Mode, extra = Halt,
             filename = FName} ->
            disk_log_1:close(Halt#halt.fdc, FName, Mode);
        #log{format_type = wrap_int, mode = Mode, extra = Handle} ->
            disk_log_1:mf_int_close(Handle, Mode);
        #log{format_type = halt_ext, extra = Halt, filename = FName} ->
            disk_log_1:fclose(Halt#halt.fdc, FName);
        #log{format_type = wrap_ext, mode = Mode, extra = Handle} ->
            disk_log_1:mf_ext_close(Handle, Mode)
    end,
    closed.
1477
%% Produces a descriptive text (io_lib chars) for an error reason.
%% {error, Module, Error} is delegated to Module:format_error/1,
%% {error, Reason} is unwrapped, and {Node, {error, Reason}} gets the
%% node printed in front of the text. Reasons that are not recognized
%% are printed as they are.
do_format_error({error, Module, Error}) ->
    Module:format_error(Error);
do_format_error({error, Reason}) ->
    do_format_error(Reason);
do_format_error({Node, {error, _Reason} = Error}) ->
    Prefix = io_lib:format("~p: ", [Node]),
    lists:append(Prefix, do_format_error(Error));
do_format_error({badarg, Arg}) ->
    io_lib:format("The argument ~p is missing, not recognized or "
                  "not wellformed~n", [Arg]);
do_format_error({size_mismatch, OldSize, ArgSize}) ->
    io_lib:format("The given size ~p does not match the size ~p found on "
                  "the disk log size file~n", [ArgSize, OldSize]);
do_format_error({read_only_mode, Name}) ->
    io_lib:format("The disk log ~tp has been opened read-only, but the "
                  "requested operation needs read-write access~n", [Name]);
do_format_error({format_external, Name}) ->
    io_lib:format("The requested operation can only be applied on internally "
                  "formatted disk logs, but ~tp is externally formatted~n",
                  [Name]);
do_format_error({blocked_log, Name}) ->
    io_lib:format("The blocked disk log ~tp does not queue requests, or "
                  "the log has been blocked by the calling process~n", [Name]);
do_format_error({full, Name}) ->
    io_lib:format("The halt log ~tp is full~n", [Name]);
do_format_error({not_blocked, Name}) ->
    io_lib:format("The disk log ~tp is not blocked~n", [Name]);
do_format_error({not_owner, Pid}) ->
    io_lib:format("The pid ~tp is not an owner of the disk log~n", [Pid]);
do_format_error({not_blocked_by_pid, Name}) ->
    io_lib:format("The disk log ~tp is blocked, but only the blocking pid "
                  "can unblock a disk log~n", [Name]);
do_format_error({new_size_too_small, Name, CurrentSize}) ->
    io_lib:format("The current size ~p of the halt log ~tp is greater than the "
                  "requested new size~n", [CurrentSize, Name]);
do_format_error({halt_log, Name}) ->
    io_lib:format("The halt log ~tp cannot be wrapped~n", [Name]);
do_format_error({same_file_name, Name}) ->
    io_lib:format("Current and new file name of the disk log ~tp "
                  "are the same~n", [Name]);
do_format_error({arg_mismatch, Option, FirstValue, ArgValue}) ->
    io_lib:format("The value ~tp of the disk log option ~p does not match "
                  "the current value ~tp~n", [ArgValue, Option, FirstValue]);
do_format_error({name_already_open, Name}) ->
    io_lib:format("The disk log ~tp has already opened another file~n", [Name]);
do_format_error({node_already_open, Name}) ->
    io_lib:format("The distribution option of the disk log ~tp does not match "
                  "already open log~n", [Name]);
do_format_error({open_read_write, Name}) ->
    io_lib:format("The disk log ~tp has already been opened read-write~n",
                  [Name]);
do_format_error({open_read_only, Name}) ->
    io_lib:format("The disk log ~tp has already been opened read-only~n",
                  [Name]);
do_format_error({not_internal_wrap, Name}) ->
    io_lib:format("The requested operation cannot be applied since ~tp is not "
                  "an internally formatted disk log~n", [Name]);
do_format_error(no_such_log) ->
    io_lib:format("There is no disk log with the given name~n", []);
do_format_error(NoNode) when NoNode =:= nonode; NoNode =:= nodedown ->
    %% Both reasons are reported with the same text.
    io_lib:format("There seems to be no node up that can handle "
                  "the request~n", []);
do_format_error({corrupt_log_file, FileName}) ->
    io_lib:format("The disk log file \"~ts\" contains corrupt data~n",
                  [FileName]);
do_format_error({need_repair, FileName}) ->
    io_lib:format("The disk log file \"~ts\" has not been closed properly and "
                  "needs repair~n", [FileName]);
do_format_error({not_a_log_file, FileName}) ->
    io_lib:format("The file \"~ts\" is not a wrap log file~n", [FileName]);
do_format_error({invalid_header, InvalidHeader}) ->
    io_lib:format("The disk log header is not wellformed: ~p~n",
                  [InvalidHeader]);
do_format_error(end_of_log) ->
    io_lib:format("An attempt was made to step outside a not yet "
                  "full wrap log~n", []);
do_format_error({invalid_index_file, FileName}) ->
    io_lib:format("The wrap log index file \"~ts\" cannot be used~n",
                  [FileName]);
do_format_error({no_continuation, BadCont}) ->
    io_lib:format("The term ~p is not a chunk continuation~n", [BadCont]);
do_format_error({file_error, FileName, Reason}) ->
    Text = file:format_error(Reason),
    io_lib:format("\"~ts\": ~tp~n", [FileName, Text]);
do_format_error(Unknown) ->
    io_lib:format("~tp~n", [Unknown]).
1565
%% Builds the property list describing the log; Cnt is the item counter
%% kept in the server state.
%% The common properties come first, followed by the ones that only
%% apply to logs opened read-write. 'head' is only included for
%% read-write logs.
%% Side effect: for a read-write wrap log the noFull counter is added
%% to accFull and reset to 0 in the handle, and the updated log is
%% stored in the process dictionary.
do_info(L, Cnt) ->
    #log{name = Name, type = Type, mode = Mode, filename = File,
         extra = Extra, status = Status, owners = Owners, users = Users,
         format = Format, head = Head} = L,
    Size = case Type of
               wrap -> disk_log_1:get_wrap_size(Extra);
               halt -> Extra#halt.size
           end,
    Distribution =
        case disk_log_server:get_log_pids(Name) of
            {local, _Pid} ->
                local;
            {distributed, Pids} ->
                [node(P) || P <- Pids];
            undefined -> % "cannot happen"
                []
        end,
    RW = case Type of
             wrap when Mode =:= read_write ->
                 #handle{curB = CurB, curF = CurF,
                         cur_cnt = CurCnt, acc_cnt = AccCnt,
                         noFull = NoFull, accFull = AccFull} = Extra,
                 NewAccFull = AccFull + NoFull,
                 NewExtra = Extra#handle{noFull = 0, accFull = NewAccFull},
                 put(log, L#log{extra = NewExtra}),
                 [{no_current_bytes, CurB},
                  {no_current_items, CurCnt},
                  {no_items, Cnt},
                  {no_written_items, CurCnt + AccCnt},
                  {current_file, CurF},
                  {no_overflows, {NewAccFull, NoFull}}];
             halt when Mode =:= read_write ->
                 IsFull = get(is_full) =/= undefined,
                 [{full, IsFull},
                  {no_written_items, Cnt}];
             _ when Mode =:= read_only ->
                 []
         end,
    HeadL = case Mode of
                read_write -> [{head, Head}];
                read_only -> []
            end,
    Leading = [{name, Name},
               {file, File},
               {type, Type},
               {format, Format},
               {size, Size},
               {items, Cnt}, % kept for "backward compatibility" (undocumented)
               {owners, Owners},
               {users, Users}],
    Trailing = [{mode, Mode},
                {status, Status},
                {node, node()},
                {distributed, Distribution}],
    lists:append([Leading, HeadL, Trailing, RW]).
1632
%% Marks the log as blocked by Pid and stores it. QueueLogRecs becomes
%% part of the status, {blocked, QueueLogRecs}. A blocking process that
%% is not an owner is linked to.
do_block(Pid, QueueLogRecs, L) ->
    Blocked = L#log{status = {blocked, QueueLogRecs}, blocked_by = Pid},
    put(log, Blocked),
    case is_owner(Pid, Blocked) of
        false ->
            link(Pid);
        {true, _Notify} ->
            ok
    end.
1642
%% Unblocks the log, but only when Pid is the process that blocked it;
%% otherwise the state is returned unchanged.
%% -> #state{}
do_unblock(Pid, L, S) ->
    case L of
        #log{blocked_by = Pid} ->
            do_unblock(L, S);
        _ ->
            S
    end.
1647
%% Lifts the block: the log is stored with status 'ok' and no blocker,
%% and the requests queued while blocked are moved to 'messages' in the
%% order they arrived.
%% -> #state{}
do_unblock(L, S) ->
    unblock_pid(L),
    put(log, L#log{blocked_by = none, status = ok}),
    %% Since the block request is synchronous, and the blocking
    %% process is the only process that can unblock, all requests in
    %% 'messages' will have been put in 'queue' before the unblock
    %% request is granted.
    [] = S#state.messages, % assertion
    Queued = lists:reverse(S#state.queue),
    S#state{queue = [], messages = Queued}.
1658
%% Logs the items B, computing their total size first. Returns the
%% number of items logged, or an error together with a count of items.
-spec do_log(#log{}, [binary()]) -> integer() | {'error', _, integer()}.

do_log(L, B) ->
    Size = iolist_size(B),
    do_log(L, B, Size).
1663
%% Logs the items B; BSz is their total size, or 'undefined'.
%% -> NoOfItems | {error, Error, NoOfItems}
%% Halt log: nothing is written once is_full has been set. A batch that
%% fits (or any batch when the size is 'infinity') is written in one
%% go; otherwise halt_write_full/4 writes as many items as fit.
%% Wrap log: disk_log_1 does the writing. The returned handle is always
%% stored, the owners are told about wraps, and lost items are
%% subtracted from the count returned.
do_log(#log{type = halt, format = Format, extra = Halt}=L, B, BSz) ->
    #halt{curB = CurSize, size = MaxSize} = Halt,
    {Bs, BSize} = logl(B, Format, BSz),
    case get(is_full) of
        true ->
            {error, {error, {full, L#log.name}}, 0};
        undefined when MaxSize =:= infinity; CurSize + BSize =< MaxSize ->
            halt_write(Halt, L, B, Bs, BSize);
        undefined ->
            halt_write_full(L, B, Format, 0)
    end;
do_log(#log{format_type = wrap_int}=L, B, _BSz) ->
    case disk_log_1:mf_int_log(L#log.extra, B, L#log.head) of
        {ok, Handle, Logged} ->
            put(log, L#log{extra = Handle}),
            Logged;
        {ok, Handle, Logged, Lost, Wraps} ->
            notify_owners_wrap(Wraps),
            put(log, L#log{extra = Handle}),
            Logged - Lost;
        {error, Error, Handle, Logged, Lost} ->
            put(log, L#log{extra = Handle}),
            {error, Error, Logged - Lost}
    end;
do_log(#log{format_type = wrap_ext}=L, B, _BSz) ->
    case disk_log_1:mf_ext_log(L#log.extra, B, L#log.head) of
        {ok, Handle, Logged} ->
            put(log, L#log{extra = Handle}),
            Logged;
        {ok, Handle, Logged, Lost, Wraps} ->
            notify_owners_wrap(Wraps),
            put(log, L#log{extra = Handle}),
            Logged - Lost;
        {error, Error, Handle, Logged, Lost} ->
            put(log, L#log{extra = Handle}),
            {error, Error, Logged - Lost}
    end.
1702
%% Returns the data to write together with its size.
%% -> {Data, Size}
%% Internal format: both are produced by disk_log_1:logl/1 and the size
%% argument is ignored. External format: the data is used as it is and
%% the size is computed only when it was not supplied.
logl(B, internal, _Sz) ->
    disk_log_1:logl(B);
logl(B, external, undefined) ->
    {B, iolist_size(B)};
logl(B, external, Sz) ->
    {B, Sz}.
1709
%% Writes the items one at a time for as long as they fit within the
%% size of the halt log. N counts the items written so far.
%% -> {error, {error, {full, Name}}, N} | {error, Error, 0}
%% Writing stops at the first item that does not fit, or when the items
%% run out. Then is_full is set, the owners are notified and the 'full'
%% error is returned with the count. A failed write is returned as
%% halt_write/5 reported it.
halt_write_full(L, [Item | Rest], Format, N) ->
    One = [Item],
    {Data, DataSize} = logl(One, Format, undefined),
    Halt = L#log.extra,
    #halt{curB = CurSize, size = MaxSize} = Halt,
    if
        CurSize + DataSize =< MaxSize ->
            case halt_write(Halt, L, One, Data, DataSize) of
                Written when is_integer(Written) ->
                    %% halt_write/5 has stored the updated log.
                    halt_write_full(get(log), Rest, Format, N + Written);
                Error ->
                    Error
            end;
        true ->
            halt_write_full(L, [], Format, N)
    end;
halt_write_full(L, _Items, _Format, N) ->
    put(is_full, true),
    notify_owners(full),
    {error, {error, {full, L#log.name}}, N}.
1730
%% halt_write(Halt, Log, B, Bs, BSize) -> Count | {error, Error, 0}
%%
%% Writes Bs (BSize bytes) to the file of a halt log by means of
%% disk_log_1:fwrite/4 and stores the updated log record in the
%% process dictionary. On success the current size (curB) is advanced
%% by BSize and length(B) is returned. Otherwise only the file
%% descriptor cache is updated and the error is returned with count 0.
halt_write(Halt, L, B, Bs, BSize) ->
    #halt{fdc = FdC, curB = CurB} = Halt,
    {Status, FdC1} = disk_log_1:fwrite(FdC, L#log.filename, Bs, BSize),
    case Status of
        ok ->
            Halt1 = Halt#halt{fdc = FdC1, curB = CurB + BSize},
            put(log, L#log{extra = Halt1}),
            length(B);
        Error ->
            put(log, L#log{extra = Halt#halt{fdc = FdC1}}),
            {error, Error, 0}
    end.
1742
%% do_write_cache(Log) -> ok | Error
%%
%% Asks disk_log_1 to write the cache of the log: write_cache/2 for a
%% halt log, mf_write_cache/1 for a wrap log. The log record with the
%% returned file descriptor cache or handle is stored in the process
%% dictionary, and the reply from disk_log_1 is returned.
do_write_cache(#log{type = halt} = Log) ->
    #log{filename = FName, extra = Halt} = Log,
    {Reply, FdC1} = disk_log_1:write_cache(Halt#halt.fdc, FName),
    Halt1 = Halt#halt{fdc = FdC1},
    put(log, Log#log{extra = Halt1}),
    Reply;
do_write_cache(#log{type = wrap} = Log) ->
    {Reply, Handle1} = disk_log_1:mf_write_cache(Log#log.extra),
    put(log, Log#log{extra = Handle1}),
    Reply.
1752
%% do_sync(Log) -> ok | Error
%%
%% Asks disk_log_1 to sync the log: sync/2 for a halt log, mf_sync/1
%% for a wrap log. The log record with the returned file descriptor
%% cache or handle is stored in the process dictionary, and the reply
%% from disk_log_1 is returned.
do_sync(#log{type = halt} = Log) ->
    #log{filename = FName, extra = Halt} = Log,
    {Reply, FdC1} = disk_log_1:sync(Halt#halt.fdc, FName),
    Halt1 = Halt#halt{fdc = FdC1},
    put(log, Log#log{extra = Halt1}),
    Reply;
do_sync(#log{type = wrap} = Log) ->
    {Reply, Handle1} = disk_log_1:mf_sync(Log#log.extra),
    put(log, Log#log{extra = Handle1}),
    Reply.
1762
%% -> ok | Error | throw(Error)
%%
%% Truncates the log L, using Head as the head of the truncated log.
%%
%% Halt log: the file is truncated by disk_log_1, the current file
%% position is read back to refresh the recorded size (curB), and the
%% updated log record is stored in the process dictionary.
%%
%% Wrap log: the log is temporarily resized to one file, the wrap file
%% is switched twice by trunc_wrap/1 (which throws on failure), and
%% finally the original head and size are restored. The value returned
%% is the result of the last do_change_size/2 call.
do_trunc(#log{type = halt}=L, Head) ->
    #log{filename = FName, extra = Halt} = L,
    FdC = Halt#halt.fdc,
    %% Step 1: truncate. Reply1 is 'ok' or the error from disk_log_1.
    {Reply1, FdC2} =
        case L#log.format of
            internal ->
                %% Head is passed on to disk_log_1:truncate/3.
                disk_log_1:truncate(FdC, FName, Head);
            external ->
		%% Cut the file at its beginning, then write the head
		%% (if there is one) as is.
		case disk_log_1:truncate_at(FdC, FName, bof) of
		    {ok, NFdC} when Head =:= none ->
                        {ok, NFdC};
		    {ok, NFdC} ->
			{ok, H} = Head,
                        disk_log_1:fwrite(NFdC, FName, H, byte_size(H));
		    R ->
			R
		end
        end,
    %% Step 2: ask for the current position. The clause order matters:
    %% - position and truncation both succeeded: curB is updated;
    %% - position failed (2-tuple): its error is the reply;
    %% - position succeeded but truncation failed: Reply1 is the reply
    %%   and curB is left as it was.
    {Reply, NewHalt} =
    case disk_log_1:position(FdC2, FName, cur) of
	{ok, NewFdC, FileSize} when Reply1 =:= ok ->
	    {ok, Halt#halt{fdc = NewFdC, curB = FileSize}};
	{Reply2, NewFdC} ->
	    {Reply2, Halt#halt{fdc = NewFdC}};
	{ok, NewFdC, _} ->
	    {Reply1, Halt#halt{fdc = NewFdC}}
    end,
    put(log, L#log{extra = NewHalt}),
    Reply;
do_trunc(#log{type = wrap}=L, Head) ->
    Handle = L#log.extra,
    OldHead = L#log.head,
    {MaxB, MaxF} = disk_log_1:get_wrap_size(Handle),
    %% Temporarily limit the log to one file of MaxB bytes.
    ok = do_change_size(L, {MaxB, 1}),
    %% get(log) is used rather than L; presumably do_change_size/2 has
    %% stored the resized log in the process dictionary -- confirm.
    %% Head is installed as the head while the files are switched.
    NewLog = trunc_wrap((get(log))#log{head = Head}),
    %% Just to remove all files with suffix > 1:
    NewLog2 = trunc_wrap(NewLog),
    %% Reset the noFull and accFull counters of the handle.
    NewHandle = (NewLog2#log.extra)#handle{noFull = 0, accFull = 0},
    %% Restore the original head and the original size.
    do_change_size(NewLog2#log{extra = NewHandle, head = OldHead},
		   {MaxB, MaxF}).
1804
%% trunc_wrap(Log) -> NewLog | throw(Error)
%%
%% Calls do_inc_wrap_file/1 on the log. Returns the resulting log
%% record on success; on failure the error is thrown.
trunc_wrap(L) ->
    Result = do_inc_wrap_file(L),
    case Result of
        {error, Error, _L1} ->
            throw(Error);
        {ok, L1, _Lost} ->
            L1
    end.
1812
%% do_chunk(Log, Pos, B, N) -> Reply | {error, {format_external, Name}}
%%
%% Reads a chunk from a log with internal format by means of
%% disk_log_1. Logs opened in read_only mode use the *_read_only
%% variants of the chunk functions. The log record with the updated
%% file descriptor cache or handle is stored in the process
%% dictionary, and the reply from disk_log_1 is returned. For any
%% other format type an error tuple is returned and nothing is stored.
do_chunk(#log{format_type = halt_int, extra = Halt} = L, Pos, B, N) ->
    #log{mode = Mode, filename = FName} = L,
    FdC = Halt#halt.fdc,
    {FdC1, Reply} =
        case Mode of
            read_only ->
                disk_log_1:chunk_read_only(FdC, FName, Pos, B, N);
            read_write ->
                disk_log_1:chunk(FdC, FName, Pos, B, N)
        end,
    Halt1 = Halt#halt{fdc = FdC1},
    put(log, L#log{extra = Halt1}),
    Reply;
do_chunk(#log{format_type = wrap_int, extra = Handle} = L, Pos, B, N) ->
    {Handle1, Reply} =
        case L#log.mode of
            read_only ->
                disk_log_1:mf_int_chunk_read_only(Handle, Pos, B, N);
            _ ->
                disk_log_1:mf_int_chunk(Handle, Pos, B, N)
        end,
    put(log, L#log{extra = Handle1}),
    Reply;
do_chunk(L, _Pos, _B, _N) ->
    {error, {format_external, L#log.name}}.
1835
%% do_chunk_step(Log, Pos, N) -> Reply | {error, {not_internal_wrap, Name}}
%%
%% Delegates to disk_log_1:mf_int_chunk_step/3 when the log is a wrap
%% log with internal format (format type wrap_int). Every other format
%% type yields an error tuple.
do_chunk_step(Log, Pos, N) ->
    case Log#log.format_type of
        wrap_int ->
            disk_log_1:mf_int_chunk_step(Log#log.extra, Pos, N);
        _Other ->
            {error, {not_internal_wrap, Log#log.name}}
    end.
1840
%% Inlined.
%% replies(Pids, Reply) -> ok
%% Sends {disk_log, self(), Reply} to Pids, which is either a single
%% pid or a list of pids (see send_reply/2).
replies(Pids, Reply) ->
    send_reply(Pids, {disk_log, self(), Reply}).
1845
%% send_reply(PidOrPids, M) -> ok
%%
%% Sends the message M to a single pid, or to every element of a list
%% of receivers in list order.
send_reply(Pid, M) when is_pid(Pid) ->
    Pid ! M,
    ok;
send_reply(Pids, M) when is_list(Pids) ->
    lists:foreach(fun(Pid) -> Pid ! M end, Pids).
1854
%% reply(To, Reply, S)
%% Sends {disk_log, self(), Reply} to To and then continues in
%% loop/1 with the state S.
reply(To, Reply, S) ->
    M = {disk_log, self(), Reply},
    To ! M,
    loop(S).
1858
%% req(Log, R) -> Reply | ok | {error, no_such_log} | {error, nonode}
%%
%% Sends the request R to the process(es) that disk_log_server has
%% registered for Log. A local log is asked via monitor_request/2. For
%% a distributed log the request goes to all its processes via
%% multi_req/2.
req(Log, R) ->
    LogPids = disk_log_server:get_log_pids(Log),
    case LogPids of
        undefined ->
            {error, no_such_log};
        {local, Pid} ->
            monitor_request(Pid, R);
        {distributed, Pids} ->
            multi_req({self(), R}, Pids)
    end.
1868
%% multi_req(Msg, Pids) -> ok | {error, nonode}
%%
%% Monitors every pid in Pids and sends Msg to it, then waits for each
%% pid in turn until it has either answered with a
%% {disk_log, Pid, Reply} message or gone down. Returns ok if the last
%% pid that answered was followed only by pids that went down; returns
%% {error, nonode} if no pid answered at all (or Pids is empty). The
%% contents of the replies are ignored.
multi_req(Msg, Pids) ->
    Monitored = [begin
                     MRef = erlang:monitor(process, Pid),
                     Pid ! Msg,
                     {Pid, MRef}
                 end || Pid <- Pids],
    Await = fun({Pid, MRef}, Acc) ->
                    receive
                        {disk_log, Pid, _Answer} ->
                            erlang:demonitor(MRef, [flush]),
                            ok;
                        {'DOWN', MRef, process, Pid, _Info} ->
                            %% No answer from this pid; keep the
                            %% result collected so far.
                            Acc
                    end
            end,
    lists:foldl(Await, {error, nonode}, Monitored).
1885
%% sreq(Log, R) -> Reply | {error, no_such_log}
%%
%% Sends the request R to a single process of Log: the one picked by
%% nearby_pid/2 for the local node.
sreq(Log, R) ->
    Near = nearby_pid(Log, node()),
    case Near of
        undefined ->
            {error, no_such_log};
        _Pid ->
            monitor_request(Near, R)
    end.
1893
%% Local req - always talk to log on Node
%% lreq(Log, R, Node) -> Reply | {error, no_such_log}
%% The request is only sent if nearby_pid/2 returns a pid that resides
%% on Node; in all other cases {error, no_such_log} is returned.
lreq(Log, R, Node) ->
    Near = nearby_pid(Log, Node),
    OnNode = is_pid(Near) andalso node(Near) =:= Node,
    case OnNode of
        true ->
            monitor_request(Near, R);
        false ->
            {error, no_such_log}
    end.
1902
%% nearby_pid(Log, Node) -> pid() | undefined
%%
%% Looks up the processes of Log in disk_log_server. A local log has
%% one pid, which is returned. For a distributed log the pid is chosen
%% by get_near_pid/2. 'undefined' is returned for an unknown log.
nearby_pid(Log, Node) ->
    LogPids = disk_log_server:get_log_pids(Log),
    case LogPids of
        {local, Pid} ->
            Pid;
        {distributed, Pids} ->
            get_near_pid(Pids, Node);
        undefined ->
            undefined
    end.
1912
%% Returns the first pid in the (non-empty) list that resides on Node.
%% If there is no such pid, the last pid of the list is returned.
-spec get_near_pid([pid(),...], node()) -> pid().

get_near_pid([Pid | Rest], Node) ->
    if
        node(Pid) =:= Node ->
            Pid;
        Rest =:= [] ->
            Pid;
        true ->
            get_near_pid(Rest, Node)
    end.
1918
%% monitor_request(Pid, Req) -> Reply | {error, no_such_log}
%%
%% Monitors Pid, sends {self(), Req} to it and waits for the answer.
%% A {disk_log, Pid, Reply} message is accepted unless Reply is a
%% tuple whose second element is disk_log_stopped. Such a reply is
%% left in the mailbox and the wait continues until the 'DOWN'
%% message arrives, which gives {error, no_such_log}.
monitor_request(Pid, Req) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {self(), Req},
    receive
        {disk_log, Pid, Reply}
          when not is_tuple(Reply);
               element(2, Reply) =/= disk_log_stopped ->
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, process, Pid, _Info} ->
            {error, no_such_log}
    end.
1930
%% req2(Pid, R) -> Reply | {error, no_such_log}
%% Sends the request R directly to the log process Pid; see
%% monitor_request/2.
req2(LogPid, Request) ->
    monitor_request(LogPid, Request).
1933
%% merge_head(Head, Default) -> Head | Default
%% Returns the first argument unless it is 'none', in which case the
%% second argument is returned.
merge_head(Head, Default) ->
    case Head of
        none -> Default;
        _ -> Head
    end.
1938
%% -> List of extensions of existing files (no dot included) | throw(FileError)
%%
%% The candidates are "idx", "siz" and the integers 1..NoOfFiles,
%% where NoOfFiles is taken from disk_log_1:read_index_file/1. A
%% candidate is kept if file:read_file_info/1 succeeds for File with
%% that extension added.
wrap_file_extensions(File) ->
    {_CurF, _CurFSz, _TotSz, NoOfFiles} =
        disk_log_1:read_index_file(File),
    Numbered = if
                   NoOfFiles =:= 0 ->
                       [];
                   NoOfFiles >= 1 ->
                       lists:seq(1, NoOfFiles)
               end,
    [Ext || Ext <- ["idx", "siz" | Numbered],
            {ok, _} <- [file:read_file_info(add_ext(File, Ext))]].
1958
%% add_ext(File, Ext) -> string()
%% Builds the name "File.Ext" with lists:concat/1.
add_ext(File, Ext) ->
    Parts = [File, ".", Ext],
    lists:concat(Parts).
1961
%% notify(Log, R) -> ok | {error, no_such_log}
%%
%% Sends the message R, as is and without waiting for any answer, to
%% every process that disk_log_server has registered for Log.
notify(Log, R) ->
    case disk_log_server:get_log_pids(Log) of
        {distributed, Pids} ->
            lists:foreach(fun(Pid) -> Pid ! R end, Pids);
        {local, Pid} ->
            Pid ! R,
            ok;
        undefined ->
            {error, no_such_log}
    end.
1973
%% notify_owners_wrap(Wraps) -> ok
%% Sends one {wrap, N} notification to the owners (see
%% notify_owners/1) for each element N of Wraps, in list order.
notify_owners_wrap(Wraps) ->
    lists:foreach(fun(N) -> notify_owners({wrap, N}) end, Wraps).
1979
%% notify_owners(Note) -> ok
%%
%% Sends {disk_log, node(), Name, Note} to every owner that is stored
%% as {Pid, true} in the owners list of the log record kept in the
%% process dictionary. All other entries of the list are skipped.
notify_owners(Note) ->
    #log{name = Name, owners = Owners} = get(log),
    Msg = {disk_log, node(), Name, Note},
    Notified = [Pid || {Pid, true} <- Owners],
    lists:foreach(fun(Pid) -> Pid ! Msg end, Notified).
1986
%% cache_error(S, Pids) -> NewS
%%
%% Replies to Pids with the cache error stored in the state, resets
%% the stored cache error to ok and records the error as the current
%% error status (see state_err/2).
cache_error(S, Pids) ->
    Error = S#state.cache_error,
    ok = replies(Pids, Error),
    Cleared = S#state{cache_error = ok},
    state_err(Cleared, Error).
1990
%% state_ok(S) -> NewS
%% Sets the error status of the state to ok; see state_err/2.
state_ok(State) ->
    state_err(State, ok).
1993
%% Sets the error status of the state to Err. The owners are notified
%% with {error_status, Err} only when the status actually changes; if
%% it is already Err the state is returned untouched.
-spec state_err(#state{}, dlog_state_error()) -> #state{}.

state_err(#state{error_status = Err} = S, Err) ->
    S;
state_err(S, Err) ->
    notify_owners({error_status, Err}),
    S#state{error_status = Err}.
2000