mv_jobwaitfor.sas
Go to the documentation of this file.
1 /**
2  @file
3  @brief Takes a table of running jobs and waits for ANY/ALL of them to complete
4  @details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
5  jobs are completed. Completion is determined by reference to the returned
6  _state_, as per the following table:
7 
8  | state | Wait? | Notes|
9  |-----------|-------|------|
10  | idle | yes | We assume processing will continue. Beware of idle sessions with no code submitted! |
11  | pending | yes | Job is preparing to run |
12  | running | yes | Job is running|
13  | canceled | no | Job was cancelled|
14  | completed | no | Job finished - does not mean it was successful. Check stateDetails|
15  | failed | no | Job failed to execute, could be a problem when calling the apis|
16 
17 
18  ## Example
19 
20  First, compile the macros:
21 
22  filename mc url
23  "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
24  %inc mc;
25 
26  Next, create a job (in this case, as a web service):
27 
28  filename ft15f001 temp;
29  parmcards4;
30  data ;
31  rand=ranuni(0)*1000000;
32  do x=1 to rand;
33  y=rand*x;
34  output;
35  end;
36  run;
37  ;;;;
38  %mv_createwebservice(path=/Public/temp,name=demo)
39 
40  Then, execute the job,multiple times, and wait for them all to finish:
41 
42  %mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds1)
43  %mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds2)
44  %mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds3)
45  %mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds4)
46 
47  data work.jobs;
48  set work.ds1 work.ds2 work.ds3 work.ds4;
49  where method='GET' and rel='state';
50  run;
51 
52  %mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
53 
54  Delete the job:
55 
56  %mv_deletejes(path=/Public/temp,name=demo)
57 
58  @param [in] access_token_var= The global macro variable to contain the access
59  token
60  @param [in] grant_type= valid values:
61 
62  - password
63  - authorization_code
64  - detect - will check if access_token exists, if not will use sas_services
65  if a SASStudioV session else authorization_code. Default option.
66  - sas_services - will use oauth_bearer=sas_services
67 
68  @param [in] action=Either ALL (to wait for every job) or ANY (if one job
69  completes, processing will continue). Default=ALL.
70  @param [in] inds= The input dataset containing the list of job uris, in the
71  following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
72  job name. The uri should be in a `uri` variable, and the job path/name
73  should be in a `_program` variable.
74  @param [in] raise_err=0 Set to 1 to raise SYSCC when a job does not complete
75  succcessfully
76  @param [in] mdebug= set to 1 to enable DEBUG messages
77  @param [out] outds= The output dataset containing the list of states by job
78  (default=work.mv_jobexecute)
79  @param [out] outref= A fileref to which the spawned job logs should be
80  appended.
81 
82  @version VIYA V.03.04
83  @author Allan Bowe, source: https://github.com/sasjs/core
84 
85  <h4> Dependencies </h4>
86  @li mp_abort.sas
87  @li mf_getplatform.sas
88  @li mf_getuniquefileref.sas
89  @li mf_getuniquelibref.sas
90  @li mf_existvar.sas
91  @li mf_nobs.sas
92  @li mv_getjoblog.sas
93 
94 **/
95 
96 %macro mv_jobwaitfor(action
97  ,access_token_var=ACCESS_TOKEN
98  ,grant_type=sas_services
99  ,inds=0
100  ,outds=work.mv_jobwaitfor
101  ,outref=0
102  ,raise_err=0
103  ,mdebug=0
104  );
105 %local dbg;
106 %if &mdebug=1 %then %do;
107  %put &sysmacroname entry vars:;
108  %put _local_;
109 %end;
110 %else %let dbg=*;
111 
112 %local oauth_bearer;
113 %if &grant_type=detect %then %do;
114  %if %symexist(&access_token_var) %then %let grant_type=authorization_code;
115  %else %let grant_type=sas_services;
116 %end;
117 %if &grant_type=sas_services %then %do;
118  %let oauth_bearer=oauth_bearer=sas_services;
119  %let &access_token_var=;
120 %end;
121 
122 %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
123  and &grant_type ne sas_services
124  )
125  ,mac=&sysmacroname
126  ,msg=%str(Invalid value for grant_type: &grant_type)
127 )
128 
129 %mp_abort(iftrue=("&inds"="0")
130  ,mac=&sysmacroname
131  ,msg=%str(input dataset not provided)
132 )
133 %mp_abort(iftrue=(%mf_existvar(&inds,uri)=0)
134  ,mac=&sysmacroname
135  ,msg=%str(The URI variable was not found in the input dataset(&inds))
136 )
137 %mp_abort(iftrue=(%mf_existvar(&inds,_program)=0)
138  ,mac=&sysmacroname
139  ,msg=%str(The _PROGRAM variable was not found in the input dataset(&inds))
140 )
141 
142 %if %mf_nobs(&inds)=0 %then %do;
143  %put NOTE: Zero observations in &inds, &sysmacroname will now exit;
144  %return;
145 %end;
146 
147 options noquotelenmax;
148 %local base_uri; /* location of rest apis */
149 %let base_uri=%mf_getplatform(VIYARESTAPI);
150 
151 data _null_;
152  length jobparams $32767;
153  set &inds end=last;
154  call symputx(cats('joburi',_n_),substr(uri,1,55),'l');
155  call symputx(cats('jobname',_n_),_program,'l');
156  call symputx(cats('jobparams',_n_),jobparams,'l');
157  if last then call symputx('uricnt',_n_,'l');
158 run;
159 
160 %local runcnt;
161 %if &action=ALL %then %let runcnt=&uricnt;
162 %else %if &action=ANY %then %let runcnt=1;
163 %else %let runcnt=&uricnt;
164 
165 %local fname0 ;
166 %let fname0=%mf_getuniquefileref();
167 
168 data &outds;
169  format _program uri $128. state $32. stateDetails $32. timestamp datetime19.
170  jobparams $32767.;
171  call missing (of _all_);
172  stop;
173 run;
174 
175 %local i;
176 %do i=1 %to &uricnt;
177  %if "&&joburi&i" ne "0" %then %do;
178  proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&&joburi&i";
179  headers "Accept"="application/json"
180  %if &grant_type=authorization_code %then %do;
181  "Authorization"="Bearer &&&access_token_var"
182  %end; ;
183  run;
184  %if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201
185  %then %do;
186  data _null_;infile &fname0;input;putlog _infile_;run;
187  %mp_abort(mac=&sysmacroname
188  ,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
189  )
190  %end;
191 
192  %let status=notset;
193 
194  %local libref1;
195  %let libref1=%mf_getuniquelibref();
196  libname &libref1 json fileref=&fname0;
197 
198  data _null_;
199  length state stateDetails $32;
200  set &libref1..root;
201  call symputx('status',state,'l');
202  call symputx('stateDetails',stateDetails,'l');
203  run;
204 
205  libname &libref1 clear;
206 
207  %if &status=completed or &status=failed or &status=canceled %then %do;
208  %local plainuri;
209  %let plainuri=%substr(&&joburi&i,1,55);
210  proc sql;
211  insert into &outds set
212  _program="&&jobname&i",
213  uri="&plainuri",
214  state="&status",
215  stateDetails=symget("stateDetails"),
216  timestamp=datetime(),
217  jobparams=symget("jobparams&i");
218  %let joburi&i=0; /* do not re-check */
219  /* fetch log */
220  %if %str(&outref) ne 0 %then %do;
221  %mv_getjoblog(uri=&plainuri,outref=&outref,mdebug=&mdebug)
222  %end;
223  %end;
224  %else %if &status=idle or &status=pending or &status=running %then %do;
225  data _null_;
226  call sleep(1,1);
227  run;
228  %end;
229  %else %do;
230  %mp_abort(mac=&sysmacroname
231  ,msg=%str(status &status not expected!!)
232  )
233  %end;
234 
235  %if (&raise_err) %then %do;
236  %if (&status = canceled or &status = failed or %length(&stateDetails)>0)
237  %then %do;
238  %if ("&stateDetails" = "%str(war)ning") %then %let SYSCC=4;
239  %else %let SYSCC=5;
240  %put %str(ERR)OR: Job &&jobname&i. did not complete. &stateDetails;
241  %return;
242  %end;
243  %end;
244 
245  %end;
246  %if &i=&uricnt %then %do;
247  %local goback;
248  %let goback=0;
249  proc sql noprint;
250  select count(*) into:goback from &outds;
251  %if &goback lt &runcnt %then %let i=0;
252  %end;
253 %end;
254 
255 %if &mdebug=1 %then %do;
256  %put &sysmacroname exit vars:;
257  %put _local_;
258 %end;
259 %else %do;
260  /* clear refs */
261  filename &fname0 clear;
262 %end;
263 %mend mv_jobwaitfor;