1%%% 2%%% Copyright 2011 © INRIA 3%%% 4%%% Author : Nicolas Niclausse <nicolas.niclausse@inria.fr> 5%%% Created: 4 mai 2011 by Nicolas Niclausse <nicolas.niclausse@inria.fr> 6%%% 7%%% This program is free software; you can redistribute it and/or modify 8%%% it under the terms of the GNU General Public License as published by 9%%% the Free Software Foundation; either version 2 of the License, or 10%%% (at your option) any later version. 11%%% 12%%% This program is distributed in the hope that it will be useful, 13%%% but WITHOUT ANY WARRANTY; without even the implied warranty of 14%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15%%% GNU General Public License for more details. 16%%% 17%%% You should have received a copy of the GNU General Public License 18%%% along with this program; if not, write to the Free Software 19%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 20%%% 21 22%%% In addition, as a special exception, you have the permission to 23%%% link the code of this program with any library released under 24%%% the EPL license and distribute linked combinations including 25%%% the two; the MPL (Mozilla Public License), which EPL (Erlang 26%%% Public License) is based on, is included in this exception. 27 28-module(ts_job). 29-author('nicolas.niclausse@inria.fr'). 30 31-behaviour(ts_plugin). 32 33-include("ts_macros.hrl"). 34-include("ts_profile.hrl"). 35-include("ts_job.hrl"). 36-include_lib("kernel/include/file.hrl"). 37 38-export([add_dynparams/4, 39 get_message/2, 40 session_defaults/0, 41 dump/2, 42 parse/2, 43 parse_bidi/2, 44 parse_config/2, 45 decode_buffer/2, 46 new_session/0]). 47 48 49%%==================================================================== 50%% Data Types 51%%==================================================================== 52 53%% @type dyndata() = #dyndata{proto=ProtoData::term(),dynvars=list()}. 54%% Dynamic data structure 55%% @end 56 57%% @type server() = {Host::tuple(),Port::integer(),Protocol::atom()}. 58%% Host/Port/Protocol tuple 59%% @end 60 61%% @type param() = {dyndata(), server()}. 62%% Dynamic data structure 63%% @end 64 65%% @type hostdata() = {Host::tuple(),Port::integer()}. 66%% Host/Port pair 67%% @end 68 69%% @type client_data() = binary() | closed. 70%% Data passed to a protocol implementation is either a binary or the 71%% atom closed indicating that the server closed the tcp connection. 72%% @end 73 74%%==================================================================== 75%% API 76%%==================================================================== 77 78parse_config(El,Config) -> 79 ts_config_job:parse_config(El, Config). 80 81%% @spec session_defaults() -> {ok, Persistent} | {ok, Persistent, Bidi} 82%% Persistent = bool() 83%% Bidi = bool() 84%% @doc Default parameters for sessions of this protocol. Persistent 85%% is true if connections are preserved after the underlying tcp 86%% connection closes. Bidi should be true for bidirectional protocols 87%% where the protocol module needs to reply to data sent from the 88%% server. @end 89session_defaults() -> 90 {ok, true}. % not relevant for erlang type (?). 91 92%% @spec new_session() -> State::term() 93%% @doc Initialises the state for a new protocol session. 94%% @end 95new_session() -> 96 #job_session{}. 97 98%% @spec decode_buffer(Buffer::binary(),Session::record(job)) -> NewBuffer::binary() 99%% @doc We need to decode buffer (remove chunks, decompress ...) for 100%% matching or dyn_variables 101%% @end 102decode_buffer(Buffer,#job_session{}) -> 103 Buffer. 104 105%% @spec add_dynparams(Subst, dyndata(), param(), hostdata()) -> {dyndata(), server()} | dyndata() 106%% Subst = term() 107%% @doc Updates the dynamic request data structure created by 108%% {@link ts_protocol:init_dynparams/0. init_dynparams/0}. 109%% @end 110add_dynparams(false, {_DynVars,Session}, Param, HostData) -> 111 add_dynparams(Session, Param, HostData); 112add_dynparams(true, {DynVars,Session}, Param, HostData) -> 113 NewParam = subst(Param, DynVars), 114 add_dynparams(Session,NewParam, HostData). 115 116add_dynparams(#job_session{}, Param, _HostData) -> 117 Param. 118 119%%---------------------------------------------------------------------- 120%% @spec subst(record(job), term()) -> record(job) 121%% @doc Replace on the fly dynamic element of the request. 122%% @end 123%%---------------------------------------------------------------------- 124subst(Job=#job{duration=D,req=Req,walltime=WT,resources=Res,options=Opts,jobid=Id}, DynVars) -> 125 Job#job{duration=ts_search:subst(D,DynVars), 126 req=ts_search:subst(Req,DynVars), 127 resources=ts_search:subst(Res,DynVars), 128 walltime=ts_search:subst(WT,DynVars), 129 options=ts_search:subst(Opts,DynVars), 130 jobid=ts_search:subst(Id,DynVars)}. 131 132 133dump(protocol,{none,#job_session{jobid=JobId,owner=Owner,submission_time=Sub,queue_time=Q, 134 start_time=Start,end_time=E,status=Status},Name})-> 135 {R,_}=lists:mapfoldl(fun(A,Acc) -> 136 {integer_to_list(round(ts_utils:elapsed(Acc,A))),A} 137 end,Sub,[Q,Start,E]), 138 Date=integer_to_list(round(ts_utils:time2sec_hires(Sub))), 139 Data=ts_utils:join(";",[JobId,Name,Date]++R++[Status]), 140 ts_mon:dump({protocol, Owner, Data }); 141dump(_P,_Args) -> 142 ok. 143 144%% @spec parse(Data::client_data(), State) -> {NewState, Opts, Close} 145%% State = #state_rcv{} 146%% Opts = proplist() 147%% Close = bool() 148%% @doc 149%% Opts is a list of inet:setopts socket options. Don't change the 150%% active/passive mode here as tsung will set {active,once} before 151%% your options. 152%% Setting Close to true will cause tsung to close the connection to 153%% the server. 154%% @end 155parse({os, cmd, _Args, "Admission Rule ERROR" ++ Tail},State=#state_rcv{session=_S})-> 156 ?LOGF("ERROR, admission rule: ~p",[Tail],?WARN), 157 ts_mon_cache:add([{sum,error_job_admission_rule,1}]), 158 {State#state_rcv{ack_done=true,datasize=length(Tail)+21}, [], false}; 159parse({os, cmd, _Args, Res},State=#state_rcv{session=S,dump=Dump}) when is_list(Res)-> 160 ?LOGF("os:cmd result: ~p",[Res],?DEB), 161 %% oarsub output: 162 %% [ADMISSION RULE] Modify resource description with type constraints 163 %% Generate a job key... 164 %% OAR_JOB_ID=468822 165 Lines = string:tokens(Res,"\n"), 166 case lists:last(Lines) of 167 "OAR_JOB_ID="++ID -> 168 ?LOGF("OK,job id is ~p",[ID],?INFO), 169 ts_job_notify:monitor({ID,self(),S#job_session.submission_time, ?NOW,Dump}), 170 {State#state_rcv{ack_done=true,datasize=length(Res)}, [], false}; 171 _ -> 172 {State#state_rcv{ack_done=true,datasize=length(Res)}, [], false} 173 end; 174parse(nojobs,State) -> 175 ?LOGF(" no jobs in queue for ~p, stop waiting",[self()],?DEB), 176 {State#state_rcv{ack_done=true}, [], false}; 177parse({Mod, Fun, Args, Res},State) -> 178 ?LOGF(" result: ~p",[{Mod, Fun, Args, Res}],?DEB), 179 {State#state_rcv{ack_done=false}, [], false}. 180 181%% @spec parse_bidi(Data, State) -> {nodata, NewState} | {Data, NewState} 182%% Data = client_data() 183%% NewState = term() 184%% State = term() 185%% @doc Parse a block of data from the server. No reply will be sent 186%% if the return value is nodata, otherwise the Data binary will be 187%% sent back to the server immediately. 188%% @end 189parse_bidi(Data, State) -> 190 ts_plugin:parse_bidi(Data,State). 191 192%% @spec get_message(record(job),record(state_rcv)) -> {Message::term(),record(state_rcv)} 193%% @doc Creates a new message to send to the connected server. 194%% @end 195get_message(#job{type=oar,req=wait_jobs},#state_rcv{session=Session}) -> 196 ts_job_notify:wait_jobs(self()), 197 {{erlang, now,[], 0},Session}; % we could use any function call, the result is not used 198get_message(Job=#job{duration=D},State) when is_integer(D)-> 199 get_message(Job#job{duration=integer_to_list(D)},State); 200get_message(Job=#job{notify_port=P},State) when is_integer(P)-> 201 get_message(Job#job{notify_port=integer_to_list(P)},State); 202get_message(#job{type=oar,user=U,req=submit, name=N,script=S, resources=R, queue=Q, walltime=W,notify_port=P, notify_script=NS,duration=D,options=Opts},#state_rcv{session=Session}) -> 203 Submit = case U of 204 undefined -> "oarsub "; 205 User -> "sudo -u "++User++" oarsub " 206 end, 207 Queue = case Q of 208 "" -> ""; 209 _ -> "-q "++ Q 210 end, 211 Cmd=Submit++Queue++" -l "++R++ ",walltime="++W 212 ++" -n " ++N ++" " 213 ++ Opts ++ " " 214 ++" --notify \"exec:" ++NS++" "++P++"\" " 215 ++"\""++S++" "++D++"\"", 216 ?LOGF("Will run ~p",[Cmd],?INFO), 217 Message = {os, cmd, [Cmd], length(Cmd) }, 218 {Message, Session#job_session{submission_time=?NOW}}. 219