1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 1997-2020. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20-module(disk_log). 21 22%% Efficient file based log - process part 23 24-export([start/0, istart_link/1, 25 log/2, log_terms/2, blog/2, blog_terms/2, 26 alog/2, alog_terms/2, balog/2, balog_terms/2, 27 close/1, lclose/1, lclose/2, sync/1, open/1, 28 truncate/1, truncate/2, btruncate/2, 29 reopen/2, reopen/3, breopen/3, inc_wrap_file/1, change_size/2, 30 change_notify/3, change_header/2, 31 chunk/2, chunk/3, bchunk/2, bchunk/3, chunk_step/3, chunk_info/1, 32 block/1, block/2, unblock/1, info/1, format_error/1, 33 accessible_logs/0, all/0]). 34 35%% Internal exports 36-export([init/2, internal_open/2, 37 system_continue/3, system_terminate/4, system_code_change/4]). 38 39%% To be used by disk_log_h.erl (not (yet) in Erlang/OTP) only. 40-export([ll_open/1, ll_close/1, do_log/2, do_sync/1, do_info/2]). 41 42%% To be used by wrap_log_reader only. 43-export([ichunk_end/2]). 44 45%% To be used for debugging only: 46-export([pid2name/1]). 47 48-export_type([continuation/0]). 49 50-deprecated([{accessible_logs, 0, "use disk_log:all/0 instead"}, 51 {lclose, 1, "use disk_log:close/1 instead"}, 52 {lclose, 2, "use disk_log:close/1 instead"}]). 53 54-type dlog_state_error() :: 'ok' | {'error', term()}. 
%% State of the per-log process; it is threaded through loop/1 and handle/2.
-record(state, {queue = [],        %% messages postponed by enqueue/2, newest first
                messages = [],     %% messages that loop/1 handles before receiving new ones
                parent,            %% first argument of init/2
                server,            %% second argument of init/2
                cnt = 0 :: non_neg_integer(),
                args,              %% the #arg{} stored when an internal_open request succeeds
                error_status = ok :: dlog_state_error(),
                cache_error = ok   %% cache write error after timeout
               }).

-include("disk_log.hrl").

%% Exit reason used when a request fails fatally. Function and Arg are the
%% name and arity of the API function that issued the request.
-define(failure(Error, Function, Arg),
        {{failed, Error}, [{?MODULE, Function, Arg}]}).

%%-define(PROFILE(C), C).
-define(PROFILE(C), void).

-compile({inline,[{log_loop,6},{log_end_sync,2},{replies,2},{rflat,1}]}).

%%%----------------------------------------------------------------------
%%% Contract type specifications
%%%----------------------------------------------------------------------

-opaque continuation() :: #continuation{}.

-type file_error() :: term(). % XXX: refine
-type invalid_header() :: term(). % XXX: refine

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------

%%-----------------------------------------------------------------
%% This module implements the API, and the processes for each log.
%% There is one process per log.
%%-----------------------------------------------------------------

%% Reasons that open/1 can return inside an {error, Reason} tuple.
-type open_error_rsn() :: 'no_such_log'
                        | {'badarg', term()}
                        | {'size_mismatch', CurrentSize :: dlog_size(),
                           NewSize :: dlog_size()}
                        | {'arg_mismatch', OptionName :: dlog_optattr(),
                           CurrentValue :: term(), Value :: term()}
                        | {'name_already_open', Log :: log()}
                        | {'open_read_write', Log :: log()}
                        | {'open_read_only', Log :: log()}
                        | {'need_repair', Log :: log()}
                        | {'not_a_log_file', FileName :: file:filename()}
                        | {'invalid_index_file', FileName :: file:filename()}
                        | {'invalid_header', invalid_header()}
                        | {'file_error', file:filename(), file_error()}
                        | {'node_already_open', Log :: log()}.
-type open_ret() :: {'ok', Log :: log()}
                  | {'repaired', Log :: log(),
                     {'recovered', Rec :: non_neg_integer()},
                     {'badbytes', Bad :: non_neg_integer()}}
                  | {'error', open_error_rsn()}.

%% Checks the option list with check_arg/2 and hands the outcome of that
%% check to disk_log_server:open/1.
-spec open(ArgL) -> open_ret() when
      ArgL :: dlog_options().
open(A) ->
    Checked = check_arg(A, #arg{options = A}),
    disk_log_server:open(Checked).

-type log_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()}
                       | {'format_external', log()} | {'blocked_log', log()}
                       | {'full', log()} | {'invalid_header', invalid_header()}
                       | {'file_error', file:filename(), file_error()}.

%% Logs one term: the term is converted with term_to_binary/1 and sent as
%% an `internal' log request.
-spec log(Log, Term) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      Term :: term().
log(Log, Term) ->
    Bin = term_to_binary(Term),
    req(Log, {log, internal, [Bin]}).

%% Logs one chunk of bytes as an `external' log request.
-spec blog(Log, Bytes) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      Bytes :: iodata().
blog(Log, Bytes) ->
    Bin = ensure_binary(Bytes),
    req(Log, {log, external, [Bin]}).

%% Logs a list of terms in a single `internal' log request.
-spec log_terms(Log, TermList) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      TermList :: [term()].
log_terms(Log, Terms) ->
    req(Log, {log, internal, terms2bins(Terms)}).

%% Logs a list of byte chunks in a single `external' log request.
-spec blog_terms(Log, BytesList) ->
                        ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      BytesList :: [iodata()].
blog_terms(Log, Bytess) ->
    req(Log, {log, external, ensure_binary_list(Bytess)}).

-type notify_ret() :: 'ok' | {'error', 'no_such_log'}.

%% Like log/2, but the request goes through notify/2 instead of req/2.
-spec alog(Log, Term) -> notify_ret() when
      Log :: log(),
      Term :: term().
alog(Log, Term) ->
    Bin = term_to_binary(Term),
    notify(Log, {alog, internal, [Bin]}).

%% Like log_terms/2, but the request goes through notify/2 instead of req/2.
-spec alog_terms(Log, TermList) -> notify_ret() when
      Log :: log(),
      TermList :: [term()].
alog_terms(Log, Terms) ->
    notify(Log, {alog, internal, terms2bins(Terms)}).

-spec balog(Log, Bytes) -> notify_ret() when
      Log :: log(),
      Bytes :: iodata().
balog(Log, Bytes) ->
    %% Like blog/2, but the request goes through notify/2 instead of req/2.
    Bin = ensure_binary(Bytes),
    notify(Log, {alog, external, [Bin]}).

%% Like blog_terms/2, but the request goes through notify/2 instead of req/2.
-spec balog_terms(Log, ByteList) -> notify_ret() when
      Log :: log(),
      ByteList :: [iodata()].
balog_terms(Log, Bytess) ->
    notify(Log, {alog, external, ensure_binary_list(Bytess)}).

-type close_error_rsn() ::'no_such_log' | 'nonode'
                         | {'file_error', file:filename(), file_error()}.

%% Sends a `close' request for the log.
-spec close(Log) -> 'ok' | {'error', close_error_rsn()} when
      Log :: log().
close(Log) ->
    req(Log, close).

-type lclose_error_rsn() :: 'no_such_log'
                          | {'file_error', file:filename(), file_error()}.

%% Deprecated (see the -deprecated attribute): same as lclose(Log, node()).
-spec lclose(Log) -> 'ok' | {'error', lclose_error_rsn()} when
      Log :: log().
lclose(Log) ->
    lclose(Log, node()).

%% Deprecated: closes the log when Node is the local node; any other node
%% yields {error, no_such_log}.
-spec lclose(Log, Node) -> 'ok' | {'error', lclose_error_rsn()} when
      Log :: log(),
      Node :: node().
lclose(Log, Node) ->
    case node() of
        Node -> req(Log, close);
        _Other -> {error, no_such_log}
    end.

-type trunc_error_rsn() :: 'no_such_log' | 'nonode'
                         | {'read_only_mode', log()}
                         | {'blocked_log', log()}
                         | {'invalid_header', invalid_header()}
                         | {'file_error', file:filename(), file_error()}.

%% Truncates the log without supplying a new head. The last two elements of
%% the request are the API function name and arity, used in failure reasons.
-spec truncate(Log) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log().
truncate(Log) ->
    req(Log, {truncate, none, truncate, 1}).

%% Truncates the log, supplying a term as head.
-spec truncate(Log, Head) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log(),
      Head :: term().
truncate(Log, Head) ->
    NewHead = {ok, term_to_binary(Head)},
    req(Log, {truncate, NewHead, truncate, 2}).

%% Truncates the log, supplying bytes as head.
-spec btruncate(Log, BHead) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log(),
      BHead :: iodata().
btruncate(Log, Head) ->
    NewHead = {ok, ensure_binary(Head)},
    req(Log, {truncate, NewHead, btruncate, 2}).
-type reopen_error_rsn() :: no_such_log
                          | nonode
                          | {read_only_mode, log()}
                          | {blocked_log, log()}
                          | {same_file_name, log()}
                          | {invalid_index_file, file:filename()}
                          | {invalid_header, invalid_header()}
                          | {'file_error', file:filename(), file_error()}.

%% Sends a reopen request without a new head. The last two elements of the
%% request are the API function name and arity, used in failure reasons.
-spec reopen(Log, File) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename().
reopen(Log, NewFile) ->
    Request = {reopen, NewFile, none, reopen, 2},
    req(Log, Request).

%% Sends a reopen request with a term as the new head.
-spec reopen(Log, File, Head) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename(),
      Head :: term().
reopen(Log, NewFile, NewHead) ->
    Head = {ok, term_to_binary(NewHead)},
    req(Log, {reopen, NewFile, Head, reopen, 3}).

%% Sends a reopen request with bytes as the new head.
-spec breopen(Log, File, BHead) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename(),
      BHead :: iodata().
breopen(Log, NewFile, NewHead) ->
    Head = {ok, ensure_binary(NewHead)},
    req(Log, {reopen, NewFile, Head, breopen, 3}).

-type inc_wrap_error_rsn() :: 'no_such_log' | 'nonode'
                            | {'read_only_mode', log()}
                            | {'blocked_log', log()} | {'halt_log', log()}
                            | {'invalid_header', invalid_header()}
                            | {'file_error', file:filename(), file_error()}.

%% Sends an `inc_wrap_file' request for the log.
-spec inc_wrap_file(Log) -> 'ok' | {'error', inc_wrap_error_rsn()} when
      Log :: log().
inc_wrap_file(Log) ->
    Request = inc_wrap_file,
    req(Log, Request).

%% Sends a `change_size' request carrying the new size.
-spec change_size(Log, Size) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Size :: dlog_size(),
      Reason :: no_such_log | nonode | {read_only_mode, Log}
              | {blocked_log, Log}
              | {new_size_too_small, Log, CurrentSize :: pos_integer()}
              | {badarg, size}
              | {file_error, file:filename(), file_error()}.
change_size(Log, NewSize) ->
    Request = {change_size, NewSize},
    req(Log, Request).
%% Sends a `change_notify' request for the owner Pid.
-spec change_notify(Log, Owner, Notify) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Owner :: pid(),
      Notify :: boolean(),
      Reason :: no_such_log | nonode | {blocked_log, Log}
              | {badarg, notify} | {not_owner, Owner}.
change_notify(Log, Pid, NewNotify) ->
    Request = {change_notify, Pid, NewNotify},
    req(Log, Request).

%% Sends a `change_header' request with the new head specification.
-spec change_header(Log, Header) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Header :: {head, dlog_head_opt()}
              | {head_func, MFA :: {atom(), atom(), list()}},
      Reason :: no_such_log | nonode | {read_only_mode, Log}
              | {blocked_log, Log} | {badarg, head}.
change_header(Log, NewHead) ->
    Request = {change_header, NewHead},
    req(Log, Request).

-type sync_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()}
                        | {'blocked_log', log()}
                        | {'file_error', file:filename(), file_error()}.

%% Sends a `sync' request for the log.
-spec sync(Log) -> 'ok' | {'error', sync_error_rsn()} when
      Log :: log().
sync(Log) ->
    Request = sync,
    req(Log, Request).

-type block_error_rsn() :: 'no_such_log' | 'nonode' | {'blocked_log', log()}.

%% Blocks the log with QueueLogRecords set to true.
-spec block(Log) -> 'ok' | {'error', block_error_rsn()} when
      Log :: log().
block(Log) ->
    QueueLogRecords = true,
    block(Log, QueueLogRecords).

%% Sends a `block' request carrying the QueueLogRecords flag.
-spec block(Log, QueueLogRecords) -> 'ok' | {'error', block_error_rsn()} when
      Log :: log(),
      QueueLogRecords :: boolean().
block(Log, QueueLogRecords) ->
    Request = {block, QueueLogRecords},
    req(Log, Request).

-type unblock_error_rsn() :: 'no_such_log' | 'nonode'
                           | {'not_blocked', log()}
                           | {'not_blocked_by_pid', log()}.

%% Sends an `unblock' request for the log.
-spec unblock(Log) -> 'ok' | {'error', unblock_error_rsn()} when
      Log :: log().
unblock(Log) ->
    Request = unblock,
    req(Log, Request).

%% Formats an error term; all the work is done by do_format_error/1.
-spec format_error(Error) -> io_lib:chars() when
      Error :: term().
format_error(Error) ->
    Chars = do_format_error(Error),
    Chars.
%% One element of the list returned by info/1.
%% `Full' is typed as boolean(); the bare atom `boolean' used previously
%% denoted a singleton atom type, not the boolean type.
-type dlog_info() :: {name, Log :: log()}
                   | {file, File :: file:filename()}
                   | {type, Type :: dlog_type()}
                   | {format, Format :: dlog_format()}
                   | {size, Size :: dlog_size()}
                   | {mode, Mode :: dlog_mode()}
                   | {owners, [{pid(), Notify :: boolean()}]}
                   | {users, Users :: non_neg_integer()}
                   | {status, Status ::
                        ok | {blocked, QueueLogRecords :: boolean()}}
                   | {node, Node :: node()}
                   | {head, Head :: none
                                  | {head, binary()}
                                  | (MFA :: {atom(), atom(), list()})}
                   | {no_written_items, NoWrittenItems ::non_neg_integer()}
                   | {full, Full :: boolean()}
                   | {no_current_bytes, non_neg_integer()}
                   | {no_current_items, non_neg_integer()}
                   | {no_items, non_neg_integer()}
                   | {current_file, pos_integer()}
                   | {no_overflows, {SinceLogWasOpened :: non_neg_integer(),
                                     SinceLastInfo :: non_neg_integer()}}.

%% Sends an `info' request for the log.
-spec info(Log) -> InfoList | {'error', no_such_log} when
      Log :: log(),
      InfoList :: [dlog_info()].
info(Log) ->
    req(Log, info).

%% Debugging aid: looks up the log name registered for Pid in the
%% ?DISK_LOG_PID_TABLE ETS table, after calling disk_log_server:start/0.
-spec pid2name(Pid) -> {'ok', Log} | 'undefined' when
      Pid :: pid(),
      Log :: log().
pid2name(Pid) ->
    disk_log_server:start(),
    case ets:lookup(?DISK_LOG_PID_TABLE, Pid) of
        [] -> undefined;
        [{_Pid, Log}] -> {ok, Log}
    end.

%% chunk/3 takes 3 args: a Log, a Continuation and N.
%% It returns {Cont2, ObjList} | eof | {error, Reason}.
%% The initial continuation is the atom 'start'.

-type chunk_error_rsn() :: no_such_log
                         | {format_external, log()}
                         | {blocked_log, log()}
                         | {badarg, continuation}
                         | {not_internal_wrap, log()}
                         | {corrupt_log_file, FileName :: file:filename()}
                         | {file_error, file:filename(), file_error()}.

-type chunk_ret() :: {Continuation2 :: continuation(), Terms :: [term()]}
                   | {Continuation2 :: continuation(),
                      Terms :: [term()],
                      Badbytes :: non_neg_integer()}
                   | eof
                   | {error, Reason :: chunk_error_rsn()}.
%% Reads a chunk with no explicit limit on the number of terms.
-spec chunk(Log, Continuation) -> chunk_ret() when
      Log :: log(),
      Continuation :: start | continuation().
chunk(Log, Cont) ->
    chunk(Log, Cont, infinity).

%% Reads a chunk of at most N terms; `infinity' is capped at ?MAX_CHUNK_SIZE.
-spec chunk(Log, Continuation, N) -> chunk_ret() when
      Log :: log(),
      Continuation :: start | continuation(),
      N :: pos_integer() | infinity.
chunk(Log, Cont, infinity) ->
    %% There cannot be more than ?MAX_CHUNK_SIZE terms in a chunk.
    ichunk(Log, Cont, ?MAX_CHUNK_SIZE);
chunk(Log, Cont, N) when is_integer(N), N > 0 ->
    ichunk(Log, Cont, N).

%% Issues the chunk request. `start' goes to the log by name through req/2;
%% a continuation goes to the pid stored in it through req2/2.
ichunk(Log, start, N) ->
    ichunk_end(req(Log, {chunk, 0, [], N}), Log);
ichunk(Log, #continuation{pid = Pid, pos = Pos, b = B}, N) ->
    ichunk_end(req2(Pid, {chunk, Pos, B, N}), Log);
ichunk(_Log, _, _) ->
    {error, {badarg, continuation}}.

%% Post-processes a chunk reply. A two-element reply is treated as
%% read_write mode, a reply that also carries a bad-bytes count as
%% read_only mode; anything else is returned untouched.
ichunk_end({#continuation{} = C, R}, Log) ->
    ichunk_end(R, read_write, Log, C, 0);
ichunk_end({#continuation{} = C, R, Bad}, Log) ->
    ichunk_end(R, read_only, Log, C, Bad);
ichunk_end(Other, _Log) ->
    Other.

%% Create the terms on the client's heap, not the server's.
%% The list of binaries is reversed.
ichunk_end(Bins, Mode, Log, Cont, Bad) ->
    case catch bins2terms(Bins, []) of
        {'EXIT', _} ->
            %% At least one binary could not be decoded; walk the binaries
            %% in their original order to locate it.
            ichunk_bad_end(lists:reverse(Bins), Mode, Log, Cont, Bad, []);
        Terms when Bad > 0 ->
            {Cont, Terms, Bad};
        Terms when Bad =:= 0 ->
            {Cont, Terms}
    end.

%% Decodes each binary with binary_to_term/1, prepending the results to
%% Acc (so the output order is the reverse of the input order).
bins2terms(Bins, Acc) ->
    lists:foldl(fun(Bin, Terms) -> [binary_to_term(Bin) | Terms] end,
                Acc, Bins).
%% Decodes the binaries one at a time until the undecodable one is found.
%% In read_write mode that is reported as a corrupt log file; in read_only
%% mode the terms decoded so far are returned together with a continuation
%% moved back over the binaries that were not consumed.
ichunk_bad_end([B | Bs], Mode, Log, C, Bad, A) ->
    case catch binary_to_term(B) of
        {'EXIT', _} when Mode =:= read_write ->
            {value, {file, FileName}} =
                lists:keysearch(file, 1, info(Log)),
            CorruptFile =
                case C#continuation.pos of
                    HaltPos when is_integer(HaltPos) ->
                        FileName;                     % halt log
                    {WrapNo, _} ->
                        add_ext(FileName, WrapNo)     % wrap log
                end,
            {error, {corrupt_log_file, CorruptFile}};
        {'EXIT', _} when Mode =:= read_only ->
            %% Bytes taken up by the binaries after the bad one, each
            %% counted together with ?HEADERSZ.
            Reread = lists:sum([byte_size(Bin) + ?HEADERSZ || Bin <- Bs]),
            NewPos =
                case C#continuation.pos of
                    Pos when is_integer(Pos) -> Pos - Reread;
                    {FileNo, Pos} -> {FileNo, Pos - Reread}
                end,
            {C#continuation{pos = NewPos, b = []},
             lists:reverse(A),
             Bad + byte_size(B)};
        T ->
            ichunk_bad_end(Bs, Mode, Log, C, Bad, [T | A])
    end.

-type bchunk_ret() :: {Continuation2 :: continuation(),
                       Binaries :: [binary()]}
                    | {Continuation2 :: continuation(),
                       Binaries :: [binary()],
                       Badbytes :: non_neg_integer()}
                    | eof
                    | {error, Reason :: chunk_error_rsn()}.

%% Like chunk/2, but the binaries are returned without being decoded.
-spec bchunk(Log, Continuation) -> bchunk_ret() when
      Log :: log(),
      Continuation :: start | continuation().
bchunk(Log, Cont) ->
    bchunk(Log, Cont, infinity).

%% Like chunk/3, but the binaries are returned without being decoded.
-spec bchunk(Log, Continuation, N) -> bchunk_ret() when
      Log :: log(),
      Continuation :: start | continuation(),
      N :: pos_integer() | infinity.
bchunk(Log, Cont, infinity) ->
    %% There cannot be more than ?MAX_CHUNK_SIZE terms in a chunk.
    bichunk(Log, Cont, ?MAX_CHUNK_SIZE);
bchunk(Log, Cont, N) when is_integer(N), N > 0 ->
    bichunk(Log, Cont, N).

%% Issues the chunk request on behalf of bchunk/3.
bichunk(Log, start, N) ->
    bichunk_end(req(Log, {chunk, 0, [], N}));
bichunk(_Log, Cont, N) when is_record(Cont, continuation) ->
    #continuation{pid = Pid, pos = Pos, b = B} = Cont,
    bichunk_end(req2(Pid, {chunk, Pos, B, N}));
bichunk(_Log, _, _) ->
    {error, {badarg, continuation}}.
%% Puts the binaries of a successful chunk reply back in order by reversing
%% them; any other reply is returned as is.
bichunk_end({#continuation{} = Cont, Bins}) ->
    {Cont, lists:reverse(Bins)};
bichunk_end({#continuation{} = Cont, Bins, Bad}) ->
    {Cont, lists:reverse(Bins), Bad};
bichunk_end(Other) ->
    Other.

%% Sends a `chunk_step' request with the given step.
-spec chunk_step(Log, Continuation, Step) ->
                        {'ok', any()} | {'error', Reason} when
      Log :: log(),
      Continuation :: start | continuation(),
      Step :: integer(),
      Reason :: no_such_log | end_of_log | {format_external, Log}
              | {blocked_log, Log} | {badarg, continuation}
              | {file_error, file:filename(), file_error()}.
chunk_step(Log, Cont, N) when is_integer(N) ->
    ichunk_step(Log, Cont, N).

%% `start' is addressed to the log by name; a continuation is addressed to
%% the pid stored in it.
ichunk_step(Log, start, N) ->
    req(Log, {chunk_step, 0, N});
ichunk_step(_Log, #continuation{pid = Pid, pos = Pos}, N) ->
    req2(Pid, {chunk_step, Pos, N});
ichunk_step(_Log, _, _) ->
    {error, {badarg, continuation}}.

%% Reports the node of the process that a continuation refers to.
-spec chunk_info(Continuation) -> InfoList | {error, Reason} when
      Continuation :: continuation(),
      InfoList :: [{node, Node :: node()}, ...],
      Reason :: {no_continuation, Continuation}.
chunk_info(#continuation{pid = Pid}) ->
    [{node, node(Pid)}];
chunk_info(BadCont) ->
    {error, {no_continuation, BadCont}}.

%% Deprecated (see the -deprecated attribute); the second element of the
%% result is always the empty list.
-spec accessible_logs() -> {[Log], []} when
      Log :: log().
accessible_logs() ->
    Logs = disk_log_server:all(),
    {Logs, []}.

%% Returns whatever disk_log_server:all/0 returns.
-spec all() -> [Log] when
      Log :: log().
all() ->
    disk_log_server:all().

%% Spawns a linked log process that runs init/2 with the caller as parent.
istart_link(Server) ->
    Pid = proc_lib:spawn_link(disk_log, init, [self(), Server]),
    {ok, Pid}.

%% Only for backwards compatibility, could probably be removed.
-spec start() -> 'ok'.
start() ->
    disk_log_server:start().

%% Sends an `internal_open' request directly to the log process Pid.
internal_open(Pid, A) ->
    Request = {internal_open, A},
    req2(Pid, Request).

%%% ll_open() and ll_close() are used by disk_log_h.erl, a module not
%%% (yet) in Erlang/OTP.

%% -spec ll_open(dlog_options()) -> {'ok', Res :: _, #log{}, Cnt :: _} | Error.
ll_open(A) ->
    %% Validate the options first; only a successful check leads to do_open/1.
    case check_arg(A, #arg{options = A}) of
        {ok, Args} -> do_open(Args);
        Error -> Error
    end.

%% -> closed | throw(Error)
ll_close(Log) ->
    close_disk_log2(Log).

%% Folds the option list into an #arg{} record. Once the list is exhausted
%% the combination of options is validated; any option that is not
%% recognized makes the remaining list the `badarg' value.
check_arg([], Res) ->
    %% The head is checked first, but its outcome is only returned when
    %% none of the error conditions below applies.
    HeadChecked =
        case Res#arg.head of
            none ->
                {ok, Res};
            Head0 ->
                case check_head(Head0, Res#arg.format) of
                    {ok, Head} -> {ok, Res#arg{head = Head}};
                    HeadError -> HeadError
                end
        end,
    if
        Res#arg.name =:= 0 ->
            {error, {badarg, name}};
        Res#arg.file =:= none ->
            %% No file given: derive it from the log name and start over.
            case catch lists:concat([Res#arg.name, ".LOG"]) of
                {'EXIT', _} -> {error, {badarg, file}};
                FName -> check_arg([], Res#arg{file = FName})
            end;
        Res#arg.repair =:= truncate, Res#arg.mode =:= read_only ->
            {error, {badarg, repair_read_only}};
        Res#arg.type =:= halt, is_tuple(Res#arg.size) ->
            {error, {badarg, size}};
        Res#arg.type =:= wrap ->
            {OldSize, Version} =
                disk_log_1:read_size_file_version(Res#arg.file),
            check_wrap_arg(HeadChecked, OldSize, Version);
        true ->
            HeadChecked
    end;
check_arg([{file, F} | Tail], Res) when is_list(F); is_atom(F) ->
    check_arg(Tail, Res#arg{file = F});
check_arg([{linkto, Pid} | Tail], Res) when is_pid(Pid); Pid =:= none ->
    check_arg(Tail, Res#arg{linkto = Pid});
check_arg([{name, Name} | Tail], Res) ->
    check_arg(Tail, Res#arg{name = Name});
check_arg([{repair, Repair} | Tail], Res)
  when Repair =:= true; Repair =:= false; Repair =:= truncate ->
    check_arg(Tail, Res#arg{repair = Repair});
check_arg([{size, Size} | Tail], Res)
  when is_integer(Size), Size > 0; Size =:= infinity ->
    check_arg(Tail, Res#arg{size = Size});
check_arg([{size, {MaxB, MaxF}} | Tail], Res)
  when is_integer(MaxB), is_integer(MaxF),
       MaxB > 0, MaxB =< ?MAX_BYTES,
       MaxF > 0, MaxF < ?MAX_FILES ->
    check_arg(Tail, Res#arg{size = {MaxB, MaxF}});
check_arg([{type, Type} | Tail], Res) when Type =:= wrap; Type =:= halt ->
    check_arg(Tail, Res#arg{type = Type});
check_arg([{format, Format} | Tail], Res)
  when Format =:= internal; Format =:= external ->
    check_arg(Tail, Res#arg{format = Format});
check_arg([{notify, Notify} | Tail], Res) when is_boolean(Notify) ->
    check_arg(Tail, Res#arg{notify = Notify});
check_arg([{head_func, HeadFunc} | Tail], Res) ->
    check_arg(Tail, Res#arg{head = {head_func, HeadFunc}});
check_arg([{head, Term} | Tail], Res) ->
    check_arg(Tail, Res#arg{head = {head, Term}});
check_arg([{mode, Mode} | Tail], Res)
  when Mode =:= read_only; Mode =:= read_write ->
    check_arg(Tail, Res#arg{mode = Mode});
check_arg([{quiet, Boolean} | Tail], Res) when is_boolean(Boolean) ->
    check_arg(Tail, Res#arg{quiet = Boolean});
check_arg(Arg, _) ->
    {error, {badarg, Arg}}.
%% Reconciles the requested size of a wrap log with the size read from the
%% size file ({0,0} is the value matched when no old size is available).
%% The order of the clauses is significant. A failed head check is passed
%% through unchanged by the last clause.
check_wrap_arg({ok, #arg{size = infinity}}, {0,0}, _Version) ->
    {error, {badarg, size}};
check_wrap_arg({ok, #arg{size = infinity} = Res}, OldSize, Version) ->
    {ok, Res#arg{version = Version, old_size = OldSize, size = OldSize}};
check_wrap_arg({ok, #arg{size = Size} = Res}, {0,0}, Version) ->
    {ok, Res#arg{version = Version, old_size = Size}};
check_wrap_arg({ok, #arg{size = OldSize} = Res}, OldSize, Version) ->
    {ok, Res#arg{version = Version, old_size = OldSize}};
check_wrap_arg({ok, #arg{repair = truncate, size = Size} = Res},
               OldSize, Version) when is_tuple(Size) ->
    {ok, Res#arg{version = Version, old_size = OldSize}};
check_wrap_arg({ok, #arg{mode = read_only, size = Size}},
               OldSize, _Version) when Size =/= OldSize ->
    {error, {size_mismatch, OldSize, Size}};
check_wrap_arg({ok, #arg{size = Size} = Res}, OldSize, Version)
  when is_tuple(Size) ->
    {ok, Res#arg{version = Version, old_size = OldSize}};
check_wrap_arg({ok, _Res}, _OldSize, _Version) ->
    {error, {badarg, size}};
check_wrap_arg(Ret, _OldSize, _Version) ->
    Ret.

%%%-----------------------------------------------------------------
%%% Server functions
%%%-----------------------------------------------------------------

%% Entry point of the log process (spawned by istart_link/1): starts
%% trapping exits and enters the message loop with a fresh state.
init(Parent, Server) ->
    ?PROFILE(ep:do()),
    process_flag(trap_exit, true),
    InitialState = #state{parent = Parent, server = Server},
    loop(InitialState).

%% Message loop. Messages stored in the state are handled before anything
%% is received from the mailbox.
loop(#state{messages = []} = St) ->
    receive
        Msg ->
            handle(Msg, St)
    end;
loop(#state{messages = [Next | Rest]} = St) ->
    handle(Next, St#state{messages = Rest}).
684 685handle({From, write_cache}, S) when From =:= self() -> 686 case catch do_write_cache(get(log)) of 687 ok -> 688 loop(S); 689 Error -> 690 loop(S#state{cache_error = Error}) 691 end; 692handle({From, {log, Format, B}}=Message, S) -> 693 case get(log) of 694 #log{mode = read_only}=L -> 695 reply(From, {error, {read_only_mode, L#log.name}}, S); 696 #log{status = ok, format=external}=L when Format =:= internal -> 697 reply(From, {error, {format_external, L#log.name}}, S); 698 #log{status = ok, format=LogFormat} -> 699 log_loop(S, From, [B], [], iolist_size(B), LogFormat); 700 #log{status = {blocked, false}}=L -> 701 reply(From, {error, {blocked_log, L#log.name}}, S); 702 #log{blocked_by = From}=L -> 703 reply(From, {error, {blocked_log, L#log.name}}, S); 704 _ -> 705 enqueue(Message, S) 706 end; 707handle({alog, Format, B}=Message, S) -> 708 case get(log) of 709 #log{mode = read_only} -> 710 notify_owners({read_only,B}), 711 loop(S); 712 #log{status = ok, format = external} when Format =:= internal -> 713 notify_owners({format_external, B}), 714 loop(S); 715 #log{status = ok, format=LogFormat} -> 716 log_loop(S, [], [B], [], iolist_size(B), LogFormat); 717 #log{status = {blocked, false}} -> 718 notify_owners({blocked_log, B}), 719 loop(S); 720 _ -> 721 enqueue(Message, S) 722 end; 723handle({From, {block, QueueLogRecs}}=Message, S) -> 724 case get(log) of 725 #log{status = ok}=L -> 726 do_block(From, QueueLogRecs, L), 727 reply(From, ok, S); 728 #log{status = {blocked, false}}=L -> 729 reply(From, {error, {blocked_log, L#log.name}}, S); 730 #log{blocked_by = From}=L -> 731 reply(From, {error, {blocked_log, L#log.name}}, S); 732 _ -> 733 enqueue(Message, S) 734 end; 735handle({From, unblock}, S) -> 736 case get(log) of 737 #log{status = ok}=L -> 738 reply(From, {error, {not_blocked, L#log.name}}, S); 739 #log{blocked_by = From}=L -> 740 S2 = do_unblock(L, S), 741 reply(From, ok, S2); 742 L -> 743 reply(From, {error, {not_blocked_by_pid, L#log.name}}, S) 744 end; 
745handle({From, sync}=Message, S) -> 746 case get(log) of 747 #log{mode = read_only}=L -> 748 reply(From, {error, {read_only_mode, L#log.name}}, S); 749 #log{status = ok, format=LogFormat} -> 750 log_loop(S, [], [], [From], 0, LogFormat); 751 #log{status = {blocked, false}}=L -> 752 reply(From, {error, {blocked_log, L#log.name}}, S); 753 #log{blocked_by = From}=L -> 754 reply(From, {error, {blocked_log, L#log.name}}, S); 755 _ -> 756 enqueue(Message, S) 757 end; 758handle({From, {truncate, Head, F, A}}=Message, S) -> 759 case get(log) of 760 #log{mode = read_only}=L -> 761 reply(From, {error, {read_only_mode, L#log.name}}, S); 762 #log{status = ok} when S#state.cache_error =/= ok -> 763 loop(cache_error(S, [From])); 764 #log{status = ok}=L -> 765 H = merge_head(Head, L#log.head), 766 case catch do_trunc(L, H) of 767 ok -> 768 erase(is_full), 769 notify_owners({truncated, S#state.cnt}), 770 N = if H =:= none -> 0; true -> 1 end, 771 reply(From, ok, (state_ok(S))#state{cnt = N}); 772 Error -> 773 do_exit(S, From, Error, ?failure(Error, F, A)) 774 end; 775 #log{status = {blocked, false}}=L -> 776 reply(From, {error, {blocked_log, L#log.name}}, S); 777 #log{blocked_by = From}=L -> 778 reply(From, {error, {blocked_log, L#log.name}}, S); 779 _ -> 780 enqueue(Message, S) 781 end; 782handle({From, {chunk, Pos, B, N}}=Message, S) -> 783 case get(log) of 784 #log{status = ok} when S#state.cache_error =/= ok -> 785 loop(cache_error(S, [From])); 786 #log{status = ok}=L -> 787 R = do_chunk(L, Pos, B, N), 788 reply(From, R, S); 789 #log{blocked_by = From}=L -> 790 R = do_chunk(L, Pos, B, N), 791 reply(From, R, S); 792 #log{status = {blocked, false}}=L -> 793 reply(From, {error, {blocked_log, L#log.name}}, S); 794 _L -> 795 enqueue(Message, S) 796 end; 797handle({From, {chunk_step, Pos, N}}=Message, S) -> 798 case get(log) of 799 #log{status = ok} when S#state.cache_error =/= ok -> 800 loop(cache_error(S, [From])); 801 #log{status = ok}=L -> 802 R = do_chunk_step(L, Pos, N), 803 
reply(From, R, S); 804 #log{blocked_by = From}=L -> 805 R = do_chunk_step(L, Pos, N), 806 reply(From, R, S); 807 #log{status = {blocked, false}}=L -> 808 reply(From, {error, {blocked_log, L#log.name}}, S); 809 _ -> 810 enqueue(Message, S) 811 end; 812handle({From, {change_notify, Pid, NewNotify}}=Message, S) -> 813 case get(log) of 814 #log{status = ok}=L -> 815 case do_change_notify(L, Pid, NewNotify) of 816 {ok, L1} -> 817 put(log, L1), 818 reply(From, ok, S); 819 Error -> 820 reply(From, Error, S) 821 end; 822 #log{status = {blocked, false}}=L -> 823 reply(From, {error, {blocked_log, L#log.name}}, S); 824 #log{blocked_by = From}=L -> 825 reply(From, {error, {blocked_log, L#log.name}}, S); 826 _ -> 827 enqueue(Message, S) 828 end; 829handle({From, {change_header, NewHead}}=Message, S) -> 830 case get(log) of 831 #log{mode = read_only}=L -> 832 reply(From, {error, {read_only_mode, L#log.name}}, S); 833 #log{status = ok, format = Format}=L -> 834 case check_head(NewHead, Format) of 835 {ok, Head} -> 836 put(log, L#log{head = mk_head(Head, Format)}), 837 reply(From, ok, S); 838 Error -> 839 reply(From, Error, S) 840 end; 841 #log{status = {blocked, false}}=L -> 842 reply(From, {error, {blocked_log, L#log.name}}, S); 843 #log{blocked_by = From}=L -> 844 reply(From, {error, {blocked_log, L#log.name}}, S); 845 _ -> 846 enqueue(Message, S) 847 end; 848handle({From, {change_size, NewSize}}=Message, S) -> 849 case get(log) of 850 #log{mode = read_only}=L -> 851 reply(From, {error, {read_only_mode, L#log.name}}, S); 852 #log{status = ok}=L -> 853 case check_size(L#log.type, NewSize) of 854 ok -> 855 case catch do_change_size(L, NewSize) of % does the put 856 ok -> 857 reply(From, ok, S); 858 {big, CurSize} -> 859 reply(From, 860 {error, 861 {new_size_too_small, L#log.name, CurSize}}, 862 S); 863 Else -> 864 reply(From, Else, state_err(S, Else)) 865 end; 866 not_ok -> 867 reply(From, {error, {badarg, size}}, S) 868 end; 869 #log{status = {blocked, false}}=L -> 870 
reply(From, {error, {blocked_log, L#log.name}}, S); 871 #log{blocked_by = From}=L -> 872 reply(From, {error, {blocked_log, L#log.name}}, S); 873 _ -> 874 enqueue(Message, S) 875 end; 876handle({From, inc_wrap_file}=Message, S) -> 877 case get(log) of 878 #log{mode = read_only}=L -> 879 reply(From, {error, {read_only_mode, L#log.name}}, S); 880 #log{type = halt}=L -> 881 reply(From, {error, {halt_log, L#log.name}}, S); 882 #log{status = ok} when S#state.cache_error =/= ok -> 883 loop(cache_error(S, [From])); 884 #log{status = ok}=L -> 885 case catch do_inc_wrap_file(L) of 886 {ok, L2, Lost} -> 887 put(log, L2), 888 notify_owners({wrap, Lost}), 889 reply(From, ok, S#state{cnt = S#state.cnt-Lost}); 890 {error, Error, L2} -> 891 put(log, L2), 892 reply(From, Error, state_err(S, Error)) 893 end; 894 #log{status = {blocked, false}}=L -> 895 reply(From, {error, {blocked_log, L#log.name}}, S); 896 #log{blocked_by = From}=L -> 897 reply(From, {error, {blocked_log, L#log.name}}, S); 898 _ -> 899 enqueue(Message, S) 900 end; 901handle({From, {reopen, NewFile, Head, F, A}}, S) -> 902 case get(log) of 903 #log{mode = read_only}=L -> 904 reply(From, {error, {read_only_mode, L#log.name}}, S); 905 #log{status = ok} when S#state.cache_error =/= ok -> 906 loop(cache_error(S, [From])); 907 #log{status = ok, filename = NewFile}=L -> 908 reply(From, {error, {same_file_name, L#log.name}}, S); 909 #log{status = ok}=L -> 910 case catch close_disk_log2(L) of 911 closed -> 912 File = L#log.filename, 913 case catch rename_file(File, NewFile, L#log.type) of 914 ok -> 915 H = merge_head(Head, L#log.head), 916 case do_open((S#state.args)#arg{name = L#log.name, 917 repair = truncate, 918 head = H, 919 file = File}) of 920 {ok, Res, L2, Cnt} -> 921 put(log, L2#log{owners = L#log.owners, 922 head = L#log.head, 923 users = L#log.users}), 924 notify_owners({truncated, S#state.cnt}), 925 erase(is_full), 926 case Res of 927 {error, _} -> 928 do_exit(S, From, Res, 929 ?failure(Res, F, A)); 930 _ -> 931 
reply(From, ok, S#state{cnt = Cnt}) 932 end; 933 Res -> 934 do_exit(S, From, Res, ?failure(Res, F, A)) 935 end; 936 Error -> 937 do_exit(S, From, Error, ?failure(Error, reopen, 2)) 938 end; 939 Error -> 940 do_exit(S, From, Error, ?failure(Error, F, A)) 941 end; 942 L -> 943 reply(From, {error, {blocked_log, L#log.name}}, S) 944 end; 945handle({Server, {internal_open, A}}, S) -> 946 case get(log) of 947 undefined -> 948 case do_open(A) of % does the put 949 {ok, Res, L, Cnt} -> 950 put(log, opening_pid(A#arg.linkto, A#arg.notify, L)), 951 #arg{size = Size, old_size = OldSize} = A, 952 S1 = S#state{args=A, cnt=Cnt}, 953 case Size =:= OldSize of 954 true -> 955 reply(Server, Res, S1); 956 false -> 957 case catch do_change_size(get(log), Size) of % does the put 958 ok -> 959 reply(Server, Res, S1); 960 {big, _CurSize} -> 961 %% Note: halt logs are opened even if the 962 %% given size is too small. 963 reply(Server, Res, S1); 964 Else -> 965 reply(Server, Else, S1) 966 end 967 end; 968 Res -> 969 do_fast_exit(S, Server, Res) 970 end; 971 L -> 972 TestH = mk_head(A#arg.head, A#arg.format), 973 case compare_arg(A#arg.options, S#state.args, TestH, L#log.head) of 974 ok -> 975 case add_pid(A#arg.linkto, A#arg.notify, L) of 976 {ok, L1} -> 977 put(log, L1), 978 reply(Server, {ok, L#log.name}, S); 979 Error -> 980 reply(Server, Error, S) 981 end; 982 Error -> 983 reply(Server, Error, S) 984 end 985 end; 986handle({From, close}, S) -> 987 case do_close(From, S) of 988 {stop, S1} -> 989 do_exit(S1, From, ok, normal); 990 {continue, S1} -> 991 reply(From, ok, S1) 992 end; 993handle({From, info}, S) -> 994 reply(From, do_info(get(log), S#state.cnt), S); 995handle({'EXIT', From, Reason}, #state{parent=From}=S) -> 996 %% Parent orders shutdown. 997 _ = do_stop(S), 998 exit(Reason); 999handle({'EXIT', From, Reason}, #state{server=From}=S) -> 1000 %% The server is gone. 
_ = do_stop(S),
    exit(Reason);
handle({'EXIT', From, _Reason}, S) ->
    L = get(log),
    case is_owner(From, L) of
        {true, _Notify} ->
            case close_owner(From, L, S) of
                {stop, S1} ->
                    _ = do_stop(S1),
                    exit(normal);
                {continue, S1} ->
                    loop(S1)
            end;
        false ->
            %% 'users' is not decremented.
            S1 = do_unblock(From, get(log), S),
            loop(S1)
    end;
handle({system, From, Req}, S) ->
    sys:handle_system_msg(Req, From, S#state.parent, ?MODULE, [], S);
handle(_, S) ->
    loop(S).

%% Defers Message: it is put at the head of the 'queue' field of the
%% state (so the queue holds the newest message first), after which
%% the main loop is re-entered.
enqueue(Message, #state{queue = Queue}=S) ->
    loop(S#state{queue = [Message | Queue]}).

%% Collect further log and sync requests already in the mailbox or queued

%% Upper limit for the accumulated size (the Sz argument of
%% log_loop/6); once Sz exceeds it, collecting stops and the items
%% gathered so far are handed to log_end/5.
-define(MAX_LOOK_AHEAD, 64*1024).

%% Inlined.
%% log_loop(State, Pids, Bins, Sync, Sz, Format) fetches the next
%% message and passes it on to log_loop/7. The clauses are tried in
%% this order:
%%  1. a pending cache error is reported to Pids via cache_error/2;
%%  2. more than ?MAX_LOOK_AHEAD has been collected: finish with
%%     log_end/5;
%%  3. no messages saved in the state: take one from the mailbox
%%     without waiting ('after 0'); finish with log_end/5 if the
%%     mailbox is empty;
%%  4. otherwise take the first message saved in the state.
log_loop(#state{cache_error = CE}=S, Pids, _Bins, _Sync, _Sz, _F) when CE =/= ok ->
    loop(cache_error(S, Pids));
log_loop(#state{}=S, Pids, Bins, Sync, Sz, _F) when Sz > ?MAX_LOOK_AHEAD ->
    loop(log_end(S, Pids, Bins, Sync, Sz));
log_loop(#state{messages = []}=S, Pids, Bins, Sync, Sz, F) ->
    receive
        Message ->
            log_loop(Message, Pids, Bins, Sync, Sz, F, S)
    after 0 ->
            loop(log_end(S, Pids, Bins, Sync, Sz))
    end;
log_loop(#state{messages = [M | Ms]}=S, Pids, Bins, Sync, Sz, F) ->
    S1 = S#state{messages = Ms},
    log_loop(M, Pids, Bins, Sync, Sz, F, S1).

%% Items logged after the last sync request found are sync:ed as well.
%% log_loop(Message, Pids, Bins, Sync, Sz, Format, State)
%% Classifies one message. Log requests add their data to Bins (and,
%% for synchronous requests, the sender to Pids); sync requests add
%% the sender to Sync. Any other message ends the collection: what has
%% been gathered is written by log_end/5 and the message is then
%% handled as usual.
log_loop({alog, internal, B}, Pids, Bins, Sync, Sz, internal=F, S) ->
    %% alog of terms allowed for the internal format only
    log_loop_add(B, Pids, Bins, Sync, Sz, F, S);
log_loop({alog, binary, B}, Pids, Bins, Sync, Sz, F, S) ->
    log_loop_add(B, Pids, Bins, Sync, Sz, F, S);
log_loop({From, {log, internal, B}}, Pids, Bins, Sync, Sz, internal=F, S) ->
    %% log of terms allowed for the internal format only
    log_loop_add(B, [From | Pids], Bins, Sync, Sz, F, S);
log_loop({From, {log, binary, B}}, Pids, Bins, Sync, Sz, F, S) ->
    log_loop_add(B, [From | Pids], Bins, Sync, Sz, F, S);
log_loop({From, sync}, Pids, Bins, Sync, Sz, F, S) ->
    log_loop(S, Pids, Bins, [From | Sync], Sz, F);
log_loop(Other, Pids, Bins, Sync, Sz, _F, S) ->
    handle(Other, log_end(S, Pids, Bins, Sync, Sz)).

%% Adds the data B to the collected items, increases the collected
%% size accordingly and goes on collecting.
log_loop_add(B, Pids, Bins, Sync, Sz, F, S) ->
    log_loop(S, Pids, [B | Bins], Sync, Sz + iolist_size(B), F).

%% -> #state{}
%% Writes the collected items (if any), replies to the processes in
%% Pids, and finally takes care of the collected sync requests.
log_end(S, [], [], Sync, _Sz) ->
    log_end_sync(S, Sync);
log_end(#state{cnt = Cnt0}=S, Pids, Bins, Sync, Sz) ->
    Items = rflat(Bins),
    case do_log(get(log), Items, Sz) of
        NoLogged when is_integer(NoLogged) ->
            ok = replies(Pids, ok),
            OkState = state_ok(S),
            log_end_sync(OkState#state{cnt = Cnt0 + NoLogged}, Sync);
        {error, {error, {full, _Name}}, NoLogged} when Pids =:= [] ->
            %% Nobody is waiting for a reply (asynchronous requests only).
            Counted = S#state{cnt = Cnt0 + NoLogged},
            log_end_sync(state_ok(Counted), Sync);
        {error, Error, NoLogged} ->
            ok = replies(Pids, Error),
            state_err(S#state{cnt = Cnt0 + NoLogged}, Error)
    end.

%% Inlined.
%% Syncs the log if at least one sync request has been collected; the
%% result is sent to every requester and recorded as the error status.
log_end_sync(S, []) ->
    S;
log_end_sync(S, Sync) ->
    SyncResult = do_sync(get(log)),
    ok = replies(Sync, SyncResult),
    state_err(S, SyncResult).

%% Inlined.
%% The collected lists are in reverse order of arrival; rflat/1
%% concatenates them in order of arrival.
rflat([Single]) -> Single;
rflat(Lists) -> rflat(Lists, []).

rflat(Lists, Tail) ->
    lists:foldl(fun(List, Acc) -> List ++ Acc end, Tail, Lists).
%% -> {ok, Log} | {error, Error}
%% Changes the notify flag of the owner Pid. The updated owner entry
%% is placed first in the list of owners.
do_change_notify(L, Pid, Notify) ->
    case is_owner(Pid, L) of
        false ->
            {error, {not_owner, Pid}};
        {true, Notify} ->
            %% The value is not changed.
            {ok, L};
        {true, _Current} when not is_boolean(Notify) ->
            {error, {badarg, notify}};
        {true, _Current} ->
            Others = lists:keydelete(Pid, 1, L#log.owners),
            {ok, L#log{owners = [{Pid, Notify} | Others]}}
    end.

%% -> {stop, S} | {continue, S}
%% Owners and anonymous users are closed differently.
do_close(Pid, S) ->
    Log = get(log),
    case is_owner(Pid, Log) of
        false ->
            close_user(Pid, Log, S);
        {true, _Notify} ->
            close_owner(Pid, Log, S)
    end.

%% -> {stop, S} | {continue, S}
%% Removes Pid from the owners, releases a block held by Pid and
%% unlinks from Pid.
close_owner(Pid, L, S) ->
    Remaining = lists:keydelete(Pid, 1, L#log.owners),
    NewLog = L#log{owners = Remaining},
    put(log, NewLog),
    NewS = do_unblock(Pid, NewLog, S),
    unlink(Pid),
    do_close2(NewLog, NewS).

%% -> {stop, S} | {continue, S}
%% Decrements the number of anonymous users, unless it is zero already.
close_user(Pid, #log{users=Users}=L, S) when Users > 0 ->
    NewLog = L#log{users = Users - 1},
    put(log, NewLog),
    NewS = do_unblock(Pid, NewLog, S),
    do_close2(NewLog, NewS);
close_user(_Pid, _L, S) ->
    {continue, S}.

%% The log process stops when neither owners nor users remain.
do_close2(#log{users = 0, owners = []}, S) ->
    {stop, S};
do_close2(_Log, S) ->
    {continue, S}.

%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
system_continue(_Parent, _Debug, S) ->
    loop(S).

-spec system_terminate(_, _, _, #state{}) -> no_return().
system_terminate(Reason, _Parent, _Debug, S) ->
    _ = do_stop(S),
    exit(Reason).

%%-----------------------------------------------------------------
%% Temporary code for upgrade.
%%-----------------------------------------------------------------
system_code_change(S, _Module, _OldVsn, _Extra) ->
    {ok, S}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Stops the log, chooses the reply for From and terminates the
%% process with Reason. The reply is chosen as follows: a pending
%% cache error comes first; otherwise Message0 is used, except that an
%% 'ok' is replaced by the error returned when the log was closed.
-spec do_exit(#state{}, pid(), _, _) -> no_return().
do_exit(S, From, Message0, Reason) ->
    Closed = do_stop(S),
    CacheError = S#state.cache_error,
    Message = if
                  CacheError =/= ok -> CacheError;
                  Closed =:= closed -> Message0;
                  Message0 =:= ok -> Closed;
                  true -> Message0
              end,
    _ = disk_log_server:close(self()),
    ok = replies(From, Message),
    ?PROFILE(ep:done()),
    exit(Reason).

%% Stops the log, sends Message to the server and exits normally.
-spec do_fast_exit(#state{}, pid(), _) -> no_return().
do_fast_exit(S, Server, Message) ->
    _ = do_stop(S),
    Server ! {disk_log, self(), Message},
    exit(normal).

%% -> closed | Error
%% Every pending request is answered before the log is closed.
do_stop(#state{queue = Queue, messages = Messages}) ->
    proc_q(Queue ++ Messages),
    close_disk_log(get(log)).

%% Sends {error, disk_log_stopped} to the sender of each pending
%% synchronous request. Asynchronous requests are dropped silently.
proc_q(Pending) ->
    Stopped = {disk_log, self(), {error, disk_log_stopped}},
    lists:foreach(fun({From, _Request}) when is_pid(From) ->
                          From ! Stopped;
                     (_Async) ->
                          ok
                  end, Pending).

%% -> log()
%% Registers the process that opens the log; this cannot fail.
opening_pid(Pid, Notify, L) ->
    {ok, NewLog} = add_pid(Pid, Notify, L),
    NewLog.

%% -> {ok, log()} | Error
%% A pid becomes an owner (and is linked to); anything else is counted
%% as an anonymous user.
add_pid(Pid, Notify, L) when is_pid(Pid) ->
    case is_owner(Pid, L) of
        {true, Notify} ->
            %% Already an owner, with the same notify value.
            {ok, L};
        {true, CurNotify} when Notify =/= CurNotify ->
            {error, {arg_mismatch, notify, CurNotify, Notify}};
        false ->
            link(Pid),
            Owners = [{Pid, Notify} | L#log.owners],
            {ok, L#log{owners = Owners}}
    end;
add_pid(_NotAPid, _Notify, L) ->
    Users = L#log.users,
    {ok, L#log{users = Users + 1}}.

%% Unlinks from the blocking process, unless the log is not blocked or
%% the blocking process is an owner (owners stay linked).
unblock_pid(#log{blocked_by = none}) ->
    ok;
unblock_pid(#log{blocked_by = Blocker}=L) ->
    case is_owner(Blocker, L) of
        false ->
            unlink(Blocker);
        {true, _Notify} ->
            ok
    end.
%% -> {true, Notify} | false
%% Looks up Pid among the owners of the log. Notify is the notify flag
%% stored together with the owner.
is_owner(Pid, L) ->
    case lists:keyfind(Pid, 1, L#log.owners) of
        {_Pid, Notify} ->
            {true, Notify};
        false ->
            false
    end.

%% -> ok | {error, {file_error, FileName, Reason}}
%% Renames the file of a halt log, or every existing file of a wrap
%% log. A failing file:rename/2 is returned as an error tuple; it is
%% not thrown. For wrap logs wrap_file_extensions/1 is called first,
%% and according to its own comment it can throw a file error.
rename_file(File, NewFile, halt) ->
    case file:rename(File, NewFile) of
        ok ->
            ok;
        Else ->
            file_error(NewFile, Else)
    end;
rename_file(File, NewFile, wrap) ->
    rename_file(wrap_file_extensions(File), File, NewFile, ok).

%% Renames File.Ext to NewFile0.Ext for each extension. All files are
%% tried even if some rename fails; the result is 'ok' if every rename
%% succeeded, otherwise the error of the last failing rename.
rename_file([Ext|Exts], File, NewFile0, Res) ->
    NewFile = add_ext(NewFile0, Ext),
    NRes = case file:rename(add_ext(File, Ext), NewFile) of
               ok ->
                   Res;
               Else ->
                   file_error(NewFile, Else)
           end,
    rename_file(Exts, File, NewFile0, NRes);
rename_file([], _File, _NewFiles, Res) -> Res.

%% Tags the reason of a failed file operation with the file name.
file_error(FileName, {error, Error}) ->
    {error, {file_error, FileName, Error}}.

%% "Old" error messages have been kept, arg_mismatch has been added.
%% -> ok | {error, Reason}
%% Compares the options given when opening an already open log with
%% the arguments used when the log was first opened. The options are
%% checked one by one with compare_arg/3, and the first mismatch is
%% returned. When all options match, the head is compared as well,
%% unless no head option was given (Head is 'none').
compare_arg([], _A, none, _OrigHead) ->
    % no header option given
    ok;
compare_arg([], _A, Head, OrigHead) when Head =/= OrigHead ->
    {error, {arg_mismatch, head, OrigHead, Head}};
compare_arg([], _A, _Head, _OrigHead) ->
    ok;
compare_arg([{Attr, Val} | Tail], A, Head, OrigHead) ->
    case compare_arg(Attr, Val, A) of
        {not_ok, OrigVal} ->
            {error, {arg_mismatch, Attr, OrigVal, Val}};
        ok ->
            compare_arg(Tail, A, Head, OrigHead);
        Error ->
            Error
    end.

-spec compare_arg(atom(), _, #arg{}) ->
			 'ok' | {'not_ok', _} | {'error', {atom(), _}}.
%% Checks one option against the arguments of the open log. A
%% {not_ok, CurrentValue} result is turned into an arg_mismatch error
%% by compare_arg/4; the other errors are returned as they are.
compare_arg(file, F, #arg{file = File, name = Name}) when F =/= File ->
    {error, {name_already_open, Name}};
compare_arg(mode, read_only, #arg{mode = read_write, name = Name}) ->
    {error, {open_read_write, Name}};
compare_arg(mode, read_write, #arg{mode = read_only, name = Name}) ->
    {error, {open_read_only, Name}};
compare_arg(type, T, #arg{type = Type}) when T =/= Type ->
    {not_ok, Type};
compare_arg(format, F, #arg{format = Format}) when F =/= Format ->
    {not_ok, Format};
compare_arg(repair, R, #arg{repair = Repair}) when R =/= Repair ->
    %% not used, but check it anyway...
    {not_ok, Repair};
compare_arg(size, Sz, #arg{size = Size, type = wrap}) when Sz =/= Size ->
    %% Note: if the given size does not match the size of an open
    %% halt log, the given size is ignored.
    {error, {size_mismatch, Size, Sz}};
compare_arg(_Attr, _Val, _A) ->
    ok.

%% -> {ok, Res, log(), Cnt} | Error
%% Opens the log file(s) as described by the #arg{} record and creates
%% the #log{} record of the opened log.
do_open(A) ->
    #arg{type = Type, format = Format, name = Name, file = FName,
         mode = Mode, version = V} = A,
    disk_log_1:set_quiet(A#arg.quiet),
    Head = mk_head(A#arg.head, Format),
    Opened = do_open2(Type, Format, Name, FName, A#arg.repair,
                      A#arg.old_size, Mode, Head, V),
    case Opened of
        {ok, Ret, Extra, FormatType, NoItems} ->
            Log = #log{name = Name, type = Type, format = Format,
                       filename = FName, size = A#arg.size,
                       format_type = FormatType, head = Head, mode = Mode,
                       version = V, extra = Extra},
            {ok, Ret, Log, NoItems};
        Error ->
            Error
    end.

%% A {head, H} option is converted to {ok, Binary}; every other value
%% is returned unchanged.
mk_head({head, Bytes}, external) -> {ok, ensure_binary(Bytes)};
mk_head({head, Term}, internal) -> {ok, term_to_binary(Term)};
mk_head(Head, _Format) -> Head.

%% Converts each term of the list to its external term format.
terms2bins(Terms) ->
    [term_to_binary(Term) || Term <- Terms].

%% Returns the list itself if every element is a binary; otherwise a
%% list where every element has been converted to a binary.
ensure_binary_list(Bs) ->
    ensure_binary_list(Bs, Bs).
%% ensure_binary_list(Rest, Original)
%% Scans the list; Original is returned untouched if only binaries are
%% found. As soon as something else is found, the whole original list
%% is converted by make_binary_list/1.
ensure_binary_list([B | Bs], Bs0) when is_binary(B) ->
    ensure_binary_list(Bs, Bs0);
ensure_binary_list([], Bs0) ->
    Bs0;
ensure_binary_list(_, Bs0) ->
    make_binary_list(Bs0).

%% Converts every element of the list to a binary.
make_binary_list([B | Bs]) ->
    [ensure_binary(B) | make_binary_list(Bs)];
make_binary_list([]) ->
    [].

%% Fails with badarg if Bytes is not an iolist (or a binary).
ensure_binary(Bytes) ->
    iolist_to_binary(Bytes).

%%-----------------------------------------------------------------
%% Change size of the logs at runtime.
%%-----------------------------------------------------------------
%% -> ok | {big, CurSize} | throw(Error)
%% The updated log is stored in the process dictionary and the
%% 'is_full' mark is erased when the size is changed. A halt log is
%% left unchanged, and {big, CurSize} is returned, when the file
%% already holds more bytes than the new size allows.
do_change_size(#log{type = halt}=L, NewSize) ->
    Halt = L#log.extra,
    CurB = Halt#halt.curB,
    if
        NewSize =:= infinity; CurB =< NewSize ->
            erase(is_full),
            put(log, L#log{extra = Halt#halt{size = NewSize}}),
            ok;
        true ->
            {big, CurB}
    end;
do_change_size(#log{type = wrap}=L, NewSize) ->
    #log{extra = Extra, version = Version} = L,
    {ok, Handle} = disk_log_1:change_size_wrap(Extra, NewSize, Version),
    erase(is_full),
    put(log, L#log{extra = Handle}),
    ok.

%% -> {ok, Head} | Error; Head = none | {head, H} | {M,F,A}
%% Validates the head option. For external logs the head has to be
%% convertible to a binary; the head is returned as given, the
%% conversion is a check only.
check_head({head, none}, _Format) ->
    {ok, none};
check_head({head_func, {M, F, A}}, _Format) when is_atom(M),
                                                 is_atom(F),
                                                 is_list(A) ->
    {ok, {M, F, A}};
check_head({head, Head}, external) ->
    try ensure_binary(Head) of
        _Binary ->
            {ok, {head, Head}}
    catch
        error:_ ->
            %% Head is not an iolist.
            {error, {badarg, head}}
    end;
check_head({head, Term}, internal) ->
    {ok, {head, Term}};
check_head(_Head, _Format) ->
    {error, {badarg, head}}.
%% -> ok | not_ok
%% A halt log takes 'infinity' or a positive integer; a wrap log takes
%% a pair of positive integers, bounded by ?MAX_BYTES and ?MAX_FILES.
check_size(halt, infinity) ->
    ok;
check_size(halt, NewSize) when is_integer(NewSize), NewSize > 0 ->
    ok;
check_size(wrap, {NewMaxB, NewMaxF}) when is_integer(NewMaxB),
                                          is_integer(NewMaxF),
                                          NewMaxB > 0,
                                          NewMaxB =< ?MAX_BYTES,
                                          NewMaxF > 0,
                                          NewMaxF < ?MAX_FILES ->
    ok;
check_size(_Type, _Size) ->
    not_ok.

%%-----------------------------------------------------------------
%% Increment a wrap log.
%%-----------------------------------------------------------------
%% -> {ok, log(), Lost} | {error, Error, log()}
%% The returned log always carries the handle returned by disk_log_1,
%% also when an error is returned.
do_inc_wrap_file(#log{format = Format, extra = Handle, head = Head} = L) ->
    Result = case Format of
                 internal ->
                     disk_log_1:mf_int_inc(Handle, Head);
                 external ->
                     disk_log_1:mf_ext_inc(Handle, Head)
             end,
    case Result of
        {ok, NewHandle, Lost} ->
            {ok, L#log{extra = NewHandle}, Lost};
        {error, Error, NewHandle} ->
            {error, Error, L#log{extra = NewHandle}}
    end.


%%-----------------------------------------------------------------
%% Open a log file.
%%-----------------------------------------------------------------
%% -> {ok, Reply, log(), Cnt} | Error
%% Note: the header is always written, even if the log size is too small.
do_open2(halt, internal, Name, FName, Repair, Size, Mode, Head, _V) ->
    case catch disk_log_1:int_open(FName, Repair, Mode, Head) of
        {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} ->
            {ok, {ok, Name}, opened_halt(FdC, FileSize, Size),
             halt_int, NoItems};
        {repaired, FdC, Rec, Bad, FileSize} ->
            {ok, repaired_reply(Name, Rec, Bad),
             opened_halt(FdC, FileSize, Size), halt_int, Rec};
        Error ->
            Error
    end;
do_open2(wrap, internal, Name, FName, Repair, Size, Mode, Head, V) ->
    {MaxB, MaxF} = Size,
    Opened = (catch disk_log_1:mf_int_open(FName, MaxB, MaxF, Repair,
                                           Mode, Head, V)),
    case Opened of
        {ok, Handle, Cnt} ->
            {ok, {ok, Name}, Handle, wrap_int, Cnt};
        {repaired, Handle, Rec, Bad, Cnt} ->
            {ok, repaired_reply(Name, Rec, Bad), Handle, wrap_int, Cnt};
        Error ->
            Error
    end;
do_open2(halt, external, Name, FName, Repair, Size, Mode, Head, _V) ->
    case catch disk_log_1:ext_open(FName, Repair, Mode, Head) of
        {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} ->
            {ok, {ok, Name}, opened_halt(FdC, FileSize, Size),
             halt_ext, NoItems};
        Error ->
            Error
    end;
do_open2(wrap, external, Name, FName, Repair, Size, Mode, Head, V) ->
    {MaxB, MaxF} = Size,
    Opened = (catch disk_log_1:mf_ext_open(FName, MaxB, MaxF, Repair,
                                           Mode, Head, V)),
    case Opened of
        {ok, Handle, Cnt} ->
            {ok, {ok, Name}, Handle, wrap_ext, Cnt};
        Error ->
            Error
    end.

%% The #halt{} record of a newly opened halt log.
opened_halt(FdC, FileSize, Size) ->
    #halt{fdc = FdC, curB = FileSize, size = Size}.

%% The reply to the process opening a log that has been repaired.
repaired_reply(Name, Rec, Bad) ->
    {repaired, Name, {recovered, Rec}, {badbytes, Bad}}.

%% -> closed | Error
%% Unlinks from the blocking process and from the owners, closes the
%% log file(s) and erases the log from the process dictionary.
close_disk_log(undefined) ->
    closed;
close_disk_log(L) ->
    unblock_pid(L),
    lists:foreach(fun({Owner, _Notify}) -> unlink(Owner) end, L#log.owners),
    Result = (catch close_disk_log2(L)),
    erase(log),
    Result.

-spec close_disk_log2(#log{}) -> 'closed'.
% | throw(Error)

%% Closes the file(s) of the log, using the disk_log_1 function that
%% corresponds to the format type of the log. The value returned by
%% disk_log_1 is ignored; 'closed' is returned.
close_disk_log2(L) ->
    case L of
        #log{format_type = halt_int, mode = Mode, extra = Halt} ->
            disk_log_1:close(Halt#halt.fdc, L#log.filename, Mode);
        #log{format_type = wrap_int, mode = Mode, extra = Handle} ->
            disk_log_1:mf_int_close(Handle, Mode);
        #log{format_type = halt_ext, extra = Halt} ->
            disk_log_1:fclose(Halt#halt.fdc, L#log.filename);
        #log{format_type = wrap_ext, mode = Mode, extra = Handle} ->
            disk_log_1:mf_ext_close(Handle, Mode)
    end,
    closed.

%% Returns a description (as returned by io_lib:format/2) of an error.
%% Note that the order of the first clauses matters: {error, Module,
%% Error} is delegated to Module:format_error/1, an {error, Reason}
%% wrapper is stripped off, and a {Node, {error, Reason}} pair gets
%% the node as a prefix. Unknown errors are printed as they are by the
%% last clause.
do_format_error({error, Module, Error}) ->
    Module:format_error(Error);
do_format_error({error, Reason}) ->
    do_format_error(Reason);
do_format_error({Node, Error = {error, _Reason}}) ->
    lists:append(io_lib:format("~p: ", [Node]), do_format_error(Error));
do_format_error({badarg, Arg}) ->
    io_lib:format("The argument ~p is missing, not recognized or "
                  "not wellformed~n", [Arg]);
do_format_error({size_mismatch, OldSize, ArgSize}) ->
    io_lib:format("The given size ~p does not match the size ~p found on "
                  "the disk log size file~n", [ArgSize, OldSize]);
do_format_error({read_only_mode, Log}) ->
    io_lib:format("The disk log ~tp has been opened read-only, but the "
                  "requested operation needs read-write access~n", [Log]);
do_format_error({format_external, Log}) ->
    io_lib:format("The requested operation can only be applied on internally "
                  "formatted disk logs, but ~tp is externally formatted~n",
                  [Log]);
do_format_error({blocked_log, Log}) ->
    io_lib:format("The blocked disk log ~tp does not queue requests, or "
                  "the log has been blocked by the calling process~n", [Log]);
do_format_error({full, Log}) ->
    io_lib:format("The halt log ~tp is full~n", [Log]);
do_format_error({not_blocked, Log}) ->
    io_lib:format("The disk log ~tp is not blocked~n", [Log]);
do_format_error({not_owner, Pid}) ->
    io_lib:format("The pid ~tp is not an owner of the disk log~n", [Pid]);
do_format_error({not_blocked_by_pid, Log}) ->
    io_lib:format("The disk log ~tp is blocked, but only the blocking pid "
                  "can unblock a disk log~n", [Log]);
do_format_error({new_size_too_small, Log, CurrentSize}) ->
    io_lib:format("The current size ~p of the halt log ~tp is greater than the "
                  "requested new size~n", [CurrentSize, Log]);
do_format_error({halt_log, Log}) ->
    io_lib:format("The halt log ~tp cannot be wrapped~n", [Log]);
do_format_error({same_file_name, Log}) ->
    io_lib:format("Current and new file name of the disk log ~tp "
                  "are the same~n", [Log]);
do_format_error({arg_mismatch, Option, FirstValue, ArgValue}) ->
    io_lib:format("The value ~tp of the disk log option ~p does not match "
                  "the current value ~tp~n", [ArgValue, Option, FirstValue]);
do_format_error({name_already_open, Log}) ->
    io_lib:format("The disk log ~tp has already opened another file~n", [Log]);
do_format_error({open_read_write, Log}) ->
    io_lib:format("The disk log ~tp has already been opened read-write~n",
                  [Log]);
do_format_error({open_read_only, Log}) ->
    io_lib:format("The disk log ~tp has already been opened read-only~n",
                  [Log]);
do_format_error({not_internal_wrap, Log}) ->
    io_lib:format("The requested operation cannot be applied since ~tp is not "
                  "an internally formatted disk log~n", [Log]);
do_format_error(no_such_log) ->
    io_lib:format("There is no disk log with the given name~n", []);
do_format_error(nonode) ->
    io_lib:format("There seems to be no node up that can handle "
                  "the request~n", []);
do_format_error(nodedown) ->
    io_lib:format("There seems to be no node up that can handle "
                  "the request~n", []);
do_format_error({corrupt_log_file, FileName}) ->
    io_lib:format("The disk log file \"~ts\" contains corrupt data~n",
                  [FileName]);
do_format_error({need_repair, FileName}) ->
    io_lib:format("The disk log file \"~ts\" has not been closed properly and "
                  "needs repair~n", [FileName]);
do_format_error({not_a_log_file, FileName}) ->
    io_lib:format("The file \"~ts\" is not a wrap log file~n", [FileName]);
do_format_error({invalid_header, InvalidHeader}) ->
    io_lib:format("The disk log header is not wellformed: ~p~n",
                  [InvalidHeader]);
do_format_error(end_of_log) ->
    io_lib:format("An attempt was made to step outside a not yet "
                  "full wrap log~n", []);
do_format_error({invalid_index_file, FileName}) ->
    io_lib:format("The wrap log index file \"~ts\" cannot be used~n",
                  [FileName]);
do_format_error({no_continuation, BadCont}) ->
    io_lib:format("The term ~p is not a chunk continuation~n", [BadCont]);
do_format_error({file_error, FileName, Reason}) ->
    io_lib:format("\"~ts\": ~tp~n", [FileName, file:format_error(Reason)]);
do_format_error(E) ->
    io_lib:format("~tp~n", [E]).

%% Returns a list of {Item, Value} pairs describing the log L; Cnt is
%% the number of items as counted by the log process. The pairs that
%% are common to all logs come first, followed by the pairs specific
%% to wrap and halt logs opened in read_write mode.
%% Note the side effect for wrap logs opened in read_write mode: the
%% noFull counter of the handle is added to accFull and reset to zero,
%% and the updated log is stored in the process dictionary.
do_info(L, Cnt) ->
    #log{name = Name, type = Type, mode = Mode, filename = File,
         extra = Extra, status = Status, owners = Owners, users = Users,
         format = Format, head = Head} = L,
    Size = case Type of
               wrap ->
                   disk_log_1:get_wrap_size(Extra);
               halt ->
                   Extra#halt.size
           end,
    RW = case Type of
             wrap when Mode =:= read_write ->
                 #handle{curB = CurB, curF = CurF,
                         cur_cnt = CurCnt, acc_cnt = AccCnt,
                         noFull = NoFull, accFull = AccFull} = Extra,
                 NewAccFull = AccFull + NoFull,
                 NewExtra = Extra#handle{noFull = 0, accFull = NewAccFull},
                 put(log, L#log{extra = NewExtra}),
                 %% no_overflows is {total, since the counter was last reset}
                 [{no_current_bytes, CurB},
                  {no_current_items, CurCnt},
                  {no_items, Cnt},
                  {no_written_items, CurCnt + AccCnt},
                  {current_file, CurF},
                  {no_overflows, {NewAccFull, NoFull}}
                 ];
             halt when Mode =:= read_write ->
                 %% 'is_full' is set in the process dictionary when a
                 %% write does not fit in the halt log.
                 IsFull = case get(is_full) of
                              undefined -> false;
                              _ -> true
                          end,
                 [{full, IsFull},
                  {no_written_items, Cnt}
                 ];
             _ when Mode =:= read_only ->
                 []
         end,
    %% The head is reported for logs opened in read_write mode only.
    HeadL = case Mode of
                read_write ->
                    [{head, case Head of
                                {ok, H} -> H;
                                none -> Head;
                                {_M, _F, _A} -> Head
                            end}];
                read_only ->
                    []
            end,
    Common = [{name, Name},
              {file, File},
              {type, Type},
              {format, Format},
              {size, Size},
              {items, Cnt}, % kept for "backward compatibility" (undocumented)
              {owners, Owners},
              {users, Users}] ++
             HeadL ++
             [{mode, Mode},
              {status, Status},
              {node, node()}
             ],
    Common ++ RW.

%% Marks the log as blocked by Pid and stores it in the process
%% dictionary. QueueLogRecs is kept in the status of the log. The
%% blocking process is linked to, unless it is an owner.
do_block(Pid, QueueLogRecs, L) ->
    L2 = L#log{status = {blocked, QueueLogRecs}, blocked_by = Pid},
    put(log, L2),
    case is_owner(Pid, L2) of
        {true, _Notify} ->
            ok;
        false ->
            link(Pid)
    end.

%% -> #state{}
%% Unblocks the log if it is blocked by Pid; otherwise the state is
%% returned unchanged.
do_unblock(Pid, #log{blocked_by = Pid}=L, S) ->
    do_unblock(L, S);
do_unblock(_Pid, _L, S) ->
    S.

%% -> #state{}
%% Unblocks the log unconditionally. The queued requests are moved to
%% 'messages'; the queue is reversed in order to restore the order of
%% arrival.
do_unblock(L, S) ->
    unblock_pid(L),
    L2 = L#log{blocked_by = none, status = ok},
    put(log, L2),
    %% Since the block request is synchronous, and the blocking
    %% process is the only process that can unblock, all requests in
    %% 'messages' will have been put in 'queue' before the unblock
    %% request is granted.
    [] = S#state.messages,	% assertion
    S#state{queue = [], messages = lists:reverse(S#state.queue)}.

-spec do_log(#log{}, [binary()]) -> integer() | {'error', _, integer()}.

%% Logs the items B; the total size of B is calculated here and passed
%% on to do_log/3.
do_log(L, B) ->
    do_log(L, B, iolist_size(B)).
%% -> integer() | {error, Error, integer()}
%% Writes the items B to the log. BSz is the size of B. The integer
%% returned is the change of the number of items in the log.
do_log(#log{type = halt, format = Format, extra = Halt}=L, B, BSz) ->
    #halt{curB = CurSize, size = MaxSize} = Halt,
    {Bs, BSize} = logl(B, Format, BSz),
    case get(is_full) of
        true ->
            {error, {error, {full, L#log.name}}, 0};
        undefined when MaxSize =:= infinity; CurSize + BSize =< MaxSize ->
            %% Everything fits.
            halt_write(Halt, L, B, Bs, BSize);
        undefined ->
            %% Write as many items as there is room for.
            halt_write_full(L, B, Format, 0)
    end;
do_log(#log{format_type = wrap_int, extra = Handle, head = Head}=L, B, _BSz) ->
    wrap_log_result(L, disk_log_1:mf_int_log(Handle, B, Head));
do_log(#log{format_type = wrap_ext, extra = Handle, head = Head}=L, B, _BSz) ->
    wrap_log_result(L, disk_log_1:mf_ext_log(Handle, B, Head)).

%% Stores the handle returned when writing to a wrap log, and notifies
%% the owners if the log has wrapped. Items lost when wrapping are
%% subtracted from the number of logged items.
wrap_log_result(L, {ok, Handle, Logged, Lost, Wraps}) ->
    notify_owners_wrap(Wraps),
    put(log, L#log{extra = Handle}),
    Logged - Lost;
wrap_log_result(L, {ok, Handle, Logged}) ->
    put(log, L#log{extra = Handle}),
    Logged;
wrap_log_result(L, {error, Error, Handle, Logged, Lost}) ->
    put(log, L#log{extra = Handle}),
    {error, Error, Logged - Lost}.

%% -> {Bytes, Size}
%% Items of internally formatted logs are prepared by disk_log_1.
%% Items of externally formatted logs are written as they are; their
%% size is calculated unless it is known already.
logl(B, internal, _Sz) ->
    disk_log_1:logl(B);
logl(B, external, undefined) ->
    {B, iolist_size(B)};
logl(B, external, Sz) ->
    {B, Sz}.
%% -> {error, Error, NoWritten}
%% Writes the items one at a time for as long as they fit in the halt
%% log. When an item does not fit (or no items remain) the log is
%% marked as full and the owners are notified. N is the number of
%% items written so far.
halt_write_full(L, [Item | Items], Format, N) ->
    One = [Item],
    {Bytes, BSize} = logl(One, Format, undefined),
    Halt = L#log.extra,
    #halt{curB = CurSize, size = MaxSize} = Halt,
    if
        CurSize + BSize =< MaxSize ->
            Written = halt_write(Halt, L, One, Bytes, BSize),
            case is_integer(Written) of
                true ->
                    halt_write_full(get(log), Items, Format, N + Written);
                false ->
                    Written
            end;
        true ->
            halt_write_full(L, [], Format, N)
    end;
halt_write_full(L, _Items, _Format, N) ->
    put(is_full, true),
    notify_owners(full),
    {error, {error, {full, L#log.name}}, N}.

%% -> NoItems | {error, Error, 0}
%% Writes Bs (BSize bytes) to the file of the halt log and stores the
%% updated log. B is the list of items that Bs was created from.
halt_write(Halt, L, B, Bs, BSize) ->
    case disk_log_1:fwrite(Halt#halt.fdc, L#log.filename, Bs, BSize) of
        {ok, FdC} ->
            Written = Halt#halt.curB + BSize,
            put(log, L#log{extra = Halt#halt{fdc = FdC, curB = Written}}),
            length(B);
        {Error, FdC} ->
            store_halt_fdc(L, Halt, FdC),
            {error, Error, 0}
    end.

%% Stores the log with an updated file descriptor cache.
store_halt_fdc(Log, Halt, FdC) ->
    put(log, Log#log{extra = Halt#halt{fdc = FdC}}).

%% -> ok | Error
do_write_cache(#log{type = halt, filename = FName, extra = Halt} = Log) ->
    {Reply, FdC} = disk_log_1:write_cache(Halt#halt.fdc, FName),
    store_halt_fdc(Log, Halt, FdC),
    Reply;
do_write_cache(#log{type = wrap, extra = Handle0} = Log) ->
    {Reply, Handle} = disk_log_1:mf_write_cache(Handle0),
    put(log, Log#log{extra = Handle}),
    Reply.

%% -> ok | Error
do_sync(#log{type = halt, filename = FName, extra = Halt} = Log) ->
    {Reply, FdC} = disk_log_1:sync(Halt#halt.fdc, FName),
    store_halt_fdc(Log, Halt, FdC),
    Reply;
do_sync(#log{type = wrap, extra = Handle0} = Log) ->
    {Reply, Handle} = disk_log_1:mf_sync(Handle0),
    put(log, Log#log{extra = Handle}),
    Reply.
%% -> ok | Error | throw(Error)
%% Truncates the log and writes Head (unless it is 'none') first in
%% the truncated log. The updated log is stored in the process
%% dictionary.
do_trunc(#log{type = halt, format = Format, filename = FName,
              extra = Halt}=L, Head) ->
    {TruncReply, FdC1} = trunc_halt_file(Format, Halt#halt.fdc, FName, Head),
    %% The current position is the size of the truncated file.
    {Reply, NewHalt} =
        case disk_log_1:position(FdC1, FName, cur) of
            {ok, FdC2, FileSize} when TruncReply =:= ok ->
                {ok, Halt#halt{fdc = FdC2, curB = FileSize}};
            {ok, FdC2, _FileSize} ->
                %% Truncating failed; report that error.
                {TruncReply, Halt#halt{fdc = FdC2}};
            {PositionError, FdC2} ->
                {PositionError, Halt#halt{fdc = FdC2}}
        end,
    put(log, L#log{extra = NewHalt}),
    Reply;
do_trunc(#log{type = wrap, extra = Handle, head = OldHead}=L, Head) ->
    {MaxB, MaxF} = disk_log_1:get_wrap_size(Handle),
    %% Shrink the log to one file, and wrap it so that the given head
    %% is written.
    ok = do_change_size(L, {MaxB, 1}),
    Wrapped = trunc_wrap((get(log))#log{head = Head}),
    %% Just to remove all files with suffix > 1:
    Cleaned = trunc_wrap(Wrapped),
    NewHandle = (Cleaned#log.extra)#handle{noFull = 0, accFull = 0},
    %% Restore the original head and size.
    do_change_size(Cleaned#log{extra = NewHandle, head = OldHead},
                   {MaxB, MaxF}).

%% -> {Reply, FdC}
%% Truncates the file of a halt log. For externally formatted logs the
%% head is written by this function.
trunc_halt_file(internal, FdC, FName, Head) ->
    disk_log_1:truncate(FdC, FName, Head);
trunc_halt_file(external, FdC, FName, Head) ->
    case disk_log_1:truncate_at(FdC, FName, bof) of
        {ok, FdC1} when Head =:= none ->
            {ok, FdC1};
        {ok, FdC1} ->
            {ok, H} = Head,
            disk_log_1:fwrite(FdC1, FName, H, byte_size(H));
        Other ->
            Other
    end.

%% -> log() | throw(Error)
trunc_wrap(L) ->
    case do_inc_wrap_file(L) of
        {error, Error, _Log} ->
            throw(Error);
        {ok, NewLog, _Lost} ->
            NewLog
    end.
%% -> Reply
%% Reads a chunk of an internally formatted log and stores the updated
%% log. An error is returned for externally formatted logs.
do_chunk(#log{format_type = halt_int, extra = Halt} = L, Pos, B, N) ->
    FdC = Halt#halt.fdc,
    FName = L#log.filename,
    {NewFdC, Reply} =
        case L#log.mode of
            read_only ->
                disk_log_1:chunk_read_only(FdC, FName, Pos, B, N);
            read_write ->
                disk_log_1:chunk(FdC, FName, Pos, B, N)
        end,
    put(log, L#log{extra = Halt#halt{fdc = NewFdC}}),
    Reply;
do_chunk(#log{format_type = wrap_int, mode = read_only,
              extra = Handle0} = L, Pos, B, N) ->
    {Handle, Reply} = disk_log_1:mf_int_chunk_read_only(Handle0, Pos, B, N),
    put(log, L#log{extra = Handle}),
    Reply;
do_chunk(#log{format_type = wrap_int, extra = Handle0} = L, Pos, B, N) ->
    {Handle, Reply} = disk_log_1:mf_int_chunk(Handle0, Pos, B, N),
    put(log, L#log{extra = Handle}),
    Reply;
do_chunk(L, _Pos, _B, _N) ->
    {error, {format_external, L#log.name}}.

%% Stepping between files applies to internally formatted wrap logs only.
do_chunk_step(#log{format_type = wrap_int, extra = Handle}, Pos, N) ->
    disk_log_1:mf_int_chunk_step(Handle, Pos, N);
do_chunk_step(L, _Pos, _N) ->
    {error, {not_internal_wrap, L#log.name}}.

%% Inlined.
%% Sends the same reply to one process or to a list of processes.
replies(Pids, Reply) ->
    send_reply(Pids, {disk_log, self(), Reply}).

send_reply(Pid, Msg) when is_pid(Pid) ->
    Pid ! Msg,
    ok;
send_reply(Pids, Msg) when is_list(Pids) ->
    lists:foreach(fun(Pid) -> Pid ! Msg end, Pids).

%% Replies to one process and returns to the main loop.
reply(To, Reply, S) ->
    To ! {disk_log, self(), Reply},
    loop(S).

%% Sends the request R to the process of the log Log and waits for
%% the reply.
req(Log, R) ->
    case disk_log_server:get_log_pid(Log) of
        undefined ->
            {error, no_such_log};
        LogPid ->
            monitor_request(LogPid, R)
    end.

%% Sends Req to the log process and waits for the reply while
%% monitoring the process. A reply telling that the log has stopped
%% is not accepted; in that case the 'DOWN' message ends the wait.
monitor_request(Pid, Req) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {self(), Req},
    receive
        {disk_log, Pid, Reply} when not is_tuple(Reply) orelse
                                    element(2, Reply) =/= disk_log_stopped ->
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, process, Pid, _Info} ->
            {error, no_such_log}
    end.

%% As req/2, but the pid of the log process is known.
req2(Pid, R) ->
    monitor_request(Pid, R).

%% The second argument is used only when the first one is 'none'.
merge_head(none, Default) ->
    Default;
merge_head(Given, _Default) ->
    Given.

%% -> List of extensions of existing files (no dot included) | throw(FileError)
wrap_file_extensions(File) ->
    {_CurF, _CurFSz, _TotSz, NoOfFiles} =
        disk_log_1:read_index_file(File),
    Numbers = if
                  NoOfFiles >= 1 ->
                      lists:seq(1, NoOfFiles);
                  NoOfFiles =:= 0 ->
                      []
              end,
    Exists = fun(Ext) ->
                     case file:read_file_info(add_ext(File, Ext)) of
                         {ok, _Info} -> true;
                         _Error -> false
                     end
             end,
    [Ext || Ext <- ["idx", "siz" | Numbers], Exists(Ext)].

%% Ext is a string or an integer.
add_ext(File, Ext) ->
    lists:concat([File, ".", Ext]).

%% Sends R to the process of the log Log without waiting for a reply.
notify(Log, R) ->
    case disk_log_server:get_log_pid(Log) of
        undefined ->
            {error, no_such_log};
        LogPid ->
            LogPid ! R,
            ok
    end.

%% One wrap notification is sent for each element of the list.
notify_owners_wrap(Wraps) ->
    lists:foreach(fun(N) -> notify_owners({wrap, N}) end, Wraps).

%% Sends Note to every owner that has asked for notifications.
notify_owners(Note) ->
    #log{name = Name, owners = Owners} = get(log),
    Msg = {disk_log, node(), Name, Note},
    _ = [Pid ! Msg || {Pid, true} <- Owners],
    ok.

%% Reports the pending cache error to Pids. The error is cleared, but
%% recorded as the error status of the log.
cache_error(#state{cache_error = Error}=S, Pids) ->
    ok = replies(Pids, Error),
    state_err(S#state{cache_error = ok}, Error).

state_ok(S) ->
    state_err(S, ok).

-spec state_err(#state{}, dlog_state_error()) -> #state{}.

%% The owners are notified when the error status changes.
state_err(#state{error_status = Err}=S, Err) ->
    S;
state_err(S, Err) ->
    notify_owners({error_status, Err}),
    S#state{error_status = Err}.