bitemporal_dataloader.sas
Go to the documentation of this file.
1 /**
2  @file
3  @brief Routine supporting multiple load types
4  @details Generic loader for multiple load types (UPDATE, SCD2, BITEMPORAL).
5 
6  Handles all elements including metadata validation, PK checking, closeouts,
7  locking, logging, etc.
8 
9  The staging table must be prepared with a unique business key. For bitemporal
10  this means a snapshot at both technical AND business time.
11 
12 ASSUMPTIONS:
13  - Base table has relevant datetime vars: 2xTechnical, 2xBusiness, 1xProcessed
14  - Staging table omits Technical or Processed datetimes (has Business only)
15  - Base table has no column names containing the string "___TMP___"
16  - Base &tech_from variable is not nullable. This should always be the case
17  anyway whenbuilding permanent bitemporal datasets.. But the point is that
18  this field is used to identify new records after the initial left join
19  from staging to base table.
20 
21 NOTES:
22  - All queries against BiTemporal tables need two filter conditions, as such:
23 
24  where &bus_from LE [tstamp] LT &bus_to
25  AND &tx_from LE [tstamp] LT &tx_to
26 
27  One cannot use BETWEEN
28  One cannot use &xx_from LE [tstamp] LE &xx_from (equivalent to above).
29  Background:
30 http://stackoverflow.com/questions/20005950/best-practice-for-scd-date-pairs-closing-opening-timestamps
31 
32 Areas for optimisation
33  - loading temporal history (currently experimental)
34 
35  ## Supporting tables
36 
37  Supporting tables must exist in the library specified in the `dclib` param.
38 
39  ### MPE_DATALOADS
40 
41  This table is updated every time a successful load occurs, and includes
42  information such as:
43 
44  @li library
45  @li dataset
46  @li message (supplied in the ETLSOURCE param)
47  @li new rows
48  @li deleted rows
49  @li changed rows
50  @li timestamp
51  @li the user making the load
52  @li the version of (this) macro used to make the load
53 
54 
55  @param [in] APPEND_DSN= (APPENDTABLE) Name of STAGING table
56  @param [in] CONFIG_TABLE= (&dclib..MPE_CONFIG) The table containing library
57  engine specific config. The following scopes are supported:
58  @li DCBL_REDSH
59  @param [in] LOADTYPE= (BITEMPORAL) Supported types:
60  @li TXTEMPORAL - loads a buskey with version times
61  @li BUSTEMPORAL - loads buskey with bus + ver times
62  @li UPDATE - updates a buskey with NO history
63  @param [in] PROCESSED= (0) This column obtains a current timestamp for changed
64  records when loading the target table. Default is 0 (not set). If the
65  target table contains a variable called PROCESSED_DTTM, and processed=0,
66  then this column will be used for applying the current timestamp.
67  @param RK_MAXKEYTABLE= (mpe_maxkeyvalues) The maxkeytable to use (must exist
68  in DCLIB)
69  @param [in] PK= Business key, space separated. Should NOT include temporal
70  fields.
71  @param [in] RK_UNDERLYING= If supplied will generate an RK based on these
72  (space separated) business key fields. In this case only ONE PK field should
73  be supplied, which is assumed to be the RK. The RK field, plus underlying
74  fields, should all exist on the base table. The underlying fields should
75  exist on the staging table (the RK / PK field will be overwritten).
76  The staging table should also be unique on its PK.
77 
78  @param [in] dclib= (&dc_libref) The library containing DC configuration tables
79  @param [out] outds_del= (work.outds_del) Output table containing
80  deleted records
81  @param [out] outds_add= (work.outds_add) Output table containing
82  appended records
83  @param [out] outds_mod= (work.outds_mod) Output table containing
84  changed records
85  @param [out] outds_audit= (0) Load detailed changes to an audit table. Uses
86  the mp_storediffs.sas macro. Provide the base table here, to load.
87 
88  <h4> Global Variables </h4>
89  The following global macro variables are used. These should be replaced by
90  macro parameters in future releases.
91 
92  @li `dc_dttmtfmt`
93 
94  <h4> SAS Macros </h4>
95  @li bitemporal_closeouts.sas
96  @li dc_assignlib.sas
97  @li mf_existds.sas
98  @li mf_existvar.sas
99  @li mf_fmtdttm.sas
100  @li mf_getattrn.sas
101  @li mf_getengine.sas
102  @li mf_getschema.sas
103  @li mf_getuniquename.sas
104  @li mf_getuser.sas
105  @li mf_getvarlist.sas
106  @li mf_verifymacvars.sas
107  @li mf_wordsinstr1butnotstr2.sas
108  @li mp_abort.sas
109  @li mp_dropmembers.sas
110  @li mp_lockanytable.sas
111  @li mp_lockfilecheck.sas
112  @li mp_retainedkey.sas
113  @li mp_storediffs.sas
114 
115  @version 9.3
116  @author 4GL Apps Ltd.
117  @copyright 4GL Apps Ltd. This code may only be used within Data Controller
118  and may not be re-distributed or re-sold without the express permission of
119  4GL Apps Ltd.
120 
121  @warning multitemporal loads (bitemporal for multiple points in business time)
122  are in experimental stage
123 
124 **/
125 
126 %macro bitemporal_dataloader(
127  bus_from= /* Business FROM datetime variable. Req'd on
128  STAGING & BASE tables.*/
129  ,bus_to = /* Business TO datetime variable. Req'd on
130  STAGING & BASE tables. */
131  ,bus_from_override= /* Provide a hard coded BUS_FROM datetime value.*/
132  ,bus_to_override= /* provide a hard coded BUS_TO datetime value */
133  ,tech_from= /* Technical FROM datetime variable. Req'd on
134  BASE table only. */
135  ,tech_to = /* Technical TO datetime variable. Req'd on BASE
136  table only. */
137  ,processed= 0
138  ,base_lib=WORK /* Libref of the BASE table. */
139  ,base_dsn=BASETABLE /* Name of BASE table. */
140  ,append_lib=WORK /* Libref of the STAGING table. */
141  ,append_dsn=APPENDTABLE
142  ,high_date='01JAN5999:00:00:00'dt /* High date to close out records */
143  ,PK= name sex
144  ,RK_UNDERLYING=
145  ,KEEPVARS= /* Provides option for removing unwanted vars from append table */
146  ,RK_UPDATE_MAXKEYTABLE=NO /* If switching (or mix matching) with regular
147  SCD2 loader then set this switch to YES to
148  ensure the MAXKEYTABLE is updated with the
149  current maximum RK value for the target table
150  */
151  ,CHECK_UNIQUENESS=YES /* Perform a check of the APPEND table to ensure it is
152  unique on its business key */
153  ,ETLSOURCE=demo /* supply a value ($50.) to show as ETLSOURCE in
154  &dclib..DATALOADS */
155  ,LOADTYPE=BITEMPORAL
156  ,RK_MAXKEYTABLE= mpe_maxkeyvalues
157  ,LOG=1 /* Switch to 0 to prevent records being added to
158  &mpelib..mpe_DATALOADS (ie when testing)*/
159  ,DELETE_COL= _____DELETE__THIS__RECORD_____
160  /* If this variable is found in the append dataset
161  then records are closed out (or deleted) in the
162  append table where that variable= "Yes" */
163  ,LOADTARGET=YES /* set to anything but uppercase YES to switch off
164  target table load and generate temp tables only */
165  ,CLOSE_VARS=
166 /*a problem with regular SCD2 or TXTEMPORAL loads is that there is
167  no facility to close out removed records (all records are
168  assumed new or changed). But how does one determine which
169  records are removed? Short of loading the entire table
170  each time? This parameter allows a set of variables
171  (this should be a subset of the PK) to be declared, and
172  the macro will determine which records in the base table
173  need to be closed out ahead of the load.
174 
175  For instance, given the following:
176 
177  Base Table Staging Table
178  DATE ENTITY AMOUNT DATE ENTITY AMOUNT
179  JAN ACME4 66 JAN ACME4 66
180  FEB ACME4 99 FEB ACME4 99
181  FEB ACME1 22
182 
183  By supplying DATE in CLOSE_VARS and DATE ENTITY as the PK,
184  the "FEB PAG 22" record would get closed out.
185  */
186  ,config_table=&dclib..MPE_CONFIG
187  ,dclib=&dc_libref
188  ,outds_del=work.outds_del
189  ,outds_add=work.outds_add
190  ,outds_mod=work.outds_mod
191  ,outds_audit=0
192  );
193 
194 /* when changing this macro, update the version num here */
195 %local ver;
196 %let ver=32;
197 %put &sysmacroname entry vars:;
198 %put _local_;
199 
200 %dc_assignlib(WRITE,&base_lib) /* may not already be assigned */
201 
202 /* return straight away if nothing to load */
203 %let nobs= %mf_getattrn(&append_lib..&append_dsn,NLOBS);
204 %if &nobs=-1 %then %do;
205  proc sql noprint; select count(*) into: nobs from &append_lib..&append_dsn;
206 %end;
207 %if &nobs=0 %then %do;
208  %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
209  %put NOTE- Base dataset &append_lib..&append_dsn is empty. Nothing to upload!;
210  %put NOTE-;%put NOTE-;%put NOTE-;
211  %return;
212 %end;
213 
214 /* hard exit if err condition exists */
215 %mp_abort(iftrue= (&syscc > 0)
216  ,mac=bitemporal_dataloader
217  ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status;)
218 )
219 
220 %local engine_type;
221 %let engine_type=%mf_getengine(&base_lib);
222 %if (&engine_type=REDSHIFT or &engine_type=POSTGRES) and %length(&CLOSE_VARS)>0
223 %then %do;
224  %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
225  %put NOTE- CLOSE_VARS functionality not yet supported in &engine_type;
226  %put NOTE-;%put NOTE-;%put NOTE-;
227  %return;
228 %end;
229 
230 /**
231  * The metadata functions (eg mf_existvar) will fail if the base table has a
232  * SAS lock. So, make a snapshot of the base table for further use.
233  * Also, make output tables (regardless).
234  */
235 %local basecopy;
236 %let basecopy=%mf_getuniquename(prefix=basecopy);
237 
238 data &basecopy &outds_mod &outds_add &outds_del;
239  set &base_lib..&base_dsn;
240  stop;
241 run;
242 %mp_abort(iftrue= (&syscc > 0)
243  ,mac=&_program
244  ,msg=%str(syscc=&syscc after base table copy - aborting due to table lock)
245 )
246 
247 
248 %local cols idx_pk md5_col ;
249 %let md5_col=___TMP___md5;
250 %let check_uniqueness=%upcase(&check_uniqueness);
251 %let RK_UPDATE_MAXKEYTABLE=%upcase(&RK_UPDATE_MAXKEYTABLE);
252 %let high_date=%unquote(&high_date);
253 %let loadtype=%upcase(&loadtype);
254 
255 /* ensure irrelevant variables are cleared */
256 %if &loadtype=BUSTEMPORAL %then %do;
257  %let tech_from=;
258  %let tech_to=;
259 %end;
260 %else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
261  %let bus_from=;
262  %let bus_to=;
263 %end;
264 
265 /* ensure relevant variables are supplied */
266 %mp_abort(iftrue=(&loadtype=BITEMPORAL & %mf_verifymacvars(bus_from bus_to)=0)
267  ,mac=bitemporal_dataloader
268  ,msg=%str(Missing BUS_FROM / BUS_TO)
269 )
270 %mp_abort(iftrue=(&loadtype=TXTEMPORAL & %mf_verifymacvars(tech_from tech_to)=0)
271  ,mac=bitemporal_dataloader
272  ,msg=%str(Missing TECH_FROM / TECH_TO)
273 )
274 
275 /**
276  * drop any tables (may be defined as views or vice versa preventing overwrite)
277  */
278 %mp_dropmembers(append bitemp0_append bitemp_cols)
279 
280 /* SQL Server requires its own time values */
281 /* 9.2 will only give picture format down to seconds. 9.3 allows
282  milliseconds by using lower S and defining the decimal in the format name..*/
283 PROC FORMAT;
284  picture MyMSdt other='%0Y-%0m-%0dT%0H:%0M:%0S' (datatype=datetime);
285 RUN;
286 %local dbnow;
287 %let dbnow="%sysfunc(datetime(),%mf_fmtdttm())"dt;
288 
289 data _null_;
290  /* convert space separated macvar to comma separated for SQL processing */
291  call symputx('PK_COMMA',tranwrd(compbl("&pk"),' ',','),'L');
292  call symputx('PK_CNT',countw("&pk",' '),'L');
293  now=&dbnow;
294  call symputx('NOW',now,'L');
295  call symputx('SQLNOW',cats("'",put(now,MyMSdt.),"'"),'L');
296  length etlsource $100;
297  etlsource=subpad(symget('etlsource'),1,100);
298  call symputx('etlsource',etlsource,'l');
299 run;
300 
301 /**
302  * Even if no PROCESSED var provided, assume that any variable named
303  * PROCESSED_DTTM should be updated
304  */
305 %if &processed=0 %then %do;
306  %if %mf_existvar(&basecopy,PROCESSED_DTTM)
307  %then %let processed=PROCESSED_DTTM;
308  %else %let processed=;
309 %end;
310 
311 
312 /* extract colnames for md5 creation / change tracking */
313 proc contents noprint data=&base_lib..&base_dsn
314  out=work.bitemp_cols (keep=name type length varnum format:);
315 run;
316 proc sql noprint;
317 select name into: cols separated by ','
318  from work.bitemp_cols
319  where upcase(name) not in
320  (%upcase("&bus_from","&bus_to"
321  ,"&tech_from","&tech_to"
322  ,"&processed","&delete_col")) ;
323 select case when type in (2,6) then cats('put(md5(trim(',name,')),$hex32.)')
324  /* multiply by 1 to strip precision errors (eg 0 != 0) */
325  /* but ONLY if not missing, else will lose any special missing values */
326  else cats('put(md5(trim(put(ifn(missing('
327  ,name,'),',name,',',name,'*1),binary64.))),$hex32.)') end
328  into: stripcols separated by '||'
329  from work.bitemp_cols
330  where upcase(name) not in
331  (%upcase("&bus_from","&bus_to"
332  ,"&tech_from","&tech_to"
333  ,"&processed","&delete_col")) ;
334 
335 /* set default formats*/
336 %let bus_from_fmt = datetime19.;
337 %let bus_to_fmt = datetime19.;
338 %let processed_fmt = datetime19.;
339 
340 %let tech_from_fmt = format=datetime19.;
341 %let tech_to_fmt = format=datetime19.;
342 
343 
344 %put &=stripcols;
345 %put &=pk;
346 
347 data _null_;
348  set work.bitemp_cols;
349  if type=2 or type=6 then do;
350  length fmt $49.;
351  if format='' then fmt=cats('$',length,'.');
352  else fmt=cats(format,formatl,'.');
353  end;
354  else do;
355  if format='' then fmt=cats(length,'.');
356  else fmt=cats(format,formatl,'.',formatd);
357  end;
358  if upcase(name)="%upcase(&bus_from)" then
359  call symputx('bus_from_fmt',fmt,'L');
360  else if upcase(name)="%upcase(&bus_to)" then
361  call symputx('bus_to_fmt',fmt,'L');
362  else if upcase(name)="%upcase(&tech_from)" then
363  call symputx('tech_from_fmt',"format="!!fmt,'L');
364  else if upcase(name)="%upcase(&tech_to)" then
365  call symputx('tech_to_fmt',"format="!!fmt,'L');
366  else if upcase(name)="%upcase(&processed)" then
367  call symputx('processed_fmt',fmt,'L');
368 run;
369 
370 %if %index(%quote(&cols),___TMP___) %then %do;
371  %let msg=%str(Table contains a variable name containing "___TMP___".%trim(
372  ) This may conflict with temp variable generation!!);
373  %mp_abort(msg=&msg,mac=bitemporal_dataloader);
374  %let syscc=5;
375  %return;
376 %end;
377 
378 /* if transaction dates appear on the APPEND table, need to remove them */
379 %local drop_tx_dates /* used in append table */
380  drop_tx_dates_noobs /* used to take the base table structure */;
381 %if %mf_existvar(&append_lib..&append_dsn, &tech_from)
382  %then %let drop_tx_dates=&tech_from;
383 %if %mf_existvar(&append_lib..&append_dsn, &tech_to)
384  %then %let drop_tx_dates=&drop_tx_dates &tech_to;
385 %if %length(%trim(&drop_tx_dates))>0
386  %then %let drop_tx_dates=(drop=&drop_tx_dates);
387 
388 %if %mf_existvar(&basecopy, &tech_from)
389  %then %let drop_tx_dates_noobs=&tech_from;
390 %if %mf_existvar(&basecopy, &tech_to)
391  %then %let drop_tx_dates_noobs=&drop_tx_dates_noobs &tech_to;
392 %if %length(%trim(&drop_tx_dates_noobs))>0
393  %then %let drop_tx_dates_noobs=(drop=&drop_tx_dates_noobs obs=0);
394 %else %let drop_tx_dates_noobs=(obs=0);
395 
396 
397 /**
398  * Lock the table. This is necessary as we are doing a two part update (first
399  * closing records then appending new records). It is theoretically possible
400  * that an upload may occur whilst preparing the staging tables. And the
401  * staging tables are about to be prepared..
402  */
403 %if &LOADTARGET = YES %then %do;
404  %put locking &base_lib..&base_dsn;
405  %mp_lockanytable(LOCK,
406  lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,ctl_ds=&dclib..mpe_lockanytable
407  )
408  %if "&outds_audit" ne "0" %then %do;
409  %put locking &outds_audit;
410  %mp_lockanytable(LOCK
411  ,lib=%scan(&outds_audit,1,.)
412  ,ds=%scan(&outds_audit,2,.)
413  ,ref=&ETLSOURCE
414  ,ctl_ds=&dclib..mpe_lockanytable
415  )
416  %end;
417 %end;
418 %else %do;
419  /* not an actual load, so avoid updating the max key table in next step. */
420  %let rk_update_maxkeytable=NO;
421 %end;
422 
423 %if %length(&RK_UNDERLYING)>0 %then %do;
424  %mp_retainedkey(
425  base_lib=&base_lib
426  ,base_dsn=&base_dsn
427  ,append_lib=&append_lib
428  ,append_dsn=&append_dsn
429  ,retained_key=&pk
430  ,business_key=&rk_underlying
431  ,check_uniqueness=&CHECK_UNIQUENESS
432  ,outds=work.append
433  %if &rk_update_maxkeytable=NO %then %do;
434  ,maxkeytable=0
435  %end;
436  %else %do;
437  ,maxkeytable=&dclib..&RK_MAXKEYTABLE
438  %end;
439  ,locktable=&dclib..mpe_lockanytable
440  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
441  ,filter_str=%str( (where=( &now < &tech_to)) )
442  %end;
443  )
444 %end;
445 %else %do;
446  proc sql;
447  create view work.append as select * from &append_lib..&append_dsn;
448 %end;
449 /**
450 * generate md5 for append table
451 */
452 /* it is possible the source dataset has additional (unwanted) columns.
453  Drop if specified; */
454 %if %length(&keepvars)>0 %then %do;
455  /* remove tech dates from keepvars as they are generated later */
456  %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_from ),%str( )));
457  %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_to ),%str( )));
458  %let keepvars=(keep=&keepvars &bus_from &bus_to &processed &md5_col);
459 %end;
460 
461 /* CAS varchar types cause append issues here, so perform autoconvert
462  by creating empty local table first */
463 data;
464  set &base_lib..&base_dsn &drop_tx_dates_noobs;
465 run;
466 %local emptybasetable; %let emptybasetable=&syslast;
467 
468 data work.bitemp0_append &keepvars &outds_del(drop=&md5_col )
469  %if "%substr(&sysver,1,1)" ne "4" and "%substr(&sysver,1,1)" ne "5" %then %do;
470  /nonote2err
471  %end;
472  ;
473  /* apply formats for bitemporal vars but not tx dates which are added later */
474  %if %length(&keepvars)>0 and &loadtype=BITEMPORAL %then %do;
475  format &bus_from &bus_from_fmt;
476  format &bus_to &bus_to_fmt;
477  %end;
478  set &emptybasetable /* base table reqd in case append has fewer cols */
479  work.append &drop_tx_dates;
480  %if %length(%str(&bus_from_override))>0 %then %do;
481  &bus_from= %unquote(&bus_from_override) ;
482  %end;
483  %if %length(%str(&bus_to_override))>0 %then %do;
484  &bus_to= %unquote(&bus_to_override) ;
485  %end;
486  length &md5_col $32;
487  &md5_col=put(md5(&stripcols),hex32.);
488  %if %length(&processed)>0 %then %do;
489  format &processed &processed_fmt;
490  &processed=&now;
491  %end;
492 
493 /**
494  * If a delete column exists then create the delete dataset
495  */
496 %if %mf_existvar(&append_lib..&append_dsn, &delete_col) %then %do;
497  drop &delete_col;
498  if upcase(&delete_col) = "YES" then output &outds_del ;
499  else output work.bitemp0_append ;
500  run;
501 
502  %if %mf_getattrn(&outds_del,NLOBS)>0 %then %do;
503  %bitemporal_closeouts(
504  tech_from=&tech_from
505  ,tech_to = &tech_to
506  ,base_lib=&base_lib
507  ,base_dsn=&base_dsn
508  ,append_lib=work
509  ,append_dsn=%scan(&outds_del,-1,.)
510  ,PK=&bus_from &pk
511  ,NOW=&dbnow
512  ,loadtarget=&loadtarget
513  ,loadtype=&loadtype
514  )
515  %end;
516 %end;
517 %else %do;
518  output work.bitemp0_append;
519  run;
520 %end;
521 
522 %mp_abort(iftrue= (&syscc gt 0 at line 494)
523  ,mac=&_program
524  ,msg=%str(syscc=&syscc)
525 )
526 
527 %if %length(&close_vars)>0 %then %do;
528  /**
529  * need to close out records that are not provided
530  */
531  proc sql;
532  create table bitemp1_closevars1 as
533  select distinct a.%mf_getquotedstr(in_str=&pk,dlm=%str(,a.),quote=)
534  from &base_lib..&base_dsn a
535  inner join work.bitemp0_append b
536  on 1=1
537  /* join on closevars key */
538  %do idx_pk=1 %to %sysfunc(countw(&close_vars));
539  %let idx_val=%scan(&close_vars,&idx_pk);
540  and a.&idx_val=b.&idx_val
541  %end;
542  /* filter base on tech dates if necessary */
543  %if &loadtype=TXTEMPORAL %then %do;
544  where a.&tech_from <=&now and &now < a.&tech_to
545  %end;
546  ;
547  create table bitemp1_closevars2 as
548  select distinct a.*
549  from bitemp1_closevars1 a
550  left join work.bitemp0_append b
551  on 1=1
552  /* join on primary key */
553  %do idx_pk=1 %to %sysfunc(countw(&pk));
554  %let idx_val=%scan(&pk,&idx_pk);
555  and a.&idx_val=b.&idx_val
556  %end;
557  /* identify removed records by null value in a field in PK but not close_vars
558  */
559  where b.%scan(
560  %mf_wordsInStr1ButNotStr2(Str1=&pk,Str2=&close_vars),1,%str( )
561  ) IS NULL
562  ;
563 
564  %if %mf_getattrn(bitemp1_closevars2,NLOBS)>0 %then %do;
565  %bitemporal_closeouts(
566  tech_from=&tech_from
567  ,tech_to = &tech_to
568  ,base_lib=&base_lib
569  ,base_dsn=&base_dsn
570  ,append_lib=work
571  ,append_dsn=bitemp1_closevars2
572  ,PK=&bus_from &pk
573  ,NOW=&dbnow
574  ,loadtarget=&loadtarget
575  ,loadtype=&loadtype
576  )
577  %end;
578 %end;
579 
580 /* return if nothing to load (was just deletes) */
581 %if %mf_getattrn(work.bitemp0_append,NLOBS)=0 %then %do;
582  %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
583  %put NOTE- No updates - just deletes!;
584  %put NOTE-;%put NOTE-;%put NOTE-;
585 %end;
586 
587 
588 /**
589  * If applying manual overrides to business dates, then the input table MUST
590  * be unique on the PK. Check, and if not - abort.
591  */
592 %local msg;
593 %if %length(&bus_from_override.&bus_to_override)>0 or &CHECK_UNIQUENESS=YES
594 %then %do;
595  proc sort data=work.bitemp0_append out=work.bitemp0_check nodupkey;
596  by &pk;
597  run;
598  %if %mf_getattrn(work.bitemp0_check,NLOBS)
599  ne %mf_getattrn(work.bitemp0_append,NLOBS)
600  %then %do;
601  %let msg=INPUT table &append_lib..&append_dsn is not unique on PK (&pk);
602  %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE (&msg),
603  ctl_ds=&dclib..mpe_lockanytable
604  )
605  %mp_lockanytable(UNLOCK
606  ,lib=%scan(&outds_audit,1,.)
607  ,ds=%scan(&outds_audit,2,.)
608  ,ref=&ETLSOURCE
609  ,ctl_ds=&dclib..mpe_lockanytable
610  )
611  %mp_abort(msg=&msg,mac=bitemporal_dataloader.sas);
612  %end;
613 %end;
614 
615 
616 /**
617 * extract from BASE table. Only want matching records, as could be very BIG.
618 * New records are subsequently identified via left join and test for nulls.
619 */
620 %local temp_table temp_table2 base_table baselib_schema;
621 %put DCNOTE: Extracting matching observations from &base_lib..&base_dsn;
622 
623 %if &engine_type=OLEDB %then %do;
624  %let temp_table=##BITEMP_&base_dsn;
625  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
626  %let base_table=(select * from [dbo].&base_dsn
627  where convert(datetime,&SQLNOW) < &tech_to );
628  %else %let base_table=[dbo].&base_dsn;
629  proc sql;
630  create table &base_lib.."&temp_table"n as
631  select * from work.bitemp0_append;
632  /* open up a connection for pass through SQL */
633  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
634  create table work.bitemp0_base as select * from connection to myAlias(
635 %end;
636 %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
637  /* grab schema */
638  %let baselib_schema=%mf_getschema(&base_lib);
639  %if &baselib_schema.X ne X %then %let baselib_schema=&baselib_schema..;
640 
641  /* grab redshift config */
642  %local redcnt; %let redcnt=0;
643  %if &engine_type=REDSHIFT %then %do;
644  data _null_;
645  set &config_table(where=(var_scope='DCBL_REDSH' and var_active=1));
646  x+1;
647  call symputx(cats('rednm',x),var_value,'l');
648  call symputx(cats('redval',x),var_value,'l');
649  call symputx('redcnt',x,'l');
650  run;
651  %end;
652  /* cannot persist temp tables so must create a temporary permanent table */
653  %let temp_table=%mf_getuniquename(prefix=XDCTEMP);
654  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
655  %let base_table=(select * from &baselib_schema.&base_dsn
656  where timestamp &sqlnow < &tech_to );
657  %else %let base_table=&baselib_schema.&base_dsn;
658  /* make empty table first - must clone & drop extra cols as autoload is bad */
659  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
660 
661  exec (create table &temp_table (like &baselib_schema.&base_dsn)) by myAlias;
662  %if &engine_type=REDSHIFT %then %do;
663  exec (alter table &temp_table alter sortkey none) by myAlias;
664  %end;
665  %local dropcols;
666  %let dropcols=%mf_wordsinstr1butnotstr2(
667  str1=%upcase(%mf_getvarlist(&basecopy))
668  ,str2=%upcase(&pk)
669  );
670  %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
671  %put &=dropcols;
672  %let idx_val=%scan(&dropcols,&idx_pk);
673  exec(alter table &temp_table drop column &idx_val;) by myAlias;
674  %end;
675  exec (alter table &temp_table add column &md5_col varchar(32);) by myAlias;
676  /* create view to strip formats and avoid warns in log */
677  data work.vw_bitemp0/view=work.vw_bitemp0;
678  set work.bitemp0_append(keep=&pk &md5_col);
679  format _all_;
680  run;
681  proc append base=&base_lib..&temp_table
682  %if &engine_type=REDSHIFT %then %do;
683  (
684  %do idx_pk=1 %to &redcnt;
685  &&rednm&idx_pk = &&redval&idxpk
686  %end;
687  )
688  %end;
689  data=work.vw_bitemp0 force nowarn;
690  run;
691  /* open up a connection for pass through SQL */
692  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
693  create table work.bitemp0_base as select * from connection to myAlias(
694 %end;
695 %else %if &engine_type=CAS %then %do;
696  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
697  %let base_table=&base_lib..&base_dsn
698  (where=(&tech_from <=&now and &now < &tech_to));
699  %else %let base_table=&base_lib..&base_dsn;
700  %let temp_table=CASUSER.%mf_getuniquename(prefix=DC);
701  data &temp_table;
702  set work.bitemp0_append;
703  run;
704  %let bitemp0base=CASUSER.%mf_getuniquename(prefix=DC);
705  proc fedsql sessref=dcsession;
706  create table &bitemp0base{options replace=true} as
707 %end;
708 %else %do;
709  %let temp_table=work.bitemp0_append;
710  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
711  %let base_table=&base_lib..&base_dsn
712  (where=(&tech_from <=&now and &now < &tech_to));
713  %else %let base_table=&base_lib..&base_dsn;
714  proc sql;
715  create table work.bitemp0_base as
716 %end;
717 
718  select a.&md5_col /* this identifies NEW records */
719  , b.*
720  /* assume first PK field cannot be null (if defined in a PK constraint then
721  it definitely cannot be null) */
722  , case when b.%scan(&pk,1) IS NULL then 1 else 0 end as ___TMP___NEW_FLG
723  from &baselib_schema.&temp_table a
724  left join &base_table b
725  on 1=1
726 %do idx_pk=1 %to &pk_cnt;
727  %let idx_val=%scan(&pk,&idx_pk);
728  and a.&idx_val=b.&idx_val
729 %end;
730 
731 
732 %if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
733 %then %do;
734  ); proc sql; drop table &base_lib.."&temp_table"n;
735 %end;
736 %else %if &engine_type=CAS %then %do;
737  ;
738  quit;
739  data work.bitemp0_base;
740  set &bitemp0base;
741  run;
742  proc sql;
743  drop table &temp_table;
744  drop table &bitemp0base;
745 %end;
746 %else %do;
747  ;
748 %end;
749 
750 /**
751 * matching & changed records are those without NULL key values
752 * &idx_val resolves to rightmost PK value (loop above)
753 */
754 %put syscc (line525)=&syscc, sqlrc=&sqlrc;
755 %mp_abort(iftrue= (&syscc gt 0 or &sqlrc>0)
756  ,mac=&_program
757  ,msg=%str(syscc=&syscc sqlrc=&sqlrc)
758 )
759 
760 %put hashcols2=&stripcols;
761 proc sql;
762 create table work.bitemp1_current(drop=___TMP___NEW_FLG) as
763  select *
764  , put(md5(&stripcols),$hex32.) as &md5_col
765  from work.bitemp0_base (drop=&md5_col)
766  where ___TMP___NEW_FLG=0;
767 
768 /**
769 * NEW records were identified in ___TMP___NEW_FLG in bitemp0_base
770 */
771 proc sql;
772 create table &outds_add
773  (drop=&md5_col
774  %if %mf_existvar(work.bitemp0_base, &delete_col) %then %do;
775  &delete_col
776  %end;
777  )
778  as select a.*
779  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
780  ,&now as &tech_from &tech_from_fmt
781  ,&high_date as &tech_to &tech_to_fmt
782  %end;
783  from work.bitemp0_append a /* STAGING records (mix of existing & new) */
784  , work.bitemp0_base b /* BASE records (contains null values for new) */
785  where a.&md5_col=b.&md5_col /* took staging md5 across in left join */
786  and b.___TMP___NEW_FLG=1; /* NEW records also identified in bitemp0_base */
787 
788 
789 /**
790 * identify INSERTS. These are records with the same business key but
791 * the bus_from and bus_to value are higher / lower (respectively)
792 * such that the existing record needs to be SPLIT to surround the new
793 * record.
794 * eg: OLD RECORD from=1 to=10
795 * NEW RECORD from=5 to=7
796 *
797 * APPENDED RECORDS:
798 * - from=1 to=5
799 * - from=5 to=7
800 * - from=7 to=10
801 */
802 
803 /* inserts cannot happen with TXTEMPORAL */
804 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
805  /* IDENTIFY */
806  create table work.bitemp3_inserts as
807  select b.*
808  ,a.&bus_from as ___TMP___from
809  ,a.&bus_to as ___TMP___to
810  from work.bitemp0_append a
811  ,work.bitemp1_current b
812  where a.&bus_from > b.&bus_from
813  and a.&bus_to < b.&bus_to
814  %do idx_pk=1 %to &pk_cnt;
815  %let idx_val=%scan(&pk,&idx_pk);
816  and a.&idx_val=b.&idx_val
817  %end;
818  order by
819  /* compress blanks and then insert commas (as the datetime fields may
820  not be in use) */
821  %sysfunc(tranwrd(%sysfunc(compbl(
822  &pk &bus_from &bus_to &processed
823  )),%str( ), %str(,)))
824  ;
825 
826  /* SPLIT */
827  data work.bitemp3a_inserts (drop=___TMP___from ___TMP___retain ___TMP___to) ;
828  set work.bitemp3_inserts;
829  by &pk &bus_from &bus_to &processed;
830  if first.&idx_val then do;
831  ___TMP___retain=&bus_to;
832  &bus_to=___TMP___from;
833  output;
834  &bus_to=___TMP___retain;
835  end;
836  if last.&idx_val then do;
837  &bus_from=___TMP___to;
838  output;
839  end;
840  run;
841 %end;
842 %else %do;
843  /* TX temporal load */
844  data work.bitemp3a_inserts;
845  set work.bitemp1_current;
846  stop;
847  run;
848 %end;
849 /* APPEND */
850 proc sql;
851 create view work.bitemp3a_view as
852  select * from work.bitemp1_current
853  where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
854 
855 data bitemp3b_newbase;
856  set work.bitemp3a_inserts work.bitemp3a_view;
857 run;
858 
859 /** do not use! this converts short numerics into 8 bytes
860 proc sql;
861 create table work.bitemp3b_newbase as
862  select * from work.bitemp3a_inserts
863 union corr
864  select * from work.bitemp1_current
865  where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
866 */
867 
868 /**
869 * identify CHANGED records from staging.
870 * Same business key with different temporal dates or md5 value
871 * This table must be overlayed onto / into existing business history
872 */
873 proc sql;
874 create table work.bitemp4_updated as select distinct a.*
875  from work.bitemp0_append a
876  ,work.bitemp3b_newbase b
877  where 1=1
878  %do idx_pk=1 %to &pk_cnt;
879  %let idx_val=%scan(&pk,&idx_pk);
880  and a.&idx_val=b.&idx_val
881  %end;
882  and ( a.&md5_col ne b.&md5_col
883  %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
884  OR (a.&bus_from ne b.&bus_from or a.&bus_to ne b.&bus_to)
885  %end;
886  )
887 ;
888 
889 /**
890  * This section would have been one simple step with union all
891  * but that converts short numerics into 8 bytes!
892  * so, convoluted alternative to retain the same functionality.
893  */
894 
895 /* base records */
896 create view work.bitemp4_prep1 as
897  select 'BASE' as ___TMP___
898  ,b.*
899  from work.bitemp4_updated a
900  ,work.bitemp3b_newbase b
901  where 1
902  %do idx_pk=1 %to &pk_cnt;
903  %let idx_val=%scan(&pk,&idx_pk);
904  and a.&idx_val=b.&idx_val
905  %end;
906  ;
907 /* updated records */
908 create view work.bitemp4_prep2 as
909  select 'STAG' as ___TMP___ ,*
910  from work.bitemp4_updated;
911 /* ensure we only keep columns that appear in both */
912 %local bp1 bp2 bp3 bp4;
913 %let bp1=%mf_getvarlist(bitemp4_prep1);
914 %let bp2=%mf_getvarlist(bitemp4_prep2);
915 %let bp3=%mf_wordsInStr1ButNotStr2(Str1=&bp1,Str2=&bp2);
916 %let bp4=%mf_wordsInStr1ButNotStr2(Str1=&bp2,Str2=&bp1);
917 data work.bitemp4_prep3/view=bitemp4_prep3;
918  set bitemp4_prep1 bitemp4_prep2;
919 %if %length(XX&bp3&bp4)>2 %then %do;
920  drop &bp3 &bp4 ;
921 %end;
922 run;
923 /* remove duplicates */
924 proc sql;
925 create table work.bitemp4a_allrecs as
926  select distinct *
927  from work.bitemp4_prep3
928  order by
929  /* compress blanks and then insert commas (as the datetime fields
930  may not be in use) */
931  %sysfunc(tranwrd(%sysfunc(compbl(
932  &pk &bus_from &bus_to &processed
933  )),%str( ), %str(,)))
934  ;
935 
936 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
937  /* this section aligns the business dates
938  (eg for inserts or overlaps in the range) */
939  data work.bitemp4b_firstpass (drop=___TMP___cond ___TMP___from ___TMP___to );
940  set work.bitemp4a_allrecs;
941  by &pk &bus_from &bus_to &processed;
942  retain ___TMP___cond 'Name of Condition';
943  retain ___TMP___from ___TMP___to 0;
944  ___TMP___md5lag=lag(&md5_col);
945  /* reset retained variables */
946  if first.&idx_val then do;
947  call missing (___TMP___cond, ___TMP___from, ___TMP___to,___TMP___md5lag);
948  end;
949  else do;
950  /* if record is identical, carry forward bus_from (and bus_to if higher)*/
951  if &md5_col=___TMP___md5lag then do;
952  &bus_from=___TMP___from;
953  if &bus_to<___TMP___to then &bus_to=___TMP___to;
954  end;
955  end;
956 
957  if ___TMP___='STAG' then do;
958  /* need to carry forward the closing record */
959  ___TMP___cond='Condition 1';
960  end;
961  else if ___TMP___cond='Condition 1' then do;
962  /* else ensure bus_from starts from prior record bus_to */
963  if &md5_col ne ___TMP___md5lag and &bus_from <= ___TMP___to
964  then &bus_from= ___TMP___to;
965  /* new record may replace old record entirely */
966  if &bus_to <= &bus_from then delete;
967  else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
968  end;
969  ___TMP___from=&bus_from;
970  ___TMP___to=&bus_to;
971  run;
972 %end;
973 %else %do;
974  /* keep staged records only */
975  data work.bitemp4b_firstpass;
976  set work.bitemp4a_allrecs;
977  if ___TMP___='STAG';
978  run;
979 %end;
980 
981 /* next phase is to pass through in reverse - so set up the sort statement */
982 %local byvar;
983 %do idx_pk=1 %to &pk_cnt;
984  %let byvar=&byvar descending %scan(&pk,&idx_pk);
985 %end;
986 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL
987 %then %let byvar=&byvar descending &bus_from descending &bus_to;
988 /* if matching bus dates supplied, need to ensure we also have a sort
989  between BASE and STAGING tables */
990 %let byvar=&byvar descending ___TMP___;
991 
992 proc sort data=work.bitemp4b_firstpass out=work.bitemp4c_sort ;
993  by &byvar;
994 run;
995 
996 /**
997 * Now (in reverse) pass back business start dates
998 */
999 data work.bitemp4d_secondpass;
1000 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1001  &tech_from=&now;
1002  &tech_to=&high_date;
1003 %end;
1004  set work.bitemp4c_sort ;
1005  by &byvar;
1006  retain ___TMP___cond 'Name of Condition';
1007  retain ___TMP___from ___TMP___to 0;
1008 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
1009 /* put / _all_ /;*/
1010  ___TMP___md5lag=lag(&md5_col);
1011  if first.&idx_val then do;
1012  /* reset retained variables */
1013  call missing (___TMP___cond,___TMP___from,___TMP___to,___TMP___md5lag);
1014  end;
1015  else do;
1016  /* if record is identical, carry back bus_to */
1017  if &md5_col=___TMP___md5lag then &bus_to=___TMP___to;
1018  end;
1019 
1020  if ___TMP___='STAG' then do;
1021  /* need to carry forward the closing record */
1022  ___TMP___cond='Condition 2';
1023  end;
1024  else if ___TMP___cond='Condition 2' then do;
1025  /* else ensure bus_to stops at subsequent record bus_from */
1026  if &md5_col ne ___TMP___md5lag and &bus_to >= ___TMP___from
1027  then &bus_to= ___TMP___from;
1028  /* new record may replace old record entirely */
1029  if &bus_from >= &bus_to then delete;
1030  if &bus_from=___TMP___from and &bus_to=___TMP___to then delete;
1031  else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
1032  end;
1033  ___TMP___from=&bus_from;
1034  ___TMP___to=&bus_to;
1035 
1036 %end;
1037 run;
1038 %put syscc (line600)=&syscc;
1039 /**
1040  There may still be some records (eg old business history) which have not
1041  changed.
1042  Need to identify these and remove from the append so they are not updated
1043  unnecessarily. This is done by generating a new md5 (which INCLUDES the
1044  business key) and any matching / identical records are split out (from those
1045  that need to be updated).
1046 */
1047 
1048 %if &loadtype=BITEMPORAL %then %do;
1049  %let cat_string=catx('|' ,&bus_from,&bus_to);
1050 
1051  data bitemp5a_lkp (keep=&md5_col);
1052  set bitemp0_base;
1053  /* for BITEMPORAL we need to compare business dates also */
1054  &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.);
1055  run;
1056 
1057  data bitemp5b_updates;
1058  set bitemp4d_secondpass;
1059  if _n_=1 then do;
1060  dcl hash md5_lkp(dataset:'bitemp5a_lkp');
1061  md5_lkp.definekey("&md5_col");
1062  md5_lkp.definedone();
1063  end;
1064  /* drop old md5 col as will rebuild with new business dates */
1065  &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.) ;
1066  if md5_lkp.check()=0 then delete;
1067  run;
1068 
1069  proc sql;
1070  /* get min bus from as will update (close out) all records from this point
1071  (for that PK)*/
1072  create table work.bitemp5d_subquery as
1073  select &pk_comma, min(&bus_from)as &bus_from, max(&bus_to) as &bus_to
1074  from work.bitemp5b_updates
1075  group by &pk_comma;
1076  /* index has a huge efficiency impact on upcoming nested subquery */
1077  create index index1 on work.bitemp5d_subquery(&pk_comma,&bus_from, &bus_to);
1078 
1079  %let lastds=work.bitemp5b_updates;
1080 %end;
1081 %else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
1082  proc sql;
1083  create table work.bitemp5d_subquery as
1084  select distinct &pk_comma
1085  from bitemp4d_secondpass;
1086  %let lastds=work.bitemp4d_secondpass;
1087 %end;
1088 %else %let lastds=work.bitemp4d_secondpass;
1089 
1090 /* create single append table (an overlapped pre-sert may be classed as
1091  both an update AND a new record). Also create temp views that may be
1092  used for pre-load analysis. */
1093 data &outds_mod;
1094  set &lastds(drop=___TMP___: &md5_col);
1095 run;
1096 
1097 data bitemp6_allrecs / view=bitemp6_allrecs;
1098  set &outds_mod /* UPDATED records */
1099  &outds_add /* NEW records */;
1100 run;
1101 
1102 proc sort data=work.bitemp6_allrecs
1103  out=work.bitemp6_unique
1104  noduprec
1105  dupout=work.xx_BADBADBAD;
1106 by _all_;
1107 run;
1108 
1109 /* we have all our temp tables now so exit if this is all that is needed */
1110 %if &LOADTARGET ne YES %then %return;
1111 
1112 /* also exit if an err condition exists */
1113 
1114 %if &syscc>0 %then %do;
1115  %put syscc=&syscc;
1116  %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1117  ctl_ds=&dclib..mpe_lockanytable
1118  )
1119  %if "&outds_audit" ne "0" %then %do;
1120  %mp_lockanytable(UNLOCK
1121  ,lib=%scan(&outds_audit,1,.)
1122  ,ds=%scan(&outds_audit,2,.)
1123  ,ref=&ETLSOURCE
1124  ,ctl_ds=&dclib..mpe_lockanytable
1125  )
1126  %end;
1127 %end;
1128 %mp_abort(iftrue= (&syscc>0)
1129  ,mac=&sysmacroname in &_program
1130  ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status)
1131 )
1132 
1133 /* final check - abort if a lock has appeared on the target or audit table */
1134 %mp_lockfilecheck(libds=&base_lib..&base_dsn)
1135 %if %mf_existds(&outds_audit) %then %do;
1136  %mp_lockfilecheck(libds=&outds_audit)
1137 %end;
1138 
1139 /**
1140 * STAGING TABLES PREPARED, ERR CONDITION TESTED FOR.. NOW TO LOAD!!
1141 */
1142 
1143 /**
1144 * First, CLOSE OUT changed records (if not a REPLACE)
1145 * Note that SAS does not support ANSI standard for UPDATE with a join condition.
1146 * However - this can be worked around using a nested subquery..
1147 */
1148 data _null_;
1149  putlog "&sysmacroname: CLOSEOUTS commencing";
1150 run;
1151 
1152 %if %mf_getattrn(&lastds,NLOBS)=0 %then %do;
1153  data _null_;
1154  putlog "&sysmacroname: No closeouts needed";
1155  run;
1156 %end;
1157 %else %if &engine_type=CAS %then %do;
1158  %mp_abort(iftrue= (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL)
1159  ,mac=&sysmacroname in &_program
1160  ,msg=%str(&loadtype not yet supported in CAS engine)
1161  )
1162  /* create temp table for deletions */
1163  %local delds;%let delds=%mf_getuniquename(prefix=DC);
1164  data casuser.&delds;
1165  set work.bitemp5d_subquery;
1166  run;
1167  /* delete the records */
1168  proc cas ;
1169  table.deleteRows / table={
1170  caslib="&base_lib",
1171  name="&base_dsn",
1172  where="1=1",
1173  whereTable={caslib='CASUSER',name="&delds"}
1174  };
1175  quit;
1176  /* drop temp table */
1177  proc sql;
1178  drop table CASUSER.&delds;
1179 %end;
1180 %else %if (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL or &loadtype=UPDATE)
1181 %then %do;
1182  data _null_;
1183  putlog "&sysmacroname: &loadtype operation using &engine_type engine";
1184  run;
1185  %local flexinow;
1186  proc sql;
1187  /* if OLEDB then create a temp table for efficiency */
1188  %local innertable;
1189  %if &engine_type=OLEDB %then %do;
1190  %let innertable=[##BITEMP_&base_dsn];
1191  %let top_table=[dbo].&base_dsn;
1192  %let flexinow=&SQLNOW;
1193  create table &base_lib.."##BITEMP_&base_dsn"n as
1194  select * from work.bitemp5d_subquery;
1195  /* open up a connection for pass through SQL */
1196  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1197  execute(
1198  %end;
1199  %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
1200  %let innertable=%mf_getuniquename(prefix=XDCTEMP);
1201  %let top_table=&baselib_schema.&base_dsn;
1202  %let flexinow=timestamp &SQLNOW;
1203  /* make empty table first - must clone & drop extra cols
1204  as autoload is bad */
1205  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1206  exec (create table &innertable (like &baselib_schema.&base_dsn)) by myAlias;
1207  %if &engine_type=REDSHIFT %then %do;
1208  exec (alter table &innertable alter sortkey none) by myAlias;
1209  %end;
1210  %let dropcols=%mf_wordsinstr1butnotstr2(
1211  str1=%upcase(%mf_getvarlist(&basecopy))
1212  ,str2=%upcase(%mf_getvarlist(work.bitemp5d_subquery))
1213  );
1214  %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
1215  %put &=dropcols;
1216  %let idx_val=%scan(&dropcols,&idx_pk);
1217  exec(alter table &innertable drop column &idx_val;) by myAlias;;
1218  %end;
1219  /* create view to strip formats and avoid warns in log */
1220  data work.vw_bitemp5d/view=work.vw_bitemp5d;
1221  set work.bitemp5d_subquery;
1222  format _all_;
1223  run;
1224  proc append base=&base_lib..&innertable (
1225  %do idx_pk=1 %to &redcnt;
1226  &&rednm&idx_pk = &&redval&idxpk
1227  %end;
1228  )
1229  data=work.vw_bitemp5d force nowarn;
1230  run;
1231  /* open up a connection for pass through SQL */
1232  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1233  execute(
1234  %end;
1235  %else %do;
1236  %let innertable=bitemp5d_subquery;
1237  %let top_table=&base_lib..&base_dsn;
1238  %let flexinow=&now;
1239  %end;
1240 
1241 
1242  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1243  update &top_table set &tech_to=&flexinow
1244  %if %length(&processed)>0 %then %do;
1245  ,&processed=&flexinow
1246  %end;
1247  where &tech_from <= &flexinow and &flexinow < &tech_to and
1248  %end;
1249  %else %if &loadtype=UPDATE %then %do;
1250  /* changed records are deleted then re-appended when doing UPDATEs */
1251  delete from &top_table where
1252  %end;
1253  %else %do;
1254  %put %str(ERR)OR: BUSTEMPORAL NOT YET SUPPORTED;
1255  %let syscc=5;
1256  %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1257  ctl_ds=&dclib..mpe_lockanytable
1258  )
1259  %mp_lockanytable(UNLOCK
1260  ,lib=%scan(&outds_audit,1,.)
1261  ,ds=%scan(&outds_audit,2,.)
1262  ,ref=&ETLSOURCE
1263  ,ctl_ds=&dclib..mpe_lockanytable
1264  )
1265  %goto end_of_macro;
1266  %end;
1267 
1268  /* perform join inside query as per
1269  http://stackoverflow.com/questions/24629793/update-with-a-proc-sql */
1270 
1271  exists( select 1 from &baselib_schema.&innertable where
1272 
1273  /* loop PK join */
1274  %do idx_pk=1 %to &pk_cnt;
1275  %let idx_val=%scan(&pk,&idx_pk);
1276  &base_dsn..&idx_val=&innertable..&idx_val and
1277  %end;
1278  %if &loadtype=BITEMPORAL %then %do;
1279  &base_dsn..&bus_from >= &innertable..&bus_from
1280  and &base_dsn..&bus_to <= &innertable..&bus_to and
1281  %end;
1282 
1283  /* close the statement */
1284 
1285  1=1);
1286 
1287  %if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
1288  %then %do;
1289  ) by myAlias;
1290  execute (drop table &baselib_schema.&innertable) by myAlias;
1291  %end;
1292 %end;
1293 quit;
1294 data _null_;
1295  putlog "&sysmacroname: Closeout complete";
1296 run;
1297 /**
1298  * Append the new / updated records
1299  */
1300 %if &engine_type=CAS %then %do;
1301 
1302  /* get varchar variables ready for casting */
1303  %local vcfmt vcrename vcassign vcdrop;
1304  data _null_;
1305  set work.bitemp_cols(where=(type=6)) end=last;
1306  length vcrename vcassign vcdrop vcfmt $32767 rancol $32;
1307  retain vcrename vcassign vcdrop vcfmt;
1308  if _n_=1 then vcrename='(rename=(';
1309  rancol=resolve('%mf_getuniquename()');
1310  vcfmt=trim(vcfmt)!!'length '!!cats(name)!!' varchar(*);';
1311  vcrename=trim(vcrename)!!' '!!cats(name,'=',rancol);
1312  vcassign=cats(vcassign,name,'=',rancol,';');
1313  vcdrop=cats(vcdrop,'drop '!!rancol,';');
1314  if last then do;
1315  vcrename=cats(vcrename,'))');
1316  call symputx('vcfmt',vcfmt);
1317  call symputx('vcrename',vcrename);
1318  call symputx('vcassign',vcassign);
1319  call symputx('vcdrop',vcdrop);
1320  end;
1321  run;
1322 
1323  /* prepare a temp cas table with varchars casted */
1324  %let tmp=%mf_getuniquename();
1325  data casuser.&tmp ;
1326  &vcfmt
1327  set work.bitemp6_unique &vcrename;
1328  &vcassign
1329  &vcdrop
1330  run;
1331 
1332  /* load the table with varchars applied*/
1333  data &base_lib..&base_dsn (append=yes )/sessref=dcsession ;
1334  set casuser.&tmp;
1335  run;
1336 
1337  /* drop temp table */
1338  proc sql;
1339  drop table CASUSER.&tmp;
1340 
1341  /* this code will not work as regular tables do not have varchars */
1342  /*
1343  proc casutil;
1344  load data=work.bitemp6_unique
1345  outcaslib="&base_lib" casout="&base_dsn" append ;
1346  quit;
1347  */
1348 %end;
1349 %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
1350  proc append base=&base_lib..&base_dsn
1351  %if &engine_type=REDSHIFT %then %do;
1352  (
1353  %do idx_pk=1 %to &redcnt;
1354  &&rednm&idx_pk = &&redval&idxpk
1355  %end;
1356  )
1357  %end;
1358  data=bitemp6_unique force nowarn;
1359  run;
1360 %end;
1361 %else %do;
1362  proc append base=&base_lib..&base_dsn data=bitemp6_unique force nowarn; run;
1363 %end;
1364 
1365 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1366  ctl_ds=&dclib..mpe_lockanytable
1367 )
1368 
1369 /* final check on syscc */
1370 %mp_abort(iftrue= (&syscc >4)
1371  ,mac=&_program
1372  ,msg=%str(!!Upload NOT successful!! Failed on actual update / append stage..)
1373 )
1374 
1375 %if &outds_audit ne 0 and &LOADTARGET=YES %then %do;
1376  data work.vw_outds_orig /view=work.vw_outds_orig;
1377  set work.bitemp0_base (drop=&md5_col);
1378  where ___TMP___NEW_FLG=0;
1379  drop ___TMP___NEW_FLG;
1380  run;
1381  /* update the AUDIT table */
1382  %if %mf_existds(&outds_audit) %then %do;
1383  options mprint;
1384  %mp_storediffs(&base_lib..&base_dsn
1385  ,work.vw_outds_orig
1386  ,&pk &bus_from
1387  ,delds=&outds_del
1388  ,modds=&outds_mod
1389  ,appds=&outds_add
1390  ,outds=work.mp_storediffs
1391  ,processed_dttm=&now
1392  ,loadref=%superq(etlsource)
1393  )
1394  /* exclude unchanged values in modified rows */
1395  data work.mp_storediffs;
1396  set work.mp_storediffs;
1397  if MOVE_TYPE="M" and IS_PK=0 and IS_DIFF=0 then delete;
1398  * putlog load_ref= libref= dsn= key_hash= tgtvar_nm=;
1399  run;
1400  proc append base=&outds_audit data=work.mp_storediffs;
1401  run;
1402  %mp_lockanytable(UNLOCK
1403  ,lib=%scan(&outds_audit,1,.)
1404  ,ds=%scan(&outds_audit,2,.)
1405  ,ref=&ETLSOURCE
1406  ,ctl_ds=&dclib..mpe_lockanytable
1407  )
1408  %end;
1409 %end;
1410 %mp_abort(iftrue= (&syscc >4)
1411  ,mac=bitemporal_dataloader
1412  ,msg=%str(Problem in audit stage (&outds_audit))
1413 )
1414 
1415 %let user=%mf_getUser();
1416 /**
1417  Notify as appropriate EMAILS DISABLED
1418 
1419 %sumo_alerts(ALERT_EVENT=UPDATE
1420  , ALERT_TARGET=&base_lib..&base_dsn
1421  , from_user= &user);
1422 */
1423 /* monitor BiTemporal usage */
1424 %if &log=1 %then %do;
1425  %put syscc=&syscc;
1426  /* do not perform duration calc in pass through */
1427  %local dur;
1428  data _null_;
1429  now=symget('now');
1430  dur=%sysfunc(datetime())-&now;
1431  call symputx('dur',dur,'l');
1432  run;
1433  proc sql;
1434  insert into &dclib..mpe_dataloads
1435  set libref=%upcase("&base_lib")
1436  ,DSN=%upcase("&base_dsn")
1437  ,ETLSOURCE="&ETLSOURCE"
1438  ,LOADTYPE="&loadtype"
1439  ,CHANGED_RECORDS=%mf_getattrn(&lastds,NLOBS)
1440  ,NEW_RECORDS=%mf_getattrn(&outds_add,NLOBS)
1441  ,DELETED_RECORDS=%mf_getattrn(&outds_del,NLOBS)
1442  ,DURATION=&dur
1443  ,MAC_VER="v&ver"
1444  ,user_nm="&user"
1445  ,PROCESSED_DTTM=&now;
1446  quit;
1447  %put syscc=&syscc;
1448 %end;
1449 %end_of_macro:
1450 %mend bitemporal_dataloader;