1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 1997-2018. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20-module(disk_log). 21 22%% Efficient file based log - process part 23 24-export([start/0, istart_link/1, 25 log/2, log_terms/2, blog/2, blog_terms/2, 26 alog/2, alog_terms/2, balog/2, balog_terms/2, 27 close/1, lclose/1, lclose/2, sync/1, open/1, 28 truncate/1, truncate/2, btruncate/2, 29 reopen/2, reopen/3, breopen/3, inc_wrap_file/1, change_size/2, 30 change_notify/3, change_header/2, 31 chunk/2, chunk/3, bchunk/2, bchunk/3, chunk_step/3, chunk_info/1, 32 block/1, block/2, unblock/1, info/1, format_error/1, 33 accessible_logs/0]). 34 35%% Internal exports 36-export([init/2, internal_open/2, 37 system_continue/3, system_terminate/4, system_code_change/4]). 38 39%% To be used by disk_log_h.erl (not (yet) in Erlang/OTP) only. 40-export([ll_open/1, ll_close/1, do_log/2, do_sync/1, do_info/2]). 41 42%% To be used by wrap_log_reader only. 43-export([ichunk_end/2]). 44 45%% To be used for debugging only: 46-export([pid2name/1]). 47 48-export_type([continuation/0]). 49 50-type dlog_state_error() :: 'ok' | {'error', term()}. 51 52-record(state, {queue = [], 53 messages = [], 54 parent, 55 server, 56 cnt = 0 :: non_neg_integer(), 57 args, 58 error_status = ok :: dlog_state_error(), 59 cache_error = ok %% cache write error after timeout 60 }). 61 62-include("disk_log.hrl"). 

%% Builds the term `{{failed, Error}, [{?MODULE, Function, Arg}]}`.
%% The server loop passes it to do_exit/4 when truncate or reopen
%% fails; Function and Arg are the API function name and arity that
%% the request carried.
-define(failure(Error, Function, Arg),
        {{failed, Error}, [{?MODULE, Function, Arg}]}).

%% Profiling hook. The active definition discards its argument and
%% expands to the atom 'void'; the commented-out variant would
%% evaluate the argument instead.
%%-define(PROFILE(C), C).
-define(PROFILE(C), void).

-compile({inline,[{log_loop,6},{log_end_sync,2},{replies,2},{rflat,1}]}).

%%%----------------------------------------------------------------------
%%% Contract type specifications
%%%----------------------------------------------------------------------

%% The representation (#continuation{}) is hidden from callers.
-opaque continuation() :: #continuation{}.

-type file_error()     :: term().  % XXX: refine
-type invalid_header() :: term().  % XXX: refine

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------

%%-----------------------------------------------------------------
%% This module implements the API, and the processes for each log.
%% There is one process per log.
%%-----------------------------------------------------------------

%% Error reasons that open/1 may return inside {error, Reason}.
-type open_error_rsn() :: 'no_such_log'
                        | {'badarg', term()}
                        | {'size_mismatch', CurrentSize :: dlog_size(),
                           NewSize :: dlog_size()}
                        | {'arg_mismatch', OptionName :: dlog_optattr(),
                           CurrentValue :: term(), Value :: term()}
                        | {'name_already_open', Log :: log()}
                        | {'open_read_write', Log :: log()}
                        | {'open_read_only', Log :: log()}
                        | {'need_repair', Log :: log()}
                        | {'not_a_log_file', FileName :: file:filename()}
                        | {'invalid_index_file', FileName :: file:filename()}
                        | {'invalid_header', invalid_header()}
                        | {'file_error', file:filename(), file_error()}
                        | {'node_already_open', Log :: log()}.
%% Per-node error reasons for a distributed open.
-type dist_error_rsn()  :: 'nodedown' | open_error_rsn().
%% Successful result of opening a log, with or without repair statistics.
-type ret()             :: {'ok', Log :: log()}
                         | {'repaired', Log :: log(),
                            {'recovered', Rec :: non_neg_integer()},
                            {'badbytes', Bad :: non_neg_integer()}}.
-type open_ret()        :: ret() | {'error', open_error_rsn()}.
%% Result of opening a distributed log: per-node successes and
%% per-node failures.
-type dist_open_ret()   :: {[{node(), ret()}],
                            [{node(), {'error', dist_error_rsn()}}]}.

-spec open(ArgL) -> open_ret() | dist_open_ret() when
      ArgL :: dlog_options().
%% Validates the option list with check_arg/2 and hands the result
%% (either {ok, #arg{}} or {error, Reason}) to disk_log_server:open/1.
open(A) ->
    disk_log_server:open(check_arg(A, #arg{options = A})).

-type log_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()}
                       | {'format_external', log()} | {'blocked_log', log()}
                       | {'full', log()} | {'invalid_header', invalid_header()}
                       | {'file_error', file:filename(), file_error()}.

-spec log(Log, Term) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      Term :: term().
%% Synchronously logs one term. The term is encoded with
%% term_to_binary/1 in the caller before the request is sent.
log(Log, Term) ->
    req(Log, {log, internal, [term_to_binary(Term)]}).

-spec blog(Log, Bytes) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      Bytes :: iodata().
%% Synchronously logs one item given as iodata (external format tag).
blog(Log, Bytes) ->
    req(Log, {log, external, [ensure_binary(Bytes)]}).

-spec log_terms(Log, TermList) -> ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      TermList :: [term()].
%% Synchronously logs a list of terms in a single request.
log_terms(Log, Terms) ->
    Bs = terms2bins(Terms),
    req(Log, {log, internal, Bs}).

-spec blog_terms(Log, BytesList) ->
                        ok | {error, Reason :: log_error_rsn()} when
      Log :: log(),
      BytesList :: [iodata()].
%% Synchronously logs a list of iodata items in a single request.
blog_terms(Log, Bytess) ->
    Bs = ensure_binary_list(Bytess),
    req(Log, {log, external, Bs}).

-type notify_ret() :: 'ok' | {'error', 'no_such_log'}.

-spec alog(Log, Term) -> notify_ret() when
      Log :: log(),
      Term :: term().
%% Asynchronous variant of log/2: the request is sent with notify/2,
%% so no result of the write itself is returned.
alog(Log, Term) ->
    notify(Log, {alog, internal, [term_to_binary(Term)]}).

-spec alog_terms(Log, TermList) -> notify_ret() when
      Log :: log(),
      TermList :: [term()].
%% Asynchronous variant of log_terms/2.
alog_terms(Log, Terms) ->
    Bs = terms2bins(Terms),
    notify(Log, {alog, internal, Bs}).

-spec balog(Log, Bytes) -> notify_ret() when
      Log :: log(),
      Bytes :: iodata().
%% Asynchronous variant of blog/2.
balog(Log, Bytes) ->
    notify(Log, {alog, external, [ensure_binary(Bytes)]}).

-spec balog_terms(Log, ByteList) -> notify_ret() when
      Log :: log(),
      ByteList :: [iodata()].
%% Asynchronously logs a list of iodata items (external format tag).
balog_terms(Log, Bytess) ->
    notify(Log, {alog, external, ensure_binary_list(Bytess)}).

-type close_error_rsn() ::'no_such_log' | 'nonode'
                         | {'file_error', file:filename(), file_error()}.

-spec close(Log) -> 'ok' | {'error', close_error_rsn()} when
      Log :: log().
%% Sends a 'close' request for the log through req/2.
close(Log) ->
    Request = close,
    req(Log, Request).

-type lclose_error_rsn() :: 'no_such_log'
                          | {'file_error', file:filename(), file_error()}.

-spec lclose(Log) -> 'ok' | {'error', lclose_error_rsn()} when
      Log :: log().
%% Closes the log on the calling node.
lclose(Log) ->
    ThisNode = node(),
    lclose(Log, ThisNode).

-spec lclose(Log, Node) -> 'ok' | {'error', lclose_error_rsn()} when
      Log :: log(),
      Node :: node().
%% Sends a 'close' request for the log on the given node through lreq/3.
lclose(Log, Node) ->
    Request = close,
    lreq(Log, Request, Node).

-type trunc_error_rsn() :: 'no_such_log' | 'nonode'
                         | {'read_only_mode', log()}
                         | {'blocked_log', log()}
                         | {'invalid_header', invalid_header()}
                         | {'file_error', file:filename(), file_error()}.

%% The truncate request is {truncate, Head, Function, Arity}; the last
%% two elements name the API function on behalf of which it is sent.

-spec truncate(Log) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log().
%% Truncates the log without supplying a new head.
truncate(Log) ->
    Request = {truncate, none, truncate, 1},
    req(Log, Request).

-spec truncate(Log, Head) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log(),
      Head :: term().
%% Truncates the log; Head is encoded with term_to_binary/1.
truncate(Log, Head) ->
    BinHead = term_to_binary(Head),
    req(Log, {truncate, {ok, BinHead}, truncate, 2}).

-spec btruncate(Log, BHead) -> 'ok' | {'error', trunc_error_rsn()} when
      Log :: log(),
      BHead :: iodata().
%% Truncates the log; the head is given as iodata.
btruncate(Log, Head) ->
    BinHead = ensure_binary(Head),
    req(Log, {truncate, {ok, BinHead}, btruncate, 2}).

-type reopen_error_rsn() :: no_such_log
                          | nonode
                          | {read_only_mode, log()}
                          | {blocked_log, log()}
                          | {same_file_name, log()} |
                            {invalid_index_file, file:filename()}
                          | {invalid_header, invalid_header()}
                          | {'file_error', file:filename(), file_error()}.

%% The reopen request is {reopen, NewFile, Head, Function, Arity}; the
%% last two elements name the API function on behalf of which it is sent.

-spec reopen(Log, File) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename().
%% Reopens the log with NewFile and no explicit head.
reopen(Log, NewFile) ->
    Request = {reopen, NewFile, none, reopen, 2},
    req(Log, Request).

-spec reopen(Log, File, Head) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename(),
      Head :: term().
%% Reopens the log; NewHead is encoded with term_to_binary/1.
reopen(Log, NewFile, NewHead) ->
    BinHead = term_to_binary(NewHead),
    req(Log, {reopen, NewFile, {ok, BinHead}, reopen, 3}).

-spec breopen(Log, File, BHead) -> 'ok' | {'error', reopen_error_rsn()} when
      Log :: log(),
      File :: file:filename(),
      BHead :: iodata().
%% Reopens the log; the head is given as iodata.
breopen(Log, NewFile, NewHead) ->
    BinHead = ensure_binary(NewHead),
    req(Log, {reopen, NewFile, {ok, BinHead}, breopen, 3}).

-type inc_wrap_error_rsn() :: 'no_such_log' | 'nonode'
                            | {'read_only_mode', log()}
                            | {'blocked_log', log()} | {'halt_log', log()}
                            | {'invalid_header', invalid_header()}
                            | {'file_error', file:filename(), file_error()}.

-spec inc_wrap_file(Log) -> 'ok' | {'error', inc_wrap_error_rsn()} when
      Log :: log().
%% Sends an 'inc_wrap_file' request for the log.
inc_wrap_file(Log) ->
    Request = inc_wrap_file,
    req(Log, Request).

-spec change_size(Log, Size) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Size :: dlog_size(),
      Reason :: no_such_log | nonode | {read_only_mode, Log}
              | {blocked_log, Log}
              | {new_size_too_small, Log, CurrentSize :: pos_integer()}
              | {badarg, size}
              | {file_error, file:filename(), file_error()}.
%% Requests that the size of the log be changed to NewSize.
change_size(Log, NewSize) ->
    Request = {change_size, NewSize},
    req(Log, Request).

-spec change_notify(Log, Owner, Notify) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Owner :: pid(),
      Notify :: boolean(),
      Reason :: no_such_log | nonode | {blocked_log, Log}
              | {badarg, notify} | {not_owner, Owner}.
%% Requests a change of the notify setting for the owner Pid.
change_notify(Log, Pid, NewNotify) ->
    Request = {change_notify, Pid, NewNotify},
    req(Log, Request).

-spec change_header(Log, Header) -> 'ok' | {'error', Reason} when
      Log :: log(),
      Header :: {head, dlog_head_opt()}
              | {head_func, MFA :: {atom(), atom(), list()}},
      Reason :: no_such_log | nonode | {read_only_mode, Log}
              | {blocked_log, Log} | {badarg, head}.
%% Requests that the head option of the log be replaced by NewHead.
change_header(Log, NewHead) ->
    Request = {change_header, NewHead},
    req(Log, Request).

-type sync_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()}
                        | {'blocked_log', log()}
                        | {'file_error', file:filename(), file_error()}.

-spec sync(Log) -> 'ok' | {'error', sync_error_rsn()} when
      Log :: log().
%% Sends a 'sync' request for the log.
sync(Log) ->
    Request = sync,
    req(Log, Request).

-type block_error_rsn() :: 'no_such_log' | 'nonode' | {'blocked_log', log()}.

-spec block(Log) -> 'ok' | {'error', block_error_rsn()} when
      Log :: log().
%% Blocks the log with QueueLogRecords set to true.
block(Log) ->
    QueueLogRecords = true,
    block(Log, QueueLogRecords).

-spec block(Log, QueueLogRecords) -> 'ok' | {'error', block_error_rsn()} when
      Log :: log(),
      QueueLogRecords :: boolean().
%% Sends a block request carrying the QueueLogRecords flag.
block(Log, QueueLogRecords) ->
    Request = {block, QueueLogRecords},
    req(Log, Request).

-type unblock_error_rsn() :: 'no_such_log' | 'nonode'
                           | {'not_blocked', log()}
                           | {'not_blocked_by_pid', log()}.

-spec unblock(Log) -> 'ok' | {'error', unblock_error_rsn()} when
      Log :: log().
%% Sends an 'unblock' request for the log.
unblock(Log) ->
    Request = unblock,
    req(Log, Request).

-spec format_error(Error) -> io_lib:chars() when
      Error :: term().
%% Public entry point; the work is done by do_format_error/1.
format_error(Error) ->
    Chars = do_format_error(Error),
    Chars.

%% One element of the list returned by info/1.
%% Note that 'full' carries the built-in type boolean(); a bare
%% 'boolean' would denote the singleton atom type instead.
-type dlog_info() :: {name, Log :: log()}
                   | {file, File :: file:filename()}
                   | {type, Type :: dlog_type()}
                   | {format, Format :: dlog_format()}
                   | {size, Size :: dlog_size()}
                   | {mode, Mode :: dlog_mode()}
                   | {owners, [{pid(), Notify :: boolean()}]}
                   | {users, Users :: non_neg_integer()}
                   | {status, Status ::
                        ok | {blocked, QueueLogRecords :: boolean()}}
                   | {node, Node :: node()}
                   | {distributed, Dist :: local | [node()]}
                   | {head, Head :: none
                                  | {head, term()}
                                  | (MFA :: {atom(), atom(), list()})}
                   | {no_written_items, NoWrittenItems ::non_neg_integer()}
                   | {full, Full :: boolean()}
                   | {no_current_bytes, non_neg_integer()}
                   | {no_current_items, non_neg_integer()}
                   | {no_items, non_neg_integer()}
                   | {current_file, pos_integer()}
                   | {no_overflows, {SinceLogWasOpened :: non_neg_integer(),
                                     SinceLastInfo :: non_neg_integer()}}.
-spec info(Log) -> InfoList | {'error', no_such_log} when
      Log :: log(),
      InfoList :: [dlog_info()].
%% Sends an 'info' request for the log through sreq/2.
info(Log) ->
    sreq(Log, info).

-spec pid2name(Pid) -> {'ok', Log} | 'undefined' when
      Pid :: pid(),
      Log :: log().
%% Looks up the log name registered for a log process pid in the
%% ?DISK_LOG_PID_TABLE ETS table. disk_log_server:start/0 is called
%% first (presumably to make sure the table exists).
pid2name(Pid) ->
    disk_log_server:start(),
    case ets:lookup(?DISK_LOG_PID_TABLE, Pid) of
        [] -> undefined;
        [{_Pid, Log}] -> {ok, Log}
    end.

%% chunk/3 takes three arguments: a Log, a Continuation and N.
%% It returns {Cont2, ObjList} | eof | {error, Reason}.
%% The initial continuation is the atom 'start'.

-type chunk_error_rsn() :: no_such_log
                         | {format_external, log()}
                         | {blocked_log, log()}
                         | {badarg, continuation}
                         | {not_internal_wrap, log()}
                         | {corrupt_log_file, FileName :: file:filename()}
                         | {file_error, file:filename(), file_error()}.

-type chunk_ret() :: {Continuation2 :: continuation(), Terms :: [term()]}
                   | {Continuation2 :: continuation(),
                      Terms :: [term()],
                      Badbytes :: non_neg_integer()}
                   | eof
                   | {error, Reason :: chunk_error_rsn()}.

-spec chunk(Log, Continuation) -> chunk_ret() when
      Log :: log(),
      Continuation :: start | continuation().
%% Reads a chunk with no caller-imposed limit on the number of terms.
chunk(Log, Cont) ->
    chunk(Log, Cont, infinity).

-spec chunk(Log, Continuation, N) -> chunk_ret() when
      Log :: log(),
      Continuation :: start | continuation(),
      N :: pos_integer() | infinity.
%% Reads at most N terms; 'infinity' is capped at ?MAX_CHUNK_SIZE since
%% there cannot be more than ?MAX_CHUNK_SIZE terms in a chunk.
chunk(Log, Cont, infinity) ->
    ichunk(Log, Cont, ?MAX_CHUNK_SIZE);
chunk(Log, Cont, N) when is_integer(N), N > 0 ->
    ichunk(Log, Cont, N).

%% Sends the chunk request: to the log by name for the initial
%% continuation 'start', otherwise to the pid stored in the continuation.
ichunk(Log, start, N) ->
    Reply = sreq(Log, {chunk, 0, [], N}),
    ichunk_end(Reply, Log);
ichunk(Log, #continuation{pid = Pid, pos = Pos, b = B}, N) ->
    Reply = req2(Pid, {chunk, Pos, B, N}),
    ichunk_end(Reply, Log);
ichunk(_Log, _, _) ->
    {error, {badarg, continuation}}.

%% Post-processes the server's reply. A 2-tuple reply is decoded in
%% read_write mode with no bad bytes, a 3-tuple reply in read_only mode
%% with the reported number of bad bytes; anything else is returned as is.
ichunk_end({#continuation{} = Cont, RevBins}, Log) ->
    ichunk_end(RevBins, read_write, Log, Cont, 0);
ichunk_end({#continuation{} = Cont, RevBins, Bad}, Log) ->
    ichunk_end(RevBins, read_only, Log, Cont, Bad);
ichunk_end(Other, _Log) ->
    Other.

%% Create the terms on the client's heap, not the server's.
%% The list of binaries is reversed.
%% If any binary fails to decode, the list is put back in order and
%% handed to ichunk_bad_end/6.
ichunk_end(RevBins, Mode, Log, Cont, Bad) ->
    case catch bins2terms(RevBins, []) of
        {'EXIT', _} ->
            InOrder = lists:reverse(RevBins),
            ichunk_bad_end(InOrder, Mode, Log, Cont, Bad, []);
        Terms when Bad > 0 ->
            {Cont, Terms, Bad};
        Terms when Bad =:= 0 ->
            {Cont, Terms}
    end.

%% Decodes each binary and conses the term onto the accumulator, which
%% reverses the order of the input list.
bins2terms(Bins, Acc0) ->
    lists:foldl(fun(Bin, Acc) -> [binary_to_term(Bin) | Acc] end,
                Acc0, Bins).

%% Decodes the binaries one at a time, in log order, until one fails.
%% In read_write mode a failure is reported as a corrupt log file; in
%% read_only mode the terms decoded so far are returned, the position of
%% the continuation is moved back over the binaries that were not
%% consumed, and the size of the bad binary is added to the bad count.
ichunk_bad_end([Bin | Rest], Mode, Log, Cont, Bad, Acc) ->
    case catch binary_to_term(Bin) of
        {'EXIT', _} when read_write =:= Mode ->
            {file, FileName} = lists:keyfind(file, 1, info(Log)),
            BadFile =
                case Cont#continuation.pos of
                    Pos when is_integer(Pos) ->
                        FileName;                    % halt log
                    {FileNo, _} ->
                        add_ext(FileName, FileNo)    % wrap log
                end,
            {error, {corrupt_log_file, BadFile}};
        {'EXIT', _} when read_only =:= Mode ->
            %% Bytes taken up by the items after the bad one.
            Reread = lists:sum([byte_size(B) + ?HEADERSZ || B <- Rest]),
            NewPos =
                case Cont#continuation.pos of
                    Pos when is_integer(Pos) -> Pos - Reread;
                    {FileNo, Pos} -> {FileNo, Pos - Reread}
                end,
            {Cont#continuation{pos = NewPos, b = []},
             lists:reverse(Acc),
             Bad + byte_size(Bin)};
        Term ->
            ichunk_bad_end(Rest, Mode, Log, Cont, Bad, [Term | Acc])
    end.

-type bchunk_ret() :: {Continuation2 :: continuation(),
                       Binaries :: [binary()]}
                    | {Continuation2 :: continuation(),
                       Binaries :: [binary()],
                       Badbytes :: non_neg_integer()}
                    | eof
                    | {error, Reason :: chunk_error_rsn()}.

-spec bchunk(Log, Continuation) -> bchunk_ret() when
      Log :: log(),
      Continuation :: start | continuation().
%% Like chunk/2, but the items are returned as binaries.
bchunk(Log, Cont) ->
    bchunk(Log, Cont, infinity).

-spec bchunk(Log, Continuation, N) -> bchunk_ret() when
      Log :: log(),
      Continuation :: start | continuation(),
      N :: pos_integer() | infinity.
%% Like chunk/3, but the items are returned as binaries. 'infinity' is
%% capped at ?MAX_CHUNK_SIZE since there cannot be more than
%% ?MAX_CHUNK_SIZE terms in a chunk.
bchunk(Log, Cont, infinity) ->
    bichunk(Log, Cont, ?MAX_CHUNK_SIZE);
bchunk(Log, Cont, N) when is_integer(N), N > 0 ->
    bichunk(Log, Cont, N).

%% Sends the chunk request and passes the reply to bichunk_end/1.
bichunk(Log, start, N) ->
    bichunk_end(sreq(Log, {chunk, 0, [], N}));
bichunk(_Log, Cont, N) when is_record(Cont, continuation) ->
    Request = {chunk, Cont#continuation.pos, Cont#continuation.b, N},
    bichunk_end(req2(Cont#continuation.pid, Request));
bichunk(_Log, _, _) ->
    {error, {badarg, continuation}}.

%% The server returns the binaries in reverse order; put them back in
%% order. Replies that do not start with a continuation pass through.
bichunk_end({Cont, RevBins}) when is_record(Cont, continuation) ->
    {Cont, lists:reverse(RevBins)};
bichunk_end({Cont, RevBins, Bad}) when is_record(Cont, continuation) ->
    {Cont, lists:reverse(RevBins), Bad};
bichunk_end(Other) ->
    Other.

-spec chunk_step(Log, Continuation, Step) ->
                        {'ok', any()} | {'error', Reason} when
      Log :: log(),
      Continuation :: start | continuation(),
      Step :: integer(),
      Reason :: no_such_log | end_of_log | {format_external, Log}
              | {blocked_log, Log} | {badarg, continuation}
              | {file_error, file:filename(), file_error()}.
%% Sends a chunk_step request with step N for the given continuation.
chunk_step(Log, Cont, N) when is_integer(N) ->
    ichunk_step(Log, Cont, N).

%% 'start' is addressed to the log by name with position 0; a
%% continuation is addressed to the pid stored in it.
ichunk_step(Log, start, N) ->
    sreq(Log, {chunk_step, 0, N});
ichunk_step(_Log, #continuation{pid = Pid, pos = Pos}, N) ->
    req2(Pid, {chunk_step, Pos, N});
ichunk_step(_Log, _, _) ->
    {error, {badarg, continuation}}.

-spec chunk_info(Continuation) -> InfoList | {error, Reason} when
      Continuation :: continuation(),
      InfoList :: [{node, Node :: node()}, ...],
      Reason :: {no_continuation, Continuation}.
%% Reports the node of the process referred to by the continuation.
chunk_info(#continuation{pid = Pid}) ->
    [{node, node(Pid)}];
chunk_info(BadCont) ->
    {error, {no_continuation, BadCont}}.

-spec accessible_logs() -> {[LocalLog], [DistributedLog]} when
      LocalLog :: log(),
      DistributedLog :: log().
%% Delegates to disk_log_server:accessible_logs/0.
accessible_logs() ->
    disk_log_server:accessible_logs().

%% Spawns a linked log process running init/2 with the calling process
%% as Parent.
istart_link(Server) ->
    Parent = self(),
    Pid = proc_lib:spawn_link(disk_log, init, [Parent, Server]),
    {ok, Pid}.

%% Only for backwards compatibility, could probably be removed.
-spec start() -> 'ok'.
start() ->
    disk_log_server:start().

%% Sends an internal_open request with the checked arguments A to the
%% log process Pid.
internal_open(Pid, A) ->
    Request = {internal_open, A},
    req2(Pid, Request).

%%% ll_open() and ll_close() are used by disk_log_h.erl, a module not
%%% (yet) in Erlang/OTP.

%% -spec ll_open(dlog_options()) -> {'ok', Res :: _, #log{}, Cnt :: _} | Error.
ll_open(A) ->
    case check_arg(A, #arg{options = A}) of
        {ok, Arg} ->
            do_open(Arg);
        NotOk ->
            NotOk
    end.

%% -> closed | throw(Error)
ll_close(Log) ->
    close_disk_log2(Log).

%% Walks the option list, storing each recognised option in the #arg{}
%% accumulator. The first option that matches no clause ends the walk
%% with {error, {badarg, RemainingOptions}}. When the list is exhausted
%% the collected options are checked against each other.
check_arg([], Res) ->
    %% The head is validated up front; the outcome is only used by the
    %% branches below that return Ret.
    Ret = case Res#arg.head of
              none ->
                  {ok, Res};
              HeadOpt ->
                  case check_head(HeadOpt, Res#arg.format) of
                      {ok, Head} ->
                          {ok, Res#arg{head = Head}};
                      HeadError ->
                          HeadError
                  end
          end,

    %% check result
    case Res of
        #arg{name = 0} ->
            {error, {badarg, name}};
        #arg{file = none} ->
            %% No file given: derive it from the name and check again.
            case catch lists:concat([Res#arg.name, ".LOG"]) of
                {'EXIT',_} -> {error, {badarg, file}};
                FName ->  check_arg([], Res#arg{file = FName})
            end;
        #arg{repair = truncate, mode = read_only} ->
            {error, {badarg, repair_read_only}};
        #arg{type = halt, size = Size} when is_tuple(Size) ->
            {error, {badarg, size}};
        #arg{type = wrap, file = File} ->
            {OldSize, Version} =
                disk_log_1:read_size_file_version(File),
            check_wrap_arg(Ret, OldSize, Version);
        _ ->
            Ret
    end;
check_arg([{file, F} | Tail], Res) when is_list(F); is_atom(F) ->
    check_arg(Tail, Res#arg{file = F});
check_arg([{linkto, LinkTo} | Tail], Res)
  when is_pid(LinkTo); LinkTo =:= none ->
    check_arg(Tail, Res#arg{linkto = LinkTo});
check_arg([{name, Name} | Tail], Res) ->
    check_arg(Tail, Res#arg{name = Name});
check_arg([{repair, Repair} | Tail], Res)
  when Repair =:= true; Repair =:= false; Repair =:= truncate ->
    check_arg(Tail, Res#arg{repair = Repair});
check_arg([{size, Int} | Tail], Res) when is_integer(Int), Int > 0 ->
    check_arg(Tail, Res#arg{size = Int});
check_arg([{size, infinity} | Tail], Res) ->
    check_arg(Tail, Res#arg{size = infinity});
check_arg([{size, {MaxB, MaxF}} | Tail], Res)
  when is_integer(MaxB), is_integer(MaxF),
       MaxB > 0, MaxB =< ?MAX_BYTES,
       MaxF > 0, MaxF < ?MAX_FILES ->
    check_arg(Tail, Res#arg{size = {MaxB, MaxF}});
check_arg([{type, Type} | Tail], Res)
  when Type =:= wrap; Type =:= halt ->
    check_arg(Tail, Res#arg{type = Type});
check_arg([{format, Format} | Tail], Res)
  when Format =:= internal; Format =:= external ->
    check_arg(Tail, Res#arg{format = Format});
check_arg([{distributed, []} | Tail], Res) ->
    check_arg(Tail, Res#arg{distributed = false});
check_arg([{distributed, Nodes} | Tail], Res) when is_list(Nodes) ->
    check_arg(Tail, Res#arg{distributed = {true, Nodes}});
check_arg([{notify, Notify} | Tail], Res)
  when Notify =:= true; Notify =:= false ->
    check_arg(Tail, Res#arg{notify = Notify});
check_arg([{head_func, HeadFunc} | Tail], Res) ->
    check_arg(Tail, Res#arg{head = {head_func, HeadFunc}});
check_arg([{head, Term} | Tail], Res) ->
    check_arg(Tail, Res#arg{head = {head, Term}});
check_arg([{mode, Mode} | Tail], Res)
  when Mode =:= read_only; Mode =:= read_write ->
    check_arg(Tail, Res#arg{mode = Mode});
check_arg([{quiet, Boolean} | Tail], Res) when is_boolean(Boolean) ->
    check_arg(Tail, Res#arg{quiet = Boolean});
check_arg(Arg, _) ->
    {error, {badarg, Arg}}.

%% Reconciles the requested size of a wrap log with the size read from
%% its size file (OldSize; {0,0} is handled as "no recorded size") and
%% stores Version in the result. A first argument that is not
%% {ok, _} is returned unchanged.
check_wrap_arg({ok, #arg{size = infinity}}, {0,0}, _Version) ->
    {error, {badarg, size}};
check_wrap_arg({ok, #arg{size = infinity} = Res}, OldSize, Version) ->
    %% No size requested: adopt the recorded one and check again.
    check_wrap_arg({ok, Res#arg{size = OldSize}}, OldSize, Version);
check_wrap_arg({ok, Res}, {0,0}, Version) ->
    {ok, Res#arg{version = Version}};
check_wrap_arg({ok, #arg{size = Size} = Res}, Size, Version) ->
    %% Requested and recorded sizes agree.
    {ok, Res#arg{version = Version}};
check_wrap_arg({ok, #arg{repair = truncate, size = Size} = Res},
               _OldSize, Version) when is_tuple(Size) ->
    {ok, Res#arg{version = Version}};
check_wrap_arg({ok, #arg{size = Size}}, OldSize, _Version)
  when is_tuple(Size) ->
    {error, {size_mismatch, OldSize, Size}};
check_wrap_arg({ok, _Res}, _OldSize, _Version) ->
    {error, {badarg, size}};
check_wrap_arg(Ret, _OldSize, _Version) ->
    Ret.

%%%-----------------------------------------------------------------
%%% Server functions
%%%-----------------------------------------------------------------
%% Entry point of the log process (spawned by istart_link/1): traps
%% exits and enters the main loop with a fresh state.
init(Parent, Server) ->
    ?PROFILE(ep:do()),
    process_flag(trap_exit, true),
    InitialState = #state{parent = Parent, server = Server},
    loop(InitialState).

%% Main loop: messages stored in the state are handled before the
%% mailbox is read.
loop(State) ->
    case State#state.messages of
        [] ->
            receive
                Message ->
                    handle(Message, State)
            end;
        [Next | Pending] ->
            handle(Next, State#state{messages = Pending})
    end.

%% handle(Message, State)
%%
%% Dispatches one message received by the log process. The #log{}
%% record is read from (and written back to) the process dictionary
%% under the key 'log'. Every clause ends by replying, looping,
%% queueing the message or exiting.
%%
%% Most request clauses share one layout:
%%   - mode read_only               -> {error, {read_only_mode, Name}}
%%   - status ok                    -> the request is performed
%%   - status {blocked, false}      -> {error, {blocked_log, Name}}
%%   - blocked_by is the requester  -> {error, {blocked_log, Name}}
%%     (chunk and chunk_step are performed instead)
%%   - anything else                -> the message is put on the queue
%%     (presumably the log is blocked by another process with queueing
%%     enabled).
handle({From, write_cache}, S) when From =:= self() ->
    case catch do_write_cache(get(log)) of
        ok ->
            loop(S);
        Error ->
            %% Remembered and reported by a later request.
            loop(S#state{cache_error = Error})
    end;
handle({From, {log, Format, B}}=Message, S) ->
    case get(log) of
        #log{mode = read_only}=L ->
            reply(From, {error, {read_only_mode, L#log.name}}, S);
        #log{status = ok, format=external}=L when Format =:= internal ->
            reply(From, {error, {format_external, L#log.name}}, S);
        #log{status = ok, format=LogFormat} ->
            %% Pids must be a list: log_loop/7 conses further requesters
            %% onto it and log_end/5 passes it to replies/2.
            log_loop(S, [From], [B], [], iolist_size(B), LogFormat);
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        #log{blocked_by = From}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _ ->
            enqueue(Message, S)
    end;
handle({alog, Format, B}=Message, S) ->
    %% Asynchronous logging: there is nobody to reply to, so errors are
    %% reported through notify_owners/1.
    case get(log) of
        #log{mode = read_only} ->
            notify_owners({read_only,B}),
            loop(S);
        #log{status = ok, format = external} when Format =:= internal ->
            notify_owners({format_external, B}),
            loop(S);
        #log{status = ok, format=LogFormat} ->
            log_loop(S, [], [B], [], iolist_size(B), LogFormat);
        #log{status = {blocked, false}} ->
            notify_owners({blocked_log, B}),
            loop(S);
        _ ->
            enqueue(Message, S)
    end;
handle({From, {block, QueueLogRecs}}=Message, S) ->
    case get(log) of
        #log{status = ok}=L ->
            do_block(From, QueueLogRecs, L),
            reply(From, ok, S);
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        #log{blocked_by = From}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _ ->
            enqueue(Message, S)
    end;
handle({From, unblock}, S) ->
    case get(log) of
        #log{status = ok}=L ->
            reply(From, {error, {not_blocked, L#log.name}}, S);
        #log{blocked_by = From}=L ->
            S2 = do_unblock(L, S),
            reply(From, ok, S2);
        L ->
            reply(From, {error, {not_blocked_by_pid, L#log.name}}, S)
    end;
handle({From, sync}=Message, S) ->
    case get(log) of
        #log{mode = read_only}=L ->
            reply(From, {error, {read_only_mode, L#log.name}}, S);
        #log{status = ok, format=LogFormat} ->
            %% Nothing to log yet; From only waits for the sync result.
            log_loop(S, [], [], [From], 0, LogFormat);
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        #log{blocked_by = From}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _ ->
            enqueue(Message, S)
    end;
handle({From, {truncate, Head, F, A}}=Message, S) ->
    case get(log) of
        #log{mode = read_only}=L ->
            reply(From, {error, {read_only_mode, L#log.name}}, S);
        #log{status = ok} when S#state.cache_error =/= ok ->
            loop(cache_error(S, [From]));
        #log{status = ok}=L ->
            H = merge_head(Head, L#log.head),
            case catch do_trunc(L, H) of
                ok ->
                    erase(is_full),
                    notify_owners({truncated, S#state.cnt}),
                    %% The item count restarts at 1 if a head was supplied.
                    N = if Head =:= none -> 0; true -> 1 end,
                    reply(From, ok, (state_ok(S))#state{cnt = N});
                Error ->
                    do_exit(S, From, Error, ?failure(Error, F, A))
            end;
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        #log{blocked_by = From}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _ ->
            enqueue(Message, S)
    end;
handle({From, {chunk, Pos, B, N}}=Message, S) ->
    case get(log) of
        #log{status = ok} when S#state.cache_error =/= ok ->
            loop(cache_error(S, [From]));
        #log{status = ok}=L ->
            R = do_chunk(L, Pos, B, N),
            reply(From, R, S);
        #log{blocked_by = From}=L ->
            %% The blocking process itself may still read.
            R = do_chunk(L, Pos, B, N),
            reply(From, R, S);
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _L ->
            enqueue(Message, S)
    end;
handle({From, {chunk_step, Pos, N}}=Message, S) ->
    case get(log) of
        #log{status = ok} when S#state.cache_error =/= ok ->
            loop(cache_error(S, [From]));
        #log{status = ok}=L ->
            R = do_chunk_step(L, Pos, N),
            reply(From, R, S);
        #log{blocked_by = From}=L ->
            %% The blocking process itself may still step.
            R = do_chunk_step(L, Pos, N),
            reply(From, R, S);
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _ ->
            enqueue(Message, S)
    end;
handle({From, {change_notify, Pid, NewNotify}}=Message, S) ->
    case get(log) of
        #log{status = ok}=L ->
            case do_change_notify(L, Pid, NewNotify) of
                {ok, L1} ->
                    put(log, L1),
                    reply(From, ok, S);
                Error ->
                    reply(From, Error, S)
            end;
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        #log{blocked_by = From}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _ ->
            enqueue(Message, S)
    end;
handle({From, {change_header, NewHead}}=Message, S) ->
    case get(log) of
        #log{mode = read_only}=L ->
            reply(From, {error, {read_only_mode, L#log.name}}, S);
        #log{status = ok, format = Format}=L ->
            case check_head(NewHead, Format) of
                {ok, Head} ->
                    put(log, L#log{head = mk_head(Head, Format)}),
                    reply(From, ok, S);
                Error ->
                    reply(From, Error, S)
            end;
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        #log{blocked_by = From}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _ ->
            enqueue(Message, S)
    end;
handle({From, {change_size, NewSize}}=Message, S) ->
    case get(log) of
        #log{mode = read_only}=L ->
            reply(From, {error, {read_only_mode, L#log.name}}, S);
        #log{status = ok}=L ->
            case check_size(L#log.type, NewSize) of
                ok ->
                    case catch do_change_size(L, NewSize) of % does the put
                        ok ->
                            reply(From, ok, S);
                        {big, CurSize} ->
                            reply(From,
                                  {error,
                                   {new_size_too_small, L#log.name, CurSize}},
                                  S);
                        Else ->
                            reply(From, Else, state_err(S, Else))
                    end;
                not_ok ->
                    reply(From, {error, {badarg, size}}, S)
            end;
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        #log{blocked_by = From}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _ ->
            enqueue(Message, S)
    end;
handle({From, inc_wrap_file}=Message, S) ->
    case get(log) of
        #log{mode = read_only}=L ->
            reply(From, {error, {read_only_mode, L#log.name}}, S);
        #log{type = halt}=L ->
            reply(From, {error, {halt_log, L#log.name}}, S);
        #log{status = ok} when S#state.cache_error =/= ok ->
            loop(cache_error(S, [From]));
        #log{status = ok}=L ->
            case catch do_inc_wrap_file(L) of
                {ok, L2, Lost} ->
                    put(log, L2),
                    notify_owners({wrap, Lost}),
                    %% Lost is subtracted from the item count.
                    reply(From, ok, S#state{cnt = S#state.cnt-Lost});
                {error, Error, L2} ->
                    put(log, L2),
                    reply(From, Error, state_err(S, Error))
            end;
        #log{status = {blocked, false}}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        #log{blocked_by = From}=L ->
            reply(From, {error, {blocked_log, L#log.name}}, S);
        _ ->
            enqueue(Message, S)
    end;
handle({From, {reopen, NewFile, Head, F, A}}, S) ->
    %% Unlike most requests, reopen is never queued: any blocked state
    %% is answered with blocked_log.
    case get(log) of
        #log{mode = read_only}=L ->
            reply(From, {error, {read_only_mode, L#log.name}}, S);
        #log{status = ok} when S#state.cache_error =/= ok ->
            loop(cache_error(S, [From]));
        #log{status = ok, filename = NewFile}=L ->
            reply(From, {error, {same_file_name, L#log.name}}, S);
        #log{status = ok}=L ->
            %% Close the log, rename its file(s) to NewFile, and open a
            %% fresh (truncated) log under the original file name.
            case catch close_disk_log2(L) of
                closed ->
                    File = L#log.filename,
                    case catch rename_file(File, NewFile, L#log.type) of
                        ok ->
                            H = merge_head(Head, L#log.head),
                            case do_open((S#state.args)#arg{name = L#log.name,
                                                            repair = truncate,
                                                            head = H,
                                                            file = File}) of
                                {ok, Res, L2, Cnt} ->
                                    %% Owners, head and users carry over
                                    %% from the old log record.
                                    put(log, L2#log{owners = L#log.owners,
                                                    head = L#log.head,
                                                    users = L#log.users}),
                                    notify_owners({truncated, S#state.cnt}),
                                    erase(is_full),
                                    case Res of
                                        {error, _} ->
                                            do_exit(S, From, Res,
                                                    ?failure(Res, F, A));
                                        _ ->
                                            reply(From, ok, S#state{cnt = Cnt})
                                    end;
                                Res ->
                                    do_exit(S, From, Res, ?failure(Res, F, A))
                            end;
                        Error ->
                            %% NOTE(review): reports reopen/2 even when the
                            %% request came from reopen/3 or breopen/3;
                            %% kept as is.
                            do_exit(S, From, Error, ?failure(Error, reopen, 2))
                    end;
                Error ->
                    do_exit(S, From, Error, ?failure(Error, F, A))
            end;
        L ->
            reply(From, {error, {blocked_log, L#log.name}}, S)
    end;
handle({Server, {internal_open, A}}, S) ->
    case get(log) of
        undefined ->
            %% First open of this log by this process.
            case do_open(A) of % does the put
                {ok, Res, L, Cnt} ->
                    put(log, opening_pid(A#arg.linkto, A#arg.notify, L)),
                    reply(Server, Res, S#state{args=A, cnt=Cnt});
                Res ->
                    do_fast_exit(S, Server, Res)
            end;
        L ->
            %% Already open: the new options must agree with the old ones.
            TestH = mk_head(A#arg.head, A#arg.format),
            case compare_arg(A#arg.options, S#state.args, TestH, L#log.head) of
                ok ->
                    case add_pid(A#arg.linkto, A#arg.notify, L) of
                        {ok, L1} ->
                            put(log, L1),
                            reply(Server, {ok, L#log.name}, S);
                        Error ->
                            reply(Server, Error, S)
                    end;
                Error ->
                    reply(Server, Error, S)
            end
    end;
handle({From, close}, S) ->
    case do_close(From, S) of
        {stop, S1} ->
            do_exit(S1, From, ok, normal);
        {continue, S1} ->
            reply(From, ok, S1)
    end;
handle({From, info}, S) ->
    reply(From, do_info(get(log), S#state.cnt), S);
handle({'EXIT', From, Reason}, #state{parent=From}=S) ->
    %% Parent orders shutdown.
    _ = do_stop(S),
    exit(Reason);
handle({'EXIT', From, Reason}, #state{server=From}=S) ->
    %% The server is gone.
    _ = do_stop(S),
    exit(Reason);
handle({'EXIT', From, _Reason}, S) ->
    L = get(log),
    case is_owner(From, L) of
        {true, _Notify} ->
            case close_owner(From, L, S) of
                {stop, S1} ->
                    _ = do_stop(S1),
                    exit(normal);
                {continue, S1} ->
                    loop(S1)
            end;
        false ->
            %% 'users' is not decremented.
            S1 = do_unblock(From, get(log), S),
            loop(S1)
    end;
handle({system, From, Req}, S) ->
    sys:handle_system_msg(Req, From, S#state.parent, ?MODULE, [], S);
handle(_, S) ->
    %% Unknown messages are dropped.
    loop(S).

%% Puts a message that cannot be served right now on the queue kept in
%% the state and continues with the main loop.
enqueue(Message, #state{queue = Queue}=S) ->
    loop(S#state{queue = [Message | Queue]}).

%% Collect further log and sync requests already in the mailbox or queued

%% Collecting stops once more than this many bytes have been gathered.
-define(MAX_LOOK_AHEAD, 64*1024).

%% Inlined.
%% log_loop(State, Pids, Bins, Sync, Sz, LogFormat)
%%   Pids - processes waiting for a reply to a log request
%%   Bins - items collected so far, most recent first
%%   Sync - processes waiting for a reply to a sync request
%%   Sz   - accumulated size in bytes of Bins
%% Takes the next message from the state's message list or, without
%% waiting, from the mailbox and offers it to log_loop/7. The batch is
%% written with log_end/5 when there is nothing more to take or the
%% size limit is exceeded.
log_loop(#state{cache_error = CE}=S, Pids, _Bins, _Sync, _Sz, _F) when CE =/= ok ->
    loop(cache_error(S, Pids));
log_loop(#state{}=S, Pids, Bins, Sync, Sz, _F) when Sz > ?MAX_LOOK_AHEAD ->
    loop(log_end(S, Pids, Bins, Sync, Sz));
log_loop(#state{messages = []}=S, Pids, Bins, Sync, Sz, F) ->
    receive
        Message ->
            log_loop(Message, Pids, Bins, Sync, Sz, F, S)
    after 0 ->
            loop(log_end(S, Pids, Bins, Sync, Sz))
    end;
log_loop(#state{messages = [M | Ms]}=S, Pids, Bins, Sync, Sz, F) ->
    S1 = S#state{messages = Ms},
    log_loop(M, Pids, Bins, Sync, Sz, F, S1).

%% Items logged after the last sync request found are sync:ed as well.
%% Fold one message into the batch that is being collected.
%%   Pids - callers to be sent the result of their log request
%%   Bins - payloads collected so far, most recent first
%%   Sync - callers to be sent the result of a sync request
%%   Sz   - sum of iolist_size/1 over the collected payloads
%% A message that is neither a log nor a sync request ends the batch:
%% log_end/5 is run and the message is then given to handle/2.
log_loop({alog, internal, Terms}, Pids, Bins, Sync, Sz, internal = Format, S) ->
    %% alog of terms is accepted for the internal format only
    NewSz = Sz + iolist_size(Terms),
    log_loop(S, Pids, [Terms | Bins], Sync, NewSz, Format);
log_loop({alog, binary, Bytes}, Pids, Bins, Sync, Sz, Format, S) ->
    NewSz = Sz + iolist_size(Bytes),
    log_loop(S, Pids, [Bytes | Bins], Sync, NewSz, Format);
log_loop({Caller, {log, internal, Terms}}, Pids, Bins, Sync, Sz,
         internal = Format, S) ->
    %% log of terms is accepted for the internal format only
    NewSz = Sz + iolist_size(Terms),
    log_loop(S, [Caller | Pids], [Terms | Bins], Sync, NewSz, Format);
log_loop({Caller, {log, binary, Bytes}}, Pids, Bins, Sync, Sz, Format, S) ->
    NewSz = Sz + iolist_size(Bytes),
    log_loop(S, [Caller | Pids], [Bytes | Bins], Sync, NewSz, Format);
log_loop({Caller, sync}, Pids, Bins, Sync, Sz, Format, S) ->
    log_loop(S, Pids, Bins, [Caller | Sync], Sz, Format);
log_loop(Other, Pids, Bins, Sync, Sz, _Format, S) ->
    Flushed = log_end(S, Pids, Bins, Sync, Sz),
    handle(Other, Flushed).

%% Write the collected payloads with do_log/3, send the outcome to Pids,
%% update the item counter and, unless the write failed, serve the pending
%% sync requests. Returns the new state.
log_end(S, [], [], Sync, _Sz) ->
    %% Nothing to write; only sync requests (if any) were collected.
    log_end_sync(S, Sync);
log_end(#state{cnt = Cnt0} = S, Pids, Bins, Sync, Sz) ->
    Log = get(log),
    case do_log(Log, rflat(Bins), Sz) of
        {error, {error, {full, _Name}}, Logged} when Pids =:= [] ->
            %% Nobody is waiting for a reply, so a full log is not
            %% recorded as an error status.
            S1 = state_ok(S#state{cnt = Cnt0 + Logged}),
            log_end_sync(S1, Sync);
        {error, Reason, Logged} ->
            ok = replies(Pids, Reason),
            state_err(S#state{cnt = Cnt0 + Logged}, Reason);
        Logged when is_integer(Logged) ->
            ok = replies(Pids, ok),
            S1 = state_ok(S),
            log_end_sync(S1#state{cnt = Cnt0 + Logged}, Sync)
    end.

%% Inlined.
%% Run do_sync/1 once for all collected sync requests and send every
%% requester the result; the result also becomes the error status.
log_end_sync(S, []) ->
    S;
log_end_sync(S, Waiting) ->
    Outcome = do_sync(get(log)),
    ok = replies(Waiting, Outcome),
    state_err(S, Outcome).

%% Inlined.
%% Concatenate the collected payload lists, restoring arrival order
%% (the input has the most recent payload first).
rflat([Single]) -> Single;
rflat(Collected) -> rflat(Collected, []).

rflat([], Acc) -> Acc;
rflat([Newest | Older], Acc) ->
    rflat(Older, Newest ++ Acc).

%% Change the notify flag of the owner Pid.
%% -> {ok, Log} | {error, Error}
do_change_notify(L, Pid, Notify) ->
    case is_owner(Pid, L) of
        false ->
            {error, {not_owner, Pid}};
        {true, Notify} ->
            %% The flag already has the requested value.
            {ok, L};
        {true, _Current} when not is_boolean(Notify) ->
            {error, {badarg, notify}};
        {true, _Current} ->
            Others = lists:keydelete(Pid, 1, L#log.owners),
            {ok, L#log{owners = [{Pid, Notify} | Others]}}
    end.

%% Close the log on behalf of Pid, which is either an owner or an
%% anonymous user.
%% -> {stop, S} | {continue, S}
do_close(Pid, S) ->
    Log = get(log),
    case is_owner(Pid, Log) of
        false ->
            close_user(Pid, Log, S);
        {true, _Notify} ->
            close_owner(Pid, Log, S)
    end.

%% Remove Pid from the owners, release a block held by Pid, and unlink.
%% -> {stop, S} | {continue, S}
close_owner(Pid, L, S) ->
    Remaining = lists:keydelete(Pid, 1, L#log.owners),
    L1 = L#log{owners = Remaining},
    put(log, L1),
    S1 = do_unblock(Pid, L1, S),
    unlink(Pid),
    do_close2(L1, S1).

%% Decrement the user count (when positive) and release a block held by Pid.
%% -> {stop, S} | {continue, S}
close_user(Pid, #log{users = Users} = L, S) when Users > 0 ->
    L1 = L#log{users = Users - 1},
    put(log, L1),
    S1 = do_unblock(Pid, L1, S),
    do_close2(L1, S1);
close_user(_Pid, _L, S) ->
    {continue, S}.

%% Stop only when neither owners nor users remain.
do_close2(#log{users = 0, owners = []}, S) ->
    {stop, S};
do_close2(_L, S) ->
    {continue, S}.

%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
system_continue(_Parent, _Debug, State) ->
    loop(State).

-spec system_terminate(_, _, _, #state{}) -> no_return().
system_terminate(Reason, _Parent, _Debug, State) ->
    _ = do_stop(State),
    exit(Reason).

%%-----------------------------------------------------------------
%% Temporary code for upgrade.
%%-----------------------------------------------------------------
system_code_change(State, _Module, _OldVsn, _Extra) ->
    {ok, State}.


%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Stop the log, tell the disk_log_server, reply to From and exit with
%% Reason. A pending cache error takes precedence as the reply; otherwise
%% the result of stopping replaces a reply of 'ok' if stopping failed.
-spec do_exit(#state{}, pid(), _, _) -> no_return().
do_exit(S, From, Message0, Reason) ->
    StopResult = do_stop(S),
    CacheError = S#state.cache_error,
    Message = if
                  CacheError =/= ok -> CacheError;
                  StopResult =:= closed -> Message0;
                  Message0 =:= ok -> StopResult;
                  true -> Message0
              end,
    _ = disk_log_server:close(self()),
    ok = replies(From, Message),
    ?PROFILE(ep:done()),
    exit(Reason).

%% Stop the log, send Message to Server and exit normally.
-spec do_fast_exit(#state{}, pid(), _) -> no_return().
do_fast_exit(S, Server, Message) ->
    _ = do_stop(S),
    Server ! {disk_log, self(), Message},
    exit(normal).

%% Reject all queued and pending requests, then close the log.
%% -> closed | Error
do_stop(S) ->
    Pending = S#state.queue ++ S#state.messages,
    proc_q(Pending),
    close_disk_log(get(log)).

%% Tell every requester ({Pid, Request}) that the log has stopped;
%% other entries (asynchronous requests) are skipped.
proc_q(Requests) ->
    Stopped = {disk_log, self(), {error, disk_log_stopped}},
    lists:foreach(fun({From, _Request}) when is_pid(From) ->
                          From ! Stopped;
                     (_Async) ->
                          ok
                  end, Requests).

%% Register the process opening the log; crashes if add_pid/3 fails.
%% -> log()
opening_pid(Pid, Notify, L) ->
    {ok, Registered} = add_pid(Pid, Notify, L),
    Registered.

%% Add Pid as an owner (linking to it), or count one more anonymous user
%% when the first argument is not a pid.
%% -> {ok, log()} | Error
add_pid(Pid, Notify, L) when is_pid(Pid) ->
    case is_owner(Pid, L) of
        {true, Notify} ->
            %% Already an owner with the same notify flag.
            {ok, L};
        {true, CurNotify} when Notify =/= CurNotify ->
            {error, {arg_mismatch, notify, CurNotify, Notify}};
        false ->
            link(Pid),
            Owners = [{Pid, Notify} | L#log.owners],
            {ok, L#log{owners = Owners}}
    end;
add_pid(_NotAPid, _Notify, L) ->
    Users = L#log.users + 1,
    {ok, L#log{users = Users}}.

%% Unlink the blocking process unless it is also an owner of the log.
unblock_pid(#log{blocked_by = none}) ->
    ok;
unblock_pid(#log{blocked_by = Blocker} = L) ->
    case is_owner(Blocker, L) of
        false ->
            unlink(Blocker);
        {true, _Notify} ->
            ok
    end.

%% Look up Pid among the owners of the log.
%% -> {true, Notify} | false
%% Notify is the second element of the owner's {Pid, Notify} entry.
is_owner(Pid, L) ->
    case lists:keyfind(Pid, 1, L#log.owners) of
        {_Pid, Notify} ->
            {true, Notify};
        false ->
            false
    end.

%% Rename the file(s) of a log.
%% -> ok | {error, {file_error, FileName, Reason}} | throw(Error)
%% For a halt log the single file is renamed. For a wrap log every
%% existing file reported by wrap_file_extensions/1 is renamed, keeping
%% its extension.
rename_file(File, NewFile, halt) ->
    case file:rename(File, NewFile) of
        ok ->
            ok;
        Else ->
            file_error(NewFile, Else)
    end;
rename_file(File, NewFile, wrap) ->
    rename_file(wrap_file_extensions(File), File, NewFile, ok).

%% Rename File.Ext to NewFile0.Ext for each extension. All files are
%% attempted even after a failure; Res carries the result so far and is
%% replaced by the error of each failing rename.
rename_file([Ext|Exts], File, NewFile0, Res) ->
    NewFile = add_ext(NewFile0, Ext),
    NRes = case file:rename(add_ext(File, Ext), NewFile) of
               ok ->
                   Res;
               Else ->
                   file_error(NewFile, Else)
           end,
    rename_file(Exts, File, NewFile0, NRes);
rename_file([], _File, _NewFiles, Res) -> Res.

%% Wrap an {error, Reason} returned by a file operation together with
%% the name of the file concerned.
file_error(FileName, {error, Error}) ->
    {error, {file_error, FileName, Error}}.

%% Compare a list of {Attr, Val} options with the #arg{} record A, and
%% the header Head with OrigHead.
%% -> ok | {error, Error}
%% "Old" error messages have been kept, arg_mismatch has been added.
%%-spec compare_arg(dlog_options(), #arg{},
compare_arg([], _A, none, _OrigHead) ->
    % no header option given
    ok;
compare_arg([], _A, Head, OrigHead) when Head =/= OrigHead ->
    {error, {arg_mismatch, head, OrigHead, Head}};
compare_arg([], _A, _Head, _OrigHead) ->
    ok;
compare_arg([{Attr, Val} | Tail], A, Head, OrigHead) ->
    case compare_arg(Attr, Val, A) of
        {not_ok, OrigVal} ->
            {error, {arg_mismatch, Attr, OrigVal, Val}};
        ok ->
            compare_arg(Tail, A, Head, OrigHead);
        Error ->
            Error
    end.

-spec compare_arg(atom(), _, #arg{}) ->
             'ok' | {'not_ok', _} | {'error', {atom(), _}}.
%% Compare a single option value with the corresponding field of the
%% #arg{} record. Options without a dedicated clause are accepted.
compare_arg(file, File, #arg{file = CurFile, name = Name})
  when File =/= CurFile ->
    {error, {name_already_open, Name}};
compare_arg(mode, read_only, #arg{mode = read_write, name = Name}) ->
    {error, {open_read_write, Name}};
compare_arg(mode, read_write, #arg{mode = read_only, name = Name}) ->
    {error, {open_read_only, Name}};
compare_arg(type, Type, #arg{type = CurType}) when Type =/= CurType ->
    {not_ok, CurType};
compare_arg(format, Format, #arg{format = CurFormat})
  when Format =/= CurFormat ->
    {not_ok, CurFormat};
compare_arg(repair, Repair, #arg{repair = CurRepair})
  when Repair =/= CurRepair ->
    %% not used, but check it anyway...
    {not_ok, CurRepair};
compare_arg(_Attr, _Val, _A) ->
    ok.

%% Open the log file(s) described by the #arg{} record and build the
%% #log{} record for them.
%% -> {ok, Res, log(), Cnt} | Error
do_open(#arg{type = Type, format = Format, name = Name, head = Head0,
             file = FName, repair = Repair, size = Size, mode = Mode,
             quiet = Quiet, version = Version}) ->
    disk_log_1:set_quiet(Quiet),
    Head = mk_head(Head0, Format),
    Opened = do_open2(Type, Format, Name, FName, Repair, Size, Mode,
                      Head, Version),
    case Opened of
        {ok, Ret, Extra, FormatType, NoItems} ->
            Log = #log{name = Name, type = Type, format = Format,
                       filename = FName, size = Size,
                       format_type = FormatType, head = Head,
                       mode = Mode, version = Version, extra = Extra},
            {ok, Ret, Log, NoItems};
        Error ->
            Error
    end.

%% Turn a {head, H} option into {ok, Binary}; any other value is
%% returned unchanged.
mk_head({head, Bytes}, external) ->
    {ok, ensure_binary(Bytes)};
mk_head({head, Term}, internal) ->
    {ok, term_to_binary(Term)};
mk_head(Other, _Format) ->
    Other.

%% Encode every term of the list with term_to_binary/1.
terms2bins([]) ->
    [];
terms2bins([Term | Terms]) ->
    [term_to_binary(Term) | terms2bins(Terms)].

%% Return the list itself when all elements are binaries; otherwise
%% convert it with make_binary_list/1.
ensure_binary_list(List) ->
    ensure_binary_list(List, List).

ensure_binary_list([], Whole) ->
    Whole;
ensure_binary_list([Bin | Rest], Whole) when is_binary(Bin) ->
    ensure_binary_list(Rest, Whole);
ensure_binary_list(_NotAllBinaries, Whole) ->
    make_binary_list(Whole).

%% Convert every element of the list with ensure_binary/1.
make_binary_list([]) ->
    [];
make_binary_list([Bytes | Rest]) ->
    [ensure_binary(Bytes) | make_binary_list(Rest)].

%% Convert an iolist (or binary) to a binary.
ensure_binary(Bytes) ->
    iolist_to_binary(Bytes).

%%-----------------------------------------------------------------
%% Change size of the logs in runtime.
%%-----------------------------------------------------------------
%% -> ok | {big, CurSize} | throw(Error)
%% On success the new #log{} is stored in the process dictionary and
%% the is_full flag is cleared.
do_change_size(#log{type = halt} = L, NewSize) ->
    Halt = L#log.extra,
    CurB = Halt#halt.curB,
    Resized = L#log{extra = Halt#halt{size = NewSize}},
    if
        NewSize =:= infinity; CurB =< NewSize ->
            erase(is_full),
            put(log, Resized),
            ok;
        true ->
            %% The file is already larger than the requested size.
            {big, CurB}
    end;
do_change_size(#log{type = wrap} = L, NewSize) ->
    #log{extra = Handle0, version = Version} = L,
    {ok, Handle} = disk_log_1:change_size_wrap(Handle0, NewSize, Version),
    erase(is_full),
    put(log, L#log{extra = Handle}),
    ok.

%% Validate a head option.
%% -> {ok, Head} | Error; Head = none | {head, H} | {M,F,A}
check_head({head, none}, _Format) ->
    {ok, none};
check_head({head_func, {M, F, A}}, _Format)
  when is_atom(M), is_atom(F), is_list(A) ->
    {ok, {M, F, A}};
check_head({head, Bytes}, external) ->
    %% An external head must be convertible to a binary.
    case catch ensure_binary(Bytes) of
        {'EXIT', _Reason} ->
            {error, {badarg, head}};
        _Binary ->
            {ok, {head, Bytes}}
    end;
check_head({head, Term}, internal) ->
    {ok, {head, Term}};
check_head(_Head, _Format) ->
    {error, {badarg, head}}.

%% Validate a size for the given log type.
%% -> ok | not_ok
check_size(halt, infinity) ->
    ok;
check_size(halt, NewSize) when is_integer(NewSize), NewSize > 0 ->
    ok;
check_size(wrap, {NewMaxB, NewMaxF})
  when is_integer(NewMaxB), is_integer(NewMaxF),
       NewMaxB > 0, NewMaxB =< ?MAX_BYTES,
       NewMaxF > 0, NewMaxF < ?MAX_FILES ->
    ok;
check_size(_Type, _Size) ->
    not_ok.

%%-----------------------------------------------------------------
%% Increment a wrap log.
%%-----------------------------------------------------------------
%% -> {ok, log(), Lost} | {error, Error, log()}
%% The handle returned by disk_log_1 is stored in the returned log
%% record in both the success and the error case.
do_inc_wrap_file(L) ->
    #log{format = Format, extra = Handle} = L,
    Outcome = case Format of
                  internal ->
                      disk_log_1:mf_int_inc(Handle, L#log.head);
                  external ->
                      disk_log_1:mf_ext_inc(Handle, L#log.head)
              end,
    case Outcome of
        {ok, Handle2, Lost} ->
            {ok, L#log{extra = Handle2}, Lost};
        {error, Error, Handle2} ->
            {error, Error, L#log{extra = Handle2}}
    end.


%%-----------------------------------------------------------------
%% Open a log file.
%%-----------------------------------------------------------------
%% -> {ok, Reply, log(), Cnt} | Error
%% Note: the header is always written, even if the log size is too small.
%% One clause per combination of log type and format. Anything other
%% than the expected results of the disk_log_1 call (including a caught
%% exception) is returned as is.
do_open2(halt, internal, Name, FName, Repair, Size, Mode, Head, _V) ->
    case catch disk_log_1:int_open(FName, Repair, Mode, Head) of
        {repaired, FdC, Rec, Bad, FileSize} ->
            Reply = {repaired, Name, {recovered, Rec}, {badbytes, Bad}},
            Halt = #halt{fdc = FdC, curB = FileSize, size = Size},
            {ok, Reply, Halt, halt_int, Rec};
        {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} ->
            Halt = #halt{fdc = FdC, curB = FileSize, size = Size},
            {ok, {ok, Name}, Halt, halt_int, NoItems};
        Other ->
            Other
    end;
do_open2(wrap, internal, Name, FName, Repair, Size, Mode, Head, V) ->
    {MaxB, MaxF} = Size,
    case catch
        disk_log_1:mf_int_open(FName, MaxB, MaxF, Repair, Mode, Head, V) of
        {repaired, Handle, Rec, Bad, Cnt} ->
            Reply = {repaired, Name, {recovered, Rec}, {badbytes, Bad}},
            {ok, Reply, Handle, wrap_int, Cnt};
        {ok, Handle, Cnt} ->
            {ok, {ok, Name}, Handle, wrap_int, Cnt};
        Other ->
            Other
    end;
do_open2(halt, external, Name, FName, Repair, Size, Mode, Head, _V) ->
    case catch disk_log_1:ext_open(FName, Repair, Mode, Head) of
        {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} ->
            Halt = #halt{fdc = FdC, curB = FileSize, size = Size},
            {ok, {ok, Name}, Halt, halt_ext, NoItems};
        Other ->
            Other
    end;
do_open2(wrap, external, Name, FName, Repair, Size, Mode, Head, V) ->
    {MaxB, MaxF} = Size,
    case catch
        disk_log_1:mf_ext_open(FName, MaxB, MaxF, Repair, Mode, Head, V) of
        {ok, Handle, Cnt} ->
            {ok, {ok, Name}, Handle, wrap_ext, Cnt};
        Other ->
            Other
    end.

%% Unlink the blocking process and all owners, close the file(s) and
%% remove the log record from the process dictionary.
%% -> closed | Error
close_disk_log(undefined) ->
    closed;
close_disk_log(L) ->
    unblock_pid(L),
    lists:foreach(fun({Owner, _Notify}) -> unlink(Owner) end,
                  L#log.owners),
    Result = (catch close_disk_log2(L)),
    erase(log),
    Result.

-spec close_disk_log2(#log{}) -> 'closed'.
% | throw(Error)

%% Close the file(s) of the log using the disk_log_1 function that
%% matches the log's format_type. The result of that call is discarded
%% and 'closed' is returned.
close_disk_log2(L) ->
    case L of
        #log{format_type = halt_int, mode = Mode, extra = Halt} ->
            disk_log_1:close(Halt#halt.fdc, L#log.filename, Mode);
        #log{format_type = wrap_int, mode = Mode, extra = Handle} ->
            disk_log_1:mf_int_close(Handle, Mode);
        #log{format_type = halt_ext, extra = Halt} ->
            disk_log_1:fclose(Halt#halt.fdc, L#log.filename);
        #log{format_type = wrap_ext, mode = Mode, extra = Handle} ->
            disk_log_1:mf_ext_close(Handle, Mode)
    end,
    closed.

%% Produce a descriptive text (the result of io_lib:format/2) for an
%% error term. {error, Reason} wrappers are unwrapped recursively, a
%% {Node, {error, _}} pair gets the node as a prefix, and
%% {error, Module, Error} is delegated to Module:format_error/1.
%% Unrecognized terms are printed with ~tp by the last clause.
do_format_error({error, Module, Error}) ->
    Module:format_error(Error);
do_format_error({error, Reason}) ->
    do_format_error(Reason);
do_format_error({Node, Error = {error, _Reason}}) ->
    lists:append(io_lib:format("~p: ", [Node]), do_format_error(Error));
do_format_error({badarg, Arg}) ->
    io_lib:format("The argument ~p is missing, not recognized or "
                  "not wellformed~n", [Arg]);
do_format_error({size_mismatch, OldSize, ArgSize}) ->
    io_lib:format("The given size ~p does not match the size ~p found on "
                  "the disk log size file~n", [ArgSize, OldSize]);
do_format_error({read_only_mode, Log}) ->
    io_lib:format("The disk log ~tp has been opened read-only, but the "
                  "requested operation needs read-write access~n", [Log]);
do_format_error({format_external, Log}) ->
    io_lib:format("The requested operation can only be applied on internally "
                  "formatted disk logs, but ~tp is externally formatted~n",
                  [Log]);
do_format_error({blocked_log, Log}) ->
    io_lib:format("The blocked disk log ~tp does not queue requests, or "
                  "the log has been blocked by the calling process~n", [Log]);
do_format_error({full, Log}) ->
    io_lib:format("The halt log ~tp is full~n", [Log]);
do_format_error({not_blocked, Log}) ->
    io_lib:format("The disk log ~tp is not blocked~n", [Log]);
do_format_error({not_owner, Pid}) ->
    io_lib:format("The pid ~tp is not an owner of the disk log~n", [Pid]);
do_format_error({not_blocked_by_pid, Log}) ->
    io_lib:format("The disk log ~tp is blocked, but only the blocking pid "
                  "can unblock a disk log~n", [Log]);
do_format_error({new_size_too_small, Log, CurrentSize}) ->
    io_lib:format("The current size ~p of the halt log ~tp is greater than the "
                  "requested new size~n", [CurrentSize, Log]);
do_format_error({halt_log, Log}) ->
    io_lib:format("The halt log ~tp cannot be wrapped~n", [Log]);
do_format_error({same_file_name, Log}) ->
    io_lib:format("Current and new file name of the disk log ~tp "
                  "are the same~n", [Log]);
do_format_error({arg_mismatch, Option, FirstValue, ArgValue}) ->
    io_lib:format("The value ~tp of the disk log option ~p does not match "
                  "the current value ~tp~n", [ArgValue, Option, FirstValue]);
do_format_error({name_already_open, Log}) ->
    io_lib:format("The disk log ~tp has already opened another file~n", [Log]);
do_format_error({node_already_open, Log}) ->
    io_lib:format("The distribution option of the disk log ~tp does not match "
                  "already open log~n", [Log]);
do_format_error({open_read_write, Log}) ->
    io_lib:format("The disk log ~tp has already been opened read-write~n",
                  [Log]);
do_format_error({open_read_only, Log}) ->
    io_lib:format("The disk log ~tp has already been opened read-only~n",
                  [Log]);
do_format_error({not_internal_wrap, Log}) ->
    io_lib:format("The requested operation cannot be applied since ~tp is not "
                  "an internally formatted disk log~n", [Log]);
do_format_error(no_such_log) ->
    io_lib:format("There is no disk log with the given name~n", []);
%% nonode and nodedown produce the same text.
do_format_error(nonode) ->
    io_lib:format("There seems to be no node up that can handle "
                  "the request~n", []);
do_format_error(nodedown) ->
    io_lib:format("There seems to be no node up that can handle "
                  "the request~n", []);
do_format_error({corrupt_log_file, FileName}) ->
    io_lib:format("The disk log file \"~ts\" contains corrupt data~n",
                  [FileName]);
do_format_error({need_repair, FileName}) ->
    io_lib:format("The disk log file \"~ts\" has not been closed properly and "
                  "needs repair~n", [FileName]);
do_format_error({not_a_log_file, FileName}) ->
    io_lib:format("The file \"~ts\" is not a wrap log file~n", [FileName]);
do_format_error({invalid_header, InvalidHeader}) ->
    io_lib:format("The disk log header is not wellformed: ~p~n",
                  [InvalidHeader]);
do_format_error(end_of_log) ->
    io_lib:format("An attempt was made to step outside a not yet "
                  "full wrap log~n", []);
do_format_error({invalid_index_file, FileName}) ->
    io_lib:format("The wrap log index file \"~ts\" cannot be used~n",
                  [FileName]);
do_format_error({no_continuation, BadCont}) ->
    io_lib:format("The term ~p is not a chunk continuation~n", [BadCont]);
do_format_error({file_error, FileName, Reason}) ->
    %% The reason is first translated by file:format_error/1.
    io_lib:format("\"~ts\": ~tp~n", [FileName, file:format_error(Reason)]);
do_format_error(E) ->
    io_lib:format("~tp~n", [E]).

%% Build the information list for the log L, where Cnt is the current
%% item count. For a read-write wrap log the noFull counter of the
%% handle is added to accFull and reset, and the updated log record is
%% stored in the process dictionary.
do_info(L, Cnt) ->
    #log{name = Name, type = Type, mode = Mode, filename = File,
         extra = Extra, status = Status, owners = Owners, users = Users,
         format = Format, head = Head} = L,
    Size = case Type of
               wrap -> disk_log_1:get_wrap_size(Extra);
               halt -> Extra#halt.size
           end,
    Distribution = case disk_log_server:get_log_pids(Name) of
                       {local, _Pid} ->
                           local;
                       {distributed, Pids} ->
                           [node(Pid) || Pid <- Pids];
                       undefined -> % "cannot happen"
                           []
                   end,
    %% Items reported for read-write logs only.
    ReadWriteInfo =
        case Type of
            wrap when Mode =:= read_write ->
                #handle{curB = CurB, curF = CurF,
                        cur_cnt = CurCnt, acc_cnt = AccCnt,
                        noFull = NoFull, accFull = AccFull} = Extra,
                TotalFull = AccFull + NoFull,
                Reset = Extra#handle{noFull = 0, accFull = TotalFull},
                put(log, L#log{extra = Reset}),
                [{no_current_bytes, CurB},
                 {no_current_items, CurCnt},
                 {no_items, Cnt},
                 {no_written_items, CurCnt + AccCnt},
                 {current_file, CurF},
                 {no_overflows, {TotalFull, NoFull}}];
            halt when Mode =:= read_write ->
                IsFull = get(is_full) =/= undefined,
                [{full, IsFull},
                 {no_written_items, Cnt}];
            _ when Mode =:= read_only ->
                []
        end,
    HeadInfo = case Mode of
                   read_write -> [{head, Head}];
                   read_only -> []
               end,
    [{name, Name},
     {file, File},
     {type, Type},
     {format, Format},
     {size, Size},
     {items, Cnt}, % kept for "backward compatibility" (undocumented)
     {owners, Owners},
     {users, Users}]
        ++ HeadInfo
        ++ [{mode, Mode},
            {status, Status},
            {node, node()},
            {distributed, Distribution}]
        ++ ReadWriteInfo.

%% Mark the log as blocked by Pid and store it in the process
%% dictionary. QueueLogRecs becomes the second element of the
%% {blocked, _} status. A blocking process that is not an owner is
%% linked to.
do_block(Pid, QueueLogRecs, L) ->
    L2 = L#log{status = {blocked, QueueLogRecs}, blocked_by = Pid},
    put(log, L2),
    case is_owner(Pid, L2) of
        {true, _Notify} ->
            ok;
        false ->
            link(Pid)
    end.

%% Unblock the log if, and only if, it is blocked by Pid; otherwise the
%% state is returned unchanged.
do_unblock(Pid, #log{blocked_by = Pid}=L, S) ->
    do_unblock(L, S);
do_unblock(_Pid, _L, S) ->
    S.

%% Unconditionally unblock the log: the status is reset to ok and the
%% queued requests are moved, in arrival order, to 'messages'.
do_unblock(L, S) ->
    unblock_pid(L),
    L2 = L#log{blocked_by = none, status = ok},
    put(log, L2),
    %% Since the block request is synchronous, and the blocking
    %% process is the only process that can unblock, all requests in
    %% 'messages' will have been put in 'queue' before the unblock
    %% request is granted.
    [] = S#state.messages,	% assertion
    S#state{queue = [], messages = lists:reverse(S#state.queue)}.

-spec do_log(#log{}, [binary()]) -> integer() | {'error', _, integer()}.

%% Log the items in B. The result is the number of items logged, or
%% {error, Error, N} where N is the count reported together with the
%% error.
do_log(L, B) ->
    do_log(L, B, iolist_size(B)).

%% As do_log/2, with the size of B supplied by the caller. The updated
%% log record is stored in the process dictionary.
do_log(#log{type = halt}=L, B, BSz) ->
    #log{format = Format, extra = Halt} = L,
    #halt{curB = CurSize, size = Sz} = Halt,
    {Bs, BSize} = logl(B, Format, BSz),
    case get(is_full) of
        true ->
            {error, {error, {full, L#log.name}}, 0};
        undefined when Sz =:= infinity; CurSize + BSize =< Sz ->
            halt_write(Halt, L, B, Bs, BSize);
        undefined ->
            %% Not everything fits: write as many items as possible.
            halt_write_full(L, B, Format, 0)
    end;
do_log(#log{format_type = wrap_int}=L, B, _BSz) ->
    case disk_log_1:mf_int_log(L#log.extra, B, L#log.head) of
        {ok, Handle, Logged, Lost, Wraps} ->
            notify_owners_wrap(Wraps),
            put(log, L#log{extra = Handle}),
            Logged - Lost;
        {ok, Handle, Logged} ->
            put(log, L#log{extra = Handle}),
            Logged;
        {error, Error, Handle, Logged, Lost} ->
            put(log, L#log{extra = Handle}),
            {error, Error, Logged - Lost}
    end;
do_log(#log{format_type = wrap_ext}=L, B, _BSz) ->
    case disk_log_1:mf_ext_log(L#log.extra, B, L#log.head) of
        {ok, Handle, Logged, Lost, Wraps} ->
            notify_owners_wrap(Wraps),
            put(log, L#log{extra = Handle}),
            Logged - Lost;
        {ok, Handle, Logged} ->
            put(log, L#log{extra = Handle}),
            Logged;
        {error, Error, Handle, Logged, Lost} ->
            put(log, L#log{extra = Handle}),
            {error, Error, Logged - Lost}
    end.

%% -> {Data, Size}: the data to write and its size. External data is
%% written as is (the size is computed when not supplied); internal
%% data is passed to disk_log_1:logl/1.
logl(B, external, undefined) ->
    {B, iolist_size(B)};
logl(B, external, Sz) ->
    {B, Sz};
logl(B, internal, _Sz) ->
    disk_log_1:logl(B).

%% Write the items one at a time for as long as they fit within the
%% size of the halt log. The log is then marked as full, the owners are
%% notified and {error, {error, {full, Name}}, N} is returned, where N
%% is the number of items written. A write error is returned as is.
halt_write_full(L, [Bin | Bins], Format, N) ->
    B = [Bin],
    {Bs, BSize} = logl(B, Format, undefined),
    Halt = L#log.extra,
    #halt{curB = CurSize, size = Sz} = Halt,
    if
        CurSize + BSize =< Sz ->
            case halt_write(Halt, L, B, Bs, BSize) of
                N1 when is_integer(N1) ->
                    %% halt_write/5 has stored the updated log record.
                    halt_write_full(get(log), Bins, Format, N+N1);
                Error ->
                    Error
            end;
        true ->
            halt_write_full(L, [], Format, N)
    end;
halt_write_full(L, _Bs, _Format, N) ->
    put(is_full, true),
    notify_owners(full),
    {error, {error, {full, L#log.name}}, N}.

%% Write Bs (BSize bytes) to the halt log and store the updated log
%% record. Returns length(B) on success, {error, Error, 0} otherwise.
halt_write(Halt, L, B, Bs, BSize) ->
    case disk_log_1:fwrite(Halt#halt.fdc, L#log.filename, Bs, BSize) of
        {ok, NewFdC} ->
            NCurB = Halt#halt.curB + BSize,
            NewHalt = Halt#halt{fdc = NewFdC, curB = NCurB},
            put(log, L#log{extra = NewHalt}),
            length(B);
        {Error, NewFdC} ->
            %% curB is left unchanged when the write fails.
            put(log, L#log{extra = Halt#halt{fdc = NewFdC}}),
            {error, Error, 0}
    end.

%% Ask disk_log_1 to write the cache, store the updated log record and
%% return the reply.
%% -> ok | Error
do_write_cache(#log{filename = FName, type = halt, extra = Halt} = Log) ->
    {Reply, NewFdC} = disk_log_1:write_cache(Halt#halt.fdc, FName),
    put(log, Log#log{extra = Halt#halt{fdc = NewFdC}}),
    Reply;
do_write_cache(#log{type = wrap, extra = Handle} = Log) ->
    {Reply, NewHandle} = disk_log_1:mf_write_cache(Handle),
    put(log, Log#log{extra = NewHandle}),
    Reply.

%% Ask disk_log_1 to sync the log, store the updated log record in the
%% process dictionary and return the reply.
%% -> ok | Error
do_sync(#log{filename = FName, type = halt, extra = Halt} = Log) ->
    {Reply, NewFdC} = disk_log_1:sync(Halt#halt.fdc, FName),
    put(log, Log#log{extra = Halt#halt{fdc = NewFdC}}),
    Reply;
do_sync(#log{type = wrap, extra = Handle} = Log) ->
    {Reply, NewHandle} = disk_log_1:mf_sync(Handle),
    put(log, Log#log{extra = NewHandle}),
    Reply.

%% Truncate the log, writing Head (unless it is 'none') at the start of
%% the truncated log. The updated log record is stored in the process
%% dictionary.
%% -> ok | Error | throw(Error)
do_trunc(#log{type = halt}=L, Head) ->
    #log{filename = FName, extra = Halt} = L,
    FdC = Halt#halt.fdc,
    {Reply1, FdC2} =
        case L#log.format of
            internal ->
                disk_log_1:truncate(FdC, FName, Head);
            external ->
                case disk_log_1:truncate_at(FdC, FName, bof) of
                    {ok, NFdC} when Head =:= none ->
                        {ok, NFdC};
                    {ok, NFdC} ->
                        {ok, H} = Head,
                        disk_log_1:fwrite(NFdC, FName, H, byte_size(H));
                    R ->
                        R
                end
        end,
    %% Read the current position to get the new size of the file. The
    %% size is recorded in curB only if truncation succeeded; otherwise
    %% the first failure is the reply.
    {Reply, NewHalt} =
        case disk_log_1:position(FdC2, FName, cur) of
            {ok, NewFdC, FileSize} when Reply1 =:= ok ->
                {ok, Halt#halt{fdc = NewFdC, curB = FileSize}};
            {Reply2, NewFdC} ->
                {Reply2, Halt#halt{fdc = NewFdC}};
            {ok, NewFdC, _} ->
                {Reply1, Halt#halt{fdc = NewFdC}}
        end,
    put(log, L#log{extra = NewHalt}),
    Reply;
do_trunc(#log{type = wrap}=L, Head) ->
    Handle = L#log.extra,
    OldHead = L#log.head,
    {MaxB, MaxF} = disk_log_1:get_wrap_size(Handle),
    %% Temporarily change the size to one file; the original size is
    %% restored by the final do_change_size/2 call.
    ok = do_change_size(L, {MaxB, 1}),
    %% do_change_size/2 has stored the resized log; Head is used as
    %% the head while wrapping.
    NewLog = trunc_wrap((get(log))#log{head = Head}),
    %% Just to remove all files with suffix > 1:
    NewLog2 = trunc_wrap(NewLog),
    NewHandle = (NewLog2#log.extra)#handle{noFull = 0, accFull = 0},
    do_change_size(NewLog2#log{extra = NewHandle, head = OldHead},
                   {MaxB, MaxF}).

%% Increment the wrap file; an error is thrown.
trunc_wrap(L) ->
    case do_inc_wrap_file(L) of
        {ok, L2, _Lost} ->
            L2;
        {error, Error, _L2} ->
            throw(Error)
    end.

%% Read a chunk from an internally formatted log. The updated log record
%% is stored in the process dictionary and the reply from disk_log_1 is
%% returned. Other logs yield a format_external error.
do_chunk(#log{format_type = halt_int, extra = Halt, mode = Mode,
              filename = FName} = L, Pos, B, N) ->
    FdC = Halt#halt.fdc,
    {NewFdC, Reply} =
        case Mode of
            read_write ->
                disk_log_1:chunk(FdC, FName, Pos, B, N);
            read_only ->
                disk_log_1:chunk_read_only(FdC, FName, Pos, B, N)
        end,
    put(log, L#log{extra = Halt#halt{fdc = NewFdC}}),
    Reply;
do_chunk(#log{format_type = wrap_int, mode = read_only,
              extra = Handle} = L, Pos, B, N) ->
    {NewHandle, Reply} =
        disk_log_1:mf_int_chunk_read_only(Handle, Pos, B, N),
    put(log, L#log{extra = NewHandle}),
    Reply;
do_chunk(#log{format_type = wrap_int, extra = Handle} = L, Pos, B, N) ->
    {NewHandle, Reply} = disk_log_1:mf_int_chunk(Handle, Pos, B, N),
    put(log, L#log{extra = NewHandle}),
    Reply;
do_chunk(L, _Pos, _B, _N) ->
    {error, {format_external, L#log.name}}.

%% Step between the files of an internally formatted wrap log.
do_chunk_step(#log{format_type = wrap_int, extra = Handle}, Pos, N) ->
    disk_log_1:mf_int_chunk_step(Handle, Pos, N);
do_chunk_step(L, _Pos, _N) ->
    {error, {not_internal_wrap, L#log.name}}.

%% Inlined.
%% Send the reply to one pid or to each pid of a list.
replies(Pids, Reply) ->
    send_reply(Pids, {disk_log, self(), Reply}).

send_reply([], _Msg) ->
    ok;
send_reply([Pid | Pids], Msg) ->
    Pid ! Msg,
    send_reply(Pids, Msg);
send_reply(Pid, Msg) when is_pid(Pid) ->
    Pid ! Msg,
    ok.

%% Send a reply and continue with the main loop.
reply(To, Reply, S) ->
    To ! {disk_log, self(), Reply},
    loop(S).

%% Send the request R to the process(es) of the log named Log.
req(Log, R) ->
    case disk_log_server:get_log_pids(Log) of
        undefined ->
            {error, no_such_log};
        {local, Pid} ->
            monitor_request(Pid, R);
        {distributed, Pids} ->
            multi_req({self(), R}, Pids)
    end.

%% Send Msg to all pids and wait, for each of them, for either a reply
%% or a 'DOWN' message. The result is ok if at least one process
%% replied, and {error, nonode} otherwise.
multi_req(Msg, Pids) ->
    Monitored = lists:map(fun(Pid) ->
                                  MRef = erlang:monitor(process, Pid),
                                  Pid ! Msg,
                                  {Pid, MRef}
                          end, Pids),
    Collect = fun({Pid, MRef}, Acc) ->
                      receive
                          {disk_log, Pid, _Reply} ->
                              erlang:demonitor(MRef, [flush]),
                              ok;
                          {'DOWN', MRef, process, Pid, _Info} ->
                              Acc
                      end
              end,
    lists:foldl(Collect, {error, nonode}, Monitored).

%% Send the request to one process of the log, chosen by nearby_pid/2
%% with the local node as preference.
sreq(Log, R) ->
    case nearby_pid(Log, node()) of
        undefined ->
            {error, no_such_log};
        Pid ->
            monitor_request(Pid, R)
    end.

%% Local req - always talk to log on Node
lreq(Log, R, Node) ->
    case nearby_pid(Log, Node) of
        Pid when is_pid(Pid), node(Pid) =:= Node ->
            monitor_request(Pid, R);
        _NoneOnNode ->
            {error, no_such_log}
    end.

%% -> pid() | undefined
nearby_pid(Log, Node) ->
    case disk_log_server:get_log_pids(Log) of
        {local, Pid} ->
            Pid;
        {distributed, Pids} ->
            get_near_pid(Pids, Node);
        undefined ->
            undefined
    end.

-spec get_near_pid([pid(),...], node()) -> pid().

%% The first pid residing on Node, or else the last pid of the list.
get_near_pid([Pid | _], Node) when node(Pid) =:= Node -> Pid;
get_near_pid([Last], _Node) -> Last;
get_near_pid([_ | Rest], Node) -> get_near_pid(Rest, Node).

%% Send Req to Pid and wait for the reply while monitoring Pid. A reply
%% whose second element is disk_log_stopped is not received here; in
%% that case the 'DOWN' message is awaited instead and reported as
%% no_such_log.
monitor_request(Pid, Req) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {self(), Req},
    receive
        {disk_log, Pid, Reply} when not is_tuple(Reply) orelse
                                    element(2, Reply) =/= disk_log_stopped ->
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, process, Pid, _Info} ->
            {error, no_such_log}
    end.

req2(Pid, R) ->
    monitor_request(Pid, R).

%% The first argument wins unless it is 'none'.
merge_head(none, Fallback) ->
    Fallback;
merge_head(Preferred, _Fallback) ->
    Preferred.

%% -> List of extensions of existing files (no dot included) | throw(FileError)
%% The candidates are "idx", "siz" and 1..NoOfFiles, where NoOfFiles is
%% taken from the index file.
wrap_file_extensions(File) ->
    {_CurF, _CurFSz, _TotSz, NoOfFiles} =
        disk_log_1:read_index_file(File),
    Numbered = if
                   NoOfFiles =:= 0 ->
                       [];
                   NoOfFiles >= 1 ->
                       lists:seq(1, NoOfFiles)
               end,
    Exists = fun(Ext) ->
                     case file:read_file_info(add_ext(File, Ext)) of
                         {ok, _Info} ->
                             true;
                         _Else ->
                             false
                     end
             end,
    [Ext || Ext <- ["idx", "siz" | Numbered], Exists(Ext)].

%% File ++ "." ++ Ext, where Ext may be a string or a number.
add_ext(File, Ext) ->
    lists:concat([File, ".", Ext]).

%% Send R, without waiting for any reply, to the process(es) of the log.
notify(Log, R) ->
    case disk_log_server:get_log_pids(Log) of
        {local, Pid} ->
            Pid ! R,
            ok;
        {distributed, Pids} ->
            lists:foreach(fun(Pid) -> Pid ! R end, Pids);
        undefined ->
            {error, no_such_log}
    end.

%% Send one {wrap, N} notification per element of the list.
notify_owners_wrap(Wraps) ->
    lists:foreach(fun(N) -> notify_owners({wrap, N}) end, Wraps).

%% Send Note to every owner whose notify flag is true.
notify_owners(Note) ->
    Log = get(log),
    Msg = {disk_log, node(), Log#log.name, Note},
    _ = [Pid ! Msg || {Pid, true} <- Log#log.owners],
    ok.

%% Report the stored cache error to Pids, clear it, and make it the
%% error status.
cache_error(#state{cache_error = Error} = S, Pids) ->
    ok = replies(Pids, Error),
    Cleared = S#state{cache_error = ok},
    state_err(Cleared, Error).

state_ok(S) ->
    state_err(S, ok).

-spec state_err(#state{}, dlog_state_error()) -> #state{}.

%% Set the error status; the owners are notified only when it changes.
state_err(#state{error_status = Err} = S, Err) ->
    S;
state_err(S, Err) ->
    notify_owners({error_status, Err}),
    S#state{error_status = Err}.