1%%%
2%%%  Copyright 2011 © INRIA
3%%%
4%%%  Author : Nicolas Niclausse <nicolas.niclausse@inria.fr>
5%%%  Created: 04 mai 2011 by Nicolas Niclausse <nicolas.niclausse@inria.fr>
6%%%
7%%%  This program is free software; you can redistribute it and/or modify
8%%%  it under the terms of the GNU General Public License as published by
9%%%  the Free Software Foundation; either version 2 of the License, or
10%%%  (at your option) any later version.
11%%%
12%%%  This program is distributed in the hope that it will be useful,
13%%%  but WITHOUT ANY WARRANTY; without even the implied warranty of
14%%%  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15%%%  GNU General Public License for more details.
16%%%
17%%%  You should have received a copy of the GNU General Public License
18%%%  along with this program; if not, write to the Free Software
19%%%  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
20%%%
21%%%  In addition, as a special exception, you have the permission to
22%%%  link the code of this program with any library released under
23%%%  the EPL license and distribute linked combinations including
24%%%  the two; the MPL (Mozilla Public License), which EPL (Erlang
25%%%  Public License) is based on, is included in this exception.
26%%%
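%%% @doc
%%% Job notification server. Jobs registered through monitor/1 are kept
%%% in an ETS table; the server listens on a TCP port for scheduler
%%% notifications (OAR format: "job_id job_name TAG comment"), records
%%% queue and run statistics through ts_mon_cache, and sends
%%% {erlang, ok, nojobs} to an owner process once it has no job left.
%%% @end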
30
31-module(ts_job_notify).
32-vc('$Id: ts_notify.erl,v 0.0 2011/05/04 11:18:48 nniclaus Exp $ ').
33-author('nicolas.niclausse@inria.fr').
34
35
36-behaviour(gen_server).
37
38-include("ts_macros.hrl").
39-include("ts_job.hrl").
40
41%% API
42-export([start_link/0]).
43
44-export([listen/1, monitor/1, demonitor/1, wait_jobs/1]).
45
46%% gen_server callbacks
47-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
48         terminate/2, code_change/3]).
49
-define(SERVER, ?MODULE).

%% Server state (field order is unchanged).
-record(state, {
          port,            % TCP port we listen on for job notifications
          acceptsock,      % listening socket the accept loop is accept()ing on
          acceptloop_pid,  % pid of the linked companion process blocked in accept()
          jobs             % ETS table of #job_session{} records, keyed on jobid
         }).
56
57
58%%%===================================================================
59%%% API
60%%%===================================================================
61
%%--------------------------------------------------------------------
%% @doc
%% Start the job notification server, registered globally under the
%% module name, and link it to the caller.
%%
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
    GlobalName = {global, ?MODULE},
    gen_server:start_link(GlobalName, ?MODULE, [], []).
71
%% @doc Ask the server (asynchronously) to open a listening socket on
%% Port for job notifications; `undefined' means no port is configured.
listen(Port) ->
    Request = {listen, Port},
    gen_server:cast({global, ?MODULE}, Request).
74
%% @doc Register a submitted job so that its notifications are tracked.
%% Job is {JobID, OwnerPid, SubmitTime, QueuedTime, Dump}; it is
%% forwarded unchanged to the server.
monitor(Job = {_JobID, _OwnerPid, _StartTime, _QueuedTime, _Dump}) ->
    gen_server:cast({global, ?MODULE}, {monitor, Job}).
77
%% @doc Stop tracking job JobID: the server removes it from its job table.
%% The message must be tagged `demonitor' to reach the matching
%% handle_cast/2 clause; a `monitor' tag with a 1-tuple matches no
%% specific clause and would be silently ignored.
demonitor({JobID}) ->
    gen_server:cast({global, ?MODULE}, {demonitor, {JobID}}).
80
%% @doc Ask the server to check whether Pid still owns monitored jobs;
%% Pid receives {erlang, ok, nojobs} when it has none left.
wait_jobs(Pid) ->
    Msg = {wait_jobs, Pid},
    gen_server:cast({global, ?MODULE}, Msg).
83
84%%%===================================================================
85%%% gen_server callbacks
86%%%===================================================================
87
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initialise the server state.
%%
%% When ts_config_server is registered globally, ask it for a backup of
%% the job state. A {JobTable, Port} reply is reused as is and listening
%% is restarted on Port. Any other reply, or no config server at all,
%% gives a fresh state with a new, empty job table.
%%
%% @spec init(Args) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
init([]) ->
    ?LOG("Starting~n",?INFO),
    case global:whereis_name(ts_config_server) of
        undefined ->
            {ok, fresh_state()};
        _ConfigServer ->
            ?LOG("Config server is alive !~n",?INFO),
            restore_state(ts_config_server:get_jobs_state())
    end.

%% Rebuild the state from a {JobTable, Port} backup (reopening the
%% listening socket); anything else yields a fresh state.
restore_state({Jobs, Port}) ->
    ?LOG("Got backup of node state~n",?DEB),
    {noreply, Restored} = handle_cast({listen, Port}, #state{jobs = Jobs, port = Port}),
    {ok, Restored};
restore_state(Other) ->
    ?LOGF("Got this from config server:~p~n",[Other],?DEB),
    {ok, fresh_state()}.

%% A state holding a new, empty job table keyed on #job_session.jobid.
fresh_state() ->
    #state{jobs = ets:new(jobs, [{keypos, #job_session.jobid}])}.
116
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Synchronous requests, coming from the accept loop spawned when we
%% start listening:
%%   {accepted, Tag, Sock}       -> reply `continue' so the loop keeps accepting
%%   {accept_error, Tag, Reason} -> reply `stop' and stop the server with Reason
%% Any other request is answered with `ok'.
%%
%% @spec handle_call(Request, From, State) ->
%%                                   {reply, Reply, State} |
%%                                   {stop, Reason, Reply, State}
%% @end
%%--------------------------------------------------------------------
handle_call({accepted, _Tag, Socket}, _From, State) ->
    ?LOGF("New socket:~p~n", [Socket],?DEB),
    {reply, continue, State};
handle_call({accept_error, _Tag, Reason}, _From, State) ->
    ?LOGF("accept() failed ~p~n",[Reason],?ERR),
    {stop, Reason, stop, State};
handle_call(_Other, _From, State) ->
    {reply, ok, State}.
143
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Asynchronous requests:
%%   {monitor, {JobID, Owner, SubmitTS, QueuedTS, Dump}}
%%       store the job, bump job_queued, sample the submission delay
%%   {demonitor, {JobID}}  forget the job
%%   {wait_jobs, Pid}      notify Pid if it owns no job any more
%%   {listen, Port}        open the notification socket and start the
%%                         accept loop (no-op when Port is undefined)
%% Anything else is ignored.
%%
%% @spec handle_cast(Msg, State) -> {noreply, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({monitor, {JobID, OwnerPid, SubmitTS, QueuedTS, Dump}}, State = #state{jobs = Jobs}) ->
    ?LOGF("monitoring job ~p from pid ~p~n",[JobID,OwnerPid],?DEB),
    Session = #job_session{jobid = JobID, owner = OwnerPid,
                           submission_time = SubmitTS, queue_time = QueuedTS,
                           dump = Dump},
    ets:insert(Jobs, Session),
    SubmitDelay = ts_utils:elapsed(SubmitTS, QueuedTS),
    ts_mon_cache:add([{sum,job_queued,1}, {sample,tr_job_submit,SubmitDelay}]),
    {noreply, State};
handle_cast({demonitor, {JobID}}, State = #state{jobs = Jobs}) ->
    ets:delete(Jobs, JobID),
    {noreply, State};
handle_cast({wait_jobs, Pid}, State = #state{jobs = Jobs}) ->
    %% look for all jobs started by this pid
    ?LOGF("look for job of ~p~n",[Pid],?DEB),
    check_jobs(Jobs, Pid),
    {noreply, State};
handle_cast({listen, undefined}, State) ->
    ?LOG("No listen port defined, can't open listening socket (don't worry: it's normal if you don't use job notifications) ~n",?INFO),
    {noreply, State};
handle_cast({listen, Port}, State) ->
    %% accepted sockets inherit {active, once}: each notification arrives
    %% as a single {tcp, Socket, Data} message in handle_info/2
    SocketOpts = [{reuseaddr, true}, {active, once}],
    case gen_tcp:listen(Port, SocketOpts) of
        {error, Reason} ->
            ?LOGF("Error when trying to listen to socket: ~p~n",[Reason],?ERR),
            {noreply, State};
        {ok, ListenSock} ->
            ?LOGF("Listening on port ~p done, start accepting loop~n",[Port],?INFO),
            AcceptPid = spawn_link(ts_utils, accept_loop, [self(), unused, ListenSock]),
            {noreply, State#state{acceptsock = ListenSock,
                                  port = Port,
                                  acceptloop_pid = AcceptPid}}
    end;
handle_cast(_Ignored, State) ->
    {noreply, State}.
189
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages.
%%
%% {tcp, Socket, Data} is a job notification. With OAR the payload is
%% "job_id job_name TAG comment", where TAG can be:
%%   - RUNNING   : when the job is launched
%%   - END       : when the job is finished normally
%%   - ERROR     : when the job is finished abnormally
%%   - INFO      : used when oardel is called on the job
%%   - SUSPENDED : when the job is suspended
%%   - RESUMING  : when the job is resumed
%% RUNNING records the start time and moves the job from the queued to
%% the running counter. END/ERROR record the duration, dump the job,
%% remove it from the table and tell its owner when it has no job left.
%% INFO/SUSPENDED/RESUMING are ignored. Any other payload is logged and
%% ignored: it comes from the network and must not crash the server.
%% The socket is re-armed ({active, once}) after every notification.
%%
%% @spec handle_info(Info, State) -> {noreply, State}
%% @end
%%--------------------------------------------------------------------
handle_info({tcp, Socket, Data}, State=#state{jobs=Jobs}) ->
    ?LOGF("received ~p from socket ~p",[Data,Socket],?DEB),
    %% Also split on CR/LF so that a TAG ending the line ("42 name END\n")
    %% still matches below.
    case string:tokens(Data," \r\n") of
        [Id, _Name, "RUNNING"|_] ->
            ?LOGF("look for job ~p in table",[Id],?DEB),
            case ets:lookup(Jobs,Id) of
                [] ->
                    ?LOGF("Job owner of ~p is unknown",[Id],?NOTICE);
                [Job] ->
                    Now=?NOW,
                    Queued=ts_utils:elapsed(Job#job_session.queue_time,Now),
                    ts_mon_cache:add([{sample,tr_job_wait,Queued},{sum,job_running,1}, {sum,job_queued,-1}]),
                    ets:update_element(Jobs,Id,{#job_session.start_time,Now})
            end;
        [Id, Name, "END"|_] ->
            case ets:lookup(Jobs,Id) of
                [] ->
                    ?LOGF("Job owner of ~p is unknown",[Id],?NOTICE);
                [Job=#job_session{start_time=undefined}] ->
                    %% NOTE(review): no RUNNING was seen for this job, so it
                    %% was never added to job_running (it is still counted in
                    %% job_queued); decrementing job_running may skew both
                    %% counters -- confirm intent.
                    ?LOGF("ERROR: Start time of job ~p is unknown",[Id],?ERR),
                    ts_mon_cache:add([{sum,job_running,-1}, {sum,ok_job ,1}]),
                    ets:delete_object(Jobs,Job),
                    check_jobs(Jobs,Job#job_session.owner);
                [Job]->
                    Now=?NOW,
                    Duration=ts_utils:elapsed(Job#job_session.start_time,Now),
                    ts_mon_cache:add([{sample,tr_job_duration,Duration},{sum,job_running,-1}, {sum,ok_job ,1}]),
                    ts_job:dump(Job#job_session.dump,{none,Job#job_session{end_time=Now,status="ok"},Name}),
                    ets:delete_object(Jobs,Job),
                    check_jobs(Jobs,Job#job_session.owner)
            end;
        [Id, Name, "ERROR"|_] ->
            case ets:lookup(Jobs,Id) of
                [] ->
                    ?LOGF("Job owner of ~p is unknown",[Id],?NOTICE);
                [Job=#job_session{start_time=undefined}] ->
                    %% NOTE(review): same counter caveat as for END above.
                    ?LOGF("ERROR: start time of job ~p is unknown",[Id],?ERR),
                    ts_mon_cache:add([{sum,job_running,-1}, {sum,error_job,1}]),
                    ets:delete_object(Jobs,Job),
                    check_jobs(Jobs,Job#job_session.owner);
                [Job]->
                    Now=?NOW,
                    Duration=ts_utils:elapsed(Job#job_session.start_time,Now),
                    ts_mon_cache:add([{sample,tr_job_duration,Duration},{sum,job_running,-1}, {sum,error_job,1}]),
                    ts_job:dump(Job#job_session.dump,{none,Job#job_session{end_time=Now,status="error"},Name}),
                    ets:delete_object(Jobs,Job),
                    check_jobs(Jobs,Job#job_session.owner)
            end;
        [_Id, _Name, "INFO"|_] ->
            ok;
        [_Id, _Name, "SUSPENDED"|_] ->
            ok;
        [_Id, _Name, "RESUMING"|_] ->
            ok;
        _Unknown ->
            %% untrusted network input: log and keep serving
            ?LOGF("Ignoring unexpected job notification ~p~n",[Data],?WARN)
    end,
    inet:setopts(Socket,[{active,once}]),
    {noreply, State};
handle_info({tcp_closed, _Socket}, State) ->
    {noreply, State};
handle_info({'ETS-TRANSFER',_Tab,_FromPid,_GiftData}, State=#state{}) ->
    ?LOG("Got ownership on job state table", ?NOTICE),
    {noreply, State};
handle_info(Info, State) ->
    ?LOGF("Unexpected message received: ~p", [Info], ?WARN),
    {noreply, State}.
274
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Called by gen_server when it is about to terminate; the return value
%% is ignored.
%%
%% On a normal stop nothing is saved. On any other stop, once a listen
%% port is known (State#state.port is an integer), ownership of the job
%% table is handed to the globally registered ts_config_server, with the
%% port as gift data. Presumably this lets a restarted server recover
%% both through ts_config_server:get_jobs_state/0, as init/1 does.
%% global:whereis_name/1 returns `undefined' when the config server is
%% not registered. In that case ets:give_away/3 would raise badarg, so
%% the hand-over is skipped and logged instead.
%%
%% @spec terminate(Reason, State) -> ok
%% @end
%%--------------------------------------------------------------------
terminate(normal, _State) ->
    ?LOG("Terminating for normal reason", ?WARN),
    ok;
terminate(Reason, State) when is_integer(State#state.port)->
    ?LOGF("Terminating for reason ~p", [Reason], ?WARN),
    Pid=global:whereis_name(ts_config_server),
    ?LOGF("Config server pid is  ~p", [Pid], ?DEB),
    give_jobs_away(Pid, State),
    ok;
terminate(Reason, State) ->
    ?LOGF("Terminating for reason ~p ~p", [Reason,State], ?WARN),
    ok.

%% Hand the job table (with the port as gift data) to the config server,
%% or log that it cannot be saved when no config server is registered.
give_jobs_away(undefined, _State) ->
    ?LOG("Config server not found, job state table can't be saved~n", ?WARN);
give_jobs_away(Pid, #state{jobs = Jobs, port = Port}) ->
    ets:give_away(Jobs, Pid, Port).
298
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Code upgrade hook: no state conversion is performed, the current
%% state is returned unchanged.
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
code_change(_FromVsn, UnchangedState, _UpgradeExtra) ->
    {ok, UnchangedState}.
309
310%%%===================================================================
311%%% Internal functions
312%%%===================================================================
313
%% Count the jobs of table Jobs still owned by Pid. With none left, send
%% Pid {erlang, ok, nojobs}; otherwise just log how many remain.
check_jobs(Jobs, Pid) ->
    OwnedBy = #job_session{owner = Pid, _ = '_'},
    case ets:select_count(Jobs, [{OwnedBy, [], [true]}]) of
        0 ->
            ?LOGF("no jobs for pid ~p~n",[Pid],?DEB),
            Pid ! {erlang, ok, nojobs};
        Remaining ->
            ?LOGF("still ~p jobs for pid ~p~n",[Remaining,Pid],?INFO)
    end.
322
323