mpe_loader.sas
Go to the documentation of this file.
1 /**
2  @file
3  @brief Loads CSV and sends to the staging area for approval
4  @details this macro used to capture multiple CSVs (eg from one excel file) in
5  a staging area and send them all to a landing area. For simplicity this
6  functionality is now deprecated, and each load should be made as a separate
7  request. If there is a use case to load multiple tables at once, the client
8  should manage this and load them separately.
9 
10  @param url= used for debugging (provided by stagedata stp)
11  @param dlm= use to provide alternative delimiters for CSVs (not just comma)
12  @param [in] termstr= (crlf) Always crlf from adapter, whereas loadfile service
13  figures it out
14 
15  <h4> SAS Macros </h4>
16  @li dc_assignlib.sas
17  @li mf_getattrn.sas
18  @li mf_getuser.sas
19  @li mf_mkdir.sas
20  @li mf_verifymacvars.sas
21  @li mp_abort.sas
22  @li mp_cntlout.sas
23  @li mp_dirlist.sas
24  @li mp_lockanytable.sas
25  @li mpe_accesscheck.sas
26  @li mpe_alerts.sas
27  @li mpe_xlmapvalidate.sas
28  @li mpe_loadfail.sas
29  @li mpe_runhook.sas
30 
31  @version 9.2
32  @author 4GL Apps Ltd
33  @copyright 4GL Apps Ltd. This code may only be used within Data Controller
34  and may not be re-distributed or re-sold without the express permission of
35  4GL Apps Ltd.
36 **/
37 
38 %macro mpe_loader(
39  mperef= /* name of subfolder containing the staged data */
40  ,mDebug=0 /* set to 1 for development or debugging (not referenced in this macro body) */
41  ,submitted_reason_txt= /* written to reason_txt in mpe_loads and submitted_reason_txt in mpe_submit */
42  ,approver= /* allows a userid to be provided for direct approval email (not referenced in this macro body) */
43  ,url= /* optional - url for debugging, appended to the message built when a row fails to parse */
44  ,dlm=%str(,) /* field delimiter used when reading the CSV */
45  ,termstr=crlf /* line terminator passed to the infile statements */
46  ,dc_dttmtfmt=E8601DT26.6 /* copied as-is into &now; also forwarded to mpe_loadfail */
47  );
48 %put entered mpe_loader from &=_program;
49 %put &=url;
50 %put &=termstr;
51 %put &=dlm;
52  /* capture the load timestamp: &now is used below as a numeric datetime (SQL filters, PROCESSED_DTTM, duration) */
53 %local now;
54 %let now=&dc_dttmtfmt; /* NOTE(review): the parameter default looks like a format name, not a datetime value - confirm callers always override it */
55 %put &=now;
56 
57 /** get full path to package (only the subdirectory is passed in mperef);
58  * abort under the mpe_loader.sas name (as the other aborts in this macro do)
59  * when mf_verifymacvars returns 0 for mperef / mpelocapprovals */
60 %mp_abort(
61  iftrue=(%mf_verifymacvars(mperef mpelocapprovals)=0)
62  ,mac=mpe_loader.sas
63  ,msg=%str(Missing: mperef mpelocapprovals)
64 )
65 
66 %let csv_dir=%trim(&mpelocapprovals/&mperef);
67 
68 /* exit if package has already been uploaded (mpe_loads already holds a row for this folder) */
69 %local check;
70 proc sql noprint;
71 select count(*) into: check /* number of mpe_loads rows whose csv_dir equals this load reference */
72  from &mpelib..mpe_loads
73  where csv_dir="&mperef";
74 %if &check %then %do; /* any non-zero count takes this branch */
75  %mp_abort(msg=Folder &mperef already has an entry in &mpelib..mpe_loads
76  ,mac=mpe_loader.sas);
77  %return;
78 %end;
79 
80 /* list the staged folder, then keep only files named <lib>.<ds>.csv */
81 %mp_dirlist(path=&csv_dir,outds=WORK.getfiles)
82 data WORK.csvs;
83  set WORK.getfiles;
84  /* subsetting IF: rows whose third dot-delimited word is not CSV stop here, */
85  /* so lib / ds are only derived for the rows that are written out */
86  if upcase(scan(filename,3,'.'))='CSV';
87  lib=upcase(scan(filename,1,'.'));
88  ds=upcase(scan(filename,2,'.'));
89 run;
90 
91 /* get table attributes: match each staged CSV (lib/ds) to its mpe_tables row */
92 proc sql noprint;
93 create table WORK.sumo_tables as
94  select a.filename, b.*
95  from WORK.csvs a
96  left join &mpelib..mpe_tables b
97  on a.lib=b.libref
98  and a.ds=b.dsn
99  where b.tx_from le &now /* keep only the row whose tx_from / tx_to range contains &now */
100  and &now lt b.tx_to; /* an unmatched CSV has a missing tx_to, fails this test and is dropped */
101 
102 /* define user as meta user if available (value comes from mf_getuser) */
103 %local user;
104 %let user=%mf_getuser();
105 
106 /* check if there is actually a table to load: zero rows in sumo_tables means no match was found above; the message text itself is passed as status= */
107 %if %mf_getattrn(WORK.sumo_tables,NLOBS)=0 %then %do;
108  %let msg=Table not registered in &mpelib..mpe_tables;
109  %mpe_loadfail(
110  status=&msg
111  ,now=&now
112  ,mperef=&mperef
113  ,dc_dttmtfmt=&dc_dttmtfmt.
114  )
115  %mp_abort(msg=&msg,mac=mpe_loader.sas);
116  %return;
117 %end;
118 
119 proc sql; /* record this load as IN PROGRESS; the reason text is fetched with symget at run time */
120 insert into &mpelib..mpe_loads
121  set USER_NM="&user"
122  ,STATUS='IN PROGRESS'
123  ,CSV_dir="&mperef"
124  ,PROCESSED_DTTM=&now
125  ,reason_txt = symget('submitted_reason_txt');
126 
127 
128 /* import CSV - start by clearing the macro variables populated below */
129 
130 %let droplist=;
131 %let attrib=;
132 %let droplist=; /* NOTE(review): repeats the reset two lines above */
133 %let libref=;
134 %let DS=;
135 
136 /* get table info: copy the first sumo_tables row into macro variables */
137 data _null_;
138  set sumo_tables;
139  libds=upcase(cats(libref,'.',dsn));
140  call symputx('orig_libds',libds);
141  is_fmt=0;
142  if substr(cats(reverse(dsn)),1,3)=:'CF-' then do; /* true when dsn ends in -FC */
143  libds=scan(libds,1,'-'); /* NOTE(review): overwritten two lines below before it is used */
144  putlog "Format Catalog Captured";
145  libds='work.fmtextract';
146  is_fmt=1;
147  end;
148  call symputx('is_fmt',is_fmt);
149  call symputx('libds',libds);
150  call symputx('FNAME',filename);
151  call symputx('LIBREF',libref);
152  call symputx('DS',dsn);
153  call symputx('LOADTYPE',loadtype);
154  call symputx('BUSKEY',buskey);
155  call symputx('VAR_TXFROM',var_txfrom);
156  call symputx('VAR_TXTO',var_txto);
157  call symputx('VAR_BUSFROM',var_busfrom);
158  call symputx('VAR_BUSTO',var_busto);
159  call symputx('VAR_PROCESSED',var_processed);
160  call symputx('RK_UNDERLYING',RK_UNDERLYING);
161  call symputx('POST_EDIT_HOOK',POST_EDIT_HOOK);
162  call symputx('NOTES',NOTES);
163  call symputx('PK',coalescec(RK_UNDERLYING,buskey)); /* RK_UNDERLYING when populated, otherwise buskey */
164  call symputx('NUM_OF_APPROVALS_REQUIRED',NUM_OF_APPROVALS_REQUIRED,'l');
165  put (_all_)(=);
166  stop; /* only the first row is read */
167 run;
168 
169 %if %length(&ds)=0 %then %do; /* DS is still empty, so the step above set nothing */
170  %let msg=%str(ERR)OR: Unable to extract record from &mpelib..mpe_tables;
171  %mpe_loadfail(
172  status=FAILED
173  ,now=&now
174  ,mperef=&mperef
175  ,reason_txt=%quote(&msg)
176  ,dc_dttmtfmt=&dc_dttmtfmt.
177  )
178  %mp_abort(msg=&msg,mac=mpe_loader.sas);
179  %return;
180 %end;
181 
182 /* export format catalog to work.fmtextract (the call is conditional on is_fmt=1 via iftrue) */
183 %mp_cntlout(
184  iftrue=(&is_fmt=1)
185  ,libcat=&orig_libds
186  ,fmtlist=0
187  ,cntlout=work.fmtextract
188 )
189 
190 /* user must have EDIT access to load a table; an empty work.sumo_access is treated as unauthorised below */
191 %mpe_accesscheck(&orig_libds
192  ,outds=work.sumo_access
193  ,user=&user
194  ,access_level=EDIT )
195 %put exiting accesscheck;
196 
197 %if %mf_getattrn(work.sumo_access,NLOBS)=0 %then %do;
198  %let msg=%str(ERR)OR: User is not authorised to edit &orig_libds!;
199  %mpe_loadfail(
200  status=UNAUTHORISED
201  ,now=&now
202  ,mperef=&mperef
203  ,reason_txt=%quote(&msg)
204  ,dc_dttmtfmt=&dc_dttmtfmt.
205  )
206  %mp_abort(msg=&msg,mac=mpe_loader.sas);
207  %return;
208 %end;
209 
210 %put now importing: "&csv_dir/&fname" termstr=&termstr;
211 /* get the variables from the CSV: read only the header record and emit one row per column name */
212 data vars_csv1(index=(idxname=(varnum name)) drop=infile);
213  infile "&csv_dir/&fname" lrecl=32767 dsd termstr=&termstr encoding='utf-8';
214  input;
215  length infile $32767;
216  infile=compress(_infile_,'"',); /* strip double quotes from the header */
217  infile=compress(infile,"'",); /* and single quotes */
218  format name $32.;
219  putlog 'received vars: ' infile;
220  call symputx('received_vars',infile,'l');
221  do varnum=1 to countw(infile,"&dlm");
222  /* keep writeable chars; split on &dlm (as countw does) so the word count and the extracted names always agree; strip padding blanks */
223  name=strip(compress(upcase(scan(infile,varnum,"&dlm")),,'kw'));
224  if name ne "_____DELETE__THIS__RECORD_____" then output;
225  end;
226  stop; /* header only - data rows are read later */
227 run;
228 %put received_vars = &received_vars;
229 
230 %dc_assignlib(WRITE,&libref)
231 
232 /* get list of variables and their formats from the target table (or from the format extract) */
233 proc contents noprint data=&libds
234  out=vars(keep=name type length varnum format:);
235 run;
236 data vars(keep=name type length varnum format);
237  set vars(rename=(format=format2 type=type2));
238  name=upcase(name);
239  format2=upcase(format2);
240  /* not interested in transaction or processing dates
241  (append table must be supplied without them) */
242  if name not in ("&VAR_TXFROM","&VAR_TXTO","&VAR_PROCESSED"
243  ,"_____DELETE__THIS__RECORD_____");
244  if type2 in (2,6) then do; /* character branch - NOTE(review): 2 is char in PROC CONTENTS output; confirm what 6 covers */
245  length format $49.;
246  if format2='' then format=cats('$',length,'.');
247  else format=cats(format2,max(formatl,length),'.');
248  type='char';
249  end;
250  else do; /* numeric branch: normalise the format name */
251  if format2='' then format=cats(length,'.');
252  else if format2=:'DATETIME' or format2=:'E8601DT' then do;
253  format='DATETIME19.';
254  end;
255  else if format2=:'DATE' or format2=:'DDMMYY'
256  or format2=:'MMDDYY' or format2=:'YYMMDD'
257  or format2=:'E8601DA' or format2=:'B8601DA'
258  then do;
259  format='DATE9.';
260  end;
261  else if format2='BEST' & formatl=0 then format=cats('BEST',length,'.');
262  /*
263  else if format2=:'DATETIME' or format2=:'DATE' or format2=:'DDMMYY'
264  or format2=:'MMDDYY' or format2=:'YYMMDD' then do;
265  *date or datetime format so use original ;
266  dsid=open("&libref..&ds");
267  vnum=varnum(dsid,name);
268  format=varfmt(dsid,vnum);
269  dsid=close(dsid);
270  end;
271  */
272  else do; /* any other numeric format: rebuild it as name, width, dot, decimals */
273  if formatl=0 then formatl=length;
274  format=cats(format2,formatl,'.',formatd);
275  end;
276  type='num';
277  end;
278  put (_all_)(=);
279 run;
280 
281 /* build attrib statement: accumulate "<name> length=<$>n" for every target column into macro variable ATTRIB */
282 data vars_attrib;
283  length attrib_statement $32767 type2 $20;
284  set vars end=lastobs;
285  retain attrib_statement;
286 
287  if type='char' then type2='$'; /* character lengths need the $ prefix */
288  str1=catx(' ',name,'length=',cats(type2,length));
289  attrib_statement=trim(attrib_statement)!!' '!!trim(str1);
290 
291  if lastobs then call symputx('ATTRIB',attrib_statement,'L');
292 run;
293 
294 /* build input statement - first get vars in right order (CSV header order)
295  and join with target formats; header names with no target column come back with blank b.* values */
296 proc sql noprint;
297 create table vars_csv2 as
298  select b.*
299  from vars_csv1 a
300  left join vars_attrib b
301  on a.name=b.name
302  order by a.varnum;
303 
304 
305 /* now build input statement: one "<name> : <informat>" pair per CSV column, stored in macro variable INPUT */
306 data final_check;
307  set vars_csv2 end=lastobs;
308  length input_statement $32767 type2 $20 droplist $32767;
309  retain input_statement droplist;
310 
311  /* Build input statement - CATCH EXCEPTIONS HERE!*/
312  if name in ('QUOTE_DTTM') then do;
313  name=cats(name,'2');
314  droplist=catx(' ',trim(droplist),name);
315  type2='$20.';/* converted below */
316  end;
317  else if type='char' then type2=cats('$CHAR', length,'.');
318  else if format='DATE9.' then type2='ANYDTDTE.';
319  else if format='DATETIME19.' then type2='ANYDTDTM.';
320  else if format=:'TIME' then type2='ANYDTTME.';
321  else if name='' then do;/* additional vars in input data: read into a throwaway variable that is dropped */
322  name='_____DELETE__THIS__VARIABLE_____';
323  droplist=catx(' ',trim(droplist),'_____DELETE__THIS__VARIABLE_____');
324  type2='$1.';
325  end;
326  else type2='best32.';
327  * else type2=cats(length,'.');
328 
329  input_statement=catx(' ',input_statement,name,':',type2);
330 
331  if lastobs then do;
332  call symputx('INPUT', input_statement,'L');
333  if trim(droplist) ne '' then
334  call symputx('droplist',"drop "!!droplist!!';','l');
335  end;
336 run;
337 
338 %let mpeloadstop=0; /* set to the failing iteration number if a row cannot be parsed */
339 
340 data work.STAGING_DS;
341  &droplist; /* empty, or a drop statement built in final_check */
342  infile "&csv_dir/&fname" dsd dlm="&dlm" lrecl=32767
343  firstobs=2 missover termstr=&termstr encoding='utf-8';
344  attrib &attrib ;
345  if _n_=1 then call missing (of _all_);
346  missing a b c d e f g h i j k l m n o p q r s t u v w x y z _; /* accept special missing values in numeric fields */
347  input
348  %if %scan(%quote(&received_vars),1)=_____DELETE__THIS__RECORD_____ %then %do;
349  _____DELETE__THIS__RECORD_____: $3.
350  %end;
351  &input;
352 
353  %if %index(%quote(&attrib.),UNLIKELY_VAR ) %then %do;
354  /*UNLIKELY_VAR=input(UNLIKELY_VAR2,ANYDTDTM21.);*/
355  /* SPECIAL LOGIC FOR SPECIAL VARS */
356  %end;
357 
358  if _error_ ne 0 then do; /* stop at the first row that fails to parse */
359  putlog _infile_;
360  call symputx('mpeloadstop',_n_);
361  stop;
362  end;
363  /* remove all blank rows */
364  if compress(cats(of _all_),'.')=' ' then delete;
365 run;
366 
367 %if &mpeloadstop>0 %then %do; /* a row failed to parse: re-run the read with the log redirected to capture the note */
368  %if %symexist(SYSPRINTTOLOG) %then %let logloc=&SYSPRINTTOLOG;
369  %else %let logloc=%qsysfunc(getoption(LOG));
370  %put redirecting log output to capture return message;
371  %put currentloc=&logloc;
372  filename tmp temp;
373  proc printto log=tmp;run;
374  data _null_;
375  &droplist;
376  infile "&csv_dir/&fname" dsd dlm="&dlm" lrecl=32767 firstobs=2
377  missover termstr=&termstr encoding='utf-8'; /* same encoding as the real load, so the same row fails */
378  attrib &attrib ;
379  input
380  %if %scan(%quote(&received_vars),1)=_____DELETE__THIS__RECORD_____
381  %then %do;
382  _____DELETE__THIS__RECORD_____: $3.
383  %end;
384  &input;
385  if _error_ then stop;
386  run;
387  /* get log back */
388  proc printto log=&logloc;run;
389  data _null_; infile tmp; input; putlog _infile_;run;
390  %let msg=Invalid data found when loading &fname (iteration &mpeloadstop); /* fallback so msg is always set by this run */
391  data _null_; /* scan log for invalid data warnings: first matching NOTE plus the four lines after it */
392  infile tmp;
393  input;
394  length msg1 msg2 msg3 msg4 msg5 msg url $32767;
395  if index(_infile_,'NOTE: Invalid data for') then do;
396  msg1=_infile_;
397  input;
398  msg2=_infile_;
399  input;
400  msg3=_infile_;
401  input;
402  msg4=_infile_;
403  input;
404  msg5=_infile_;
405  url=symget('url');
406  msg=catx('\n',msg1,msg2,msg3,msg4,msg5,'\n',url);
407  call symputx('msg',msg);
408  stop;
409  end;
410  run;
411 
412  %mpe_loadfail(
413  status=FAILED
414  ,now=&now
415  ,mperef=&mperef
416  ,reason_txt=%superq(msg)
417  ,dc_dttmtfmt=&dc_dttmtfmt.
418  )
419  %return;
420 %end;
421 
422 /* check that the table is unique on PK (the sort also removes duplicate keys from STAGING_DS in place) */
423 proc sort data=work.STAGING_DS dupout=work.MPE_DUPS (keep=&pk) nodupkey;
424  by &pk;
425 run;
426 %if %mf_getattrn(work.MPE_DUPS,NLOBS)>0 %then %do; /* at least one duplicate key was removed */
427  %local duplist;
428  data _null_; /* append " <col>=<value>" to duplist for every key column of every duplicate row */
429  set work.mpe_dups;
430  %do i=1 %to %sysfunc(countw(&pk));
431  %let iWord=%scan(&pk,&i);
432  call symputx('duplist',symget('duplist')!!
433  " &iWord="!!cats(&iWord));
434  %end;
435  run;
436  %let msg=This upload contains duplicates on the Primary Key columns %trim(
437  )(&pk) \n Please remove the duplicates and try again. %trim(
438  )\n &duplist \n ;
439  %mp_abort(msg=%superq(msg),mac=mpe_loader.sas);
440  %return;
441 %end;
442 
443 %if &syscc gt 4 %then %do; /* syscc is already above 4 before the hook runs: record the failure and stop */
444  %let msg=SYSCC=&syscc prior to post edit hook (%superq(syserrortext));
445  %mpe_loadfail(
446  status=FAILED - &syscc
447  ,now=&now
448  ,mperef=&mperef
449  ,reason_txt=%superq(msg)
450  ,dc_dttmtfmt=&dc_dttmtfmt.
451  )
452  %return;
453 %end;
454 
455 /* If a Complex Excel Upload, needs to have the load ref added to the table */
456 %mpe_xlmapvalidate(&mperef,work.staging_ds,&mpelib,&orig_libds)
457 
458 /* Run the Post Edit Hook prior to creation of staging folder */
459 %mpe_runhook(POST_EDIT_HOOK)
460 
461 /* stop if err: syscc above 4 at this point is reported as a post edit hook failure */
462 %if &syscc gt 4 %then %do;
463  %let msg=ERR in post edit hook (&post_edit_hook);
464  %mpe_loadfail(
465  status=FAILED - &syscc
466  ,now=&now
467  ,mperef=&mperef
468  ,reason_txt=%quote(&msg)
469  ,dc_dttmtfmt=&dc_dttmtfmt.
470  )
471  %return;
472 %end;
473 
474 
475 /**
476  * send to approve process
477  */
478 
479 /* no separate dataset key is generated here: the load reference &mperef is used as the dataset name and TABLE_ID */
480 
481 /* send dataset to approvals subfolder with same name as subfolder, and export it there as CSV too */
482 libname approval "&mpelocapprovals/&mperef";
483 data approval.&mperef;
484  set work.staging_ds;
485 run;
486 proc export data=approval.&mperef
487  outfile="&mpelocapprovals/&mperef/&mperef..csv"
488  dbms=csv
489  replace;
490 run;
491 
492 /* update the control dataset with relevant info: build a single mpe_submit row for this submission */
493 data append_app;
494  if 0 then set &mpelib..mpe_submit;/* get formats */
495  call missing (of _all_);
496  TABLE_ID="&mperef";
497  submit_status_cd='SUBMITTED';
498  submitted_by_nm="%mf_getuser()";
499  base_lib="&libref";
500  base_ds="&ds";
501  submitted_on_dttm=&now;
502  submitted_reason_txt=symget('submitted_reason_txt');
503  input_vars=%mf_getattrn(approval.&mperef,NVARS);
504  input_obs=%mf_getattrn(approval.&mperef,NLOBS);
505  num_of_approvals_required=&NUM_OF_APPROVALS_REQUIRED;
506  num_of_approvals_remaining=&NUM_OF_APPROVALS_REQUIRED; /* starts equal to the number required */
507  reviewed_by_nm='';
508  reviewed_on_dttm=.;
509 run;
510 
511 %mp_lockanytable(LOCK,lib=&mpelib,ds=mpe_submit,
512  ref=%str(&mperef update in &_program),
513  ctl_ds=&mpelib..mpe_lockanytable
514 )
515 proc append base= &mpelib..mpe_submit data=append_app;
516 run;
517 %mp_lockanytable(UNLOCK,
518  lib=&mpelib,ds=mpe_submit,
519  ctl_ds=&mpelib..mpe_lockanytable
520 )
521 
522 /* raise the SUBMITTED alert for this table via mpe_alerts (original note: send email to REVIEW members) */
523 %put sending mpe_alerts;
524 %mpe_alerts(alert_event=SUBMITTED
525  , alert_lib=&libref
526  , alert_ds=&ds
527  , dsid=&mperef
528 )
529 /* DISABLE EMAIL FOR NOW
530  %let b2=REASON: %quote(&submitted_reason_txt);
531 
532  %local URLNOTES;
533  %if %length(&notes)>0 %then %let URLNOTES=%quote(%sysfunc(urlencode(&notes)));
534 
535  %let b3=%str(Click to review / approve: )%trim(
536  )%str(http://&_srvname:&_srvport&_url?_PROGRAM=/Web/approvals&)%trim(
537  )TABLEID=&dsid%str(&)BASETABLE=&libref..&ds%str(&)NOTES=&URLNOTES;
538 
539  %let b4=%str(Reference ID: &mperef);
540 */
541 
542 %put mpe_loader finishing up with syscc=&syscc;
543 %if &syscc le 4 %then %do; /* success: log the update statement, then run it against mpe_loads */
544  %local dur;
545  data _null_;
546  now=symget('now');
547  dur=%sysfunc(datetime())-&now; /* elapsed seconds since the load timestamp */
548  call symputx('dur',dur,'l');
549  putlog 'Updating mpe_loads with the following query:';
550  putlog "update &mpelib..mpe_loads set STATUS='SUCCESS'";
551  putlog " , duration=" dur;
552  putlog " , processed_dttm=" now;
553  putlog " , approvals = '&libref..&ds'";
554  putlog " where CSV_DIR='&mperef';";
555  run;
556  proc sql;
557  update &mpelib..mpe_loads set STATUS='SUCCESS'
558  , duration=&dur
559  , processed_dttm=&now
560  , approvals = "&libref..&ds"
561  where CSV_DIR="&mperef";
562 %end;
563 %else %do; /* failure: status is passed unquoted, matching every other mpe_loadfail call in this macro */
564  %mpe_loadfail(
565  status=FAILED - &syscc
566  ,now=&now
567  ,approvals=&libref..&ds
568  ,mperef=&mperef
569  ,dc_dttmtfmt=&dc_dttmtfmt.
570  )
571  %return;
572 %end;
573 
574 %mend mpe_loader;