Accéder aux données de fichiers de service cloud avec Snowpark Connect for Spark¶
Avec Snowpark Connect for Spark, vous pouvez interagir directement avec des systèmes de stockage cloud externes tels que Amazon S3, Google Cloud Storage et Azure Blob. Vous pouvez lire des données du stockage cloud dans Snowflake, traiter les données, puis les réécrire.
Par exemple, vous pourriez vouloir utiliser Snowpark Connect for Spark pour effectuer les tâches suivantes :
Ingérer des données brutes.
Déposez des fichiers (par exemple, CSV, JSON, et Parquet) dans S3, Google Cloud ou Azure avant de les déplacer dans Snowflake.
Exporter des données pour une utilisation en aval.
Écrivez des DataFrames Snowpark traités dans le stockage cloud pour la formation ML, le partage avec des partenaires externes ou d’autres analyses basées sur Spark.
Créer des pipelines hybrides.
Conservez une partie du pipeline dans Snowflake tout en maintenant la compatibilité avec les data lakes existants.
Respecter les réglementations ou réduire les coûts.
Stockez des ensembles de données spécifiques en externe en raison des réglementations, de la gouvernance ou des contraintes budgétaires.
Suivez la procédure indiquée dans cette rubrique pour lire et écrire dans des fichiers stockés sur ces fournisseurs de services cloud. Vous pouvez accéder aux fichiers en utilisant des zones de préparation externes Snowflake ou un accès direct.
Mises en garde¶
Lorsque vous utilisez Snowpark Connect for Spark pour utiliser des services cloud, gardez à l’esprit les mises en garde suivantes :
Authentification - Snowpark Connect for Spark ne gère pas automatiquement les informations d’identification cloud. Vous devez configurer des clés d’accès (AWS), des clés de compte de stockage ou des jetons SAS (Azure), ou gérez vous-même les zones de préparation externes. Les identifiants de connexion obsolètes ou manquants entraîneront des échecs de lecture/écriture.
Performance - Les E/S cloud dépendent de la bande passante du réseau et de la latence du stockage d’objets. La lecture de nombreux petits fichiers peut avoir des conséquences importantes sur les performances.
Prise en charge des formats - Assurez-vous que les formats de fichiers que vous lisez et écrivez sont pris en charge. Actuellement Snowpark Connect for Spark équivaut aux formats courants, y compris TEXT, CSV, JSON et Parquet. Cependant, des fonctions avancées (telles que la découverte des partitions Parquet et l’évolution du schéma JSON) peuvent différer de Spark.
Autorisations et politiques - L’écriture dans des compartiments cloud nécessite des politiques IAM/ACL appropriées. Vous risquez de rencontrer une erreur AccessDenied si les politiques ne sont pas alignées entre les rôles Snowflake et les identifiants de connexion cloud.
Meilleures pratiques¶
Pour obtenir l’intégration la plus fiable qui fonctionne bien, suivez ces bonnes pratiques :
Utilisez des identifiants temporaires sécurisés et changer fréquemment les identifiants de connexion en les utilisant à tour de rôle.
Partitionner et compartimenter les données.
Lorsque vous écrivez des fichiers au format Parquet, partitionnez les données sur des colonnes fréquemment filtrées afin de réduire les coûts d’analyse. Utilisez moins de fichiers mais des fichiers plus volumineux (par exemple, chacun de 100MB à 500MB) au lieu de nombreux petits fichiers.
Validez le schéma lors de l’écriture.
Définissez toujours le schéma de manière explicite, en particulier pour les formats semi-structurés tels que JSON et CSV. Cela empêche la dérive de valeur entre les données Snowflake et les données externes.
Surveiller les coûts.
Envisagez de consolider les fichiers et de filtrer les données avant l’écriture afin de réduire les coûts. Les coûts des fournisseurs cloud sont cumulés par requête et par octet analysé.
Normaliser les appels de l’API.
Suivez précisément les conseils fournis lors de l’utilisation des fonctionnalités et des paramètres en évitant les variations ad hoc. De cette manière, vous pouvez maintenir la compatibilité, empêcher les régressions et obtenir le comportement attendu entre les différents fournisseurs de services cloud.
Accès en utilisant des zones de préparation externes Snowflake¶
Configurez l’accès sécurisé à Amazon S3 pour créer une zone de préparation externe qui pointe vers votre emplacement S3.
Lisez à partir de votre zone de préparation externe.
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Configurez l’accès sécurisé à Azure pour créer une zone de préparation externe qui pointe vers votre conteneur Azure.
Lisez à partir de votre zone de préparation externe.
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Configurez l’accès sécurisé à Google Cloud pour créer une zone de préparation externe qui pointe vers votre compartiment Google Cloud Storage.
Lisez à partir de votre zone de préparation externe.
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Accès en utilisant l’accès direct¶
Vous pouvez accéder aux fichiers directement sur les fournisseurs de services cloud en suivant la procédure et en utilisant le code décrits ici.
Définissez la configuration Spark avec des identifiants AWS.
# For S3 related access with public/private buckets, please add these config change spark.conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled","false") spark.conf.set("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") spark.conf.set("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.2") # For private S3 access, please also provide credentials spark.conf.set("spark.hadoop.fs.s3a.access.key","<AWS_ACCESS_KEY_ID>") spark.conf.set("spark.hadoop.fs.s3a.secret.key","<AWS_SECRET_ACCESS_KEY>") spark.conf.set("spark.hadoop.fs.s3a.session.token","<AWS_SESSION_TOKEN>")
Lisez et écrivez directement avec S3.
# Read CSV spark.read.csv('s3a://<bucket name>/<file path>') spark.read.option("header", True).csv('s3a://<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('s3a://<bucket name>/<file path>') df.write.option("header", True).csv('s3a://<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('s3a://<bucket name>/<file path>') # Write to Text df.write.text('s3a://<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('s3a://<bucket name>/<file path>') # Read Parquet spark.read.parquet('s3a://<bucket name>/<file path>') # Write to Parquet df.write.parquet('s3a://<bucket name>/<file path>') # Read JSON spark.read.json('s3a://<bucket name>/<file path>') # Write to JSON df.write.json('s3a://<bucket name>/<file path>')
Définissez la configuration Spark avec les identifiants de connexion Azure.
# For private Azure access, please also provide blob SAS token # * Make sure all required permissions are in place before proceeding spark.conf.set("fs.azure.sas.fixed.token.<storage-account>.dfs.core.windows.net","<Shared Access Token>")
Lisez et écrivez directement avec Azure.
# Read CSV spark.read.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') spark.read.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Text df.write.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read Parquet spark.read.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Parquet df.write.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read JSON spark.read.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to JSON df.write.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>')