%%%
%%% Copyright 2011 © INRIA
%%%
%%% Author : Nicolas Niclausse <nicolas.niclausse@inria.fr>
%%% Created: 04 mai 2011 by Nicolas Niclausse <nicolas.niclausse@inria.fr>
%%%
%%% This program is free software; you can redistribute it and/or modify
%%% it under the terms of the GNU General Public License as published by
%%% the Free Software Foundation; either version 2 of the License, or
%%% (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%%% GNU General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License
%%% along with this program; if not, write to the Free Software
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
%%%
%%% In addition, as a special exception, you have the permission to
%%% link the code of this program with any library released under
%%% the EPL license and distribute linked combinations including
%%% the two; the MPL (Mozilla Public License), which EPL (Erlang
%%% Public License) is based on, is included in this exception.
%%%
%%% @doc
%%% Job notification server (gen_server, registered globally under the
%%% module name).
%%%
%%% It keeps an ETS table of submitted jobs (#job_session{} records
%%% keyed by job id). It listens on a TCP port for batch-scheduler job
%%% notifications (OAR style: "job_id job_name TAG comment") and updates
%%% job statistics through ts_mon_cache. When a job owner has no jobs
%%% left, it sends that owner {erlang, ok, nojobs}.
%%% @end

-module(ts_job_notify).
-vc('$Id: ts_notify.erl,v 0.0 2011/05/04 11:18:48 nniclaus Exp $ ').
-author('nicolas.niclausse@inria.fr').


-behaviour(gen_server).

-include("ts_macros.hrl").
-include("ts_job.hrl").

%% API
-export([start_link/0]).

-export([listen/1, monitor/1, demonitor/1, wait_jobs/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-define(SERVER, ?MODULE).

%% Server state. Every field is `undefined' until set, so the types say so
%% explicitly (undefined is not implicitly part of record field types).
-record(state, {
          %% listen port (an integer once listening has been requested)
          port           :: inet:port_number() | undefined,
          %% the socket we are accept()ing at
          acceptsock     :: gen_tcp:socket() | undefined,
          %% PID of the companion process that blocks in accept()
          acceptloop_pid :: pid() | undefined,
          %% ETS table of #job_session{} records, keyed by jobid
          jobs           :: ets:tab() | undefined
         }).
%%%===================================================================
%%% API
%%%===================================================================

%%--------------------------------------------------------------------
%% @doc
%% Starts the server, registered globally as ?MODULE.
%%
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

%% @doc Asynchronously ask the server to open its notification socket
%% on Port. `undefined' means no port is configured; this is only logged.
-spec listen(Port :: inet:port_number() | undefined) -> ok.
listen(Port) ->
    gen_server:cast({global, ?MODULE}, {listen, Port}).

%% @doc Asynchronously register a submitted job.
%% Later notifications for JobID can then be accounted for, and OwnerPid
%% is notified by check_jobs/2 once it owns no more jobs.
-spec monitor({JobID :: term(), OwnerPid :: pid(), SubmitTS :: term(),
               QueuedTS :: term(), Dump :: term()}) -> ok.
monitor({JobID, OwnerPid, StartTime, QueuedTime, Dump}) ->
    gen_server:cast({global, ?MODULE},
                    {monitor, {JobID, OwnerPid, StartTime, QueuedTime, Dump}}).

%% @doc Asynchronously forget about job JobID.
-spec demonitor({JobID :: term()}) -> ok.
demonitor({JobID}) ->
    %% Must be tagged `demonitor'. The `monitor' handle_cast clause only
    %% matches a 5-tuple, so a `{monitor, {JobID}}' message would fall
    %% through to the catch-all clause and be silently ignored.
    gen_server:cast({global, ?MODULE}, {demonitor, {JobID}}).

%% @doc Asynchronously ask the server whether Pid still owns jobs.
%% Pid receives `{erlang, ok, nojobs}' when it owns none.
-spec wait_jobs(Pid :: pid()) -> ok.
wait_jobs(Pid) ->
    gen_server:cast({global, ?MODULE}, {wait_jobs, Pid}).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server.
%%
%% If ts_config_server is registered and returns a saved {Jobs, Port}
%% pair, that job table is reused and the notification socket is
%% re-opened on Port. Otherwise a fresh, empty job table is created.
%%
%% @spec init(Args) -> {ok, State} |
%%                     {ok, State, Timeout} |
%%                     ignore |
%%                     {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([]) ->
    ?LOG("Starting~n",?INFO),
    case global:whereis_name(ts_config_server) of
        undefined ->
            {ok, #state{jobs=new_jobs_table()}};
        _Pid ->
            ?LOG("Config server is alive !~n",?INFO),
            case ts_config_server:get_jobs_state() of
                {Jobs,Port} ->
                    ?LOG("Got backup of node state~n",?DEB),
                    %% Reuse the listen logic to re-open the saved port.
                    {noreply,NewState} = handle_cast({listen,Port},
                                                     #state{jobs=Jobs,port=Port}),
                    {ok, NewState};
                Else ->
                    ?LOGF("Got this from config server:~p~n",[Else],?DEB),
                    {ok, #state{jobs=new_jobs_table()}}
            end
    end.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages.
%%
%% `accepted' / `accept_error' are the calls made back to this server by
%% the accept loop spawned in handle_cast({listen, Port}, _).
%%
%% @spec handle_call(Request, From, State) ->
%%                                   {reply, Reply, State} |
%%                                   {reply, Reply, State, Timeout} |
%%                                   {noreply, State} |
%%                                   {noreply, State, Timeout} |
%%                                   {stop, Reason, Reply, State} |
%%                                   {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call({accepted, _Tag, Sock}, _From, State) ->
    ?LOGF("New socket:~p~n", [Sock],?DEB),
    {reply, continue, State};

handle_call({accept_error, _Tag, Error}, _From, State) ->
    ?LOGF("accept() failed ~p~n",[Error],?ERR),
    {stop, Error, stop, State};

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages.
%%
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%%                                  {noreply, State, Timeout} |
%%                                  {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({monitor, {JobID, OwnerPid, SubmitTS, QueuedTS,Dump}}, State=#state{jobs=Jobs}) ->
    ?LOGF("monitoring job ~p from pid ~p~n",[JobID,OwnerPid],?DEB),
    ets:insert(Jobs,#job_session{jobid=JobID,owner=OwnerPid, submission_time=SubmitTS,
                                 queue_time=QueuedTS,dump=Dump}),
    SubmitTime=ts_utils:elapsed(SubmitTS,QueuedTS),
    ts_mon_cache:add([{sum,job_queued,1},{sample,tr_job_submit,SubmitTime}]),
    {noreply, State};
handle_cast({demonitor, {JobID}}, State=#state{jobs=Jobs}) ->
    ets:delete(Jobs,JobID),
    {noreply, State};
handle_cast({wait_jobs, Pid}, State=#state{jobs=Jobs}) ->
    %% look for all jobs started by this pid
    ?LOGF("look for job of ~p~n",[Pid],?DEB),
    check_jobs(Jobs,Pid),
    {noreply, State};

handle_cast({listen, undefined}, State) ->
    ?LOG("No listen port defined, can't open listening socket (don't worry: it's normal if you don't use job notifications) ~n",?INFO),
    {noreply, State};
handle_cast({listen,Port}, State) ->
    Opts = [{reuseaddr, true}, {active, once}],
    case gen_tcp:listen(Port, Opts) of
        {ok, ListenSock} ->
            ?LOGF("Listening on port ~p done, start accepting loop~n",[Port],?INFO),
            {noreply, State#state
             {acceptsock=ListenSock,
              port=Port,
              acceptloop_pid = spawn_link(ts_utils,
                                          accept_loop,
                                          [self(), unused, ListenSock])}};
        {error, Reason} ->
            ?LOGF("Error when trying to listen to socket: ~p~n",[Reason],?ERR),
            {noreply, State}
    end;
handle_cast(_Msg, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages.
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%%                                   {noreply, State, Timeout} |
%%                                   {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info({tcp, Socket, Data}, State=#state{jobs=Jobs}) ->
    ?LOGF("received ~p from socket ~p",[Data,Socket],?DEB),
    %% Data comes from the network. Line terminators are separators too,
    %% so "id name END\n" (no trailing comment) is still recognised.
    handle_notification(Jobs, string:tokens(Data, " \r\n")),
    inet:setopts(Socket,[{active,once}]),
    {noreply, State};
handle_info({tcp_closed, _Socket}, State) ->
    {noreply, State};
handle_info({'ETS-TRANSFER',_Tab,_FromPid,_GiftData}, State=#state{}) ->
    ?LOG("Got ownership on job state table", ?NOTICE),
    {noreply, State};
handle_info(Info, State) ->
    ?LOGF("Unexpected message received: ~p", [Info], ?WARN),
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% On an abnormal exit with a configured port, the job table is handed
%% over to ts_config_server, with the port as give-away data.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(normal, _State) ->
    ?LOG("Terminating for normal reason", ?WARN),
    ok;
terminate(Reason, State) when is_integer(State#state.port)->
    ?LOGF("Terminating for reason ~p", [Reason], ?WARN),
    Pid=global:whereis_name(ts_config_server),
    ?LOGF("Config server pid is ~p", [Pid], ?DEB),
    give_jobs_to_config_server(Pid, State),
    ok;
terminate(Reason, State) ->
    ?LOGF("Terminating for reason ~p ~p", [Reason,State], ?WARN),
    ok.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Create an empty job table: a default ETS set of #job_session{} records
%% keyed by jobid.
new_jobs_table() ->
    ets:new(jobs,[{keypos, #job_session.jobid}]).

%% Dispatch one tokenised notification line.
%% OAR:
%%  args are job_id,job_name,TAG,comment
%%  TAG can be:
%%  - RUNNING   : when the job is launched
%%  - END       : when the job is finished normally
%%  - ERROR     : when the job is finished abnormally
%%  - INFO      : used when oardel is called on the job
%%  - SUSPENDED : when the job is suspended
%%  - RESUMING  : when the job is resumed
%% Anything else is logged and ignored, so malformed network input
%% cannot crash the server.
handle_notification(Jobs, [Id, _Name, "RUNNING" | _]) ->
    job_running(Jobs, Id);
handle_notification(Jobs, [Id, Name, "END" | _]) ->
    job_finished(Jobs, Id, Name, "ok");
handle_notification(Jobs, [Id, Name, "ERROR" | _]) ->
    job_finished(Jobs, Id, Name, "error");
handle_notification(_Jobs, [_Id, _Name, Tag | _])
  when Tag =:= "INFO"; Tag =:= "SUSPENDED"; Tag =:= "RESUMING" ->
    ok;
handle_notification(_Jobs, Tokens) ->
    ?LOGF("Ignoring unexpected job notification ~p~n",[Tokens],?WARN),
    ok.

%% A monitored job started: record queue-wait time and its start time.
job_running(Jobs, Id) ->
    ?LOGF("look for job ~p in table",[Id],?DEB),
    case ets:lookup(Jobs,Id) of
        [] ->
            ?LOGF("Job owner of ~p is unknown",[Id],?NOTICE);
        [Job] ->
            Now=?NOW,
            Queued=ts_utils:elapsed(Job#job_session.queue_time,Now),
            ts_mon_cache:add([{sample,tr_job_wait,Queued},{sum,job_running,1}, {sum,job_queued,-1}]),
            ets:update_element(Jobs,Id,{#job_session.start_time,Now})
    end.

%% A monitored job ended, with Status "ok" (END) or "error" (ERROR).
%% Update the counters, dump the job if its start time is known, drop it
%% from the table, and let its owner know if no jobs remain.
job_finished(Jobs, Id, Name, Status) ->
    Counter = finished_counter(Status),
    case ets:lookup(Jobs,Id) of
        [] ->
            ?LOGF("Job owner of ~p is unknown",[Id],?NOTICE);
        [Job=#job_session{start_time=undefined}] ->
            log_unknown_start_time(Status, Id),
            %% NOTE(review): no RUNNING was seen for this job, yet
            %% job_running is decremented (not job_queued); confirm intended.
            ts_mon_cache:add([{sum,job_running,-1}, {sum,Counter,1}]),
            ets:delete_object(Jobs,Job),
            check_jobs(Jobs,Job#job_session.owner);
        [Job] ->
            Now=?NOW,
            Duration=ts_utils:elapsed(Job#job_session.start_time,Now),
            ts_mon_cache:add([{sample,tr_job_duration,Duration},{sum,job_running,-1}, {sum,Counter,1}]),
            ts_job:dump(Job#job_session.dump,{none,Job#job_session{end_time=Now,status=Status},Name}),
            ets:delete_object(Jobs,Job),
            check_jobs(Jobs,Job#job_session.owner)
    end.

%% ts_mon_cache counter incremented for each terminal status.
finished_counter("ok")    -> ok_job;
finished_counter("error") -> error_job.

%% Log a finished job whose start time was never recorded. The two
%% message variants are kept exactly as previously emitted.
log_unknown_start_time("ok", Id) ->
    ?LOGF("ERROR: Start time of job ~p is unknown",[Id],?ERR);
log_unknown_start_time("error", Id) ->
    ?LOGF("ERROR: start time of job ~p is unknown",[Id],?ERR).

%% Give the job table (with the listen port as gift data) to the config
%% server, presumably so a restarted server can get it back via
%% ts_config_server:get_jobs_state/0. ets:give_away/3 raises badarg when
%% Pid is `undefined', not a live local process, or already the owner.
%% terminate/2 must not crash on that, so the failure is only logged.
give_jobs_to_config_server(Pid, #state{jobs=Jobs, port=Port}) ->
    try
        ets:give_away(Jobs,Pid,Port)
    catch
        error:badarg ->
            ?LOGF("Can't give job state table to config server ~p~n",[Pid],?ERR),
            false
    end.

%% Send Pid {erlang, ok, nojobs} if it owns no job in the table;
%% otherwise just log how many jobs it still owns.
check_jobs(Jobs,Pid)->
    case ets:match_object(Jobs, #job_session{owner=Pid, _='_'}) of
        [] ->
            ?LOGF("no jobs for pid ~p~n",[Pid],?DEB),
            Pid ! {erlang, ok, nojobs};
        PidJobs->
            ?LOGF("still ~p jobs for pid ~p~n",[length(PidJobs),Pid],?INFO)
    end.