1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2017-2021. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20-module(raw_file_io_delayed).
21
22-behavior(gen_statem).
23
24-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
25         position/2, write/2, pwrite/2, pwrite/3,
26         read_line/1, read/2, pread/2, pread/3,
27         read_handle_info/2]).
28
29%% OTP internal.
30-export([ipread_s32bu_p32bu/3, sendfile/8, internal_get_nif_resource/1]).
31
32-export([open_layer/3]).
33
34-export([init/1, callback_mode/0, terminate/3]).
35-export([opening/3, opened/3]).
36
37-include("file_int.hrl").
38
%% Starts a wrapper process for Filename and asks it to open the file.
%%
%% The wrapper only honours the '$open' request when it carries the Secret
%% created here, so nobody else can make it open a file. Returns whatever the
%% wrapper replies with, or the gen_statem:start/3 result if it could not be
%% started.
open_layer(Filename, Modes, Options) ->
    Secret = make_ref(),
    InitArg = {self(), Secret, Options},
    case gen_statem:start(?MODULE, InitArg, []) of
        {ok, Wrapper} ->
            OpenRequest = {'$open', Secret, Filename, Modes},
            gen_statem:call(Wrapper, OpenRequest, infinity);
        StartError ->
            StartError
    end.
47
%% One function per state (opening/3, opened/3).
callback_mode() ->
    state_functions.
49
%% gen_statem init callback.
%%
%% Monitors the owner (so opened/3 can react to its 'DOWN') and builds the
%% state map. delay_size (64 KiB) and delay_time (2000 ms, used with
%% erlang:send_after/3) are defaults that a {delayed_write, Size, Time}
%% option overrides.
init({Owner, Secret, Options}) ->
    MonitorRef = monitor(process, Owner),
    Buffer = prim_buffer:new(),
    Defaults = #{ delay_size => 64 bsl 10,
                  delay_time => 2000,
                  owner => Owner,
                  monitor => MonitorRef,
                  secret => Secret,
                  pid => self(),
                  buffer => Buffer,
                  timer => none },
    {ok, opening, fill_delay_values(Defaults, Options)}.
63
%% Applies every {delayed_write, Size, Time} entry in Options to Data,
%% ignoring all other options. When the option is repeated, the last one wins.
fill_delay_values(Data, Options) ->
    lists:foldl(
      fun({delayed_write, Size, Time}, Acc) ->
              Acc#{ delay_size => Size, delay_time => Time };
         (_Unrelated, Acc) ->
              Acc
      end,
      Data,
      Options).
70
%% State function for `opening': waits for the '$open' call carrying our
%% secret. On success, replies with a public #file_descriptor{} that exposes
%% only owner/buffer/delay_size/pid, and moves to `opened'. On failure, replies
%% with the open error and stops normally.
opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
    case raw_file_io:open(Filename, Modes) of
        {ok, PrivateFd} ->
            PublicFd = #file_descriptor{
                          module = ?MODULE,
                          data = maps:with([owner, buffer, delay_size, pid], Data) },
            {next_state, opened, Data#{ handle => PrivateFd },
             [{reply, From, {ok, PublicFd}}]};
        OpenError ->
            {stop_and_reply, normal, [{reply, From, OpenError}]}
    end;
opening(_EventType, _EventContent, _Data) ->
    %% Any other event is postponed; gen_statem retries it after the state
    %% changes.
    {keep_state_and_data, [postpone]}.
85
86%%
87
%% State function for `opened'.
%%
%% Handles:
%%  * info {'$timed_out', Secret}: opportunistic flush of the write buffer;
%%  * info 'DOWN' for the owner: best-effort flush, then stop;
%%  * calls from the owner: [close], '$wait', '$synchronous_flush', or any
%%    other [Function | Args] command, which runs on the private fd after
%%    a flush;
%%  * cast '$reset_timeout': (re)arm the flush timer.
%% A call from any process other than the owner stops the wrapper.
opened(info, {'$timed_out', Secret}, #{ secret := Secret } = Data) ->
    %% If the user writes something at this exact moment, the flush will fail
    %% and the timer won't reset on the next write since the buffer won't be
    %% empty (Unless we collided on a flush). We therefore reset the timeout to
    %% ensure that data won't sit idle for extended periods of time.
    case try_flush_write_buffer(Data) of
        busy -> gen_statem:cast(self(), '$reset_timeout');
        ok -> ok
    end,
    {keep_state, Data#{ timer => none }, []};

opened(info, {'DOWN', Monitor, process, _Owner, Reason}, #{ monitor := Monitor } = Data) ->
    %% The owner is gone: flush what we can, then stop.
    %% NOTE(review): an owner killed with exit(Pid, kill) reports reason
    %% `killed', not `kill', so it takes the flushing branch.
    %% try_flush_write_buffer/1 still skips the flush if the owner died while
    %% holding the buffer lock.
    if
        Reason =/= kill -> try_flush_write_buffer(Data);
        Reason =:= kill -> ignored
    end,
    {stop, shutdown};

opened(info, _Message, _Data) ->
    %% Any other message (e.g. a '$timed_out' with a foreign secret) is ignored.
    keep_state_and_data;

opened({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
    %% The private fd is only closed if the flush succeeded. Either way the
    %% wrapper stops after replying.
    case flush_write_buffer(Data) of
        ok ->
            #{ handle := PrivateFd } = Data,
            Response = ?CALL_FD(PrivateFd, close, []),
            {stop_and_reply, normal, [{reply, From, Response}]};
        Other ->
            {stop_and_reply, normal, [{reply, From, Other}]}
    end;

opened({call, {Owner, _Tag} = From}, '$wait', #{ owner := Owner }) ->
    %% Used in write/2 to synchronize writes on lock conflicts.
    {keep_state_and_data, [{reply, From, ok}]};

opened({call, {Owner, _Tag} = From}, '$synchronous_flush', #{ owner := Owner } = Data) ->
    cancel_flush_timeout(Data),
    Response = flush_write_buffer(Data),
    {keep_state_and_data, [{reply, From, Response}]};

opened({call, {Owner, _Tag} = From}, Command, #{ owner := Owner } = Data) ->
    %% Generic [Function | Args] command for the private fd. Buffered writes
    %% are flushed first so the command observes them. A flush error is
    %% returned instead of running the command.
    Response =
        case flush_write_buffer(Data) of
            ok -> dispatch_command(Data, Command);
            Other -> Other
        end,
    {keep_state_and_data, [{reply, From, Response}]};

opened({call, _From}, _Command, _Data) ->
    %% The client functions filter this out, so we'll crash if the user does
    %% anything stupid on purpose. {stop, Reason} is the gen_statem result
    %% that terminates the process with an explicit reason.
    {stop, protocol_violation};

opened(cast, '$reset_timeout', #{ delay_time := Timeout, secret := Secret } = Data) ->
    %% (Re)arm the flush timer. The message carries our secret so that only
    %% our own timers match the '$timed_out' clause above.
    cancel_flush_timeout(Data),
    Timer = erlang:send_after(Timeout, self(), {'$timed_out', Secret}),
    {keep_state, Data#{ timer => Timer }, []};

opened(cast, _Message, _Data) ->
    {keep_state_and_data, []}.
148
%% Runs Command, a [Function | Args] list, against the private fd by calling
%% Module:Function(Handle, Args...), where Module is the fd's own module.
dispatch_command(Data, [Function | Args]) ->
    #{ handle := Handle } = Data,
    CallArgs = [Handle | Args],
    apply(Handle#file_descriptor.module, Function, CallArgs).
153
%% Cancels the pending flush timer, if any, and returns ok.
%%
%% Cancellation is asynchronous and its result is not needed, so
%% {info, false} suppresses the {cancel_timer, Timer, Result} message that
%% would otherwise be sent to this process for every cancel. A '$timed_out'
%% message already in flight may still arrive; it only causes an extra flush
%% attempt.
cancel_flush_timeout(#{ timer := none }) ->
    ok;
cancel_flush_timeout(#{ timer := Timer }) ->
    %% With {async, true}, cancel_timer/2 always returns ok.
    ok = erlang:cancel_timer(Timer, [{async, true}, {info, false}]).
159
%% Best-effort flush for paths that may race with the owner (flush timer,
%% owner 'DOWN'). Returns busy without waiting if the buffer lock is taken.
%% Otherwise, flushes, releases the lock, and returns ok. The outcome of the
%% write itself is not reported.
try_flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
    case prim_buffer:try_lock(Buffer) of
        busy ->
            busy;
        acquired ->
            flush_write_buffer_1(Buffer, PrivateFd),
            prim_buffer:unlock(Buffer),
            ok
    end.
169
%% Flushes the whole buffer to the private fd and returns the write result.
%%
%% The buffer lock must be free: the `acquired = ...' match crashes the
%% wrapper otherwise. That is only guaranteed when the owner cannot be inside
%% the locked section of write/2, i.e. while the wrapper is serving one of
%% the owner's synchronous calls (issued after write/2 has dropped the lock).
flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
    acquired = prim_buffer:try_lock(Buffer),
    WriteResult = flush_write_buffer_1(Buffer, PrivateFd),
    prim_buffer:unlock(Buffer),
    WriteResult.
178
%% Writes all buffered bytes to the private fd (caller holds the lock).
%% Returns ok for an empty buffer, otherwise the result of the fd's write.
flush_write_buffer_1(Buffer, PrivateFd) ->
    case prim_buffer:size(Buffer) of
        0 ->
            ok;
        Pending when Pending > 0 ->
            IOVec = prim_buffer:read_iovec(Buffer, Pending),
            ?CALL_FD(PrivateFd, write, [IOVec])
    end.
186
%% No cleanup is done here; flushing happens in the state functions.
terminate(_Why, _StateName, _StateData) ->
    ok.
189
190%% Client functions
191
%% Delayed write: IOData is converted to an iovec and handed to
%% enqueue_write/2. Invalid IOData yields {error, badarg}.
write(Fd, IOData) ->
    try
        IOVec = erlang:iolist_to_iovec(IOData),
        enqueue_write(Fd, IOVec)
    catch
        error:badarg ->
            {error, badarg}
    end.
%% Appends IOVec to the shared delayed-write buffer, or hands it to the
%% wrapper process when it would push the buffer over delay_size.
%%
%% Returns ok, an error from the wrapper, or {error, einval} when the wrapper
%% process is gone (matching wrap_call/2).
enqueue_write(_Fd, []) ->
    ok;
enqueue_write(Fd, IOVec) ->
    %% get_fd_data will reject everyone except the process that opened the Fd,
    %% so we can't race with anyone except the wrapper process.
    #{ delay_size := DelaySize,
       buffer := Buffer,
       pid := Pid } = get_fd_data(Fd),
    case prim_buffer:try_lock(Buffer) of
        acquired ->
            %% (The wrapper process will exit without flushing if we're killed
            %% while holding the lock).
            enqueue_write_locked(Pid, Buffer, DelaySize, IOVec);
        busy ->
            %% This can only happen while we're processing a timeout in the
            %% wrapper process, so we perform a bogus call to get a completion
            %% notification before trying again.
            case call_wrapper(Pid, '$wait') of
                ok -> enqueue_write(Fd, IOVec);
                {error, _} = Error -> Error
            end
    end.

%% Called with the buffer lock held; always releases it.
enqueue_write_locked(Pid, Buffer, DelaySize, IOVec) ->
    %% The synchronous operations (write, forced flush) are safe since we're
    %% running on the only process that can fill the buffer; a timeout being
    %% processed just before $synchronous_flush will cause the flush to nop,
    %% and a timeout sneaking in just before a synchronous write won't do
    %% anything since the buffer is guaranteed to be empty at that point.
    BufSize = prim_buffer:size(Buffer),
    case is_iovec_smaller_than(IOVec, DelaySize - BufSize) of
        true when BufSize > 0 ->
            %% Buffer already holds data, so a flush timer should be armed.
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer);
        true ->
            %% First data in an empty buffer: arm the flush timer.
            %% NOTE(review): gen_statem:cast/2 never fails, so a small write
            %% after the wrapper has stopped (e.g. after close/1) returns ok
            %% here without the data reaching the file.
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer),
            gen_statem:cast(Pid, '$reset_timeout');
        false when BufSize > 0 ->
            %% Over the threshold: buffer it and flush everything in one go.
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer),
            call_wrapper(Pid, '$synchronous_flush');
        false ->
            %% Too large to buffer and nothing pending: write it directly.
            prim_buffer:unlock(Buffer),
            call_wrapper(Pid, [write, IOVec])
    end.

%% Synchronous request to the wrapper process. Like wrap_call/2, it waits
%% without a timeout because the wrapper may be busy writing to disk; the
%% default 5 s gen_statem:call/2 timeout would crash the caller while the
%% write still proceeds. A wrapper that is gone yields {error, einval}.
call_wrapper(Pid, Request) ->
    try
        gen_statem:call(Pid, Request, infinity)
    catch
        exit:{normal, _Location} -> {error, einval};
        exit:{noproc, _Location} -> {error, einval}
    end.
241
%% Returns true iff the total byte size of IOVec is strictly below Max.
%%
%% iolist_size/1 would always walk the entire list to get a precise amount,
%% which is wasteful since we only need to know whether the buffer threshold
%% has been reached. This stops as soon as the running total reaches Max.
%%
%% Only binaries are handled, since write/2 forcibly translates its input to
%% erlang:iovec().
is_iovec_smaller_than(IOVec, Max) ->
    iovec_size_below(IOVec, 0, Max).

%% Seen is the number of bytes counted so far; Limit is the exclusive bound.
iovec_size_below(_Rest, Seen, Limit) when Seen >= Limit ->
    false;
iovec_size_below([Bin | Rest], Seen, Limit) when is_binary(Bin) ->
    iovec_size_below(Rest, Seen + byte_size(Bin), Limit);
iovec_size_below([], _Seen, _Limit) ->
    true.
256
%% File operations forwarded to the wrapper process via wrap_call/2. The
%% wrapper flushes buffered writes before running each one. close/1 also
%% stops the wrapper.
close(Fd) -> wrap_call(Fd, [close]).

sync(Fd) -> wrap_call(Fd, [sync]).
datasync(Fd) -> wrap_call(Fd, [datasync]).

truncate(Fd) -> wrap_call(Fd, [truncate]).

advise(Fd, Offset, Length, Advise) -> wrap_call(Fd, [advise, Offset, Length, Advise]).
allocate(Fd, Offset, Length) -> wrap_call(Fd, [allocate, Offset, Length]).

position(Fd, Mark) -> wrap_call(Fd, [position, Mark]).
275
%% Positional write at Offset. IOData is compacted to an iovec before it is
%% sent to the wrapper. Invalid IOData yields {error, badarg}.
pwrite(Fd, Offset, IOData) ->
    try
        wrap_call(Fd, [pwrite, Offset, erlang:iolist_to_iovec(IOData)])
    catch
        error:badarg -> {error, badarg}
    end.
%% Positional scatter write. LocBytes is a list of {Offset, IOData}, and each
%% IOData is compacted to an iovec before the list is sent to the wrapper.
%% Any malformed entry, an improper list, or a non-list LocBytes yields
%% {error, badarg}; nothing is written in that case.
pwrite(Fd, LocBytes) ->
    try
        CompactedLocBytes = compact_loc_bytes(LocBytes),
        wrap_call(Fd, [pwrite, CompactedLocBytes])
    catch
        error:badarg -> {error, badarg}
    end.

%% Converts [{Offset, IOData}] into [{Offset, erlang:iovec()}]. A list
%% comprehension generator would silently skip elements that don't match
%% {Offset, IOData}, dropping data without an error. This raises badarg for
%% anything malformed instead.
compact_loc_bytes([{Offset, IOData} | Rest]) ->
    [{Offset, erlang:iolist_to_iovec(IOData)} | compact_loc_bytes(Rest)];
compact_loc_bytes([]) ->
    [];
compact_loc_bytes(_Malformed) ->
    error(badarg).
292
%% Read operations forwarded to the wrapper via wrap_call/2. The wrapper
%% flushes buffered writes first, so reads observe earlier writes.
read_line(Fd) -> wrap_call(Fd, [read_line]).
read(Fd, Size) -> wrap_call(Fd, [read, Size]).
pread(Fd, Offset, Size) -> wrap_call(Fd, [pread, Offset, Size]).
pread(Fd, LocNums) -> wrap_call(Fd, [pread, LocNums]).

ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
    wrap_call(Fd, [ipread_s32bu_p32bu, Offset, MaxSize]).
304
%% Not supported through the delayed-write wrapper; both report enotsup.
sendfile(_Arg1, _Arg2, _Arg3, _Arg4, _Arg5, _Arg6, _Arg7, _Arg8) ->
    {error, enotsup}.

internal_get_nif_resource(_Fd) ->
    {error, enotsup}.
310
%% Handle info for the underlying file, forwarded to the wrapper. The
%% command list must start with the function name, because
%% dispatch_command/2 applies its head as Module:Function(PrivateFd, Args...).
%% Passing only [Opts] made it call a list as a function name and crash the
%% wrapper.
read_handle_info(Fd, Opts) ->
    wrap_call(Fd, [read_handle_info, Opts]).
313
%% Sends Command to the wrapper process and waits for the reply without a
%% timeout. A wrapper that has already stopped (e.g. after close/1), or that
%% stops normally during the call, yields {error, einval}.
wrap_call(Fd, Command) ->
    #{ pid := Pid } = get_fd_data(Fd),
    try
        gen_statem:call(Pid, Command, infinity)
    catch
        exit:{noproc, _Location} -> {error, einval};
        exit:{normal, _Location} -> {error, einval}
    end.
322
%% Returns the public data map of Fd. Raises not_on_controlling_process
%% unless called by the process that opened the file.
get_fd_data(#file_descriptor{ data = Data }) ->
    #{ owner := Owner } = Data,
    if
        Owner =:= self() -> Data;
        true -> error(not_on_controlling_process)
    end.
329