Dit artikel is interessant voor diegenen die vaak te maken hebben met data-integratie.
Inleiding
Stel dat er een database is waar gebruikers altijd gegevens wijzigen (bijwerken of verwijderen). Misschien wordt deze database gebruikt door een grote applicatie die het niet toestaat om de tabelstructuur te wijzigen. De taak is om van tijd tot tijd gegevens uit deze database naar een andere database op een andere server te laden. De eenvoudigste manier om het probleem aan te pakken, is door de nieuwe gegevens van een brondatabase naar een doeldatabase te laden en de doeldatabase vooraf op te schonen. U kunt deze methode gebruiken zolang de tijd om gegevens te laden acceptabel is en de vooraf ingestelde deadlines niet overschrijdt. Wat als het meerdere dagen duurt om gegevens te laden? Bovendien leiden onstabiele communicatiekanalen tot de situatie dat het laden van gegevens stopt en opnieuw wordt gestart. Als u met deze obstakels wordt geconfronteerd, raad ik u aan een van de 'data reloading'-algoritmen te overwegen. Dit betekent dat alleen gegevenswijzigingen hebben plaatsgevonden sinds de laatste lading is geladen.
CDC
In SQL Server 2008 introduceerde Microsoft een datatrackingmechanisme genaamd Change Data Capture (CDC). In grote lijnen is het doel van dit mechanisme dat het inschakelen van CDC voor elke databasetabel een systeemtabel in dezelfde database zal creëren met een vergelijkbare naam als de originele tabel (het schema is als volgt:'cdc' als prefix plus de oude schemanaam plus ”_” en het einde “_CT”. De originele tabel is bijvoorbeeld dbo.Example, dan heet de systeemtabel cdc.dbo_Example_CT). Het slaat alle gegevens op die zijn gewijzigd.
Overweeg het voorbeeld om dieper in CDC te graven. Maar zorg er eerst voor dat SQL Agent die CDC gebruikt, werkt op de SQL Server-testinstantie.
Daarnaast gaan we een script overwegen dat een database en testtabel maakt, deze tabel vult met gegevens en CDC voor deze tabel inschakelt.
Om de taak te begrijpen en te vereenvoudigen, gebruiken we één SQL Server-instantie zonder de bron- en doeldatabases naar verschillende servers te distribueren.
use master go -- create a source database if not exists (select * from sys.databases where name = 'db_src_cdc') create database db_src_cdc go use db_src_cdc go -- enable CDC if it is disabled if not exists (select * from sys.databases where name = db_name() and is_cdc_enabled=1) exec sys.sp_cdc_enable_db go -- create a role for tables with CDC if not exists(select * from sys.sysusers where name = 'CDC_Reader' and issqlrole=1) create role CDC_Reader go -- create a table if object_id('dbo.Example','U') is null create table dbo.Example ( ID int identity constraint PK_Example primary key, Title varchar(200) not null ) go -- populate the table insert dbo.Example (Title) values ('One'),('Two'),('Three'),('Four'),('Five'); go -- enable CDC for the table if not exists (select * from sys.tables where is_tracked_by_cdc = 1 and name = 'Example') exec sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'Example', @role_name = 'CDC_Reader' go -- populate the table with some data. We will change or delete something update dbo.Example set Title = reverse(Title) where ID in (2,3,4); delete from dbo.Example where ID in (1,2); set identity_insert dbo.Example on; insert dbo.Example (ID, Title) values (1,'One'),(6,'Six'); set identity_insert dbo.Example off; go
Laten we nu eens kijken wat we hebben na het uitvoeren van dit script in de tabellen dbo.Example en cdc.dbo_Example_CT (opgemerkt moet worden dat CDC asynchroon is. Gegevens worden ingevuld in de tabellen waar het bijhouden van wijzigingen na een bepaalde tijdsperiode wordt opgeslagen ).
select * from dbo.Example;
ID Title ---- ---------------------- 1 One 3 eerhT 4 ruoF 5 Five 6 Six
select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Title ------ ---------------------- ----------- ---------------------- ------------ ---------------- --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Four 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x03
Overweeg in detail de tabelstructuur waarin het bijhouden van wijzigingen is opgeslagen. De velden __ $start_lsn en __ $seqval zijn respectievelijk LSN (logvolgnummer in de database) en het transactienummer binnen de transactie. Er is een belangrijke eigenschap in deze velden, namelijk dat we er zeker van kunnen zijn dat het record met een hogere LSN later zal worden uitgevoerd. Dankzij deze eigenschap kunnen we gemakkelijk de laatste status van elk record in de query krijgen, door onze selectie te filteren op de voorwaarde - waarbij __ $ rn =1.
Het veld __$operation bevat de transactiecode:
- 1 – het record is verwijderd
- 2 – het record is ingevoegd
- 3, 4 – het record is bijgewerkt. De oude gegevens vóór de update zijn 3, de nieuwe gegevens zijn 4.
Naast servicevelden met voorvoegsel «__$», zijn de velden van de oorspronkelijke tabel volledig gedupliceerd. Deze informatie is voldoende voor ons om over te gaan tot de incrementele belasting.
Een database opzetten om gegevens te laden
Maak een tabel in onze testdoeldatabase waarin gegevens worden geladen, evenals een extra tabel om gegevens over het laadlogboek op te slaan.
use master go -- create a target database if not exists (select * from sys.databases where name = 'db_dst_cdc') create database db_dst_cdc go use db_dst_cdc go -- create a table if object_id('dbo.Example','U') is null create table dbo.Example ( ID int constraint PK_Example primary key, Title varchar(200) not null ) go -- create a table to store the load log if object_id('dbo.log_cdc','U') is null create table dbo.log_cdc ( table_name nvarchar(512) not null, dt datetime not null default getdate(), lsn binary(10) not null default(0x0), constraint pk_log_cdc primary key (table_name,dt desc) ) go
Ik wil uw aandacht vestigen op de velden van de LOG_CDC-tabel:
- TABLE_NAME slaat informatie op over welke tabel is geladen (het is mogelijk om in de toekomst meerdere tabellen te laden, van verschillende databases of zelfs van verschillende servers; het tabelformaat is 'SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME'
- DT is een veld van de laaddatum en -tijd, die optioneel is voor de incrementele lading. Het zal echter nuttig zijn voor het controleren van het laden.
- LSN – nadat een tabel is geladen, moeten we, indien nodig, informatie opslaan over de plaats waar de volgende lading moet worden gestart. Dienovereenkomstig voegen we na elke lading de laatste (maximale) __ $ start_lsn toe aan deze kolom.
Algoritme voor het laden van gegevens
Zoals hierboven beschreven, kunnen we met behulp van de query de laatste status van de tabel krijgen met behulp van vensterfuncties. Als we LSN kennen van de laatste lading, kunnen we de volgende keer dat we laden alle gegevens uit de bron filteren, waarvan de wijzigingen hoger zijn dan de opgeslagen LSN, als er ten minste één volledige vorige lading was:
with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) select * from incr_Example
Dan kunnen we alle records voor de volledige lading krijgen, als de laad-LSN niet is opgeslagen:
with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) , full_Example as ( select * from db_src_cdc.dbo.Example where @lsn is null ) select ID, Title, __$operation from incr_Example where __$rn = 1 union all select ID, Title, 2 as __$operation from full_Example
Dus, afhankelijk van de @LSN-waarde, zal deze query ofwel alle laatste wijzigingen weergeven (de tussentijdse overslaan) met de status Verwijderd of niet, of alle gegevens uit de originele tabel, waarbij status 2 (nieuw record) wordt toegevoegd – dit veld wordt alleen gebruikt voor het verenigen van twee selecties. Met deze query kunnen we eenvoudig volledig laden of opnieuw laden implementeren met behulp van het MERGE-commando (vanaf SQL 2008-versie).
Om knelpunten te voorkomen die alternatieve processen kunnen creëren en om overeenkomende gegevens uit verschillende tabellen te laden (in de toekomst zullen we verschillende tabellen laden en mogelijk kunnen er relationele relaties tussen zijn), raad ik aan om een DB-snapshot op de brondatabase te gebruiken ( een andere SQL 2008-functie).
De volledige tekst van de lading is als volgt:
[uitbreiden title=”Code”]
/* Algorithm of data loading */ -- create a database snapshot if exists (select * from sys.databases where name = 'db_src_cdc_ss' ) drop database db_src_cdc_ss; declare @query nvarchar(max); select @query = N'create database db_src_cdc_ss on ( name = N'''+name+ ''', filename = N'''+[filename]+'.ss'' ) as snapshot of db_src_cdc' from db_src_cdc.sys.sysfiles where groupid = 1; exec ( @query ); -- read LSN from the previous load declare @lsn binary(10) = (select max(lsn) from db_dst_cdc.dbo.log_cdc where table_name = 'localhost.db_src_cdc.dbo.Example'); -- clear a table before the complete load if @lsn is null truncate table db_dst_cdc.dbo.Example; -- load process with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc_ss.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) , full_Example as ( select * from db_src_cdc_ss.dbo.Example where @lsn is null ) , cte_Example as ( select ID, Title, __$operation from incr_Example where __$rn = 1 union all select ID, Title, 2 as __$operation from full_Example ) merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID when matched and __$operation = 1 then delete when matched and __$operation <> 1 then update set trg.Title = src.Title when not matched by target and __$operation <> 1 then insert (ID, Title) values (src.ID, src.Title); -- mark the end of the load process and the latest LSN insert db_dst_cdc.dbo.log_cdc (table_name, lsn) values ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0)) -- delete the database snapshot if exists (select * from sys.databases where name = 'db_src_cdc_ss' ) drop database db_src_cdc_ss
[/uitbreiden]