%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2017-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(raw_file_io_delayed).

-behavior(gen_statem).

-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
         position/2, write/2, pwrite/2, pwrite/3,
         read_line/1, read/2, pread/2, pread/3,
         read_handle_info/2]).

%% OTP internal.
-export([ipread_s32bu_p32bu/3, sendfile/8, internal_get_nif_resource/1]).

-export([open_layer/3]).

-export([init/1, callback_mode/0, terminate/3]).
-export([opening/3, opened/3]).

-include("file_int.hrl").

%% Starts an unlinked wrapper process for the calling process and asks it to
%% open Filename with Modes. The fresh reference is handed to the wrapper at
%% start and repeated in the '$open' request, which the wrapper matches
%% against its stored secret.
%%
%% Returns whatever the wrapper replies to '$open', or the start failure as-is
%% when the wrapper could not be started.
open_layer(Filename, Modes, Options) ->
    OpenToken = make_ref(),
    StartResult = gen_statem:start(?MODULE, {self(), OpenToken, Options}, []),
    open_via_wrapper(StartResult, {'$open', OpenToken, Filename, Modes}).

%% Forwards the open request to a successfully started wrapper; any other
%% start result is passed straight back to the caller.
open_via_wrapper({ok, Wrapper}, OpenRequest) ->
    gen_statem:call(Wrapper, OpenRequest, infinity);
open_via_wrapper(StartFailure, _OpenRequest) ->
    StartFailure.

%% gen_statem callback: each state is handled by the function of that name.
callback_mode() -> state_functions.

%% gen_statem callback. Monitors the owner, allocates the shared write buffer
%% and builds the state map; the defaults below (64 KiB, 2000 ms) apply unless
%% a {delayed_write, Size, Time} option overrides them.
init({Owner, Secret, Options}) ->
    OwnerMonitor = monitor(process, Owner),
    InitialData =
        #{ owner => Owner,
           monitor => OwnerMonitor,
           secret => Secret,
           timer => none,
           pid => self(),
           buffer => prim_buffer:new(),
           delay_size => 64 * 1024,
           delay_time => 2000 },
    {ok, opening, fill_delay_values(InitialData, Options)}.

%% Applies every {delayed_write, Size, Time} entry in Options to Data, in list
%% order (so the last such entry wins). All other options are ignored.
fill_delay_values(Data, Options) ->
    lists:foldl(fun apply_delay_option/2, Data, Options).

apply_delay_option({delayed_write, Size, Time}, Data) ->
    Data#{ delay_size => Size, delay_time => Time };
apply_delay_option(_OtherOption, Data) ->
    Data.

%% State 'opening': only an '$open' call carrying our secret is acted upon;
%% everything else is postponed until the state changes.
opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
    finish_open(raw_file_io:open(Filename, Modes), From, Data);
opening(_Event, _Contents, _Data) ->
    {keep_state_and_data, [postpone]}.

%% On success the caller receives a public descriptor that exposes only the
%% owner, buffer, delay_size and pid entries of our state, and we move on to
%% 'opened' with the private descriptor stored under 'handle'. On failure the
%% result of raw_file_io:open/2 is relayed and the wrapper stops normally.
finish_open({ok, PrivateFd}, From, Data) ->
    Shared = maps:with([owner, buffer, delay_size, pid], Data),
    PublicFd = #file_descriptor{ module = ?MODULE, data = Shared },
    {next_state, opened, Data#{ handle => PrivateFd },
     [{reply, From, {ok, PublicFd}}]};
finish_open(Failure, From, _Data) ->
    {stop_and_reply, normal, [{reply, From, Failure}]}.

%%

%% State 'opened': the file is open and requests from the owner are served.
opened(info, {'$timed_out', Secret}, #{ secret := Secret } = Data) ->
    %% A write by the owner at this very instant makes the flush attempt come
    %% back 'busy'. The next write would then not restart the timer, because
    %% it only does so when it finds the buffer empty (unless we collided on a
    %% flush). Re-arm the timeout ourselves so buffered data cannot sit idle
    %% for extended periods of time.
    rearm_timeout_if_busy(try_flush_write_buffer(Data)),
    {keep_state, Data#{ timer => none }, []};

opened(info, {'DOWN', Monitor, process, _Owner, Reason}, #{ monitor := Monitor } = Data) ->
    %% The owner is gone: attempt one last flush unless the reported reason is
    %% the atom 'kill', then stop.
    case Reason of
        kill -> ignored;
        _ -> try_flush_write_buffer(Data)
    end,
    {stop, shutdown};

opened(info, _Message, _Data) ->
    keep_state_and_data;

opened({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
    %% Flush first; the underlying close is only attempted when the flush
    %% succeeded. Either way the wrapper stops after replying.
    CloseHandle =
        fun() ->
                #{ handle := PrivateFd } = Data,
                ?CALL_FD(PrivateFd, close, [])
        end,
    {stop_and_reply, normal, [{reply, From, flush_then(Data, CloseHandle)}]};

opened({call, {Owner, _Tag} = From}, '$wait', #{ owner := Owner }) ->
    %% Used in write/2 to synchronize writes on lock conflicts.
    {keep_state_and_data, [{reply, From, ok}]};

opened({call, {Owner, _Tag} = From}, '$synchronous_flush', #{ owner := Owner } = Data) ->
    cancel_flush_timeout(Data),
    {keep_state_and_data, [{reply, From, flush_write_buffer(Data)}]};

opened({call, {Owner, _Tag} = From}, Command, #{ owner := Owner } = Data) ->
    %% Any other owner request: empty the buffer, then forward the command to
    %% the underlying descriptor. A failed flush is reported instead.
    Forward = fun() -> dispatch_command(Data, Command) end,
    {keep_state_and_data, [{reply, From, flush_then(Data, Forward)}]};

opened({call, _From}, _Command, _Data) ->
    %% The client functions filter this out, so we'll crash if the user does
    %% anything stupid on purpose.
    %% NOTE(review): this tuple does not look like one of the documented
    %% gen_statem state-callback results, which would be what produces the
    %% crash mentioned above -- confirm before "fixing" it.
    {shutdown, protocol_violation};

opened(cast, '$reset_timeout', #{ delay_time := Delay, secret := Secret } = Data) ->
    %% Replace any pending flush timer with a fresh one.
    cancel_flush_timeout(Data),
    TimerRef = erlang:send_after(Delay, self(), {'$timed_out', Secret}),
    {keep_state, Data#{ timer => TimerRef }, []};

opened(cast, _Message, _Data) ->
    {keep_state_and_data, []}.

%% Asks ourselves to restart the flush timer when a timed flush found the
%% buffer locked.
rearm_timeout_if_busy(busy) -> gen_statem:cast(self(), '$reset_timeout');
rearm_timeout_if_busy(ok) -> ok.

%% Runs Continue/0 only after the write buffer was flushed successfully;
%% otherwise returns the flush result unchanged.
flush_then(Data, Continue) ->
    case flush_write_buffer(Data) of
        ok -> Continue();
        FlushFailure -> FlushFailure
    end.

%% Invokes Function on the module recorded in the private descriptor, passing
%% the descriptor itself as the first argument followed by Args.
dispatch_command(Data, [Function | Args]) ->
    #{ handle := PrivateFd } = Data,
    apply(PrivateFd#file_descriptor.module, Function, [PrivateFd | Args]).

%% Cancels the pending flush timer, if any. The cancellation is asynchronous
%% and its outcome is deliberately ignored.
cancel_flush_timeout(#{ timer := Timer }) ->
    case Timer of
        none ->
            ok;
        _ ->
            _ = erlang:cancel_timer(Timer, [{async, true}]),
            ok
    end.

%% Flushes the buffer if its lock can be taken right now. Returns 'busy'
%% without touching anything when the lock is held elsewhere; the flush result
%% itself is not reported.
try_flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
    flush_if_acquired(prim_buffer:try_lock(Buffer), Buffer, PrivateFd).

flush_if_acquired(acquired, Buffer, PrivateFd) ->
    flush_write_buffer_1(Buffer, PrivateFd),
    prim_buffer:unlock(Buffer),
    ok;
flush_if_acquired(busy, _Buffer, _PrivateFd) ->
    busy.

%% This is only safe to use when there is no chance of conflict with the owner
%% process, or in other words, "during synchronous calls outside of the locked
%% section of write/2". The lock is asserted, so a conflict crashes the caller.
flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
    acquired = prim_buffer:try_lock(Buffer),
    FlushResult = flush_write_buffer_1(Buffer, PrivateFd),
    prim_buffer:unlock(Buffer),
    FlushResult.

%% Writes out everything currently buffered; a no-op on an empty buffer.
%% The caller must hold the buffer lock.
flush_write_buffer_1(Buffer, PrivateFd) ->
    case prim_buffer:size(Buffer) of
        0 ->
            ok;
        Pending when Pending > 0 ->
            ?CALL_FD(PrivateFd, write, [prim_buffer:read_iovec(Buffer, Pending)])
    end.

%% gen_statem callback; nothing to clean up here.
terminate(_Reason, _State, _Data) ->
    ok.

%% Client functions

%% Buffers or writes IOData. Any 'badarg' error raised while converting or
%% enqueueing is turned into {error, badarg}.
write(Fd, IOData) ->
    try
        enqueue_write(Fd, erlang:iolist_to_iovec(IOData))
    catch
        error:badarg -> {error, badarg}
    end.

%% Empty writes succeed immediately; otherwise take the buffer lock and hand
%% over to enqueue_write_locked/4, retrying if the lock is momentarily held.
enqueue_write(_Fd, []) ->
    ok;
enqueue_write(Fd, IOVec) ->
    %% get_fd_data will reject everyone except the process that opened the Fd,
    %% so we can't race with anyone except the wrapper process.
    #{ delay_size := DelaySize,
       buffer := Buffer,
       pid := Wrapper } = get_fd_data(Fd),
    case prim_buffer:try_lock(Buffer) of
        busy ->
            %% This can only happen while we're processing a timeout in the
            %% wrapper process, so we perform a bogus call to get a completion
            %% notification before trying again.
            gen_statem:call(Wrapper, '$wait'),
            enqueue_write(Fd, IOVec);
        acquired ->
            %% (The wrapper process will exit without flushing if we're killed
            %% while holding the lock).
            enqueue_write_locked(Wrapper, Buffer, DelaySize, IOVec)
    end.

%% Decides, with the lock held, between buffering and writing through.
%%
%% The synchronous operations (write, forced flush) are safe since we're
%% running on the only process that can fill the buffer; a timeout being
%% processed just before $synchronous_flush will cause the flush to nop,
%% and a timeout sneaking in just before a synchronous write won't do
%% anything since the buffer is guaranteed to be empty at that point.
enqueue_write_locked(Pid, Buffer, DelaySize, IOVec) ->
    BufSize = prim_buffer:size(Buffer),
    FitsInBuffer = is_iovec_smaller_than(IOVec, DelaySize - BufSize),
    case {FitsInBuffer, BufSize > 0} of
        {true, true} ->
            %% Room left and data already pending: just append.
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer);
        {true, false} ->
            %% First data into an empty buffer: append and start the timer.
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer),
            gen_statem:cast(Pid, '$reset_timeout');
        {false, true} ->
            %% Threshold reached with data pending: append, then flush it all.
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer),
            gen_statem:call(Pid, '$synchronous_flush');
        {false, false} ->
            %% Too large for an empty buffer: bypass it entirely.
            prim_buffer:unlock(Buffer),
            gen_statem:call(Pid, [write, IOVec])
    end.

%% iolist_size/1 will always look through the entire list to get a precise
%% amount, which is pretty inefficient since we only need to know whether we've
%% hit the buffer threshold or not.
%%
%% We only handle the binary case since write/2 forcibly translates input to
%% erlang:iovec().
is_iovec_smaller_than(IOVec, Max) ->
    is_iovec_smaller_than_1(IOVec, Max, 0).
%% Walks the iovec, summing binary sizes, and gives up with 'false' as soon as
%% the running total reaches Max; returns 'true' when the list ends first.
is_iovec_smaller_than_1(_IOVec, Max, Acc) when Acc >= Max ->
    false;
is_iovec_smaller_than_1([], _Max, _Acc) ->
    true;
is_iovec_smaller_than_1([Binary | Rest], Max, Acc) when is_binary(Binary) ->
    is_iovec_smaller_than_1(Rest, Max, Acc + byte_size(Binary)).

%% Every function below forwards a command of the form [Function | Args] to
%% the wrapper process through wrap_call/2.

close(Fd) ->
    wrap_call(Fd, [close]).

sync(Fd) ->
    wrap_call(Fd, [sync]).
datasync(Fd) ->
    wrap_call(Fd, [datasync]).

truncate(Fd) ->
    wrap_call(Fd, [truncate]).

advise(Fd, Offset, Length, Advise) ->
    wrap_call(Fd, [advise, Offset, Length, Advise]).
allocate(Fd, Offset, Length) ->
    wrap_call(Fd, [allocate, Offset, Length]).

position(Fd, Mark) ->
    wrap_call(Fd, [position, Mark]).

%% The data is converted to an iovec on the caller's side; a 'badarg' error
%% raised inside the try is reported as {error, badarg}.
pwrite(Fd, Offset, IOData) ->
    try
        CompactedData = erlang:iolist_to_iovec(IOData),
        wrap_call(Fd, [pwrite, Offset, CompactedData])
    catch
        error:badarg -> {error, badarg}
    end.
pwrite(Fd, LocBytes) ->
    try
        CompactedLocBytes =
            [ {Offset, erlang:iolist_to_iovec(IOData)} ||
                {Offset, IOData} <- LocBytes ],
        wrap_call(Fd, [pwrite, CompactedLocBytes])
    catch
        error:badarg -> {error, badarg}
    end.

read_line(Fd) ->
    wrap_call(Fd, [read_line]).
read(Fd, Size) ->
    wrap_call(Fd, [read, Size]).
pread(Fd, Offset, Size) ->
    wrap_call(Fd, [pread, Offset, Size]).
pread(Fd, LocNums) ->
    wrap_call(Fd, [pread, LocNums]).

ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
    wrap_call(Fd, [ipread_s32bu_p32bu, Offset, MaxSize]).

%% Not supported on delayed-write descriptors.
sendfile(_,_,_,_,_,_,_,_) ->
    {error, enotsup}.

%% Not supported on delayed-write descriptors.
internal_get_nif_resource(_) ->
    {error, enotsup}.

%% The command must start with the function name: the wrapper splits it as
%% [Function | Args], so sending [Opts] alone would use the option list as
%% the function to call.
read_handle_info(Fd, Opts) ->
    wrap_call(Fd, [read_handle_info, Opts]).

%% Sends Command to the wrapper process and returns its reply. The call exits
%% with 'normal' or 'noproc' when the wrapper has stopped or no longer
%% exists; both are reported as {error, einval}. Any other exit propagates.
wrap_call(Fd, Command) ->
    #{ pid := Pid } = get_fd_data(Fd),
    try gen_statem:call(Pid, Command, infinity) of
        Result -> Result
    catch
        exit:{normal, _CallInfo} -> {error, einval};
        exit:{noproc, _CallInfo} -> {error, einval}
    end.

%% Returns the data map stored in the descriptor, but only to the process
%% recorded as its owner; any other caller raises
%% error(not_on_controlling_process).
get_fd_data(#file_descriptor{ data = FdData }) ->
    #{ owner := Controller } = FdData,
    if
        Controller =:= self() -> FdData;
        true -> error(not_on_controlling_process)
    end.