sql >> Database >  >> RDS >> Mysql

Python en MySQL gebruiken in het ETL-proces:Python en SQLAlchemy gebruiken

In de vorige twee artikelen van deze serie hebben we besproken hoe Python en SQLAlchemy kunnen worden gebruikt om het ETL-proces uit te voeren. Vandaag doen we hetzelfde, maar deze keer met Python en SQL Alchemy zonder SQL-commando's in tekstvorm. Hierdoor kunnen we SQLAlchemy gebruiken, ongeacht de database-engine waarmee we zijn verbonden. Laten we beginnen.

Vandaag bespreken we hoe u het ETL-proces kunt uitvoeren met Python en SQLAlchemy. We zullen een script maken om dagelijkse gegevens uit onze operationele database te extraheren, deze te transformeren en vervolgens in ons datawarehouse te laden.

Dit is het derde artikel in de reeks. Als je de eerste twee artikelen (Python en MySQL gebruiken in het ETL-proces en SQLAlchemy) nog niet hebt gelezen, raad ik je ten zeerste aan dit te doen voordat je verdergaat.

Deze hele serie is een vervolg op onze datawarehouse-serie:

  • Een DWH maken, deel één:een bedrijfsgegevensmodel voor abonnementen
  • Een DWH maken, deel twee:een bedrijfsgegevensmodel voor abonnementen
  • Een datawarehouse maken, deel 3:een bedrijfsgegevensmodel voor abonnementen

Oké, laten we beginnen met het onderwerp van vandaag. Laten we eerst eens kijken naar de datamodellen.

De gegevensmodellen



Operationeel (live) databasegegevensmodel




DWH-gegevensmodel


Dit zijn de twee datamodellen die we gaan gebruiken. Bekijk deze artikelen voor meer informatie over datawarehouses (DWH's):

  • Het sterrenschema
  • Het sneeuwvlokschema
  • Sterschema vs. Sneeuwvlokschema

Waarom SQLAlchemy?

Het hele idee achter SQLAlchemy is dat nadat we databases hebben geïmporteerd, we geen SQL-code nodig hebben die specifiek is voor de gerelateerde database-engine. In plaats daarvan kunnen we objecten importeren in SQLAlchemy en de SQLAlchemy-syntaxis gebruiken voor instructies. Dat stelt ons in staat om dezelfde taal te gebruiken, ongeacht met welke database-engine we zijn verbonden. Het belangrijkste voordeel hier is dat een ontwikkelaar niet voor de verschillen tussen verschillende database-engines hoeft te zorgen. Uw SQLAlchemy-programma werkt precies hetzelfde (met kleine wijzigingen) als u migreert naar een andere database-engine.

Ik heb besloten om alleen SQLAlchemy-opdrachten en Python-lijsten te gebruiken om te communiceren met tijdelijke opslag en tussen verschillende databases. De belangrijkste redenen voor deze beslissing zijn dat 1) Python-lijsten bekend zijn en 2) de code leesbaar zou zijn voor mensen zonder Python-vaardigheden.

Dit wil niet zeggen dat SQLAlchemy perfect is. Het heeft bepaalde beperkingen, die we later zullen bespreken. Laten we voor nu even naar de onderstaande code kijken:

Het script en het resultaat uitvoeren

Dit is de Python-opdracht die wordt gebruikt om ons script aan te roepen. Het script controleert de gegevens in de operationele database, vergelijkt de waarden met de DWH en importeert de nieuwe waarden. In dit voorbeeld werken we waarden bij in twee dimensietabellen en één feitentabel; het script retourneert de juiste uitvoer. Het hele script is zo geschreven dat je het meerdere keren per dag kunt uitvoeren. Het zal de "oude" gegevens voor die dag verwijderen en vervangen door nieuwe.

Laten we het hele script analyseren, te beginnen bij het begin.

SQLAlchemy importeren

Het eerste dat we moeten doen, is de modules importeren die we in het script zullen gebruiken. Gewoonlijk importeert u uw modules terwijl u het script schrijft. In de meeste gevallen weet u in het begin niet precies welke modules u nodig heeft.

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

We hebben de datetime van Python geïmporteerd module, die ons lessen geeft die met datums werken.

Vervolgens hebben we de sqlalchemy module. We importeren niet de hele module, alleen de dingen die we nodig hebben – sommige specifiek voor SQLAlchemy (create_engine , MetaData , Table ), sommige delen van de SQL-instructie (select , and_ , case ), en func , waarmee we functies als count() . kunnen gebruiken en sum() .

Verbinding maken met de databases

We moeten verbinding maken met twee databases op onze server. We kunnen indien nodig verbinding maken met meer databases (MySQL, SQL Server of andere) vanaf verschillende servers. In dit geval zijn beide databases MySQL-databases en worden ze op mijn lokale computer opgeslagen.

# connect to databases
engine_live = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

We hebben twee engines en twee verbindingen gemaakt. Ik zal hier niet in details treden omdat we dit al in het vorige artikel hebben uitgelegd.

De dim_time bijwerken Afmeting

Doel:de datum van gisteren invoegen als deze nog niet in de tabel staat.

In ons script werken we twee dimensietabellen bij met nieuwe waarden. De rest volgt hetzelfde patroon, dus we zullen dit maar één keer bespreken; we hoeven bijna identieke code niet nog een paar keer op te schrijven.

Het idee is heel eenvoudig. We zullen altijd het script uitvoeren om nieuwe gegevens voor gisteren in te voegen. Daarom moeten we controleren of die datum in de dimensietabel is ingevoegd. Als het er al is, doen we niets; als dat niet het geval is, voegen we het toe. Laten we eens kijken naar de code om de dim_time . bij te werken tafel.

Eerst controleren we of de datum bestaat. Als het niet bestaat, voegen we het toe. We beginnen met het opslaan van de datum van gisteren in een variabele. In Python doe je het op deze manier:

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

De eerste regel neemt een huidige datum, converteert deze naar een numerieke waarde, trekt 1 af van die waarde en converteert die numerieke waarde terug naar een datum (gisteren =vandaag – 1 ). De tweede regel slaat de datum op in tekstformaat.

Vervolgens testen we of de datum al in de database staat:

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

Na het laden van de tabel voeren we een query uit die alle rijen uit de dimensietabel moet retourneren waarbij de tijd-/datumwaarde gelijk is aan gisteren. Het resultaat kan 0 hebben (niet zo'n datum in de tabel) of 1 rij (de datum staat al in de tabel).

Als de datum nog niet in de tabel staat, gebruiken we de opdracht insert() om deze toe te voegen:

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

Een nieuw ding dat ik hier wil noemen, is het gebruik van. .year , .month , .isocalendar()[1] , en .weekday om dateparts te krijgen.

De dim_city bijwerken Afmeting

Doel:voeg nieuwe steden in als die er zijn (d.w.z. vergelijk de lijst met steden in de live database met de lijst met steden in het DWH en voeg ontbrekende toe).

Het bijwerken van de dim_time dimensie was vrij eenvoudig. We hebben eenvoudigweg getest of er een datum in de tabel stond en hebben deze ingevoegd als deze er nog niet was. Om een ​​waarde in de DWH-database te testen, hebben we een Python-variabele gebruikt (gisteren ). We zullen dat proces opnieuw gebruiken, maar deze keer met lijsten.

Aangezien er geen gemakkelijke manier is om tabellen uit verschillende databases te combineren in een enkele SQLAlchemy-query, kunnen we de benadering die wordt beschreven in deel 1 van deze serie niet gebruiken. Daarom hebben we een object nodig om de waarden op te slaan die nodig zijn om te communiceren tussen deze twee databases. Ik heb besloten om lijsten te gebruiken, omdat ze veel voorkomen en hun werk goed doen.

Eerst laden we het country en city tabellen van een live database naar de relevante objecten.

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

Vervolgens laden we de dim_city tabel van het DWH in een lijst:

# load whole dwh table in the list
stmt = select([table_dim_city]);
table_dim_city_list = connection_dwh.execute(stmt).fetchall()

Vervolgens doen we hetzelfde voor de waarden uit de live database. We sluiten aan bij de tafels country en city dus we hebben alle benodigde gegevens in deze lijst:

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

Nu gaan we door de lijst met de gegevens uit de live database. Voor elke record vergelijken we waarden (city_name , postal_code , en country_name ). Als we dergelijke waarden niet vinden, voegen we een nieuwe record toe aan de dim_city tafel.

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1;
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

Om te bepalen of de waarde al in de DWH staat, hebben we een combinatie van kenmerken getest die uniek moet zijn. (De primaire sleutel uit de live database helpt ons hier niet veel.) We kunnen vergelijkbare code gebruiken om andere woordenboeken bij te werken. Het is niet de mooiste oplossing, maar het is nog steeds een mooie elegante. En het zal precies doen wat we nodig hebben.

Bijwerken van de fact_customer_subscribe Tabel

Doel:als we oude gegevens hebben voor de datum van gisteren, verwijder deze dan eerst. Voeg de gegevens van gisteren toe aan het DWH - ongeacht of we iets in de vorige stap hebben verwijderd of niet.

Nadat we alle dimensietabellen hebben bijgewerkt, moeten we de feitentabellen bijwerken. In ons script werken we slechts één feitentabel bij. De redenering is dezelfde als in de vorige sectie:het bijwerken van andere tabellen zou hetzelfde patroon volgen, dus we zouden de code meestal herhalen.

Voordat we waarden in de feitentabel invoegen, moeten we de waarden van de gerelateerde sleutels uit de dimensietabellen weten. Om dat te doen, laden we dimensies opnieuw in lijsten en vergelijken ze met waarden uit de live database.

Het eerste wat we zullen doen is de klant laden en fact_customer_subscribed tabellen in objecten:

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

Nu moeten we sleutels vinden voor de gerelateerde tijddimensie. Aangezien we altijd gegevens voor gisteren invoegen, zoeken we naar die datum in de dim_time tafel en gebruik zijn ID. De query retourneert 1 rij en de ID staat op de eerste positie (de index begint bij 0, dus dat is result[0][0] ):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

Voor die tijd zullen we alle bijbehorende records uit de feitentabel verwijderen:

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

Ok, nu hebben we de ID van de tijddimensie opgeslagen in de dim_time_id variabel. Dit was gemakkelijk omdat we slechts één tijddimensiewaarde kunnen hebben. Voor de stadsdimensie zal het verhaal anders zijn. Eerst laden we alle de waarden die we nodig hebben – waarden die de stad op unieke wijze beschrijven (niet de ID), en geaggregeerde waarden:

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

Er zijn een paar dingen die ik wil benadrukken over de bovenstaande vraag:

  • func.sum(...) is SUM(...) van “standaard SQL”.
  • De case(...) syntaxis gebruikt and_ vóór voorwaarden, niet tussen hen.
  • .label(...) functioneert als een SQL AS-alias.
  • We gebruiken \ om naar de volgende regel te gaan en de leesbaarheid van de query te vergroten. (Geloof me, het is vrijwel onleesbaar zonder de schuine streep - ik heb het geprobeerd :) )
  • .group_by(...) speelt de rol van SQL's GROUP BY.

Vervolgens doorlopen we elke record die is geretourneerd met de vorige query. Voor elke record vergelijken we waarden die een stad uniek definiëren (city_name , postal_code , country_name ) met de waarden die zijn opgeslagen in de lijst die is gemaakt op basis van de DWH dim_city tafel. Als alle drie de waarden overeenkomen, slaan we de ID uit de lijst op en gebruiken deze bij het invoegen van nieuwe gegevens. Op deze manier hebben we voor elk record ID's voor beide dimensies:

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1;
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:	
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

En dat is alles. We hebben onze DWH bijgewerkt. Het script zou veel langer zijn als we alle dimensie- en feitentabellen zouden bijwerken. De complexiteit zou ook groter zijn wanneer een feitentabel is gerelateerd aan meer dimensietabellen. In dat geval hebben we een voor . nodig lus voor elke dimensietabel.

Dit werkt niet!

Ik was erg teleurgesteld toen ik dit script schreef en er toen achter kwam dat zoiets niet werkt:

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

In dit voorbeeld probeer ik tabellen uit twee verschillende databases te gebruiken. Als we twee afzonderlijke verbindingen tot stand brengen, "ziet" de eerste verbinding geen tabellen van een andere verbinding. Als we rechtstreeks verbinding maken met de server en niet met een database, kunnen we geen tabellen laden.

Totdat dit verandert (hopelijk snel), moet je een soort structuur gebruiken (bijvoorbeeld wat we vandaag hebben gedaan) om tussen de twee databases te communiceren. Dit bemoeilijkt de code, omdat u een enkele zoekopdracht moet vervangen door twee lijsten en genest for lussen.

Deel uw mening over SQLAlchemy en Python

Dit was het laatste artikel in deze serie. Maar wie weet? Misschien proberen we een andere aanpak in komende artikelen, dus houd ons in de gaten. Deel in de tussentijd uw mening over SQLAlchemy en Python in combinatie met databases. Wat denk je dat we missen in dit artikel? Wat zou je toevoegen? Vertel het ons in de reacties hieronder.

Je kunt het volledige script dat we in dit artikel hebben gebruikt hier downloaden.

En speciale dank gaat uit naar Dirk J Bosman (@dirkjobosman), die deze serie artikelen heeft aanbevolen.


  1. Identiteitszaad resetten na het verwijderen van records in SQL Server

  2. Waarom geen uitvoer wanneer PLSQL Anonymous-blok is voltooid?

  3. SQL Server recursieve query

  4. SQL Server Express-back-updatabase | Hoe u SQL Express-back-up automatiseert en opschoont