sql >> Database >  >> RDS >> Sqlserver

Incrementele belasting implementeren met behulp van Change Data Capture in SQL Server

Dit artikel is interessant voor diegenen die vaak te maken hebben met data-integratie.

Inleiding

Stel dat er een database is waar gebruikers altijd gegevens wijzigen (bijwerken of verwijderen). Misschien wordt deze database gebruikt door een grote applicatie die het niet toestaat om de tabelstructuur te wijzigen. De taak is om van tijd tot tijd gegevens uit deze database naar een andere database op een andere server te laden. De eenvoudigste manier om het probleem aan te pakken, is door de nieuwe gegevens van een brondatabase naar een doeldatabase te laden en de doeldatabase vooraf op te schonen. U kunt deze methode gebruiken zolang de tijd om gegevens te laden acceptabel is en de vooraf ingestelde deadlines niet overschrijdt. Wat als het meerdere dagen duurt om gegevens te laden? Bovendien leiden onstabiele communicatiekanalen tot de situatie dat het laden van gegevens stopt en opnieuw wordt gestart. Als u met deze obstakels wordt geconfronteerd, raad ik u aan een van de 'data reloading'-algoritmen te overwegen. Dit betekent dat alleen gegevenswijzigingen hebben plaatsgevonden sinds de laatste lading is geladen.

CDC

In SQL Server 2008 introduceerde Microsoft een datatrackingmechanisme genaamd Change Data Capture (CDC). In grote lijnen is het doel van dit mechanisme dat het inschakelen van CDC voor elke databasetabel een systeemtabel in dezelfde database zal creëren met een vergelijkbare naam als de originele tabel (het schema is als volgt:'cdc' als prefix plus de oude schemanaam plus ”_” en het einde “_CT”. De originele tabel is bijvoorbeeld dbo.Example, dan heet de systeemtabel cdc.dbo_Example_CT). Het slaat alle gegevens op die zijn gewijzigd.

Overweeg het voorbeeld om dieper in CDC te graven. Maar zorg er eerst voor dat SQL Agent die CDC gebruikt, werkt op de SQL Server-testinstantie.

Daarnaast gaan we een script overwegen dat een database en testtabel maakt, deze tabel vult met gegevens en CDC voor deze tabel inschakelt.

Om de taak te begrijpen en te vereenvoudigen, gebruiken we één SQL Server-instantie zonder de bron- en doeldatabases naar verschillende servers te distribueren.

use master
go
-- create a source database
if not exists (select * from sys.databases where name = 'db_src_cdc')
	create database db_src_cdc
go
use db_src_cdc
go
-- enable CDC if it is disabled
if not exists (select * from sys.databases where name = db_name() and is_cdc_enabled=1)
	exec sys.sp_cdc_enable_db
go
-- create a role for tables with CDC
if not exists(select * from sys.sysusers where name = 'CDC_Reader' and issqlrole=1)
	create role CDC_Reader
go
-- create a table
if object_id('dbo.Example','U') is null
	create table dbo.Example
	(
		ID int identity constraint PK_Example primary key,
		Title varchar(200) not null
	)
go
-- populate the table
insert dbo.Example (Title) values
('One'),('Two'),('Three'),('Four'),('Five');
go
-- enable CDC for the table
if not exists (select * from sys.tables where is_tracked_by_cdc = 1 and name = 'Example')
	exec sys.sp_cdc_enable_table
		@source_schema = 'dbo',
		@source_name = 'Example',
		@role_name = 'CDC_Reader'
go
-- populate the table with some data. We will change or delete something
update dbo.Example
set Title = reverse(Title)
where ID in (2,3,4);

delete from dbo.Example where ID in (1,2);

set identity_insert dbo.Example on;
insert dbo.Example (ID, Title) values
(1,'One'),(6,'Six');
set identity_insert dbo.Example off;
go

Laten we nu eens kijken wat we hebben na het uitvoeren van dit script in de tabellen dbo.Example en cdc.dbo_Example_CT (opgemerkt moet worden dat CDC asynchroon is. Gegevens worden ingevuld in de tabellen waar het bijhouden van wijzigingen na een bepaalde tijdsperiode wordt opgeslagen ).

select * from dbo.Example;
ID Title
---- ----------------------
   1 One
   3 eerhT 
   4 ruoF 
   5 Five 
   6 Six
select
	row_number() over
		(
			partition by ID
			order by
				__$start_lsn desc,
				__$seqval desc
		) as __$rn,
	*
from cdc.dbo_Example_CT;
__$rn __$start_lsn           __$end_lsn  __$seqval              __$operation __$update_mask    ID Title
------ ---------------------- ----------- ---------------------- ------------ ---------------- --- -----------
     1 0x0000003A000000580005 NULL        0x0000003A000000580003            2 0x03               1 One
     2 0x0000003A000000560006 NULL        0x0000003A000000560002            1 0x03               1 One
     1 0x0000003A000000560006 NULL        0x0000003A000000560005            1 0x03               2 owT
     2 0x0000003A000000540005 NULL        0x0000003A000000540002            3 0x02               2 Two
     3 0x0000003A000000540005 NULL        0x0000003A000000540002            4 0x02               2 owT
     1 0x0000003A000000540005 NULL        0x0000003A000000540003            3 0x02               3 Three
     2 0x0000003A000000540005 NULL        0x0000003A000000540003            4 0x02               3 eerhT
     1 0x0000003A000000540005 NULL        0x0000003A000000540004            3 0x02               4 Four
     2 0x0000003A000000540005 NULL        0x0000003A000000540004            4 0x02               4 ruoF
     1 0x0000003A000000580005 NULL        0x0000003A000000580004            2 0x03

Overweeg in detail de tabelstructuur waarin het bijhouden van wijzigingen is opgeslagen. De velden __ $start_lsn en __ $seqval zijn respectievelijk LSN (logvolgnummer in de database) en het transactienummer binnen de transactie. Er is een belangrijke eigenschap in deze velden, namelijk dat we er zeker van kunnen zijn dat het record met een hogere LSN later zal worden uitgevoerd. Dankzij deze eigenschap kunnen we gemakkelijk de laatste status van elk record in de query krijgen, door onze selectie te filteren op de voorwaarde - waarbij __ $ rn =1.

Het veld __$operation bevat de transactiecode:

  • 1 – het record is verwijderd
  • 2 – het record is ingevoegd
  • 3, 4 – het record is bijgewerkt. De oude gegevens vóór de update zijn 3, de nieuwe gegevens zijn 4.

Naast servicevelden met voorvoegsel «__$», zijn de velden van de oorspronkelijke tabel volledig gedupliceerd. Deze informatie is voldoende voor ons om over te gaan tot de incrementele belasting.

Een database opzetten om gegevens te laden

Maak een tabel in onze testdoeldatabase waarin gegevens worden geladen, evenals een extra tabel om gegevens over het laadlogboek op te slaan.

use master
go
-- create a target database 
if not exists (select * from sys.databases where name = 'db_dst_cdc')
	create database db_dst_cdc
go
use db_dst_cdc
go
-- create a table
if object_id('dbo.Example','U') is null
	create table dbo.Example
	(
		ID int constraint PK_Example primary key,
		Title varchar(200) not null
	)
go
-- create a table to store the load log
if object_id('dbo.log_cdc','U') is null
	create table dbo.log_cdc
	(
		table_name nvarchar(512) not null,
		dt datetime not null default getdate(),
		lsn binary(10) not null default(0x0),
		constraint pk_log_cdc primary key (table_name,dt desc)
	)
go

Ik wil uw aandacht vestigen op de velden van de LOG_CDC-tabel:

  • TABLE_NAME slaat informatie op over welke tabel is geladen (het is mogelijk om in de toekomst meerdere tabellen te laden, van verschillende databases of zelfs van verschillende servers; het tabelformaat is 'SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME'
  • DT is een veld van de laaddatum en -tijd, die optioneel is voor de incrementele lading. Het zal echter nuttig zijn voor het controleren van het laden.
  • LSN – nadat een tabel is geladen, moeten we, indien nodig, informatie opslaan over de plaats waar de volgende lading moet worden gestart. Dienovereenkomstig voegen we na elke lading de laatste (maximale) __ $ start_lsn toe aan deze kolom.

Algoritme voor het laden van gegevens

Zoals hierboven beschreven, kunnen we met behulp van de query de laatste status van de tabel krijgen met behulp van vensterfuncties. Als we LSN kennen van de laatste lading, kunnen we de volgende keer dat we laden alle gegevens uit de bron filteren, waarvan de wijzigingen hoger zijn dan de opgeslagen LSN, als er ten minste één volledige vorige lading was:

with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
select * from incr_Example

Dan kunnen we alle records voor de volledige lading krijgen, als de laad-LSN niet is opgeslagen:

with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
, full_Example as
(
	select *
	from db_src_cdc.dbo.Example
	where @lsn is null
)
select
	ID, Title, __$operation
from incr_Example
where __$rn = 1
union all
select
	ID, Title, 2 as __$operation
from full_Example

Dus, afhankelijk van de @LSN-waarde, zal deze query ofwel alle laatste wijzigingen weergeven (de tussentijdse overslaan) met de status Verwijderd of niet, of alle gegevens uit de originele tabel, waarbij status 2 (nieuw record) wordt toegevoegd – dit veld wordt alleen gebruikt voor het verenigen van twee selecties. Met deze query kunnen we eenvoudig volledig laden of opnieuw laden implementeren met behulp van het MERGE-commando (vanaf SQL 2008-versie).

Om knelpunten te voorkomen die alternatieve processen kunnen creëren en om overeenkomende gegevens uit verschillende tabellen te laden (in de toekomst zullen we verschillende tabellen laden en mogelijk kunnen er relationele relaties tussen zijn), raad ik aan om een ​​DB-snapshot op de brondatabase te gebruiken ( een andere SQL 2008-functie).

De volledige tekst van de lading is als volgt:

[uitbreiden title=”Code”]

/*
	Algorithm of data loading
*/
-- create a database snapshot
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
	drop database db_src_cdc_ss;

declare
	@query nvarchar(max);

select
	@query = N'create database db_src_cdc_ss on ( name = N'''+name+
		''', filename = N'''+[filename]+'.ss'' ) as snapshot of db_src_cdc'
from db_src_cdc.sys.sysfiles where groupid = 1;

exec ( @query );

-- read LSN from the previous load
declare @lsn binary(10) =
	(select max(lsn) from db_dst_cdc.dbo.log_cdc
	where table_name = 'localhost.db_src_cdc.dbo.Example');

-- clear a table before the complete load
if @lsn is null truncate table db_dst_cdc.dbo.Example;

-- load process
with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc_ss.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
, full_Example as
(
	select *
	from db_src_cdc_ss.dbo.Example
	where @lsn is null
)
, cte_Example as
(
	select
		ID, Title, __$operation
	from incr_Example
	where __$rn = 1
	union all
	select
		ID, Title, 2 as __$operation
	from full_Example
)
merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID
when matched and __$operation = 1 then delete
when matched and __$operation <> 1 then update set trg.Title = src.Title
when not matched by target and __$operation <> 1 then insert (ID, Title) values (src.ID, src.Title);

-- mark the end of the load process and the latest LSN
insert db_dst_cdc.dbo.log_cdc (table_name, lsn)
values ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))

-- delete the database snapshot
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
	drop database db_src_cdc_ss

[/uitbreiden]


  1. Tips voor het upgraden van Percona XtraDB Cluster naar 8.0

  2. Waarom klaagt Oracle 10g niet over dubbelzinnigheid van kolommen?

  3. ATAN2() Functie in Oracle

  4. BUITENLANDSE SLEUTEL OP DELETE RESTRICT-fout - Oracle