mpe_loader.sas
Go to the documentation of this file.
1 /**
2  @file
3  @brief Loads CSV and sends to the staging area for approval
4  @details this macro used to capture multiple CSVs (eg from one excel file) in
5  a staging area and send them all to a landing area. For simplicity this
6  functionality is now deprecated, and each load should be made as a seperate
7  request. If there is a use case to load multiple tables at once, the client
8  should manage this and load them seperately.
9 
10  @param url= used for debugging (provided by stagedata stp)
11  @param dlm= use to provide alternative delimeters for CSVs (not just comma)
12  @param [in] termstr= (crlf) Always crlf from adapter, whereas loadfile service
13  figures it out
14 
15  <h4> SAS Macros </h4>
16  @li dc_assignlib.sas
17  @li mf_getattrn.sas
18  @li mf_getuser.sas
19  @li mf_mkdir.sas
20  @li mf_verifymacvars.sas
21  @li mp_abort.sas
22  @li mp_cntlout.sas
23  @li mp_dirlist.sas
24  @li mp_lockanytable.sas
25  @li mpe_accesscheck.sas
26  @li mpe_alerts.sas
27  @li mpe_xlmapvalidate.sas
28  @li mpe_loadfail.sas
29  @li mpe_runhook.sas
30 
31  @version 9.2
32  @author 4GL Apps Ltd
33  @copyright 4GL Apps Ltd. This code may only be used within Data Controller
34  and may not be re-distributed or re-sold without the express permission of
35  4GL Apps Ltd.
36 **/
37 
38 %macro mpe_loader(
39  mperef= /* name of subfolder containing the staged data */
40  ,mDebug=0 /* set to 1 for development or debugging */
41  ,submitted_reason_txt= /* populates column of same name in sumo_approvals*/
42  ,approver= /* allows a userid to be provided for direct approval email */
43  ,url= /* optional - url for debugging */
44  ,dlm=%str(,)
45  ,termstr=crlf
46  ,dc_dttmtfmt=E8601DT26.6
47  );
48 %put entered mpe_loader from &=_program;
49 %put &=url;
50 %put &=termstr;
51 %put &=dlm;
52  /* determine full path to CSV directory */
53 %local now;
54 %let now=&dc_dttmtfmt;
55 %put &=now;
56 
57 /**
58  * get full path to package (only subdirectory passed through)
59  */
60 %mp_abort(
61  iftrue=(%mf_verifymacvars(mperef mpelocapprovals)=0)
62  ,mac=bitemporal_dataloader
63  ,msg=%str(Missing: mperef mpelocapprovals)
64 )
65 
66 %let csv_dir=%trim(&mpelocapprovals/&mperef);
67 
68 /* exit if package has already been uploaded */
69 %local check;
70 proc sql noprint;
71 select count(*) into: check
72  from &mpelib..mpe_loads
73  where csv_dir="&mperef";
74 %if &check %then %do;
75  %mp_abort(msg=Folder &mperef already has an entry in &mpelib..mpe_loads
76  ,mac=mpe_loader.sas);
77  %return;
78 %end;
79 
80 /* get CSV directory contents */
81 %mp_dirlist(path=&csv_dir,outds=WORK.getfiles)
82 data WORK.csvs;
83  set WORK.getfiles;
84  if upcase(scan(filename,3,'.'))='CSV' then do;
85  lib=upcase(scan(filename,1,'.'));
86  ds=upcase(scan(filename,2,'.'));
87  output;
88  end;
89 run;
90 
91 /* get table attributes */
92 proc sql noprint;
93 create table WORK.sumo_tables as
94  select a.filename, b.*
95  from WORK.csvs a
96  left join &mpelib..mpe_tables b
97  on a.lib=b.libref
98  and a.ds=b.dsn
99  where b.tx_from le &now
100  and &now lt b.tx_to;
101 
102 /* define user as meta user if available */
103 %local user;
104 %let user=%mf_getuser();
105 
106 /* check if there is actually a table to load */
107 %if %mf_getattrn(WORK.sumo_tables,NLOBS)=0 %then %do;
108  %let msg=Table not registered in &mpelib..mpe_tables;
109  %mpe_loadfail(
110  status=&msg
111  ,now=&now
112  ,mperef=&mperef
113  ,dc_dttmtfmt=&dc_dttmtfmt.
114  )
115  %mp_abort(msg=&msg,mac=mpe_loader.sas);
116  %return;
117 %end;
118 
119 proc sql;
120 insert into &mpelib..mpe_loads
121  set USER_NM="&user"
122  ,STATUS='IN PROGRESS'
123  ,CSV_dir="&mperef"
124  ,PROCESSED_DTTM=&now
125  ,reason_txt = symget('submitted_reason_txt');
126 
127 
128 /* import CSV */
129 
130 %let droplist=;
131 %let attrib=;
132 %let droplist=;
133 %let libref=;
134 %let DS=;
135 
136 /* get table info */
137 data _null_;
138  set sumo_tables;
139  libds=upcase(cats(libref,'.',dsn));
140  call symputx('orig_libds',libds);
141  is_fmt=0;
142  if substr(cats(reverse(dsn)),1,3)=:'CF-' then do;
143  libds=scan(libds,1,'-');
144  putlog "Format Catalog Captured";
145  libds='work.fmtextract';
146  is_fmt=1;
147  end;
148  call symputx('is_fmt',is_fmt);
149  call symputx('libds',libds);
150  call symputx('FNAME',filename);
151  call symputx('LIBREF',libref);
152  call symputx('DS',dsn);
153  call symputx('LOADTYPE',loadtype);
154  call symputx('BUSKEY',buskey);
155  call symputx('VAR_TXFROM',var_txfrom);
156  call symputx('VAR_TXTO',var_txto);
157  call symputx('VAR_BUSFROM',var_busfrom);
158  call symputx('VAR_BUSTO',var_busto);
159  call symputx('VAR_PROCESSED',var_processed);
160  call symputx('RK_UNDERLYING',RK_UNDERLYING);
161  call symputx('POST_EDIT_HOOK',POST_EDIT_HOOK);
162  call symputx('NOTES',NOTES);
163  call symputx('PK',coalescec(RK_UNDERLYING,buskey));
164  call symputx('NUM_OF_APPROVALS_REQUIRED',NUM_OF_APPROVALS_REQUIRED,'l');
165  put (_all_)(=);
166  stop;
167 run;
168 
169 %if %length(&ds)=0 %then %do;
170  %let msg=%str(ERR)OR: Unable to extract record from &mpelib..mpe_tables;
171  %mpe_loadfail(
172  status=FAILED
173  ,now=&now
174  ,mperef=&mperef
175  ,reason_txt=%quote(&msg)
176  ,dc_dttmtfmt=&dc_dttmtfmt.
177  )
178  %mp_abort(msg=&msg,mac=mpe_loader.sas);
179  %return;
180 %end;
181 
182 /* export format catalog */
183 %mp_cntlout(
184  iftrue=(&is_fmt=1)
185  ,libcat=&orig_libds
186  ,fmtlist=0
187  ,cntlout=work.fmtextract
188 )
189 
190 /* user must have EDIT access to load a table */
191 %mpe_accesscheck(&orig_libds
192  ,outds=work.sumo_access
193  ,user=&user
194  ,access_level=EDIT )
195 %put exiting accesscheck;
196 
197 %if %mf_getattrn(work.sumo_access,NLOBS)=0 %then %do;
198  %let msg=%str(ERR)OR: User is not authorised to edit &orig_libds!;
199  %mpe_loadfail(
200  status=UNAUTHORISED
201  ,now=&now
202  ,mperef=&mperef
203  ,reason_txt=%quote(&msg)
204  ,dc_dttmtfmt=&dc_dttmtfmt.
205  )
206  %mp_abort(msg=&msg,mac=mpe_loader.sas);
207  %return;
208 %end;
209 
210 %put now importing: "&csv_dir/&fname" termstr=&termstr;
211 /* get the variables from the CSV */
212 data vars_csv1(index=(idxname=(varnum name)) drop=infile);
213  infile "&csv_dir/&fname" lrecl=32767 dsd termstr=&termstr encoding='utf-8';
214  input;
215  length infile $32767;
216  infile=compress(_infile_,'"',);
217  infile=compress(infile,"'",);
218  format name $32.;
219  putlog 'received vars: ' infile;
220  call symputx('received_vars',infile,'l');
221  do varnum=1 to countw(infile,"&dlm");
222  /* keep writeable chars */
223  name=compress(upcase(scan(infile,varnum)),,'kw');
224  if name ne "_____DELETE__THIS__RECORD_____" then output;
225  end;
226  stop;
227 run;
228 %put received_vars = &received_vars;
229 
230 %dc_assignlib(WRITE,&libref)
231 
232 /* get list of variables and their formats */
233 proc contents noprint data=&libds
234  out=vars(keep=name type length varnum format:);
235 run;
236 data vars(keep=name type length varnum format);
237  set vars(rename=(format=format2 type=type2));
238  name=upcase(name);
239  format2=upcase(format2);
240  /* not interested in transaction or processing dates
241  (append table must be supplied without them) */
242  if name not in ("&VAR_TXFROM","&VAR_TXTO","&VAR_PROCESSED"
243  ,"_____DELETE__THIS__RECORD_____");
244  if type2 in (2,6) then do;
245  length format $49.;
246  if format2='' then format=cats('$',length,'.');
247  else format=cats(format2,max(formatl,length),'.');
248  type='char';
249  end;
250  else do;
251  if format2='' then format=cats(length,'.');
252  else if format2=:'DATETIME' or format2=:'E8601DT' or format=:'NLDATM'
253  then do;
254  format='DATETIME19.';
255  end;
256  else if format2=:'DATE' or format2=:'DDMMYY'
257  or format2=:'MMDDYY' or format2=:'YYMMDD'
258  or format2=:'E8601DA' or format2=:'B8601DA'
259  or format=:'NLDATE'
260  then do;
261  format='DATE9.';
262  end;
263  else if format2='BEST' & formatl=0 then format=cats('BEST',length,'.');
264  else do;
265  if formatl=0 then formatl=length;
266  format=cats(format2,formatl,'.',formatd);
267  end;
268  type='num';
269  end;
270  put (_all_)(=);
271 run;
272 
273 /* build attrib statement */
274 data vars_attrib;
275  length attrib_statement $32767 type2 $20;
276  set vars end=lastobs;
277  retain attrib_statement;
278 
279  if type='char' then type2='$';
280  str1=catx(' ',name,'length=',cats(type2,length));
281  attrib_statement=trim(attrib_statement)!!' '!!trim(str1);
282 
283  if lastobs then call symputx('ATTRIB',attrib_statement,'L');
284 run;
285 
286 /* build input statement - first get vars in right order
287  and join with target formats*/
288 proc sql noprint;
289 create table vars_csv2 as
290  select b.*
291  from vars_csv1 a
292  left join vars_attrib b
293  on a.name=b.name
294  order by a.varnum;
295 
296 
297 /* now build input statement */
298 data final_check;
299  set vars_csv2 end=lastobs;
300  length input_statement $32767 type2 $20 droplist $32767;
301  retain input_statement droplist;
302 
303  /* Build input statement - CATCH EXCEPTIONS HERE!*/
304  if name in ('QUOTE_DTTM') then do;
305  name=cats(name,'2');
306  droplist=catx(' ',trim(droplist),name);
307  type2='$20.';/* converted below */
308  end;
309  else if type='char' then type2=cats('$CHAR', length,'.');
310  else if format='DATE9.' then type2='ANYDTDTE.';
311  else if format='DATETIME19.' then type2='ANYDTDTM.';
312  else if format=:'TIME' then type2='ANYDTTME.';
313  else if name='' then do;/* additional vars in input data */
314  name='_____DELETE__THIS__VARIABLE_____';
315  droplist=catx(' ',trim(droplist),'_____DELETE__THIS__VARIABLE_____');
316  type2='$1.';
317  end;
318  else type2='best32.';
319  * else type2=cats(length,'.');
320 
321  input_statement=catx(' ',input_statement,name,':',type2);
322 
323  if lastobs then do;
324  call symputx('INPUT', input_statement,'L');
325  if trim(droplist) ne '' then
326  call symputx('droplist',"drop "!!droplist!!';','l');
327  end;
328 run;
329 
330 %let mpeloadstop=0;
331 
332 data work.STAGING_DS;
333  &droplist;
334  infile "&csv_dir/&fname" dsd dlm="&dlm" lrecl=32767
335  firstobs=2 missover termstr=&termstr encoding='utf-8';
336  attrib _____DELETE__THIS__RECORD_____ length=$3 &attrib ;
337  if _n_=1 then call missing (of _all_);
338  missing a b c d e f g h i j k l m n o p q r s t u v w x y z _;
339  input
340  %if %scan(%quote(&received_vars),1)=_____DELETE__THIS__RECORD_____ %then %do;
341  _____DELETE__THIS__RECORD_____: $3.
342  %end;
343  &input;
344 
345  %if %index(%quote(&attrib.),UNLIKELY_VAR ) %then %do;
346  /*UNLIKELY_VAR=input(UNLIKELY_VAR2,ANYDTDTM21.);*/
347  /* SPECIAL LOGIC FOR SPECIAL VARS */
348  %end;
349 
350  if _error_ ne 0 then do;
351  putlog _infile_;
352  call symputx('mpeloadstop',_n_);
353  stop;
354  end;
355  /* remove all blank rows */
356  if compress(cats(of _all_),'.')=' ' then delete;
357 run;
358 
359 %if &mpeloadstop>0 %then %do;
360  %if %symexist(SYSPRINTTOLOG) %then %let logloc=&SYSPRINTTOLOG;
361  %else %let logloc=%qsysfunc(getoption(LOG));
362  %put redirecting log output to capture return message;
363  %put currentloc=&logloc;
364  filename tmp temp;
365  proc printto log=tmp;run;
366  data _null_;
367  &droplist;
368  infile "&csv_dir/&fname" dsd dlm="&dlm" lrecl=32767 firstobs=2
369  missover termstr=&termstr;
370  attrib &attrib ;
371  input
372  %if %scan(%quote(&received_vars),1)=_____DELETE__THIS__RECORD_____
373  %then %do;
374  _____DELETE__THIS__RECORD_____: $3.
375  %end;
376  &input;
377  if _error_ then stop;
378  run;
379  /* get log back */
380  proc printto log=&logloc;run;
381  data _null_; infile tmp; input; putlog _infile_;run;
382  /* scan log for invalid data warnings */
383  data _null_;
384  infile tmp;
385  input;
386  length msg1 msg2 msg3 msg4 msg5 msg url $32767;
387  if index(_infile_,'NOTE: Invalid data for') then do;
388  msg1=_infile_;
389  input;
390  msg2=_infile_;
391  input;
392  msg3=_infile_;
393  input;
394  msg4=_infile_;
395  input;
396  msg5=_infile_;
397  url=symget('url');
398  msg=catx('\n',msg1,msg2,msg3,msg4,msg5,'\n',url);
399  call symputx('msg',msg);
400  stop;
401  end;
402  run;
403 
404  %mpe_loadfail(
405  status=FAILED
406  ,now=&now
407  ,mperef=&mperef
408  ,reason_txt=%superq(msg)
409  ,dc_dttmtfmt=&dc_dttmtfmt.
410  )
411  %return;
412 %end;
413 
414 /* check that the table is unique on PK */
415 proc sort data=work.STAGING_DS dupout=work.MPE_DUPS (keep=&pk) nodupkey;
416  by &pk;
417 run;
418 %if %mf_getattrn(work.MPE_DUPS,NLOBS)>0 %then %do;
419  %local duplist;
420  data _null_;
421  set work.mpe_dups;
422  %do i=1 %to %sysfunc(countw(&pk));
423  %let iWord=%scan(&pk,&i);
424  call symputx('duplist',symget('duplist')!!
425  " &iWord="!!cats(&iWord));
426  %end;
427  run;
428  %let msg=This upload contains duplicates on the Primary Key columns %trim(
429  )(&pk) \n Please remove the duplicates and try again. %trim(
430  )\n &duplist \n ;
431  %mp_abort(msg=%superq(msg),mac=mpe_loader.sas);
432  %return;
433 %end;
434 
435 %if &syscc gt 4 %then %do;
436  %let msg=SYSCC=&syscc prior to post edit hook (%superq(syserrortext));
437  %mpe_loadfail(
438  status=FAILED - &syscc
439  ,now=&now
440  ,mperef=&mperef
441  ,reason_txt=%superq(msg)
442  ,dc_dttmtfmt=&dc_dttmtfmt.
443  )
444  %return;
445 %end;
446 
447 /* If a Complex Excel Upload, needs to have the load ref added to the table */
448 %mpe_xlmapvalidate(&mperef,work.staging_ds,&mpelib,&orig_libds)
449 
450 /* Run the Post Edit Hook prior to creation of staging folder */
451 %mpe_runhook(POST_EDIT_HOOK)
452 
453 /* stop if err */
454 %if &syscc gt 4 %then %do;
455  %let msg=ERR in post edit hook (&post_edit_hook);
456  %mpe_loadfail(
457  status=FAILED - &syscc
458  ,now=&now
459  ,mperef=&mperef
460  ,reason_txt=%quote(&msg)
461  ,dc_dttmtfmt=&dc_dttmtfmt.
462  )
463  %return;
464 %end;
465 
466 
467 /**
468  * send to approve process
469  */
470 
471 /* create a dataset key (datetime plus 3 digit random number plus PID) */
472 
473 /* send dataset to approvals subfolder with same name as subfolder */
474 libname approval "&mpelocapprovals/&mperef";
475 data approval.&mperef;
476  set work.staging_ds;
477 run;
478 proc export data=approval.&mperef
479  outfile="&mpelocapprovals/&mperef/&mperef..csv"
480  dbms=csv
481  replace;
482 run;
483 
484 /* update the control dataset with relevant info */
485 data append_app;
486  if 0 then set &mpelib..mpe_submit;/* get formats */
487  call missing (of _all_);
488  TABLE_ID="&mperef";
489  submit_status_cd='SUBMITTED';
490  submitted_by_nm="%mf_getuser()";
491  base_lib="&libref";
492  base_ds="&ds";
493  submitted_on_dttm=&now;
494  submitted_reason_txt=symget('submitted_reason_txt');
495  input_vars=%mf_getattrn(approval.&mperef,NVARS);
496  input_obs=%mf_getattrn(approval.&mperef,NLOBS);
497  num_of_approvals_required=&NUM_OF_APPROVALS_REQUIRED;
498  num_of_approvals_remaining=&NUM_OF_APPROVALS_REQUIRED;
499  reviewed_by_nm='';
500  reviewed_on_dttm=.;
501 run;
502 
503 %mp_lockanytable(LOCK,lib=&mpelib,ds=mpe_submit,
504  ref=%str(&mperef update in &_program),
505  ctl_ds=&mpelib..mpe_lockanytable
506 )
507 proc append base= &mpelib..mpe_submit data=append_app;
508 run;
509 %mp_lockanytable(UNLOCK,
510  lib=&mpelib,ds=mpe_submit,
511  ctl_ds=&mpelib..mpe_lockanytable
512 )
513 
514 /* send email to REVIEW members */
515 %put sending mpe_alerts;
516 %mpe_alerts(alert_event=SUBMITTED
517  , alert_lib=&libref
518  , alert_ds=&ds
519  , dsid=&mperef
520 )
521 /* DISABLE EMAIL FOR NOW
522  %let b2=REASON: %quote(&submitted_reason_txt);
523 
524  %local URLNOTES;
525  %if %length(&notes)>0 %then %let URLNOTES=%quote(%sysfunc(urlencode(&notes)));
526 
527  %let b3=%str(Click to review / approve: )%trim(
528  )%str(http://&_srvname:&_srvport&_url?_PROGRAM=/Web/approvals&)%trim(
529  )TABLEID=&dsid%str(&)BASETABLE=&libref..&ds%str(&)NOTES=&URLNOTES;
530 
531  %let b4=%str(Reference ID: &mperef);
532 */
533 
534 %put mpe_loader finishing up with syscc=&syscc;
535 %if &syscc le 4 %then %do;
536  %local dur;
537  data _null_;
538  now=symget('now');
539  dur=%sysfunc(datetime())-&now;
540  call symputx('dur',dur,'l');
541  putlog 'Updating mpe_loads with the following query:';
542  putlog "update &mpelib..mpe_loads set STATUS='SUCCESS'";
543  putlog " , duration=" dur;
544  putlog " , processed_dttm=" now;
545  putlog " , approvals = '&libref..&ds'";
546  putlog " where CSV_DIR='&mperef';";
547  run;
548  proc sql;
549  update &mpelib..mpe_loads set STATUS='SUCCESS'
550  , duration=&dur
551  , processed_dttm=&now
552  , approvals = "&libref..&ds"
553  where CSV_DIR="&mperef";
554 %end;
555 %else %do;
556  %mpe_loadfail(
557  status="FAILED - &syscc"
558  ,now=&now
559  ,approvals=&libref..&ds
560  ,mperef=&mperef
561  ,dc_dttmtfmt=&dc_dttmtfmt.
562  )
563  %return;
564 %end;
565 
566 %mend mpe_loader;