Loading...
Searching...
No Matches
bitemporal_dataloader.sas
Go to the documentation of this file.
1/**
2 @file
3 @brief Routine supporting multiple load types
4 @details Generic loader for multiple load types (UPDATE, SCD2, BITEMPORAL).
5
6 Handles all elements including metadata validation, PK checking, closeouts,
7 locking, logging, etc.
8
9 The staging table must be prepared with a unique business key. For bitemporal
10 this means a snapshot at both technical AND business time.
11
12ASSUMPTIONS:
13 - Base table has relevant datetime vars: 2xTechnical, 2xBusiness, 1xProcessed
14 - Staging table omits Technical or Processed datetimes (has Business only)
15 - Base table has no column names containing the string "___TMP___"
16 - Base &tech_from variable is not nullable. This should always be the case
17 anyway whenbuilding permanent bitemporal datasets.. But the point is that
18 this field is used to identify new records after the initial left join
19 from staging to base table.
20
21NOTES:
22 - All queries against BiTemporal tables need two filter conditions, as such:
23
24 where &bus_from LE [tstamp] LT &bus_to
25 AND &tx_from LE [tstamp] LT &tx_to
26
27 One cannot use BETWEEN
28 One cannot use &xx_from LE [tstamp] LE &xx_from (equivalent to above).
29 Background: https://stackoverflow.com/questions/20005950
30
31Areas for optimisation
32 - loading temporal history (currently experimental)
33
34 ## Supporting tables
35
36 Supporting tables must exist in the library specified in the `dclib` param.
37
38 ### MPE_DATALOADS
39
40 This table is updated every time a successful load occurs, and includes
41 information such as:
42
43 @li library
44 @li dataset
45 @li message (supplied in the ETLSOURCE param)
46 @li new rows
47 @li deleted rows
48 @li changed rows
49 @li timestamp
50 @li the user making the load
51 @li the version of (this) macro used to make the load
52
53
54 @param [in] APPEND_DSN= (APPENDTABLE) Name of STAGING table
55 @param [in] CONFIG_TABLE= (&dclib..MPE_CONFIG) The table containing library
56 engine specific config. The following scopes are supported:
57 @li DCBL_REDSH
58 @param [in] LOADTYPE= (BITEMPORAL) Supported types:
59 @li TXTEMPORAL - loads a buskey with version times
60 @li BUSTEMPORAL - loads buskey with bus + ver times
61 @li UPDATE - updates a buskey with NO history
62 @param [in] PROCESSED= (0) This column obtains a current timestamp for changed
63 records when loading the target table. Default is 0 (not set). If the
64 target table contains a variable called PROCESSED_DTTM, and processed=0,
65 then this column will be used for applying the current timestamp.
66 @param RK_MAXKEYTABLE= (mpe_maxkeyvalues) The maxkeytable to use (must exist
67 in DCLIB)
68 @param [in] PK= Business key, space separated. Should NOT include temporal
69 fields.
70 @param [in] RK_UNDERLYING= If supplied will generate an RK based on these
71 (space separated) business key fields. In this case only ONE PK field should
72 be supplied, which is assumed to be the RK. The RK field, plus underlying
73 fields, should all exist on the base table. The underlying fields should
74 exist on the staging table (the RK / PK field will be overwritten).
75 The staging table should also be unique on its PK.
76
77 @param [in] dclib= (&dc_libref) The library containing DC configuration tables
78 @param [out] outds_del= (work.outds_del) Output table containing
79 deleted records
80 @param [out] outds_add= (work.outds_add) Output table containing
81 appended records
82 @param [out] outds_mod= (work.outds_mod) Output table containing
83 changed records
84 @param [out] outds_audit= (0) Load detailed changes to an audit table. Uses
85 the mp_storediffs.sas macro. Provide the base table here, to load.
86
87 <h4> Global Variables </h4>
88 The following global macro variables are used. These should be replaced by
89 macro parameters in future releases.
90
91 @li `dc_dttmtfmt`
92
93 <h4> SAS Macros </h4>
94 @li bitemporal_closeouts.sas
95 @li dc_assignlib.sas
96 @li mf_existds.sas
97 @li mf_existvar.sas
98 @li mf_fmtdttm.sas
99 @li mf_getattrn.sas
100 @li mf_getengine.sas
101 @li mf_getschema.sas
102 @li mf_getuniquefileref.sas
103 @li mf_getuniquename.sas
104 @li mf_getuser.sas
105 @li mf_getvarlist.sas
106 @li mf_verifymacvars.sas
107 @li mf_wordsinstr1butnotstr2.sas
108 @li mp_abort.sas
109 @li mp_dropmembers.sas
110 @li mp_lockanytable.sas
111 @li mp_lockfilecheck.sas
112 @li mp_retainedkey.sas
113 @li mp_storediffs.sas
114
115 @version 9.3
116 @author 4GL Apps Ltd.
117 @copyright 4GL Apps Ltd. This code may only be used within Data Controller
118 and may not be re-distributed or re-sold without the express permission of
119 4GL Apps Ltd.
120
121 @warning multitemporal loads (bitemporal for multiple points in business time)
122 are in experimental stage
123
124**/
125
126%macro bitemporal_dataloader(
127 bus_from= /* Business FROM datetime variable. Req'd on
128 STAGING & BASE tables.*/
129 ,bus_to = /* Business TO datetime variable. Req'd on
130 STAGING & BASE tables. */
131 ,bus_from_override= /* Provide a hard coded BUS_FROM datetime value.*/
132 ,bus_to_override= /* provide a hard coded BUS_TO datetime value */
133 ,tech_from= /* Technical FROM datetime variable. Req'd on
134 BASE table only. */
135 ,tech_to = /* Technical TO datetime variable. Req'd on BASE
136 table only. */
137 ,processed= 0
138 ,base_lib=WORK /* Libref of the BASE table. */
139 ,base_dsn=BASETABLE /* Name of BASE table. */
140 ,append_lib=WORK /* Libref of the STAGING table. */
141 ,append_dsn=APPENDTABLE
142 ,high_date='01JAN5999:00:00:00'dt /* High date to close out records */
143 ,PK= name sex
144 ,RK_UNDERLYING=
145 ,KEEPVARS= /* Provides option for removing unwanted vars from append table */
146 ,RK_UPDATE_MAXKEYTABLE=NO /* If switching (or mix matching) with regular
147 SCD2 loader then set this switch to YES to
148 ensure the MAXKEYTABLE is updated with the
149 current maximum RK value for the target table
150 */
151 ,CHECK_UNIQUENESS=YES /* Perform a check of the APPEND table to ensure it is
152 unique on its business key */
153 ,ETLSOURCE=demo /* supply a value ($50.) to show as ETLSOURCE in
154 &dclib..DATALOADS */
155 ,LOADTYPE=BITEMPORAL
156 ,RK_MAXKEYTABLE= mpe_maxkeyvalues
157 ,LOG=1 /* Switch to 0 to prevent records being added to
158 &mpelib..mpe_DATALOADS (ie when testing)*/
159 ,DELETE_COL= _____DELETE__THIS__RECORD_____
160 /* If this variable is found in the append dataset
161 then records are closed out (or deleted) in the
162 append table where that variable= "Yes" */
163 ,LOADTARGET=YES /* set to anything but uppercase YES to switch off
164 target table load and generate temp tables only */
165 ,CLOSE_VARS=
166/*a problem with regular SCD2 or TXTEMPORAL loads is that there is
167 no facility to close out removed records (all records are
168 assumed new or changed). But how does one determine which
169 records are removed? Short of loading the entire table
170 each time? This parameter allows a set of variables
171 (this should be a subset of the PK) to be declared, and
172 the macro will determine which records in the base table
173 need to be closed out ahead of the load.
174
175 For instance, given the following:
176
177 Base Table Staging Table
178 DATE ENTITY AMOUNT DATE ENTITY AMOUNT
179 JAN ACME4 66 JAN ACME4 66
180 FEB ACME4 99 FEB ACME4 99
181 FEB ACME1 22
182
183 By supplying DATE in CLOSE_VARS and DATE ENTITY as the PK,
184 the "FEB PAG 22" record would get closed out.
185 */
186 ,config_table=&dclib..MPE_CONFIG
187 ,dclib=&dc_libref
188 ,outds_del=work.outds_del
189 ,outds_add=work.outds_add
190 ,outds_mod=work.outds_mod
191 ,outds_audit=0
192 );
193
194/* when changing this macro, update the version num here */
195%local ver;
196%let ver=32;
197%put &sysmacroname entry vars:;
198%put _local_;
199
200%dc_assignlib(WRITE,&base_lib) /* may not already be assigned */
201
202/* return straight away if nothing to load */
203%let nobs= %mf_getattrn(&append_lib..&append_dsn,NLOBS);
204%if &nobs=-1 %then %do;
205 proc sql noprint; select count(*) into: nobs from &append_lib..&append_dsn;
206%end;
207%if &nobs=0 %then %do;
208 %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
209 %put NOTE- Base dataset &append_lib..&append_dsn is empty. Nothing to upload!;
210 %put NOTE-;%put NOTE-;%put NOTE-;
211 %return;
212%end;
213
214/* hard exit if err condition exists */
215%mp_abort(iftrue= (&syscc > 0)
216 ,mac=bitemporal_dataloader
217 ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status;)
218)
219
220%local engine_type;
221%let engine_type=%mf_getengine(&base_lib);
222%if %length(&CLOSE_VARS)>0 and (&engine_type=REDSHIFT or &engine_type=POSTGRES
223or &engine_type=SNOW or &engine_type=SASIOSNF)
224%then %do;
225 %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
226 %put NOTE- CLOSE_VARS functionality not yet supported in &engine_type;
227 %put NOTE-;%put NOTE-;%put NOTE-;
228 %return;
229%end;
230
231/**
232 * The metadata functions (eg mf_existvar) will fail if the base table has a
233 * SAS lock. So, make a snapshot of the base table for further use.
234 * Also, make output tables (regardless).
235 */
236%local basecopy;
237%let basecopy=%mf_getuniquename(prefix=basecopy);
238
239data &basecopy &outds_mod &outds_add &outds_del;
240 set &base_lib..&base_dsn;
241 stop;
242run;
243%mp_abort(iftrue= (&syscc > 0)
244 ,mac=&_program
245 ,msg=%str(syscc=&syscc after base table copy - aborting due to table lock)
246)
247
248
249%local cols idx_pk md5_col ;
250%let md5_col=___TMP___md5;
251%let check_uniqueness=%upcase(&check_uniqueness);
252%let RK_UPDATE_MAXKEYTABLE=%upcase(&RK_UPDATE_MAXKEYTABLE);
253%let high_date=%unquote(&high_date);
254%let loadtype=%upcase(&loadtype);
255
256/* ensure irrelevant variables are cleared */
257%if &loadtype=BUSTEMPORAL %then %do;
258 %let tech_from=;
259 %let tech_to=;
260%end;
261%else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
262 %let bus_from=;
263 %let bus_to=;
264%end;
265
266/* ensure relevant variables are supplied */
267%mp_abort(iftrue=(&loadtype=BITEMPORAL & %mf_verifymacvars(bus_from bus_to)=0)
268 ,mac=bitemporal_dataloader
269 ,msg=%str(Missing BUS_FROM / BUS_TO)
270)
271%mp_abort(iftrue=(&loadtype=TXTEMPORAL & %mf_verifymacvars(tech_from tech_to)=0)
272 ,mac=bitemporal_dataloader
273 ,msg=%str(Missing TECH_FROM / TECH_TO)
274)
275
276/**
277 * drop any tables (may be defined as views or vice versa preventing overwrite)
278 */
279%mp_dropmembers(append bitemp0_append bitemp_cols)
280
281/* SQL Server requires its own time values */
282/* 9.2 will only give picture format down to seconds. 9.3 allows
283 milliseconds by using lower S and defining the decimal in the format name..*/
284PROC FORMAT;
285 picture MyMSdt other='%0Y-%0m-%0dT%0H:%0M:%0S' (datatype=datetime);
286RUN;
287%local dbnow;
288%let dbnow="%sysfunc(datetime(),%mf_fmtdttm())"dt;
289
290data _null_;
291 /* convert space separated macvar to comma separated for SQL processing */
292 call symputx('PK_COMMA',tranwrd(compbl("&pk"),' ',','),'L');
293 call symputx('PK_CNT',countw("&pk",' '),'L');
294 now=&dbnow;
295 call symputx('NOW',now,'L');
296 call symputx('SQLNOW',cats("'",put(now,MyMSdt.),"'"),'L');
297 length etlsource $100;
298 etlsource=subpad(symget('etlsource'),1,100);
299 call symputx('etlsource',etlsource,'l');
300run;
301
302/**
303 * Even if no PROCESSED var provided, assume that any variable named
304 * PROCESSED_DTTM should be updated
305 */
306%if &processed=0 %then %do;
307 %if %mf_existvar(&basecopy,PROCESSED_DTTM)
308 %then %let processed=PROCESSED_DTTM;
309 %else %let processed=;
310%end;
311
312
313/* extract colnames for md5 creation / change tracking */
314proc contents noprint data=&base_lib..&base_dsn
315 out=work.bitemp_cols (keep=name type length varnum format:);
316run;
317proc sql noprint;
318select name into: cols separated by ','
319 from work.bitemp_cols
320 where upcase(name) not in
321 (%upcase("&bus_from","&bus_to"
322 ,"&tech_from","&tech_to"
323 ,"&processed","&delete_col")) ;
324select case when type in (2,6) then cats('put(md5(trim(',name,')),$hex32.)')
325 /* multiply by 1 to strip precision errors (eg 0 != 0) */
326 /* but ONLY if not missing, else will lose any special missing values */
327 else cats('put(md5(trim(put(ifn(missing('
328 ,name,'),',name,',',name,'*1),binary64.))),$hex32.)') end
329 into: stripcols separated by '||'
330 from work.bitemp_cols
331 where upcase(name) not in
332 (%upcase("&bus_from","&bus_to"
333 ,"&tech_from","&tech_to"
334 ,"&processed","&delete_col")) ;
335
336/* set default formats*/
337%let bus_from_fmt = datetime19.;
338%let bus_to_fmt = datetime19.;
339%let processed_fmt = datetime19.;
340
341%let tech_from_fmt = format=datetime19.;
342%let tech_to_fmt = format=datetime19.;
343
344
345%put &=stripcols;
346%put &=pk;
347
348data _null_;
349 set work.bitemp_cols;
350 if type=2 or type=6 then do;
351 length fmt $49.;
352 if format='' then fmt=cats('$',length,'.');
353 else fmt=cats(format,formatl,'.');
354 end;
355 else do;
356 if format='' then fmt=cats(length,'.');
357 else fmt=cats(format,formatl,'.',formatd);
358 end;
359 if upcase(name)="%upcase(&bus_from)" then
360 call symputx('bus_from_fmt',fmt,'L');
361 else if upcase(name)="%upcase(&bus_to)" then
362 call symputx('bus_to_fmt',fmt,'L');
363 else if upcase(name)="%upcase(&tech_from)" then
364 call symputx('tech_from_fmt',"format="!!fmt,'L');
365 else if upcase(name)="%upcase(&tech_to)" then
366 call symputx('tech_to_fmt',"format="!!fmt,'L');
367 else if upcase(name)="%upcase(&processed)" then
368 call symputx('processed_fmt',fmt,'L');
369run;
370
371%if %index(%quote(&cols),___TMP___) %then %do;
372 %let msg=%str(Table contains a variable name containing "___TMP___".%trim(
373 ) This may conflict with temp variable generation!!);
374 %mp_abort(msg=&msg,mac=bitemporal_dataloader);
375 %let syscc=5;
376 %return;
377%end;
378
379/* if transaction dates appear on the APPEND table, need to remove them */
380%local drop_tx_dates /* used in append table */
381 drop_tx_dates_noobs /* used to take the base table structure */;
382%if %mf_existvar(&append_lib..&append_dsn, &tech_from)
383 %then %let drop_tx_dates=&tech_from;
384%if %mf_existvar(&append_lib..&append_dsn, &tech_to)
385 %then %let drop_tx_dates=&drop_tx_dates &tech_to;
386%if %length(%trim(&drop_tx_dates))>0
387 %then %let drop_tx_dates=(drop=&drop_tx_dates);
388
389%if %mf_existvar(&basecopy, &tech_from)
390 %then %let drop_tx_dates_noobs=&tech_from;
391%if %mf_existvar(&basecopy, &tech_to)
392 %then %let drop_tx_dates_noobs=&drop_tx_dates_noobs &tech_to;
393%if %length(%trim(&drop_tx_dates_noobs))>0
394 %then %let drop_tx_dates_noobs=(drop=&drop_tx_dates_noobs obs=0);
395%else %let drop_tx_dates_noobs=(obs=0);
396
397
398/**
399 * Lock the table. This is necessary as we are doing a two part update (first
400 * closing records then appending new records). It is theoretically possible
401 * that an upload may occur whilst preparing the staging tables. And the
402 * staging tables are about to be prepared..
403 */
404%if &LOADTARGET = YES %then %do;
405 %put locking &base_lib..&base_dsn;
406 %mp_lockanytable(LOCK,
407 lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,ctl_ds=&dclib..mpe_lockanytable
408 )
409 %if "&outds_audit" ne "0" %then %do;
410 %put locking &outds_audit;
411 %mp_lockanytable(LOCK
412 ,lib=%scan(&outds_audit,1,.)
413 ,ds=%scan(&outds_audit,2,.)
414 ,ref=&ETLSOURCE
415 ,ctl_ds=&dclib..mpe_lockanytable
416 )
417 %end;
418%end;
419%else %do;
420 /* not an actual load, so avoid updating the max key table in next step. */
421 %let rk_update_maxkeytable=NO;
422%end;
423
424%if %length(&RK_UNDERLYING)>0 %then %do;
425 %mp_retainedkey(
426 base_lib=&base_lib
427 ,base_dsn=&base_dsn
428 ,append_lib=&append_lib
429 ,append_dsn=&append_dsn
430 ,retained_key=&pk
431 ,business_key=&rk_underlying
432 ,check_uniqueness=&CHECK_UNIQUENESS
433 ,outds=work.append
434 %if &rk_update_maxkeytable=NO %then %do;
435 ,maxkeytable=0
436 %end;
437 %else %do;
438 ,maxkeytable=&dclib..&RK_MAXKEYTABLE
439 %end;
440 ,locktable=&dclib..mpe_lockanytable
441 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
442 ,filter_str=%str( (where=( &now < &tech_to)) )
443 %end;
444 )
445%end;
446%else %do;
447 proc sql;
448 create view work.append as select * from &append_lib..&append_dsn;
449%end;
450/**
451* generate md5 for append table
452*/
453/* it is possible the source dataset has additional (unwanted) columns.
454 Drop if specified; */
455%if %length(&keepvars)>0 %then %do;
456 /* remove tech dates from keepvars as they are generated later */
457 %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_from ),%str( )));
458 %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_to ),%str( )));
459 %let keepvars=(keep=&keepvars &bus_from &bus_to &processed &md5_col);
460%end;
461
462/* CAS varchar types cause append issues here, so perform autoconvert
463 by creating empty local table first */
464data;
465 set &base_lib..&base_dsn &drop_tx_dates_noobs;
466run;
467%local emptybasetable; %let emptybasetable=&syslast;
468
469data work.bitemp0_append &keepvars &outds_del(drop=&md5_col )
470 %if "%substr(&sysver,1,1)" ne "4" and "%substr(&sysver,1,1)" ne "5" %then %do;
471 /nonote2err
472 %end;
473 ;
474 /* apply formats for bitemporal vars but not tx dates which are added later */
475 %if %length(&keepvars)>0 and &loadtype=BITEMPORAL %then %do;
476 format &bus_from &bus_from_fmt;
477 format &bus_to &bus_to_fmt;
478 %end;
479 set &emptybasetable /* base table reqd in case append has fewer cols */
480 work.append &drop_tx_dates;
481 %if %length(%str(&bus_from_override))>0 %then %do;
482 &bus_from= %unquote(&bus_from_override) ;
483 %end;
484 %if %length(%str(&bus_to_override))>0 %then %do;
485 &bus_to= %unquote(&bus_to_override) ;
486 %end;
487 length &md5_col $32;
488 &md5_col=put(md5(&stripcols),hex32.);
489 %if %length(&processed)>0 %then %do;
490 format &processed &processed_fmt;
491 &processed=&now;
492 %end;
493
494/**
495 * If a delete column exists then create the delete dataset
496 */
497%if %mf_existvar(&append_lib..&append_dsn, &delete_col) %then %do;
498 drop &delete_col;
499 if upcase(&delete_col) = "YES" then output &outds_del ;
500 else output work.bitemp0_append ;
501 run;
502
503 %if %mf_getattrn(&outds_del,NLOBS)>0 %then %do;
504 %bitemporal_closeouts(
505 tech_from=&tech_from
506 ,tech_to = &tech_to
507 ,base_lib=&base_lib
508 ,base_dsn=&base_dsn
509 ,append_lib=work
510 ,append_dsn=%scan(&outds_del,-1,.)
511 ,PK=&bus_from &pk
512 ,NOW=&dbnow
513 ,loadtarget=&loadtarget
514 ,loadtype=&loadtype
515 ,AUDITFOLDER=&dc_staging_area/&ETLSOURCE
516 )
517 %end;
518%end;
519%else %do;
520 output work.bitemp0_append;
521 run;
522%end;
523
524%mp_abort(iftrue= (&syscc gt 0 at line 494)
525 ,mac=&_program
526 ,msg=%str(syscc=&syscc)
527)
528
529%if %length(&close_vars)>0 %then %do;
530 /**
531 * need to close out records that are not provided
532 */
533 proc sql;
534 create table bitemp1_closevars1 as
535 select distinct a.%mf_getquotedstr(in_str=&pk,dlm=%str(,a.),quote=)
536 from &base_lib..&base_dsn a
537 inner join work.bitemp0_append b
538 on 1=1
539 /* join on closevars key */
540 %do idx_pk=1 %to %sysfunc(countw(&close_vars));
541 %let idx_val=%scan(&close_vars,&idx_pk);
542 and a.&idx_val=b.&idx_val
543 %end;
544 /* filter base on tech dates if necessary */
545 %if &loadtype=TXTEMPORAL %then %do;
546 where a.&tech_from <=&now and &now < a.&tech_to
547 %end;
548 ;
549 create table bitemp1_closevars2 as
550 select distinct a.*
551 from bitemp1_closevars1 a
552 left join work.bitemp0_append b
553 on 1=1
554 /* join on primary key */
555 %do idx_pk=1 %to %sysfunc(countw(&pk));
556 %let idx_val=%scan(&pk,&idx_pk);
557 and a.&idx_val=b.&idx_val
558 %end;
559 /* identify removed records by null value in a field in PK but not close_vars
560 */
561 where b.%scan(
562 %mf_wordsInStr1ButNotStr2(Str1=&pk,Str2=&close_vars),1,%str( )
563 ) IS NULL
564 ;
565
566 %if %mf_getattrn(bitemp1_closevars2,NLOBS)>0 %then %do;
567 %bitemporal_closeouts(
568 tech_from=&tech_from
569 ,tech_to = &tech_to
570 ,base_lib=&base_lib
571 ,base_dsn=&base_dsn
572 ,append_lib=work
573 ,append_dsn=bitemp1_closevars2
574 ,PK=&bus_from &pk
575 ,NOW=&dbnow
576 ,loadtarget=&loadtarget
577 ,loadtype=&loadtype
578 ,AUDITFOLDER=&dc_staging_area/&ETLSOURCE
579 )
580 %end;
581%end;
582
583/* return if nothing to load (was just deletes) */
584%if %mf_getattrn(work.bitemp0_append,NLOBS)=0 %then %do;
585 %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
586 %put NOTE- No updates - just deletes!;
587 %put NOTE-;%put NOTE-;%put NOTE-;
588%end;
589
590
591/**
592 * If applying manual overrides to business dates, then the input table MUST
593 * be unique on the PK. Check, and if not - abort.
594 */
595%local msg;
596%if %length(&bus_from_override.&bus_to_override)>0 or &CHECK_UNIQUENESS=YES
597%then %do;
598 proc sort data=work.bitemp0_append out=work.bitemp0_check nodupkey;
599 by &pk;
600 run;
601 %if %mf_getattrn(work.bitemp0_check,NLOBS)
602 ne %mf_getattrn(work.bitemp0_append,NLOBS)
603 %then %do;
604 %let msg=INPUT table &append_lib..&append_dsn is not unique on PK (&pk);
605 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE (&msg),
606 ctl_ds=&dclib..mpe_lockanytable
607 )
608 %mp_lockanytable(UNLOCK
609 ,lib=%scan(&outds_audit,1,.)
610 ,ds=%scan(&outds_audit,2,.)
611 ,ref=&ETLSOURCE
612 ,ctl_ds=&dclib..mpe_lockanytable
613 )
614 %mp_abort(msg=&msg,mac=bitemporal_dataloader.sas);
615 %end;
616%end;
617
618
619/**
620* extract from BASE table. Only want matching records, as could be very BIG.
621* New records are subsequently identified via left join and test for nulls.
622*/
623%local temp_table temp_table2 base_table baselib_schema;
624%put DCNOTE: Extracting matching observations from &base_lib..&base_dsn;
625
626%if &engine_type=OLEDB %then %do;
627 %let temp_table=##%mf_getuniquefileref(prefix=BTMP);
628 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
629 %let base_table=(select * from [dbo].&base_dsn
630 where convert(datetime,&SQLNOW) < &tech_to );
631 %else %let base_table=[dbo].&base_dsn;
632 proc sql;
633 create table &base_lib.."&temp_table"n as
634 select * from work.bitemp0_append;
635 /* open up a connection for pass through SQL */
636 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
637 create table work.bitemp0_base as select * from connection to myAlias(
638%end;
639%else %if &engine_type=REDSHIFT or &engine_type=POSTGRES or &engine_type=SNOW
640or &engine_type=SASIOSNF
641%then %do;
642 /* grab schema */
643 %let baselib_schema=%mf_getschema(&base_lib);
644 %if &baselib_schema.X ne X %then %let baselib_schema=&baselib_schema..;
645
646 /* grab redshift config */
647 %local redcnt; %let redcnt=0;
648 %if &engine_type=REDSHIFT %then %do;
649 data _null_;
650 set &config_table(where=(var_scope='DCBL_REDSH' and var_active=1));
651 x+1;
652 call symputx(cats('rednm',x),var_value,'l');
653 call symputx(cats('redval',x),var_value,'l');
654 call symputx('redcnt',x,'l');
655 run;
656 %end;
657 %let temp_table=%upcase(%mf_getuniquename(prefix=XDCTEMP));
658 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
659 %let base_table=(select * from &baselib_schema.&base_dsn
660 where timestamp &sqlnow < &tech_to );
661 %else %let base_table=&baselib_schema.&base_dsn;
662 /* make in-db empty table with PK + MD5 only */
663 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
664 %if &engine_type=SNOW or &engine_type=SASIOSNF %then %do;
665 exec (create transient table &baselib_schema.&temp_table
666 like &baselib_schema.&base_dsn
667 ) by myAlias;
668 %end;
669 %else %do;
670 /* cannot persist temp tables so must create a temporary permanent table */
671 exec (create table &temp_table (like &baselib_schema.&base_dsn)) by myAlias;
672 %if &engine_type=REDSHIFT %then %do;
673 exec (alter table &temp_table alter sortkey none) by myAlias;
674 %end;
675 %end;
676 %local dropcols;
677 %let dropcols=%mf_wordsinstr1butnotstr2(
678 str1=%upcase(%mf_getvarlist(&basecopy))
679 ,str2=%upcase(&pk)
680 );
681 %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
682 %put &=dropcols;
683 %let idx_val=%scan(&dropcols,&idx_pk);
684 exec(alter table &temp_table drop column &idx_val;) by myAlias;
685 %end;
686 exec (alter table &temp_table add column &md5_col varchar(32);) by myAlias;
687 /* create view to strip formats and avoid warns in log */
688 data work.vw_bitemp0/view=work.vw_bitemp0;
689 /* inherit remote length to handle byte expansion */
690 if 0 then set &base_lib..&temp_table(keep=&md5_col);
691 set work.bitemp0_append(keep=&pk &md5_col);
692 format _all_;
693 run;
694
695 proc append base=&base_lib..&temp_table
696 %if &engine_type=REDSHIFT %then %do;
697 (
698 %do idx_pk=1 %to &redcnt;
699 &&rednm&idx_pk = &&redval&idxpk
700 %end;
701 )
702 %end;
703 data=work.vw_bitemp0 force nowarn;
704 run;
705 /* open up a connection for pass through SQL */
706 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
707 create table work.bitemp0_base as select * from connection to myAlias(
708%end;
709%else %if &engine_type=CAS %then %do;
710 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
711 %let base_table=&base_lib..&base_dsn
712 (where=(&tech_from <=&now and &now < &tech_to));
713 %else %let base_table=&base_lib..&base_dsn;
714 %let temp_table=CASUSER.%mf_getuniquename(prefix=DC);
715 data &temp_table;
716 set work.bitemp0_append;
717 run;
718 %let bitemp0base=CASUSER.%mf_getuniquename(prefix=DC);
719 proc fedsql sessref=dcsession;
720 create table &bitemp0base{options replace=true} as
721%end;
722%else %do;
723 %let temp_table=work.bitemp0_append;
724 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
725 %let base_table=&base_lib..&base_dsn
726 (where=(&tech_from <=&now and &now < &tech_to));
727 %else %let base_table=&base_lib..&base_dsn;
728 proc sql;
729 create table work.bitemp0_base as
730%end;
731
732 select a.&md5_col /* this identifies NEW records */
733 , b.*
734 /* assume first PK field cannot be null (if defined in a PK constraint then
735 it definitely cannot be null) */
736 , case when b.%scan(&pk,1) IS NULL then 1 else 0 end as ___TMP___NEW_FLG
737 from &baselib_schema.&temp_table a
738 left join &base_table b
739 on 1=1
740%do idx_pk=1 %to &pk_cnt;
741 %let idx_val=%scan(&pk,&idx_pk);
742 and a.&idx_val=b.&idx_val
743%end;
744
745
746%if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
747or &engine_type=SNOW or &engine_type=SASIOSNF
748%then %do;
749 ); proc sql; drop table &base_lib.."&temp_table"n;
750%end;
751%else %if &engine_type=CAS %then %do;
752 ;
753 quit;
754 data work.bitemp0_base;
755 set &bitemp0base;
756 run;
757 proc sql;
758 drop table &temp_table;
759 drop table &bitemp0base;
760%end;
761%else %do;
762 ;
763%end;
764
765/**
766* matching & changed records are those without NULL key values
767* &idx_val resolves to rightmost PK value (loop above)
768*/
769%put syscc (line525)=&syscc, sqlrc=&sqlrc;
770%mp_abort(iftrue= (&syscc gt 0 or &sqlrc>0)
771 ,mac=&_program
772 ,msg=%str(syscc=&syscc sqlrc=&sqlrc)
773)
774
775%put hashcols2=&stripcols;
776proc sql;
777create table work.bitemp1_current(drop=___TMP___NEW_FLG) as
778 select *
779 , put(md5(&stripcols),$hex32.) as &md5_col
780 from work.bitemp0_base (drop=&md5_col)
781 where ___TMP___NEW_FLG=0;
782
783/**
784* NEW records were identified in ___TMP___NEW_FLG in bitemp0_base
785*/
786proc sql;
787create table &outds_add
788 (drop=&md5_col
789 %if %mf_existvar(work.bitemp0_base, &delete_col) %then %do;
790 &delete_col
791 %end;
792 )
793 as select a.*
794 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
795 ,&now as &tech_from &tech_from_fmt
796 ,&high_date as &tech_to &tech_to_fmt
797 %end;
798 from work.bitemp0_append a /* STAGING records (mix of existing & new) */
799 , work.bitemp0_base b /* BASE records (contains null values for new) */
800 where a.&md5_col=b.&md5_col /* took staging md5 across in left join */
801 and b.___TMP___NEW_FLG=1; /* NEW records also identified in bitemp0_base */
802
803
804/**
805* identify INSERTS. These are records with the same business key but
806* the bus_from and bus_to value are higher / lower (respectively)
807* such that the existing record needs to be SPLIT to surround the new
808* record.
809* eg: OLD RECORD from=1 to=10
810* NEW RECORD from=5 to=7
811*
812* APPENDED RECORDS:
813* - from=1 to=5
814* - from=5 to=7
815* - from=7 to=10
816*/
817
818/* inserts cannot happen with TXTEMPORAL */
819%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
820 /* IDENTIFY */
821 create table work.bitemp3_inserts as
822 select b.*
823 ,a.&bus_from as ___TMP___from
824 ,a.&bus_to as ___TMP___to
825 from work.bitemp0_append a
826 ,work.bitemp1_current b
827 where a.&bus_from > b.&bus_from
828 and a.&bus_to < b.&bus_to
829 %do idx_pk=1 %to &pk_cnt;
830 %let idx_val=%scan(&pk,&idx_pk);
831 and a.&idx_val=b.&idx_val
832 %end;
833 order by
834 /* compress blanks and then insert commas (as the datetime fields may
835 not be in use) */
836 %sysfunc(tranwrd(%sysfunc(compbl(
837 &pk &bus_from &bus_to &processed
838 )),%str( ), %str(,)))
839 ;
840
841 /* SPLIT */
842 data work.bitemp3a_inserts (drop=___TMP___from ___TMP___retain ___TMP___to) ;
843 set work.bitemp3_inserts;
844 by &pk &bus_from &bus_to &processed;
845 if first.&idx_val then do;
846 ___TMP___retain=&bus_to;
847 &bus_to=___TMP___from;
848 output;
849 &bus_to=___TMP___retain;
850 end;
851 if last.&idx_val then do;
852 &bus_from=___TMP___to;
853 output;
854 end;
855 run;
856%end;
857%else %do;
858 /* TX temporal load */
859 data work.bitemp3a_inserts;
860 set work.bitemp1_current;
861 stop;
862 run;
863%end;
864/* APPEND */
865proc sql;
866create view work.bitemp3a_view as
867 select * from work.bitemp1_current
868 where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
869
870data bitemp3b_newbase;
871 set work.bitemp3a_inserts work.bitemp3a_view;
872run;
873
874/** do not use! this converts short numerics into 8 bytes
875proc sql;
876create table work.bitemp3b_newbase as
877 select * from work.bitemp3a_inserts
878union corr
879 select * from work.bitemp1_current
880 where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
881*/
882
883/**
884* identify CHANGED records from staging.
885* Same business key with different temporal dates or md5 value
886* This table must be overlayed onto / into existing business history
887*/
888proc sql;
889create table work.bitemp4_updated as select distinct a.*
890 from work.bitemp0_append a
891 ,work.bitemp3b_newbase b
892 where 1=1
893 %do idx_pk=1 %to &pk_cnt;
894 %let idx_val=%scan(&pk,&idx_pk);
895 and a.&idx_val=b.&idx_val
896 %end;
897 and ( a.&md5_col ne b.&md5_col
898 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
899 OR (a.&bus_from ne b.&bus_from or a.&bus_to ne b.&bus_to)
900 %end;
901 )
902;
903
904/**
905 * This section would have been one simple step with union all
906 * but that converts short numerics into 8 bytes!
907 * so, convoluted alternative to retain the same functionality.
908 */
909
910/* base records */
911create view work.bitemp4_prep1 as
912 select 'BASE' as ___TMP___
913 ,b.*
914 from work.bitemp4_updated a
915 ,work.bitemp3b_newbase b
916 where 1
917 %do idx_pk=1 %to &pk_cnt;
918 %let idx_val=%scan(&pk,&idx_pk);
919 and a.&idx_val=b.&idx_val
920 %end;
921 ;
922/* updated records */
923create view work.bitemp4_prep2 as
924 select 'STAG' as ___TMP___ ,*
925 from work.bitemp4_updated;
926/* ensure we only keep columns that appear in both */
927%local bp1 bp2 bp3 bp4;
928%let bp1=%mf_getvarlist(bitemp4_prep1);
929%let bp2=%mf_getvarlist(bitemp4_prep2);
930%let bp3=%mf_wordsInStr1ButNotStr2(Str1=&bp1,Str2=&bp2);
931%let bp4=%mf_wordsInStr1ButNotStr2(Str1=&bp2,Str2=&bp1);
932data work.bitemp4_prep3/view=bitemp4_prep3;
933 set bitemp4_prep1 bitemp4_prep2;
934%if %length(XX&bp3&bp4)>2 %then %do;
935 drop &bp3 &bp4 ;
936%end;
937run;
938/* remove duplicates */
939proc sql;
940create table work.bitemp4a_allrecs as
941 select distinct *
942 from work.bitemp4_prep3
943 order by
944 /* compress blanks and then insert commas (as the datetime fields
945 may not be in use) */
946 %sysfunc(tranwrd(%sysfunc(compbl(
947 &pk &bus_from &bus_to &processed
948 )),%str( ), %str(,)))
949 ;
950
951%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
952 /* this section aligns the business dates
953 (eg for inserts or overlaps in the range) */
954 data work.bitemp4b_firstpass (drop=___TMP___cond ___TMP___from ___TMP___to );
955 set work.bitemp4a_allrecs;
956 by &pk &bus_from &bus_to &processed;
957 retain ___TMP___cond 'Name of Condition';
958 retain ___TMP___from ___TMP___to 0;
959 ___TMP___md5lag=lag(&md5_col);
960 /* reset retained variables */
961 if first.&idx_val then do;
962 call missing (___TMP___cond, ___TMP___from, ___TMP___to,___TMP___md5lag);
963 end;
964 else do;
965 /* if record is identical, carry forward bus_from (and bus_to if higher)*/
966 if &md5_col=___TMP___md5lag then do;
967 &bus_from=___TMP___from;
968 if &bus_to<___TMP___to then &bus_to=___TMP___to;
969 end;
970 end;
971
972 if ___TMP___='STAG' then do;
973 /* need to carry forward the closing record */
974 ___TMP___cond='Condition 1';
975 end;
976 else if ___TMP___cond='Condition 1' then do;
977 /* else ensure bus_from starts from prior record bus_to */
978 if &md5_col ne ___TMP___md5lag and &bus_from <= ___TMP___to
979 then &bus_from= ___TMP___to;
980 /* new record may replace old record entirely */
981 if &bus_to <= &bus_from then delete;
982 else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
983 end;
984 ___TMP___from=&bus_from;
985 ___TMP___to=&bus_to;
986 run;
987%end;
988%else %do;
989 /* keep staged records only */
990 data work.bitemp4b_firstpass;
991 set work.bitemp4a_allrecs;
992 if ___TMP___='STAG';
993 run;
994%end;
995
996/* next phase is to pass through in reverse - so set up the sort statement */
997%local byvar;
998%do idx_pk=1 %to &pk_cnt;
999 %let byvar=&byvar descending %scan(&pk,&idx_pk);
1000%end;
1001%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL
1002%then %let byvar=&byvar descending &bus_from descending &bus_to;
1003/* if matching bus dates supplied, need to ensure we also have a sort
1004 between BASE and STAGING tables */
1005%let byvar=&byvar descending ___TMP___;
1006
1007proc sort data=work.bitemp4b_firstpass out=work.bitemp4c_sort ;
1008 by &byvar;
1009run;
1010
1011/**
1012* Now (in reverse) pass back business start dates
1013*/
1014data work.bitemp4d_secondpass;
1015%if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1016 &tech_from=&now;
1017 &tech_to=&high_date;
1018%end;
1019 set work.bitemp4c_sort ;
1020 by &byvar;
1021 retain ___TMP___cond 'Name of Condition';
1022 retain ___TMP___from ___TMP___to 0;
1023%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
1024/* put / _all_ /;*/
1025 ___TMP___md5lag=lag(&md5_col);
1026 if first.&idx_val then do;
1027 /* reset retained variables */
1028 call missing (___TMP___cond,___TMP___from,___TMP___to,___TMP___md5lag);
1029 end;
1030 else do;
1031 /* if record is identical, carry back bus_to */
1032 if &md5_col=___TMP___md5lag then &bus_to=___TMP___to;
1033 end;
1034
1035 if ___TMP___='STAG' then do;
1036 /* need to carry forward the closing record */
1037 ___TMP___cond='Condition 2';
1038 end;
1039 else if ___TMP___cond='Condition 2' then do;
1040 /* else ensure bus_to stops at subsequent record bus_from */
1041 if &md5_col ne ___TMP___md5lag and &bus_to >= ___TMP___from
1042 then &bus_to= ___TMP___from;
1043 /* new record may replace old record entirely */
1044 if &bus_from >= &bus_to then delete;
1045 if &bus_from=___TMP___from and &bus_to=___TMP___to then delete;
1046 else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
1047 end;
1048 ___TMP___from=&bus_from;
1049 ___TMP___to=&bus_to;
1050
1051%end;
1052run;
1053%put syscc (line600)=&syscc;
1054/**
1055 There may still be some records (eg old business history) which have not
1056 changed.
1057 Need to identify these and remove from the append so they are not updated
1058 unnecessarily. This is done by generating a new md5 (which INCLUDES the
1059 business key) and any matching / identical records are split out (from those
1060 that need to be updated).
1061*/
1062
1063%if &loadtype=BITEMPORAL %then %do;
1064 %let cat_string=catx('|' ,&bus_from,&bus_to);
1065
1066 data work.bitemp5a_lkp (keep=&md5_col)
1067 %if "%substr(&sysver,1,1)" ne "4" & "%substr(&sysver,1,1)" ne "5" %then %do;
1068 /nonote2err
1069 %end;
1070 ;
1071 set work.bitemp0_base;
1072 /* for BITEMPORAL we need to compare business dates also */
1073 &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.);
1074 run;
1075
1076 data bitemp5b_updates;
1077 set bitemp4d_secondpass;
1078 if _n_=1 then do;
1079 dcl hash md5_lkp(dataset:'bitemp5a_lkp');
1080 md5_lkp.definekey("&md5_col");
1081 md5_lkp.definedone();
1082 end;
1083 /* drop old md5 col as will rebuild with new business dates */
1084 &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.) ;
1085 if md5_lkp.check()=0 then delete;
1086 run;
1087
1088 proc sql;
1089 /* get min bus from as will update (close out) all records from this point
1090 (for that PK)*/
1091 create table work.bitemp5d_subquery as
1092 select &pk_comma, min(&bus_from)as &bus_from, max(&bus_to) as &bus_to
1093 from work.bitemp5b_updates
1094 group by &pk_comma;
1095 /* index has a huge efficiency impact on upcoming nested subquery */
1096 create index index1 on work.bitemp5d_subquery(&pk_comma,&bus_from, &bus_to);
1097
1098 %let lastds=work.bitemp5b_updates;
1099%end;
1100%else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
1101 proc sql;
1102 create table work.bitemp5d_subquery as
1103 select distinct &pk_comma
1104 from bitemp4d_secondpass;
1105 %let lastds=work.bitemp4d_secondpass;
1106%end;
1107%else %let lastds=work.bitemp4d_secondpass;
1108
1109/* create single append table (an overlapped pre-sert may be classed as
1110 both an update AND a new record). Also create temp views that may be
1111 used for pre-load analysis. */
1112data &outds_mod;
1113 set &lastds(drop=___TMP___: &md5_col);
1114run;
1115
1116data bitemp6_allrecs / view=bitemp6_allrecs;
1117 set &outds_mod /* UPDATED records */
1118 &outds_add /* NEW records */;
1119run;
1120
1121proc sort data=work.bitemp6_allrecs
1122 out=work.bitemp6_unique
1123 noduprec
1124 dupout=work.xx_BADBADBAD;
1125by _all_;
1126run;
1127
1128/* we have all our temp tables now so exit if this is all that is needed */
1129%if &LOADTARGET ne YES %then %return;
1130
1131/* also exit if an err condition exists */
1132
1133%if &syscc>0 %then %do;
1134 %put syscc=&syscc;
1135 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1136 ctl_ds=&dclib..mpe_lockanytable
1137 )
1138 %if "&outds_audit" ne "0" %then %do;
1139 %mp_lockanytable(UNLOCK
1140 ,lib=%scan(&outds_audit,1,.)
1141 ,ds=%scan(&outds_audit,2,.)
1142 ,ref=&ETLSOURCE
1143 ,ctl_ds=&dclib..mpe_lockanytable
1144 )
1145 %end;
1146%end;
1147%mp_abort(iftrue= (&syscc>0)
1148 ,mac=&sysmacroname in &_program
1149 ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status)
1150)
1151
1152/* final check - abort if a lock has appeared on the target or audit table */
1153%mp_lockfilecheck(libds=&base_lib..&base_dsn)
1154%if %mf_existds(&outds_audit) %then %do;
1155 %mp_lockfilecheck(libds=&outds_audit)
1156%end;
1157
1158/**
1159* STAGING TABLES PREPARED, ERR CONDITION TESTED FOR.. NOW TO LOAD!!
1160*/
1161
1162/**
1163* First, CLOSE OUT changed records (if not a REPLACE)
1164* Note that SAS does not support ANSI standard for UPDATE with a join condition.
1165* However - this can be worked around using a nested subquery..
1166*/
1167data _null_;
1168 putlog "&sysmacroname: CLOSEOUTS commencing";
1169run;
1170
1171%if %mf_getattrn(&lastds,NLOBS)=0 %then %do;
1172 data _null_;
1173 putlog "&sysmacroname: No closeouts needed";
1174 run;
1175%end;
1176%else %if &engine_type=CAS %then %do;
1177 %mp_abort(iftrue= (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL)
1178 ,mac=&sysmacroname in &_program
1179 ,msg=%str(&loadtype not yet supported in CAS engine)
1180 )
1181 /* create temp table for deletions */
1182 %local delds;%let delds=%mf_getuniquename(prefix=DC);
1183 data casuser.&delds;
1184 set work.bitemp5d_subquery;
1185 run;
1186 /* delete the records */
1187 proc cas ;
1188 table.deleteRows / table={
1189 caslib="&base_lib",
1190 name="&base_dsn",
1191 where="1=1",
1192 whereTable={caslib='CASUSER',name="&delds"}
1193 };
1194 quit;
1195 /* drop temp table */
1196 proc sql;
1197 drop table CASUSER.&delds;
1198%end;
1199%else %if (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL or &loadtype=UPDATE)
1200%then %do;
1201 data _null_;
1202 putlog "&sysmacroname: &loadtype operation using *&engine_type* engine";
1203 run;
1204 %local flexinow;
1205 proc sql;
1206 /* if OLEDB then create a temp table for efficiency */
1207 %local innertable;
1208 %if &engine_type=OLEDB %then %do;
1209 %let innertable=[&temp_table];
1210 %let top_table=[dbo].&base_dsn;
1211 %let flexinow=&SQLNOW;
1212 create table &base_lib.."&temp_table"n as
1213 select * from work.bitemp5d_subquery;
1214 /* open up a connection for pass through SQL */
1215 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1216 execute(
1217 %end;
1218 %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES or &engine_type=SNOW
1219 or &engine_type=SASIOSNF
1220 %then %do;
1221 %let innertable=%upcase(%mf_getuniquename(prefix=XDCTEMP));
1222 %let top_table=&baselib_schema.&base_dsn;
1223 %let flexinow=timestamp &SQLNOW;
1224 /* make empty table first - must clone & drop extra cols
1225 as autoload is bad */
1226 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1227 %if &engine_type=SNOW or &engine_type=SASIOSNF %then %do;
1228 exec (create transient table &baselib_schema.&innertable
1229 like &baselib_schema.&base_dsn
1230 ) by myAlias;
1231 %end;
1232 %else %do;
1233 exec (create table &innertable
1234 (like &baselib_schema.&base_dsn)
1235 ) by myAlias;
1236 %if &engine_type=REDSHIFT %then %do;
1237 exec (alter table &innertable alter sortkey none) by myAlias;
1238 %end;
1239 %end;
1240 %let dropcols=%mf_wordsinstr1butnotstr2(
1241 str1=%upcase(%mf_getvarlist(&basecopy))
1242 ,str2=%upcase(%mf_getvarlist(work.bitemp5d_subquery))
1243 );
1244 %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
1245 %put &=dropcols;
1246 %let idx_val=%scan(&dropcols,&idx_pk);
1247 exec(alter table &innertable drop column &idx_val;) by myAlias;;
1248 %end;
1249 /* create view to strip formats and avoid warns in log */
1250 data work.vw_bitemp5d/view=work.vw_bitemp5d;
1251 set work.bitemp5d_subquery;
1252 format _all_;
1253 run;
1254 proc append base=&base_lib..&innertable (
1255 %do idx_pk=1 %to &redcnt;
1256 &&rednm&idx_pk = &&redval&idxpk
1257 %end;
1258 )
1259 data=work.vw_bitemp5d force nowarn;
1260 run;
1261 /* open up a connection for pass through SQL */
1262 %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1263 execute(
1264 %end;
1265 %else %do;
1266 %put Not using passthrough for *&engine_type* engine;
1267 %let innertable=bitemp5d_subquery;
1268 %let top_table=&base_lib..&base_dsn;
1269 %let flexinow=&now;
1270 %end;
1271
1272
1273 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1274 update &top_table set &tech_to=&flexinow
1275 %if %length(&processed)>0 %then %do;
1276 ,&processed=&flexinow
1277 %end;
1278 where &tech_from <= &flexinow and &flexinow < &tech_to and
1279 %end;
1280 %else %if &loadtype=UPDATE %then %do;
1281 /* changed records are deleted then re-appended when doing UPDATEs */
1282 delete from &top_table where
1283 %end;
1284 %else %do;
1285 %put %str(ERR)OR: BUSTEMPORAL NOT YET SUPPORTED;
1286 %let syscc=5;
1287 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1288 ctl_ds=&dclib..mpe_lockanytable
1289 )
1290 %mp_lockanytable(UNLOCK
1291 ,lib=%scan(&outds_audit,1,.)
1292 ,ds=%scan(&outds_audit,2,.)
1293 ,ref=&ETLSOURCE
1294 ,ctl_ds=&dclib..mpe_lockanytable
1295 )
1296 %goto end_of_macro;
1297 %end;
1298
1299 /* perform join inside query as per
1300 http://stackoverflow.com/questions/24629793/update-with-a-proc-sql */
1301
1302 exists( select 1 from &baselib_schema.&innertable where
1303
1304 /* loop PK join */
1305 %do idx_pk=1 %to &pk_cnt;
1306 %let idx_val=%scan(&pk,&idx_pk);
1307 &base_dsn..&idx_val=&innertable..&idx_val and
1308 %end;
1309 %if &loadtype=BITEMPORAL %then %do;
1310 &base_dsn..&bus_from >= &innertable..&bus_from
1311 and &base_dsn..&bus_to <= &innertable..&bus_to and
1312 %end;
1313
1314 /* close the statement */
1315
1316 1=1);
1317
1318 %if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
1319 or &engine_type=SNOW or &engine_type=SASIOSNF
1320 %then %do;
1321 ) by myAlias;
1322 execute (drop table &baselib_schema.&innertable) by myAlias;
1323 %end;
1324%end;
1325quit;
1326data _null_;
1327 putlog "&sysmacroname: Closeout complete";
1328run;
1329/**
1330 * Append the new / updated records
1331 */
1332%if &engine_type=CAS %then %do;
1333
1334 /* get varchar variables ready for casting */
1335 %local vcfmt vcrename vcassign vcdrop;
1336 data _null_;
1337 set work.bitemp_cols(where=(type=6)) end=last;
1338 length vcrename vcassign vcdrop vcfmt $32767 rancol $32;
1339 retain vcrename vcassign vcdrop vcfmt;
1340 if _n_=1 then vcrename='(rename=(';
1341 rancol=resolve('%mf_getuniquename()');
1342 vcfmt=trim(vcfmt)!!'length '!!cats(name)!!' varchar(*);';
1343 vcrename=trim(vcrename)!!' '!!cats(name,'=',rancol);
1344 vcassign=cats(vcassign,name,'=',rancol,';');
1345 vcdrop=cats(vcdrop,'drop '!!rancol,';');
1346 if last then do;
1347 vcrename=cats(vcrename,'))');
1348 call symputx('vcfmt',vcfmt);
1349 call symputx('vcrename',vcrename);
1350 call symputx('vcassign',vcassign);
1351 call symputx('vcdrop',vcdrop);
1352 end;
1353 run;
1354
1355 /* prepare a temp cas table with varchars casted */
1356 %let tmp=%mf_getuniquename();
1357 data casuser.&tmp ;
1358 &vcfmt
1359 set work.bitemp6_unique &vcrename;
1360 &vcassign
1361 &vcdrop
1362 run;
1363
1364 /* load the table with varchars applied*/
1365 data &base_lib..&base_dsn (append=yes )/sessref=dcsession ;
1366 set casuser.&tmp;
1367 run;
1368
1369 /* drop temp table */
1370 proc sql;
1371 drop table CASUSER.&tmp;
1372
1373 /* this code will not work as regular tables do not have varchars */
1374 /*
1375 proc casutil;
1376 load data=work.bitemp6_unique
1377 outcaslib="&base_lib" casout="&base_dsn" append ;
1378 quit;
1379 */
1380%end;
1381%else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
1382 proc append base=&base_lib..&base_dsn
1383 %if &engine_type=REDSHIFT %then %do;
1384 (
1385 %do idx_pk=1 %to &redcnt;
1386 &&rednm&idx_pk = &&redval&idxpk
1387 %end;
1388 )
1389 %end;
1390 data=bitemp6_unique force nowarn;
1391 run;
1392%end;
1393%else %do;
1394 proc append base=&base_lib..&base_dsn data=bitemp6_unique force nowarn; run;
1395%end;
1396
1397%mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1398 ctl_ds=&dclib..mpe_lockanytable
1399)
1400
1401/* final check on syscc */
1402%mp_abort(iftrue= (&syscc >4)
1403 ,mac=&_program
1404 ,msg=%str(!!Upload NOT successful!! Failed on actual update / append stage..)
1405)
1406
1407%if &outds_audit ne 0 and &LOADTARGET=YES %then %do;
1408 data work.vw_outds_orig /view=work.vw_outds_orig;
1409 set work.bitemp0_base (drop=&md5_col);
1410 where ___TMP___NEW_FLG=0;
1411 drop ___TMP___NEW_FLG;
1412 run;
1413 /* update the AUDIT table */
1414 %if %mf_existds(&outds_audit) %then %do;
1415 options mprint;
1416 %mp_storediffs(&base_lib..&base_dsn
1417 ,work.vw_outds_orig
1418 ,&pk &bus_from
1419 ,delds=&outds_del
1420 ,modds=&outds_mod
1421 ,appds=&outds_add
1422 ,outds=work.mp_storediffs
1423 ,processed_dttm=&now
1424 ,loadref=%superq(etlsource)
1425 )
1426 /* exclude unchanged values in modified rows */
1427 data work.mp_storediffs;
1428 set work.mp_storediffs;
1429 if MOVE_TYPE="M" and IS_PK=0 and IS_DIFF=0 then delete;
1430 * putlog load_ref= libref= dsn= key_hash= tgtvar_nm=;
1431 run;
1432 proc append base=&outds_audit data=work.mp_storediffs;
1433 run;
1434 %mp_lockanytable(UNLOCK
1435 ,lib=%scan(&outds_audit,1,.)
1436 ,ds=%scan(&outds_audit,2,.)
1437 ,ref=&ETLSOURCE
1438 ,ctl_ds=&dclib..mpe_lockanytable
1439 )
1440 %end;
1441%end;
1442%mp_abort(iftrue= (&syscc >4)
1443 ,mac=bitemporal_dataloader
1444 ,msg=%str(Problem in audit stage (&outds_audit))
1445)
1446
1447%let user=%mf_getUser();
1448/**
1449 Notify as appropriate EMAILS DISABLED
1450
1451%sumo_alerts(ALERT_EVENT=UPDATE
1452 , ALERT_TARGET=&base_lib..&base_dsn
1453 , from_user= &user);
1454*/
1455/* monitor BiTemporal usage */
1456%if &log=1 %then %do;
1457 %put syscc=&syscc;
1458 /* do not perform duration calc in pass through */
1459 %local dur;
1460 data _null_;
1461 now=symget('now');
1462 dur=%sysfunc(datetime())-&now;
1463 call symputx('dur',dur,'l');
1464 run;
1465 proc sql;
1466 insert into &dclib..mpe_dataloads
1467 set libref=%upcase("&base_lib")
1468 ,DSN=%upcase("&base_dsn")
1469 ,ETLSOURCE="&ETLSOURCE"
1470 ,LOADTYPE="&loadtype"
1471 ,CHANGED_RECORDS=%mf_getattrn(&lastds,NLOBS)
1472 ,NEW_RECORDS=%mf_getattrn(&outds_add,NLOBS)
1473 ,DELETED_RECORDS=%mf_getattrn(&outds_del,NLOBS)
1474 ,DURATION=&dur
1475 ,MAC_VER="v&ver"
1476 ,user_nm="&user"
1477 ,PROCESSED_DTTM=&now;
1478 quit;
1479 %put syscc=&syscc;
1480%end;
1481%end_of_macro:
1482%mend bitemporal_dataloader;