De aanpak is zoals hierboven vermeld, met een S3-gebeurtenistrigger en een lambda-taak die luistert op de s3-bucket / objectlocatie. Zodra een bestand is geüpload naar de s3-locatie, wordt de lambda-taak uitgevoerd en in de lambda kunt u configureren om een AWS Glue-taak aan te roepen. Dit is precies wat we hebben gedaan en is succesvol live gegaan. Lambda heeft een levensduur van 15 minuten en het zou minder dan een minuut moeten duren om een lijmklus te activeren/starten.
Hier vindt u een voorbeeldbron ter referentie.
from __future__ import print_function
import json
import boto3
import time
import urllib
print('Loading function')
s3 = boto3.client('s3')
glue = boto3.client('glue')
def lambda_handler(event, context):
gluejobname="your-glue-job-name here"
try:
runId = glue.start_job_run(JobName=gluejobname)
status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
print("Job Status : ", status['JobRun']['JobRunState'])
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist '
'and your bucket is in the same region as this '
'function.'.format(source_bucket, source_bucket))
raise e
Voor het maken van een Lambda-functie gaat u naar AWS Lambdra->Maak een nieuwe functie vanaf Scratch->Selecteer S3 voor gebeurtenis en configureer vervolgens de S3-bucketlocaties, voorvoegsels zoals vereist. Kopieer vervolgens het bovenstaande codevoorbeeld, het inline codegebied en configureer de lijmtaaknaam indien nodig. Zorg ervoor dat u over alle vereiste IAM-rollen/toegangsinstellingen beschikt.
De lijmtaak moet voorzieningen hebben om verbinding te maken met uw Aurora, en dan kunt u de opdracht "LOAD FROM S3..." gebruiken die door Aurora wordt geleverd. Zorg ervoor dat alle instellingen/configuraties van parametergroepen naar behoefte worden uitgevoerd.
Laat het me weten als er problemen zijn.
UPDATE:VOORBEELD-codefragment voor LOAD FROM S3:
conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, [email protected];"
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()