bitemporal_dataloader.sas
Go to the documentation of this file.
1 /**
2  @file
3  @brief Routine supporting multiple load types
4  @details Generic loader for multiple load types (UPDATE, SCD2, BITEMPORAL).
5 
6  Handles all elements including metadata validation, PK checking, closeouts,
7  locking, logging, etc.
8 
9  The staging table must be prepared with a unique business key. For bitemporal
10  this means a snapshot at both technical AND business time.
11 
12 ASSUMPTIONS:
13  - Base table has relevant datetime vars: 2xTechnical, 2xBusiness, 1xProcessed
14  - Staging table omits Technical or Processed datetimes (has Business only)
15  - Base table has no column names containing the string "___TMP___"
16  - Base &tech_from variable is not nullable. This should always be the case
17  anyway whenbuilding permanent bitemporal datasets.. But the point is that
18  this field is used to identify new records after the initial left join
19  from staging to base table.
20 
21 NOTES:
22  - All queries against BiTemporal tables need two filter conditions, as such:
23 
24  where &bus_from LE [tstamp] LT &bus_to
25  AND &tx_from LE [tstamp] LT &tx_to
26 
27  One cannot use BETWEEN
28  One cannot use &xx_from LE [tstamp] LE &xx_from (equivalent to above).
29  Background:
30 http://stackoverflow.com/questions/20005950/best-practice-for-scd-date-pairs-closing-opening-timestamps
31 
32 Areas for optimisation
33  - loading temporal history (currently experimental)
34 
35  ## Supporting tables
36 
37  Supporting tables must exist in the library specified in the `dclib` param.
38 
39  ### MPE_DATALOADS
40 
41  This table is updated every time a successful load occurs, and includes
42  information such as:
43 
44  @li library
45  @li dataset
46  @li message (supplied in the ETLSOURCE param)
47  @li new rows
48  @li deleted rows
49  @li changed rows
50  @li timestamp
51  @li the user making the load
52  @li the version of (this) macro used to make the load
53 
54 
55  @param [in] APPEND_DSN= (APPENDTABLE) Name of STAGING table
56  @param [in] CONFIG_TABLE= (&dclib..MPE_CONFIG) The table containing library
57  engine specific config. The following scopes are supported:
58  @li DCBL_REDSH
59  @param [in] LOADTYPE= (BITEMPORAL) Supported types:
60  @li TXTEMPORAL - loads a buskey with version times
61  @li BUSTEMPORAL - loads buskey with bus + ver times
62  @li UPDATE - updates a buskey with NO history
63  @param [in] PROCESSED= (0) This column obtains a current timestamp for changed
64  records when loading the target table. Default is 0 (not set). If the
65  target table contains a variable called PROCESSED_DTTM, and processed=0,
66  then this column will be used for applying the current timestamp.
67  @param RK_MAXKEYTABLE= (mpe_maxkeyvalues) The maxkeytable to use (must exist
68  in DCLIB)
69  @param [in] PK= Business key, space separated. Should NOT include temporal
70  fields.
71  @param [in] RK_UNDERLYING= If supplied will generate an RK based on these
72  (space separated) business key fields. In this case only ONE PK field should
73  be supplied, which is assumed to be the RK. The RK field, plus underlying
74  fields, should all exist on the base table. The underlying fields should
75  exist on the staging table (the RK / PK field will be overwritten).
76  The staging table should also be unique on its PK.
77 
78  @param [in] dclib= (&dc_libref) The library containing DC configuration tables
79  @param [out] outds_del= (work.outds_del) Output table containing
80  deleted records
81  @param [out] outds_add= (work.outds_add) Output table containing
82  appended records
83  @param [out] outds_mod= (work.outds_mod) Output table containing
84  changed records
85  @param [out] outds_audit= (0) Load detailed changes to an audit table. Uses
86  the mp_storediffs.sas macro. Provide the base table here, to load.
87 
88  <h4> Global Variables </h4>
89  The following global macro variables are used. These should be replaced by
90  macro parameters in future releases.
91 
92  @li `dc_dttmtfmt`
93 
94  <h4> SAS Macros </h4>
95  @li bitemporal_closeouts.sas
96  @li dc_assignlib.sas
97  @li mf_existds.sas
98  @li mf_existvar.sas
99  @li mf_fmtdttm.sas
100  @li mf_getattrn.sas
101  @li mf_getengine.sas
102  @li mf_getschema.sas
103  @li mf_getuniquefileref.sas
104  @li mf_getuniquename.sas
105  @li mf_getuser.sas
106  @li mf_getvarlist.sas
107  @li mf_verifymacvars.sas
108  @li mf_wordsinstr1butnotstr2.sas
109  @li mp_abort.sas
110  @li mp_dropmembers.sas
111  @li mp_lockanytable.sas
112  @li mp_lockfilecheck.sas
113  @li mp_retainedkey.sas
114  @li mp_storediffs.sas
115 
116  @version 9.3
117  @author 4GL Apps Ltd.
118  @copyright 4GL Apps Ltd. This code may only be used within Data Controller
119  and may not be re-distributed or re-sold without the express permission of
120  4GL Apps Ltd.
121 
122  @warning multitemporal loads (bitemporal for multiple points in business time)
123  are in experimental stage
124 
125 **/
126 
127 %macro bitemporal_dataloader(
128  bus_from= /* Business FROM datetime variable. Req'd on
129  STAGING & BASE tables.*/
130  ,bus_to = /* Business TO datetime variable. Req'd on
131  STAGING & BASE tables. */
132  ,bus_from_override= /* Provide a hard coded BUS_FROM datetime value.*/
133  ,bus_to_override= /* provide a hard coded BUS_TO datetime value */
134  ,tech_from= /* Technical FROM datetime variable. Req'd on
135  BASE table only. */
136  ,tech_to = /* Technical TO datetime variable. Req'd on BASE
137  table only. */
138  ,processed= 0
139  ,base_lib=WORK /* Libref of the BASE table. */
140  ,base_dsn=BASETABLE /* Name of BASE table. */
141  ,append_lib=WORK /* Libref of the STAGING table. */
142  ,append_dsn=APPENDTABLE
143  ,high_date='01JAN5999:00:00:00'dt /* High date to close out records */
144  ,PK= name sex
145  ,RK_UNDERLYING=
146  ,KEEPVARS= /* Provides option for removing unwanted vars from append table */
147  ,RK_UPDATE_MAXKEYTABLE=NO /* If switching (or mix matching) with regular
148  SCD2 loader then set this switch to YES to
149  ensure the MAXKEYTABLE is updated with the
150  current maximum RK value for the target table
151  */
152  ,CHECK_UNIQUENESS=YES /* Perform a check of the APPEND table to ensure it is
153  unique on its business key */
154  ,ETLSOURCE=demo /* supply a value ($50.) to show as ETLSOURCE in
155  &dclib..DATALOADS */
156  ,LOADTYPE=BITEMPORAL
157  ,RK_MAXKEYTABLE= mpe_maxkeyvalues
158  ,LOG=1 /* Switch to 0 to prevent records being added to
159  &mpelib..mpe_DATALOADS (ie when testing)*/
160  ,DELETE_COL= _____DELETE__THIS__RECORD_____
161  /* If this variable is found in the append dataset
162  then records are closed out (or deleted) in the
163  append table where that variable= "Yes" */
164  ,LOADTARGET=YES /* set to anything but uppercase YES to switch off
165  target table load and generate temp tables only */
166  ,CLOSE_VARS=
167 /*a problem with regular SCD2 or TXTEMPORAL loads is that there is
168  no facility to close out removed records (all records are
169  assumed new or changed). But how does one determine which
170  records are removed? Short of loading the entire table
171  each time? This parameter allows a set of variables
172  (this should be a subset of the PK) to be declared, and
173  the macro will determine which records in the base table
174  need to be closed out ahead of the load.
175 
176  For instance, given the following:
177 
178  Base Table Staging Table
179  DATE ENTITY AMOUNT DATE ENTITY AMOUNT
180  JAN ACME4 66 JAN ACME4 66
181  FEB ACME4 99 FEB ACME4 99
182  FEB ACME1 22
183 
184  By supplying DATE in CLOSE_VARS and DATE ENTITY as the PK,
185  the "FEB PAG 22" record would get closed out.
186  */
187  ,config_table=&dclib..MPE_CONFIG
188  ,dclib=&dc_libref
189  ,outds_del=work.outds_del
190  ,outds_add=work.outds_add
191  ,outds_mod=work.outds_mod
192  ,outds_audit=0
193  );
194 
195 /* when changing this macro, update the version num here */
196 %local ver;
197 %let ver=32;
198 %put &sysmacroname entry vars:;
199 %put _local_;
200 
201 %dc_assignlib(WRITE,&base_lib) /* may not already be assigned */
202 
203 /* return straight away if nothing to load */
204 %let nobs= %mf_getattrn(&append_lib..&append_dsn,NLOBS);
205 %if &nobs=-1 %then %do;
206  proc sql noprint; select count(*) into: nobs from &append_lib..&append_dsn;
207 %end;
208 %if &nobs=0 %then %do;
209  %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
210  %put NOTE- Base dataset &append_lib..&append_dsn is empty. Nothing to upload!;
211  %put NOTE-;%put NOTE-;%put NOTE-;
212  %return;
213 %end;
214 
215 /* hard exit if err condition exists */
216 %mp_abort(iftrue= (&syscc > 0)
217  ,mac=bitemporal_dataloader
218  ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status;)
219 )
220 
221 %local engine_type;
222 %let engine_type=%mf_getengine(&base_lib);
223 %if (&engine_type=REDSHIFT or &engine_type=POSTGRES) and %length(&CLOSE_VARS)>0
224 %then %do;
225  %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
226  %put NOTE- CLOSE_VARS functionality not yet supported in &engine_type;
227  %put NOTE-;%put NOTE-;%put NOTE-;
228  %return;
229 %end;
230 
231 /**
232  * The metadata functions (eg mf_existvar) will fail if the base table has a
233  * SAS lock. So, make a snapshot of the base table for further use.
234  * Also, make output tables (regardless).
235  */
236 %local basecopy;
237 %let basecopy=%mf_getuniquename(prefix=basecopy);
238 
239 data &basecopy &outds_mod &outds_add &outds_del;
240  set &base_lib..&base_dsn;
241  stop;
242 run;
243 %mp_abort(iftrue= (&syscc > 0)
244  ,mac=&_program
245  ,msg=%str(syscc=&syscc after base table copy - aborting due to table lock)
246 )
247 
248 
249 %local cols idx_pk md5_col ;
250 %let md5_col=___TMP___md5;
251 %let check_uniqueness=%upcase(&check_uniqueness);
252 %let RK_UPDATE_MAXKEYTABLE=%upcase(&RK_UPDATE_MAXKEYTABLE);
253 %let high_date=%unquote(&high_date);
254 %let loadtype=%upcase(&loadtype);
255 
256 /* ensure irrelevant variables are cleared */
257 %if &loadtype=BUSTEMPORAL %then %do;
258  %let tech_from=;
259  %let tech_to=;
260 %end;
261 %else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
262  %let bus_from=;
263  %let bus_to=;
264 %end;
265 
266 /* ensure relevant variables are supplied */
267 %mp_abort(iftrue=(&loadtype=BITEMPORAL & %mf_verifymacvars(bus_from bus_to)=0)
268  ,mac=bitemporal_dataloader
269  ,msg=%str(Missing BUS_FROM / BUS_TO)
270 )
271 %mp_abort(iftrue=(&loadtype=TXTEMPORAL & %mf_verifymacvars(tech_from tech_to)=0)
272  ,mac=bitemporal_dataloader
273  ,msg=%str(Missing TECH_FROM / TECH_TO)
274 )
275 
276 /**
277  * drop any tables (may be defined as views or vice versa preventing overwrite)
278  */
279 %mp_dropmembers(append bitemp0_append bitemp_cols)
280 
281 /* SQL Server requires its own time values */
282 /* 9.2 will only give picture format down to seconds. 9.3 allows
283  milliseconds by using lower S and defining the decimal in the format name..*/
284 PROC FORMAT;
285  picture MyMSdt other='%0Y-%0m-%0dT%0H:%0M:%0S' (datatype=datetime);
286 RUN;
287 %local dbnow;
288 %let dbnow="%sysfunc(datetime(),%mf_fmtdttm())"dt;
289 
290 data _null_;
291  /* convert space separated macvar to comma separated for SQL processing */
292  call symputx('PK_COMMA',tranwrd(compbl("&pk"),' ',','),'L');
293  call symputx('PK_CNT',countw("&pk",' '),'L');
294  now=&dbnow;
295  call symputx('NOW',now,'L');
296  call symputx('SQLNOW',cats("'",put(now,MyMSdt.),"'"),'L');
297  length etlsource $100;
298  etlsource=subpad(symget('etlsource'),1,100);
299  call symputx('etlsource',etlsource,'l');
300 run;
301 
302 /**
303  * Even if no PROCESSED var provided, assume that any variable named
304  * PROCESSED_DTTM should be updated
305  */
306 %if &processed=0 %then %do;
307  %if %mf_existvar(&basecopy,PROCESSED_DTTM)
308  %then %let processed=PROCESSED_DTTM;
309  %else %let processed=;
310 %end;
311 
312 
313 /* extract colnames for md5 creation / change tracking */
314 proc contents noprint data=&base_lib..&base_dsn
315  out=work.bitemp_cols (keep=name type length varnum format:);
316 run;
317 proc sql noprint;
318 select name into: cols separated by ','
319  from work.bitemp_cols
320  where upcase(name) not in
321  (%upcase("&bus_from","&bus_to"
322  ,"&tech_from","&tech_to"
323  ,"&processed","&delete_col")) ;
324 select case when type in (2,6) then cats('put(md5(trim(',name,')),$hex32.)')
325  /* multiply by 1 to strip precision errors (eg 0 != 0) */
326  /* but ONLY if not missing, else will lose any special missing values */
327  else cats('put(md5(trim(put(ifn(missing('
328  ,name,'),',name,',',name,'*1),binary64.))),$hex32.)') end
329  into: stripcols separated by '||'
330  from work.bitemp_cols
331  where upcase(name) not in
332  (%upcase("&bus_from","&bus_to"
333  ,"&tech_from","&tech_to"
334  ,"&processed","&delete_col")) ;
335 
336 /* set default formats*/
337 %let bus_from_fmt = datetime19.;
338 %let bus_to_fmt = datetime19.;
339 %let processed_fmt = datetime19.;
340 
341 %let tech_from_fmt = format=datetime19.;
342 %let tech_to_fmt = format=datetime19.;
343 
344 
345 %put &=stripcols;
346 %put &=pk;
347 
348 data _null_;
349  set work.bitemp_cols;
350  if type=2 or type=6 then do;
351  length fmt $49.;
352  if format='' then fmt=cats('$',length,'.');
353  else fmt=cats(format,formatl,'.');
354  end;
355  else do;
356  if format='' then fmt=cats(length,'.');
357  else fmt=cats(format,formatl,'.',formatd);
358  end;
359  if upcase(name)="%upcase(&bus_from)" then
360  call symputx('bus_from_fmt',fmt,'L');
361  else if upcase(name)="%upcase(&bus_to)" then
362  call symputx('bus_to_fmt',fmt,'L');
363  else if upcase(name)="%upcase(&tech_from)" then
364  call symputx('tech_from_fmt',"format="!!fmt,'L');
365  else if upcase(name)="%upcase(&tech_to)" then
366  call symputx('tech_to_fmt',"format="!!fmt,'L');
367  else if upcase(name)="%upcase(&processed)" then
368  call symputx('processed_fmt',fmt,'L');
369 run;
370 
371 %if %index(%quote(&cols),___TMP___) %then %do;
372  %let msg=%str(Table contains a variable name containing "___TMP___".%trim(
373  ) This may conflict with temp variable generation!!);
374  %mp_abort(msg=&msg,mac=bitemporal_dataloader);
375  %let syscc=5;
376  %return;
377 %end;
378 
379 /* if transaction dates appear on the APPEND table, need to remove them */
380 %local drop_tx_dates /* used in append table */
381  drop_tx_dates_noobs /* used to take the base table structure */;
382 %if %mf_existvar(&append_lib..&append_dsn, &tech_from)
383  %then %let drop_tx_dates=&tech_from;
384 %if %mf_existvar(&append_lib..&append_dsn, &tech_to)
385  %then %let drop_tx_dates=&drop_tx_dates &tech_to;
386 %if %length(%trim(&drop_tx_dates))>0
387  %then %let drop_tx_dates=(drop=&drop_tx_dates);
388 
389 %if %mf_existvar(&basecopy, &tech_from)
390  %then %let drop_tx_dates_noobs=&tech_from;
391 %if %mf_existvar(&basecopy, &tech_to)
392  %then %let drop_tx_dates_noobs=&drop_tx_dates_noobs &tech_to;
393 %if %length(%trim(&drop_tx_dates_noobs))>0
394  %then %let drop_tx_dates_noobs=(drop=&drop_tx_dates_noobs obs=0);
395 %else %let drop_tx_dates_noobs=(obs=0);
396 
397 
398 /**
399  * Lock the table. This is necessary as we are doing a two part update (first
400  * closing records then appending new records). It is theoretically possible
401  * that an upload may occur whilst preparing the staging tables. And the
402  * staging tables are about to be prepared..
403  */
404 %if &LOADTARGET = YES %then %do;
405  %put locking &base_lib..&base_dsn;
406  %mp_lockanytable(LOCK,
407  lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,ctl_ds=&dclib..mpe_lockanytable
408  )
409  %if "&outds_audit" ne "0" %then %do;
410  %put locking &outds_audit;
411  %mp_lockanytable(LOCK
412  ,lib=%scan(&outds_audit,1,.)
413  ,ds=%scan(&outds_audit,2,.)
414  ,ref=&ETLSOURCE
415  ,ctl_ds=&dclib..mpe_lockanytable
416  )
417  %end;
418 %end;
419 %else %do;
420  /* not an actual load, so avoid updating the max key table in next step. */
421  %let rk_update_maxkeytable=NO;
422 %end;
423 
424 %if %length(&RK_UNDERLYING)>0 %then %do;
425  %mp_retainedkey(
426  base_lib=&base_lib
427  ,base_dsn=&base_dsn
428  ,append_lib=&append_lib
429  ,append_dsn=&append_dsn
430  ,retained_key=&pk
431  ,business_key=&rk_underlying
432  ,check_uniqueness=&CHECK_UNIQUENESS
433  ,outds=work.append
434  %if &rk_update_maxkeytable=NO %then %do;
435  ,maxkeytable=0
436  %end;
437  %else %do;
438  ,maxkeytable=&dclib..&RK_MAXKEYTABLE
439  %end;
440  ,locktable=&dclib..mpe_lockanytable
441  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
442  ,filter_str=%str( (where=( &now < &tech_to)) )
443  %end;
444  )
445 %end;
446 %else %do;
447  proc sql;
448  create view work.append as select * from &append_lib..&append_dsn;
449 %end;
450 /**
451 * generate md5 for append table
452 */
453 /* it is possible the source dataset has additional (unwanted) columns.
454  Drop if specified; */
455 %if %length(&keepvars)>0 %then %do;
456  /* remove tech dates from keepvars as they are generated later */
457  %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_from ),%str( )));
458  %let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_to ),%str( )));
459  %let keepvars=(keep=&keepvars &bus_from &bus_to &processed &md5_col);
460 %end;
461 
462 /* CAS varchar types cause append issues here, so perform autoconvert
463  by creating empty local table first */
464 data;
465  set &base_lib..&base_dsn &drop_tx_dates_noobs;
466 run;
467 %local emptybasetable; %let emptybasetable=&syslast;
468 
469 data work.bitemp0_append &keepvars &outds_del(drop=&md5_col )
470  %if "%substr(&sysver,1,1)" ne "4" and "%substr(&sysver,1,1)" ne "5" %then %do;
471  /nonote2err
472  %end;
473  ;
474  /* apply formats for bitemporal vars but not tx dates which are added later */
475  %if %length(&keepvars)>0 and &loadtype=BITEMPORAL %then %do;
476  format &bus_from &bus_from_fmt;
477  format &bus_to &bus_to_fmt;
478  %end;
479  set &emptybasetable /* base table reqd in case append has fewer cols */
480  work.append &drop_tx_dates;
481  %if %length(%str(&bus_from_override))>0 %then %do;
482  &bus_from= %unquote(&bus_from_override) ;
483  %end;
484  %if %length(%str(&bus_to_override))>0 %then %do;
485  &bus_to= %unquote(&bus_to_override) ;
486  %end;
487  length &md5_col $32;
488  &md5_col=put(md5(&stripcols),hex32.);
489  %if %length(&processed)>0 %then %do;
490  format &processed &processed_fmt;
491  &processed=&now;
492  %end;
493 
494 /**
495  * If a delete column exists then create the delete dataset
496  */
497 %if %mf_existvar(&append_lib..&append_dsn, &delete_col) %then %do;
498  drop &delete_col;
499  if upcase(&delete_col) = "YES" then output &outds_del ;
500  else output work.bitemp0_append ;
501  run;
502 
503  %if %mf_getattrn(&outds_del,NLOBS)>0 %then %do;
504  %bitemporal_closeouts(
505  tech_from=&tech_from
506  ,tech_to = &tech_to
507  ,base_lib=&base_lib
508  ,base_dsn=&base_dsn
509  ,append_lib=work
510  ,append_dsn=%scan(&outds_del,-1,.)
511  ,PK=&bus_from &pk
512  ,NOW=&dbnow
513  ,loadtarget=&loadtarget
514  ,loadtype=&loadtype
515  )
516  %end;
517 %end;
518 %else %do;
519  output work.bitemp0_append;
520  run;
521 %end;
522 
523 %mp_abort(iftrue= (&syscc gt 0 at line 494)
524  ,mac=&_program
525  ,msg=%str(syscc=&syscc)
526 )
527 
528 %if %length(&close_vars)>0 %then %do;
529  /**
530  * need to close out records that are not provided
531  */
532  proc sql;
533  create table bitemp1_closevars1 as
534  select distinct a.%mf_getquotedstr(in_str=&pk,dlm=%str(,a.),quote=)
535  from &base_lib..&base_dsn a
536  inner join work.bitemp0_append b
537  on 1=1
538  /* join on closevars key */
539  %do idx_pk=1 %to %sysfunc(countw(&close_vars));
540  %let idx_val=%scan(&close_vars,&idx_pk);
541  and a.&idx_val=b.&idx_val
542  %end;
543  /* filter base on tech dates if necessary */
544  %if &loadtype=TXTEMPORAL %then %do;
545  where a.&tech_from <=&now and &now < a.&tech_to
546  %end;
547  ;
548  create table bitemp1_closevars2 as
549  select distinct a.*
550  from bitemp1_closevars1 a
551  left join work.bitemp0_append b
552  on 1=1
553  /* join on primary key */
554  %do idx_pk=1 %to %sysfunc(countw(&pk));
555  %let idx_val=%scan(&pk,&idx_pk);
556  and a.&idx_val=b.&idx_val
557  %end;
558  /* identify removed records by null value in a field in PK but not close_vars
559  */
560  where b.%scan(
561  %mf_wordsInStr1ButNotStr2(Str1=&pk,Str2=&close_vars),1,%str( )
562  ) IS NULL
563  ;
564 
565  %if %mf_getattrn(bitemp1_closevars2,NLOBS)>0 %then %do;
566  %bitemporal_closeouts(
567  tech_from=&tech_from
568  ,tech_to = &tech_to
569  ,base_lib=&base_lib
570  ,base_dsn=&base_dsn
571  ,append_lib=work
572  ,append_dsn=bitemp1_closevars2
573  ,PK=&bus_from &pk
574  ,NOW=&dbnow
575  ,loadtarget=&loadtarget
576  ,loadtype=&loadtype
577  )
578  %end;
579 %end;
580 
581 /* return if nothing to load (was just deletes) */
582 %if %mf_getattrn(work.bitemp0_append,NLOBS)=0 %then %do;
583  %put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
584  %put NOTE- No updates - just deletes!;
585  %put NOTE-;%put NOTE-;%put NOTE-;
586 %end;
587 
588 
589 /**
590  * If applying manual overrides to business dates, then the input table MUST
591  * be unique on the PK. Check, and if not - abort.
592  */
593 %local msg;
594 %if %length(&bus_from_override.&bus_to_override)>0 or &CHECK_UNIQUENESS=YES
595 %then %do;
596  proc sort data=work.bitemp0_append out=work.bitemp0_check nodupkey;
597  by &pk;
598  run;
599  %if %mf_getattrn(work.bitemp0_check,NLOBS)
600  ne %mf_getattrn(work.bitemp0_append,NLOBS)
601  %then %do;
602  %let msg=INPUT table &append_lib..&append_dsn is not unique on PK (&pk);
603  %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE (&msg),
604  ctl_ds=&dclib..mpe_lockanytable
605  )
606  %mp_lockanytable(UNLOCK
607  ,lib=%scan(&outds_audit,1,.)
608  ,ds=%scan(&outds_audit,2,.)
609  ,ref=&ETLSOURCE
610  ,ctl_ds=&dclib..mpe_lockanytable
611  )
612  %mp_abort(msg=&msg,mac=bitemporal_dataloader.sas);
613  %end;
614 %end;
615 
616 
617 /**
618 * extract from BASE table. Only want matching records, as could be very BIG.
619 * New records are subsequently identified via left join and test for nulls.
620 */
621 %local temp_table temp_table2 base_table baselib_schema;
622 %put DCNOTE: Extracting matching observations from &base_lib..&base_dsn;
623 
624 %if &engine_type=OLEDB %then %do;
625  %let temp_table=##%mf_getuniquefileref(prefix=BTMP)_&base_dsn;
626  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
627  %let base_table=(select * from [dbo].&base_dsn
628  where convert(datetime,&SQLNOW) < &tech_to );
629  %else %let base_table=[dbo].&base_dsn;
630  proc sql;
631  create table &base_lib.."&temp_table"n as
632  select * from work.bitemp0_append;
633  /* open up a connection for pass through SQL */
634  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
635  create table work.bitemp0_base as select * from connection to myAlias(
636 %end;
637 %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
638  /* grab schema */
639  %let baselib_schema=%mf_getschema(&base_lib);
640  %if &baselib_schema.X ne X %then %let baselib_schema=&baselib_schema..;
641 
642  /* grab redshift config */
643  %local redcnt; %let redcnt=0;
644  %if &engine_type=REDSHIFT %then %do;
645  data _null_;
646  set &config_table(where=(var_scope='DCBL_REDSH' and var_active=1));
647  x+1;
648  call symputx(cats('rednm',x),var_value,'l');
649  call symputx(cats('redval',x),var_value,'l');
650  call symputx('redcnt',x,'l');
651  run;
652  %end;
653  /* cannot persist temp tables so must create a temporary permanent table */
654  %let temp_table=%mf_getuniquename(prefix=XDCTEMP);
655  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
656  %let base_table=(select * from &baselib_schema.&base_dsn
657  where timestamp &sqlnow < &tech_to );
658  %else %let base_table=&baselib_schema.&base_dsn;
659  /* make empty table first - must clone & drop extra cols as autoload is bad */
660  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
661 
662  exec (create table &temp_table (like &baselib_schema.&base_dsn)) by myAlias;
663  %if &engine_type=REDSHIFT %then %do;
664  exec (alter table &temp_table alter sortkey none) by myAlias;
665  %end;
666  %local dropcols;
667  %let dropcols=%mf_wordsinstr1butnotstr2(
668  str1=%upcase(%mf_getvarlist(&basecopy))
669  ,str2=%upcase(&pk)
670  );
671  %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
672  %put &=dropcols;
673  %let idx_val=%scan(&dropcols,&idx_pk);
674  exec(alter table &temp_table drop column &idx_val;) by myAlias;
675  %end;
676  exec (alter table &temp_table add column &md5_col varchar(32);) by myAlias;
677  /* create view to strip formats and avoid warns in log */
678  data work.vw_bitemp0/view=work.vw_bitemp0;
679  set work.bitemp0_append(keep=&pk &md5_col);
680  format _all_;
681  run;
682  proc append base=&base_lib..&temp_table
683  %if &engine_type=REDSHIFT %then %do;
684  (
685  %do idx_pk=1 %to &redcnt;
686  &&rednm&idx_pk = &&redval&idxpk
687  %end;
688  )
689  %end;
690  data=work.vw_bitemp0 force nowarn;
691  run;
692  /* open up a connection for pass through SQL */
693  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
694  create table work.bitemp0_base as select * from connection to myAlias(
695 %end;
696 %else %if &engine_type=CAS %then %do;
697  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
698  %let base_table=&base_lib..&base_dsn
699  (where=(&tech_from <=&now and &now < &tech_to));
700  %else %let base_table=&base_lib..&base_dsn;
701  %let temp_table=CASUSER.%mf_getuniquename(prefix=DC);
702  data &temp_table;
703  set work.bitemp0_append;
704  run;
705  %let bitemp0base=CASUSER.%mf_getuniquename(prefix=DC);
706  proc fedsql sessref=dcsession;
707  create table &bitemp0base{options replace=true} as
708 %end;
709 %else %do;
710  %let temp_table=work.bitemp0_append;
711  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
712  %let base_table=&base_lib..&base_dsn
713  (where=(&tech_from <=&now and &now < &tech_to));
714  %else %let base_table=&base_lib..&base_dsn;
715  proc sql;
716  create table work.bitemp0_base as
717 %end;
718 
719  select a.&md5_col /* this identifies NEW records */
720  , b.*
721  /* assume first PK field cannot be null (if defined in a PK constraint then
722  it definitely cannot be null) */
723  , case when b.%scan(&pk,1) IS NULL then 1 else 0 end as ___TMP___NEW_FLG
724  from &baselib_schema.&temp_table a
725  left join &base_table b
726  on 1=1
727 %do idx_pk=1 %to &pk_cnt;
728  %let idx_val=%scan(&pk,&idx_pk);
729  and a.&idx_val=b.&idx_val
730 %end;
731 
732 
733 %if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
734 %then %do;
735  ); proc sql; drop table &base_lib.."&temp_table"n;
736 %end;
737 %else %if &engine_type=CAS %then %do;
738  ;
739  quit;
740  data work.bitemp0_base;
741  set &bitemp0base;
742  run;
743  proc sql;
744  drop table &temp_table;
745  drop table &bitemp0base;
746 %end;
747 %else %do;
748  ;
749 %end;
750 
751 /**
752 * matching & changed records are those without NULL key values
753 * &idx_val resolves to rightmost PK value (loop above)
754 */
755 %put syscc (line525)=&syscc, sqlrc=&sqlrc;
756 %mp_abort(iftrue= (&syscc gt 0 or &sqlrc>0)
757  ,mac=&_program
758  ,msg=%str(syscc=&syscc sqlrc=&sqlrc)
759 )
760 
761 %put hashcols2=&stripcols;
762 proc sql;
763 create table work.bitemp1_current(drop=___TMP___NEW_FLG) as
764  select *
765  , put(md5(&stripcols),$hex32.) as &md5_col
766  from work.bitemp0_base (drop=&md5_col)
767  where ___TMP___NEW_FLG=0;
768 
769 /**
770 * NEW records were identified in ___TMP___NEW_FLG in bitemp0_base
771 */
772 proc sql;
773 create table &outds_add
774  (drop=&md5_col
775  %if %mf_existvar(work.bitemp0_base, &delete_col) %then %do;
776  &delete_col
777  %end;
778  )
779  as select a.*
780  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
781  ,&now as &tech_from &tech_from_fmt
782  ,&high_date as &tech_to &tech_to_fmt
783  %end;
784  from work.bitemp0_append a /* STAGING records (mix of existing & new) */
785  , work.bitemp0_base b /* BASE records (contains null values for new) */
786  where a.&md5_col=b.&md5_col /* took staging md5 across in left join */
787  and b.___TMP___NEW_FLG=1; /* NEW records also identified in bitemp0_base */
788 
789 
790 /**
791 * identify INSERTS. These are records with the same business key but
792 * the bus_from and bus_to value are higher / lower (respectively)
793 * such that the existing record needs to be SPLIT to surround the new
794 * record.
795 * eg: OLD RECORD from=1 to=10
796 * NEW RECORD from=5 to=7
797 *
798 * APPENDED RECORDS:
799 * - from=1 to=5
800 * - from=5 to=7
801 * - from=7 to=10
802 */
803 
804 /* inserts cannot happen with TXTEMPORAL */
805 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
806  /* IDENTIFY */
807  create table work.bitemp3_inserts as
808  select b.*
809  ,a.&bus_from as ___TMP___from
810  ,a.&bus_to as ___TMP___to
811  from work.bitemp0_append a
812  ,work.bitemp1_current b
813  where a.&bus_from > b.&bus_from
814  and a.&bus_to < b.&bus_to
815  %do idx_pk=1 %to &pk_cnt;
816  %let idx_val=%scan(&pk,&idx_pk);
817  and a.&idx_val=b.&idx_val
818  %end;
819  order by
820  /* compress blanks and then insert commas (as the datetime fields may
821  not be in use) */
822  %sysfunc(tranwrd(%sysfunc(compbl(
823  &pk &bus_from &bus_to &processed
824  )),%str( ), %str(,)))
825  ;
826 
827  /* SPLIT */
828  data work.bitemp3a_inserts (drop=___TMP___from ___TMP___retain ___TMP___to) ;
829  set work.bitemp3_inserts;
830  by &pk &bus_from &bus_to &processed;
831  if first.&idx_val then do;
832  ___TMP___retain=&bus_to;
833  &bus_to=___TMP___from;
834  output;
835  &bus_to=___TMP___retain;
836  end;
837  if last.&idx_val then do;
838  &bus_from=___TMP___to;
839  output;
840  end;
841  run;
842 %end;
843 %else %do;
844  /* TX temporal load */
845  data work.bitemp3a_inserts;
846  set work.bitemp1_current;
847  stop;
848  run;
849 %end;
850 /* APPEND */
851 proc sql;
852 create view work.bitemp3a_view as
853  select * from work.bitemp1_current
854  where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
855 
856 data bitemp3b_newbase;
857  set work.bitemp3a_inserts work.bitemp3a_view;
858 run;
859 
860 /** do not use! this converts short numerics into 8 bytes
861 proc sql;
862 create table work.bitemp3b_newbase as
863  select * from work.bitemp3a_inserts
864 union corr
865  select * from work.bitemp1_current
866  where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
867 */
868 
869 /**
870 * identify CHANGED records from staging.
871 * Same business key with different temporal dates or md5 value
872 * This table must be overlayed onto / into existing business history
873 */
874 proc sql;
875 create table work.bitemp4_updated as select distinct a.*
876  from work.bitemp0_append a
877  ,work.bitemp3b_newbase b
878  where 1=1
879  %do idx_pk=1 %to &pk_cnt;
880  %let idx_val=%scan(&pk,&idx_pk);
881  and a.&idx_val=b.&idx_val
882  %end;
883  and ( a.&md5_col ne b.&md5_col
884  %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
885  OR (a.&bus_from ne b.&bus_from or a.&bus_to ne b.&bus_to)
886  %end;
887  )
888 ;
889 
890 /**
891  * This section would have been one simple step with union all
892  * but that converts short numerics into 8 bytes!
893  * so, convoluted alternative to retain the same functionality.
894  */
895 
896 /* base records */
897 create view work.bitemp4_prep1 as
898  select 'BASE' as ___TMP___
899  ,b.*
900  from work.bitemp4_updated a
901  ,work.bitemp3b_newbase b
902  where 1
903  %do idx_pk=1 %to &pk_cnt;
904  %let idx_val=%scan(&pk,&idx_pk);
905  and a.&idx_val=b.&idx_val
906  %end;
907  ;
908 /* updated records */
909 create view work.bitemp4_prep2 as
910  select 'STAG' as ___TMP___ ,*
911  from work.bitemp4_updated;
912 /* ensure we only keep columns that appear in both */
913 %local bp1 bp2 bp3 bp4;
914 %let bp1=%mf_getvarlist(bitemp4_prep1);
915 %let bp2=%mf_getvarlist(bitemp4_prep2);
916 %let bp3=%mf_wordsInStr1ButNotStr2(Str1=&bp1,Str2=&bp2);
917 %let bp4=%mf_wordsInStr1ButNotStr2(Str1=&bp2,Str2=&bp1);
918 data work.bitemp4_prep3/view=bitemp4_prep3;
919  set bitemp4_prep1 bitemp4_prep2;
920 %if %length(XX&bp3&bp4)>2 %then %do;
921  drop &bp3 &bp4 ;
922 %end;
923 run;
924 /* remove duplicates */
925 proc sql;
926 create table work.bitemp4a_allrecs as
927  select distinct *
928  from work.bitemp4_prep3
929  order by
930  /* compress blanks and then insert commas (as the datetime fields
931  may not be in use) */
932  %sysfunc(tranwrd(%sysfunc(compbl(
933  &pk &bus_from &bus_to &processed
934  )),%str( ), %str(,)))
935  ;
936 
937 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
938  /* this section aligns the business dates
939  (eg for inserts or overlaps in the range) */
940  data work.bitemp4b_firstpass (drop=___TMP___cond ___TMP___from ___TMP___to );
941  set work.bitemp4a_allrecs;
942  by &pk &bus_from &bus_to &processed;
943  retain ___TMP___cond 'Name of Condition';
944  retain ___TMP___from ___TMP___to 0;
945  ___TMP___md5lag=lag(&md5_col);
946  /* reset retained variables */
947  if first.&idx_val then do;
948  call missing (___TMP___cond, ___TMP___from, ___TMP___to,___TMP___md5lag);
949  end;
950  else do;
951  /* if record is identical, carry forward bus_from (and bus_to if higher)*/
952  if &md5_col=___TMP___md5lag then do;
953  &bus_from=___TMP___from;
954  if &bus_to<___TMP___to then &bus_to=___TMP___to;
955  end;
956  end;
957 
958  if ___TMP___='STAG' then do;
959  /* need to carry forward the closing record */
960  ___TMP___cond='Condition 1';
961  end;
962  else if ___TMP___cond='Condition 1' then do;
963  /* else ensure bus_from starts from prior record bus_to */
964  if &md5_col ne ___TMP___md5lag and &bus_from <= ___TMP___to
965  then &bus_from= ___TMP___to;
966  /* new record may replace old record entirely */
967  if &bus_to <= &bus_from then delete;
968  else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
969  end;
970  ___TMP___from=&bus_from;
971  ___TMP___to=&bus_to;
972  run;
973 %end;
974 %else %do;
975  /* keep staged records only */
976  data work.bitemp4b_firstpass;
977  set work.bitemp4a_allrecs;
978  if ___TMP___='STAG';
979  run;
980 %end;
981 
982 /* next phase is to pass through in reverse - so set up the sort statement */
983 %local byvar;
984 %do idx_pk=1 %to &pk_cnt;
985  %let byvar=&byvar descending %scan(&pk,&idx_pk);
986 %end;
987 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL
988 %then %let byvar=&byvar descending &bus_from descending &bus_to;
989 /* if matching bus dates supplied, need to ensure we also have a sort
990  between BASE and STAGING tables */
991 %let byvar=&byvar descending ___TMP___;
992 
993 proc sort data=work.bitemp4b_firstpass out=work.bitemp4c_sort ;
994  by &byvar;
995 run;
996 
997 /**
998 * Now (in reverse) pass back business start dates
999 */
1000 data work.bitemp4d_secondpass;
1001 %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1002  &tech_from=&now;
1003  &tech_to=&high_date;
1004 %end;
1005  set work.bitemp4c_sort ;
1006  by &byvar;
1007  retain ___TMP___cond 'Name of Condition';
1008  retain ___TMP___from ___TMP___to 0;
1009 %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
1010 /* put / _all_ /;*/
1011  ___TMP___md5lag=lag(&md5_col);
1012  if first.&idx_val then do;
1013  /* reset retained variables */
1014  call missing (___TMP___cond,___TMP___from,___TMP___to,___TMP___md5lag);
1015  end;
1016  else do;
1017  /* if record is identical, carry back bus_to */
1018  if &md5_col=___TMP___md5lag then &bus_to=___TMP___to;
1019  end;
1020 
1021  if ___TMP___='STAG' then do;
1022  /* need to carry forward the closing record */
1023  ___TMP___cond='Condition 2';
1024  end;
1025  else if ___TMP___cond='Condition 2' then do;
1026  /* else ensure bus_to stops at subsequent record bus_from */
1027  if &md5_col ne ___TMP___md5lag and &bus_to >= ___TMP___from
1028  then &bus_to= ___TMP___from;
1029  /* new record may replace old record entirely */
1030  if &bus_from >= &bus_to then delete;
1031  if &bus_from=___TMP___from and &bus_to=___TMP___to then delete;
1032  else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
1033  end;
1034  ___TMP___from=&bus_from;
1035  ___TMP___to=&bus_to;
1036 
1037 %end;
1038 run;
1039 %put syscc (line600)=&syscc;
1040 /**
1041  There may still be some records (eg old business history) which have not
1042  changed.
1043  Need to identify these and remove from the append so they are not updated
1044  unnecessarily. This is done by generating a new md5 (which INCLUDES the
1045  business key) and any matching / identical records are split out (from those
1046  that need to be updated).
1047 */
1048 
1049 %if &loadtype=BITEMPORAL %then %do;
1050  %let cat_string=catx('|' ,&bus_from,&bus_to);
1051 
1052  data work.bitemp5a_lkp (keep=&md5_col)
1053  %if "%substr(&sysver,1,1)" ne "4" & "%substr(&sysver,1,1)" ne "5" %then %do;
1054  /nonote2err
1055  %end;
1056  ;
1057  set work.bitemp0_base;
1058  /* for BITEMPORAL we need to compare business dates also */
1059  &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.);
1060  run;
1061 
1062  data bitemp5b_updates;
1063  set bitemp4d_secondpass;
1064  if _n_=1 then do;
1065  dcl hash md5_lkp(dataset:'bitemp5a_lkp');
1066  md5_lkp.definekey("&md5_col");
1067  md5_lkp.definedone();
1068  end;
1069  /* drop old md5 col as will rebuild with new business dates */
1070  &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.) ;
1071  if md5_lkp.check()=0 then delete;
1072  run;
1073 
1074  proc sql;
1075  /* get min bus from as will update (close out) all records from this point
1076  (for that PK)*/
1077  create table work.bitemp5d_subquery as
1078  select &pk_comma, min(&bus_from)as &bus_from, max(&bus_to) as &bus_to
1079  from work.bitemp5b_updates
1080  group by &pk_comma;
1081  /* index has a huge efficiency impact on upcoming nested subquery */
1082  create index index1 on work.bitemp5d_subquery(&pk_comma,&bus_from, &bus_to);
1083 
1084  %let lastds=work.bitemp5b_updates;
1085 %end;
1086 %else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
1087  proc sql;
1088  create table work.bitemp5d_subquery as
1089  select distinct &pk_comma
1090  from bitemp4d_secondpass;
1091  %let lastds=work.bitemp4d_secondpass;
1092 %end;
1093 %else %let lastds=work.bitemp4d_secondpass;
1094 
1095 /* create single append table (an overlapped pre-sert may be classed as
1096  both an update AND a new record). Also create temp views that may be
1097  used for pre-load analysis. */
1098 data &outds_mod;
1099  set &lastds(drop=___TMP___: &md5_col);
1100 run;
1101 
1102 data bitemp6_allrecs / view=bitemp6_allrecs;
1103  set &outds_mod /* UPDATED records */
1104  &outds_add /* NEW records */;
1105 run;
1106 
1107 proc sort data=work.bitemp6_allrecs
1108  out=work.bitemp6_unique
1109  noduprec
1110  dupout=work.xx_BADBADBAD;
1111 by _all_;
1112 run;
1113 
1114 /* we have all our temp tables now so exit if this is all that is needed */
1115 %if &LOADTARGET ne YES %then %return;
1116 
1117 /* also exit if an err condition exists */
1118 
1119 %if &syscc>0 %then %do;
1120  %put syscc=&syscc;
1121  %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1122  ctl_ds=&dclib..mpe_lockanytable
1123  )
1124  %if "&outds_audit" ne "0" %then %do;
1125  %mp_lockanytable(UNLOCK
1126  ,lib=%scan(&outds_audit,1,.)
1127  ,ds=%scan(&outds_audit,2,.)
1128  ,ref=&ETLSOURCE
1129  ,ctl_ds=&dclib..mpe_lockanytable
1130  )
1131  %end;
1132 %end;
1133 %mp_abort(iftrue= (&syscc>0)
1134  ,mac=&sysmacroname in &_program
1135  ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status)
1136 )
1137 
1138 /* final check - abort if a lock has appeared on the target or audit table */
1139 %mp_lockfilecheck(libds=&base_lib..&base_dsn)
1140 %if %mf_existds(&outds_audit) %then %do;
1141  %mp_lockfilecheck(libds=&outds_audit)
1142 %end;
1143 
1144 /**
1145 * STAGING TABLES PREPARED, ERR CONDITION TESTED FOR.. NOW TO LOAD!!
1146 */
1147 
1148 /**
1149 * First, CLOSE OUT changed records (if not a REPLACE)
1150 * Note that SAS does not support ANSI standard for UPDATE with a join condition.
1151 * However - this can be worked around using a nested subquery..
1152 */
1153 data _null_;
1154  putlog "&sysmacroname: CLOSEOUTS commencing";
1155 run;
1156 
1157 %if %mf_getattrn(&lastds,NLOBS)=0 %then %do;
1158  data _null_;
1159  putlog "&sysmacroname: No closeouts needed";
1160  run;
1161 %end;
1162 %else %if &engine_type=CAS %then %do;
1163  %mp_abort(iftrue= (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL)
1164  ,mac=&sysmacroname in &_program
1165  ,msg=%str(&loadtype not yet supported in CAS engine)
1166  )
1167  /* create temp table for deletions */
1168  %local delds;%let delds=%mf_getuniquename(prefix=DC);
1169  data casuser.&delds;
1170  set work.bitemp5d_subquery;
1171  run;
1172  /* delete the records */
1173  proc cas ;
1174  table.deleteRows / table={
1175  caslib="&base_lib",
1176  name="&base_dsn",
1177  where="1=1",
1178  whereTable={caslib='CASUSER',name="&delds"}
1179  };
1180  quit;
1181  /* drop temp table */
1182  proc sql;
1183  drop table CASUSER.&delds;
1184 %end;
1185 %else %if (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL or &loadtype=UPDATE)
1186 %then %do;
1187  data _null_;
1188  putlog "&sysmacroname: &loadtype operation using &engine_type engine";
1189  run;
1190  %local flexinow;
1191  proc sql;
1192  /* if OLEDB then create a temp table for efficiency */
1193  %local innertable;
1194  %if &engine_type=OLEDB %then %do;
1195  %let innertable=[&temp_table];
1196  %let top_table=[dbo].&base_dsn;
1197  %let flexinow=&SQLNOW;
1198  create table &base_lib.."&temp_table"n as
1199  select * from work.bitemp5d_subquery;
1200  /* open up a connection for pass through SQL */
1201  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1202  execute(
1203  %end;
1204  %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
1205  %let innertable=%mf_getuniquename(prefix=XDCTEMP);
1206  %let top_table=&baselib_schema.&base_dsn;
1207  %let flexinow=timestamp &SQLNOW;
1208  /* make empty table first - must clone & drop extra cols
1209  as autoload is bad */
1210  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1211  exec (create table &innertable (like &baselib_schema.&base_dsn)) by myAlias;
1212  %if &engine_type=REDSHIFT %then %do;
1213  exec (alter table &innertable alter sortkey none) by myAlias;
1214  %end;
1215  %let dropcols=%mf_wordsinstr1butnotstr2(
1216  str1=%upcase(%mf_getvarlist(&basecopy))
1217  ,str2=%upcase(%mf_getvarlist(work.bitemp5d_subquery))
1218  );
1219  %if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
1220  %put &=dropcols;
1221  %let idx_val=%scan(&dropcols,&idx_pk);
1222  exec(alter table &innertable drop column &idx_val;) by myAlias;;
1223  %end;
1224  /* create view to strip formats and avoid warns in log */
1225  data work.vw_bitemp5d/view=work.vw_bitemp5d;
1226  set work.bitemp5d_subquery;
1227  format _all_;
1228  run;
1229  proc append base=&base_lib..&innertable (
1230  %do idx_pk=1 %to &redcnt;
1231  &&rednm&idx_pk = &&redval&idxpk
1232  %end;
1233  )
1234  data=work.vw_bitemp5d force nowarn;
1235  run;
1236  /* open up a connection for pass through SQL */
1237  %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
1238  execute(
1239  %end;
1240  %else %do;
1241  %let innertable=bitemp5d_subquery;
1242  %let top_table=&base_lib..&base_dsn;
1243  %let flexinow=&now;
1244  %end;
1245 
1246 
1247  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
1248  update &top_table set &tech_to=&flexinow
1249  %if %length(&processed)>0 %then %do;
1250  ,&processed=&flexinow
1251  %end;
1252  where &tech_from <= &flexinow and &flexinow < &tech_to and
1253  %end;
1254  %else %if &loadtype=UPDATE %then %do;
1255  /* changed records are deleted then re-appended when doing UPDATEs */
1256  delete from &top_table where
1257  %end;
1258  %else %do;
1259  %put %str(ERR)OR: BUSTEMPORAL NOT YET SUPPORTED;
1260  %let syscc=5;
1261  %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1262  ctl_ds=&dclib..mpe_lockanytable
1263  )
1264  %mp_lockanytable(UNLOCK
1265  ,lib=%scan(&outds_audit,1,.)
1266  ,ds=%scan(&outds_audit,2,.)
1267  ,ref=&ETLSOURCE
1268  ,ctl_ds=&dclib..mpe_lockanytable
1269  )
1270  %goto end_of_macro;
1271  %end;
1272 
1273  /* perform join inside query as per
1274  http://stackoverflow.com/questions/24629793/update-with-a-proc-sql */
1275 
1276  exists( select 1 from &baselib_schema.&innertable where
1277 
1278  /* loop PK join */
1279  %do idx_pk=1 %to &pk_cnt;
1280  %let idx_val=%scan(&pk,&idx_pk);
1281  &base_dsn..&idx_val=&innertable..&idx_val and
1282  %end;
1283  %if &loadtype=BITEMPORAL %then %do;
1284  &base_dsn..&bus_from >= &innertable..&bus_from
1285  and &base_dsn..&bus_to <= &innertable..&bus_to and
1286  %end;
1287 
1288  /* close the statement */
1289 
1290  1=1);
1291 
1292  %if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
1293  %then %do;
1294  ) by myAlias;
1295  execute (drop table &baselib_schema.&innertable) by myAlias;
1296  %end;
1297 %end;
1298 quit;
1299 data _null_;
1300  putlog "&sysmacroname: Closeout complete";
1301 run;
1302 /**
1303  * Append the new / updated records
1304  */
1305 %if &engine_type=CAS %then %do;
1306 
1307  /* get varchar variables ready for casting */
1308  %local vcfmt vcrename vcassign vcdrop;
1309  data _null_;
1310  set work.bitemp_cols(where=(type=6)) end=last;
1311  length vcrename vcassign vcdrop vcfmt $32767 rancol $32;
1312  retain vcrename vcassign vcdrop vcfmt;
1313  if _n_=1 then vcrename='(rename=(';
1314  rancol=resolve('%mf_getuniquename()');
1315  vcfmt=trim(vcfmt)!!'length '!!cats(name)!!' varchar(*);';
1316  vcrename=trim(vcrename)!!' '!!cats(name,'=',rancol);
1317  vcassign=cats(vcassign,name,'=',rancol,';');
1318  vcdrop=cats(vcdrop,'drop '!!rancol,';');
1319  if last then do;
1320  vcrename=cats(vcrename,'))');
1321  call symputx('vcfmt',vcfmt);
1322  call symputx('vcrename',vcrename);
1323  call symputx('vcassign',vcassign);
1324  call symputx('vcdrop',vcdrop);
1325  end;
1326  run;
1327 
1328  /* prepare a temp cas table with varchars casted */
1329  %let tmp=%mf_getuniquename();
1330  data casuser.&tmp ;
1331  &vcfmt
1332  set work.bitemp6_unique &vcrename;
1333  &vcassign
1334  &vcdrop
1335  run;
1336 
1337  /* load the table with varchars applied*/
1338  data &base_lib..&base_dsn (append=yes )/sessref=dcsession ;
1339  set casuser.&tmp;
1340  run;
1341 
1342  /* drop temp table */
1343  proc sql;
1344  drop table CASUSER.&tmp;
1345 
1346  /* this code will not work as regular tables do not have varchars */
1347  /*
1348  proc casutil;
1349  load data=work.bitemp6_unique
1350  outcaslib="&base_lib" casout="&base_dsn" append ;
1351  quit;
1352  */
1353 %end;
1354 %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
1355  proc append base=&base_lib..&base_dsn
1356  %if &engine_type=REDSHIFT %then %do;
1357  (
1358  %do idx_pk=1 %to &redcnt;
1359  &&rednm&idx_pk = &&redval&idxpk
1360  %end;
1361  )
1362  %end;
1363  data=bitemp6_unique force nowarn;
1364  run;
1365 %end;
1366 %else %do;
1367  proc append base=&base_lib..&base_dsn data=bitemp6_unique force nowarn; run;
1368 %end;
1369 
1370 %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
1371  ctl_ds=&dclib..mpe_lockanytable
1372 )
1373 
1374 /* final check on syscc */
1375 %mp_abort(iftrue= (&syscc >4)
1376  ,mac=&_program
1377  ,msg=%str(!!Upload NOT successful!! Failed on actual update / append stage..)
1378 )
1379 
1380 %if &outds_audit ne 0 and &LOADTARGET=YES %then %do;
1381  data work.vw_outds_orig /view=work.vw_outds_orig;
1382  set work.bitemp0_base (drop=&md5_col);
1383  where ___TMP___NEW_FLG=0;
1384  drop ___TMP___NEW_FLG;
1385  run;
1386  /* update the AUDIT table */
1387  %if %mf_existds(&outds_audit) %then %do;
1388  options mprint;
1389  %mp_storediffs(&base_lib..&base_dsn
1390  ,work.vw_outds_orig
1391  ,&pk &bus_from
1392  ,delds=&outds_del
1393  ,modds=&outds_mod
1394  ,appds=&outds_add
1395  ,outds=work.mp_storediffs
1396  ,processed_dttm=&now
1397  ,loadref=%superq(etlsource)
1398  )
1399  /* exclude unchanged values in modified rows */
1400  data work.mp_storediffs;
1401  set work.mp_storediffs;
1402  if MOVE_TYPE="M" and IS_PK=0 and IS_DIFF=0 then delete;
1403  * putlog load_ref= libref= dsn= key_hash= tgtvar_nm=;
1404  run;
1405  proc append base=&outds_audit data=work.mp_storediffs;
1406  run;
1407  %mp_lockanytable(UNLOCK
1408  ,lib=%scan(&outds_audit,1,.)
1409  ,ds=%scan(&outds_audit,2,.)
1410  ,ref=&ETLSOURCE
1411  ,ctl_ds=&dclib..mpe_lockanytable
1412  )
1413  %end;
1414 %end;
1415 %mp_abort(iftrue= (&syscc >4)
1416  ,mac=bitemporal_dataloader
1417  ,msg=%str(Problem in audit stage (&outds_audit))
1418 )
1419 
1420 %let user=%mf_getUser();
1421 /**
1422  Notify as appropriate EMAILS DISABLED
1423 
1424 %sumo_alerts(ALERT_EVENT=UPDATE
1425  , ALERT_TARGET=&base_lib..&base_dsn
1426  , from_user= &user);
1427 */
1428 /* monitor BiTemporal usage */
1429 %if &log=1 %then %do;
1430  %put syscc=&syscc;
1431  /* do not perform duration calc in pass through */
1432  %local dur;
1433  data _null_;
1434  now=symget('now');
1435  dur=%sysfunc(datetime())-&now;
1436  call symputx('dur',dur,'l');
1437  run;
1438  proc sql;
1439  insert into &dclib..mpe_dataloads
1440  set libref=%upcase("&base_lib")
1441  ,DSN=%upcase("&base_dsn")
1442  ,ETLSOURCE="&ETLSOURCE"
1443  ,LOADTYPE="&loadtype"
1444  ,CHANGED_RECORDS=%mf_getattrn(&lastds,NLOBS)
1445  ,NEW_RECORDS=%mf_getattrn(&outds_add,NLOBS)
1446  ,DELETED_RECORDS=%mf_getattrn(&outds_del,NLOBS)
1447  ,DURATION=&dur
1448  ,MAC_VER="v&ver"
1449  ,user_nm="&user"
1450  ,PROCESSED_DTTM=&now;
1451  quit;
1452  %put syscc=&syscc;
1453 %end;
1454 %end_of_macro:
1455 %mend bitemporal_dataloader;