mv_jobflow.sas
Go to the documentation of this file.
1 /**
2  @file
3  @brief Execute a series of job flows
4  @details Very (very) simple flow manager. Jobs execute in sequential waves:
5  a wave starts only once the previous wave has finished (and, when raise_err=1, remaining waves are skipped once SYSCC is non-zero).
6 
7  The input table is formed as per below. Each observation represents one job.
8  Each variable is converted into a macro variable with the same name.
9 
10  ## Input table (minimum variables needed)
11 
12  @li _PROGRAM - Provides the path to the job itself
13  @li FLOW_ID - Numeric value, provides sequential ordering capability. Is
14  optional, will default to 0 if not provided.
15  @li _CONTEXTNAME - Dictates which context should be used to run the job. If
16  blank, or not provided, will default to `SAS Job Execution compute context`.
17 
18  Any additional variables provided in this table are converted into macro
19  variables and passed into the relevant job.
20 
21  |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
22  |---|---|---|
23  |/Public/jobs/somejob1|0|SAS Job Execution compute context|
24  |/Public/jobs/somejob2|0|SAS Job Execution compute context|
25 
26  ## Output table (minimum variables produced)
27 
28  @li _PROGRAM - the SAS Drive path of the job
29  @li URI - the URI of the executed job
30  @li STATE - the completed state of the job
31  @li TIMESTAMP - the datetime that the job completed
32  @li JOBPARAMS - the parameters that were passed to the job
33  @li FLOW_ID - the id of the flow in which the job was executed
34 
35  ![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png)
36 
37  To avoid hammering the box with many hits in rapid succession, a one
38  second pause is made between every request.
39 
40 
41  ## Example
42 
43  First, compile the macros:
44 
45  filename mc url
46  "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
47  %inc mc;
48 
49  Next, create some jobs (in this case, as web services):
50 
51  filename ft15f001 temp;
52  parmcards4;
53  %put this is job: &_program;
54  %put this was run in flow &flow_id;
55  data ;
56  rand=ranuni(0)*&macrovar1;
57  do x=1 to rand;
58  y=rand*&macrovar2;
59  if y=100 then abort;
60  output;
61  end;
62  run;
63  ;;;;
64  %mv_createwebservice(path=/Public/temp,name=demo1)
65  %mv_createwebservice(path=/Public/temp,name=demo2)
66 
67  Prepare an input table with 60 executions:
68 
69  data work.inputjobs;
70  _contextName='SAS Job Execution compute context';
71  do flow_id=1 to 3;
72  do i=1 to 20;
73  _program='/Public/temp/demo1';
74  macrovar1=10*i;
75  macrovar2=4*i;
76  output;
77  i+1;
78  _program='/Public/temp/demo2';
79  macrovar1=40*i;
80  macrovar2=44*i;
81  output;
82  end;
83  end;
84  run;
85 
86  Trigger the flow
87 
88  %mv_jobflow(inds=work.inputjobs
89  ,maxconcurrency=4
90  ,outds=work.results
91  ,outref=myjoblog
92  )
93 
94  data _null_;
95  infile myjoblog;
96  input; put _infile_;
97  run;
98 
99 
100  @param [in] access_token_var= The global macro variable to contain the
101  access token
102  @param [in] grant_type= valid values:
103  @li password
104  @li authorization_code
105  @li detect - will check if the macro variable named in access_token_var
106  exists; if so authorization_code is used, otherwise sas_services is
107  used.
108  @li sas_services - will use oauth_bearer=sas_services. Default option.
109  @param [in] inds= The input dataset containing a list of jobs and parameters
110  @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
111  @param [in] raise_err=0 Set to 1 to raise SYSCC when a job does not complete
112  successfully
113  @param [in] mdebug= set to 1 to enable DEBUG messages
114  @param [out] outds= The output dataset containing the results
115  @param [out] outref= The output fileref to which to append the log file(s).
116 
117  @version VIYA V.03.05
118  @author Allan Bowe, source: https://github.com/sasjs/core
119 
120  <h4> SAS Macros </h4>
121  @li mf_nobs.sas
122  @li mp_abort.sas
123  @li mf_getplatform.sas
124  @li mf_getuniquefileref.sas
125  @li mf_existvarlist.sas
126  @li mv_jobwaitfor.sas
127  @li mv_jobexecute.sas
128 
129 **/
130 
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
    ,maxconcurrency=8
    ,access_token_var=ACCESS_TOKEN
    ,grant_type=sas_services
    ,outref=0
    ,raise_err=0
    ,mdebug=0
  );
/*
  Runs the jobs listed in the inds table, one flow_id at a time. Within a
  flow, up to maxconcurrency jobs are submitted via mv_jobexecute and
  finished jobs are collected via mv_jobwaitfor into the outds table.
  Side effects visible in this macro: the inds table is rewritten when the
  default columns are added and is sorted by flow_id, the outds table is
  dropped and rebuilt, and OPTIONS NOQUOTELENMAX is set and not restored.
*/
%local dbg;
%if &mdebug=1 %then %do;
  %put &sysmacroname entry vars:;
  %put _local_;
  /* only dump the input table when one was supplied, otherwise the data step
    would fail before the validation further down can report the problem */
  %if "&inds" ne "0" %then %do;
    %put inds vars:;
    data _null_;
      set &inds;
      putlog (_all_)(=);
    run;
  %end;
%end;
%else %let dbg=*; /* turns the dbg-prefixed putlog lines into comments */

/* NOTE(review): oauth_bearer is assigned here but not referenced again in
  this macro, and access_token_var / grant_type are not forwarded to
  mv_jobexecute or mv_jobwaitfor below - confirm whether they should be */
%local oauth_bearer;
%if &grant_type=detect %then %do;
  %if %symexist(&access_token_var) %then %let grant_type=authorization_code;
  %else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
  %let oauth_bearer=oauth_bearer=sas_services;
  %let &access_token_var=;
%end;

%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
    and &grant_type ne sas_services
  )
  ,mac=&sysmacroname
  ,msg=%str(Invalid value for grant_type: &grant_type)
)

%mp_abort(iftrue=("&inds"="0")
  ,mac=&sysmacroname
  ,msg=%str(Input dataset was not provided)
)
%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
  ,mac=&sysmacroname
  ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
)
%mp_abort(iftrue=(&maxconcurrency<1)
  ,mac=&sysmacroname
  ,msg=%str(The maxconcurrency variable should be a positive integer)
)

/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
  data &inds;
    %if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
      length _CONTEXTNAME $128;
      retain _CONTEXTNAME "SAS Job Execution compute context";
    %end;
    %if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
      retain FLOW_ID 0;
    %end;
    set &inds;
    &dbg. putlog (_all_)(=);
  run;
%end;

/* every job needs both a flow and a program path */
%local missings;
proc sql noprint;
select count(*) into: missings
  from &inds
  where flow_id is null or _program is null;
quit; /* close the step so it is not left running across the checks below */
%mp_abort(iftrue=(&missings>0)
  ,mac=&sysmacroname
  ,msg=%str(input dataset has &missings missing values for FLOW_ID or _PROGRAM)
)

%if %mf_nobs(&inds)=0 %then %do;
  %put No observations in &inds! Leaving macro &sysmacroname;
  %return;
%end;

/* ensure output table is available (create it, then remove it again so that
  the first proc append further down defines its structure) */
data &outds;run;
proc sql;
drop table &outds;
quit;

/* NOTE(review): this option is not restored when the macro finishes */
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);


/* get flows - one flowN macro variable per distinct flow_id, in sort order */
proc sort data=&inds;
  by flow_id;
run;
data _null_;
  set &inds (keep=flow_id) end=last;
  by flow_id;
  if last.flow_id then do;
    cnt+1;
    call symputx(cats('flow',cnt),flow_id,'l');
  end;
  if last then call symputx('flowcnt',cnt,'l');
run;

/* prepare temporary datasets and frefs */
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
data;run;%let jds=&syslast;
data;run;%let jjson=&syslast;
data;run;%let jdsapp=&syslast;
data;run;%let jdsrunning=&syslast;
data;run;%let jdswaitfor=&syslast;
%let jfref=%mf_getuniquefileref();

/* start loop */
%do fid=1 %to &flowcnt;

  %if not ( &raise_err and &syscc ) %then %do;

    %put preparing job attributes for flow &&flow&fid;
    %local jcnt;
    /* one jobN / contextN macro variable per job in this flow; the remaining
      columns stay in the table and become the job parameters */
    data &jds(drop=_contextName _program);
      set &inds(where=(flow_id=&&flow&fid));
      if _contextName='' then _contextName="SAS Job Execution compute context";
      call symputx(cats('job',_n_),_program,'l');
      call symputx(cats('context',_n_),_contextName,'l');
      call symputx('jcnt',_n_,'l');
      &dbg. if _n_= 1 then putlog "Loop &fid";
      &dbg. putlog (_all_)(=);
    run;
    %put exporting job variables in json format;
    %do jid=1 %to &jcnt;
      /* isolate the row belonging to this job */
      data &jjson;
        set &jds;
        if _n_=&jid then do;
          output;
          stop;
        end;
      run;
      proc json out=&jfref;
        export &jjson / nosastags fmtnumeric;
      run;
      data _null_;
        infile &jfref lrecl=32767;
        input;
        jparams=cats('jparams',symget('jid'));
        /* drop the first two and last two characters of the exported record,
          and keep the result local so that a same-named macro variable in an
          outer scope is never overwritten */
        call symputx(jparams,substr(_infile_,3,length(_infile_)-4),'l');
      run;
      %local jobuid&jid;
      %let jobuid&jid=0; /* used in next loop */
    %end;
    %local concurrency completed;
    %let concurrency=0;
    %let completed=0;
    proc sql; drop table &jdsrunning; quit;
    %do jid=1 %to &jcnt;
      /**
        * now we can execute the jobs up to the maxconcurrency setting
        */
      %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */

        /* check to see if the job finished in the previous round */
        %if %sysfunc(exist(&outds))=1 %then %do;
          %local jobcheck; %let jobcheck=0;
          proc sql noprint;
          select count(*) into: jobcheck
            from &outds where uuid="&&jobuid&jid";
          quit;
          %if &jobcheck>0 %then %do;
            %put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
            %let job&jid=0;
          %end;
        %end;

        /* check if job was triggered and, if
          so, if we have enough slots to run? */
        %if ("&&jobuid&jid"="0") and (&concurrency<&maxconcurrency) %then %do;

          /* But only start if no issues detected so far */
          %if not ( &raise_err and &syscc ) %then %do;

            /* split the full program path into folder and job name */
            %local jobname jobpath;
            %let jobname=%scan(&&job&jid,-1,/);
            %let jobpath=
              %substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);

            %put executing &jobpath/&jobname with paramstring &&jparams&jid;
            %mv_jobexecute(path=&jobpath
              ,name=&jobname
              ,paramstring=%superq(jparams&jid)
              ,outds=&jdsapp
              ,contextname=&&context&jid
            )
            data &jdsapp;
              format jobparams $32767.;
              set &jdsapp(where=(method='GET' and rel='state'));
              jobparams=symget("jparams&jid");
              /* uri here has the /state suffix */
              uuid=scan(uri,-2,'/');
              call symputx("jobuid&jid",uuid,'l');
            run;
            proc append base=&jdsrunning data=&jdsapp;
            run;
            %let concurrency=%eval(&concurrency+1);
            /* sleep one second after every request to smooth the impact */
            data _null_;
              call sleep(1,1);
            run;

          %end;
          %else %do; /* Job was skipped due to problems */

            %put jobid &&job&jid in flow &fid skipped due to SYSCC (&syscc);
            %let completed = %eval(&completed+1);
            %let job&jid=0; /* Indicate job has finished */

          %end;

        %end;
      %end;
      %if &jid=&jcnt %then %do;
        /* we are at the end of the loop - check which jobs have finished */
        %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref
          ,raise_err=&raise_err,mdebug=&mdebug)
        %local done;
        %let done=%mf_nobs(&jdswaitfor);
        %if &done>0 %then %do;
          %let completed=%eval(&completed+&done);
          %let concurrency=%eval(&concurrency-&done);
          data &jdsapp;
            set &jdswaitfor;
            flow_id=&&flow&fid;
            uuid=scan(uri,-1,'/');
          run;
          proc append base=&outds data=&jdsapp;
          run;
        %end;
        /* finished jobs no longer count as running */
        proc sql;
        delete from &jdsrunning
          where uuid in (select uuid from &outds
            where state in ('canceled','completed','failed')
          );
        quit; /* otherwise proc sql is still running when the macro returns */

        /* loop again if jobs are left - resetting the index restarts the
          scan over all jobs in this flow */
        %if &completed < &jcnt %then %do;
          %let jid=0;
          %put looping flow &fid again;
          %put &completed of &jcnt jobs completed, &concurrency jobs running;
        %end;
      %end;
    %end;

  %end;
  %else %do;

    %put Flow &&flow&fid skipped due to SYSCC (&syscc);

  %end;
  /* back up and execute the next flow */
%end;

%if &mdebug=1 %then %do;
  %put &sysmacroname exit vars:;
  %put _local_;
%end;

%mend mv_jobflow;