sql >> Database >  >> RDS >> PostgreSQL

INSERT of UPDATE bulkgegevens van dataframe/CSV naar PostgreSQL-database

In dit specifieke geval is het beter om naar het DB-API-niveau te gaan, omdat je een aantal tools nodig hebt die zelfs niet rechtstreeks door SQLAlchemy Core worden weergegeven, zoals copy_expert() . Dat kan met raw_connection() . Als je brongegevens een CSV-bestand zijn, heb je in dit geval helemaal geen panda's nodig. Begin met het maken van een tijdelijke verzameltabel, kopieer gegevens naar de tijdelijke tabel en voeg deze in de doeltabel in met conflicthantering:

conn = engine.raw_connection()

try:
    with conn.cursor() as cur:
        cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                       ON COMMIT DROP""")

        with open("your_source.csv") as data:
            cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                               FROM STDIN WITH CSV""", data)

        cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                       SELECT itemid, title, street, pincode
                       FROM TEST_STAGING
                       ON CONFLICT ( itemid )
                       DO UPDATE SET title = EXCLUDED.title
                                   , street = EXCLUDED.street
                                   , pincode = EXCLUDED.pincode""")

except:
    conn.rollback()
    raise

else:
    conn.commit()

finally:
    conn.close()

Als uw brongegevens daarentegen het DataFrame zijn, , kunt u nog steeds COPY . gebruiken door een functie door te geven als method= naar to_sql() . De functie zou zelfs alle bovenstaande logica kunnen verbergen:

import csv

from io import StringIO
from psycopg2 import sql

def psql_upsert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection

    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerows(data_iter)
    buf.seek(0)

    if table.schema:
        table_name = sql.SQL("{}.{}").format(
            sql.Identifier(table.schema), sql.Identifier(table.name))
    else:
        table_name = sql.Identifier(table.name)

    tmp_table_name = sql.Identifier(table.name + "_staging")
    columns = sql.SQL(", ").join(map(sql.Identifier, keys))

    with dbapi_conn.cursor() as cur:
        # Create the staging table
        stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
        stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
        cur.execute(stmt)

        # Populate the staging table
        stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
        stmt = sql.SQL(stmt).format(tmp_table_name, columns)
        cur.copy_expert(stmt, buf)

        # Upsert from the staging table to the destination. First find
        # out what the primary key columns are.
        stmt = """
               SELECT kcu.column_name
               FROM information_schema.table_constraints tco
               JOIN information_schema.key_column_usage kcu 
               ON kcu.constraint_name = tco.constraint_name
               AND kcu.constraint_schema = tco.constraint_schema
               WHERE tco.constraint_type = 'PRIMARY KEY'
               AND tco.table_name = %s
               """
        args = (table.name,)

        if table.schema:
            stmt += "AND tco.table_schema = %s"
            args += (table.schema,)

        cur.execute(stmt, args)
        pk_columns = {row[0] for row in cur.fetchall()}
        # Separate "data" columns from (primary) key columns
        data_columns = [k for k in keys if k not in pk_columns]
        # Build conflict_target
        pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))

        set_ = sql.SQL(", ").join([
            sql.SQL("{} = EXCLUDED.{}").format(k, k)
            for k in map(sql.Identifier, data_columns)])

        stmt = """
               INSERT INTO {} ( {} )
               SELECT {}
               FROM {}
               ON CONFLICT ( {} )
               DO UPDATE SET {}
               """

        stmt = sql.SQL(stmt).format(
            table_name, columns, columns, tmp_table_name, pk_columns, set_)
        cur.execute(stmt)

U zou dan het nieuwe DataFrame . invoegen met behulp van

df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")

Met behulp van deze methode duurde het upsting van ~ 1.000.000 rijen ongeveer 16 seconden op deze machine met een lokale database.




  1. rare karakters zoals ‪ ‬ ‏

  2. Mysql-insert werkt niet en geeft geen fouten

  3. Hoe bereken ik de week van het jaar in Oracle met een niet-standaard eerste dag van de week?

  4. Hoe krijg ik de eerste en laatste datum van het lopende jaar?