1%%%
2%%%  Copyright 2011 © INRIA
3%%%
4%%%  Author : Nicolas Niclausse <nicolas.niclausse@inria.fr>
5%%%  Created: 4 mai 2011 by Nicolas Niclausse <nicolas.niclausse@inria.fr>
6%%%
7%%%  This program is free software; you can redistribute it and/or modify
8%%%  it under the terms of the GNU General Public License as published by
9%%%  the Free Software Foundation; either version 2 of the License, or
10%%%  (at your option) any later version.
11%%%
12%%%  This program is distributed in the hope that it will be useful,
13%%%  but WITHOUT ANY WARRANTY; without even the implied warranty of
14%%%  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15%%%  GNU General Public License for more details.
16%%%
17%%%  You should have received a copy of the GNU General Public License
18%%%  along with this program; if not, write to the Free Software
19%%%  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
20%%%
21
22%%%  In addition, as a special exception, you have the permission to
23%%%  link the code of this program with any library released under
24%%%  the EPL license and distribute linked combinations including
25%%%  the two; the MPL (Mozilla Public License), which EPL (Erlang
26%%%  Public License) is based on, is included in this exception.
27
28-module(ts_job).
29-author('nicolas.niclausse@inria.fr').
30
31-behaviour(ts_plugin).
32
33-include("ts_macros.hrl").
34-include("ts_profile.hrl").
35-include("ts_job.hrl").
36-include_lib("kernel/include/file.hrl").
37
38-export([add_dynparams/4,
39         get_message/2,
40         session_defaults/0,
41         dump/2,
42         parse/2,
43         parse_bidi/2,
44         parse_config/2,
45         decode_buffer/2,
46         new_session/0]).
47
48
49%%====================================================================
50%% Data Types
51%%====================================================================
52
53%% @type dyndata() = #dyndata{proto=ProtoData::term(),dynvars=list()}.
54%% Dynamic data structure
55%% @end
56
57%% @type server() = {Host::tuple(),Port::integer(),Protocol::atom()}.
58%% Host/Port/Protocol tuple
59%% @end
60
%% @type param() = {dyndata(), server()}.
%% Pair made of the dynamic data and the server tuple.
%% @end
64
65%% @type hostdata() = {Host::tuple(),Port::integer()}.
66%% Host/Port pair
67%% @end
68
69%% @type client_data() = binary() | closed.
70%% Data passed to a protocol implementation is either a binary or the
71%% atom closed indicating that the server closed the tcp connection.
72%% @end
73
74%%====================================================================
75%% API
76%%====================================================================
77
%% @spec parse_config(Element::term(), Conf::term()) -> term()
%% @doc Parses the configuration for this plugin. All the work is
%% delegated to ts_config_job:parse_config/2, whose result is returned
%% unchanged.
%% @end
parse_config(Element, Conf) ->
    ts_config_job:parse_config(Element, Conf).
80
%% @spec session_defaults() -> {ok, Persistent} | {ok, Persistent, Bidi}
%% Persistent = bool()
%% Bidi = bool()
%% @doc Default session parameters for this protocol. Persistent tells
%% whether a session is preserved after the underlying tcp connection
%% closes. Bidi should be true for bidirectional protocols, where the
%% protocol module has to reply to data sent by the server. This
%% plugin always answers {ok, true} and never sets Bidi.
%% @end
session_defaults() ->
    Persistent = true, % not relevant for erlang type (?).
    {ok, Persistent}.
91
%% @spec new_session() -> State::term()
%% @doc Builds the state of a fresh protocol session: a job_session
%% record with every field left at its default value.
%% @end
new_session() ->
    InitialSession = #job_session{},
    InitialSession.
97
%% @spec decode_buffer(Buffer::binary(),Session::record(job_session)) -> NewBuffer::binary()
%% @doc Hook used to decode a buffer (remove chunks, decompress ...)
%%      before matching or extracting dyn_variables. Nothing has to be
%%      decoded for this plugin: the buffer is handed back untouched.
%%      The second argument must be a job_session record.
%% @end
decode_buffer(Data, #job_session{} = _Session) ->
    Data.
104
%% @spec add_dynparams(Subst, dyndata(), param(), hostdata()) -> {dyndata(), server()} | dyndata()
%% Subst = bool()
%% @doc Updates the dynamic request data structure created by
%% {@link ts_protocol:init_dynparams/0. init_dynparams/0}.
%% When Subst is true, the dynamic variables are first substituted into
%% the request (see subst/2); when it is false the request is used as
%% is. In both cases the result comes from add_dynparams/3.
%% @end
add_dynparams(true, {DynVars, Session}, Param, HostData) ->
    add_dynparams(Session, subst(Param, DynVars), HostData);
add_dynparams(false, {_DynVars, Session}, Param, HostData) ->
    add_dynparams(Session, Param, HostData).
115
%% @doc Session-level part of add_dynparams/4. The session must be a
%% job_session record; the request is returned unchanged and the host
%% data is ignored.
%% @end
add_dynparams(#job_session{} = _Session, Request, _Host) ->
    Request.
118
%%----------------------------------------------------------------------
%% @spec subst(record(job), DynVars::term()) -> record(job)
%% @doc Replaces on the fly the dynamic elements of a job request.
%% ts_search:subst/2 is applied to the duration, req, resources,
%% walltime, options and jobid fields; every other field is kept.
%% @end
%%----------------------------------------------------------------------
subst(Job = #job{}, DynVars) ->
    Replace = fun(Value) -> ts_search:subst(Value, DynVars) end,
    Job#job{duration  = Replace(Job#job.duration),
            req       = Replace(Job#job.req),
            resources = Replace(Job#job.resources),
            walltime  = Replace(Job#job.walltime),
            options   = Replace(Job#job.options),
            jobid     = Replace(Job#job.jobid)}.
131
132
%% @doc Dumps the life cycle of a job through ts_mon:dump/1.
%% For a {none, Session, Name} argument with the protocol dump type,
%% the dumped line is a ";"-separated string holding, in order: the job
%% id, the job name, the submission date (rounded result of
%% ts_utils:time2sec_hires/1), the elapsed times submission -> queue,
%% queue -> start and start -> end (each rounded), and the job status.
%% Any other combination of arguments is ignored and returns ok.
%% @end
dump(protocol, {none,
                #job_session{jobid = JobId,
                             owner = Owner,
                             submission_time = Submitted,
                             queue_time = Queued,
                             start_time = Started,
                             end_time = Ended,
                             status = Status},
                Name}) ->
    Interval = fun(From, To) ->
                       integer_to_list(round(ts_utils:elapsed(From, To)))
               end,
    InQueue   = Interval(Submitted, Queued),
    ToStart   = Interval(Queued, Started),
    Running   = Interval(Started, Ended),
    Date      = integer_to_list(round(ts_utils:time2sec_hires(Submitted))),
    Fields    = [JobId, Name, Date, InQueue, ToStart, Running, Status],
    ts_mon:dump({protocol, Owner, ts_utils:join(";", Fields)});
dump(_Type, _Args) ->
    ok.
143
%% @spec parse(Data, State) -> {NewState, Opts, Close}
%% Data = {Mod::atom(), Fun::atom(), Args::list(), Result::term()} | nojobs
%% State = #state_rcv{}
%% NewState = #state_rcv{}
%% Opts = proplist()
%% Close = bool()
%% @doc Handles the result of a request.
%% <ul>
%%  <li>os:cmd output starting with "Admission Rule ERROR": the error is
%%      logged and counted (error_job_admission_rule), and the request
%%      is acknowledged.</li>
%%  <li>any other os:cmd output (a string): when its last non-empty
%%      line is "OAR_JOB_ID=Id", the job is registered with
%%      ts_job_notify:monitor/1; the request is acknowledged in every
%%      case, including when the output is empty.</li>
%%  <li>nojobs: the request is acknowledged.</li>
%%  <li>any other {Mod, Fun, Args, Result}: the result is only logged
%%      and the request is NOT acknowledged.</li>
%% </ul>
%% Opts is a list of inet:setopts socket options. Don't change the
%% active/passive mode here as tsung will set {active,once} before
%% your options.
%% Setting Close to true will cause tsung to close the connection to
%% the server.
%% @end
parse({os, cmd, _Args, Res = "Admission Rule ERROR" ++ Tail}, State=#state_rcv{}) ->
    ?LOGF("ERROR, admission rule: ~p",[Tail],?WARN),
    ts_mon_cache:add([{sum,error_job_admission_rule,1}]),
    %% datasize is the size of the whole output, as in the generic
    %% os:cmd clause below (the matched prefix is 20 characters long).
    {State#state_rcv{ack_done=true,datasize=length(Res)}, [], false};
parse({os, cmd, _Args, Res},State=#state_rcv{session=S,dump=Dump}) when is_list(Res)->
    ?LOGF("os:cmd result: ~p",[Res],?DEB),
    %% oarsub output:
    %% [ADMISSION RULE] Modify resource description with type constraints
    %% Generate a job key...
    %% OAR_JOB_ID=468822
    %%
    %% Look at the last non-empty line only. Matching on the reversed
    %% list (instead of lists:last/1) keeps an empty output from
    %% crashing the process.
    case lists:reverse(string:tokens(Res,"\n")) of
        ["OAR_JOB_ID="++ID | _] ->
            ?LOGF("OK,job id is ~p",[ID],?INFO),
            ts_job_notify:monitor({ID,self(),S#job_session.submission_time, ?NOW,Dump});
        _ ->
            ok
    end,
    {State#state_rcv{ack_done=true,datasize=length(Res)}, [], false};
parse(nojobs,State) ->
    ?LOGF(" no jobs in queue for ~p, stop waiting",[self()],?DEB),
    {State#state_rcv{ack_done=true}, [], false};
parse({Mod, Fun, Args, Res},State) ->
    ?LOGF(" result: ~p",[{Mod, Fun, Args, Res}],?DEB),
    {State#state_rcv{ack_done=false}, [], false}.
180
%% @spec parse_bidi(Data, State) -> {nodata, NewState} | {Data, NewState}
%% Data = client_data()
%% NewState = term()
%% State = term()
%% @doc Parses a block of data received from the server. When the
%% result is nodata nothing is sent back; otherwise the returned Data
%% binary is sent to the server immediately. This plugin has no
%% specific handling and relies on ts_plugin:parse_bidi/2.
%% @end
parse_bidi(Incoming, CurrentState) ->
    ts_plugin:parse_bidi(Incoming, CurrentState).
191
%% @spec get_message(record(job),record(state_rcv)) -> {Message::term(),Session::record(job_session)}
%% @doc Creates the next message for a job request.
%% <ul>
%%  <li>oar / wait_jobs: registers the calling process with
%%      ts_job_notify:wait_jobs/1 and returns a dummy call; the session
%%      is returned unchanged.</li>
%%  <li>oar / submit: builds the oarsub command line and returns
%%      {os, cmd, [Cmd], length(Cmd)} together with the session stamped
%%      with the submission time. An integer duration or notify_port is
%%      first converted to a string.</li>
%% </ul>
%% NOTE(review): the command line is a plain concatenation of the job
%% fields; none of them is shell-escaped here.
%% @end
get_message(#job{type=oar,req=wait_jobs}, #state_rcv{session=Session}) ->
    ts_job_notify:wait_jobs(self()),
    %% we could use any function call, the result is not used
    {{erlang, now, [], 0}, Session};
get_message(Job=#job{duration=Duration}, State) when is_integer(Duration) ->
    get_message(Job#job{duration=integer_to_list(Duration)}, State);
get_message(Job=#job{notify_port=Port}, State) when is_integer(Port) ->
    get_message(Job#job{notify_port=integer_to_list(Port)}, State);
get_message(#job{type=oar, req=submit,
                 user=User, name=Name, script=Script,
                 resources=Resources, queue=Queue, walltime=Walltime,
                 notify_port=Port, notify_script=NotifyScript,
                 duration=Duration, options=Opts},
            #state_rcv{session=Session}) ->
    Cmd = lists:append([oarsub_prefix(User), oarsub_queue(Queue),
                        " -l ", Resources, ",walltime=", Walltime,
                        " -n ", Name, " ",
                        Opts, " ",
                        " --notify \"exec:", NotifyScript, " ", Port, "\" ",
                        "\"", Script, " ", Duration, "\""]),
    ?LOGF("Will run ~p",[Cmd],?INFO),
    {{os, cmd, [Cmd], length(Cmd)}, Session#job_session{submission_time=?NOW}}.

%% @doc Start of the command line: plain oarsub, or oarsub run through
%% sudo when a user is set.
oarsub_prefix(undefined) -> "oarsub ";
oarsub_prefix(User)      -> "sudo -u " ++ User ++ " oarsub ".

%% @doc Queue option of oarsub; empty when no queue is set.
oarsub_queue("")    -> "";
oarsub_queue(Queue) -> "-q " ++ Queue.
219