Acesso a dados de arquivos de serviço de nuvem com Snowpark Connect for Spark

Com Snowpark Connect for Spark, você pode interagir diretamente com sistemas externos de armazenamento em nuvem, tais como Amazon S3, Google Cloud Storage e Azure Blob. Você pode ler os dados do armazenamento em nuvem no Snowflake, processar os dados e depois gravá-los de volta.

Por exemplo, você pode querer usar Snowpark Connect for Spark para executar as seguintes tarefas:

  • Ingerir dados brutos.

    Arquivos de local (por exemplo, CSV, JSON, e Parquet) em S3, Google Cloud ou Azure antes de movê-los para o Snowflake.

  • Exportar dados para uso downstream.

    Gravar DataFrames do Snowpark processados de volta ao armazenamento em nuvem para treinamento de ML, compartilhamento com parceiros externos ou outras análises baseadas em Spark.

  • Criar pipelines híbridos.

    Mantenha parte do pipeline no Snowflake, mantendo a compatibilidade com os data lakes existentes.

  • Siga os regulamentos ou reduza os custos.

    Armazene conjuntos de dados específicos externamente devido a regulamentações, governança ou restrições orçamentárias.

Use as etapas listadas neste tópico para ler e gravar em arquivos armazenados nestes provedores de serviços de nuvem. Você pode acessar os arquivos usando os estágios externos do Snowflake ou o acesso direto.

Advertências

Ao usar Snowpark Connect for Spark para trabalhar com serviços de nuvem, tenha em mente as seguintes advertências:

  • Autenticação—Snowpark Connect for Spark não gerencia automaticamente as credenciais da nuvem. Você deve configurar as chaves de acesso (AWS), chaves de conta de armazenamento ou tokens SAS (Azure) ou mantém estágios externos por conta própria. Credenciais expiradas ou ausentes resultarão em falhas de leitura/gravação.

  • Desempenho—O Cloud I/O depende da largura de banda da rede e da latência do armazenamento de objetos. A leitura de muitos arquivos pequenos pode ter um impacto significativo no desempenho.

  • Suporte ao formato — Certifique-se de que os formatos de arquivo que você está lendo e escrevendo sejam suportados. Atualmente Snowpark Connect for Spark tem paridade com formatos comuns, incluindo TEXT, CSV, JSON e Parquet. Entretanto, recursos avançados (como a descoberta de partição Parquet e JSON a evolução do esquema) pode ser diferente do Spark.

  • Permissões e políticas: gravar em buckets de nuvem requer políticas de IAM/ACL adequadas. Você pode encontrar um erro AccessDenied se as políticas não estiverem alinhadas entre as funções do Snowflake e as credenciais da nuvem.

Práticas recomendadas

Para obter a integração mais confiável e com bom desempenho, siga estas práticas recomendadas:

  • Use credenciais seguras e temporárias e alterne as credenciais com frequência.

  • Dados de partição e bucket.

    Ao escrever Parquet, particione em colunas filtradas com frequência para reduzir os custos de varredura. Use arquivos maiores e em menor número (por exemplo, em 100MB a 500MB cada) em vez de muitos arquivos pequenos.

  • Validar o esquema na gravação.

    Sempre defina o esquema explicitamente, especialmente para formatos semiestruturados como JSON e CSV. Isso evita desvios entre o Snowflake e dados externos.

  • Monitorar os custos.

    Considere consolidar arquivos e filtrar dados antes de gravar para reduzir custos. Os custos do provedor de nuvem são acumulados por solicitação e por byte escaneado.

  • Padronizar chamadas de API.

    Siga a orientação documentada precisamente ao usar a funcionalidade e os parâmetros, evitando variações ad-hoc. Dessa forma, você pode manter a compatibilidade, evitar regressões e garantir o comportamento esperado em diferentes provedores de nuvem.

Acesso usando estágios externos do Snowflake

  1. Configuração de acesso seguro ao Amazon S3 para criar um estágio externo que aponte para seu local S3.

  2. Leia a partir de seu estágio externo.

    # Read CSV
    spark.read.csv('@<your external stage name>/<file path>')
    spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file
    
    # Write to CSV
    df.write.csv('@<your external stage name>/<file path>')
    df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file
    
    # Read Text
    spark.read.text('@<your external stage name>/<file path>')
    
    # Write to Text
    df.write.text('@<your external stage name>/<file path>')
    df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>')
    
    # Read Parquet
    spark.read.parquet('@<your external stage name>/<file path>')
    
    # Write to Parquet
    df.write.parquet('@<your external stage name>/<file path>')
    
    # Read JSON
    spark.read.json('@<your external stage name>/<file path>')
    
    # Write to JSON
    df.write.json('@<your external stage name>/<file path>')
    
    Copy

Acesso usando o acesso direto

Você pode acessar arquivos diretamente nos provedores de serviços de nuvem usando as etapas e o código descritos aqui.

  1. Definição da configuração do Spark com credenciais da AWS.

    # For S3 related access with public/private buckets, please add these config change
    spark.conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled","false")
    spark.conf.set("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    spark.conf.set("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.2")
    
    # For private S3 access, please also provide credentials
    spark.conf.set("spark.hadoop.fs.s3a.access.key","<AWS_ACCESS_KEY_ID>")
    spark.conf.set("spark.hadoop.fs.s3a.secret.key","<AWS_SECRET_ACCESS_KEY>")
    spark.conf.set("spark.hadoop.fs.s3a.session.token","<AWS_SESSION_TOKEN>")
    
    Copy
  2. Leia e grave diretamente com o S3.

    # Read CSV
    spark.read.csv('s3a://<bucket name>/<file path>')
    spark.read.option("header", True).csv('s3a://<bucket name>/<file path>') # read with header in file
    
    # Write to CSV
    df.write.csv('s3a://<bucket name>/<file path>')
    df.write.option("header", True).csv('s3a://<bucket name>/<file path>') # write with header in file
    
    # Read Text
    spark.read.text('s3a://<bucket name>/<file path>')
    
    # Write to Text
    df.write.text('s3a://<bucket name>/<file path>')
    df.write.format("text").mode("overwrite").save('s3a://<bucket name>/<file path>')
    
    # Read Parquet
    spark.read.parquet('s3a://<bucket name>/<file path>')
    
    # Write to Parquet
    df.write.parquet('s3a://<bucket name>/<file path>')
    
    # Read JSON
    spark.read.json('s3a://<bucket name>/<file path>')
    
    # Write to JSON
    df.write.json('s3a://<bucket name>/<file path>')
    
    Copy