Loading...
Searching...
No Matches
bitemporal_dataloader.sas
Go to the documentation of this file.
1/**
2 @file
3 @brief Routine supporting multiple load types
4 @details Generic loader for multiple load types (UPDATE, SCD2, BITEMPORAL).
5
6 Handles all elements including metadata validation, PK checking, closeouts,
7 locking, logging, etc.
8
9 The staging table must be prepared with a unique business key. For bitemporal
10 this means a snapshot at both technical AND business time.
11
12ASSUMPTIONS:
13 - Base table has relevant datetime vars: 2xTechnical, 2xBusiness, 1xProcessed
14 - Staging table need not contain Technical or Processed datetimes (any Technical datetimes supplied are dropped, and Processed is overwritten)
15 - Base table has no column names containing the string "___TMP___"
16 - Base &tech_from variable is not nullable. This should always be the case
17 anyway when building permanent bitemporal datasets. But the point is that
18 this field is used to identify new records after the initial left join
19 from staging to base table.
20
21NOTES:
22 - All queries against BiTemporal tables need two filter conditions, as such:
23
24 where &bus_from LE [tstamp] LT &bus_to
25 AND &tech_from LE [tstamp] LT &tech_to
26
27 One cannot use BETWEEN (it is inclusive at both ends, and adjacent records share a closing / opening timestamp).
28 One cannot use &xx_from LE [tstamp] LE &xx_to (equivalent to BETWEEN).
29 Background:
30http://stackoverflow.com/questions/20005950/best-practice-for-scd-date-pairs-closing-opening-timestamps
31
32Areas for optimisation
33 - loading temporal history (currently experimental)
34
35 ## Supporting tables
36
37 Supporting tables must exist in the library specified in the `dclib` param.
38
39 ### MPE_DATALOADS
40
41 This table is updated every time a successful load occurs, and includes
42 information such as:
43
44 @li library
45 @li dataset
46 @li message (supplied in the ETLSOURCE param)
47 @li new rows
48 @li deleted rows
49 @li changed rows
50 @li timestamp
51 @li the user making the load
52 @li the version of (this) macro used to make the load
53
54
55 @param [in] APPEND_DSN= (APPENDTABLE) Name of STAGING table
56 @param [in] CONFIG_TABLE= (&dclib..MPE_CONFIG) The table containing library
57 engine specific config. The following scopes are supported:
58 @li DCBL_REDSH
59 @param [in] LOADTYPE= (BITEMPORAL) Supported types:
 @li BITEMPORAL - loads a buskey with business + technical (version) times
60 @li TXTEMPORAL - loads a buskey with technical (version) times only
61 @li BUSTEMPORAL - loads a buskey with business times only
62 @li UPDATE - updates a buskey with NO history
63 @param [in] PROCESSED= (0) This column obtains a current timestamp for changed
64 records when loading the target table. Default is 0 (not set). If the
65 target table contains a variable called PROCESSED_DTTM, and processed=0,
66 then this column will be used for applying the current timestamp.
67 @param [in] RK_MAXKEYTABLE= (mpe_maxkeyvalues) The maxkeytable to use (must exist
68 in DCLIB)
69 @param [in] PK= Business key, space separated. Should NOT include temporal
70 fields.
71 @param [in] RK_UNDERLYING= If supplied will generate an RK based on these
72 (space separated) business key fields. In this case only ONE PK field should
73 be supplied, which is assumed to be the RK. The RK field, plus underlying
74 fields, should all exist on the base table. The underlying fields should
75 exist on the staging table (the RK / PK field will be overwritten).
76 The staging table should also be unique on its PK.
77
78 @param [in] dclib= (&dc_libref) The library containing DC configuration tables
79 @param [out] outds_del= (work.outds_del) Output table containing
80 deleted records
81 @param [out] outds_add= (work.outds_add) Output table containing
82 appended records
83 @param [out] outds_mod= (work.outds_mod) Output table containing
84 changed records
85 @param [out] outds_audit= (0) Load detailed changes to an audit table. Uses
86 the mp_storediffs.sas macro. Provide the audit table to be loaded here (as libref.dataset).
87
88 <h4> Global Variables </h4>
89 The following global macro variables are used. These should be replaced by
90 macro parameters in future releases.
91
92 @li `dc_dttmtfmt`
 @li `dc_staging_area`
93
94 <h4> SAS Macros </h4>
95 @li bitemporal_closeouts.sas
96 @li dc_assignlib.sas
97 @li mf_existds.sas
98 @li mf_existvar.sas
99 @li mf_fmtdttm.sas
100 @li mf_getattrn.sas
101 @li mf_getengine.sas
 @li mf_getquotedstr.sas
102 @li mf_getschema.sas
103 @li mf_getuniquefileref.sas
104 @li mf_getuniquename.sas
105 @li mf_getuser.sas
106 @li mf_getvarlist.sas
107 @li mf_verifymacvars.sas
108 @li mf_wordsinstr1butnotstr2.sas
109 @li mp_abort.sas
110 @li mp_dropmembers.sas
111 @li mp_lockanytable.sas
112 @li mp_lockfilecheck.sas
113 @li mp_retainedkey.sas
114 @li mp_storediffs.sas
115
116 @version 9.3
117 @author 4GL Apps Ltd.
118 @copyright 4GL Apps Ltd. This code may only be used within Data Controller
119 and may not be re-distributed or re-sold without the express permission of
120 4GL Apps Ltd.
121
122 @warning multitemporal loads (bitemporal for multiple points in business time)
123 are in experimental stage
124
125**/
126
127%macro bitemporal_dataloader(
128 bus_from= /* Business FROM datetime variable. Req'd on
129 STAGING & BASE tables.*/
130 ,bus_to = /* Business TO datetime variable. Req'd on
131 STAGING & BASE tables. */
132 ,bus_from_override= /* Provide a hard coded BUS_FROM datetime value.*/
133 ,bus_to_override= /* provide a hard coded BUS_TO datetime value */
134 ,tech_from= /* Technical FROM datetime variable. Req'd on
135 BASE table only. */
136 ,tech_to = /* Technical TO datetime variable. Req'd on BASE
137 table only. */
138 ,processed= 0
139 ,base_lib=WORK /* Libref of the BASE table. */
140 ,base_dsn=BASETABLE /* Name of BASE table. */
141 ,append_lib=WORK /* Libref of the STAGING table. */
142 ,append_dsn=APPENDTABLE
143 ,high_date='01JAN5999:00:00:00'dt /* High date to close out records */
144 ,PK= name sex
145 ,RK_UNDERLYING=
146 ,KEEPVARS= /* Provides option for removing unwanted vars from append table */
147 ,RK_UPDATE_MAXKEYTABLE=NO /* If switching (or mix matching) with regular
148 SCD2 loader then set this switch to YES to
149 ensure the MAXKEYTABLE is updated with the
150 current maximum RK value for the target table
151 */
152 ,CHECK_UNIQUENESS=YES /* Perform a check of the APPEND table to ensure it is
153 unique on its business key */
154 ,ETLSOURCE=demo /* supply a value ($50.) to show as ETLSOURCE in
155 &dclib..DATALOADS */
156 ,LOADTYPE=BITEMPORAL
157 ,RK_MAXKEYTABLE= mpe_maxkeyvalues
158 ,LOG=1 /* Switch to 0 to prevent records being added to
159 &mpelib..mpe_DATALOADS (ie when testing)*/
160 ,DELETE_COL= _____DELETE__THIS__RECORD_____
161 /* If this variable is found in the append dataset
162 then matching records are closed out (or deleted) in the
163 base table where that variable = "Yes" (case-insensitive) */
164 ,LOADTARGET=YES /* set to anything but uppercase YES to switch off
165 target table load and generate temp tables only */
166 ,CLOSE_VARS=
167/*a problem with regular SCD2 or TXTEMPORAL loads is that there is
168 no facility to close out removed records (all records are
169 assumed new or changed). But how does one determine which
170 records are removed? Short of loading the entire table
171 each time? This parameter allows a set of variables
172 (this should be a subset of the PK) to be declared, and
173 the macro will determine which records in the base table
174 need to be closed out ahead of the load.
175
176 For instance, given the following:
177
178 Base Table Staging Table
179 DATE ENTITY AMOUNT DATE ENTITY AMOUNT
180 JAN ACME4 66 JAN ACME4 66
181 FEB ACME4 99 FEB ACME4 99
182 FEB ACME1 22
183
184 By supplying DATE in CLOSE_VARS and DATE ENTITY as the PK,
185 the "FEB ACME1 22" record would get closed out.
186 */
187 ,config_table=&dclib..MPE_CONFIG
188 ,dclib=&dc_libref
189 ,outds_del=work.outds_del
190 ,outds_add=work.outds_add
191 ,outds_mod=work.outds_mod
192 ,outds_audit=0
193 );
194
195/* when changing this macro, update the version num here */
196%local ver;
197%let ver=32;
198%put &sysmacroname entry vars:;
199%put _local_;
200
201%dc_assignlib(WRITE,&base_lib) /* may not already be assigned */
202
203/* return straight away if nothing to load */
204%let nobs= %mf_getattrn(&append_lib..&append_dsn,NLOBS);
205%if &nobs=-1 %then %do;
206 proc sql noprint; select count(*) into: nobs from &append_lib..&append_dsn;
207%end;
208%if &nobs=0 %then %do;
209 %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
210 %put NOTE- Base dataset &append_lib..&append_dsn is empty. Nothing to upload!;
211 %put NOTE-;%put NOTE-;%put NOTE-;
212 %return;
213%end;
214
215/* hard exit if err condition exists */
216%mp_abort(iftrue= (&syscc > 0)
217 ,mac=bitemporal_dataloader
218 ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status;)
219)
220
221%local engine_type;
222%let engine_type=%mf_getengine(&base_lib);
223%if (&engine_type=REDSHIFT or &engine_type=POSTGRES or &engine_type=SNOW)
224 and %length(&CLOSE_VARS)>0
225%then %do;
226 %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
227 %put NOTE- CLOSE_VARS functionality not yet supported in &engine_type;
228 %put NOTE-;%put NOTE-;%put NOTE-;
229 %return;
230%end;
231
232/**
233 * The metadata functions (eg mf_existvar) will fail if the base table has a
234 * SAS lock. So, make a snapshot of the base table for further use.
235 * Also, make output tables (regardless).
236 */
237%local basecopy;
238%let basecopy=%mf_getuniquename(prefix=basecopy);
239
240data &basecopy &outds_mod &outds_add &outds_del;
241 set &base_lib..&base_dsn;
242 stop;
243run;
244%mp_abort(iftrue= (&syscc > 0)
245 ,mac=&_program
246 ,msg=%str(syscc=&syscc after base table copy - aborting due to table lock)
247)
248
249
250%local cols idx_pk md5_col ;
251%let md5_col=___TMP___md5;
252%let check_uniqueness=%upcase(&check_uniqueness);
253%let RK_UPDATE_MAXKEYTABLE=%upcase(&RK_UPDATE_MAXKEYTABLE);
254%let high_date=%unquote(&high_date);
255%let loadtype=%upcase(&loadtype);
256
257/* ensure irrelevant variables are cleared */
258%if &loadtype=BUSTEMPORAL %then %do;
259 %let tech_from=;
260 %let tech_to=;
261%end;
262%else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
263 %let bus_from=;
264 %let bus_to=;
265%end;
266
267/* ensure relevant variables are supplied */
268%mp_abort(iftrue=(&loadtype=BITEMPORAL & %mf_verifymacvars(bus_from bus_to)=0)
269 ,mac=bitemporal_dataloader
270 ,msg=%str(Missing BUS_FROM / BUS_TO)
271)
272%mp_abort(iftrue=(&loadtype=TXTEMPORAL & %mf_verifymacvars(tech_from tech_to)=0)
273 ,mac=bitemporal_dataloader
274 ,msg=%str(Missing TECH_FROM / TECH_TO)
275)
276
277/**
278 * drop any tables (may be defined as views or vice versa preventing overwrite)
279 */
280%mp_dropmembers(append bitemp0_append bitemp_cols)
281
282/* SQL Server requires its own time values */
283/* 9.2 will only give picture format down to seconds. 9.3 allows
284 milliseconds by using lower S and defining the decimal in the format name..*/
285PROC FORMAT;
286 picture MyMSdt other='%0Y-%0m-%0dT%0H:%0M:%0S' (datatype=datetime);
287RUN;
288%local dbnow;
289%let dbnow="%sysfunc(datetime(),%mf_fmtdttm())"dt;
290
291data _null_;
292 /* convert space separated macvar to comma separated for SQL processing */
293 call symputx('PK_COMMA',tranwrd(compbl("&pk"),' ',','),'L');
294 call symputx('PK_CNT',countw("&pk",' '),'L');
295 now=&dbnow;
296 call symputx('NOW',now,'L');
297 call symputx('SQLNOW',cats("'",put(now,MyMSdt.),"'"),'L');
298 length etlsource $100;
299 etlsource=subpad(symget('etlsource'),1,100);
300 call symputx('etlsource',etlsource,'l');
301run;
302
303/**
304 * Even if no PROCESSED var provided, assume that any variable named
305 * PROCESSED_DTTM should be updated
306 */
307%if &processed=0 %then %do;
308 %if %mf_existvar(&basecopy,PROCESSED_DTTM)
309 %then %let processed=PROCESSED_DTTM;
310 %else %let processed=;
311%end;
312
313
314/* extract colnames for md5 creation / change tracking */
315proc contents noprint data=&base_lib..&base_dsn
316 out=work.bitemp_cols (keep=name type length varnum format:);
317run;
318proc sql noprint;
319select name into: cols separated by ','
320 from work.bitemp_cols
321 where upcase(name) not in
322 (%upcase("&bus_from","&bus_to"
323 ,"&tech_from","&tech_to"
324 ,"&processed","&delete_col")) ;
325select case when type in (2,6) then cats('put(md5(trim(',name,')),$hex32.)')
326 /* multiply by 1 to strip precision errors (eg 0 != 0) */
327 /* but ONLY if not missing, else will lose any special missing values */
328 else cats('put(md5(trim(put(ifn(missing('
329 ,name,'),',name,',',name,'*1),binary64.))),$hex32.)') end
330 into: stripcols separated by '||'
331 from work.bitemp_cols
332 where upcase(name) not in
333 (%upcase("&bus_from","&bus_to"
334 ,"&tech_from","&tech_to"
335 ,"&processed","&delete_col")) ;
336
337/* set default formats*/
338%let bus_from_fmt = datetime19.;
339%let bus_to_fmt = datetime19.;
340%let processed_fmt = datetime19.;
341
342%let tech_from_fmt = format=datetime19.;
343%let tech_to_fmt = format=datetime19.;
344
345
346%put &=stripcols;
347%put &=pk;
348
349data _null_;
350 set work.bitemp_cols;
351 if type=2 or type=6 then do;
352 length fmt $49.;
353 if format='' then fmt=cats('$',length,'.');
354 else fmt=cats(format,formatl,'.');
355 end;
356 else do;
357 if format='' then fmt=cats(length,'.');
358 else fmt=cats(format,formatl,'.',formatd);
359 end;
360 if upcase(name)="%upcase(&bus_from)" then
361 call symputx('bus_from_fmt',fmt,'L');
362 else if upcase(name)="%upcase(&bus_to)" then
363 call symputx('bus_to_fmt',fmt,'L');
364 else if upcase(name)="%upcase(&tech_from)" then
365 call symputx('tech_from_fmt',"format="!!fmt,'L');
366 else if upcase(name)="%upcase(&tech_to)" then
367 call symputx('tech_to_fmt',"format="!!fmt,'L');
368 else if upcase(name)="%upcase(&processed)" then
369 call symputx('processed_fmt',fmt,'L');
370run;
371
372%if %index(%quote(&cols),___TMP___) %then %do;
373 %let msg=%str(Table contains a variable name containing "___TMP___".%trim(
374 ) This may conflict with temp variable generation!!);
375 %mp_abort(msg=&msg,mac=bitemporal_dataloader);
376 %let syscc=5;
377 %return;
378%end;
379
380/* if transaction dates appear on the APPEND table, need to remove them */
381%local drop_tx_dates /* used in append table */
382 drop_tx_dates_noobs /* used to take the base table structure */;
383%if %mf_existvar(&append_lib..&append_dsn, &tech_from)
384 %then %let drop_tx_dates=&tech_from;
385%if %mf_existvar(&append_lib..&append_dsn, &tech_to)
386 %then %let drop_tx_dates=&drop_tx_dates &tech_to;
387%if %length(%trim(&drop_tx_dates))>0
388 %then %let drop_tx_dates=(drop=&drop_tx_dates);
389
390%if %mf_existvar(&basecopy, &tech_from)
391 %then %let drop_tx_dates_noobs=&tech_from;
392%if %mf_existvar(&basecopy, &tech_to)
393 %then %let drop_tx_dates_noobs=&drop_tx_dates_noobs &tech_to;
394%if %length(%trim(&drop_tx_dates_noobs))>0
395 %then %let drop_tx_dates_noobs=(drop=&drop_tx_dates_noobs obs=0);
396%else %let drop_tx_dates_noobs=(obs=0);
397
398
399/**
400 * Lock the table. This is necessary as we are doing a two part update (first
401 * closing records then appending new records). It is theoretically possible
402 * that an upload may occur whilst preparing the staging tables. And the
403 * staging tables are about to be prepared..
404 */
405%if &LOADTARGET = YES %then %do;
406 %put locking &base_lib..&base_dsn;
407 %mp_lockanytable(LOCK,
408 lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,ctl_ds=&dclib..mpe_lockanytable
409 )
410 %if "&outds_audit" ne "0" %then %do;
411 %put locking &outds_audit;
412 %mp_lockanytable(LOCK
413 ,lib=%scan(&outds_audit,1,.)
414 ,ds=%scan(&outds_audit,2,.)
415 ,ref=&ETLSOURCE
416 ,ctl_ds=&dclib..mpe_lockanytable
417 )
418 %end;
419%end;
420%else %do;
421 /* not an actual load, so avoid updating the max key table in next step. */
422 %let rk_update_maxkeytable=NO;
423%end;
424
425%if %length(&RK_UNDERLYING)>0 %then %do;
426 %mp_retainedkey(
427 base_lib=&base_lib
428 ,base_dsn=&base_dsn
429 ,append_lib=&append_lib
430 ,append_dsn=&append_dsn
431 ,retained_key=&pk
432 ,business_key=&rk_underlying
433 ,check_uniqueness=&CHECK_UNIQUENESS
434 ,outds=work.append
435 %if &rk_update_maxkeytable=NO %then %do;
436 ,maxkeytable=0
437 %end;
438 %else %do;
439 ,maxkeytable=&dclib..&RK_MAXKEYTABLE
440 %end;
441 ,locktable=&dclib..mpe_lockanytable
442 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
443 ,filter_str=%str( (where=( &now < &tech_to)) )
444 %end;
445 )
446%end;
447%else %do;
448 proc sql;
449 create view work.append as select * from &append_lib..&append_dsn;
450%end;
451/**
452* generate md5 for append table
453*/
454/* it is possible the source dataset has additional (unwanted) columns.
455 Drop if specified; */
456%if %length(&keepvars)>0 %then %do;
457 /* remove tech dates from keepvars as they are generated later */
458 %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_from ),%str( )));
459 %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_to ),%str( )));
460 %let keepvars=(keep=&keepvars &bus_from &bus_to &processed &md5_col);
461%end;
462
463/* CAS varchar types cause append issues here, so perform autoconvert
464 by creating empty local table first */
465data;
466 set &base_lib..&base_dsn &drop_tx_dates_noobs;
467run;
468%local emptybasetable; %let emptybasetable=&syslast;
469
470data work.bitemp0_append &keepvars &outds_del(drop=&md5_col )
471 %if "%substr(&sysver,1,1)" ne "4" and "%substr(&sysver,1,1)" ne "5" %then %do;
472 /nonote2err
473 %end;
474 ;
475 /* apply formats for bitemporal vars but not tx dates which are added later */
476 %if %length(&keepvars)>0 and &loadtype=BITEMPORAL %then %do;
477 format &bus_from &bus_from_fmt;
478 format &bus_to &bus_to_fmt;
479 %end;
480 set &emptybasetable /* base table reqd in case append has fewer cols */
481 work.append &drop_tx_dates;
482 %if %length(%str(&bus_from_override))>0 %then %do;
483 &bus_from= %unquote(&bus_from_override) ;
484 %end;
485 %if %length(%str(&bus_to_override))>0 %then %do;
486 &bus_to= %unquote(&bus_to_override) ;
487 %end;
488 length &md5_col $32;
489 &md5_col=put(md5(&stripcols),hex32.);
490 %if %length(&processed)>0 %then %do;
491 format &processed &processed_fmt;
492 &processed=&now;
493 %end;
494
495/**
496 * If a delete column exists then create the delete dataset
497 */
498%if %mf_existvar(&append_lib..&append_dsn, &delete_col) %then %do;
499 drop &delete_col;
500 if upcase(&delete_col) = "YES" then output &outds_del ;
501 else output work.bitemp0_append ;
502 run;
503
504 %if %mf_getattrn(&outds_del,NLOBS)>0 %then %do;
505 %bitemporal_closeouts(
506 tech_from=&tech_from
507 ,tech_to = &tech_to
508 ,base_lib=&base_lib
509 ,base_dsn=&base_dsn
510 ,append_lib=work
511 ,append_dsn=%scan(&outds_del,-1,.)
512 ,PK=&bus_from &pk
513 ,NOW=&dbnow
514 ,loadtarget=&loadtarget
515 ,loadtype=&loadtype
516 ,AUDITFOLDER=&dc_staging_area/&ETLSOURCE
517 )
518 %end;
519%end;
520%else %do;
521 output work.bitemp0_append;
522 run;
523%end;
524
525%mp_abort(iftrue= (&syscc gt 0 at line 494)
526 ,mac=&_program
527 ,msg=%str(syscc=&syscc)
528)
529
530%if %length(&close_vars)>0 %then %do;
531 /**
532 * need to close out records that are not provided
533 */
534 proc sql;
535 create table bitemp1_closevars1 as
536 select distinct a.%mf_getquotedstr(in_str=&pk,dlm=%str(,a.),quote=)
537 from &base_lib..&base_dsn a
538 inner join work.bitemp0_append b
539 on 1=1
540 /* join on closevars key */
541 %do idx_pk=1 %to %sysfunc(countw(&close_vars));
542 %let idx_val=%scan(&close_vars,&idx_pk);
543 and a.&idx_val=b.&idx_val
544 %end;
545 /* filter base on tech dates if necessary */
546 %if &loadtype=TXTEMPORAL %then %do;
547 where a.&tech_from <=&now and &now < a.&tech_to
548 %end;
549 ;
550 create table bitemp1_closevars2 as
551 select distinct a.*
552 from bitemp1_closevars1 a
553 left join work.bitemp0_append b
554 on 1=1
555 /* join on primary key */
556 %do idx_pk=1 %to %sysfunc(countw(&pk));
557 %let idx_val=%scan(&pk,&idx_pk);
558 and a.&idx_val=b.&idx_val
559 %end;
560 /* identify removed records by null value in a field in PK but not close_vars
561 */
562 where b.%scan(
563 %mf_wordsInStr1ButNotStr2(Str1=&pk,Str2=&close_vars),1,%str( )
564 ) IS NULL
565 ;
566
567 %if %mf_getattrn(bitemp1_closevars2,NLOBS)>0 %then %do;
568 %bitemporal_closeouts(
569 tech_from=&tech_from
570 ,tech_to = &tech_to
571 ,base_lib=&base_lib
572 ,base_dsn=&base_dsn
573 ,append_lib=work
574 ,append_dsn=bitemp1_closevars2
575 ,PK=&bus_from &pk
576 ,NOW=&dbnow
577 ,loadtarget=&loadtarget
578 ,loadtype=&loadtype
579 ,AUDITFOLDER=&dc_staging_area/&ETLSOURCE
580 )
581 %end;
582%end;
583
584/* return if nothing to load (was just deletes) */
585%if %mf_getattrn(work.bitemp0_append,NLOBS)=0 %then %do;
586 %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
587 %put NOTE- No updates - just deletes!;
588 %put NOTE-;%put NOTE-;%put NOTE-;
589%end;
590
591
592/**
593 * If applying manual overrides to business dates, then the input table MUST
594 * be unique on the PK. Check, and if not - abort.
595 */
596%local msg;
597%if %length(&bus_from_override.&bus_to_override)>0 or &CHECK_UNIQUENESS=YES
598%then %do;
599 proc sort data=work.bitemp0_append out=work.bitemp0_check nodupkey;
600 by &pk;
601 run;
602 %if %mf_getattrn(work.bitemp0_check,NLOBS)
603 ne %mf_getattrn(work.bitemp0_append,NLOBS)
604 %then %do;
605 %let msg=INPUT table &append_lib..&append_dsn is not unique on PK (&pk);
606 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE (&msg),
607 ctl_ds=&dclib..mpe_lockanytable
608 )
609 %mp_lockanytable(UNLOCK
610 ,lib=%scan(&outds_audit,1,.)
611 ,ds=%scan(&outds_audit,2,.)
612 ,ref=&ETLSOURCE
613 ,ctl_ds=&dclib..mpe_lockanytable
614 )
615 %mp_abort(msg=&msg,mac=bitemporal_dataloader.sas);
616 %end;
617%end;
618
619
620/**
621* extract from BASE table. Only want matching records, as could be very BIG.
622* New records are subsequently identified via left join and test for nulls.
623*/
624%local temp_table temp_table2 base_table baselib_schema;
625%put DCNOTE: Extracting matching observations from &base_lib..&base_dsn;
626
627%if &engine_type=OLEDB %then %do;
628 %let temp_table=##%mf_getuniquefileref(prefix=BTMP);
629 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
630 %let base_table=(select * from [dbo].&base_dsn
631 where convert(datetime,&SQLNOW) < &tech_to );
632 %else %let base_table=[dbo].&base_dsn;
633 proc sql;
634 create table &base_lib.."&temp_table"n as
635 select * from work.bitemp0_append;
636 /* open up a connection for pass through SQL */
637 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
638 create table work.bitemp0_base as select * from connection to myAlias(
639%end;
640%else %if &engine_type=REDSHIFT or &engine_type=POSTGRES or &engine_type=SNOW
641%then %do;
642 /* grab schema */
643 %let baselib_schema=%mf_getschema(&base_lib);
644 %if &baselib_schema.X ne X %then %let baselib_schema=&baselib_schema..;
645
646 /* grab redshift config */
647 %local redcnt; %let redcnt=0;
648 %if &engine_type=REDSHIFT %then %do;
649 data _null_;
650 set &config_table(where=(var_scope='DCBL_REDSH' and var_active=1));
651 x+1;
652 call symputx(cats('rednm',x),var_value,'l');
653 call symputx(cats('redval',x),var_value,'l');
654 call symputx('redcnt',x,'l');
655 run;
656 %end;
657 %let temp_table=%upcase(%mf_getuniquename(prefix=XDCTEMP));
658 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
659 %let base_table=(select * from &baselib_schema.&base_dsn
660 where timestamp &sqlnow < &tech_to );
661 %else %let base_table=&baselib_schema.&base_dsn;
662 /* make in-db empty table with PK + MD5 only */
663 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
664 %if &engine_type=SNOW %then %do;
665 exec (create transient table &baselib_schema.&temp_table
666 like &baselib_schema.&base_dsn
667 ) by myAlias;
668 %end;
669 %else %do;
670 /* cannot persist temp tables so must create a temporary permanent table */
671 exec (create table &temp_table (like &baselib_schema.&base_dsn)) by myAlias;
672 %if &engine_type=REDSHIFT %then %do;
673 exec (alter table &temp_table alter sortkey none) by myAlias;
674 %end;
675 %end;
676 %local dropcols;
677 %let dropcols=%mf_wordsinstr1butnotstr2(
678 str1=%upcase(%mf_getvarlist(&basecopy))
679 ,str2=%upcase(&pk)
680 );
681 %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
682 %put &=dropcols;
683 %let idx_val=%scan(&dropcols,&idx_pk);
684 exec(alter table &temp_table drop column &idx_val;) by myAlias;
685 %end;
686 exec (alter table &temp_table add column &md5_col varchar(32);) by myAlias;
687 /* create view to strip formats and avoid warns in log */
688 data work.vw_bitemp0/view=work.vw_bitemp0;
689 set work.bitemp0_append(keep=&pk &md5_col);
690 format _all_;
691 run;
692 proc append base=&base_lib..&temp_table
693 %if &engine_type=REDSHIFT %then %do;
694 (
695 %do idx_pk=1 %to &redcnt;
696 &&rednm&idx_pk = &&redval&idxpk
697 %end;
698 )
699 %end;
700 data=work.vw_bitemp0 force nowarn;
701 run;
702 /* open up a connection for pass through SQL */
703 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
704 create table work.bitemp0_base as select * from connection to myAlias(
705%end;
706%else %if &engine_type=CAS %then %do;
707 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
708 %let base_table=&base_lib..&base_dsn
709 (where=(&tech_from <=&now and &now < &tech_to));
710 %else %let base_table=&base_lib..&base_dsn;
711 %let temp_table=CASUSER.%mf_getuniquename(prefix=DC);
712 data &temp_table;
713 set work.bitemp0_append;
714 run;
715 %let bitemp0base=CASUSER.%mf_getuniquename(prefix=DC);
716 proc fedsql sessref=dcsession;
717 create table &bitemp0base{options replace=true} as
718%end;
719%else %do;
720 %let temp_table=work.bitemp0_append;
721 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
722 %let base_table=&base_lib..&base_dsn
723 (where=(&tech_from <=&now and &now < &tech_to));
724 %else %let base_table=&base_lib..&base_dsn;
725 proc sql;
726 create table work.bitemp0_base as
727%end;
728
729 select a.&md5_col /* this identifies NEW records */
730 , b.*
731 /* assume first PK field cannot be null (if defined in a PK constraint then
732 it definitely cannot be null) */
733 , case when b.%scan(&pk,1) IS NULL then 1 else 0 end as ___TMP___NEW_FLG
734 from &baselib_schema.&temp_table a
735 left join &base_table b
736 on 1=1
737%do idx_pk=1 %to &pk_cnt;
738 %let idx_val=%scan(&pk,&idx_pk);
739 and a.&idx_val=b.&idx_val
740%end;
741
742
743%if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
744or &engine_type=SNOW
745%then %do;
746 ); proc sql; drop table &base_lib.."&temp_table"n;
747%end;
748%else %if &engine_type=CAS %then %do;
749 ;
750 quit;
751 data work.bitemp0_base;
752 set &bitemp0base;
753 run;
754 proc sql;
755 drop table &temp_table;
756 drop table &bitemp0base;
757%end;
758%else %do;
759 ;
760%end;
761
762/**
763* matching & changed records are those without NULL key values
764* &idx_val resolves to rightmost PK value (loop above)
765*/
766%put syscc (line525)=&syscc, sqlrc=&sqlrc;
767%mp_abort(iftrue= (&syscc gt 0 or &sqlrc>0)
768 ,mac=&_program
769 ,msg=%str(syscc=&syscc sqlrc=&sqlrc)
770)
771
772%put hashcols2=&stripcols;
773proc sql;
774create table work.bitemp1_current(drop=___TMP___NEW_FLG) as
775 select *
776 , put(md5(&stripcols),$hex32.) as &md5_col
777 from work.bitemp0_base (drop=&md5_col)
778 where ___TMP___NEW_FLG=0;
779
780/**
781* NEW records were identified in ___TMP___NEW_FLG in bitemp0_base
782*/
783proc sql;
784create table &outds_add
785 (drop=&md5_col
786 %if %mf_existvar(work.bitemp0_base, &delete_col) %then %do;
787 &delete_col
788 %end;
789 )
790 as select a.*
791 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
792 ,&now as &tech_from &tech_from_fmt
793 ,&high_date as &tech_to &tech_to_fmt
794 %end;
795 from work.bitemp0_append a /* STAGING records (mix of existing & new) */
796 , work.bitemp0_base b /* BASE records (contains null values for new) */
797 where a.&md5_col=b.&md5_col /* took staging md5 across in left join */
798 and b.___TMP___NEW_FLG=1; /* NEW records also identified in bitemp0_base */
799
800
801/**
802* identify INSERTS. These are records with the same business key but
803* the bus_from and bus_to value are higher / lower (respectively)
804* such that the existing record needs to be SPLIT to surround the new
805* record.
806* eg: OLD RECORD from=1 to=10
807* NEW RECORD from=5 to=7
808*
809* APPENDED RECORDS:
810* - from=1 to=5
811* - from=5 to=7
812* - from=7 to=10
813*/
814
815/* inserts cannot happen with TXTEMPORAL */
816%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
817 /* IDENTIFY */
818 create table work.bitemp3_inserts as
819 select b.*
820 ,a.&bus_from as ___TMP___from
821 ,a.&bus_to as ___TMP___to
822 from work.bitemp0_append a
823 ,work.bitemp1_current b
824 where a.&bus_from > b.&bus_from
825 and a.&bus_to < b.&bus_to
826 %do idx_pk=1 %to &pk_cnt;
827 %let idx_val=%scan(&pk,&idx_pk);
828 and a.&idx_val=b.&idx_val
829 %end;
830 order by
831 /* compress blanks and then insert commas (as the datetime fields may
832 not be in use) */
833 %sysfunc(tranwrd(%sysfunc(compbl(
834 &pk &bus_from &bus_to &processed
835 )),%str( ), %str(,)))
836 ;
837
838 /* SPLIT */
839 data work.bitemp3a_inserts (drop=___TMP___from ___TMP___retain ___TMP___to) ;
840 set work.bitemp3_inserts;
841 by &pk &bus_from &bus_to &processed;
842 if first.&idx_val then do;
843 ___TMP___retain=&bus_to;
844 &bus_to=___TMP___from;
845 output;
846 &bus_to=___TMP___retain;
847 end;
848 if last.&idx_val then do;
849 &bus_from=___TMP___to;
850 output;
851 end;
852 run;
853%end;
854%else %do;
855 /* TX temporal load */
856 data work.bitemp3a_inserts;
857 set work.bitemp1_current;
858 stop;
859 run;
860%end;
861/* APPEND */
862proc sql;
863create view work.bitemp3a_view as
864 select * from work.bitemp1_current
865 where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
866
867data bitemp3b_newbase;
868 set work.bitemp3a_inserts work.bitemp3a_view;
869run;
870
871/** do not use! this converts short numerics into 8 bytes
872proc sql;
873create table work.bitemp3b_newbase as
874 select * from work.bitemp3a_inserts
875union corr
876 select * from work.bitemp1_current
877 where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
878*/
879
880/**
881* identify CHANGED records from staging.
882* Same business key with different temporal dates or md5 value
883* This table must be overlayed onto / into existing business history
884*/
885proc sql;
886create table work.bitemp4_updated as select distinct a.*
887 from work.bitemp0_append a
888 ,work.bitemp3b_newbase b
889 where 1=1
890 %do idx_pk=1 %to &pk_cnt;
891 %let idx_val=%scan(&pk,&idx_pk);
892 and a.&idx_val=b.&idx_val
893 %end;
894 and ( a.&md5_col ne b.&md5_col
895 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
896 OR (a.&bus_from ne b.&bus_from or a.&bus_to ne b.&bus_to)
897 %end;
898 )
899;
900
901/**
902 * This section would have been one simple step with union all
903 * but that converts short numerics into 8 bytes!
904 * so, convoluted alternative to retain the same functionality.
905 */
906
907/* base records */
908create view work.bitemp4_prep1 as
909 select 'BASE' as ___TMP___
910 ,b.*
911 from work.bitemp4_updated a
912 ,work.bitemp3b_newbase b
913 where 1
914 %do idx_pk=1 %to &pk_cnt;
915 %let idx_val=%scan(&pk,&idx_pk);
916 and a.&idx_val=b.&idx_val
917 %end;
918 ;
919/* updated records */
920create view work.bitemp4_prep2 as
921 select 'STAG' as ___TMP___ ,*
922 from work.bitemp4_updated;
923/* ensure we only keep columns that appear in both */
924%local bp1 bp2 bp3 bp4;
925%let bp1=%mf_getvarlist(bitemp4_prep1);
926%let bp2=%mf_getvarlist(bitemp4_prep2);
927%let bp3=%mf_wordsInStr1ButNotStr2(Str1=&bp1,Str2=&bp2);
928%let bp4=%mf_wordsInStr1ButNotStr2(Str1=&bp2,Str2=&bp1);
929data work.bitemp4_prep3/view=bitemp4_prep3;
930 set bitemp4_prep1 bitemp4_prep2;
931%if %length(XX&bp3&bp4)>2 %then %do;
932 drop &bp3 &bp4 ;
933%end;
934run;
/* remove duplicates, ordering by key then business dates and processed
   timestamp (the BY order used by the first pass below) */
%local allrecs_orderby;
/* compress blanks and then insert commas (as the datetime fields
   may not be in use) */
%let allrecs_orderby=%sysfunc(tranwrd(%sysfunc(compbl(&pk &bus_from &bus_to &processed)),%str( ),%str(,)));
proc sql;
create table work.bitemp4a_allrecs as
  select distinct *
  from work.bitemp4_prep3
  order by &allrecs_orderby;
947
%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
 /* this section aligns the business dates
 (eg for inserts or overlaps in the range) */
 /* First pass, walking each key group FORWARDS (ascending key, bus_from,
 bus_to, processed - the order work.bitemp4a_allrecs was created in):
 - consecutive rows with the same md5 are merged (bus_from carried
 forward, bus_to extended if the earlier row ended later)
 - a BASE row that follows a STAG row in the same key group has its
 bus_from moved up to the STAG row's bus_to, and is deleted if that
 leaves it with no business range at all
 The retained helper columns ___TMP___cond/from/to are dropped on output;
 ___TMP___md5lag is kept here (it is rebuilt in the second pass). */
 data work.bitemp4b_firstpass (drop=___TMP___cond ___TMP___from ___TMP___to );
 set work.bitemp4a_allrecs;
 by &pk &bus_from &bus_to &processed;
 /* the initial literal fixes ___TMP___cond as a 17-char variable */
 retain ___TMP___cond 'Name of Condition';
 retain ___TMP___from ___TMP___to 0;
 /* md5 of the previous row - LAG runs on every row so the queue stays aligned */
 ___TMP___md5lag=lag(&md5_col);
 /* reset retained variables */
 /* NOTE(review): &idx_val is expected to hold the last primary key column
 (left by the earlier %do join loop); first.<last pk> is true at the start
 of every key group */
 if first.&idx_val then do;
 call missing (___TMP___cond, ___TMP___from, ___TMP___to,___TMP___md5lag);
 end;
 else do;
 /* if record is identical, carry forward bus_from (and bus_to if higher)*/
 if &md5_col=___TMP___md5lag then do;
 &bus_from=___TMP___from;
 if &bus_to<___TMP___to then &bus_to=___TMP___to;
 end;
 end;

 if ___TMP___='STAG' then do;
 /* need to carry forward the closing record */
 ___TMP___cond='Condition 1';
 end;
 else if ___TMP___cond='Condition 1' then do;
 /* else ensure bus_from starts from prior record bus_to */
 if &md5_col ne ___TMP___md5lag and &bus_from <= ___TMP___to
 then &bus_from= ___TMP___to;
 /* new record may replace old record entirely */
 if &bus_to <= &bus_from then delete;
 else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
 end;
 /* remember this row's (possibly adjusted) business range for the next row */
 ___TMP___from=&bus_from;
 ___TMP___to=&bus_to;
 run;
%end;
%else %do;
 /* keep staged records only */
 /* no business dates to align, so only the STAG rows are carried forward */
 data work.bitemp4b_firstpass;
 set work.bitemp4a_allrecs;
 if ___TMP___='STAG';
 run;
%end;
992
/*
  The second pass walks each key group in reverse, so build a fully
  DESCENDING sort / BY list made of:
   - every primary key column
   - the business from / to dates (temporal loads only)
   - ___TMP___, so that when business dates match between BASE and
     STAGING rows, 'STAG' sorts ahead of 'BASE'
*/
%local byvar;
%do idx_pk=1 %to &pk_cnt;
  %let byvar=&byvar descending %scan(&pk,&idx_pk);
%end;
%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
  %let byvar=&byvar descending &bus_from;
  %let byvar=&byvar descending &bus_to;
%end;
%let byvar=&byvar descending ___TMP___;

proc sort data=work.bitemp4b_firstpass
          out=work.bitemp4c_sort;
  by &byvar;
run;
1007
1008/**
1009* Now (in reverse) pass back business start dates
1010*/
1011data work.bitemp4d_secondpass;
1012%if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1013 &tech_from=&now;
1014 &tech_to=&high_date;
1015%end;
1016 set work.bitemp4c_sort ;
1017 by &byvar;
1018 retain ___TMP___cond 'Name of Condition';
1019 retain ___TMP___from ___TMP___to 0;
1020%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
1021/* put / _all_ /;*/
1022 ___TMP___md5lag=lag(&md5_col);
1023 if first.&idx_val then do;
1024 /* reset retained variables */
1025 call missing (___TMP___cond,___TMP___from,___TMP___to,___TMP___md5lag);
1026 end;
1027 else do;
1028 /* if record is identical, carry back bus_to */
1029 if &md5_col=___TMP___md5lag then &bus_to=___TMP___to;
1030 end;
1031
1032 if ___TMP___='STAG' then do;
1033 /* need to carry forward the closing record */
1034 ___TMP___cond='Condition 2';
1035 end;
1036 else if ___TMP___cond='Condition 2' then do;
1037 /* else ensure bus_to stops at subsequent record bus_from */
1038 if &md5_col ne ___TMP___md5lag and &bus_to >= ___TMP___from
1039 then &bus_to= ___TMP___from;
1040 /* new record may replace old record entirely */
1041 if &bus_from >= &bus_to then delete;
1042 if &bus_from=___TMP___from and &bus_to=___TMP___to then delete;
1043 else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
1044 end;
1045 ___TMP___from=&bus_from;
1046 ___TMP___to=&bus_to;
1047
1048%end;
1049run;
1050%put syscc (line600)=&syscc;
1051/**
1052 There may still be some records (eg old business history) which have not
1053 changed.
1054 Need to identify these and remove from the append so they are not updated
1055 unnecessarily. This is done by generating a new md5 (which INCLUDES the
1056 business key) and any matching / identical records are split out (from those
1057 that need to be updated).
1058*/
1059
1060%if &loadtype=BITEMPORAL %then %do;
1061 %let cat_string=catx('|' ,&bus_from,&bus_to);
1062
1063 data work.bitemp5a_lkp (keep=&md5_col)
1064 %if "%substr(&sysver,1,1)" ne "4" & "%substr(&sysver,1,1)" ne "5" %then %do;
1065 /nonote2err
1066 %end;
1067 ;
1068 set work.bitemp0_base;
1069 /* for BITEMPORAL we need to compare business dates also */
1070 &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.);
1071 run;
1072
1073 data bitemp5b_updates;
1074 set bitemp4d_secondpass;
1075 if _n_=1 then do;
1076 dcl hash md5_lkp(dataset:'bitemp5a_lkp');
1077 md5_lkp.definekey("&md5_col");
1078 md5_lkp.definedone();
1079 end;
1080 /* drop old md5 col as will rebuild with new business dates */
1081 &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.) ;
1082 if md5_lkp.check()=0 then delete;
1083 run;
1084
1085 proc sql;
1086 /* get min bus from as will update (close out) all records from this point
1087 (for that PK)*/
1088 create table work.bitemp5d_subquery as
1089 select &pk_comma, min(&bus_from)as &bus_from, max(&bus_to) as &bus_to
1090 from work.bitemp5b_updates
1091 group by &pk_comma;
1092 /* index has a huge efficiency impact on upcoming nested subquery */
1093 create index index1 on work.bitemp5d_subquery(&pk_comma,&bus_from, &bus_to);
1094
1095 %let lastds=work.bitemp5b_updates;
1096%end;
1097%else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
1098 proc sql;
1099 create table work.bitemp5d_subquery as
1100 select distinct &pk_comma
1101 from bitemp4d_secondpass;
1102 %let lastds=work.bitemp4d_secondpass;
1103%end;
1104%else %let lastds=work.bitemp4d_secondpass;
1105
/*
  Build the single append table. An overlapped pre-sert may be classed as
  both an update AND a new record, so the modified and new rows are stacked
  and exact duplicate rows removed. The temp views created here may also be
  used for pre-load analysis.
*/
data &outds_mod;
  /* strip the ___TMP___ helper columns and the md5 column */
  set &lastds(drop=___TMP___: &md5_col);
run;

/* UPDATED records followed by NEW records */
data bitemp6_allrecs / view=bitemp6_allrecs;
  set &outds_mod
      &outds_add;
run;

/* de-duplicate on every column - discarded duplicates go to xx_BADBADBAD */
proc sort data=work.bitemp6_allrecs
          out=work.bitemp6_unique
          dupout=work.xx_BADBADBAD
          noduprec;
  by _all_;
run;
1124
/* we have all our temp tables now so exit if this is all that is needed */
/* NOTE(review): no unlock is issued on this %return path - presumably the
 target / audit locks are only taken when LOADTARGET=YES; confirm */
%if &LOADTARGET ne YES %then %return;

/* also exit if an err condition exists */
/* release the target lock (and the audit-table lock when an audit table was
 supplied) BEFORE aborting, so a failed run does not leave the tables locked */

%if &syscc>0 %then %do;
 %put syscc=&syscc;
 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
 ctl_ds=&dclib..mpe_lockanytable
 )
 %if "&outds_audit" ne "0" %then %do;
 %mp_lockanytable(UNLOCK
 ,lib=%scan(&outds_audit,1,.)
 ,ds=%scan(&outds_audit,2,.)
 ,ref=&ETLSOURCE
 ,ctl_ds=&dclib..mpe_lockanytable
 )
 %end;
%end;
/* stops the job with the message below when SYSCC>0 */
%mp_abort(iftrue= (&syscc>0)
 ,mac=&sysmacroname in &_program
 ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status)
)

/* final check - abort if a lock has appeared on the target or audit table */
%mp_lockfilecheck(libds=&base_lib..&base_dsn)
%if %mf_existds(&outds_audit) %then %do;
 %mp_lockfilecheck(libds=&outds_audit)
%end;
1154
1155/**
1156* STAGING TABLES PREPARED, ERR CONDITION TESTED FOR.. NOW TO LOAD!!
1157*/
1158
1159/**
1160* First, CLOSE OUT changed records (if not a REPLACE)
1161* Note that SAS does not support ANSI standard for UPDATE with a join condition.
1162* However - this can be worked around using a nested subquery..
1163*/
1164data _null_;
1165 putlog "&sysmacroname: CLOSEOUTS commencing";
1166run;
1167
1168%if %mf_getattrn(&lastds,NLOBS)=0 %then %do;
1169 data _null_;
1170 putlog "&sysmacroname: No closeouts needed";
1171 run;
1172%end;
1173%else %if &engine_type=CAS %then %do;
1174 %mp_abort(iftrue= (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL)
1175 ,mac=&sysmacroname in &_program
1176 ,msg=%str(&loadtype not yet supported in CAS engine)
1177 )
1178 /* create temp table for deletions */
1179 %local delds;%let delds=%mf_getuniquename(prefix=DC);
1180 data casuser.&delds;
1181 set work.bitemp5d_subquery;
1182 run;
1183 /* delete the records */
1184 proc cas ;
1185 table.deleteRows / table={
1186 caslib="&base_lib",
1187 name="&base_dsn",
1188 where="1=1",
1189 whereTable={caslib='CASUSER',name="&delds"}
1190 };
1191 quit;
1192 /* drop temp table */
1193 proc sql;
1194 drop table CASUSER.&delds;
1195%end;
1196%else %if (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL or &loadtype=UPDATE)
1197%then %do;
1198 data _null_;
1199 putlog "&sysmacroname: &loadtype operation using &engine_type engine";
1200 run;
1201 %local flexinow;
1202 proc sql;
1203 /* if OLEDB then create a temp table for efficiency */
1204 %local innertable;
1205 %if &engine_type=OLEDB %then %do;
1206 %let innertable=[&temp_table];
1207 %let top_table=[dbo].&base_dsn;
1208 %let flexinow=&SQLNOW;
1209 create table &base_lib.."&temp_table"n as
1210 select * from work.bitemp5d_subquery;
1211 /* open up a connection for pass through SQL */
1212 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1213 execute(
1214 %end;
1215 %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES or &engine_type=SNOW
1216 %then %do;
1217 %let innertable=%upcase(%mf_getuniquename(prefix=XDCTEMP));
1218 %let top_table=&baselib_schema.&base_dsn;
1219 %let flexinow=timestamp &SQLNOW;
1220 /* make empty table first - must clone & drop extra cols
1221 as autoload is bad */
1222 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1223 %if &engine_type=SNOW %then %do;
1224 exec (create transient table &baselib_schema.&innertable
1225 like &baselib_schema.&base_dsn
1226 ) by myAlias;
1227 %end;
1228 %else %do;
1229 exec (create table &innertable
1230 (like &baselib_schema.&base_dsn)
1231 ) by myAlias;
1232 %if &engine_type=REDSHIFT %then %do;
1233 exec (alter table &innertable alter sortkey none) by myAlias;
1234 %end;
1235 %end;
1236 %let dropcols=%mf_wordsinstr1butnotstr2(
1237 str1=%upcase(%mf_getvarlist(&basecopy))
1238 ,str2=%upcase(%mf_getvarlist(work.bitemp5d_subquery))
1239 );
1240 %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
1241 %put &=dropcols;
1242 %let idx_val=%scan(&dropcols,&idx_pk);
1243 exec(alter table &innertable drop column &idx_val;) by myAlias;;
1244 %end;
1245 /* create view to strip formats and avoid warns in log */
1246 data work.vw_bitemp5d/view=work.vw_bitemp5d;
1247 set work.bitemp5d_subquery;
1248 format _all_;
1249 run;
1250 proc append base=&base_lib..&innertable (
1251 %do idx_pk=1 %to &redcnt;
1252 &&rednm&idx_pk = &&redval&idxpk
1253 %end;
1254 )
1255 data=work.vw_bitemp5d force nowarn;
1256 run;
1257 /* open up a connection for pass through SQL */
1258 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1259 execute(
1260 %end;
1261 %else %do;
1262 %let innertable=bitemp5d_subquery;
1263 %let top_table=&base_lib..&base_dsn;
1264 %let flexinow=&now;
1265 %end;
1266
1267
1268 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1269 update &top_table set &tech_to=&flexinow
1270 %if %length(&processed)>0 %then %do;
1271 ,&processed=&flexinow
1272 %end;
1273 where &tech_from <= &flexinow and &flexinow < &tech_to and
1274 %end;
1275 %else %if &loadtype=UPDATE %then %do;
1276 /* changed records are deleted then re-appended when doing UPDATEs */
1277 delete from &top_table where
1278 %end;
1279 %else %do;
1280 %put %str(ERR)OR: BUSTEMPORAL NOT YET SUPPORTED;
1281 %let syscc=5;
1282 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1283 ctl_ds=&dclib..mpe_lockanytable
1284 )
1285 %mp_lockanytable(UNLOCK
1286 ,lib=%scan(&outds_audit,1,.)
1287 ,ds=%scan(&outds_audit,2,.)
1288 ,ref=&ETLSOURCE
1289 ,ctl_ds=&dclib..mpe_lockanytable
1290 )
1291 %goto end_of_macro;
1292 %end;
1293
1294 /* perform join inside query as per
1295 http://stackoverflow.com/questions/24629793/update-with-a-proc-sql */
1296
1297 exists( select 1 from &baselib_schema.&innertable where
1298
1299 /* loop PK join */
1300 %do idx_pk=1 %to &pk_cnt;
1301 %let idx_val=%scan(&pk,&idx_pk);
1302 &base_dsn..&idx_val=&innertable..&idx_val and
1303 %end;
1304 %if &loadtype=BITEMPORAL %then %do;
1305 &base_dsn..&bus_from >= &innertable..&bus_from
1306 and &base_dsn..&bus_to <= &innertable..&bus_to and
1307 %end;
1308
1309 /* close the statement */
1310
1311 1=1);
1312
1313 %if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
1314 or &engine_type=SNOW
1315 %then %do;
1316 ) by myAlias;
1317 execute (drop table &baselib_schema.&innertable) by myAlias;
1318 %end;
1319%end;
1320quit;
1321data _null_;
1322 putlog "&sysmacroname: Closeout complete";
1323run;
1324/**
1325 * Append the new / updated records
1326 */
1327%if &engine_type=CAS %then %do;
1328
1329 /* get varchar variables ready for casting */
1330 %local vcfmt vcrename vcassign vcdrop;
1331 data _null_;
1332 set work.bitemp_cols(where=(type=6)) end=last;
1333 length vcrename vcassign vcdrop vcfmt $32767 rancol $32;
1334 retain vcrename vcassign vcdrop vcfmt;
1335 if _n_=1 then vcrename='(rename=(';
1336 rancol=resolve('%mf_getuniquename()');
1337 vcfmt=trim(vcfmt)!!'length '!!cats(name)!!' varchar(*);';
1338 vcrename=trim(vcrename)!!' '!!cats(name,'=',rancol);
1339 vcassign=cats(vcassign,name,'=',rancol,';');
1340 vcdrop=cats(vcdrop,'drop '!!rancol,';');
1341 if last then do;
1342 vcrename=cats(vcrename,'))');
1343 call symputx('vcfmt',vcfmt);
1344 call symputx('vcrename',vcrename);
1345 call symputx('vcassign',vcassign);
1346 call symputx('vcdrop',vcdrop);
1347 end;
1348 run;
1349
1350 /* prepare a temp cas table with varchars casted */
1351 %let tmp=%mf_getuniquename();
1352 data casuser.&tmp ;
1353 &vcfmt
1354 set work.bitemp6_unique &vcrename;
1355 &vcassign
1356 &vcdrop
1357 run;
1358
1359 /* load the table with varchars applied*/
1360 data &base_lib..&base_dsn (append=yes )/sessref=dcsession ;
1361 set casuser.&tmp;
1362 run;
1363
1364 /* drop temp table */
1365 proc sql;
1366 drop table CASUSER.&tmp;
1367
1368 /* this code will not work as regular tables do not have varchars */
1369 /*
1370 proc casutil;
1371 load data=work.bitemp6_unique
1372 outcaslib="&base_lib" casout="&base_dsn" append ;
1373 quit;
1374 */
1375%end;
1376%else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
1377 proc append base=&base_lib..&base_dsn
1378 %if &engine_type=REDSHIFT %then %do;
1379 (
1380 %do idx_pk=1 %to &redcnt;
1381 &&rednm&idx_pk = &&redval&idxpk
1382 %end;
1383 )
1384 %end;
1385 data=bitemp6_unique force nowarn;
1386 run;
1387%end;
1388%else %do;
1389 proc append base=&base_lib..&base_dsn data=bitemp6_unique force nowarn; run;
1390%end;
1391
1392%mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1393 ctl_ds=&dclib..mpe_lockanytable
1394)
1395
/* final check on syscc */
%mp_abort(iftrue= (&syscc >4)
  ,mac=&_program
  ,msg=%str(!!Upload NOT successful!! Failed on actual update / append stage..)
)

/*
  Audit trail - only when an audit table was supplied (&outds_audit ne 0)
  and the target was actually loaded.
*/
%if &outds_audit ne 0 and &LOADTARGET=YES %then %do;
  /* rows of work.bitemp0_base with ___TMP___NEW_FLG=0 (presumably the
     original versions of pre-existing rows), without helper columns */
  data work.vw_outds_orig /view=work.vw_outds_orig;
    set work.bitemp0_base (drop=&md5_col where=(___TMP___NEW_FLG=0));
    drop ___TMP___NEW_FLG;
  run;
  /* update the AUDIT table */
  %if %mf_existds(&outds_audit) %then %do;
    options mprint;
    %mp_storediffs(&base_lib..&base_dsn
      ,work.vw_outds_orig
      ,&pk &bus_from
      ,delds=&outds_del
      ,modds=&outds_mod
      ,appds=&outds_add
      ,outds=work.mp_storediffs
      ,processed_dttm=&now
      ,loadref=%superq(etlsource)
    )
    /* keep everything except unchanged, non-key values of modified rows */
    data work.mp_storediffs;
      set work.mp_storediffs;
      if not (MOVE_TYPE="M" and IS_PK=0 and IS_DIFF=0);
    run;
    proc append base=&outds_audit data=work.mp_storediffs;
    run;
    %mp_lockanytable(UNLOCK
      ,lib=%scan(&outds_audit,1,.)
      ,ds=%scan(&outds_audit,2,.)
      ,ref=&ETLSOURCE
      ,ctl_ds=&dclib..mpe_lockanytable
    )
  %end;
%end;
%mp_abort(iftrue= (&syscc >4)
  ,mac=bitemporal_dataloader
  ,msg=%str(Problem in audit stage (&outds_audit))
)
1441
%let user=%mf_getUser();
/**
  Notify as appropriate EMAILS DISABLED

%sumo_alerts(ALERT_EVENT=UPDATE
  , ALERT_TARGET=&base_lib..&base_dsn
  , from_user= &user);
*/
/* monitor BiTemporal usage - one row per load is written to MPE_DATALOADS */
%if &log=1 %then %do;
  %put syscc=&syscc;
  /* do not perform duration calc in pass through */
  %local dur;
  data _null_;
    /* datetime() is evaluated here at full precision, rather than via
      %sysfunc(datetime()) whose result is rendered with the default BEST12.
      format and loses sub-second precision */
    dur=datetime()-&now;
    call symputx('dur',dur,'l');
  run;
  proc sql;
  insert into &dclib..mpe_dataloads
    set libref=%upcase("&base_lib")
      ,DSN=%upcase("&base_dsn")
      ,ETLSOURCE="&ETLSOURCE"
      ,LOADTYPE="&loadtype"
      ,CHANGED_RECORDS=%mf_getattrn(&lastds,NLOBS)
      ,NEW_RECORDS=%mf_getattrn(&outds_add,NLOBS)
      ,DELETED_RECORDS=%mf_getattrn(&outds_del,NLOBS)
      ,DURATION=&dur
      ,MAC_VER="v&ver"
      ,user_nm="&user"
      ,PROCESSED_DTTM=&now;
  quit;
  %put syscc=&syscc;
%end;
1476%end_of_macro:
1477%mend bitemporal_dataloader;