Zugriff auf Dateidaten von Cloudservices mit Snowpark Connect for Spark¶
Durch Snowpark Connect for Spark können Sie direkt mit externen Cloudspeichersystemen wie Amazon S3, Google Cloud Storage und Azure Blob interagieren. Sie können Daten aus dem Cloudspeicher in Snowflake einlesen, diese Daten verarbeiten und dann zurückschreiben.
Sie könnten Snowpark Connect for Spark zum Beispiel verwenden, um die folgenden Aufgaben auszuführen:
Rohdaten aufnehmen.
Dateien (z. B. CSV, JSONund Parquet) in S3, Google Cloud oder Azure hochzuladen, bevor sie in Snowflake verschoben werden.
Daten für die nachgelagerte Verwendung exportieren.
Verarbeitete Snowpark DataFrames für ML-Training, Freigabe für externe Partner oder weitere Spark-basierte Analysen zurück in den Cloudspeicher schreiben.
Hybride Pipelines erstellen.
Einen Teil der Pipeline in Snowflake beibehalten, während die Kompatibilität mit vorhandenen Data Lakes erhalten bleibt.
Vorschriften erfüllen oder Kosten senken.
Bestimmte Datensets aufgrund von Vorschriften, Governance oder Budgeteinschränkungen extern speichern.
Verwenden Sie die unter diesem Thema aufgeführten Schritte, um aus Dateien zu lesen bzw. in Dateien zu schreiben, die bei diesen Cloudserviceanbietern gespeichert sind. Sie können auf Dateien entweder über externe Snowflake-Stagingbereiche oder über den direkten Zugriff zugreifen.
Einschränkungen¶
Bei Verwendung von Snowpark Connect for Spark bei der Arbeit mit Cloudservices sollten Sie die folgenden Einschränkungen beachten:
Authentifizierung –Snowpark Connect for Spark verwaltet Cloud-Anmeldeinformationen nicht automatisch. Sie müssen die Zugriffsschlüssel (AWS), Speicherkontoschlüssel oder SAS-Token (Azure) konfigurieren oder externe Stagingbereiche selbst verwalten. Abgelaufene oder fehlende Anmeldeinformationen führen zu Lese-/Schreibfehlern.
Leistung – Cloud-E/A hängt von der Netzwerkbandbreite und der Latenz des Objektspeichers ab. Das Lesen vieler kleiner Dateien kann die Leistung erheblich beeinträchtigen.
Formatunterstützung – Stellen Sie sicher, dass die Dateiformate, die Sie lesen und schreiben, unterstützt werden. Derzeit hat Snowpark Connect for Spark Parität mit gängigen Formaten, einschließlich TEXT, CSV, JSONund Parquet. Erweiterte Features (wie Parquet-Partitionserkennung und JSON-Schemaentwicklung) können sich von Spark unterscheiden.
Berechtigungen und Richtlinien – Das Schreiben in Cloud-Buckets erfordert entsprechende IAM/ACL-Richtlinien. Möglicherweise stoßen Sie auf einen AccessDenied-Fehler, wenn die Richtlinien nicht zwischen Snowflake-Rollen und Cloud-Anmeldeinformationen abgeglichen werden.
Best Practices¶
Um die zuverlässigste Integration zu erhalten, die gut funktioniert, befolgen Sie die folgenden Best Practices:
Verwenden Sie sichere, temporäre Anmeldeinformationen und ändern Sie die Anmeldeinformationen häufig.
Partitionieren Sie Daten und bilden Sie Daten-Buckets.
Wenn Sie Parquet schreiben, partitionieren Sie auf häufig gefilterte Spalten, um die Kosten für das Scannen zu reduzieren. Verwenden Sie weniger, größere Dateien (z. B. mit jeweils 100MB bis 500MB) anstelle vieler kleiner Dateien.
Validieren Sie das Schema beim Schreiben.
Definieren Sie das Schema immer explizit, insbesondere bei semistrukturierten Formaten wie JSON und CSV. Dies verhindert eine Abweichung zwischen Snowflake und externen Daten.
Überwachen Sie Kosten.
Erwägen Sie, vor dem Schreiben Dateien zu konsolidieren und Daten zu filtern, um die Kosten zu senken. Beim Cloudanbieter fallen pro Anfrage und pro gescanntem Byte Kosten an.
Standardisieren Sie API-Aufrufe
Befolgen Sie die dokumentierten Anleitungen genau, wenn Sie Funktionen und Parameter nutzen, und vermeiden Sie Ad-hoc-Varianten. Auf diese Weise können Sie die Kompatibilität aufrechterhalten, Regressionen verhindern und das erwartete Verhalten bei verschiedenen Cloudanbietern sicherstellen.
Zugriff mithilfe externer Snowflake-Stagingbereiche¶
Konfigurieren Sie den sicheren Zugriff auf Amazon S3, um einen externen Stagingbereich zu erstellen, der auf Ihren S3-Speicherort verweist.
Lesen Sie aus Ihrem externen Stagingbereich.
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Konfigurieren Sie den sicheren Zugriff auf Azure, um einen externen Stagingbereich zu erstellen, der auf Ihren Azure-Container verweist.
Lesen Sie aus Ihrem externen Stagingbereich.
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Konfigurieren Sie den sicheren Zugriff auf Google Cloud, um einen externen Stagingbereich zu erstellen, der auf Ihren Google Cloud Storage-Bucket verweist.
Lesen Sie aus Ihrem externen Stagingbereich.
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Zugriff über direkten Zugriff¶
Mit den hier beschriebenen Schritten und dem Code können Sie direkt auf Dateien bei Cloudserviceanbietern zugreifen.
Legen Sie die Spark-Konfiguration mit AWS-Anmeldeinformationen fest.
# For S3 related access with public/private buckets, please add these config change spark.conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled","false") spark.conf.set("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") spark.conf.set("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.2") # For private S3 access, please also provide credentials spark.conf.set("spark.hadoop.fs.s3a.access.key","<AWS_ACCESS_KEY_ID>") spark.conf.set("spark.hadoop.fs.s3a.secret.key","<AWS_SECRET_ACCESS_KEY>") spark.conf.set("spark.hadoop.fs.s3a.session.token","<AWS_SESSION_TOKEN>")
Lesen und schreiben Sie direkt mit S3.
# Read CSV spark.read.csv('s3a://<bucket name>/<file path>') spark.read.option("header", True).csv('s3a://<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('s3a://<bucket name>/<file path>') df.write.option("header", True).csv('s3a://<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('s3a://<bucket name>/<file path>') # Write to Text df.write.text('s3a://<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('s3a://<bucket name>/<file path>') # Read Parquet spark.read.parquet('s3a://<bucket name>/<file path>') # Write to Parquet df.write.parquet('s3a://<bucket name>/<file path>') # Read JSON spark.read.json('s3a://<bucket name>/<file path>') # Write to JSON df.write.json('s3a://<bucket name>/<file path>')
Legen Sie die Spark-Konfiguration mit Azure-Anmeldeinformationen fest.
# For private Azure access, please also provide blob SAS token # * Make sure all required permissions are in place before proceeding spark.conf.set("fs.azure.sas.fixed.token.<storage-account>.dfs.core.windows.net","<Shared Access Token>")
Lesen und schreiben Sie direkt mit Azure.
# Read CSV spark.read.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') spark.read.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Text df.write.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read Parquet spark.read.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Parquet df.write.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read JSON spark.read.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to JSON df.write.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>')