Criação de um pipeline de processamento de dados usando uma tabela de diretório

Você pode combinar uma tabela de diretório, que rastreia e armazena metadados em nível de arquivo em um estágio, com outros objetos Snowflake, como fluxos e tarefas, para construir um pipeline de processamento de dados.

Um fluxo registra as alterações na linguagem de manipulação de dados (DML) feitas em uma tabela de diretório, tabela, tabela externa ou as tabelas subjacentes em uma exibição. Uma tarefa executa uma única ação, que pode ser um comando SQL ou uma UDF extensa. Você pode agendar uma tarefa ou executá-la sob demanda.

Exemplo: criar um pipeline simples para processar PDFs

Este exemplo cria um pipeline de processamento de dados simples que faz o seguinte:

  1. Detecta arquivos PDF adicionados a um estágio.

  2. Extrai dados dos arquivos.

  3. Insere os dados em uma tabela Snowflake.

O pipeline usa um fluxo para detectar alterações em uma tabela de diretório no estágio e uma tarefa que executa uma função definida pelo usuário (UDF) para processar os arquivos.

O diagrama a seguir resume como funciona o pipeline de exemplo:

Um pipeline de processamento de dados simples que usa um fluxo para rastrear alterações em uma tabela de diretórios.

Etapa 1: criar um estágio com uma tabela de diretório habilitada

Crie um estágio interno com uma tabela de diretórios habilitada. A instrução de exemplo define o tipo ENCRYPTION como SNOWFLAKE_SSE para permitir o acesso a dados não estruturados no estágio.

CREATE OR REPLACE STAGE my_pdf_stage
  ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = ( ENABLE = TRUE);
Copy

Etapa 2: criar um fluxo na tabela de diretório

Em seguida, crie um fluxo na tabela de diretório especificando o estágio ao qual a tabela de diretório pertence. O fluxo rastreará as alterações na tabela de diretório. Na etapa 5 deste exemplo, usamos esse fluxo para construir uma tarefa.

CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Copy

Etapa 3: criar uma função definida pelo usuário para analisar PDFs

Crie uma função definida pelo usuário (UDF) que extrai dados de arquivos PDF. A tarefa que criamos na etapa 5 chamará a UDF para processar arquivos recém-adicionados no estágio.

A instrução de exemplo a seguir cria uma UDF chamada PDF_PARSE que processa arquivos PDF contendo dados de avaliação do produto. A UDF extrai dados de campos de formulário usando a biblioteca PyPDF2. Ela retorna um dicionário que contém os nomes e valores dos formulários como pares chave-valor.

Nota

A UDF lê arquivos especificados dinamicamente usando a classe SnowflakeFile. Para saber mais sobre SnowflakeFile, consulte Como ler um arquivo especificado dinamicamente com SnowflakeFile.

CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'parse_pdf_fields'
  PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
  AS
  $$
  from pathlib import Path
  import PyPDF2 as pypdf
  from io import BytesIO
  from snowflake.snowpark.files import SnowflakeFile

  def parse_pdf_fields(file_path):
      with SnowflakeFile.open(file_path, 'rb') as f:
        buffer = BytesIO(f.readall())
      reader = pypdf.PdfFileReader(buffer)
      fields = reader.getFields()
      field_dict = {}
      for k, v in fields.items():
          if "/V" in v.keys():
              field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]

      return field_dict
  $$;
Copy

Etapa 4: criar uma tabela para armazenar o conteúdo do arquivo

Em seguida, crie uma tabela onde cada linha armazene informações sobre um arquivo no estágio em colunas denominadas file_name e file_data. A tarefa que criamos na etapa 5 deste exemplo carregará dados nesta tabela.

CREATE OR REPLACE TABLE prod_reviews (
  file_name varchar,
  file_data variant
);
Copy

Etapa 5: criar uma tarefa

Crie uma tarefa agendada que verifique o fluxo em busca de novos arquivos no estágio e insira os dados do arquivo na tabela prod_reviews.

A instrução a seguir cria uma tarefa agendada usando o fluxo criado na etapa 2. A tarefa usa a função SYSTEM$STREAM_HAS_DATA para verificar se o fluxo contém registros de captura de dados de alterações (CDC).

CREATE OR REPLACE TASK load_new_file_data
  WAREHOUSE = 'MY_WAREHOUSE'
  SCHEDULE = '1 minute'
  COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
  WHEN
  SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
  AS
  INSERT INTO prod_reviews (
    SELECT relative_path as file_name,
    PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
    FROM my_pdf_stream
    WHERE METADATA$ACTION='INSERT'
  );
Copy

Etapa 6: executar a tarefa para testar o pipeline

Para verificar se o pipeline funciona, você pode adicionar arquivos ao estágio, executar manualmente a tarefa e consultar a tabela product_reviews.

Comece adicionando alguns arquivos PDF ao estágio my_pdf_stage e depois atualize o estágio.

Nota

Este exemplo usa comandos PUT, que não podem ser executados em uma planilha na interface da web do Snowflake. Para fazer upload de arquivos com Snowsight, consulte Carregamento de arquivos em um estágio interno nomeado.

PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;

ALTER STAGE my_pdf_stage REFRESH;
Copy

Você pode consultar o fluxo para verificar se ele registrou os dois arquivos PDF que adicionamos ao estágio.

SELECT * FROM my_pdf_stream;
Copy

Agora, execute a tarefa para processar os arquivos PDF e atualizar a tabela product_reviews.

EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status                                                   |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Copy

Consulte a tabela product_reviews para ver se a tarefa adicionou uma linha para cada arquivo PDF.

select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME        | FILE_DATA                        |
|------------------+----------------------------------|
| prod_review1.pdf | {                                |
|                  |   "FirstName": "John",           |
|                  |   "LastName": "Johnson",         |
|                  |   "Middle Name": "Michael",      |
|                  |   "Product": "Tennis Shoes",     |
|                  |   "Purchase Date": "03/15/2022", |
|                  |   "Recommend": "Yes"             |
|                  | }                                |
| prod_review2.pdf | {                                |
|                  |   "FirstName": "Emily",          |
|                  |   "LastName": "Smith",           |
|                  |   "Middle Name": "Ann",          |
|                  |   "Product": "Red Skateboard",   |
|                  |   "Purchase Date": "01/10/2023", |
|                  |   "Recommend": "MayBe"           |
|                  | }                                |
+------------------+----------------------------------+
Copy

Por fim, você pode criar uma exibição que analise os objetos da coluna FILE_DATA em colunas separadas. Você pode então consultar a exibição para analisar e trabalhar com o conteúdo do arquivo.

CREATE OR REPLACE VIEW prod_review_info_v
  AS
  WITH file_data
  AS (
      SELECT
        file_name
        , parse_json(file_data) AS file_data
      FROM prod_reviews
  )
  SELECT
      file_name
      , file_data:FirstName::varchar AS first_name
      , file_data:LastName::varchar AS last_name
      , file_data:"Middle Name"::varchar AS middle_name
      , file_data:Product::varchar AS product
      , file_data:"Purchase Date"::date AS purchase_date
      , file_data:Recommend::varchar AS recommended
      , build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
  FROM file_data;

SELECT * FROM prod_review_info_v;


| FILE_NAME        | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT        | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL                                                                                                                                                                                                                                                                                                                                                                                                              |
||
| prod_review1.pdf | John       | Johnson   | Michael     | Tennis Shoes   | 2022-03-15    | Yes         | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d              |
| prod_review2.pdf | Emily      | Smith     | Ann         | Red Skateboard | 2023-01-10    | MayBe       | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d                            |

Copy