Loading...
Searching...
No Matches
mpe_loader.sas
Go to the documentation of this file.
1/**
2 @file
3 @brief Loads CSV and sends to the staging area for approval
4 @details this macro used to capture multiple CSVs (eg from one excel file) in
5 a staging area and send them all to a landing area. For simplicity this
6 functionality is now deprecated, and each load should be made as a seperate
7 request. If there is a use case to load multiple tables at once, the client
8 should manage this and load them seperately.
9
10 @param url= used for debugging (provided by stagedata stp)
11 @param dlm= use to provide alternative delimeters for CSVs (not just comma)
12 @param [in] termstr= (crlf) Always crlf from adapter, whereas loadfile service
13 figures it out
14
15 <h4> SAS Macros </h4>
16 @li dc_assignlib.sas
17 @li mf_getattrn.sas
18 @li mf_getuser.sas
19 @li mf_mkdir.sas
20 @li mf_verifymacvars.sas
21 @li mp_abort.sas
22 @li mp_cntlout.sas
23 @li mp_dirlist.sas
24 @li mp_lockanytable.sas
25 @li mpe_accesscheck.sas
26 @li mpe_alerts.sas
27 @li mpe_xlmapvalidate.sas
28 @li mpe_loadfail.sas
29 @li mpe_runhook.sas
30
31 @version 9.2
32 @author 4GL Apps Ltd
33 @copyright 4GL Apps Ltd. This code may only be used within Data Controller
34 and may not be re-distributed or re-sold without the express permission of
35 4GL Apps Ltd.
36**/
37
38%macro mpe_loader(
39 mperef= /* name of subfolder containing the staged data */
40 ,mDebug=0 /* set to 1 for development or debugging */
41 ,submitted_reason_txt= /* populates column of same name in sumo_approvals*/
42 ,approver= /* allows a userid to be provided for direct approval email */
43 ,url= /* optional - url for debugging */
44 ,dlm=%str(,)
45 ,termstr=crlf
46 ,dc_dttmtfmt=E8601DT26.6
47 );
48%put entered mpe_loader from &=_program;
49%put &=url;
50%put &=termstr;
51%put &=dlm;
52 /* determine full path to CSV directory */
53%local now;
54%let now=&dc_dttmtfmt;
55%put &=now;
56
57/**
58 * get full path to package (only subdirectory passed through)
59 */
60%mp_abort(
61 iftrue=(%mf_verifymacvars(mperef mpelocapprovals)=0)
62 ,mac=bitemporal_dataloader
63 ,msg=%str(Missing: mperef mpelocapprovals)
64)
65
66%let csv_dir=%trim(&mpelocapprovals/&mperef);
67
68/* exit if package has already been uploaded */
69%local check;
70proc sql noprint;
71select count(*) into: check
72 from &mpelib..mpe_loads
73 where csv_dir="&mperef";
74%if &check %then %do;
75 %mp_abort(msg=Folder &mperef already has an entry in &mpelib..mpe_loads
76 ,mac=mpe_loader.sas);
77 %return;
78%end;
79
80/* get CSV directory contents */
81%mp_dirlist(path=&csv_dir,outds=WORK.getfiles)
82data WORK.csvs;
83 set WORK.getfiles;
84 if upcase(scan(filename,3,'.'))='CSV' then do;
85 lib=upcase(scan(filename,1,'.'));
86 ds=upcase(scan(filename,2,'.'));
87 output;
88 end;
89run;
90
91/* get table attributes */
92proc sql noprint;
93create table WORK.sumo_tables as
94 select a.filename, b.*
95 from WORK.csvs a
96 left join &mpelib..mpe_tables b
97 on a.lib=b.libref
98 and a.ds=b.dsn
99 where b.tx_from le &now
100 and &now lt b.tx_to;
101
102/* define user as meta user if available */
103%local user;
104%let user=%mf_getuser();
105
106/* check if there is actually a table to load */
107%if %mf_getattrn(WORK.sumo_tables,NLOBS)=0 %then %do;
108 %let msg=Table not registered in &mpelib..mpe_tables;
109 %mpe_loadfail(
110 status=&msg
111 ,now=&now
112 ,mperef=&mperef
113 ,dc_dttmtfmt=&dc_dttmtfmt.
114 )
115 %mp_abort(msg=&msg,mac=mpe_loader.sas);
116 %return;
117%end;
118
119proc sql;
120insert into &mpelib..mpe_loads
121 set USER_NM="&user"
122 ,STATUS='IN PROGRESS'
123 ,CSV_dir="&mperef"
124 ,PROCESSED_DTTM=&now
125 ,reason_txt = symget('submitted_reason_txt');
126
127
128/* import CSV */
129
130%let droplist=;
131%let attrib=;
132%let droplist=;
133%let libref=;
134%let DS=;
135
136/* get table info */
137data _null_;
138 set sumo_tables;
139 libds=upcase(cats(libref,'.',dsn));
140 call symputx('orig_libds',libds);
141 is_fmt=0;
142 if substr(cats(reverse(dsn)),1,3)=:'CF-' then do;
143 libds=scan(libds,1,'-');
144 putlog "Format Catalog Captured";
145 libds='work.fmtextract';
146 is_fmt=1;
147 end;
148 call symputx('is_fmt',is_fmt);
149 call symputx('libds',libds);
150 call symputx('FNAME',filename);
151 call symputx('LIBREF',libref);
152 call symputx('DS',dsn);
153 call symputx('LOADTYPE',loadtype);
154 call symputx('BUSKEY',buskey);
155 call symputx('VAR_TXFROM',var_txfrom);
156 call symputx('VAR_TXTO',var_txto);
157 call symputx('VAR_BUSFROM',var_busfrom);
158 call symputx('VAR_BUSTO',var_busto);
159 call symputx('VAR_PROCESSED',var_processed);
160 call symputx('RK_UNDERLYING',RK_UNDERLYING);
161 call symputx('POST_EDIT_HOOK',POST_EDIT_HOOK);
162 call symputx('NOTES',NOTES);
163 call symputx('PK',coalescec(RK_UNDERLYING,buskey));
164 call symputx('NUM_OF_APPROVALS_REQUIRED',NUM_OF_APPROVALS_REQUIRED,'l');
165 put (_all_)(=);
166 stop;
167run;
168
169%if %length(&ds)=0 %then %do;
170 %let msg=%str(ERR)OR: Unable to extract record from &mpelib..mpe_tables;
171 %mpe_loadfail(
172 status=FAILED
173 ,now=&now
174 ,mperef=&mperef
175 ,reason_txt=%quote(&msg)
176 ,dc_dttmtfmt=&dc_dttmtfmt.
177 )
178 %mp_abort(msg=&msg,mac=mpe_loader.sas);
179 %return;
180%end;
181
182/* export format catalog */
183%mp_cntlout(
184 iftrue=(&is_fmt=1)
185 ,libcat=&orig_libds
186 ,fmtlist=0
187 ,cntlout=work.fmtextract
188)
189
190/* user must have EDIT access to load a table */
191%mpe_accesscheck(&orig_libds
192 ,outds=work.sumo_access
193 ,user=&user
194 ,access_level=EDIT )
195%put exiting accesscheck;
196
197%if %mf_getattrn(work.sumo_access,NLOBS)=0 %then %do;
198 %let msg=%str(ERR)OR: User is not authorised to edit &orig_libds!;
199 %mpe_loadfail(
200 status=UNAUTHORISED
201 ,now=&now
202 ,mperef=&mperef
203 ,reason_txt=%quote(&msg)
204 ,dc_dttmtfmt=&dc_dttmtfmt.
205 )
206 %mp_abort(msg=&msg,mac=mpe_loader.sas);
207 %return;
208%end;
209
210%put now importing: "&csv_dir/&fname" termstr=&termstr;
211/* get the variables from the CSV */
212data vars_csv1(index=(idxname=(varnum name)) drop=infile);
213 infile "&csv_dir/&fname" lrecl=32767 dsd termstr=&termstr encoding='utf-8';
214 input;
215 length infile $32767;
216 infile=compress(_infile_,'"',);
217 infile=compress(infile,"'",);
218 format name $32.;
219 putlog 'received vars: ' infile;
220 call symputx('received_vars',infile,'l');
221 do varnum=1 to countw(infile,"&dlm");
222 /* keep writeable chars */
223 name=compress(upcase(scan(infile,varnum)),,'kw');
224 if name ne "_____DELETE__THIS__RECORD_____" then output;
225 end;
226 stop;
227run;
228%put received_vars = &received_vars;
229
230%dc_assignlib(WRITE,&libref)
231
232/* get list of variables and their formats */
233proc contents noprint data=&libds
234 out=vars(keep=name type length varnum format:);
235run;
236data vars(keep=name type length varnum format);
237 set vars(rename=(format=format2 type=type2));
238 name=upcase(name);
239 format2=upcase(format2);
240 /* not interested in transaction or processing dates
241 (append table must be supplied without them) */
242 if name not in ("&VAR_TXFROM","&VAR_TXTO","&VAR_PROCESSED"
243 ,"_____DELETE__THIS__RECORD_____");
244 if type2 in (2,6) then do;
245 length format $49.;
246 if format2='' then format=cats('$',length,'.');
247 else format=cats(format2,max(formatl,length),'.');
248 type='char';
249 end;
250 else do;
251 if format2='' then format=cats(length,'.');
252 else if format2=:'DATETIME' or format2=:'E8601DT' or format2=:'NLDATM'
253 then do;
254 format='DATETIME19.';
255 end;
256 else if format2=:'DATE' or format2=:'DDMMYY'
257 or format2=:'MMDDYY' or format2=:'YYMMDD'
258 or format2=:'E8601DA' or format2=:'B8601DA'
259 or format2=:'NLDATE'
260 then do;
261 format='DATE9.';
262 end;
263 else if format2='BEST' & formatl=0 then format=cats('BEST',length,'.');
264 else do;
265 if formatl=0 then formatl=length;
266 format=cats(format2,formatl,'.',formatd);
267 end;
268 type='num';
269 end;
270 put (_all_)(=);
271run;
272
273/* build attrib statement */
274data vars_attrib;
275 length attrib_statement $32767 type2 $20;
276 set vars end=lastobs;
277 retain attrib_statement;
278
279 if type='char' then type2='$';
280 str1=catx(' ',name,'length=',cats(type2,length));
281 attrib_statement=trim(attrib_statement)!!' '!!trim(str1);
282
283 if lastobs then call symputx('ATTRIB',attrib_statement,'L');
284run;
285
286/* build input statement - first get vars in right order
287 and join with target formats*/
288proc sql noprint;
289create table vars_csv2 as
290 select b.*
291 from vars_csv1 a
292 left join vars_attrib b
293 on a.name=b.name
294 order by a.varnum;
295
296
297/* now build input statement */
298data final_check;
299 set vars_csv2 end=lastobs;
300 length input_statement $32767 type2 $20 droplist $32767;
301 retain input_statement droplist;
302
303 /* Build input statement - CATCH EXCEPTIONS HERE!*/
304 if name in ('QUOTE_DTTM') then do;
305 name=cats(name,'2');
306 droplist=catx(' ',trim(droplist),name);
307 type2='$20.';/* converted below */
308 end;
309 else if type='char' then type2=cats('$CHAR', length,'.');
310 else if format='DATE9.' then type2='ANYDTDTE.';
311 else if format='DATETIME19.' then type2='ANYDTDTM.';
312 else if format=:'TIME' then type2='ANYDTTME.';
313 else if name='' then do;/* additional vars in input data */
314 name='_____DELETE__THIS__VARIABLE_____';
315 droplist=catx(' ',trim(droplist),'_____DELETE__THIS__VARIABLE_____');
316 type2='$1.';
317 end;
318 else type2='best32.';
319 * else type2=cats(length,'.');
320
321 input_statement=catx(' ',input_statement,name,':',type2);
322
323 if lastobs then do;
324 call symputx('INPUT', input_statement,'L');
325 if trim(droplist) ne '' then
326 call symputx('droplist',"drop "!!droplist!!';','l');
327 end;
328run;
329
330%let mpeloadstop=0;
331
332data work.STAGING_DS;
333 &droplist;
334 infile "&csv_dir/&fname" dsd dlm="&dlm" lrecl=32767
335 firstobs=2 missover termstr=&termstr encoding='utf-8';
336 attrib _____DELETE__THIS__RECORD_____ length=$3 &attrib ;
337 if _n_=1 then call missing (of _all_);
338 missing a b c d e f g h i j k l m n o p q r s t u v w x y z _;
339 input
340 %if %scan(%quote(&received_vars),1)=_____DELETE__THIS__RECORD_____ %then %do;
341 _____DELETE__THIS__RECORD_____: $3.
342 %end;
343 &input;
344
345 %if %index(%quote(&attrib.),UNLIKELY_VAR ) %then %do;
346 /*UNLIKELY_VAR=input(UNLIKELY_VAR2,ANYDTDTM21.);*/
347 /* SPECIAL LOGIC FOR SPECIAL VARS */
348 %end;
349
350 if _error_ ne 0 then do;
351 putlog _infile_;
352 call symputx('mpeloadstop',_n_);
353 stop;
354 end;
355 /* remove all blank rows */
356 if compress(cats(of _all_),'.')=' ' then delete;
357run;
358
359%if &mpeloadstop>0 %then %do;
360 %if %symexist(SYSPRINTTOLOG) %then %let logloc=&SYSPRINTTOLOG;
361 %else %let logloc=%qsysfunc(getoption(LOG));
362 %put redirecting log output to capture return message;
363 %put currentloc=&logloc;
364 filename tmp temp;
365 proc printto log=tmp;run;
366 data _null_;
367 &droplist;
368 infile "&csv_dir/&fname" dsd dlm="&dlm" lrecl=32767 firstobs=2
369 missover termstr=&termstr;
370 attrib &attrib ;
371 input
372 %if %scan(%quote(&received_vars),1)=_____DELETE__THIS__RECORD_____
373 %then %do;
374 _____DELETE__THIS__RECORD_____: $3.
375 %end;
376 &input;
377 if _error_ then stop;
378 run;
379 /* get log back */
380 proc printto log=&logloc;run;
381 data _null_; infile tmp; input; putlog _infile_;run;
382 /* scan log for invalid data warnings */
383 data _null_;
384 infile tmp;
385 input;
386 length msg1 msg2 msg3 msg4 msg5 msg url $32767;
387 if index(_infile_,'NOTE: Invalid data for') then do;
388 msg1=_infile_;
389 input;
390 msg2=_infile_;
391 input;
392 msg3=_infile_;
393 input;
394 msg4=_infile_;
395 input;
396 msg5=_infile_;
397 url=symget('url');
398 msg=catx('\n',msg1,msg2,msg3,msg4,msg5,'\n',url);
399 call symputx('msg',msg);
400 stop;
401 end;
402 run;
403
404 %mpe_loadfail(
405 status=FAILED
406 ,now=&now
407 ,mperef=&mperef
408 ,reason_txt=%superq(msg)
409 ,dc_dttmtfmt=&dc_dttmtfmt.
410 )
411 %return;
412%end;
413
414/* check that the table is unique on PK */
415proc sort data=work.STAGING_DS dupout=work.MPE_DUPS (keep=&pk) nodupkey;
416 by &pk;
417run;
418%if %mf_getattrn(work.MPE_DUPS,NLOBS)>0 %then %do;
419 %local duplist;
420 data _null_;
421 set work.mpe_dups;
422 %do i=1 %to %sysfunc(countw(&pk));
423 %let iWord=%scan(&pk,&i);
424 call symputx('duplist',symget('duplist')!!
425 " &iWord="!!cats(&iWord));
426 %end;
427 run;
428 %let msg=This upload contains duplicates on the Primary Key columns %trim(
429 )(&pk) \n Please remove the duplicates and try again. %trim(
430 )\n &duplist \n ;
431 %mp_abort(msg=%superq(msg),mac=mpe_loader.sas);
432 %return;
433%end;
434
435%if &syscc gt 4 %then %do;
436 %let msg=SYSCC=&syscc prior to post edit hook (%superq(syserrortext));
437 %mpe_loadfail(
438 status=FAILED - &syscc
439 ,now=&now
440 ,mperef=&mperef
441 ,reason_txt=%superq(msg)
442 ,dc_dttmtfmt=&dc_dttmtfmt.
443 )
444 %return;
445%end;
446
447/* If a Complex Excel Upload, needs to have the load ref added to the table */
448%mpe_xlmapvalidate(&mperef,work.staging_ds,&mpelib,&orig_libds)
449
450/* Run the Post Edit Hook prior to creation of staging folder */
451%mpe_runhook(POST_EDIT_HOOK)
452
453/* stop if err */
454%if &syscc gt 4 %then %do;
455 %let msg=ERR in post edit hook (&post_edit_hook);
456 %mpe_loadfail(
457 status=FAILED - &syscc
458 ,now=&now
459 ,mperef=&mperef
460 ,reason_txt=%quote(&msg)
461 ,dc_dttmtfmt=&dc_dttmtfmt.
462 )
463 %return;
464%end;
465
466
467/**
468 * send to approve process
469 */
470
471/* create a dataset key (datetime plus 3 digit random number plus PID) */
472
473/* send dataset to approvals subfolder with same name as subfolder */
474libname approval "&mpelocapprovals/&mperef";
475data approval.&mperef;
476 set work.staging_ds;
477run;
478proc export data=approval.&mperef
479 outfile="&mpelocapprovals/&mperef/&mperef..csv"
480 dbms=csv
481 replace;
482run;
483
484/* update the control dataset with relevant info */
485data append_app;
486 if 0 then set &mpelib..mpe_submit;/* get formats */
487 call missing (of _all_);
488 TABLE_ID="&mperef";
489 submit_status_cd='SUBMITTED';
490 submitted_by_nm="%mf_getuser()";
491 base_lib="&libref";
492 base_ds="&ds";
493 submitted_on_dttm=&now;
494 submitted_reason_txt=symget('submitted_reason_txt');
495 input_vars=%mf_getattrn(approval.&mperef,NVARS);
496 input_obs=%mf_getattrn(approval.&mperef,NLOBS);
497 num_of_approvals_required=&NUM_OF_APPROVALS_REQUIRED;
498 num_of_approvals_remaining=&NUM_OF_APPROVALS_REQUIRED;
499 reviewed_by_nm='';
500 reviewed_on_dttm=.;
501run;
502
503%mp_lockanytable(LOCK,lib=&mpelib,ds=mpe_submit,
504 ref=%str(&mperef update in &_program),
505 ctl_ds=&mpelib..mpe_lockanytable
506)
507proc append base= &mpelib..mpe_submit data=append_app;
508run;
509%mp_lockanytable(UNLOCK,
510 lib=&mpelib,ds=mpe_submit,
511 ctl_ds=&mpelib..mpe_lockanytable
512)
513
514/* send email to REVIEW members */
515%put sending mpe_alerts;
516%mpe_alerts(alert_event=SUBMITTED
517 , alert_lib=&libref
518 , alert_ds=&ds
519 , dsid=&mperef
520)
521/* DISABLE EMAIL FOR NOW
522 %let b2=REASON: %quote(&submitted_reason_txt);
523
524 %local URLNOTES;
525 %if %length(&notes)>0 %then %let URLNOTES=%quote(%sysfunc(urlencode(&notes)));
526
527 %let b3=%str(Click to review / approve: )%trim(
528 )%str(http://&_srvname:&_srvport&_url?_PROGRAM=/Web/approvals&)%trim(
529 )TABLEID=&dsid%str(&)BASETABLE=&libref..&ds%str(&)NOTES=&URLNOTES;
530
531 %let b4=%str(Reference ID: &mperef);
532*/
533
534%put mpe_loader finishing up with syscc=&syscc;
535%if &syscc le 4 %then %do;
536 %local dur;
537 data _null_;
538 now=symget('now');
539 dur=%sysfunc(datetime())-&now;
540 call symputx('dur',dur,'l');
541 putlog 'Updating mpe_loads with the following query:';
542 putlog "update &mpelib..mpe_loads set STATUS='SUCCESS'";
543 putlog " , duration=" dur;
544 putlog " , processed_dttm=" now;
545 putlog " , approvals = '&libref..&ds'";
546 putlog " where CSV_DIR='&mperef';";
547 run;
548 proc sql;
549 update &mpelib..mpe_loads set STATUS='SUCCESS'
550 , duration=&dur
551 , processed_dttm=&now
552 , approvals = "&libref..&ds"
553 where CSV_DIR="&mperef";
554%end;
555%else %do;
556 %mpe_loadfail(
557 status="FAILED - &syscc"
558 ,now=&now
559 ,approvals=&libref..&ds
560 ,mperef=&mperef
561 ,dc_dttmtfmt=&dc_dttmtfmt.
562 )
563 %return;
564%end;
565
566%mend mpe_loader;