Loading...
Searching...
No Matches
bitemporal_dataloader.sas
Go to the documentation of this file.
1/**
2 @file
3 @brief Routine supporting multiple load types
4 @details Generic loader for multiple load types (UPDATE, SCD2, BITEMPORAL).
5
6 Handles all elements including metadata validation, PK checking, closeouts,
7 locking, logging, etc.
8
9 The staging table must be prepared with a unique business key. For bitemporal
10 this means a snapshot at both technical AND business time.
11
12ASSUMPTIONS:
13 - Base table has relevant datetime vars: 2xTechnical, 2xBusiness, 1xProcessed
14 - Staging table omits Technical or Processed datetimes (has Business only)
15 - Base table has no column names containing the string "___TMP___"
16 - Base &tech_from variable is not nullable. This should always be the case
17 anyway whenbuilding permanent bitemporal datasets.. But the point is that
18 this field is used to identify new records after the initial left join
19 from staging to base table.
20
21NOTES:
22 - All queries against BiTemporal tables need two filter conditions, as such:
23
24 where &bus_from LE [tstamp] LT &bus_to
25 AND &tx_from LE [tstamp] LT &tx_to
26
27 One cannot use BETWEEN
28 One cannot use &xx_from LE [tstamp] LE &xx_from (equivalent to above).
29 Background:
30http://stackoverflow.com/questions/20005950/best-practice-for-scd-date-pairs-closing-opening-timestamps
31
32Areas for optimisation
33 - loading temporal history (currently experimental)
34
35 ## Supporting tables
36
37 Supporting tables must exist in the library specified in the `dclib` param.
38
39 ### MPE_DATALOADS
40
41 This table is updated every time a successful load occurs, and includes
42 information such as:
43
44 @li library
45 @li dataset
46 @li message (supplied in the ETLSOURCE param)
47 @li new rows
48 @li deleted rows
49 @li changed rows
50 @li timestamp
51 @li the user making the load
52 @li the version of (this) macro used to make the load
53
54
55 @param [in] APPEND_DSN= (APPENDTABLE) Name of STAGING table
56 @param [in] CONFIG_TABLE= (&dclib..MPE_CONFIG) The table containing library
57 engine specific config. The following scopes are supported:
58 @li DCBL_REDSH
59 @param [in] LOADTYPE= (BITEMPORAL) Supported types:
60 @li TXTEMPORAL - loads a buskey with version times
61 @li BUSTEMPORAL - loads buskey with bus + ver times
62 @li UPDATE - updates a buskey with NO history
63 @param [in] PROCESSED= (0) This column obtains a current timestamp for changed
64 records when loading the target table. Default is 0 (not set). If the
65 target table contains a variable called PROCESSED_DTTM, and processed=0,
66 then this column will be used for applying the current timestamp.
67 @param RK_MAXKEYTABLE= (mpe_maxkeyvalues) The maxkeytable to use (must exist
68 in DCLIB)
69 @param [in] PK= Business key, space separated. Should NOT include temporal
70 fields.
71 @param [in] RK_UNDERLYING= If supplied will generate an RK based on these
72 (space separated) business key fields. In this case only ONE PK field should
73 be supplied, which is assumed to be the RK. The RK field, plus underlying
74 fields, should all exist on the base table. The underlying fields should
75 exist on the staging table (the RK / PK field will be overwritten).
76 The staging table should also be unique on its PK.
77
78 @param [in] dclib= (&dc_libref) The library containing DC configuration tables
79 @param [out] outds_del= (work.outds_del) Output table containing
80 deleted records
81 @param [out] outds_add= (work.outds_add) Output table containing
82 appended records
83 @param [out] outds_mod= (work.outds_mod) Output table containing
84 changed records
85 @param [out] outds_audit= (0) Load detailed changes to an audit table. Uses
86 the mp_storediffs.sas macro. Provide the base table here, to load.
87
88 <h4> Global Variables </h4>
89 The following global macro variables are used. These should be replaced by
90 macro parameters in future releases.
91
92 @li `dc_dttmtfmt`
93
94 <h4> SAS Macros </h4>
95 @li bitemporal_closeouts.sas
96 @li dc_assignlib.sas
97 @li mf_existds.sas
98 @li mf_existvar.sas
99 @li mf_fmtdttm.sas
100 @li mf_getattrn.sas
101 @li mf_getengine.sas
102 @li mf_getschema.sas
103 @li mf_getuniquefileref.sas
104 @li mf_getuniquename.sas
105 @li mf_getuser.sas
106 @li mf_getvarlist.sas
107 @li mf_verifymacvars.sas
108 @li mf_wordsinstr1butnotstr2.sas
109 @li mp_abort.sas
110 @li mp_dropmembers.sas
111 @li mp_lockanytable.sas
112 @li mp_lockfilecheck.sas
113 @li mp_retainedkey.sas
114 @li mp_storediffs.sas
115
116 @version 9.3
117 @author 4GL Apps Ltd.
118 @copyright 4GL Apps Ltd. This code may only be used within Data Controller
119 and may not be re-distributed or re-sold without the express permission of
120 4GL Apps Ltd.
121
122 @warning multitemporal loads (bitemporal for multiple points in business time)
123 are in experimental stage
124
125**/
126
127%macro bitemporal_dataloader(
128 bus_from= /* Business FROM datetime variable. Req'd on
129 STAGING & BASE tables.*/
130 ,bus_to = /* Business TO datetime variable. Req'd on
131 STAGING & BASE tables. */
132 ,bus_from_override= /* Provide a hard coded BUS_FROM datetime value.*/
133 ,bus_to_override= /* provide a hard coded BUS_TO datetime value */
134 ,tech_from= /* Technical FROM datetime variable. Req'd on
135 BASE table only. */
136 ,tech_to = /* Technical TO datetime variable. Req'd on BASE
137 table only. */
138 ,processed= 0
139 ,base_lib=WORK /* Libref of the BASE table. */
140 ,base_dsn=BASETABLE /* Name of BASE table. */
141 ,append_lib=WORK /* Libref of the STAGING table. */
142 ,append_dsn=APPENDTABLE
143 ,high_date='01JAN5999:00:00:00'dt /* High date to close out records */
144 ,PK= name sex
145 ,RK_UNDERLYING=
146 ,KEEPVARS= /* Provides option for removing unwanted vars from append table */
147 ,RK_UPDATE_MAXKEYTABLE=NO /* If switching (or mix matching) with regular
148 SCD2 loader then set this switch to YES to
149 ensure the MAXKEYTABLE is updated with the
150 current maximum RK value for the target table
151 */
152 ,CHECK_UNIQUENESS=YES /* Perform a check of the APPEND table to ensure it is
153 unique on its business key */
154 ,ETLSOURCE=demo /* supply a value ($50.) to show as ETLSOURCE in
155 &dclib..DATALOADS */
156 ,LOADTYPE=BITEMPORAL
157 ,RK_MAXKEYTABLE= mpe_maxkeyvalues
158 ,LOG=1 /* Switch to 0 to prevent records being added to
159 &mpelib..mpe_DATALOADS (ie when testing)*/
160 ,DELETE_COL= _____DELETE__THIS__RECORD_____
161 /* If this variable is found in the append dataset
162 then records are closed out (or deleted) in the
163 append table where that variable= "Yes" */
164 ,LOADTARGET=YES /* set to anything but uppercase YES to switch off
165 target table load and generate temp tables only */
166 ,CLOSE_VARS=
167/*a problem with regular SCD2 or TXTEMPORAL loads is that there is
168 no facility to close out removed records (all records are
169 assumed new or changed). But how does one determine which
170 records are removed? Short of loading the entire table
171 each time? This parameter allows a set of variables
172 (this should be a subset of the PK) to be declared, and
173 the macro will determine which records in the base table
174 need to be closed out ahead of the load.
175
176 For instance, given the following:
177
178 Base Table Staging Table
179 DATE ENTITY AMOUNT DATE ENTITY AMOUNT
180 JAN ACME4 66 JAN ACME4 66
181 FEB ACME4 99 FEB ACME4 99
182 FEB ACME1 22
183
184 By supplying DATE in CLOSE_VARS and DATE ENTITY as the PK,
185 the "FEB PAG 22" record would get closed out.
186 */
187 ,config_table=&dclib..MPE_CONFIG
188 ,dclib=&dc_libref
189 ,outds_del=work.outds_del
190 ,outds_add=work.outds_add
191 ,outds_mod=work.outds_mod
192 ,outds_audit=0
193 );
194
195/* when changing this macro, update the version num here */
196%local ver;
197%let ver=32;
198%put &sysmacroname entry vars:;
199%put _local_;
200
201%dc_assignlib(WRITE,&base_lib) /* may not already be assigned */
202
203/* return straight away if nothing to load */
204%let nobs= %mf_getattrn(&append_lib..&append_dsn,NLOBS);
205%if &nobs=-1 %then %do;
206 proc sql noprint; select count(*) into: nobs from &append_lib..&append_dsn;
207%end;
208%if &nobs=0 %then %do;
209 %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
210 %put NOTE- Base dataset &append_lib..&append_dsn is empty. Nothing to upload!;
211 %put NOTE-;%put NOTE-;%put NOTE-;
212 %return;
213%end;
214
215/* hard exit if err condition exists */
216%mp_abort(iftrue= (&syscc > 0)
217 ,mac=bitemporal_dataloader
218 ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status;)
219)
220
221%local engine_type;
222%let engine_type=%mf_getengine(&base_lib);
223%if (&engine_type=REDSHIFT or &engine_type=POSTGRES) and %length(&CLOSE_VARS)>0
224%then %do;
225 %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
226 %put NOTE- CLOSE_VARS functionality not yet supported in &engine_type;
227 %put NOTE-;%put NOTE-;%put NOTE-;
228 %return;
229%end;
230
231/**
232 * The metadata functions (eg mf_existvar) will fail if the base table has a
233 * SAS lock. So, make a snapshot of the base table for further use.
234 * Also, make output tables (regardless).
235 */
236%local basecopy;
237%let basecopy=%mf_getuniquename(prefix=basecopy);
238
239data &basecopy &outds_mod &outds_add &outds_del;
240 set &base_lib..&base_dsn;
241 stop;
242run;
243%mp_abort(iftrue= (&syscc > 0)
244 ,mac=&_program
245 ,msg=%str(syscc=&syscc after base table copy - aborting due to table lock)
246)
247
248
249%local cols idx_pk md5_col ;
250%let md5_col=___TMP___md5;
251%let check_uniqueness=%upcase(&check_uniqueness);
252%let RK_UPDATE_MAXKEYTABLE=%upcase(&RK_UPDATE_MAXKEYTABLE);
253%let high_date=%unquote(&high_date);
254%let loadtype=%upcase(&loadtype);
255
256/* ensure irrelevant variables are cleared */
257%if &loadtype=BUSTEMPORAL %then %do;
258 %let tech_from=;
259 %let tech_to=;
260%end;
261%else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
262 %let bus_from=;
263 %let bus_to=;
264%end;
265
266/* ensure relevant variables are supplied */
267%mp_abort(iftrue=(&loadtype=BITEMPORAL & %mf_verifymacvars(bus_from bus_to)=0)
268 ,mac=bitemporal_dataloader
269 ,msg=%str(Missing BUS_FROM / BUS_TO)
270)
271%mp_abort(iftrue=(&loadtype=TXTEMPORAL & %mf_verifymacvars(tech_from tech_to)=0)
272 ,mac=bitemporal_dataloader
273 ,msg=%str(Missing TECH_FROM / TECH_TO)
274)
275
276/**
277 * drop any tables (may be defined as views or vice versa preventing overwrite)
278 */
279%mp_dropmembers(append bitemp0_append bitemp_cols)
280
281/* SQL Server requires its own time values */
282/* 9.2 will only give picture format down to seconds. 9.3 allows
283 milliseconds by using lower S and defining the decimal in the format name..*/
284PROC FORMAT;
285 picture MyMSdt other='%0Y-%0m-%0dT%0H:%0M:%0S' (datatype=datetime);
286RUN;
287%local dbnow;
288%let dbnow="%sysfunc(datetime(),%mf_fmtdttm())"dt;
289
290data _null_;
291 /* convert space separated macvar to comma separated for SQL processing */
292 call symputx('PK_COMMA',tranwrd(compbl("&pk"),' ',','),'L');
293 call symputx('PK_CNT',countw("&pk",' '),'L');
294 now=&dbnow;
295 call symputx('NOW',now,'L');
296 call symputx('SQLNOW',cats("'",put(now,MyMSdt.),"'"),'L');
297 length etlsource $100;
298 etlsource=subpad(symget('etlsource'),1,100);
299 call symputx('etlsource',etlsource,'l');
300run;
301
302/**
303 * Even if no PROCESSED var provided, assume that any variable named
304 * PROCESSED_DTTM should be updated
305 */
306%if &processed=0 %then %do;
307 %if %mf_existvar(&basecopy,PROCESSED_DTTM)
308 %then %let processed=PROCESSED_DTTM;
309 %else %let processed=;
310%end;
311
312
313/* extract colnames for md5 creation / change tracking */
314proc contents noprint data=&base_lib..&base_dsn
315 out=work.bitemp_cols (keep=name type length varnum format:);
316run;
317proc sql noprint;
318select name into: cols separated by ','
319 from work.bitemp_cols
320 where upcase(name) not in
321 (%upcase("&bus_from","&bus_to"
322 ,"&tech_from","&tech_to"
323 ,"&processed","&delete_col")) ;
324select case when type in (2,6) then cats('put(md5(trim(',name,')),$hex32.)')
325 /* multiply by 1 to strip precision errors (eg 0 != 0) */
326 /* but ONLY if not missing, else will lose any special missing values */
327 else cats('put(md5(trim(put(ifn(missing('
328 ,name,'),',name,',',name,'*1),binary64.))),$hex32.)') end
329 into: stripcols separated by '||'
330 from work.bitemp_cols
331 where upcase(name) not in
332 (%upcase("&bus_from","&bus_to"
333 ,"&tech_from","&tech_to"
334 ,"&processed","&delete_col")) ;
335
336/* set default formats*/
337%let bus_from_fmt = datetime19.;
338%let bus_to_fmt = datetime19.;
339%let processed_fmt = datetime19.;
340
341%let tech_from_fmt = format=datetime19.;
342%let tech_to_fmt = format=datetime19.;
343
344
345%put &=stripcols;
346%put &=pk;
347
348data _null_;
349 set work.bitemp_cols;
350 if type=2 or type=6 then do;
351 length fmt $49.;
352 if format='' then fmt=cats('$',length,'.');
353 else fmt=cats(format,formatl,'.');
354 end;
355 else do;
356 if format='' then fmt=cats(length,'.');
357 else fmt=cats(format,formatl,'.',formatd);
358 end;
359 if upcase(name)="%upcase(&bus_from)" then
360 call symputx('bus_from_fmt',fmt,'L');
361 else if upcase(name)="%upcase(&bus_to)" then
362 call symputx('bus_to_fmt',fmt,'L');
363 else if upcase(name)="%upcase(&tech_from)" then
364 call symputx('tech_from_fmt',"format="!!fmt,'L');
365 else if upcase(name)="%upcase(&tech_to)" then
366 call symputx('tech_to_fmt',"format="!!fmt,'L');
367 else if upcase(name)="%upcase(&processed)" then
368 call symputx('processed_fmt',fmt,'L');
369run;
370
371%if %index(%quote(&cols),___TMP___) %then %do;
372 %let msg=%str(Table contains a variable name containing "___TMP___".%trim(
373 ) This may conflict with temp variable generation!!);
374 %mp_abort(msg=&msg,mac=bitemporal_dataloader);
375 %let syscc=5;
376 %return;
377%end;
378
379/* if transaction dates appear on the APPEND table, need to remove them */
380%local drop_tx_dates /* used in append table */
381 drop_tx_dates_noobs /* used to take the base table structure */;
382%if %mf_existvar(&append_lib..&append_dsn, &tech_from)
383 %then %let drop_tx_dates=&tech_from;
384%if %mf_existvar(&append_lib..&append_dsn, &tech_to)
385 %then %let drop_tx_dates=&drop_tx_dates &tech_to;
386%if %length(%trim(&drop_tx_dates))>0
387 %then %let drop_tx_dates=(drop=&drop_tx_dates);
388
389%if %mf_existvar(&basecopy, &tech_from)
390 %then %let drop_tx_dates_noobs=&tech_from;
391%if %mf_existvar(&basecopy, &tech_to)
392 %then %let drop_tx_dates_noobs=&drop_tx_dates_noobs &tech_to;
393%if %length(%trim(&drop_tx_dates_noobs))>0
394 %then %let drop_tx_dates_noobs=(drop=&drop_tx_dates_noobs obs=0);
395%else %let drop_tx_dates_noobs=(obs=0);
396
397
398/**
399 * Lock the table. This is necessary as we are doing a two part update (first
400 * closing records then appending new records). It is theoretically possible
401 * that an upload may occur whilst preparing the staging tables. And the
402 * staging tables are about to be prepared..
403 */
404%if &LOADTARGET = YES %then %do;
405 %put locking &base_lib..&base_dsn;
406 %mp_lockanytable(LOCK,
407 lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,ctl_ds=&dclib..mpe_lockanytable
408 )
409 %if "&outds_audit" ne "0" %then %do;
410 %put locking &outds_audit;
411 %mp_lockanytable(LOCK
412 ,lib=%scan(&outds_audit,1,.)
413 ,ds=%scan(&outds_audit,2,.)
414 ,ref=&ETLSOURCE
415 ,ctl_ds=&dclib..mpe_lockanytable
416 )
417 %end;
418%end;
419%else %do;
420 /* not an actual load, so avoid updating the max key table in next step. */
421 %let rk_update_maxkeytable=NO;
422%end;
423
424%if %length(&RK_UNDERLYING)>0 %then %do;
425 %mp_retainedkey(
426 base_lib=&base_lib
427 ,base_dsn=&base_dsn
428 ,append_lib=&append_lib
429 ,append_dsn=&append_dsn
430 ,retained_key=&pk
431 ,business_key=&rk_underlying
432 ,check_uniqueness=&CHECK_UNIQUENESS
433 ,outds=work.append
434 %if &rk_update_maxkeytable=NO %then %do;
435 ,maxkeytable=0
436 %end;
437 %else %do;
438 ,maxkeytable=&dclib..&RK_MAXKEYTABLE
439 %end;
440 ,locktable=&dclib..mpe_lockanytable
441 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
442 ,filter_str=%str( (where=( &now < &tech_to)) )
443 %end;
444 )
445%end;
446%else %do;
447 proc sql;
448 create view work.append as select * from &append_lib..&append_dsn;
449%end;
450/**
451* generate md5 for append table
452*/
453/* it is possible the source dataset has additional (unwanted) columns.
454 Drop if specified; */
455%if %length(&keepvars)>0 %then %do;
456 /* remove tech dates from keepvars as they are generated later */
457 %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_from ),%str( )));
458 %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_to ),%str( )));
459 %let keepvars=(keep=&keepvars &bus_from &bus_to &processed &md5_col);
460%end;
461
462/* CAS varchar types cause append issues here, so perform autoconvert
463 by creating empty local table first */
464data;
465 set &base_lib..&base_dsn &drop_tx_dates_noobs;
466run;
467%local emptybasetable; %let emptybasetable=&syslast;
468
469data work.bitemp0_append &keepvars &outds_del(drop=&md5_col )
470 %if "%substr(&sysver,1,1)" ne "4" and "%substr(&sysver,1,1)" ne "5" %then %do;
471 /nonote2err
472 %end;
473 ;
474 /* apply formats for bitemporal vars but not tx dates which are added later */
475 %if %length(&keepvars)>0 and &loadtype=BITEMPORAL %then %do;
476 format &bus_from &bus_from_fmt;
477 format &bus_to &bus_to_fmt;
478 %end;
479 set &emptybasetable /* base table reqd in case append has fewer cols */
480 work.append &drop_tx_dates;
481 %if %length(%str(&bus_from_override))>0 %then %do;
482 &bus_from= %unquote(&bus_from_override) ;
483 %end;
484 %if %length(%str(&bus_to_override))>0 %then %do;
485 &bus_to= %unquote(&bus_to_override) ;
486 %end;
487 length &md5_col $32;
488 &md5_col=put(md5(&stripcols),hex32.);
489 %if %length(&processed)>0 %then %do;
490 format &processed &processed_fmt;
491 &processed=&now;
492 %end;
493
494/**
495 * If a delete column exists then create the delete dataset
496 */
497%if %mf_existvar(&append_lib..&append_dsn, &delete_col) %then %do;
498 drop &delete_col;
499 if upcase(&delete_col) = "YES" then output &outds_del ;
500 else output work.bitemp0_append ;
501 run;
502
503 %if %mf_getattrn(&outds_del,NLOBS)>0 %then %do;
504 %bitemporal_closeouts(
505 tech_from=&tech_from
506 ,tech_to = &tech_to
507 ,base_lib=&base_lib
508 ,base_dsn=&base_dsn
509 ,append_lib=work
510 ,append_dsn=%scan(&outds_del,-1,.)
511 ,PK=&bus_from &pk
512 ,NOW=&dbnow
513 ,loadtarget=&loadtarget
514 ,loadtype=&loadtype
515 )
516 %end;
517%end;
518%else %do;
519 output work.bitemp0_append;
520 run;
521%end;
522
523%mp_abort(iftrue= (&syscc gt 0 at line 494)
524 ,mac=&_program
525 ,msg=%str(syscc=&syscc)
526)
527
528%if %length(&close_vars)>0 %then %do;
529 /**
530 * need to close out records that are not provided
531 */
532 proc sql;
533 create table bitemp1_closevars1 as
534 select distinct a.%mf_getquotedstr(in_str=&pk,dlm=%str(,a.),quote=)
535 from &base_lib..&base_dsn a
536 inner join work.bitemp0_append b
537 on 1=1
538 /* join on closevars key */
539 %do idx_pk=1 %to %sysfunc(countw(&close_vars));
540 %let idx_val=%scan(&close_vars,&idx_pk);
541 and a.&idx_val=b.&idx_val
542 %end;
543 /* filter base on tech dates if necessary */
544 %if &loadtype=TXTEMPORAL %then %do;
545 where a.&tech_from <=&now and &now < a.&tech_to
546 %end;
547 ;
548 create table bitemp1_closevars2 as
549 select distinct a.*
550 from bitemp1_closevars1 a
551 left join work.bitemp0_append b
552 on 1=1
553 /* join on primary key */
554 %do idx_pk=1 %to %sysfunc(countw(&pk));
555 %let idx_val=%scan(&pk,&idx_pk);
556 and a.&idx_val=b.&idx_val
557 %end;
558 /* identify removed records by null value in a field in PK but not close_vars
559 */
560 where b.%scan(
561 %mf_wordsInStr1ButNotStr2(Str1=&pk,Str2=&close_vars),1,%str( )
562 ) IS NULL
563 ;
564
565 %if %mf_getattrn(bitemp1_closevars2,NLOBS)>0 %then %do;
566 %bitemporal_closeouts(
567 tech_from=&tech_from
568 ,tech_to = &tech_to
569 ,base_lib=&base_lib
570 ,base_dsn=&base_dsn
571 ,append_lib=work
572 ,append_dsn=bitemp1_closevars2
573 ,PK=&bus_from &pk
574 ,NOW=&dbnow
575 ,loadtarget=&loadtarget
576 ,loadtype=&loadtype
577 )
578 %end;
579%end;
580
581/* return if nothing to load (was just deletes) */
582%if %mf_getattrn(work.bitemp0_append,NLOBS)=0 %then %do;
583 %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
584 %put NOTE- No updates - just deletes!;
585 %put NOTE-;%put NOTE-;%put NOTE-;
586%end;
587
588
589/**
590 * If applying manual overrides to business dates, then the input table MUST
591 * be unique on the PK. Check, and if not - abort.
592 */
593%local msg;
594%if %length(&bus_from_override.&bus_to_override)>0 or &CHECK_UNIQUENESS=YES
595%then %do;
596 proc sort data=work.bitemp0_append out=work.bitemp0_check nodupkey;
597 by &pk;
598 run;
599 %if %mf_getattrn(work.bitemp0_check,NLOBS)
600 ne %mf_getattrn(work.bitemp0_append,NLOBS)
601 %then %do;
602 %let msg=INPUT table &append_lib..&append_dsn is not unique on PK (&pk);
603 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE (&msg),
604 ctl_ds=&dclib..mpe_lockanytable
605 )
606 %mp_lockanytable(UNLOCK
607 ,lib=%scan(&outds_audit,1,.)
608 ,ds=%scan(&outds_audit,2,.)
609 ,ref=&ETLSOURCE
610 ,ctl_ds=&dclib..mpe_lockanytable
611 )
612 %mp_abort(msg=&msg,mac=bitemporal_dataloader.sas);
613 %end;
614%end;
615
616
617/**
618* extract from BASE table. Only want matching records, as could be very BIG.
619* New records are subsequently identified via left join and test for nulls.
620*/
621%local temp_table temp_table2 base_table baselib_schema;
622%put DCNOTE: Extracting matching observations from &base_lib..&base_dsn;
623
624%if &engine_type=OLEDB %then %do;
625 %let temp_table=##%mf_getuniquefileref(prefix=BTMP);
626 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
627 %let base_table=(select * from [dbo].&base_dsn
628 where convert(datetime,&SQLNOW) < &tech_to );
629 %else %let base_table=[dbo].&base_dsn;
630 proc sql;
631 create table &base_lib.."&temp_table"n as
632 select * from work.bitemp0_append;
633 /* open up a connection for pass through SQL */
634 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
635 create table work.bitemp0_base as select * from connection to myAlias(
636%end;
637%else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
638 /* grab schema */
639 %let baselib_schema=%mf_getschema(&base_lib);
640 %if &baselib_schema.X ne X %then %let baselib_schema=&baselib_schema..;
641
642 /* grab redshift config */
643 %local redcnt; %let redcnt=0;
644 %if &engine_type=REDSHIFT %then %do;
645 data _null_;
646 set &config_table(where=(var_scope='DCBL_REDSH' and var_active=1));
647 x+1;
648 call symputx(cats('rednm',x),var_value,'l');
649 call symputx(cats('redval',x),var_value,'l');
650 call symputx('redcnt',x,'l');
651 run;
652 %end;
653 /* cannot persist temp tables so must create a temporary permanent table */
654 %let temp_table=%mf_getuniquename(prefix=XDCTEMP);
655 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
656 %let base_table=(select * from &baselib_schema.&base_dsn
657 where timestamp &sqlnow < &tech_to );
658 %else %let base_table=&baselib_schema.&base_dsn;
659 /* make empty table first - must clone & drop extra cols as autoload is bad */
660 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
661
662 exec (create table &temp_table (like &baselib_schema.&base_dsn)) by myAlias;
663 %if &engine_type=REDSHIFT %then %do;
664 exec (alter table &temp_table alter sortkey none) by myAlias;
665 %end;
666 %local dropcols;
667 %let dropcols=%mf_wordsinstr1butnotstr2(
668 str1=%upcase(%mf_getvarlist(&basecopy))
669 ,str2=%upcase(&pk)
670 );
671 %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
672 %put &=dropcols;
673 %let idx_val=%scan(&dropcols,&idx_pk);
674 exec(alter table &temp_table drop column &idx_val;) by myAlias;
675 %end;
676 exec (alter table &temp_table add column &md5_col varchar(32);) by myAlias;
677 /* create view to strip formats and avoid warns in log */
678 data work.vw_bitemp0/view=work.vw_bitemp0;
679 set work.bitemp0_append(keep=&pk &md5_col);
680 format _all_;
681 run;
682 proc append base=&base_lib..&temp_table
683 %if &engine_type=REDSHIFT %then %do;
684 (
685 %do idx_pk=1 %to &redcnt;
686 &&rednm&idx_pk = &&redval&idxpk
687 %end;
688 )
689 %end;
690 data=work.vw_bitemp0 force nowarn;
691 run;
692 /* open up a connection for pass through SQL */
693 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
694 create table work.bitemp0_base as select * from connection to myAlias(
695%end;
696%else %if &engine_type=CAS %then %do;
697 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
698 %let base_table=&base_lib..&base_dsn
699 (where=(&tech_from <=&now and &now < &tech_to));
700 %else %let base_table=&base_lib..&base_dsn;
701 %let temp_table=CASUSER.%mf_getuniquename(prefix=DC);
702 data &temp_table;
703 set work.bitemp0_append;
704 run;
705 %let bitemp0base=CASUSER.%mf_getuniquename(prefix=DC);
706 proc fedsql sessref=dcsession;
707 create table &bitemp0base{options replace=true} as
708%end;
709%else %do;
710 %let temp_table=work.bitemp0_append;
711 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
712 %let base_table=&base_lib..&base_dsn
713 (where=(&tech_from <=&now and &now < &tech_to));
714 %else %let base_table=&base_lib..&base_dsn;
715 proc sql;
716 create table work.bitemp0_base as
717%end;
718
719 select a.&md5_col /* this identifies NEW records */
720 , b.*
721 /* assume first PK field cannot be null (if defined in a PK constraint then
722 it definitely cannot be null) */
723 , case when b.%scan(&pk,1) IS NULL then 1 else 0 end as ___TMP___NEW_FLG
724 from &baselib_schema.&temp_table a
725 left join &base_table b
726 on 1=1
727%do idx_pk=1 %to &pk_cnt;
728 %let idx_val=%scan(&pk,&idx_pk);
729 and a.&idx_val=b.&idx_val
730%end;
731
732
733%if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
734%then %do;
735 ); proc sql; drop table &base_lib.."&temp_table"n;
736%end;
737%else %if &engine_type=CAS %then %do;
738 ;
739 quit;
740 data work.bitemp0_base;
741 set &bitemp0base;
742 run;
743 proc sql;
744 drop table &temp_table;
745 drop table &bitemp0base;
746%end;
747%else %do;
748 ;
749%end;
750
751/**
752* matching & changed records are those without NULL key values
753* &idx_val resolves to rightmost PK value (loop above)
754*/
755%put syscc (line525)=&syscc, sqlrc=&sqlrc;
756%mp_abort(iftrue= (&syscc gt 0 or &sqlrc>0)
757 ,mac=&_program
758 ,msg=%str(syscc=&syscc sqlrc=&sqlrc)
759)
760
761%put hashcols2=&stripcols;
762proc sql;
763create table work.bitemp1_current(drop=___TMP___NEW_FLG) as
764 select *
765 , put(md5(&stripcols),$hex32.) as &md5_col
766 from work.bitemp0_base (drop=&md5_col)
767 where ___TMP___NEW_FLG=0;
768
769/**
770* NEW records were identified in ___TMP___NEW_FLG in bitemp0_base
771*/
772proc sql;
773create table &outds_add
774 (drop=&md5_col
775 %if %mf_existvar(work.bitemp0_base, &delete_col) %then %do;
776 &delete_col
777 %end;
778 )
779 as select a.*
780 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
781 ,&now as &tech_from &tech_from_fmt
782 ,&high_date as &tech_to &tech_to_fmt
783 %end;
784 from work.bitemp0_append a /* STAGING records (mix of existing & new) */
785 , work.bitemp0_base b /* BASE records (contains null values for new) */
786 where a.&md5_col=b.&md5_col /* took staging md5 across in left join */
787 and b.___TMP___NEW_FLG=1; /* NEW records also identified in bitemp0_base */
788
789
790/**
791* identify INSERTS. These are records with the same business key but
792* the bus_from and bus_to value are higher / lower (respectively)
793* such that the existing record needs to be SPLIT to surround the new
794* record.
795* eg: OLD RECORD from=1 to=10
796* NEW RECORD from=5 to=7
797*
798* APPENDED RECORDS:
799* - from=1 to=5
800* - from=5 to=7
801* - from=7 to=10
802*/
803
804/* inserts cannot happen with TXTEMPORAL */
805%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
806 /* IDENTIFY */
807 create table work.bitemp3_inserts as
808 select b.*
809 ,a.&bus_from as ___TMP___from
810 ,a.&bus_to as ___TMP___to
811 from work.bitemp0_append a
812 ,work.bitemp1_current b
813 where a.&bus_from > b.&bus_from
814 and a.&bus_to < b.&bus_to
815 %do idx_pk=1 %to &pk_cnt;
816 %let idx_val=%scan(&pk,&idx_pk);
817 and a.&idx_val=b.&idx_val
818 %end;
819 order by
820 /* compress blanks and then insert commas (as the datetime fields may
821 not be in use) */
822 %sysfunc(tranwrd(%sysfunc(compbl(
823 &pk &bus_from &bus_to &processed
824 )),%str( ), %str(,)))
825 ;
826
827 /* SPLIT */
828 data work.bitemp3a_inserts (drop=___TMP___from ___TMP___retain ___TMP___to) ;
829 set work.bitemp3_inserts;
830 by &pk &bus_from &bus_to &processed;
831 if first.&idx_val then do;
832 ___TMP___retain=&bus_to;
833 &bus_to=___TMP___from;
834 output;
835 &bus_to=___TMP___retain;
836 end;
837 if last.&idx_val then do;
838 &bus_from=___TMP___to;
839 output;
840 end;
841 run;
842%end;
843%else %do;
844 /* TX temporal load */
845 data work.bitemp3a_inserts;
846 set work.bitemp1_current;
847 stop;
848 run;
849%end;
850/* APPEND */
851proc sql;
852create view work.bitemp3a_view as
853 select * from work.bitemp1_current
854 where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
855
856data bitemp3b_newbase;
857 set work.bitemp3a_inserts work.bitemp3a_view;
858run;
859
860/** do not use! this converts short numerics into 8 bytes
861proc sql;
862create table work.bitemp3b_newbase as
863 select * from work.bitemp3a_inserts
864union corr
865 select * from work.bitemp1_current
866 where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
867*/
868
869/**
870* identify CHANGED records from staging.
871* Same business key with different temporal dates or md5 value
872* This table must be overlayed onto / into existing business history
873*/
874proc sql;
875create table work.bitemp4_updated as select distinct a.*
876 from work.bitemp0_append a
877 ,work.bitemp3b_newbase b
878 where 1=1
879 %do idx_pk=1 %to &pk_cnt;
880 %let idx_val=%scan(&pk,&idx_pk);
881 and a.&idx_val=b.&idx_val
882 %end;
883 and ( a.&md5_col ne b.&md5_col
884 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
885 OR (a.&bus_from ne b.&bus_from or a.&bus_to ne b.&bus_to)
886 %end;
887 )
888;
889
890/**
891 * This section would have been one simple step with union all
892 * but that converts short numerics into 8 bytes!
893 * so, convoluted alternative to retain the same functionality.
894 */
895
896/* base records */
897create view work.bitemp4_prep1 as
898 select 'BASE' as ___TMP___
899 ,b.*
900 from work.bitemp4_updated a
901 ,work.bitemp3b_newbase b
902 where 1
903 %do idx_pk=1 %to &pk_cnt;
904 %let idx_val=%scan(&pk,&idx_pk);
905 and a.&idx_val=b.&idx_val
906 %end;
907 ;
908/* updated records */
909create view work.bitemp4_prep2 as
910 select 'STAG' as ___TMP___ ,*
911 from work.bitemp4_updated;
912/* ensure we only keep columns that appear in both */
913%local bp1 bp2 bp3 bp4;
914%let bp1=%mf_getvarlist(bitemp4_prep1);
915%let bp2=%mf_getvarlist(bitemp4_prep2);
916%let bp3=%mf_wordsInStr1ButNotStr2(Str1=&bp1,Str2=&bp2);
917%let bp4=%mf_wordsInStr1ButNotStr2(Str1=&bp2,Str2=&bp1);
918data work.bitemp4_prep3/view=bitemp4_prep3;
919 set bitemp4_prep1 bitemp4_prep2;
920%if %length(XX&bp3&bp4)>2 %then %do;
921 drop &bp3 &bp4 ;
922%end;
923run;
924/* remove duplicates */
925proc sql;
926create table work.bitemp4a_allrecs as
927 select distinct *
928 from work.bitemp4_prep3
929 order by
930 /* compress blanks and then insert commas (as the datetime fields
931 may not be in use) */
932 %sysfunc(tranwrd(%sysfunc(compbl(
933 &pk &bus_from &bus_to &processed
934 )),%str( ), %str(,)))
935 ;
936
937%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
938 /* this section aligns the business dates
939 (eg for inserts or overlaps in the range) */
940 data work.bitemp4b_firstpass (drop=___TMP___cond ___TMP___from ___TMP___to );
941 set work.bitemp4a_allrecs;
942 by &pk &bus_from &bus_to &processed;
943 retain ___TMP___cond 'Name of Condition';
944 retain ___TMP___from ___TMP___to 0;
945 ___TMP___md5lag=lag(&md5_col);
946 /* reset retained variables */
947 if first.&idx_val then do;
948 call missing (___TMP___cond, ___TMP___from, ___TMP___to,___TMP___md5lag);
949 end;
950 else do;
951 /* if record is identical, carry forward bus_from (and bus_to if higher)*/
952 if &md5_col=___TMP___md5lag then do;
953 &bus_from=___TMP___from;
954 if &bus_to<___TMP___to then &bus_to=___TMP___to;
955 end;
956 end;
957
958 if ___TMP___='STAG' then do;
959 /* need to carry forward the closing record */
960 ___TMP___cond='Condition 1';
961 end;
962 else if ___TMP___cond='Condition 1' then do;
963 /* else ensure bus_from starts from prior record bus_to */
964 if &md5_col ne ___TMP___md5lag and &bus_from <= ___TMP___to
965 then &bus_from= ___TMP___to;
966 /* new record may replace old record entirely */
967 if &bus_to <= &bus_from then delete;
968 else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
969 end;
970 ___TMP___from=&bus_from;
971 ___TMP___to=&bus_to;
972 run;
973%end;
974%else %do;
975 /* keep staged records only */
976 data work.bitemp4b_firstpass;
977 set work.bitemp4a_allrecs;
978 if ___TMP___='STAG';
979 run;
980%end;
981
982/* next phase is to pass through in reverse - so set up the sort statement */
983%local byvar;
984%do idx_pk=1 %to &pk_cnt;
985 %let byvar=&byvar descending %scan(&pk,&idx_pk);
986%end;
987%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL
988%then %let byvar=&byvar descending &bus_from descending &bus_to;
989/* if matching bus dates supplied, need to ensure we also have a sort
990 between BASE and STAGING tables */
991%let byvar=&byvar descending ___TMP___;
992
993proc sort data=work.bitemp4b_firstpass out=work.bitemp4c_sort ;
994 by &byvar;
995run;
996
997/**
998* Now (in reverse) pass back business start dates
999*/
1000data work.bitemp4d_secondpass;
1001%if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1002 &tech_from=&now;
1003 &tech_to=&high_date;
1004%end;
1005 set work.bitemp4c_sort ;
1006 by &byvar;
1007 retain ___TMP___cond 'Name of Condition';
1008 retain ___TMP___from ___TMP___to 0;
1009%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
1010/* put / _all_ /;*/
1011 ___TMP___md5lag=lag(&md5_col);
1012 if first.&idx_val then do;
1013 /* reset retained variables */
1014 call missing (___TMP___cond,___TMP___from,___TMP___to,___TMP___md5lag);
1015 end;
1016 else do;
1017 /* if record is identical, carry back bus_to */
1018 if &md5_col=___TMP___md5lag then &bus_to=___TMP___to;
1019 end;
1020
1021 if ___TMP___='STAG' then do;
1022 /* need to carry forward the closing record */
1023 ___TMP___cond='Condition 2';
1024 end;
1025 else if ___TMP___cond='Condition 2' then do;
1026 /* else ensure bus_to stops at subsequent record bus_from */
1027 if &md5_col ne ___TMP___md5lag and &bus_to >= ___TMP___from
1028 then &bus_to= ___TMP___from;
1029 /* new record may replace old record entirely */
1030 if &bus_from >= &bus_to then delete;
1031 if &bus_from=___TMP___from and &bus_to=___TMP___to then delete;
1032 else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
1033 end;
1034 ___TMP___from=&bus_from;
1035 ___TMP___to=&bus_to;
1036
1037%end;
1038run;
1039%put syscc (line600)=&syscc;
1040/**
1041 There may still be some records (eg old business history) which have not
1042 changed.
1043 Need to identify these and remove from the append so they are not updated
1044 unnecessarily. This is done by generating a new md5 (which INCLUDES the
1045 business key) and any matching / identical records are split out (from those
1046 that need to be updated).
1047*/
1048
1049%if &loadtype=BITEMPORAL %then %do;
1050 %let cat_string=catx('|' ,&bus_from,&bus_to);
1051
1052 data work.bitemp5a_lkp (keep=&md5_col)
1053 %if "%substr(&sysver,1,1)" ne "4" & "%substr(&sysver,1,1)" ne "5" %then %do;
1054 /nonote2err
1055 %end;
1056 ;
1057 set work.bitemp0_base;
1058 /* for BITEMPORAL we need to compare business dates also */
1059 &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.);
1060 run;
1061
1062 data bitemp5b_updates;
1063 set bitemp4d_secondpass;
1064 if _n_=1 then do;
1065 dcl hash md5_lkp(dataset:'bitemp5a_lkp');
1066 md5_lkp.definekey("&md5_col");
1067 md5_lkp.definedone();
1068 end;
1069 /* drop old md5 col as will rebuild with new business dates */
1070 &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.) ;
1071 if md5_lkp.check()=0 then delete;
1072 run;
1073
1074 proc sql;
1075 /* get min bus from as will update (close out) all records from this point
1076 (for that PK)*/
1077 create table work.bitemp5d_subquery as
1078 select &pk_comma, min(&bus_from)as &bus_from, max(&bus_to) as &bus_to
1079 from work.bitemp5b_updates
1080 group by &pk_comma;
1081 /* index has a huge efficiency impact on upcoming nested subquery */
1082 create index index1 on work.bitemp5d_subquery(&pk_comma,&bus_from, &bus_to);
1083
1084 %let lastds=work.bitemp5b_updates;
1085%end;
1086%else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
1087 proc sql;
1088 create table work.bitemp5d_subquery as
1089 select distinct &pk_comma
1090 from bitemp4d_secondpass;
1091 %let lastds=work.bitemp4d_secondpass;
1092%end;
1093%else %let lastds=work.bitemp4d_secondpass;
1094
1095/* create single append table (an overlapped pre-sert may be classed as
1096 both an update AND a new record). Also create temp views that may be
1097 used for pre-load analysis. */
1098data &outds_mod;
1099 set &lastds(drop=___TMP___: &md5_col);
1100run;
1101
1102data bitemp6_allrecs / view=bitemp6_allrecs;
1103 set &outds_mod /* UPDATED records */
1104 &outds_add /* NEW records */;
1105run;
1106
1107proc sort data=work.bitemp6_allrecs
1108 out=work.bitemp6_unique
1109 noduprec
1110 dupout=work.xx_BADBADBAD;
1111by _all_;
1112run;
1113
1114/* we have all our temp tables now so exit if this is all that is needed */
1115%if &LOADTARGET ne YES %then %return;
1116
1117/* also exit if an err condition exists */
1118
1119%if &syscc>0 %then %do;
1120 %put syscc=&syscc;
1121 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1122 ctl_ds=&dclib..mpe_lockanytable
1123 )
1124 %if "&outds_audit" ne "0" %then %do;
1125 %mp_lockanytable(UNLOCK
1126 ,lib=%scan(&outds_audit,1,.)
1127 ,ds=%scan(&outds_audit,2,.)
1128 ,ref=&ETLSOURCE
1129 ,ctl_ds=&dclib..mpe_lockanytable
1130 )
1131 %end;
1132%end;
1133%mp_abort(iftrue= (&syscc>0)
1134 ,mac=&sysmacroname in &_program
1135 ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status)
1136)
1137
1138/* final check - abort if a lock has appeared on the target or audit table */
1139%mp_lockfilecheck(libds=&base_lib..&base_dsn)
1140%if %mf_existds(&outds_audit) %then %do;
1141 %mp_lockfilecheck(libds=&outds_audit)
1142%end;
1143
1144/**
1145* STAGING TABLES PREPARED, ERR CONDITION TESTED FOR.. NOW TO LOAD!!
1146*/
1147
1148/**
1149* First, CLOSE OUT changed records (if not a REPLACE)
1150* Note that SAS does not support ANSI standard for UPDATE with a join condition.
1151* However - this can be worked around using a nested subquery..
1152*/
1153data _null_;
1154 putlog "&sysmacroname: CLOSEOUTS commencing";
1155run;
1156
1157%if %mf_getattrn(&lastds,NLOBS)=0 %then %do;
1158 data _null_;
1159 putlog "&sysmacroname: No closeouts needed";
1160 run;
1161%end;
1162%else %if &engine_type=CAS %then %do;
1163 %mp_abort(iftrue= (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL)
1164 ,mac=&sysmacroname in &_program
1165 ,msg=%str(&loadtype not yet supported in CAS engine)
1166 )
1167 /* create temp table for deletions */
1168 %local delds;%let delds=%mf_getuniquename(prefix=DC);
1169 data casuser.&delds;
1170 set work.bitemp5d_subquery;
1171 run;
1172 /* delete the records */
1173 proc cas ;
1174 table.deleteRows / table={
1175 caslib="&base_lib",
1176 name="&base_dsn",
1177 where="1=1",
1178 whereTable={caslib='CASUSER',name="&delds"}
1179 };
1180 quit;
1181 /* drop temp table */
1182 proc sql;
1183 drop table CASUSER.&delds;
1184%end;
1185%else %if (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL or &loadtype=UPDATE)
1186%then %do;
1187 data _null_;
1188 putlog "&sysmacroname: &loadtype operation using &engine_type engine";
1189 run;
1190 %local flexinow;
1191 proc sql;
1192 /* if OLEDB then create a temp table for efficiency */
1193 %local innertable;
1194 %if &engine_type=OLEDB %then %do;
1195 %let innertable=[&temp_table];
1196 %let top_table=[dbo].&base_dsn;
1197 %let flexinow=&SQLNOW;
1198 create table &base_lib.."&temp_table"n as
1199 select * from work.bitemp5d_subquery;
1200 /* open up a connection for pass through SQL */
1201 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1202 execute(
1203 %end;
1204 %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
1205 %let innertable=%mf_getuniquename(prefix=XDCTEMP);
1206 %let top_table=&baselib_schema.&base_dsn;
1207 %let flexinow=timestamp &SQLNOW;
1208 /* make empty table first - must clone & drop extra cols
1209 as autoload is bad */
1210 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1211 exec (create table &innertable (like &baselib_schema.&base_dsn)) by myAlias;
1212 %if &engine_type=REDSHIFT %then %do;
1213 exec (alter table &innertable alter sortkey none) by myAlias;
1214 %end;
1215 %let dropcols=%mf_wordsinstr1butnotstr2(
1216 str1=%upcase(%mf_getvarlist(&basecopy))
1217 ,str2=%upcase(%mf_getvarlist(work.bitemp5d_subquery))
1218 );
1219 %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
1220 %put &=dropcols;
1221 %let idx_val=%scan(&dropcols,&idx_pk);
1222 exec(alter table &innertable drop column &idx_val;) by myAlias;;
1223 %end;
1224 /* create view to strip formats and avoid warns in log */
1225 data work.vw_bitemp5d/view=work.vw_bitemp5d;
1226 set work.bitemp5d_subquery;
1227 format _all_;
1228 run;
1229 proc append base=&base_lib..&innertable (
1230 %do idx_pk=1 %to &redcnt;
1231 &&rednm&idx_pk = &&redval&idxpk
1232 %end;
1233 )
1234 data=work.vw_bitemp5d force nowarn;
1235 run;
1236 /* open up a connection for pass through SQL */
1237 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1238 execute(
1239 %end;
1240 %else %do;
1241 %let innertable=bitemp5d_subquery;
1242 %let top_table=&base_lib..&base_dsn;
1243 %let flexinow=&now;
1244 %end;
1245
1246
1247 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1248 update &top_table set &tech_to=&flexinow
1249 %if %length(&processed)>0 %then %do;
1250 ,&processed=&flexinow
1251 %end;
1252 where &tech_from <= &flexinow and &flexinow < &tech_to and
1253 %end;
1254 %else %if &loadtype=UPDATE %then %do;
1255 /* changed records are deleted then re-appended when doing UPDATEs */
1256 delete from &top_table where
1257 %end;
1258 %else %do;
1259 %put %str(ERR)OR: BUSTEMPORAL NOT YET SUPPORTED;
1260 %let syscc=5;
1261 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1262 ctl_ds=&dclib..mpe_lockanytable
1263 )
1264 %mp_lockanytable(UNLOCK
1265 ,lib=%scan(&outds_audit,1,.)
1266 ,ds=%scan(&outds_audit,2,.)
1267 ,ref=&ETLSOURCE
1268 ,ctl_ds=&dclib..mpe_lockanytable
1269 )
1270 %goto end_of_macro;
1271 %end;
1272
1273 /* perform join inside query as per
1274 http://stackoverflow.com/questions/24629793/update-with-a-proc-sql */
1275
1276 exists( select 1 from &baselib_schema.&innertable where
1277
1278 /* loop PK join */
1279 %do idx_pk=1 %to &pk_cnt;
1280 %let idx_val=%scan(&pk,&idx_pk);
1281 &base_dsn..&idx_val=&innertable..&idx_val and
1282 %end;
1283 %if &loadtype=BITEMPORAL %then %do;
1284 &base_dsn..&bus_from >= &innertable..&bus_from
1285 and &base_dsn..&bus_to <= &innertable..&bus_to and
1286 %end;
1287
1288 /* close the statement */
1289
1290 1=1);
1291
1292 %if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
1293 %then %do;
1294 ) by myAlias;
1295 execute (drop table &baselib_schema.&innertable) by myAlias;
1296 %end;
1297%end;
1298quit;
1299data _null_;
1300 putlog "&sysmacroname: Closeout complete";
1301run;
1302/**
1303 * Append the new / updated records
1304 */
1305%if &engine_type=CAS %then %do;
1306
1307 /* get varchar variables ready for casting */
1308 %local vcfmt vcrename vcassign vcdrop;
1309 data _null_;
1310 set work.bitemp_cols(where=(type=6)) end=last;
1311 length vcrename vcassign vcdrop vcfmt $32767 rancol $32;
1312 retain vcrename vcassign vcdrop vcfmt;
1313 if _n_=1 then vcrename='(rename=(';
1314 rancol=resolve('%mf_getuniquename()');
1315 vcfmt=trim(vcfmt)!!'length '!!cats(name)!!' varchar(*);';
1316 vcrename=trim(vcrename)!!' '!!cats(name,'=',rancol);
1317 vcassign=cats(vcassign,name,'=',rancol,';');
1318 vcdrop=cats(vcdrop,'drop '!!rancol,';');
1319 if last then do;
1320 vcrename=cats(vcrename,'))');
1321 call symputx('vcfmt',vcfmt);
1322 call symputx('vcrename',vcrename);
1323 call symputx('vcassign',vcassign);
1324 call symputx('vcdrop',vcdrop);
1325 end;
1326 run;
1327
1328 /* prepare a temp cas table with varchars casted */
1329 %let tmp=%mf_getuniquename();
1330 data casuser.&tmp ;
1331 &vcfmt
1332 set work.bitemp6_unique &vcrename;
1333 &vcassign
1334 &vcdrop
1335 run;
1336
1337 /* load the table with varchars applied*/
1338 data &base_lib..&base_dsn (append=yes )/sessref=dcsession ;
1339 set casuser.&tmp;
1340 run;
1341
1342 /* drop temp table */
1343 proc sql;
1344 drop table CASUSER.&tmp;
1345
1346 /* this code will not work as regular tables do not have varchars */
1347 /*
1348 proc casutil;
1349 load data=work.bitemp6_unique
1350 outcaslib="&base_lib" casout="&base_dsn" append ;
1351 quit;
1352 */
1353%end;
1354%else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
1355 proc append base=&base_lib..&base_dsn
1356 %if &engine_type=REDSHIFT %then %do;
1357 (
1358 %do idx_pk=1 %to &redcnt;
1359 &&rednm&idx_pk = &&redval&idxpk
1360 %end;
1361 )
1362 %end;
1363 data=bitemp6_unique force nowarn;
1364 run;
1365%end;
1366%else %do;
1367 proc append base=&base_lib..&base_dsn data=bitemp6_unique force nowarn; run;
1368%end;
1369
1370%mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1371 ctl_ds=&dclib..mpe_lockanytable
1372)
1373
1374/* final check on syscc */
1375%mp_abort(iftrue= (&syscc >4)
1376 ,mac=&_program
1377 ,msg=%str(!!Upload NOT successful!! Failed on actual update / append stage..)
1378)
1379
1380%if &outds_audit ne 0 and &LOADTARGET=YES %then %do;
1381 data work.vw_outds_orig /view=work.vw_outds_orig;
1382 set work.bitemp0_base (drop=&md5_col);
1383 where ___TMP___NEW_FLG=0;
1384 drop ___TMP___NEW_FLG;
1385 run;
1386 /* update the AUDIT table */
1387 %if %mf_existds(&outds_audit) %then %do;
1388 options mprint;
1389 %mp_storediffs(&base_lib..&base_dsn
1390 ,work.vw_outds_orig
1391 ,&pk &bus_from
1392 ,delds=&outds_del
1393 ,modds=&outds_mod
1394 ,appds=&outds_add
1395 ,outds=work.mp_storediffs
1396 ,processed_dttm=&now
1397 ,loadref=%superq(etlsource)
1398 )
1399 /* exclude unchanged values in modified rows */
1400 data work.mp_storediffs;
1401 set work.mp_storediffs;
1402 if MOVE_TYPE="M" and IS_PK=0 and IS_DIFF=0 then delete;
1403 * putlog load_ref= libref= dsn= key_hash= tgtvar_nm=;
1404 run;
1405 proc append base=&outds_audit data=work.mp_storediffs;
1406 run;
1407 %mp_lockanytable(UNLOCK
1408 ,lib=%scan(&outds_audit,1,.)
1409 ,ds=%scan(&outds_audit,2,.)
1410 ,ref=&ETLSOURCE
1411 ,ctl_ds=&dclib..mpe_lockanytable
1412 )
1413 %end;
1414%end;
1415%mp_abort(iftrue= (&syscc >4)
1416 ,mac=bitemporal_dataloader
1417 ,msg=%str(Problem in audit stage (&outds_audit))
1418)
1419
1420%let user=%mf_getUser();
1421/**
1422 Notify as appropriate EMAILS DISABLED
1423
1424%sumo_alerts(ALERT_EVENT=UPDATE
1425 , ALERT_TARGET=&base_lib..&base_dsn
1426 , from_user= &user);
1427*/
1428/* monitor BiTemporal usage */
1429%if &log=1 %then %do;
1430 %put syscc=&syscc;
1431 /* do not perform duration calc in pass through */
1432 %local dur;
1433 data _null_;
1434 now=symget('now');
1435 dur=%sysfunc(datetime())-&now;
1436 call symputx('dur',dur,'l');
1437 run;
1438 proc sql;
1439 insert into &dclib..mpe_dataloads
1440 set libref=%upcase("&base_lib")
1441 ,DSN=%upcase("&base_dsn")
1442 ,ETLSOURCE="&ETLSOURCE"
1443 ,LOADTYPE="&loadtype"
1444 ,CHANGED_RECORDS=%mf_getattrn(&lastds,NLOBS)
1445 ,NEW_RECORDS=%mf_getattrn(&outds_add,NLOBS)
1446 ,DELETED_RECORDS=%mf_getattrn(&outds_del,NLOBS)
1447 ,DURATION=&dur
1448 ,MAC_VER="v&ver"
1449 ,user_nm="&user"
1450 ,PROCESSED_DTTM=&now;
1451 quit;
1452 %put syscc=&syscc;
1453%end;
1454%end_of_macro:
1455%mend bitemporal_dataloader;