Como trabalhar com DataFrames no Snowpark Java

No Snowpark, a principal forma de consultar e processar dados é através de um DataFrame. Este tópico explica como trabalhar com DataFrames.

Neste tópico:

Para obter e manipular dados, use a classe DataFrame. Um DataFrame representa um conjunto de dados relacionais que é avaliado lentamente: ele só é executado quando uma ação específica é acionada. Em certo sentido, um DataFrame é como uma consulta que precisa ser avaliada para obter dados.

Para obter dados em um DataFrame:

  1. Construa um DataFrame especificando a fonte dos dados para o conjunto de dados.

    Por exemplo, você pode criar um DataFrame para guardar dados de uma tabela, um arquivo CSV externo ou a execução de uma instrução SQL.

  2. Especifique como o conjunto de dados no DataFrame deve ser transformado.

    Por exemplo, você pode especificar quais colunas devem ser selecionadas, como as linhas devem ser filtradas, como os resultados devem ser ordenados e agrupados, etc.

  3. Execute a instrução para obter os dados para o DataFrame.

    Para obter os dados no DataFrame, você deve invocar um método que execute uma ação (por exemplo, o método collect()).

As próximas seções explicam essas etapas com mais detalhes.

Como estabelecer os exemplos para esta seção

Alguns exemplos desta seção utilizam um DataFrame para consultar uma tabela chamada sample_product_data. Se você quiser executar esses exemplos, você pode criar essa tabela e preenchê-la com alguns dados executando as seguintes instruções SQL:

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT, amount NUMBER(12, 2), quantity INT, product_date DATE);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10, 1.00, 15, TO_DATE('2021.01.01', 'YYYY.MM.DD')),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20, 2.00, 30, TO_DATE('2021.02.01', 'YYYY.MM.DD')),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30, 3.00, 45, TO_DATE('2021.03.01', 'YYYY.MM.DD')),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40, 4.00, 60, TO_DATE('2021.04.01', 'YYYY.MM.DD')),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50, 5.00, 75, TO_DATE('2021.05.01', 'YYYY.MM.DD')),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60, 6.00, 90, TO_DATE('2021.06.01', 'YYYY.MM.DD')),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70, 7.00, 105, TO_DATE('2021.07.01', 'YYYY.MM.DD')),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80, 7.25, 120, TO_DATE('2021.08.01', 'YYYY.MM.DD')),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90, 7.50, 135, TO_DATE('2021.09.01', 'YYYY.MM.DD')),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100, 7.75, 150, TO_DATE('2021.10.01', 'YYYY.MM.DD')),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100, 8.00, 165, TO_DATE('2021.11.01', 'YYYY.MM.DD')),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100, 8.50, 180, TO_DATE('2021.12.01', 'YYYY.MM.DD'));
Copy

Para verificar se a tabela foi criada, execute:

SELECT * FROM sample_product_data;
Copy

Como criar um DataFrame

Para construir um DataFrame, você pode usar métodos na classe Session. Cada um dos métodos a seguir constrói um DataFrame a partir de um tipo diferente de fonte de dados:

  • Para criar um DataFrame a partir de dados em uma tabela, exibição ou fluxo, chame o método table:

    // Create a DataFrame from the data in the "sample_product_data" table.
    DataFrame dfTable = session.table("sample_product_data");
    
    // Print out the first 10 rows.
    dfTable.show();
    
    Copy

    Nota

    O método table retorna um objeto Updatable. Updatable estende DataFrame e fornece métodos adicionais para trabalhar com dados na tabela (por exemplo, métodos para atualização e exclusão de dados). Consulte Atualização, eliminação e fusão de linhas em uma tabela.

  • Para criar um DataFrame a partir de valores especificados:

    1. Construa um conjunto de objetos Row que contenham os valores.

    2. Construa um objeto StructType que descreva os tipos de dados desses valores.

    3. Chame o método createDataFrame passando a array o objeto StructType.

     // Import name from the types package, which contains StructType and StructField.
    import com.snowflake.snowpark_java.types.*;
    ...
    
     // Create a DataFrame containing specified values.
     Row[] data = {Row.create(1, "a"), Row.create(2, "b")};
     StructType schema =
       StructType.create(
         new StructField("num", DataTypes.IntegerType),
         new StructField("str", DataTypes.StringType));
     DataFrame df = session.createDataFrame(data, schema);
    
     // Print the contents of the DataFrame.
     df.show();
    
    Copy

    Nota

    As palavras reservadas pelo Snowflake não são válidas como nomes de colunas na criação de um DataFrame. Para obter uma lista de palavras reservadas, consulte Palavras-chave reservadas e limitadas.

  • Para criar um DataFrame contendo um intervalo de valores, chame o método range:

    // Create a DataFrame from a range
    DataFrame dfRange = session.range(1, 10, 2);
    
    // Print the contents of the DataFrame.
    dfRange.show();
    
    Copy
  • Para criar um DataFrame para um arquivo em um estágio, chame read para obter um objeto DataFrameReader. No objeto DataFrameReader, chame o método correspondente ao formato dos dados no arquivo:

    // Create a DataFrame from data in a stage.
    DataFrame dfJson = session.read().json("@mystage2/data1.json");
    
    // Print the contents of the DataFrame.
    dfJson.show();
    
    Copy
  • Para criar um DataFrame para armazenar os resultados de uma consulta SQL, chame o método sql:

    // Create a DataFrame from a SQL query
    DataFrame dfSql = session.sql("SELECT name from sample_product_data");
    
    // Print the contents of the DataFrame.
    dfSql.show();
    
    Copy

    Nota: Embora você possa usar este método para executar instruções SELECT que obtêm dados de tabelas e arquivos preparados, você deve em vez disso usar os métodos table e read. Métodos como table e read podem fornecer melhor realce de sintaxe, realce de erros e complementação inteligente de código em ferramentas de desenvolvimento.

Especificação de como o conjunto de dados deve ser transformado

Para especificar quais colunas devem ser selecionadas e como os resultados devem ser filtrados, ordenados, agrupados, etc., chame os métodos de DataFrame que transformam o conjunto de dados. Para identificar colunas nesses métodos, use o método estático Functions.col ou uma expressão que avalie para uma coluna. (Consulte Como especificar colunas e expressões.)

Por exemplo:

  • Para especificar quais linhas devem ser retornadas, chame o método filter:

    // Create a DataFrame for the rows with the ID 1
    // in the "sample_product_data" table.
    DataFrame df = session.table("sample_product_data").filter(
      Functions.col("id").equal_to(Functions.lit(1)));
    df.show();
    
    Copy
  • Para especificar as colunas que devem ser selecionadas, chame o método select:

    // Create a DataFrame that contains the id, name, and serial_number
    // columns in te "sample_product_data" table.
    DataFrame df = session.table("sample_product_data").select(
      Functions.col("id"), Functions.col("name"), Functions.col("serial_number"));
    df.show();
    
    Copy

Cada método retorna um novo objeto DataFrame que foi transformado. (O método não afeta o objeto DataFrame original). Isso significa que se você quiser aplicar múltiplas transformações, você pode encadear chamadas a métodos, chamando cada método de transformação subsequente sobre o novo objeto DataFrame retornado pela chamada ao método anterior.

Observe que esses métodos de transformação não obtêm dados do banco de dados Snowflake. (Os métodos de ação descritos em Como executar uma ação para avaliar um DataFrame realizam a recuperação de dados). Os métodos de transformação simplesmente especificam como a instrução SQL deve ser construída.

Como especificar colunas e expressões

Ao chamar esses métodos de transformação, pode ser necessário especificar colunas ou expressões que utilizam colunas. Por exemplo, ao chamar o método select, você precisa especificar as colunas que devem ser selecionadas.

Para se referir a uma coluna, crie um objeto Column chamando o método estático Functions.col.

DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
dfProductInfo.show();
Copy

Nota

Para criar um objeto Column para um literal, consulte Como usar literais como objetos de coluna.

Ao especificar um filtro, projeção, condição de junção, etc., você pode usar objetos Column em uma expressão. Por exemplo:

  • Você pode usar objetos Column com o método filter para especificar uma condição de filtro:

    // Specify the equivalent of "WHERE id = 12"
    // in an SQL SELECT statement.
    DataFrame df = session.table("sample_product_data");
    df.filter(Functions.col("id").equal_to(Functions.lit(12))).show();
    
    Copy
    // Specify the equivalent of "WHERE key + category_id < 10"
    // in an SQL SELECT statement.
    DataFrame df2 = session.table("sample_product_data");
    df2.filter(Functions.col("key").plus(Functions.col("category_id")).lt(Functions.lit(10))).show();
    
    Copy
  • Você pode usar objetos Column com o método select para definir um alias:

    // Specify the equivalent of "SELECT key * 10 AS c"
    // in an SQL SELECT statement.
    DataFrame df3 = session.table("sample_product_data");
    df3.select(Functions.col("key").multiply(Functions.lit(10)).as("c")).show();
    
    Copy
  • Você pode usar objetos Column com o método join para definir uma condição de junção:

    // Specify the equivalent of "sample_a JOIN sample_b on sample_a.id_a = sample_b.id_a"
    // in an SQL SELECT statement.
    DataFrame dfLhs = session.table("sample_a");
    DataFrame dfRhs = session.table("sample_b");
    DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
    dfJoined.show();
    
    Copy

Como se referir a colunas em diferentes DataFrames

Ao se referir a colunas em dois objetos DataFrame diferentes com o mesmo nome (por exemplo, unindo os DataFrames naquela coluna), pode-se usar o método col em cada objeto DataFrame para se referir a uma coluna naquele objeto (por exemplo, df1.col("name") e df2.col("name")).

O exemplo a seguir demonstra como usar o método col para se referir a uma coluna em um DataFrame específico. O exemplo une dois objetos DataFrame, ambos contendo uma coluna chamada value. O exemplo usa o método as do objeto Column para mudar os nomes das colunas no DataFrame recém-criado.

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"));
dfJoined.show();
Copy

Como utilizar aspas duplas em torno de identificadores de objetos (nomes de tabela, nomes de coluna, etc.)

Os nomes de bancos de dados, esquemas, tabelas e estágios que você especificar devem estar em conformidade com requisitos de identificadores do Snowflake. Quando você especifica um nome, o Snowflake considera que o nome está em maiúsculas. Por exemplo, as chamadas a seguir são equivalentes:

// The following calls are equivalent:
df.select(Functions.col("id123"));
df.select(Functions.col("ID123"));
Copy

Se o nome não estiver de acordo com os requisitos de identificador, deve-se usar aspas duplas (") em torno do nome. Use uma barra invertida (\) para escapar o caractere de aspas duplas dentro de uma cadeia de caracteres literal Scala. Por exemplo, como o nome da tabela a seguir não começa com uma letra ou um sublinhado, você deve usar aspas duplas em torno do nome:

DataFrame df = session.table("\"10tablename\"");
Copy

Observe que ao especificar o nome de uma coluna, você não precisa usar aspas duplas em torno do nome. A biblioteca do Snowpark coloca automaticamente o nome da coluna entre aspas duplas caso o nome não cumpra com os requisitos de identificador:

// The following calls are equivalent:
df.select(Functions.col("3rdID"));
df.select(Functions.col("\"3rdID\""));

// The following calls are equivalent:
df.select(Functions.col("id with space"));
df.select(Functions.col("\"id with space\""));
Copy

Se você já tiver acrescentado aspas duplas em torno de um nome de coluna, a biblioteca não inserirá aspas duplas adicionais em torno do nome.

Em alguns casos, o nome da coluna pode conter caracteres de aspas duplas:

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...
Copy

Como explicado em Requisitos para identificadores, para cada caractere de aspas duplas dentro de um identificador de aspas duplas, deve-se usar dois caracteres de aspas duplas (por exemplo, "name_with_""air""_quotes" e """column_name_quoted"""):

DataFrame dfTable = session.table("quoted");
dfTable.select("\"name_with_\"\"air\"\"_quotes\"");
dfTable.select("\"\"\"column_name_quoted\"\"\"");
Copy

Tenha em mente que quando um identificador é incluído entre aspas duplas (se você adicionou explicitamente as aspas ou se a biblioteca adicionou as aspas para você), o Snowflake trata o identificador diferenciando maiúsculas de minúsculas:

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(Functions.col("id with space"));
df.select(Functions.col("ID WITH SPACE"));
Copy

Como usar literais como objetos de coluna

Para usar um literal em um método que passa um objeto Column, crie um objeto Column para o literal passando o literal para o método estático lit na classe Functions. Por exemplo:

// Show the first 10 rows in which category_id is greater than 5.
// Use `Functions.lit(5)` to create a Column object for the literal 5.
DataFrame df = session.table("sample_product_data");
df.filter(Functions.col("category_id").gt(Functions.lit(5))).show();
Copy

Se o literal é um ponto flutuante ou valor duplo em Java (por exemplo, 0.05 é tratado como Double por padrão), a biblioteca do Snowpark gera um SQL que implicitamente converte o valor para o tipo de dados correspondente do Snowpark (por exemplo, 0.05::DOUBLE). Isso pode produzir um valor aproximado que difira do número exato especificado.

Por exemplo, o código a seguir não exibe linhas correspondentes, mesmo que o filtro (que corresponde a valores maiores ou iguais a 0.05) deva corresponder às linhas do DataFrame:

// Create a DataFrame that contains the value 0.05.
DataFrame df = session.sql("select 0.05 :: Numeric(5, 2) as a");

// Applying this filter results in no matching rows in the DataFrame.
df.filter(Functions.col("a").leq(Functions.lit(0.06).minus(Functions.lit(0.01)))).show();
Copy

O problema é que Functions.lit(0.06) e Functions.lit(0.01) produzem valores aproximados para 0.06 e 0.01, não os valores exatos.

Para evitar esse problema, converta o literal para o tipo Snowpark que você deseja usar. Por exemplo, para usar um NUMBER com uma precisão de 5 e escala de 2:

import com.snowflake.snowpark_java.types.*;
...

df.filter(Functions.col("a").leq(Functions.lit(0.06).cast(DataTypes.createDecimalType(5, 2)).minus(Functions.lit(0.01).cast(DataTypes.createDecimalType(5, 2))))).show();
Copy

Como converter um objeto de coluna para um tipo específico

Para converter um objeto Column a um tipo específico, chame o método cast e passe um objeto do tipo com.snowflake.snowpark_java.types package. Por exemplo, para converter um literal para um NUMBER com uma precisão de 5 e escala de 2:

// Import for the DecimalType class..
import com.snowflake.snowpark_java.types.*;

Column decimalValue = Functions.lit(0.05).cast(DataTypes.createDecimalType(5,2));
Copy

Como encadear chamadas a métodos

Como cada método que transforma um objeto DataFrame returna um novo objeto DataFrame que tem a transformação aplicada, você pode encadear chamadas a métodos para produzir um novo DataFrame que é transformado de maneiras adicionais.

O exemplo a seguir retorna um DataFrame que está configurado para:

  • Consultar a tabela sample_product_data.

  • Retornar a linha com id = 1.

  • Selecionar as colunas name e serial_number.

DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
Copy

Neste exemplo:

  • session.table("sample_product_data") retorna um DataFrame para a tabela sample_product_data.

    Embora o DataFrame ainda não contenha os dados da tabela, o objeto contém as definições das colunas da tabela.

  • filter(Functions.col("id").equal_to(Functions.lit(1))) retorna um DataFrame para a tabela sample_product_data que está preparada para retornar a linha com id = 1.

    Observe novamente que o DataFrame ainda não contém a linha correspondente da tabela. A linha correspondente não é obtida até que você chame um método de ação.

  • select(Functions.col("name"), Functions.col("serial_number")) retorna um DataFrame que contém as colunas name e serial_number para a linha da tabela sample_product_data que tem id = 1.

Quando você fizer chamadas a métodos encadeadas, tenha em mente que a ordem das chamadas é importante. Cada chamada de método retorna um DataFrame que foi transformado. Certifique-se de que as chamadas subsequentes funcionem com os DataFrame transformados.

Por exemplo, no código abaixo, o método select retorna um DataFrame que contém apenas duas colunas: name e serial_number. A chamada ao método filter nesse DataFrame falha porque usa a coluna id, que não está no DataFrame transformado.

// This fails with the error "invalid identifier 'ID'."
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("name"), Functions.col("serial_number")).filter(Functions.col("id").equal_to(Functions.lit(1)));
dfProductInfo.show();
Copy

Em contraste, o seguinte código é executado com sucesso porque o método filter() é chamado em um DataFrame que contém todas as colunas da tabela sample_product_data (incluindo a coluna id):

// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
Copy

Tenha em mente que talvez seja necessário fazer as chamadas ao método select e filter em uma ordem diferente daquela em que você usaria as palavras-chave equivalentes (SELECT e WHERE) em uma instrução SQL.

Como limitar o número de linhas em um DataFrame

Para limitar o número de linhas em um DataFrame, você pode usar o método de transformação limit.

A API do Snowpark também fornece métodos de ação para recuperar e imprimir um número limitado de linhas:

  • o método de ação first (para executar a consulta e retornar as primeiras n linhas)

  • o método de ação show (para executar a consulta e imprimir as primeiras n linhas)

Esses métodos efetivamente adicionam uma cláusula LIMIT à instrução SQL que é executada.

Como explicado nas notas de uso para LIMIT, os resultados não são determinísticos, a menos que você especifique uma ordem de classificação (ORDER BY) em conjunto com LIMIT.

Para manter a cláusula ORDER BY com a cláusula LIMIT (por exemplo, para que ORDER BY não esteja em uma subconsulta separada), você deve chamar o método que limita os resultados do DataFrame retornado pelo método sort.

Por exemplo, se você estiver encadeando chamadas a métodos:

DataFrame df = session.table("sample_product_data");

// Limit the number of rows to 5, sorted by parent_id.
DataFrame dfSubset = df.sort(Functions.col("parent_id")).limit(5);

// Return the first 5 rows, sorted by parent_id.
Row[] arrayOfRows = df.sort(Functions.col("parent_id")).first(5);

// Print the first 5 rows, sorted by parent_id.
df.sort(Functions.col("parent_id")).show(5);
Copy

Como obter definições das colunas

Para obter a definição das colunas no conjunto de dados para o DataFrame, chame o método schema. Esse método retorna um objeto StructType que contém uma Array de StructField objetos. Cada objeto StructField contém a definição de uma coluna.

import com.snowflake.snowpark_java.types.*;
...

// Get the StructType object that describes the columns in the
// underlying rowset.
StructType tableSchema = session.table("sample_product_data").schema();
System.out.println("Schema for sample_product_data: " + tableSchema);
Copy

No objeto devolvido StructType, os nomes das colunas são sempre normalizados. Identificadores sem aspas são retornados em maiúsculas, e identificadores entre aspas são devolvidos na caixa em que foram definidos.

O exemplo a seguir cria um DataFrame contendo as colunas ID e 3rd. Para o nome de coluna 3rd, a biblioteca do Snowpark coloca o nome automaticamente entre aspas duplas ("3rd") porque o nome não cumpre os requisitos para um identificador.

O exemplo chama o método schema e depois chama o método names no objeto retornado StructType para obter uma array de nomes de colunas. Os nomes são normalizados no método StructType retornado pelo método schema.

import java.util.Arrays;
...

// Create a DataFrame containing the "id" and "3rd" columns.
DataFrame dfSelectedColumns = session.table("sample_product_data").select(Functions.col("id"), Functions.col("3rd"));
// Print out the names of the columns in the schema.
System.out.println(Arrays.toString(dfSelectedColumns.schema().names()));
Copy

Junção de DataFrames

Para unir objetos DataFrame, chame o método join.

As seções a seguir explicam como usar DataFrames para realizar uma junção:

Configuração dos dados de exemplo para as junções

Os exemplos nas próximas seções utilizam dados de exemplo que você pode configurar executando as seguintes instruções SQL:

CREATE OR REPLACE TABLE sample_a (
  id_a INTEGER,
  name_a VARCHAR,
  value INTEGER
);
INSERT INTO sample_a (id_a, name_a, value) VALUES
  (10, 'A1', 5),
  (40, 'A2', 10),
  (80, 'A3', 15),
  (90, 'A4', 20)
;
CREATE OR REPLACE TABLE sample_b (
  id_b INTEGER,
  name_b VARCHAR,
  id_a INTEGER,
  value INTEGER
);
INSERT INTO sample_b (id_b, name_b, id_a, value) VALUES
  (4000, 'B1', 40, 10),
  (4001, 'B2', 10, 5),
  (9000, 'B3', 80, 15),
  (9099, 'B4', NULL, 200)
;
CREATE OR REPLACE TABLE sample_c (
  id_c INTEGER,
  name_c VARCHAR,
  id_a INTEGER,
  id_b INTEGER
);
INSERT INTO sample_c (id_c, name_c, id_a, id_b) VALUES
  (1012, 'C1', 10, NULL),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;
Copy

Como especificar as colunas para a junção

Com o método DataFrame.join, você pode especificar as colunas a serem usadas de uma das seguintes maneiras:

  • Especifique uma expressão de coluna que descreva a condição de junção.

  • Especifique uma ou mais colunas que devem ser usadas como colunas comuns na junção.

O exemplo a seguir realiza uma junção interna na coluna chamada id_a:

// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
dfJoined.show();
Copy

Observe que o exemplo usa o método DataFrame.col para especificar a condição a ser usada para a junção. Consulte Como especificar colunas e expressões para obter mais informações sobre este método.

Isso imprime a seguinte saída:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |
|40      |A2        |10       |4000    |B1        |40      |10       |
|80      |A3        |15       |9000    |B3        |80      |15       |
----------------------------------------------------------------------
Copy
Nomes de colunas idênticos duplicados no resultado da junção

No DataFrame resultante de uma junção, a biblioteca Snowpark utiliza os nomes das colunas encontradas nas tabelas que foram unidas, mesmo quando os nomes das colunas são idênticos entre tabelas. Quando isto acontece, estes nomes de coluna são duplicados no DataFrame resultante da junção. Para acessar uma coluna duplicada pelo nome, chame o método col no DataFrame que representa a tabela original da coluna. (Para obter mais informações sobre como especificar colunas, consulte Como se referir a colunas em diferentes DataFrames).

O código no exemplo a seguir une dois DataFrames, depois chama o método select no DataFrame unido. Ele especifica as colunas a serem selecionadas chamando o método col da variável que representa os respectivos objetos DataFrame: dfRhs e dfLhs. Ele usa o método as para dar às colunas novos nomes no DataFrame que o método select cria.

DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
DataFrame dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"));
dfSelected.show();
Copy

Isso imprime a seguinte saída:

------------------------------
|"LEFTVALUE"  |"RIGHTVALUE"  |
------------------------------
|5            |5             |
|10           |10            |
|15           |15            |
------------------------------
Copy
Eliminação de duplicação de colunas antes de salvar ou armazenar

Observe que quando um DataFrame resultante de uma junção inclui nomes de colunas duplicadas, você deve eliminar a duplicação ou renomear colunas para remover a duplicação no DataFrame antes de salvar o resultado em uma tabela ou armazenar o DataFrame em cache. Para nomes de colunas duplicadas em um DataFrame que você salva em uma tabela ou cache, a biblioteca do Snowpark substituirá os nomes de colunas duplicadas por alias para que não sejam mais duplicadas.

O exemplo seguinte ilustra como a saída de um DataFrame em cache poderia aparecer se os nomes das colunas ID_A e VALUE fossem duplicados em uma junção de duas tabelas, e depois não fossem deduplicados ou renomeados antes de armazenar o resultado em cache.

--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A"  |"NAME_A"  |"l_ZSz7_VALUE"  |"ID_B"  |"NAME_B"  |"r_heec_ID_A"  |"r_heec_VALUE"  |
--------------------------------------------------------------------------------------------------
|10             |A1        |5               |4001    |B2        |10             |5               |
|40             |A2        |10              |4000    |B1        |40             |10              |
|80             |A3        |15              |9000    |B3        |80             |15              |
--------------------------------------------------------------------------------------------------
Copy

Como realizar uma junção natural

Para realizar uma junção natural (onde DataFrames são unidas em colunas que têm o mesmo nome), chame o método .naturalJoin.

O exemplo a seguir junta os DataFrames para as tabelas sample_a e sample_b nas colunas comuns (a coluna id_a):

DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.naturalJoin(dfRhs);
dfJoined.show();
Copy

Isso imprime a seguinte saída:

---------------------------------------------------
|"ID_A"  |"VALUE"  |"NAME_A"  |"ID_B"  |"NAME_B"  |
---------------------------------------------------
|10      |5        |A1        |4001    |B2        |
|40      |10       |A2        |4000    |B1        |
|80      |15       |A3        |9000    |B3        |
---------------------------------------------------
Copy

Como especificar o tipo de junção

Por padrão, o método DataFrame.join cria uma junção interna. Para especificar um tipo diferente de junção, defina o argumento joinType para um dos seguintes valores:

Tipo de junção

joinType

Junção interna

inner (padrão)

Junção externa esquerda

left

Junção externa direita

right

Junção externa completa

full

Junção cruzada

cross

Por exemplo:

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")), "left");
dfLeftOuterJoin.show();
Copy

Isso imprime a seguinte saída:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|40      |A2        |10       |4000    |B1        |40      |10       |
|10      |A1        |5        |4001    |B2        |10      |5        |
|80      |A3        |15       |9000    |B3        |80      |15       |
|90      |A4        |20       |NULL    |NULL      |NULL    |NULL     |
----------------------------------------------------------------------
Copy

Junção de tabelas múltiplas

Para juntar tabelas múltiplas:

  1. Crie um DataFrame para cada tabela.

  2. Chame o método DataFrame.join no primeiro DataFrame passando o segundo DataFrame.

  3. Usando o DataFrame retornado pelo método join, chame o método join, passando o terceiro DataFrame.

Você pode encadear as chamadas join como mostrado abaixo:

DataFrame dfFirst = session.table("sample_a");
DataFrame dfSecond  = session.table("sample_b");
DataFrame dfThird = session.table("sample_c");
DataFrame dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a").equal_to(dfSecond.col("id_a"))).join(dfThird, dfFirst.col("id_a").equal_to(dfThird.col("id_a")));
dfJoinThreeTables.show();
Copy

Isso imprime a seguinte saída:

------------------------------------------------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |"ID_C"  |"NAME_C"  |"ID_A"  |"ID_B"  |
------------------------------------------------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |1012    |C1        |10      |NULL    |
|40      |A2        |10       |4000    |B1        |40      |10       |1040    |C2        |40      |4000    |
|40      |A2        |10       |4000    |B1        |40      |10       |1041    |C3        |40      |4001    |
------------------------------------------------------------------------------------------------------------
Copy

Como realizar uma autojunção

Se você precisar unir uma tabela consigo mesma em diferentes colunas, você não pode realizar a autojunção com um único DataFrame. Os exemplos a seguir que usam um único DataFrame para realizar uma autojunção falham porque as expressões da coluna para "id" estão presentes nos lados esquerdo e direito da junção:

// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, Functions.col("id").equal_to(Functions.col("parent_id")));
Copy
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, df.col("id").equal_to(df.col("parent_id")));
Copy

Ambos os exemplos falham, com a seguinte exceção:

Exception in thread "main" com.snowflake.snowpark_java.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.
Copy

Em vez disso, use o método clone para criar um clone do objeto DataFrame, e use os dois objetos DataFrame para realizar a junção:

// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
DataFrame dfLhs = session.table("sample_product_data");
// Clone the DataFrame object to use as the right-hand side of the join.
DataFrame dfRhs = dfLhs.clone();

// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id").equal_to(dfRhs.col("parent_id")));
dfJoined.show();
Copy

Se você quiser realizar uma autojunção na mesma coluna, chame o método join que passa no nome da coluna (ou uma array de nomes de colunas) para a cláusula USING:

// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, "key");
Copy

Como executar uma ação para avaliar um DataFrame

Como mencionado anteriormente, o DataFrame é avaliado lentamente, o que significa que a instrução SQL não é enviada ao servidor para execução até que você execute uma ação. Uma ação faz com que o DataFrame seja avaliado e envia a instrução SQL correspondente para o servidor para execução.

As seções a seguir explicam como executar uma ação de forma síncrona e assíncrona em um DataFrame:

Como executar uma ação de forma síncrona

Para executar uma ação de forma síncrona, chame um dos seguintes métodos de ação:

Método para executar uma ação de forma síncrona

Descrição

DataFrame.collect()

Avalia o DataFrame e retorna o conjunto de dados resultante como uma Array de objetos Row. Consulte Como retornar todas as linhas.

DataFrame.toLocalIterator()

Avalia o DataFrame e retorna um Iterator de objetos Row. Se o conjunto de resultados for grande, use esse método para evitar carregar todos os resultados na memória de uma só vez. Consulte Como retornar um iterador para as linhas.

DataFrame.count()

Avalia o DataFrame e retorna o número de linhas.

DataFrame.show()

Avalia o DataFrame e imprime as linhas no console. Observe que esse método limita o número de linhas a 10 (por padrão). Consulte Como imprimir as linhas em um DataFrame.

DataFrame.cacheResult()

Executa a consulta, cria uma tabela temporária e coloca os resultados na tabela. O método retorna um objeto HasCachedResult que você pode usar para acessar os dados dessa tabela temporária. Consulte Como criar um cache do DataFrame.

DataFrame.write().saveAsTable()

Armazena os dados no DataFrame para a tabela especificada. Consulte Como salvar dados em uma tabela.

DataFrame.read().fileformat().copyInto('tableName')

Copia os dados no DataFrame para a tabela especificada. Consulte Como copiar dados de arquivos para uma tabela.

Session.table('tableName').delete()

Deleta linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Session.table('tableName').update(), Session.table('tableName').updateColumn()

Atualiza linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Session.table('tableName').merge().methods.collect()

Funde linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Por exemplo, para executar a consulta e retornar o número de resultados, chame o método count:

// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");

// Send the query to the server for execution and
// print the count of rows in the table.
System.out.println("Rows returned: " + dfProducts.count());
Copy

Você também pode chamar métodos de ação para:

Nota: Se você estiver chamando o método schema para obter as definições das colunas no DataFrame, você não precisa chamar um método de ação.

Como executar uma ação de forma assíncrona

Nota

Esse recurso foi introduzido no Snowpark 0.11.0.

Para executar uma ação de forma assíncrona, chame o método async para retornar um objeto “ator assíncrono” (por exemplo, DataFrameAsyncActor) e chamar um método de ação assíncrona nesse objeto.

Esses métodos de ação de um objeto de ator assíncrono retornam um objeto TypedAsyncJob, que você pode usar para verificar o status da ação assíncrona e obter os resultados da ação.

As próximas seções explicam como executar as ações de forma assíncrona e verificar os resultados.

Explicação do fluxo básico de ações assíncronas

Você pode usar os seguintes métodos para executar uma ação de forma assíncrona:

Método para realizar uma ação de forma assíncrona

Descrição

DataFrame.async().collect()

Avalia o DataFrame de forma assíncrona para obter o conjunto de dados resultante como um Array de objetos Row. Consulte Como retornar todas as linhas.

DataFrame.async.toLocalIterator

Avalia o DataFrame de forma assíncrona para obter um Iterator de objetos Row. Se o conjunto de resultados for grande, use esse método para evitar carregar todos os resultados na memória de uma só vez. Consulte Como retornar um iterador para as linhas.

DataFrame.async().count()

Avalia o DataFrame de forma assíncrona para obter o número de linhas.

DataFrame.write().async().saveAsTable()

Salva os dados na tabela DataFrame na tabela especificada de forma assíncrona. Consulte Como salvar dados em uma tabela.

DataFrame.read().fileformat().async().copyInto('tableName')

Copia os dados do DataFrame para a tabela especificada de forma assíncrona. Consulte Como copiar dados de arquivos para uma tabela.

Session.table('tableName').async().delete()

Apaga as linhas na tabela especificada de forma assíncrona. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Session.table('tableName').async().update() e Session.table('tableName').async().updateColumn()

Atualiza as linhas na tabela especificada de forma assíncrona. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

A partir do objeto TypedAsyncJob retornado, você pode fazer o seguinte:

  • Para determinar se a ação foi concluída, chame o método isDone.

  • Para obter a ID da consulta que corresponde à ação, chame o método getQueryId.

  • Para retornar os resultados da ação (por exemplo, o Array de objetos Row para o método collect ou a contagem de linhas para o método count), chame o método getResult.

    Note que getResult é uma chamada com bloqueio.

  • Para cancelar a ação, chame o método cancel.

Por exemplo, para executar uma consulta de forma assíncrona e obter os resultados como uma Array de objetos Row, chame async().collect():

import java.util.Arrays;

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
DataFrame df = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));

// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Row[]> asyncJob = df.async().collect();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
Row[] results = asyncJob.getResult();
System.out.println(Arrays.toString(results));
Copy

Para executar a consulta de forma assíncrona e obter o número de resultados, chame async().count():

// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");

// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Long> asyncJob = dfProducts.async().count();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Print the count of rows in the table.
// Note that getResult is a blocking call.
System.out.println("Rows returned: " + asyncJob.getResult());
Copy

Como especificar o número máximo de segundos a esperar

Ao chamar o método getResult, você pode usar o argumento maxWaitTimeInSeconds para especificar o número máximo de segundos para esperar que a consulta seja concluída antes de tentar obter os resultados. Por exemplo:

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
Row[] results = asyncJob.getResult(10);
Copy

Se você omitir esse argumento, o método espera pelo número máximo de segundos especificado pela propriedade de configuração snowpark_request_timeout_in_seconds. (Essa é uma propriedade que você pode definir ao criar o objeto Session).

Como acessar uma consulta assíncrona por ID

Se você tiver a ID de uma consulta assíncrona que você enviou anteriormente, você pode chamar o método Session.createAsyncJob para criar um objeto AsyncJob que você pode usar para verificar o status da consulta, obter os resultados da consulta ou cancelar a consulta.

Note que, ao contrário de TypedAsyncJob, AsyncJob não fornece um método getResult para a obtenção dos resultados. Em vez disso, se você precisar obter os resultados, chame o método getRows ou getIterator.

Por exemplo:

import java.util.Arrays;
...

AsyncJob asyncJob = session.createAsyncJob(myQueryId);
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
Row[] rows = asyncJob.getRows();
System.out.println(Arrays.toString(rows));
Copy

Como obter linhas em um DataFrame

Depois de você especificar como o DataFrame deve ser transformado, você pode chamar um método de ação para executar uma consulta e retornar os resultados. Você pode retornar todas as linhas em um Array, ou você pode retornar um Iterator que lhe permite iterar sobre os resultados linha por linha. Neste último caso, se a quantidade de dados for grande, as linhas são carregadas na memória por parte para evitar o carregamento de uma grande quantidade de dados na memória.

Como retornar todas as linhas

Para retornar todas as linhas de uma vez, chame o método collect. Esse método retorna uma array de objetos Row. Para obter os valores da linha, chame o método getType (por exemplo, getString, getInt, etc.).

Por exemplo:

Row[] rows = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).collect();
for (Row row : rows) {
  System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
Copy

Como retornar um iterador para as linhas

Se você quiser usar um Iterator para iterar sobre os objetos Row nos resultados, chame toLocalIterator. Se a quantidade de dados nos resultados for grande, o método carregará as linhas por parte para evitar carregar todas as linhas de uma só vez na memória.

Por exemplo:

import java.util.Iterator;

Iterator<Row> rowIterator = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).toLocalIterator();
while (rowIterator.hasNext()) {
  Row row = rowIterator.next();
  System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
Copy

Como retornar as primeiras n linhas

Para retornar as primeiras n linhas, chame o método first passando o número de linhas a serem retornadas.

Como explicado em Como limitar o número de linhas em um DataFrame, os resultados são não determinísticos. Se você quiser que os resultados sejam determinísticos, chame esse método em um DataFrame ordenado (df.sort().first()).

Por exemplo:

import java.util.Arrays;
...

DataFrame df = session.table("sample_product_data");
Row[] rows = df.sort(Functions.col("name")).first(5);
System.out.println(Arrays.toString(rows));
Copy

Como imprimir as linhas em um DataFrame

Para imprimir as primeiras 10 linhas no DataFrame para o console, chame o método show. Para imprimir um número diferente de linhas, passe o número de linhas a serem impressas.

Como explicado em Como limitar o número de linhas em um DataFrame, os resultados são não determinísticos. Se você quiser que os resultados sejam determinísticos, chame esse método em um DataFrame ordenado (df.sort().show()).

Por exemplo:

DataFrame df = session.table("sample_product_data");
df.sort(Functions.col("name")).show();
Copy

Atualização, eliminação e fusão de linhas em uma tabela

Nota

Esse recurso foi introduzido no Snowpark 0.7.0.

Quando você chama Session.table para criar um objeto DataFrame para uma tabela, o método retorna um objeto Updatable, que estende o DataFrame com métodos adicionais para atualização e exclusão de dados na tabela. (Consulte Updatable.)

Se você precisar atualizar ou excluir linhas em uma tabela, você pode usar os seguintes métodos da classe Updatable:

Atualização de linhas em uma tabela

Para atualizar as linhas em uma tabela, chame o método update ou updateColumn, passando um Map que associa as colunas a serem atualizadas e os valores correspondentes a serem atribuídos a essas colunas:

  • Para especificar os nomes das colunas como cadeias de caracteres no Map, chame updateColumn.

  • Para especificar objetos Column no Map, chame update.

Ambos os métodos retornam um objeto UpdateResult contendo o número de linhas que foram atualizadas. (Consulte UpdateResult.)

Nota

Ambos os métodos são métodos de ação, o que significa que chamar o método envia instruções SQL ao servidor para execução.

Por exemplo, para substituir os valores na coluna chamada count pelo valor 1, se você deseja usar um Map que associa o nome da coluna (a String) com o valor correspondente, chame updateColumn:

import java.util.HashMap;
import java.util.Map;
...

Map<String, Column> assignments = new HashMap<>();
assignments.put("3rd", Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.updateColumn(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Copy

Se você quiser usar um objeto Column no Map para identificar a coluna a ser atualizada, chame update:

import java.util.HashMap;
import java.util.Map;
...

Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Copy

Se a atualização só deve ser feita quando uma condição for cumprida, você pode especificar essa condição como um argumento. Por exemplo, para substituir os valores na coluna chamada count por 2 para linhas nas quais a coluna category_id tem o valor 20:

import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(2));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments, Functions.col("category_id").equal_to(Functions.lit(20)));
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Copy

Se você precisar basear a condição em uma junção com um objeto DataFrame diferente, você pode passar esse DataFrame como argumento e usar o DataFrame na condição. Por exemplo, para substituir os valores na coluna chamada count por 3 para linhas nas quais a coluna category_id corresponde à category_id no DataFrame dfParts:

import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(3));
Updatable updatableDf = session.table("sample_product_data");
DataFrame dfParts = session.table("parts");
UpdateResult updateResult = updatableDf.update(assignments, updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Copy

Como excluir linhas de uma tabela

Para o método delete, você pode especificar uma condição que identifica as linhas a serem excluídas, e pode basear essa condição em uma junção com outro DataFrame. delete retorna um objeto DeleteResult, contendo o número de linhas que foram excluídas. (Consulte DeleteResult.)

Nota

delete é um método de ação, o que significa que chamar o método envia instruções SQL ao servidor para execução.

Por exemplo, para apagar as linhas que têm o valor 1 na coluna category_id:

Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(Functions.lit(1)));
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
Copy

Se a condição se refere a colunas em um DataFrame diferente, passe esse DataFrame como o segundo argumento. Por exemplo, para excluir as linhas nas quais a coluna category_id corresponde à category_id no DataFrame dfParts, passe dfParts como o segundo argumento:

Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
Copy

Fusão de linhas em uma tabela

Para inserir, atualizar e excluir linhas em uma tabela com base nos valores de uma segunda tabela ou subconsulta (o equivalente ao comando MERGE em SQL), faça o seguinte:

  1. No objeto Updatable para a tabela onde você quer fundir os dados, chame o método merge, passando o objeto DataFrame para a outra tabela e a expressão da coluna para a condição de junção.

    Isso retorna um objeto MergeBuilder que você pode usar para especificar as ações a serem tomadas (por exemplo, inserir, atualizar ou excluir) nas linhas que correspondem e nas que não correspondem aos critérios. (Consulte MergeBuilder.)

  2. Usando o objeto MergeBuilder:

    • Para especificar a atualização ou exclusão que deve ser feita nas linhas correspondentes, chame o método whenMatched.

      Se você precisar especificar uma condição adicional quando as linhas devem ser atualizadas ou excluídas, você pode passar uma expressão de coluna para essa condição.

      Esse método retorna um objeto MatchedClauseBuilder que você pode usar para especificar a ação a ser executada. (Consulte MatchedClauseBuilder.)

      Chame o método update ou delete no objeto MatchedClauseBuilder para especificar a ação de atualização ou exclusão que deve ser executada nas linhas correspondentes. Esses métodos devolvem um objeto MergeBuilder que você pode usar para especificar cláusulas adicionais.

    • Para especificar a inserção que deve ser feita quando as linhas não correspondem, chame o método whenNotMatched.

      Se você precisar especificar uma condição adicional quando as linhas devem ser inseridas, você pode passar uma expressão de coluna para essa condição.

      Esse método retorna um objeto NotMatchedClauseBuilder que você pode usar para especificar a ação a ser executada. (Consulte NotMatchedClauseBuilder.)

      Chame o método insert no objeto NotMatchedClauseBuilder para especificar a ação de inserção que deve ser executada quando as linhas não correspondem. Esses métodos devolvem um objeto MergeBuilder que você pode usar para especificar cláusulas adicionais.

  3. Quando terminar de especificar as inserções, atualizações e exclusões que devem ser realizadas, chame o método collect do objeto MergeBuilder para realizar as inserções, atualizações e exclusões especificadas na tabela.

    collect retorna um objeto MergeResult contendo o número de linhas que foram inseridas, atualizadas e excluídas. (Consulte MergeResult.)

O exemplo a seguir insere uma linha com as colunas id e value da tabela source na tabela target se a tabela target não contiver uma linha com uma ID correspondente:

MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
                    .whenNotMatched().insert([source.col("id"), source.col("value")])
                    .collect();
Copy

O exemplo a seguir atualiza uma linha na tabela target com o valor da coluna value da linha na tabela source que tem a mesma ID:

import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("value", source.col("value"));
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
                    .whenMatched().update(assignments)
                    .collect();
Copy

Como salvar dados em uma tabela

Você pode salvar o conteúdo de um DataFrame em uma tabela nova ou existente. Para fazer isso, você deve ter os seguintes privilégios:

  • Privilégios CREATE TABLE sobre o esquema, se a tabela não existir.

  • Privilégios INSERT sobre a tabela.

Para salvar o conteúdo de um DataFrame em uma tabela:

  1. Chame o método write do DataFrame para obter um objeto DataFrameWriter.

  2. Chame o método mode do objeto DataFrameWriter, passando um objeto SaveMode que especifica suas preferências de escrita para a tabela:

    • Para inserir linhas, passe SaveMode.Append.

    • Para substituir a tabela existente, passe SaveMode.Overwrite.

    Esse método retorna o mesmo objeto DataFrameWriter configurado com o modo especificado.

  3. Se você estiver inserindo linhas em uma tabela existente (SaveMode.Append) e os nomes das colunas no DataFrame corresponderem aos nomes das colunas na tabela, chame a opção DataFrameWriter.option, passando "columnOrder" e "name" como argumentos.

    Nota

    Esse método foi introduzido no Snowpark 1.4.0.

    Por padrão, a opção columnOrder é definida como "index", o que significa que o DataFrameWriter insere os valores na ordem em que as colunas aparecem. Por exemplo, DataFrameWriter insere o valor da primeira coluna do DataFrame na primeira coluna da tabela, a segunda coluna do DataFrame na segunda coluna da tabela, etc.

    Esse método retorna o mesmo objeto DataFrameWriter configurado com a opção especificada.

  4. Chame o método saveAsTable do objeto DataFrameWriter para salvar o conteúdo do DataFrame em uma tabela especificada.

    Você não precisa chamar um método separado (por exemplo, collect) para executar a instrução SQL que salva os dados na tabela. saveAsTable é um método de ação que executa a instrução SQL.

O exemplo a seguir substitui uma tabela existente (identificada pela variável tableName) com o conteúdo do DataFrame df:

df.write().mode(SaveMode.Overwrite).saveAsTable(tableName);
Copy

O exemplo a seguir insere linhas do DataFrame df em uma tabela existente (identificada pela variável tableName). Nesse exemplo, a tabela e o DataFrame ambos contêm as colunas c1 e c2.

O exemplo demonstra a diferença entre configurar a opção columnOrder para "name" (que insere valores nas colunas da tabela com os mesmos nomes das colunas do DataFrame) e usar a opção padrão columnOrder (que insere valores nas colunas da tabela com base na ordem das colunas no DataFrame).

DataFrame df = session.sql("SELECT 1 AS c2, 2 as c1");
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write().mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName);
// With the default value of the columnOrder option ("index"), the DataFrameWriter uses the column positions
// and inserts a row with the values (1, 2).
df.write().mode(SaveMode.Append).saveAsTable(tableName);
Copy

Como criar uma exibição a partir de um DataFrame

Para criar uma exibição a partir de um DataFrame, chame o método createOrReplaceView:

df.createOrReplaceView("db.schema.viewName");
Copy

Note que chamar createOrReplaceView cria imediatamente a nova exibição. Mais importante, isso não faz com que o DataFrame seja avaliado. (O DataFrame em si não é avaliado até que você execute uma ação).

As exibições que você cria ao chamar createOrReplaceView são persistentes. Se você não precisar mais dessa exibição, você pode excluir a exibição manualmente.

Se você precisar criar uma exibição temporária apenas para a sessão, chame o método createOrReplaceTempView:

df.createOrReplaceTempView("db.schema.viewName");
Copy

Como criar um cache do DataFrame

Em alguns casos, pode ser necessário realizar uma consulta complexa e manter os resultados para uso em operações subsequentes (em vez de executar a mesma consulta novamente). Nesses casos, você pode armazenar o conteúdo de um DataFrame chamando o método cacheResult.

Esse método:

  • Executa a consulta.

    Você não precisa chamar um método de ação separado para recuperar os resultados antes de chamar cacheResult. O cacheResult é um método de ação que executa a consulta.

  • Salva os resultados em uma tabela temporária

    Como cacheResult cria uma tabela temporária, você deve ter o privilégio CREATE TABLE sobre o esquema que está em uso.

  • Retorna um objeto HasCachedResult, que dá acesso aos resultados da tabela temporária.

    Como HasCachedResult estende DataFrame, você pode realizar algumas das mesmas operações sobre estes dados em cache que pode realizar em um DataFrame.

Nota

Como cacheResult executa a consulta e salva os resultados em uma tabela, o método pode resultar em um aumento dos custos de computação e armazenamento.

Por exemplo:

// Set up a DataFrame to query a table.
DataFrame df = session.table("sample_product_data").filter(Functions.col("category_id").gt(Functions.lit(10)));
// Retrieve the results and cache the data.
HasCachedResult cachedDf = df.cacheResult();
// Create a DataFrame containing a subset of the cached data.
DataFrame dfSubset = cachedDf.filter(Functions.col("category_id").equal_to(Functions.lit(20))).select(Functions.col("name"), Functions.col("category_id"));
dfSubset.show();
Copy

Note que o DataFrame original não é afetado quando você chama esse método. Por exemplo, suponha que dfTable seja um DataFrame para a tabela sample_product_data:

HasCachedResult dfTempTable = dfTable.cacheResult();
Copy

Depois de chamar cacheResult, dfTable ainda aponta para a tabela sample_product_data, e você pode continuar a usar dfTable para consultar e atualizar essa tabela.

Para usar os dados em cache na tabela temporária, você usa dfTempTable (o objeto HasCachedResult retornado por cacheResult).

Como trabalhar com arquivos em um estágio

A biblioteca do Snowpark fornece classes e métodos que você pode usar para carregar dados no Snowflake e descarregar dados do Snowflake usando arquivos em estágios.

Nota

Para utilizar essas classes e métodos em um estágio, você deve ter os privilégios necessários para trabalhar com o estágio.

As próximas seções explicam como utilizar essas classes e métodos:

Carregamento e descarregamento de arquivos em um estágio

Para carregar e baixar arquivos em um estágio, use os métodos put e get do objeto FileOperation:

Carregamento de arquivos para um estágio

Para carregar arquivos para um estágio:

  1. Verifique se você tem os privilégios de carregar arquivos para o estágio.

  2. Use o método file do objeto Session para acessar o objeto FileOperation para a sessão.

  3. Chame o método put do objeto FileOperation para carregar os arquivos para um estágio.

    Esse método executa um comando SQL PUT.

    • Para especificar quaisquer parâmetros opcionais para o comando PUT, crie um Map dos parâmetros e valores e passe o Map como o argumento options. Por exemplo:

      import java.util.HashMap;
      import java.util.Map;
      ...
      // Upload a file to a stage without compressing the file.
      Map<String, String> putOptions = new HashMap<>();
      putOptions.put("AUTO_COMPRESS", "FALSE");
      PutResult[] putResults = session.file().put("file:///tmp/myfile.csv", "@myStage", putOptions);
      
      Copy
    • No argumento localFileName, você pode usar curingas (* e ?) para identificar um conjunto de arquivos a ser carregado. Por exemplo:

      // Upload the CSV files in /tmp with names that start with "file".
      // You can use the wildcard characters "*" and "?" to match multiple files.
      PutResult[] putResults = session.file().put("file:///tmp/file*.csv", "@myStage/prefix2")
      
      Copy
  4. Verifique o Array de objetos PutResult retornados pelo método put para determinar se os arquivos foram carregados com sucesso. Por exemplo, para imprimir o nome do arquivo e o status da operação PUT para esse arquivo:

    // Print the filename and the status of the PUT operation.
    for (PutResult result : putResults) {
      System.out.println(result.getSourceFileName() + ": " + result.getStatus());
    }
    
    Copy

Como baixar arquivos de um estágio

Para baixar arquivos de uma estágio:

  1. Verifique se você tem os privilégios de carregar arquivos para o estágio.

  2. Use o método file do objeto Session para acessar o objeto FileOperation para a sessão.

  3. Chame o método get do objeto FileOperation para baixar os arquivos de uma estágio.

    Esse método executa um comando SQL GET.

    Para especificar quaisquer parâmetros opcionais para o comando GET, crie um Map dos parâmetros e valores e passe o Map como o argumento options. Por exemplo:

    import java.util.HashMap;
    import java.util.Map;
    ...
    // Upload a file to a stage without compressing the file.
    // Download files with names that match a regular expression pattern.
    Map<String, String> getOptions = new HashMap<>();
    getOptions.put("PATTERN", "'.*file_.*.csv.gz'");
    GetResult[] getResults = session.file().get("@myStage", "file:///tmp", getOptions);
    
    Copy
  4. Verifique o Array de objetos GetResult retornados pelo método get para determinar se os arquivos foram baixados com sucesso. Por exemplo, para imprimir o nome do arquivo e o status da operação GET para esse arquivo:

    // Print the filename and the status of the GET operation.
    for (GetResult result : getResults) {
      System.out.println(result.getFileName() + ": " + result.getStatus());
    }
    
    Copy

Como usar fluxos de entrada para carregar e descarregar dados em um estágio

Nota

Esse recurso foi introduzido no Snowpark 1.4.0.

Para usar fluxos de entrada para carregar dados em um arquivo em um estágio e baixar dados de um arquivo em um estágio, use os métodos uploadStream e downloadStream do objeto FileOperation:

Como usar um fluxo de entrada para carregar dados em um arquivo em um estágio

Para carregar os dados de um objeto java.io.InputStream em um arquivo em um estágio:

  1. Verifique se você tem os privilégios de carregar arquivos para o estágio.

  2. Use o método file do objeto Session para acessar o objeto FileOperation para a sessão.

  3. Chame o método uploadStream do objeto FileOperation.

    Passe o caminho completo para o arquivo no estágio onde os dados devem ser escritos e o objeto InputStream. Além disso, use o argumento compress para especificar se os dados devem ou não ser compactados antes de serem carregados.

Por exemplo:

import java.io.InputStream;
...
boolean compressData = true;
String pathToFileOnStage = "@myStage/path/file";
session.file().uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData);
Copy

Como usar um fluxo de entrada para baixar dados de um arquivo em um estágio

Para baixar dados de um arquivo em um estágio para um objeto java.io.io.InputStream:

  1. Verifique se você tem os privilégios de carregar arquivos para o estágio.

  2. Use o método file do objeto Session para acessar o objeto FileOperation para a sessão.

  3. Chame o método downloadStream do objeto FileOperation.

    Passe o caminho completo para o arquivo no estágio contendo os dados a serem baixados. Use o argumento decompress para especificar se os dados no arquivo estão ou não compactados.

Por exemplo:

import java.io.InputStream;
...
boolean isDataCompressed = true;
String pathToFileOnStage = "@myStage/path/file";
InputStream is = session.file().downloadStream(pathToFileOnStage, isDataCompressed);
Copy

Como configurar um DataFrame para arquivos em um estágio

Esta seção explica como configurar um DataFrame para arquivos em um estágio do Snowflake. Após criar esse DataFrame, você pode usar o DataFrame para:

Para configurar um DataFrame para arquivos em um estágio do Snowflake, use a classe DataFrameReader:

  1. Verifique se você tem os seguintes privilégios:

  2. Chame o método read na classe Session para acessar um objeto DataFrameReader.

  3. Se os arquivos estiverem no formato CSV, descreva os campos no arquivo. Para fazer isso:

    1. Crie um objeto StructType que consiste em uma array de objetos StructField que descrevem os campos no arquivo.

    2. Para cada objeto StructField, especifique o seguinte:

      • O nome do campo.

      • O tipo de dados do campo (especificado como um objeto no pacote com.snowflake.snowpark_java.types).

      • Se o campo é ou não anulável.

      Por exemplo:

      import com.snowflake.snowpark_java.types.*;
      ...
      
      StructType schemaForDataFile = StructType.create(
        new StructField("id", DataTypes.StringType, true),
        new StructField("name", DataTypes.StringType, true));
      
      Copy
    3. Chame o método schema no objeto DataFrameReader, passando o objeto StructType.

      Por exemplo:

      DataFrameReader dfReader = session.read().schema(schemaForDataFile);
      
      Copy

      O método schema retorna um objeto DataFrameReader que é configurado para ler arquivos contendo os campos especificados.

      Note que você não precisa fazer isso para arquivos em outros formatos (como JSON). Para esses arquivos, o DataFrameReader trata os dados como um único campo do tipo VARIANT com o nome de campo $1.

  4. Se você precisar especificar informações adicionais sobre como os dados devem ser lidos (por exemplo, que os dados estão comprimidos ou que um arquivo CSV usa um ponto-e-vírgula em vez de uma vírgula para delimitar campos), chame o método DataFrameReader.option ou o método DataFrameReader.options.

    Passe o nome e valor da opção que você deseja definir. Você pode definir os seguintes tipos de opções:

    O exemplo a seguir configura o objeto DataFrameReader para consultar dados em um arquivo CSV que não é compactado e que usa um ponto-e-vírgula como delimitador de campo.

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE");
    
    Copy

    O método option retorna um objeto DataFrameReader que é configurado com as opções especificadas.

    Para definir várias opções, você pode encadear chamadas ao método option (como mostrado no exemplo acima) ou chamar o método DataFrameReader.options, passando um Map dos nomes e valores das opções.

  5. Chame o método correspondente ao formato dos arquivos. Você pode chamar um dos seguintes métodos:

    Ao chamar esses métodos, passe o local do estágio dos arquivos a serem lidos. Por exemplo:

    DataFrame df = dfReader.csv("@mystage/myfile.csv");
    
    Copy

    Para especificar vários arquivos que começam com o mesmo prefixo, especifique o prefixo após o nome do estágio. Por exemplo, para carregar arquivos que tenham o prefixo csv_ do estágio @mystage:

    DataFrame df = dfReader.csv("@mystage/csv_");
    
    Copy

    Os métodos correspondentes ao formato de um arquivo retornam um objeto CopyableDataFrame para esse arquivo. CopyableDataFrame estende DataFrame e fornece métodos adicionais para trabalhar os dados em arquivos preparados.

  6. Chame um método de ação para:

    Como no caso de DataFrames para tabelas, os dados não são obtidos no DataFrame até você chamar um método de ação.

Carregamento de dados de arquivos em um DataFrame

Depois que você configurar um DataFrame para arquivos em um estágio, você pode carregar dados dos arquivos para o DataFrame:

  1. Use os métodos do objeto DataFrame para realizar quaisquer transformações necessárias no conjunto de dados (por exemplo, selecionar campos específicos, filtrar linhas, etc.).

    Por exemplo, para extrair o elemento color de um arquivo JSON chamado data.json no estágio chamado mystage:

    DataFrame df = session.read().json("@mystage/data.json").select(Functions.col("$1").subField("color"));
    
    Copy

    Como explicado anteriormente, para arquivos em formatos diferentes de CSV (por exemplo JSON), o DataFrameReader trata os dados no arquivo como uma única coluna VARIANT com o nome $1.

  2. Chame o método DataFrame.collect para carregar os dados. Por exemplo:

    Row[] results = df.collect();
    
    Copy

Como copiar dados de arquivos para uma tabela

Depois de configurar um DataFrame para arquivos em um estágio, você pode chamar o método copyInto para copiar os dados em uma tabela. Esse método executa o comando COPY INTO <tabela>.

Nota

Você não precisa chamar o método collect antes de chamar copyInto. Os dados dos arquivos não precisam estar no DataFrame antes de você chamar copyInto.

Por exemplo, o seguinte código carrega os dados do arquivo CSV especificado por myFileStage para a tabela mytable. Como os dados estão em um arquivo CSV, o código também deve descrever os campos no arquivo. O exemplo faz isso chamando o método schema do objeto DataFrameReader e passando um objeto StructType (schemaForDataFile) contendo uma array de objetos StructField que descrevem os campos.

CopyableDataFrame copyableDf = session.read().schema(schemaForDataFile).csv("@mystage/myfile.csv");
copyableDf.copyInto("mytable");
Copy

Como salvar um DataFrame para arquivos em um estágio

Nota

Esse recurso foi introduzido no Snowpark 1.5.0.

Se você precisar salvar um DataFrame para arquivos em um estágio, você pode chamar o método DataFrameWriter correspondente ao formato do arquivo (por exemplo, o método csv para escrever em um arquivo CSV), passando o local do estágio onde os arquivos devem ser salvos. Esses métodos DataFrameWriter executam o comando COPY INTO <local>.

Nota

Você não precisa chamar o método collect antes de chamar esses métodos DataFrameWriter. Os dados do arquivo não precisam estar no arquivo DataFrame antes de você chamar esses métodos.

Para salvar o conteúdo de um DataFrame em arquivos em um estágio:

  1. Chame o método write do objeto DataFrame para obter um objeto DataFrameWriter. Por exemplo, para obter o objeto DataFrameWriter para um DataFrame que representa a tabela chamada sample_product_data:

    DataFrameWriter dfWriter = session.table("sample_product_data").write();
    
    Copy
  2. Se você quiser substituir o conteúdo do arquivo (se o arquivo existir), chame o método mode do objeto DataFrameWriter, passando SaveMode.Overwrite.

    Caso contrário, por padrão, o DataFrameWriter reporta um erro se o arquivo especificado no estágio já existir.

    O método mode retorna o mesmo objeto DataFrameWriter configurado com o modo especificado.

    Por exemplo, para especificar que o DataFrameWriter deve substituir o arquivo no estágio:

    dfWriter = dfWriter.mode(SaveMode.Overwrite);
    
    Copy
  3. Se você precisar especificar informações adicionais sobre como os dados devem ser salvos (por exemplo, que os dados devem ser compactados ou que você deseja usar um ponto-e-vírgula para delimitar campos em um arquivo CSV), chame o método DataFrameWriter.option ou o método DataFrameWriter.options.

    Passe o nome e valor da opção que você deseja definir. Você pode definir os seguintes tipos de opções:

    Observe que você não pode usar o método option para definir as seguintes opções:

    • A opção do tipo de formato TYPE.

    • A opção de cópia OVERWRITE. Em vez disso, para definir esta opção, chame o método mode (como mencionado na etapa anterior).

    O exemplo a seguir configura o objeto DataFrameWriter para salvar dados em um arquivo CSV sem compressão, usando um ponto-e-vírgula (no lugar de uma vírgula) como delimitador de campo.

    dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE");
    
    Copy

    O método option retorna um objeto DataFrameWriter que é configurado com a opção especificada.

    Para definir várias opções, você pode encadear chamadas ao método option (como mostrado no exemplo acima) ou chamar o método DataFrameWriter.options, passando um Map dos nomes e valores das opções.

  4. Para retornar detalhes sobre cada arquivo que foi salvo, defina a DETAILED_OUTPUT opção de cópia como TRUE.

    Por padrão, DETAILED_OUTPUT é FALSE, o que significa que o método retorna uma única linha de saída contendo os campos "rows_unloaded", "input_bytes" e "output_bytes".

    Quando você define DETAILED_OUTPUT como TRUE, o método retorna uma linha de saída para cada arquivo salvo. Cada linha contém os campos FILE_NAME, FILE_SIZE e ROW_COUNT.

  5. Chame o método correspondente ao formato do arquivo para salvar os dados no arquivo. Você pode chamar um dos seguintes métodos:

    Ao chamar esses métodos, passe o local do estágio do arquivo onde os dados devem ser escritos (por exemplo, @mystage).

    Por padrão, o método salva os dados em nomes de arquivos com o prefixo data_ (por exemplo, @mystage/data_0_0_0.csv). Se você quiser que os arquivos sejam nomeados com um prefixo diferente, especifique o prefixo após o nome do estágio. Por exemplo:

    WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data");
    
    Copy

    Esse exemplo salva o conteúdo do DataFrame em arquivos que começam com o prefixo saved_data (por exemplo, @mystage/saved_data_0_0_0.csv).

  6. Verifique o objeto WriteFileResult retornado para obter informações sobre a quantidade de dados escritos no arquivo.

    A partir do objeto WriteFileResult, você pode acessar a saída produzida pelo comando COPY INTO <location>:

    • Para acessar as linhas de saída como uma array de objetos Row, chame o método getRows.

    • Para determinar quais campos estão presentes nas linhas, chame o método getSchema, que retorna um StructType que descreve os campos na linha.

    Por exemplo, para imprimir os nomes dos campos e valores nas linhas de saída:

    WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data");
    Row[] rows = writeFileResult.getRows();
    StructType schema = writeFileResult.getSchema();
    for (int i = 0 ; i < rows.length ; i++) {
      System.out.println("Row:" + i);
      Row row = rows[i];
      for (int j = 0; j < schema.size(); j++) {
        System.out.println(schema.get(j).name() + ": " + row.get(j));
      }
    }
    
    Copy

O exemplo a seguir usa um DataFrame para salvar o conteúdo da tabela chamada car_sales para arquivos JSON com o prefixo saved_data no estágio @mystage (por exemplo, @mystage/saved_data_0_0_0.json). O código de exemplo:

  • Substitui o arquivo se ele já existir no estágio.

  • Retorna resultados detalhados sobre a operação de salvar.

  • Salva os dados sem compressão.

Finalmente, o código de exemplo imprime cada campo e valor nas linhas de saída retornadas:

DataFrame df = session.table("car_sales");
WriteFileResult writeFileResult = df.write().mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data");
Row[] rows = writeFileResult.getRows();
StructType schema = writeFileResult.getSchema();
for (int i = 0 ; i < rows.length ; i++) {
  System.out.println("Row:" + i);
  Row row = rows[i];
  for (int j = 0; j < schema.size(); j++) {
    System.out.println(schema.get(j).name() + ": " + row.get(j));
  }
}
Copy

Como trabalhar com dados semiestruturados

Usando um DataFrame, você pode consultar e acessar dados semiestruturados (por exemplo, dados de JSON). As próximas seções explicam como trabalhar com dados semiestruturados em um DataFrame.

Nota

Os exemplos nestas seções usam os dados de exemplo em Amostra de dados usados em exemplos.

Como percorrer dados semiestruturados

Para se referir a um campo ou elemento específico em dados semiestruturados, use os seguintes métodos do objeto Column:

  • Use subField(«<field_name>») para retornar um objeto Column para um campo em um OBJECT (ou uma VARIANT que contenha um OBJECT).

  • Use subField(<index>) para retornar um objeto Column para um elemento em uma ARRAY (ou uma VARIANT que contenha uma ARRAY).

Nota

Se o nome do campo ou elementos no caminho forem irregulares e dificultarem o uso dos métodos Column.apply, você pode usar Functions.get, Functions.get_ignore_case ou Functions.get_path como alternativa.

Por exemplo, o seguinte código seleciona o campo dealership nos objetos na coluna src dos dados de exemplo:

DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("dealership")).show();
Copy

O código imprime a seguinte saída:

----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------
Copy

Nota

Os valores no DataFrame estão entre aspas duplas porque eles são retornados como literais de cadeias de caracteres. Para converter esses valores para um tipo específico, consulte Conversão explícita de valores em dados semiestruturados.

Você também pode encadear chamadas a métodos para percorrer um caminho para um campo ou elemento específico.

Por exemplo, o código a seguir seleciona o campo name no objeto salesperson:

DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("name")).show();
Copy

O código imprime a seguinte saída:

------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------
Copy

Como outro exemplo, o código a seguir seleciona o primeiro elemento do campo vehicle, que contém uma array de veículos. O exemplo também seleciona o campo price do primeiro elemento.

DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("vehicle").subField(0)).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("price")).show();
Copy

O código imprime a seguinte saída:

---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------

------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------
Copy

Como alternativa ao método apply, você pode usar as funções Functions.get, Functions.get_ignore_case ou Functions.get_path se o nome do campo ou elementos no caminho forem irregulares e dificultarem o uso dos métodos Column.subField.

Por exemplo, as duas linhas de código a seguir imprimem o valor de um campo especificado em um objeto:

df.select(Functions.get(Functions.col("src"), Functions.lit("dealership"))).show();
df.select(Functions.col("src").subField("dealership")).show();
Copy

Da mesma forma, as seguintes linhas de código imprimem o valor de um campo em um caminho especificado em um objeto:

df.select(Functions.get_path(Functions.col("src"), Functions.lit("vehicle[0].make"))).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("make")).show();
Copy

Conversão explícita de valores em dados semiestruturados

Por padrão, os valores de campos e elementos são retornados como literais de cadeias de caracteres (incluindo as aspas duplas), como mostrado nos exemplos acima.

Para evitar resultados inesperados, chame o método de converter para converter o valor para um tipo específico. Por exemplo, o seguinte código imprime os valores sem e com conversão:

// Import the objects for the data types, including StringType.
import com.snowflake.snowpark_java.types.*;
...
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("id")).show();
df.select(Functions.col("src").subField("salesperson").subField("id").cast(DataTypes.StringType)).show();
Copy

O código imprime a seguinte saída:

----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------

---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------
Copy

Como achatar uma array de objetos em linhas

Se você precisar “achatar” dados semiestruturados em um DataFrame (por exemplo, produzir uma linha para cada objeto de uma array), chame o método flatten. Esse método é equivalente à função FLATTEN do SQL. Se você passar um caminho para um objeto ou array, o método retorna um DataFrame contendo uma linha para cada campo ou elemento no objeto ou array.

Por exemplo, nos dados de exemplo, src:customer é um conjunto de objetos que contém informações sobre um cliente. Cada objeto contém um campo name e address.

Se você passar esse caminho para a função flatten:

DataFrame df = session.table("car_sales");
df.flatten(Functions.col("src").subField("customer")).show();
Copy

o método retornará um DataFrame:

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

A partir desse DataFrame, você poderá selecionar os campos name e address de cada objeto no campo VALUE:

df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name"), Functions.col("value").subField("address")).show();
Copy
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------
Copy

O código a seguir complementa o exemplo anterior ao converter os valores para um tipo específico e mudar os nomes das colunas:

df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name").cast(DataTypes.StringType).as("Customer Name"), Functions.col("value").subField("address").cast(DataTypes.StringType).as("Customer Address")).show();
Copy
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
Copy

Como executar instruções SQL

Para executar uma instrução SQL que você especificar, chame o método sql na classe Session e passe a instrução a ser executada. O método retorna um DataFrame.

Note que a instrução SQL não será executada até que você chame um método de ação.

import java.util.Arrays;

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
DataFrame dfStageFiles  = session.sql("ls @myStage");
Row[] files = dfStageFiles.collect();
System.out.println(Arrays.toString(files));

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect();

DataFrame tableDf = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Get the count of rows from the table.
long numRows = tableDf.count();
System.out.println("Count: " + numRows);
Copy

Se você quiser chamar métodos para transformar o DataFrame (por exemplo, filtro, seleção, etc.), observe que estes métodos funcionam somente se a instrução SQL subjacente for uma instrução SELECT. Os métodos de transformação não são suportados para outros tipos de instruções SQL.

import java.util.Arrays;

DataFrame df = session.sql("select id, category_id, name from sample_product_data where id > 10");
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
Row[] results = df.filter(Functions.col("category_id").lt(Functions.lit(10))).select(Functions.col("id")).collect();
System.out.println(Arrays.toString(results));

// In this example, the underlying SQL statement is not a SELECT statement.
DataFrame dfStageFiles = session.sql("ls @myStage");
// Calling the filter method results in an error.
dfStageFiles.filter(...);
Copy