Acesso a dados de arquivos de serviço de nuvem com Snowpark Connect for Spark¶
Com Snowpark Connect for Spark, você pode interagir diretamente com sistemas externos de armazenamento em nuvem, tais como Amazon S3, Google Cloud Storage e Azure Blob. Você pode ler os dados do armazenamento em nuvem no Snowflake, processar os dados e depois gravá-los de volta.
Por exemplo, você pode querer usar Snowpark Connect for Spark para executar as seguintes tarefas:
Ingerir dados brutos.
Arquivos de local (por exemplo, CSV, JSON, e Parquet) em S3, Google Cloud ou Azure antes de movê-los para o Snowflake.
Exportar dados para uso downstream.
Gravar DataFrames do Snowpark processados de volta ao armazenamento em nuvem para treinamento de ML, compartilhamento com parceiros externos ou outras análises baseadas em Spark.
Criar pipelines híbridos.
Mantenha parte do pipeline no Snowflake, mantendo a compatibilidade com os data lakes existentes.
Siga os regulamentos ou reduza os custos.
Armazene conjuntos de dados específicos externamente devido a regulamentações, governança ou restrições orçamentárias.
Use as etapas listadas neste tópico para ler e gravar em arquivos armazenados nestes provedores de serviços de nuvem. Você pode acessar os arquivos usando os estágios externos do Snowflake ou o acesso direto.
Advertências¶
Ao usar Snowpark Connect for Spark para trabalhar com serviços de nuvem, tenha em mente as seguintes advertências:
Autenticação—Snowpark Connect for Spark não gerencia automaticamente as credenciais da nuvem. Você deve configurar as chaves de acesso (AWS), chaves de conta de armazenamento ou tokens SAS (Azure) ou mantém estágios externos por conta própria. Credenciais expiradas ou ausentes resultarão em falhas de leitura/gravação.
Desempenho—O Cloud I/O depende da largura de banda da rede e da latência do armazenamento de objetos. A leitura de muitos arquivos pequenos pode ter um impacto significativo no desempenho.
Suporte ao formato — Certifique-se de que os formatos de arquivo que você está lendo e escrevendo sejam suportados. Atualmente Snowpark Connect for Spark tem paridade com formatos comuns, incluindo TEXT, CSV, JSON e Parquet. Entretanto, recursos avançados (como a descoberta de partição Parquet e JSON a evolução do esquema) pode ser diferente do Spark.
Permissões e políticas: gravar em buckets de nuvem requer políticas de IAM/ACL adequadas. Você pode encontrar um erro AccessDenied se as políticas não estiverem alinhadas entre as funções do Snowflake e as credenciais da nuvem.
Práticas recomendadas¶
Para obter a integração mais confiável e com bom desempenho, siga estas práticas recomendadas:
Use credenciais seguras e temporárias e alterne as credenciais com frequência.
Dados de partição e bucket.
Ao escrever Parquet, particione em colunas filtradas com frequência para reduzir os custos de varredura. Use arquivos maiores e em menor número (por exemplo, em 100MB a 500MB cada) em vez de muitos arquivos pequenos.
Validar o esquema na gravação.
Sempre defina o esquema explicitamente, especialmente para formatos semiestruturados como JSON e CSV. Isso evita desvios entre o Snowflake e dados externos.
Monitorar os custos.
Considere consolidar arquivos e filtrar dados antes de gravar para reduzir custos. Os custos do provedor de nuvem são acumulados por solicitação e por byte escaneado.
Padronizar chamadas de API.
Siga a orientação documentada precisamente ao usar a funcionalidade e os parâmetros, evitando variações ad-hoc. Dessa forma, você pode manter a compatibilidade, evitar regressões e garantir o comportamento esperado em diferentes provedores de nuvem.
Acesso usando estágios externos do Snowflake¶
Configuração de acesso seguro ao Amazon S3 para criar um estágio externo que aponte para seu local S3.
Leia a partir de seu estágio externo.
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Configuração de acesso seguro ao Azure para criar um estágio externo que aponte para seu contêiner Azure.
Leia a partir de seu estágio externo.
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Configuração de acesso seguro ao Google Cloud para criar um estágio externo que aponte para seu bucket do Google Cloud Storage.
Leia a partir de seu estágio externo.
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Acesso usando o acesso direto¶
Você pode acessar arquivos diretamente nos provedores de serviços de nuvem usando as etapas e o código descritos aqui.
Definição da configuração do Spark com credenciais da AWS.
# For S3 related access with public/private buckets, please add these config change spark.conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled","false") spark.conf.set("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") spark.conf.set("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.2") # For private S3 access, please also provide credentials spark.conf.set("spark.hadoop.fs.s3a.access.key","<AWS_ACCESS_KEY_ID>") spark.conf.set("spark.hadoop.fs.s3a.secret.key","<AWS_SECRET_ACCESS_KEY>") spark.conf.set("spark.hadoop.fs.s3a.session.token","<AWS_SESSION_TOKEN>")
Leia e grave diretamente com o S3.
# Read CSV spark.read.csv('s3a://<bucket name>/<file path>') spark.read.option("header", True).csv('s3a://<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('s3a://<bucket name>/<file path>') df.write.option("header", True).csv('s3a://<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('s3a://<bucket name>/<file path>') # Write to Text df.write.text('s3a://<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('s3a://<bucket name>/<file path>') # Read Parquet spark.read.parquet('s3a://<bucket name>/<file path>') # Write to Parquet df.write.parquet('s3a://<bucket name>/<file path>') # Read JSON spark.read.json('s3a://<bucket name>/<file path>') # Write to JSON df.write.json('s3a://<bucket name>/<file path>')
Defina a configuração do Spark com as credenciais do Azure.
# For private Azure access, please also provide blob SAS token # * Make sure all required permissions are in place before proceeding spark.conf.set("fs.azure.sas.fixed.token.<storage-account>.dfs.core.windows.net","<Shared Access Token>")
Leia e grave diretamente com o Azure.
# Read CSV spark.read.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') spark.read.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Text df.write.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read Parquet spark.read.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Parquet df.write.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read JSON spark.read.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to JSON df.write.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>')