Opção 1: Carregamento de dados usando a API REST Snowpipe

Este tópico descreve como chamar os pontos de extremidade REST públicos para carregar dados e recuperar relatórios do histórico de carregamento. As instruções assumem que você tenha completado as instruções de configuração em Preparação para carregamento de dados usando a API REST Snowpipe.

Neste tópico:

Carregamento de dados

O carregamento é feito em duas etapas:

Etapa 1

Prepare seus arquivos de dados:

  • Estágio interno: Use o comando PUT para preparar seus arquivos.

  • Estágio externo: Use as ferramentas do cliente fornecidas pelo provedor da nuvem para copiar seus arquivos para o local do estágio (Amazon S3, Google Cloud Storage ou Microsoft Azure).

Etapa 2

Envie uma solicitação ao ponto de extremidade REST insertFiles para carregar os arquivos de dados preparados.

Para sua conveniência, neste tópico são fornecidos programas de amostra Java e Python que ilustram como enviar um ponto de extremidade REST.

Programa de amostra para o SDK Java

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.operator.OperatorCreationException;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
import org.bouncycastle.pkcs.PKCSException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.PrivateKey;
import java.security.Security;
import java.time.Instant;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SDKTest
{
  // Path to the private key file that you generated earlier.
  private static final String PRIVATE_KEY_FILE = "/<path>/rsa_key.p8";

  public static class PrivateKeyReader
  {
    // If you generated an encrypted private key, implement this method to return
    // the passphrase for decrypting your private key.
    private static String getPrivateKeyPassphrase() {
      return "<private_key_passphrase>";
    }

    public static PrivateKey get(String filename)
            throws Exception
    {
      PrivateKeyInfo privateKeyInfo = null;
      Security.addProvider(new BouncyCastleProvider());
      // Read an object from the private key file.
      PEMParser pemParser = new PEMParser(new FileReader(Paths.get(filename).toFile()));
      Object pemObject = pemParser.readObject();
      if (pemObject instanceof PKCS8EncryptedPrivateKeyInfo) {
        // Handle the case where the private key is encrypted.
        PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo = (PKCS8EncryptedPrivateKeyInfo) pemObject;
        String passphrase = getPrivateKeyPassphrase();
        InputDecryptorProvider pkcs8Prov = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
        privateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
      } else if (pemObject instanceof PrivateKeyInfo) {
        // Handle the case where the private key is unencrypted.
        privateKeyInfo = (PrivateKeyInfo) pemObject;
      }
      pemParser.close();
      JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);
      return converter.getPrivateKey(privateKeyInfo);
    }
  }

  private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
                                                     Set<String> files)
          throws Exception
  {
    ExecutorService service = Executors.newSingleThreadExecutor();

    class GetHistory implements
            Callable<HistoryResponse>
    {
      private Set<String> filesWatchList;
      GetHistory(Set<String> files)
      {
        this.filesWatchList = files;
      }
      String beginMark = null;

      public HistoryResponse call()
              throws Exception
      {
        HistoryResponse filesHistory = null;
        while (true)
        {
          Thread.sleep(500);
          HistoryResponse response = manager.getHistory(null, null, beginMark);
          if (response.getNextBeginMark() != null)
          {
            beginMark = response.getNextBeginMark();
          }
          if (response != null && response.files != null)
          {
            for (HistoryResponse.FileEntry entry : response.files)
            {
              //if we have a complete file that we've
              // loaded with the same name..
              String filename = entry.getPath();
              if (entry.getPath() != null && entry.isComplete() &&
                      filesWatchList.contains(filename))
              {
                if (filesHistory == null)
                {
                  filesHistory = new HistoryResponse();
                  filesHistory.setPipe(response.getPipe());
                }
                filesHistory.files.add(entry);
                filesWatchList.remove(filename);
                //we can return true!
                if (filesWatchList.isEmpty()) {
                  return filesHistory;
                }
              }
            }
          }
        }
      }
    }

    GetHistory historyCaller = new GetHistory(files);
    //fork off waiting for a load to the service
    Future<HistoryResponse> result = service.submit(historyCaller);

    HistoryResponse response = result.get(2, TimeUnit.MINUTES);
    return response;
  }

  public static void main(String[] args)
  {
    final String host = "<account_identifier>.snowflakecomputing.com";
    final String user = "<user_login_name>";
    final String pipe = "<db_name>.<schema_name>.<pipe_name>";
    try
    {
      final long oneHourMillis = 1000 * 3600L;
      String startTime = Instant
              .ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
      final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
      SimpleIngestManager manager = new SimpleIngestManager(host.split("\.")[0], user, pipe, privateKey, "https", host, 443);
      List<StagedFileWrapper> files = new ArrayList<>();
      // Add the paths and sizes the files that you want to load.
      // Use paths that are relative to the stage where the files are located
      // (the stage that is specified in the pipe definition)..
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      ...
      manager.ingestFiles(files, null);
      HistoryResponse history = waitForFilesHistory(manager, files);
      System.out.println("Received history response: " + history.toString());
      String endTime = Instant
              .ofEpochMilli(System.currentTimeMillis()).toString();

      HistoryRangeResponse historyRangeResponse =
              manager.getHistoryRange(null,
                                      startTime,
                                      endTime);
      System.out.println("Received history range response: " +
                                 historyRangeResponse.toString());

    }
    catch (Exception e)
    {
      e.printStackTrace();
    }

  }
}
Copy

Este exemplo utiliza as APIs Bouncy Castle Crypto. A fim de compilar e executar esse exemplo, você deve incluir os seguintes arquivos JAR em seu classpath:

  • o arquivo JAR do provedor (bcprov-jdkversions.jar)

  • o arquivo JAR PKIX / CMS / EAC / PKCS / OCSP / TSP / OPENSSL (bcpkix-jdkversions.jar)

onde versions especifica as versões do arquivo JDK compatíveis com o arquivo JAR.

Antes de compilar o código de amostra, substitua os seguintes valores de espaço reservado:

PRIVATE_KEY_FILE = "/<caminho>/rsa_key.p8"

Especifique o caminho local para o arquivo de chave privada que você criou em Uso da autenticação de par de chaves e rodízio de chaves (em Preparação para carregamento de dados usando a API REST Snowpipe).

return "<senha_de_chave_privada>" em getPrivateKeyPassphrase()

Se você gerou uma chave criptografada, implemente o método getPrivateKeyPassphrase() para retornar a senha para descriptografar essa chave.

host = "<identificador_de_conta>.snowflakecomputing.com"

Especifique as informações de seu host na forma de um URL.

O formato preferido do identificador de conta é o seguinte:

organization_name-account_name

Nomes de sua conta e organização no Snowflake. Para obter mais detalhes, consulte Formato 1 (preferido): Nome da conta em sua organização.

Alternativamente, especifique seu localizador de contas, juntamente com a região e a plataforma de nuvem onde a conta é hospedada, se necessário. Para obter mais detalhes, consulte Formato 2 (legado): Localizador de conta em uma região.

user = "<nome_de_login_de_usuário>"

Especifique seu nome de login Snowflake.

pipe = "<nome_db>.<nome_de_esquema>.<nome_de_canal>"

Especifique o nome totalmente qualificado do canal a ser usado para carregar os dados.

files.add("<caminho>/<nome_de_arquivo>", <tamanho_do_arquivo_em_bytes>)

Especifique o caminho para seus arquivos a serem carregados na lista de objetos de arquivo.

Opcionalmente, especifique o tamanho de cada arquivo, em bytes, para evitar atrasos quando o Snowpipe calcular as operações necessárias para carregar os dados.

O caminho que você especificar deve ser relativo ao estágio onde os arquivos estão localizados. Inclua o nome completo de cada arquivo, incluindo a extensão do arquivo. Por exemplo, um arquivo CSV que é comprimido com gzip pode ter a extensão .csv.gz.

Programa de amostra para o SDK Python

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

# If you generated an encrypted private key, implement this method to return
# the passphrase for decrypting your private key.
def get_private_key_passphrase():
  return '<private_key_passphrase>'

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  get_private_key_passphrase().encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

ingest_manager = SimpleIngestManager(account='<account_identifier>',
                                     host='<account_identifier>.snowflakecomputing.com',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = [
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  ...
  ]

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')

# Needs to wait for a while to get result in history
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)
Copy

Antes de executar o código de amostra, substitua os seguintes valores de espaço reservado:

<caminho_de_chave_privada>

Especifique o caminho local para o arquivo de chave privada que você criou em Uso da autenticação de par de chaves e rodízio de chaves (em Preparação para carregamento de dados usando a API REST Snowpipe).

return "<senha_de_chave_privada>" em get_private_key_passphrase()

Se você gerou uma chave criptografada, implemente a função get_private_key_passphrase() para retornar a senha para descriptografar essa chave.

account='<identificador_de_conta>'

Especifique o identificador único para sua conta (fornecido pela Snowflake). Consulte a descrição host.

host='<identificador_de_conta>.snowflakecomputing.com'

Especifique o nome de host exclusivo de sua conta Snowflake.

O formato preferido do identificador de conta é o seguinte:

organization_name-account_name

Nomes de sua conta e organização no Snowflake. Para obter mais detalhes, consulte Formato 1 (preferido): Nome da conta em sua organização.

Alternativamente, especifique seu localizador de contas, juntamente com a região e a plataforma de nuvem onde a conta é hospedada, se necessário. Para obter mais detalhes, consulte Formato 2 (legado): Localizador de conta em uma região.

user='<user_login_name>'

Especifique seu nome de login Snowflake.

pipe='<db_name>.<schema_name>.<pipe_name>'

Especifique o nome totalmente qualificado do canal a ser usado para carregar os dados.

file_list=['<path>/<filename>', '<path>/<filename>'] | staged_file_list=[StagedFile('<path>/<filename>', <file_size_in_bytes>), StagedFile('<path>/<filename>', <file_size_in_bytes>)]

Especifique o caminho para seus arquivos a serem carregados na lista de objetos de arquivo.

O caminho que você especificar deve ser relativo ao estágio onde os arquivos estão localizados. Inclua o nome completo de cada arquivo, incluindo a extensão do arquivo. Por exemplo, um arquivo CSV que é comprimido com gzip pode ter a extensão .csv.gz.

Opcionalmente, especifique o tamanho de cada arquivo, em bytes, para evitar atrasos quando o Snowpipe calcular as operações necessárias para carregar os dados.

Visualização do histórico de carregamento

O Snowflake fornece pontos de extremidade REST e uma função de tabela Snowflake Information Schema para visualizar seu histórico de carregamento:

Observe que a consulta da função de tabela do Information Schema ou da exibição do Account Usage, ao contrário de chamar os pontos de extremidade REST, requer um warehouse em funcionamento.

Eliminação de arquivos preparados

Exclua os arquivos preparados depois de carregar os dados com sucesso e não precisar mais dos arquivos. Para obter instruções, consulte Eliminação de arquivos preparados depois que o Snowpipe carrega os dados.