Zugriff auf Dateidaten von Cloudservices mit Snowpark Connect for Spark

Durch Snowpark Connect for Spark können Sie direkt mit externen Cloudspeichersystemen wie Amazon S3, Google Cloud Storage und Azure Blob interagieren. Sie können Daten aus dem Cloudspeicher in Snowflake einlesen, diese Daten verarbeiten und dann zurückschreiben.

Sie könnten Snowpark Connect for Spark zum Beispiel verwenden, um die folgenden Aufgaben auszuführen:

  • Rohdaten aufnehmen.

    Dateien (z. B. CSV, JSONund Parquet) in S3, Google Cloud oder Azure hochzuladen, bevor sie in Snowflake verschoben werden.

  • Daten für die nachgelagerte Verwendung exportieren.

    Verarbeitete Snowpark DataFrames für ML-Training, Freigabe für externe Partner oder weitere Spark-basierte Analysen zurück in den Cloudspeicher schreiben.

  • Hybride Pipelines erstellen.

    Einen Teil der Pipeline in Snowflake beibehalten, während die Kompatibilität mit vorhandenen Data Lakes erhalten bleibt.

  • Vorschriften erfüllen oder Kosten senken.

    Bestimmte Datensets aufgrund von Vorschriften, Governance oder Budgeteinschränkungen extern speichern.

Verwenden Sie die unter diesem Thema aufgeführten Schritte, um aus Dateien zu lesen bzw. in Dateien zu schreiben, die bei diesen Cloudserviceanbietern gespeichert sind. Sie können auf Dateien entweder über externe Snowflake-Stagingbereiche oder über den direkten Zugriff zugreifen.

Einschränkungen

Bei Verwendung von Snowpark Connect for Spark bei der Arbeit mit Cloudservices sollten Sie die folgenden Einschränkungen beachten:

  • Authentifizierung –Snowpark Connect for Spark verwaltet Cloud-Anmeldeinformationen nicht automatisch. Sie müssen die Zugriffsschlüssel (AWS), Speicherkontoschlüssel oder SAS-Token (Azure) konfigurieren oder externe Stagingbereiche selbst verwalten. Abgelaufene oder fehlende Anmeldeinformationen führen zu Lese-/Schreibfehlern.

  • Leistung – Cloud-E/A hängt von der Netzwerkbandbreite und der Latenz des Objektspeichers ab. Das Lesen vieler kleiner Dateien kann die Leistung erheblich beeinträchtigen.

  • Formatunterstützung – Stellen Sie sicher, dass die Dateiformate, die Sie lesen und schreiben, unterstützt werden. Derzeit hat Snowpark Connect for Spark Parität mit gängigen Formaten, einschließlich TEXT, CSV, JSONund Parquet. Erweiterte Features (wie Parquet-Partitionserkennung und JSON-Schemaentwicklung) können sich von Spark unterscheiden.

  • Berechtigungen und Richtlinien – Das Schreiben in Cloud-Buckets erfordert entsprechende IAM/ACL-Richtlinien. Möglicherweise stoßen Sie auf einen AccessDenied-Fehler, wenn die Richtlinien nicht zwischen Snowflake-Rollen und Cloud-Anmeldeinformationen abgeglichen werden.

Best Practices

Um die zuverlässigste Integration zu erhalten, die gut funktioniert, befolgen Sie die folgenden Best Practices:

  • Verwenden Sie sichere, temporäre Anmeldeinformationen und ändern Sie die Anmeldeinformationen häufig.

  • Partitionieren Sie Daten und bilden Sie Daten-Buckets.

    Wenn Sie Parquet schreiben, partitionieren Sie auf häufig gefilterte Spalten, um die Kosten für das Scannen zu reduzieren. Verwenden Sie weniger, größere Dateien (z. B. mit jeweils 100MB bis 500MB) anstelle vieler kleiner Dateien.

  • Validieren Sie das Schema beim Schreiben.

    Definieren Sie das Schema immer explizit, insbesondere bei semistrukturierten Formaten wie JSON und CSV. Dies verhindert eine Abweichung zwischen Snowflake und externen Daten.

  • Überwachen Sie Kosten.

    Erwägen Sie, vor dem Schreiben Dateien zu konsolidieren und Daten zu filtern, um die Kosten zu senken. Beim Cloudanbieter fallen pro Anfrage und pro gescanntem Byte Kosten an.

  • Standardisieren Sie API-Aufrufe

    Befolgen Sie die dokumentierten Anleitungen genau, wenn Sie Funktionen und Parameter nutzen, und vermeiden Sie Ad-hoc-Varianten. Auf diese Weise können Sie die Kompatibilität aufrechterhalten, Regressionen verhindern und das erwartete Verhalten bei verschiedenen Cloudanbietern sicherstellen.

Zugriff mithilfe externer Snowflake-Stagingbereiche

  1. Konfigurieren Sie den sicheren Zugriff auf Amazon S3, um einen externen Stagingbereich zu erstellen, der auf Ihren S3-Speicherort verweist.

  2. Lesen Sie aus Ihrem externen Stagingbereich.

    # Read CSV
    spark.read.csv('@<your external stage name>/<file path>')
    spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file
    
    # Write to CSV
    df.write.csv('@<your external stage name>/<file path>')
    df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file
    
    # Read Text
    spark.read.text('@<your external stage name>/<file path>')
    
    # Write to Text
    df.write.text('@<your external stage name>/<file path>')
    df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>')
    
    # Read Parquet
    spark.read.parquet('@<your external stage name>/<file path>')
    
    # Write to Parquet
    df.write.parquet('@<your external stage name>/<file path>')
    
    # Read JSON
    spark.read.json('@<your external stage name>/<file path>')
    
    # Write to JSON
    df.write.json('@<your external stage name>/<file path>')
    
    Copy

Zugriff über direkten Zugriff

Mit den hier beschriebenen Schritten und dem Code können Sie direkt auf Dateien bei Cloudserviceanbietern zugreifen.

  1. Legen Sie die Spark-Konfiguration mit AWS-Anmeldeinformationen fest.

    # For S3 related access with public/private buckets, please add these config change
    spark.conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled","false")
    spark.conf.set("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    spark.conf.set("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.2")
    
    # For private S3 access, please also provide credentials
    spark.conf.set("spark.hadoop.fs.s3a.access.key","<AWS_ACCESS_KEY_ID>")
    spark.conf.set("spark.hadoop.fs.s3a.secret.key","<AWS_SECRET_ACCESS_KEY>")
    spark.conf.set("spark.hadoop.fs.s3a.session.token","<AWS_SESSION_TOKEN>")
    
    Copy
  2. Lesen und schreiben Sie direkt mit S3.

    # Read CSV
    spark.read.csv('s3a://<bucket name>/<file path>')
    spark.read.option("header", True).csv('s3a://<bucket name>/<file path>') # read with header in file
    
    # Write to CSV
    df.write.csv('s3a://<bucket name>/<file path>')
    df.write.option("header", True).csv('s3a://<bucket name>/<file path>') # write with header in file
    
    # Read Text
    spark.read.text('s3a://<bucket name>/<file path>')
    
    # Write to Text
    df.write.text('s3a://<bucket name>/<file path>')
    df.write.format("text").mode("overwrite").save('s3a://<bucket name>/<file path>')
    
    # Read Parquet
    spark.read.parquet('s3a://<bucket name>/<file path>')
    
    # Write to Parquet
    df.write.parquet('s3a://<bucket name>/<file path>')
    
    # Read JSON
    spark.read.json('s3a://<bucket name>/<file path>')
    
    # Write to JSON
    df.write.json('s3a://<bucket name>/<file path>')
    
    Copy