mv_jobflow.sas
Go to the documentation of this file.
1 /**
2  @file
3  @brief Execute a series of job flows
4  @details Very (very) simple flow manager. Jobs execute in sequential waves
5  (one per FLOW_ID), and each wave waits for all previous waves to finish.
6 
7  The input table is formed as per below. Each observation represents one job.
8  Each variable is converted into a macro variable with the same name.
9 
10  ## Input table (minimum variables needed)
11 
12  @li _PROGRAM - Provides the path to the job itself
13  @li FLOW_ID - Numeric value, provides sequential ordering capability. Is
14  optional, will default to 0 if not provided.
15  @li _CONTEXTNAME - Dictates which context should be used to run the job. If
16  blank, or not provided, will default to `SAS Job Execution compute context`.
17 
18  Any additional variables provided in this table are converted into macro
19  variables and passed into the relevant job.
20 
21  |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
22  |---|---|---|
23  |/Public/jobs/somejob1|0|SAS Job Execution compute context|
24  |/Public/jobs/somejob2|0|SAS Job Execution compute context|
25 
26  ## Output table (minimum variables produced)
27 
28  @li _PROGRAM - the SAS Drive path of the job
29  @li URI - the URI of the executed job
30  @li STATE - the completed state of the job
31  @li TIMESTAMP - the datetime that the job completed
32  @li JOBPARAMS - the parameters that were passed to the job
33  @li FLOW_ID - the id of the flow in which the job was executed
34 
35  ![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png)
36 
37  To avoid overloading the server with many requests in rapid succession, a
38  one-second pause is made after each job is submitted.
39 
40 
41  ## Example
42 
43  First, compile the macros:
44 
45  filename mc url
46  "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
47  %inc mc;
48 
49  Next, create some jobs (in this case, as web services):
50 
51  filename ft15f001 temp;
52  parmcards4;
53  %put this is job: &_program;
54  %put this was run in flow &flow_id;
55  data ;
56  rand=ranuni(0)*&macrovar1;
57  do x=1 to rand;
58  y=rand*&macrovar2;
59  if y=100 then abort;
60  output;
61  end;
62  run;
63  ;;;;
64  %mv_createwebservice(path=/Public/temp,name=demo1)
65  %mv_createwebservice(path=/Public/temp,name=demo2)
66 
67  Prepare an input table with 60 executions:
68 
69  data work.inputjobs;
70  _contextName='SAS Job Execution compute context';
71  do flow_id=1 to 3;
72  do i=1 to 20;
73  _program='/Public/temp/demo1';
74  macrovar1=10*i;
75  macrovar2=4*i;
76  output;
77  i+1;
78  _program='/Public/temp/demo2';
79  macrovar1=40*i;
80  macrovar2=44*i;
81  output;
82  end;
83  end;
84  run;
85 
86  Trigger the flow
87 
88  %mv_jobflow(inds=work.inputjobs
89  ,maxconcurrency=4
90  ,outds=work.results
91  ,outref=myjoblog
92  )
93 
94  data _null_;
95  infile myjoblog;
96  input; put _infile_;
97  run;
98 
99 
100  @param [in] access_token_var= The global macro variable to contain the
101  access token
102  @param [in] grant_type= (default is sas_services) valid values:
103  @li password
104  @li authorization_code
105  @li detect - will check if the macro variable named by access_token_var
106  exists. If it does, authorization_code is used; otherwise sas_services
107  is used.
108  @li sas_services - will use oauth_bearer=sas_services
109  @param [in] inds= The input dataset containing a list of jobs and parameters
110  @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
111  @param [in] raise_err=0 Set to 1 to raise SYSCC when a job does not complete
112  successfully. Jobs and flows that have not yet started are then skipped.
113  @param [in] mdebug= set to 1 to enable DEBUG messages
114  @param [out] outds= The output dataset containing the results
115  @param [out] outref= The output fileref to which to append the log file(s).
116 
117  @version VIYA V.03.05
118  @author Allan Bowe, source: https://github.com/sasjs/core
119 
120  <h4> SAS Macros </h4>
121  @li mf_nobs.sas
122  @li mp_abort.sas
123  @li mf_getplatform.sas
124  @li mf_getuniquefileref.sas
125  @li mf_existvarlist.sas
126  @li mv_jobwaitfor.sas
127  @li mv_jobexecute.sas
128 
129 **/
130 
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
  ,maxconcurrency=8
  ,access_token_var=ACCESS_TOKEN
  ,grant_type=sas_services
  ,outref=0
  ,raise_err=0
  ,mdebug=0
);
/*
  Runs the jobs listed in the input table in waves - one wave per distinct
  FLOW_ID, in ascending order. Within a wave up to maxconcurrency jobs run
  at once, and the next wave only starts when every job of the current wave
  has finished. Finished jobs are appended to the output table.
  Note: the input table is modified in place (defaults are added and it is
  sorted by FLOW_ID).
*/
%local dbg;
%if &mdebug=1 %then %do;
  %put &sysmacroname entry vars:;
  %put _local_;
%end;
%else %let dbg=*;

/* validate the input table reference before anything reads it */
%mp_abort(iftrue=("&inds"="0")
  ,mac=&sysmacroname
  ,msg=%str(Input dataset was not provided)
)

%if &mdebug=1 %then %do;
  %put inds vars:;
  data _null_;
    set &inds;
    putlog (_all_)(=);
  run;
%end;

/* turn grant_type=detect into a concrete grant type */
%if &grant_type=detect %then %do;
  %if %symexist(&access_token_var) %then %let grant_type=authorization_code;
  %else %let grant_type=sas_services;
%end;
/* the access token variable is cleared when using sas_services */
%if &grant_type=sas_services %then %let &access_token_var=;

%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
    and &grant_type ne sas_services
  )
  ,mac=&sysmacroname
  ,msg=%str(Invalid value for grant_type: &grant_type)
)
%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
  ,mac=&sysmacroname
  ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
)
%mp_abort(iftrue=(&maxconcurrency<1)
  ,mac=&sysmacroname
  ,msg=%str(The maxconcurrency variable should be a positive integer)
)

/* set defaults for any of the optional columns that are not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _OMITSESSIONRESULTS)=0
%then %do;
  data &inds;
    %if %mf_existVarList(&inds,_CONTEXTNAME)=0 %then %do;
      length _CONTEXTNAME $128;
      retain _CONTEXTNAME "SAS Job Execution compute context";
    %end;
    %if %mf_existVarList(&inds,FLOW_ID)=0 %then %do;
      retain FLOW_ID 0;
    %end;
    %if %mf_existVarList(&inds,_OMITSESSIONRESULTS)=0 %then %do;
      /* https://github.com/sasjs/adapter/pull/845#issuecomment-2956589644 */
      retain _omitSessionResults "false";
    %end;
    set &inds;
    &dbg. putlog (_all_)(=);
  run;
%end;

%local missings;
proc sql noprint;
select count(*) into: missings
  from &inds
  where flow_id is null or _program is null;
%mp_abort(iftrue=(&missings>0)
  ,mac=&sysmacroname
  ,msg=%str(input dataset has &missings missing values for FLOW_ID or _PROGRAM)
)

%if %mf_nobs(&inds)=0 %then %do;
  %put No observations in &inds! Leaving macro &sysmacroname;
  %return;
%end;

/* ensure the output table starts out absent (it is built by proc append) */
data &outds;run;
proc sql;
drop table &outds;

options noquotelenmax;

/* get the distinct flow ids (ascending) into flow1..flowN */
proc sort data=&inds;
  by flow_id;
run;
data _null_;
  set &inds (keep=flow_id) end=last;
  by flow_id;
  if last.flow_id then do;
    cnt+1;
    call symputx(cats('flow',cnt),flow_id,'l');
  end;
  if last then call symputx('flowcnt',cnt,'l');
run;

/* prepare temporary datasets and frefs */
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref rc;
data;run;%let jds=&syslast;
data;run;%let jjson=&syslast;
data;run;%let jdsapp=&syslast;
data;run;%let jdsrunning=&syslast;
data;run;%let jdswaitfor=&syslast;
%let jfref=%mf_getuniquefileref();

/* start loop - one iteration per flow (wave) */
%do fid=1 %to &flowcnt;

  %if not ( &raise_err and &syscc ) %then %do;

    %put preparing job attributes for flow &&flow&fid;
    %local jcnt;
    /* job1..jobN hold the programs, context1..contextN the contexts; the
      remaining columns are kept in jds and become the job parameters */
    data &jds(drop=_contextName _program);
      set &inds(where=(flow_id=&&flow&fid));
      call symputx(cats('job',_n_),_program,'l');
      /* set the default directly, so a short _contextName cannot truncate */
      if missing(_contextName) then
        call symputx(cats('context',_n_),"SAS Job Execution compute context",'l');
      else call symputx(cats('context',_n_),_contextName,'l');
      call symputx('jcnt',_n_,'l');
      &dbg. if _n_= 1 then putlog "Loop &fid";
      &dbg. putlog (_all_)(=);
    run;
    %put exporting job variables in json format;
    %do jid=1 %to &jcnt;
      /* one-row table holding the parameters of job number jid */
      data &jjson;
        set &jds(firstobs=&jid obs=&jid);
      run;
      proc json out=&jfref;
        export &jjson / nosastags fmtnumeric;
      run;
      /* drop the first two and last two characters of the exported line -
        presumably the [{ and }] wrappers (assumes a single-line export) */
      data _null_;
        infile &jfref lrecl=32767;
        input;
        jparams=cats('jparams',symget('jid'));
        call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
      run;
      %local jobuid&jid;
      %let jobuid&jid=0; /* set to the job uuid once the job is started */
    %end;
    %local concurrency completed;
    %let concurrency=0;
    %let completed=0;
    /* each flow starts with an empty list of running jobs */
    %if %sysfunc(exist(&jdsrunning)) %then %do;
      proc sql; drop table &jdsrunning;
    %end;
    %do jid=1 %to &jcnt;
      /**
        * now we can execute the jobs up to the maxconcurrency setting
        */
      %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */

        /* check to see if the job finished in the previous round */
        %if %sysfunc(exist(&outds))=1 %then %do;
          %local jobcheck; %let jobcheck=0;
          proc sql noprint;
          select count(*) into: jobcheck
            from &outds where uuid="&&jobuid&jid";
          %if &jobcheck>0 %then %do;
            %put &&job&jid in flow &&flow&fid with uid &&jobuid&jid completed!;
            %let job&jid=0;
          %end;
        %end;

        /* only start a job that has not been started yet, and only when a
          concurrency slot is free */
        %if ("&&jobuid&jid"="0") and (&concurrency<&maxconcurrency) %then %do;

          /* But only start if no issues detected so far */
          %if not ( &raise_err and &syscc ) %then %do;

            %local jobname jobpath;
            %let jobname=%scan(&&job&jid,-1,/);
            %let jobpath=
              %substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);

            /* superq prevents any & or % in the parameters from resolving */
            %put executing &jobpath/&jobname with paramstring %superq(jparams&jid);
            %mv_jobexecute(path=&jobpath
              ,name=&jobname
              ,paramstring=%superq(jparams&jid)
              ,outds=&jdsapp
              ,contextname=&&context&jid
              ,access_token_var=&access_token_var
              ,grant_type=&grant_type
            )
            data &jdsapp;
              format jobparams $32767.;
              set &jdsapp(where=(method='GET' and rel='state'));
              jobparams=symget("jparams&jid");
              /* uri here has the /state suffix */
              uuid=scan(uri,-2,'/');
              call symputx("jobuid&jid",uuid,'l');
            run;
            proc append base=&jdsrunning data=&jdsapp;
            run;
            %let concurrency=%eval(&concurrency+1);
            /* sleep one second after every request to smooth the impact */
            data _null_;
              call sleep(1,1);
            run;

          %end;
          %else %do; /* Job was skipped due to problems */

            %put jobid &&job&jid in flow &&flow&fid skipped due to SYSCC (&syscc);
            %let completed = %eval(&completed+1);
            %let job&jid=0; /* Indicate job has finished */

          %end;

        %end;
      %end;
      %if &jid=&jcnt %then %do;
        /* we are at the end of the loop - check which jobs have finished.
          The running table only fails to exist when every job in this flow
          was skipped, in which case there is nothing to wait for. */
        %local done;
        %let done=0;
        %if %sysfunc(exist(&jdsrunning)) %then %do;
          %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref
            ,raise_err=&raise_err,mdebug=&mdebug
            ,access_token_var=&access_token_var,grant_type=&grant_type
          )
          %let done=%mf_nobs(&jdswaitfor);
        %end;
        %if &done>0 %then %do;
          %let completed=%eval(&completed+&done);
          %let concurrency=%eval(&concurrency-&done);
          data &jdsapp;
            set &jdswaitfor;
            flow_id=&&flow&fid;
            uuid=scan(uri,-1,'/');
          run;
          proc append base=&outds data=&jdsapp;
          run;
          /* the jobs just reported as finished are no longer running */
          proc sql;
          delete from &jdsrunning
            where uuid in (select uuid from &jdsapp);
        %end;

        /* loop again if jobs are left */
        %if &completed < &jcnt %then %do;
          %let jid=0;
          %put looping flow &&flow&fid again;
          %put &completed of &jcnt jobs completed, &concurrency jobs running;
        %end;
      %end;
    %end;

  %end;
  %else %do;

    %put Flow &&flow&fid skipped due to SYSCC (&syscc);

  %end;
  /* back up and execute the next flow */
%end;

/* remove the temporary tables and fileref (kept when debugging) */
%if &mdebug ne 1 %then %do;
  proc datasets lib=%scan(&jds,1,.) nolist nowarn;
    delete %scan(&jds,2,.) %scan(&jjson,2,.) %scan(&jdsapp,2,.)
      %scan(&jdsrunning,2,.) %scan(&jdswaitfor,2,.);
  quit;
  %let rc=%sysfunc(filename(jfref));
%end;

%if &mdebug=1 %then %do;
  %put &sysmacroname exit vars:;
  %put _local_;
%end;

%mend mv_jobflow;