Criação de um pipeline de processamento de dados usando uma tabela de diretório¶
Você pode combinar uma tabela de diretório, que rastreia e armazena metadados em nível de arquivo em um estágio, com outros objetos Snowflake, como fluxos e tarefas, para construir um pipeline de processamento de dados.
Um fluxo registra as alterações na linguagem de manipulação de dados (DML) feitas em uma tabela de diretório, tabela, tabela externa ou as tabelas subjacentes em uma exibição. Uma tarefa executa uma única ação, que pode ser um comando SQL ou uma UDF extensa. Você pode agendar uma tarefa ou executá-la sob demanda.
Exemplo: criar um pipeline simples para processar PDFs¶
Este exemplo cria um pipeline de processamento de dados simples que faz o seguinte:
Detecta arquivos PDF adicionados a um estágio.
Extrai dados dos arquivos.
Insere os dados em uma tabela Snowflake.
O pipeline usa um fluxo para detectar alterações em uma tabela de diretório no estágio e uma tarefa que executa uma função definida pelo usuário (UDF) para processar os arquivos.
O diagrama a seguir resume como funciona o pipeline de exemplo:
Etapa 1: criar um estágio com uma tabela de diretório habilitada¶
Crie um estágio interno com uma tabela de diretórios habilitada. A instrução de exemplo define o tipo ENCRYPTION
como SNOWFLAKE_SSE
para permitir o acesso a dados não estruturados no estágio.
CREATE OR REPLACE STAGE my_pdf_stage
ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
DIRECTORY = ( ENABLE = TRUE);
Etapa 2: criar um fluxo na tabela de diretório¶
Em seguida, crie um fluxo na tabela de diretório especificando o estágio ao qual a tabela de diretório pertence. O fluxo rastreará as alterações na tabela de diretório. Na etapa 5 deste exemplo, usamos esse fluxo para construir uma tarefa.
CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Etapa 3: criar uma função definida pelo usuário para analisar PDFs¶
Crie uma função definida pelo usuário (UDF) que extrai dados de arquivos PDF. A tarefa que criamos na etapa 5 chamará a UDF para processar arquivos recém-adicionados no estágio.
A instrução de exemplo a seguir cria uma UDF chamada PDF_PARSE
que processa arquivos PDF contendo dados de avaliação do produto. A UDF extrai dados de campos de formulário usando a biblioteca PyPDF2. Ela retorna um dicionário que contém os nomes e valores dos formulários como pares chave-valor.
Nota
A UDF lê arquivos especificados dinamicamente usando a classe SnowflakeFile
. Para saber mais sobre SnowflakeFile
, consulte Como ler um arquivo especificado dinamicamente com SnowflakeFile.
CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'parse_pdf_fields'
PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
AS
$$
from pathlib import Path
import PyPDF2 as pypdf
from io import BytesIO
from snowflake.snowpark.files import SnowflakeFile
def parse_pdf_fields(file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
buffer = BytesIO(f.readall())
reader = pypdf.PdfFileReader(buffer)
fields = reader.getFields()
field_dict = {}
for k, v in fields.items():
if "/V" in v.keys():
field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]
return field_dict
$$;
Etapa 4: criar uma tabela para armazenar o conteúdo do arquivo¶
Em seguida, crie uma tabela onde cada linha armazene informações sobre um arquivo no estágio em colunas denominadas file_name
e file_data
. A tarefa que criamos na etapa 5 deste exemplo carregará dados nesta tabela.
CREATE OR REPLACE TABLE prod_reviews (
file_name varchar,
file_data variant
);
Etapa 5: criar uma tarefa¶
Crie uma tarefa agendada que verifique o fluxo em busca de novos arquivos no estágio e insira os dados do arquivo na tabela prod_reviews
.
A instrução a seguir cria uma tarefa agendada usando o fluxo criado na etapa 2. A tarefa usa a função SYSTEM$STREAM_HAS_DATA para verificar se o fluxo contém registros de captura de dados de alterações (CDC).
CREATE OR REPLACE TASK load_new_file_data
WAREHOUSE = 'MY_WAREHOUSE'
SCHEDULE = '1 minute'
COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
WHEN
SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
AS
INSERT INTO prod_reviews (
SELECT relative_path as file_name,
PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
FROM my_pdf_stream
WHERE METADATA$ACTION='INSERT'
);
Etapa 6: executar a tarefa para testar o pipeline¶
Para verificar se o pipeline funciona, você pode adicionar arquivos ao estágio, executar manualmente a tarefa e consultar a tabela product_reviews
.
Comece adicionando alguns arquivos PDF ao estágio my_pdf_stage
e depois atualize o estágio.
Nota
Este exemplo usa comandos PUT, que não podem ser executados em uma planilha na interface da web do Snowflake. Para fazer upload de arquivos com Snowsight, consulte Carregamento de arquivos em um estágio interno nomeado.
PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
ALTER STAGE my_pdf_stage REFRESH;
Você pode consultar o fluxo para verificar se ele registrou os dois arquivos PDF que adicionamos ao estágio.
SELECT * FROM my_pdf_stream;
Agora, execute a tarefa para processar os arquivos PDF e atualizar a tabela product_reviews
.
EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Consulte a tabela product_reviews
para ver se a tarefa adicionou uma linha para cada arquivo PDF.
select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME | FILE_DATA |
|------------------+----------------------------------|
| prod_review1.pdf | { |
| | "FirstName": "John", |
| | "LastName": "Johnson", |
| | "Middle Name": "Michael", |
| | "Product": "Tennis Shoes", |
| | "Purchase Date": "03/15/2022", |
| | "Recommend": "Yes" |
| | } |
| prod_review2.pdf | { |
| | "FirstName": "Emily", |
| | "LastName": "Smith", |
| | "Middle Name": "Ann", |
| | "Product": "Red Skateboard", |
| | "Purchase Date": "01/10/2023", |
| | "Recommend": "MayBe" |
| | } |
+------------------+----------------------------------+
Por fim, você pode criar uma exibição que analise os objetos da coluna FILE_DATA
em colunas separadas. Você pode então consultar a exibição para analisar e trabalhar com o conteúdo do arquivo.
CREATE OR REPLACE VIEW prod_review_info_v
AS
WITH file_data
AS (
SELECT
file_name
, parse_json(file_data) AS file_data
FROM prod_reviews
)
SELECT
file_name
, file_data:FirstName::varchar AS first_name
, file_data:LastName::varchar AS last_name
, file_data:"Middle Name"::varchar AS middle_name
, file_data:Product::varchar AS product
, file_data:"Purchase Date"::date AS purchase_date
, file_data:Recommend::varchar AS recommended
, build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
FROM file_data;
SELECT * FROM prod_review_info_v;
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| FILE_NAME | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL |
|------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prod_review1.pdf | John | Johnson | Michael | Tennis Shoes | 2022-03-15 | Yes | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d |
| prod_review2.pdf | Emily | Smith | Ann | Red Skateboard | 2023-01-10 | MayBe | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d |
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+