Como trabalhar com DataFrames no Snowpark Java¶
No Snowpark, a principal forma de consultar e processar dados é através de um DataFrame. Este tópico explica como trabalhar com DataFrames.
Neste tópico:
Para obter e manipular dados, use a classe DataFrame. Um DataFrame representa um conjunto de dados relacionais que é avaliado lentamente: ele só é executado quando uma ação específica é acionada. Em certo sentido, um DataFrame é como uma consulta que precisa ser avaliada para obter dados.
Para obter dados em um DataFrame:
Construa um DataFrame especificando a fonte dos dados para o conjunto de dados.
Por exemplo, você pode criar um DataFrame para guardar dados de uma tabela, um arquivo CSV externo ou a execução de uma instrução SQL.
Especifique como o conjunto de dados no DataFrame deve ser transformado.
Por exemplo, você pode especificar quais colunas devem ser selecionadas, como as linhas devem ser filtradas, como os resultados devem ser ordenados e agrupados, etc.
Execute a instrução para obter os dados para o DataFrame.
Para obter os dados no DataFrame, você deve invocar um método que execute uma ação (por exemplo, o método
collect()
).
As próximas seções explicam essas etapas com mais detalhes.
Como estabelecer os exemplos para esta seção¶
Alguns exemplos desta seção utilizam um DataFrame para consultar uma tabela chamada sample_product_data
. Se você quiser executar esses exemplos, você pode criar essa tabela e preenchê-la com alguns dados executando as seguintes instruções SQL:
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT, amount NUMBER(12, 2), quantity INT, product_date DATE);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10, 1.00, 15, TO_DATE('2021.01.01', 'YYYY.MM.DD')),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20, 2.00, 30, TO_DATE('2021.02.01', 'YYYY.MM.DD')),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30, 3.00, 45, TO_DATE('2021.03.01', 'YYYY.MM.DD')),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40, 4.00, 60, TO_DATE('2021.04.01', 'YYYY.MM.DD')),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50, 5.00, 75, TO_DATE('2021.05.01', 'YYYY.MM.DD')),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60, 6.00, 90, TO_DATE('2021.06.01', 'YYYY.MM.DD')),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70, 7.00, 105, TO_DATE('2021.07.01', 'YYYY.MM.DD')),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80, 7.25, 120, TO_DATE('2021.08.01', 'YYYY.MM.DD')),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90, 7.50, 135, TO_DATE('2021.09.01', 'YYYY.MM.DD')),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100, 7.75, 150, TO_DATE('2021.10.01', 'YYYY.MM.DD')),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100, 8.00, 165, TO_DATE('2021.11.01', 'YYYY.MM.DD')),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100, 8.50, 180, TO_DATE('2021.12.01', 'YYYY.MM.DD'));
Para verificar se a tabela foi criada, execute:
SELECT * FROM sample_product_data;
Como criar um DataFrame¶
Para construir um DataFrame, você pode usar métodos na classe Session
. Cada um dos métodos a seguir constrói um DataFrame a partir de um tipo diferente de fonte de dados:
Para criar um DataFrame a partir de dados em uma tabela, exibição ou fluxo, chame o método
table
:// Create a DataFrame from the data in the "sample_product_data" table. DataFrame dfTable = session.table("sample_product_data"); // Print out the first 10 rows. dfTable.show();
Nota
O método
table
retorna um objetoUpdatable
.Updatable
estendeDataFrame
e fornece métodos adicionais para trabalhar com dados na tabela (por exemplo, métodos para atualização e exclusão de dados). Consulte Atualização, eliminação e fusão de linhas em uma tabela.Para criar um DataFrame a partir de valores especificados:
Construa um conjunto de objetos
Row
que contenham os valores.Construa um objeto
StructType
que descreva os tipos de dados desses valores.Chame o método
createDataFrame
passando a array o objetoStructType
.
// Import name from the types package, which contains StructType and StructField. import com.snowflake.snowpark_java.types.*; ... // Create a DataFrame containing specified values. Row[] data = {Row.create(1, "a"), Row.create(2, "b")}; StructType schema = StructType.create( new StructField("num", DataTypes.IntegerType), new StructField("str", DataTypes.StringType)); DataFrame df = session.createDataFrame(data, schema); // Print the contents of the DataFrame. df.show();
Nota
As palavras reservadas pelo Snowflake não são válidas como nomes de colunas na criação de um DataFrame. Para obter uma lista de palavras reservadas, consulte Palavras-chave reservadas e limitadas.
Para criar um DataFrame contendo um intervalo de valores, chame o método
range
:// Create a DataFrame from a range DataFrame dfRange = session.range(1, 10, 2); // Print the contents of the DataFrame. dfRange.show();
Para criar um DataFrame para um arquivo em um estágio, chame
read
para obter um objetoDataFrameReader
. No objetoDataFrameReader
, chame o método correspondente ao formato dos dados no arquivo:// Create a DataFrame from data in a stage. DataFrame dfJson = session.read().json("@mystage2/data1.json"); // Print the contents of the DataFrame. dfJson.show();
Para criar um DataFrame para armazenar os resultados de uma consulta SQL, chame o método
sql
:// Create a DataFrame from a SQL query DataFrame dfSql = session.sql("SELECT name from sample_product_data"); // Print the contents of the DataFrame. dfSql.show();
Nota: Embora você possa usar este método para executar instruções SELECT que obtêm dados de tabelas e arquivos preparados, você deve em vez disso usar os métodos
table
eread
. Métodos comotable
eread
podem fornecer melhor realce de sintaxe, realce de erros e complementação inteligente de código em ferramentas de desenvolvimento.
Especificação de como o conjunto de dados deve ser transformado¶
Para especificar quais colunas devem ser selecionadas e como os resultados devem ser filtrados, ordenados, agrupados, etc., chame os métodos de DataFrame que transformam o conjunto de dados. Para identificar colunas nesses métodos, use o método estático Functions.col
ou uma expressão que avalie para uma coluna. (Consulte Como especificar colunas e expressões.)
Por exemplo:
Para especificar quais linhas devem ser retornadas, chame o método
filter
:// Create a DataFrame for the rows with the ID 1 // in the "sample_product_data" table. DataFrame df = session.table("sample_product_data").filter( Functions.col("id").equal_to(Functions.lit(1))); df.show();
Para especificar as colunas que devem ser selecionadas, chame o método
select
:// Create a DataFrame that contains the id, name, and serial_number // columns in te "sample_product_data" table. DataFrame df = session.table("sample_product_data").select( Functions.col("id"), Functions.col("name"), Functions.col("serial_number")); df.show();
Cada método retorna um novo objeto DataFrame que foi transformado. (O método não afeta o objeto DataFrame original). Isso significa que se você quiser aplicar múltiplas transformações, você pode encadear chamadas a métodos, chamando cada método de transformação subsequente sobre o novo objeto DataFrame retornado pela chamada ao método anterior.
Observe que esses métodos de transformação não obtêm dados do banco de dados Snowflake. (Os métodos de ação descritos em Como executar uma ação para avaliar um DataFrame realizam a recuperação de dados). Os métodos de transformação simplesmente especificam como a instrução SQL deve ser construída.
Como especificar colunas e expressões¶
Ao chamar esses métodos de transformação, pode ser necessário especificar colunas ou expressões que utilizam colunas. Por exemplo, ao chamar o método select
, você precisa especificar as colunas que devem ser selecionadas.
Para se referir a uma coluna, crie um objeto Column chamando o método estático Functions.col.
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
dfProductInfo.show();
Nota
Para criar um objeto Column
para um literal, consulte Como usar literais como objetos de coluna.
Ao especificar um filtro, projeção, condição de junção, etc., você pode usar objetos Column
em uma expressão. Por exemplo:
Você pode usar objetos
Column
com o métodofilter
para especificar uma condição de filtro:// Specify the equivalent of "WHERE id = 12" // in an SQL SELECT statement. DataFrame df = session.table("sample_product_data"); df.filter(Functions.col("id").equal_to(Functions.lit(12))).show();
// Specify the equivalent of "WHERE key + category_id < 10" // in an SQL SELECT statement. DataFrame df2 = session.table("sample_product_data"); df2.filter(Functions.col("key").plus(Functions.col("category_id")).lt(Functions.lit(10))).show();
Você pode usar objetos
Column
com o métodoselect
para definir um alias:// Specify the equivalent of "SELECT key * 10 AS c" // in an SQL SELECT statement. DataFrame df3 = session.table("sample_product_data"); df3.select(Functions.col("key").multiply(Functions.lit(10)).as("c")).show();
Você pode usar objetos
Column
com o métodojoin
para definir uma condição de junção:// Specify the equivalent of "sample_a JOIN sample_b on sample_a.id_a = sample_b.id_a" // in an SQL SELECT statement. DataFrame dfLhs = session.table("sample_a"); DataFrame dfRhs = session.table("sample_b"); DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))); dfJoined.show();
Como se referir a colunas em diferentes DataFrames¶
Ao se referir a colunas em dois objetos DataFrame diferentes com o mesmo nome (por exemplo, unindo os DataFrames naquela coluna), pode-se usar o método col
em cada objeto DataFrame para se referir a uma coluna naquele objeto (por exemplo, df1.col("name")
e df2.col("name")
).
O exemplo a seguir demonstra como usar o método col
para se referir a uma coluna em um DataFrame específico. O exemplo une dois objetos DataFrame, ambos contendo uma coluna chamada value
. O exemplo usa o método as
do objeto Column
para mudar os nomes das colunas no DataFrame recém-criado.
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"));
dfJoined.show();
Como utilizar aspas duplas em torno de identificadores de objetos (nomes de tabela, nomes de coluna, etc.)¶
Os nomes de bancos de dados, esquemas, tabelas e estágios que você especificar devem estar em conformidade com requisitos de identificadores do Snowflake. Quando você especifica um nome, o Snowflake considera que o nome está em maiúsculas. Por exemplo, as chamadas a seguir são equivalentes:
// The following calls are equivalent:
df.select(Functions.col("id123"));
df.select(Functions.col("ID123"));
Se o nome não estiver de acordo com os requisitos de identificador, deve-se usar aspas duplas ("
) em torno do nome. Use uma barra invertida (\
) para escapar o caractere de aspas duplas dentro de uma cadeia de caracteres literal Scala. Por exemplo, como o nome da tabela a seguir não começa com uma letra ou um sublinhado, você deve usar aspas duplas em torno do nome:
DataFrame df = session.table("\"10tablename\"");
Observe que ao especificar o nome de uma coluna, você não precisa usar aspas duplas em torno do nome. A biblioteca do Snowpark coloca automaticamente o nome da coluna entre aspas duplas caso o nome não cumpra com os requisitos de identificador:
// The following calls are equivalent:
df.select(Functions.col("3rdID"));
df.select(Functions.col("\"3rdID\""));
// The following calls are equivalent:
df.select(Functions.col("id with space"));
df.select(Functions.col("\"id with space\""));
Se você já tiver acrescentado aspas duplas em torno de um nome de coluna, a biblioteca não inserirá aspas duplas adicionais em torno do nome.
Em alguns casos, o nome da coluna pode conter caracteres de aspas duplas:
describe table quoted;
+------------------------+ ...
| name | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted" | ...
+------------------------+ ...
Como explicado em Requisitos para identificadores, para cada caractere de aspas duplas dentro de um identificador de aspas duplas, deve-se usar dois caracteres de aspas duplas (por exemplo, "name_with_""air""_quotes"
e """column_name_quoted"""
):
DataFrame dfTable = session.table("quoted");
dfTable.select("\"name_with_\"\"air\"\"_quotes\"");
dfTable.select("\"\"\"column_name_quoted\"\"\"");
Tenha em mente que quando um identificador é incluído entre aspas duplas (se você adicionou explicitamente as aspas ou se a biblioteca adicionou as aspas para você), o Snowflake trata o identificador diferenciando maiúsculas de minúsculas:
// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(Functions.col("id with space"));
df.select(Functions.col("ID WITH SPACE"));
Como usar literais como objetos de coluna¶
Para usar um literal em um método que passa um objeto Column
, crie um objeto Column
para o literal passando o literal para o método estático lit
na classe Functions
. Por exemplo:
// Show the first 10 rows in which category_id is greater than 5.
// Use `Functions.lit(5)` to create a Column object for the literal 5.
DataFrame df = session.table("sample_product_data");
df.filter(Functions.col("category_id").gt(Functions.lit(5))).show();
Se o literal é um ponto flutuante ou valor duplo em Java (por exemplo, 0.05
é tratado como Double por padrão), a biblioteca do Snowpark gera um SQL que implicitamente converte o valor para o tipo de dados correspondente do Snowpark (por exemplo, 0.05::DOUBLE
). Isso pode produzir um valor aproximado que difira do número exato especificado.
Por exemplo, o código a seguir não exibe linhas correspondentes, mesmo que o filtro (que corresponde a valores maiores ou iguais a 0.05
) deva corresponder às linhas do DataFrame:
// Create a DataFrame that contains the value 0.05.
DataFrame df = session.sql("select 0.05 :: Numeric(5, 2) as a");
// Applying this filter results in no matching rows in the DataFrame.
df.filter(Functions.col("a").leq(Functions.lit(0.06).minus(Functions.lit(0.01)))).show();
O problema é que Functions.lit(0.06)
e Functions.lit(0.01)
produzem valores aproximados para 0.06
e 0.01
, não os valores exatos.
Para evitar esse problema, converta o literal para o tipo Snowpark que você deseja usar. Por exemplo, para usar um NUMBER com uma precisão de 5 e escala de 2:
import com.snowflake.snowpark_java.types.*;
...
df.filter(Functions.col("a").leq(Functions.lit(0.06).cast(DataTypes.createDecimalType(5, 2)).minus(Functions.lit(0.01).cast(DataTypes.createDecimalType(5, 2))))).show();
Como converter um objeto de coluna para um tipo específico¶
Para converter um objeto Column
a um tipo específico, chame o método cast e passe um objeto do tipo com.snowflake.snowpark_java.types package. Por exemplo, para converter um literal para um NUMBER com uma precisão de 5 e escala de 2:
// Import for the DecimalType class..
import com.snowflake.snowpark_java.types.*;
Column decimalValue = Functions.lit(0.05).cast(DataTypes.createDecimalType(5,2));
Como encadear chamadas a métodos¶
Como cada método que transforma um objeto DataFrame returna um novo objeto DataFrame que tem a transformação aplicada, você pode encadear chamadas a métodos para produzir um novo DataFrame que é transformado de maneiras adicionais.
O exemplo a seguir retorna um DataFrame que está configurado para:
Consultar a tabela
sample_product_data
.Retornar a linha com
id = 1
.Selecionar as colunas
name
eserial_number
.
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
Neste exemplo:
session.table("sample_product_data")
retorna um DataFrame para a tabelasample_product_data
.Embora o DataFrame ainda não contenha os dados da tabela, o objeto contém as definições das colunas da tabela.
filter(Functions.col("id").equal_to(Functions.lit(1)))
retorna um DataFrame para a tabelasample_product_data
que está preparada para retornar a linha comid = 1
.Observe novamente que o DataFrame ainda não contém a linha correspondente da tabela. A linha correspondente não é obtida até que você chame um método de ação.
select(Functions.col("name"), Functions.col("serial_number"))
retorna um DataFrame que contém as colunasname
eserial_number
para a linha da tabelasample_product_data
que temid = 1
.
Quando você fizer chamadas a métodos encadeadas, tenha em mente que a ordem das chamadas é importante. Cada chamada de método retorna um DataFrame que foi transformado. Certifique-se de que as chamadas subsequentes funcionem com os DataFrame transformados.
Por exemplo, no código abaixo, o método select
retorna um DataFrame que contém apenas duas colunas: name
e serial_number
. A chamada ao método filter
nesse DataFrame falha porque usa a coluna id
, que não está no DataFrame transformado.
// This fails with the error "invalid identifier 'ID'."
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("name"), Functions.col("serial_number")).filter(Functions.col("id").equal_to(Functions.lit(1)));
dfProductInfo.show();
Em contraste, o seguinte código é executado com sucesso porque o método filter()
é chamado em um DataFrame que contém todas as colunas da tabela sample_product_data
(incluindo a coluna id
):
// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
Tenha em mente que talvez seja necessário fazer as chamadas ao método select
e filter
em uma ordem diferente daquela em que você usaria as palavras-chave equivalentes (SELECT e WHERE) em uma instrução SQL.
Como limitar o número de linhas em um DataFrame¶
Para limitar o número de linhas em um DataFrame, você pode usar o método de transformação limit.
A API do Snowpark também fornece métodos de ação para recuperar e imprimir um número limitado de linhas:
o método de ação first (para executar a consulta e retornar as primeiras
n
linhas)o método de ação show (para executar a consulta e imprimir as primeiras
n
linhas)
Esses métodos efetivamente adicionam uma cláusula LIMIT à instrução SQL que é executada.
Como explicado nas notas de uso para LIMIT, os resultados não são determinísticos, a menos que você especifique uma ordem de classificação (ORDER BY) em conjunto com LIMIT.
Para manter a cláusula ORDER BY com a cláusula LIMIT (por exemplo, para que ORDER BY não esteja em uma subconsulta separada), você deve chamar o método que limita os resultados do DataFrame retornado pelo método sort
.
Por exemplo, se você estiver encadeando chamadas a métodos:
DataFrame df = session.table("sample_product_data");
// Limit the number of rows to 5, sorted by parent_id.
DataFrame dfSubset = df.sort(Functions.col("parent_id")).limit(5);
// Return the first 5 rows, sorted by parent_id.
Row[] arrayOfRows = df.sort(Functions.col("parent_id")).first(5);
// Print the first 5 rows, sorted by parent_id.
df.sort(Functions.col("parent_id")).show(5);
Como obter definições das colunas¶
Para obter a definição das colunas no conjunto de dados para o DataFrame, chame o método schema
. Esse método retorna um objeto StructType
que contém uma Array
de StructField
objetos. Cada objeto StructField
contém a definição de uma coluna.
import com.snowflake.snowpark_java.types.*;
...
// Get the StructType object that describes the columns in the
// underlying rowset.
StructType tableSchema = session.table("sample_product_data").schema();
System.out.println("Schema for sample_product_data: " + tableSchema);
No objeto devolvido StructType
, os nomes das colunas são sempre normalizados. Identificadores sem aspas são retornados em maiúsculas, e identificadores entre aspas são devolvidos na caixa em que foram definidos.
O exemplo a seguir cria um DataFrame contendo as colunas ID
e 3rd
. Para o nome de coluna 3rd
, a biblioteca do Snowpark coloca o nome automaticamente entre aspas duplas ("3rd"
) porque o nome não cumpre os requisitos para um identificador.
O exemplo chama o método schema
e depois chama o método names
no objeto retornado StructType
para obter uma array de nomes de colunas. Os nomes são normalizados no método StructType
retornado pelo método schema
.
import java.util.Arrays;
...
// Create a DataFrame containing the "id" and "3rd" columns.
DataFrame dfSelectedColumns = session.table("sample_product_data").select(Functions.col("id"), Functions.col("3rd"));
// Print out the names of the columns in the schema.
System.out.println(Arrays.toString(dfSelectedColumns.schema().names()));
Junção de DataFrames¶
Para unir objetos DataFrame, chame o método join.
As seções a seguir explicam como usar DataFrames para realizar uma junção:
Configuração dos dados de exemplo para as junções¶
Os exemplos nas próximas seções utilizam dados de exemplo que você pode configurar executando as seguintes instruções SQL:
CREATE OR REPLACE TABLE sample_a (
id_a INTEGER,
name_a VARCHAR,
value INTEGER
);
INSERT INTO sample_a (id_a, name_a, value) VALUES
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
CREATE OR REPLACE TABLE sample_b (
id_b INTEGER,
name_b VARCHAR,
id_a INTEGER,
value INTEGER
);
INSERT INTO sample_b (id_b, name_b, id_a, value) VALUES
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', NULL, 200)
;
CREATE OR REPLACE TABLE sample_c (
id_c INTEGER,
name_c VARCHAR,
id_a INTEGER,
id_b INTEGER
);
INSERT INTO sample_c (id_c, name_c, id_a, id_b) VALUES
(1012, 'C1', 10, NULL),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
Como especificar as colunas para a junção¶
Com o método DataFrame.join
, você pode especificar as colunas a serem usadas de uma das seguintes maneiras:
Especifique uma expressão de coluna que descreva a condição de junção.
Especifique uma ou mais colunas que devem ser usadas como colunas comuns na junção.
O exemplo a seguir realiza uma junção interna na coluna chamada id_a
:
// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
dfJoined.show();
Observe que o exemplo usa o método DataFrame.col
para especificar a condição a ser usada para a junção. Consulte Como especificar colunas e expressões para obter mais informações sobre este método.
Isso imprime a seguinte saída:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
----------------------------------------------------------------------
Nomes de colunas idênticos duplicados no resultado da junção¶
No DataFrame resultante de uma junção, a biblioteca Snowpark utiliza os nomes das colunas encontradas nas tabelas que foram unidas, mesmo quando os nomes das colunas são idênticos entre tabelas. Quando isto acontece, estes nomes de coluna são duplicados no DataFrame resultante da junção. Para acessar uma coluna duplicada pelo nome, chame o método col
no DataFrame que representa a tabela original da coluna. (Para obter mais informações sobre como especificar colunas, consulte Como se referir a colunas em diferentes DataFrames).
O código no exemplo a seguir une dois DataFrames, depois chama o método select
no DataFrame unido. Ele especifica as colunas a serem selecionadas chamando o método col
da variável que representa os respectivos objetos DataFrame: dfRhs
e dfLhs
. Ele usa o método as
para dar às colunas novos nomes no DataFrame que o método select
cria.
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
DataFrame dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"));
dfSelected.show();
Isso imprime a seguinte saída:
------------------------------
|"LEFTVALUE" |"RIGHTVALUE" |
------------------------------
|5 |5 |
|10 |10 |
|15 |15 |
------------------------------
Eliminação de duplicação de colunas antes de salvar ou armazenar¶
Observe que quando um DataFrame resultante de uma junção inclui nomes de colunas duplicadas, você deve eliminar a duplicação ou renomear colunas para remover a duplicação no DataFrame antes de salvar o resultado em uma tabela ou armazenar o DataFrame em cache. Para nomes de colunas duplicadas em um DataFrame que você salva em uma tabela ou cache, a biblioteca do Snowpark substituirá os nomes de colunas duplicadas por alias para que não sejam mais duplicadas.
O exemplo seguinte ilustra como a saída de um DataFrame em cache poderia aparecer se os nomes das colunas ID_A
e VALUE
fossem duplicados em uma junção de duas tabelas, e depois não fossem deduplicados ou renomeados antes de armazenar o resultado em cache.
--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A" |"NAME_A" |"l_ZSz7_VALUE" |"ID_B" |"NAME_B" |"r_heec_ID_A" |"r_heec_VALUE" |
--------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
--------------------------------------------------------------------------------------------------
Como realizar uma junção natural¶
Para realizar uma junção natural (onde DataFrames são unidas em colunas que têm o mesmo nome), chame o método .naturalJoin.
O exemplo a seguir junta os DataFrames para as tabelas sample_a
e sample_b
nas colunas comuns (a coluna id_a
):
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.naturalJoin(dfRhs);
dfJoined.show();
Isso imprime a seguinte saída:
---------------------------------------------------
|"ID_A" |"VALUE" |"NAME_A" |"ID_B" |"NAME_B" |
---------------------------------------------------
|10 |5 |A1 |4001 |B2 |
|40 |10 |A2 |4000 |B1 |
|80 |15 |A3 |9000 |B3 |
---------------------------------------------------
Como especificar o tipo de junção¶
Por padrão, o método DataFrame.join
cria uma junção interna. Para especificar um tipo diferente de junção, defina o argumento joinType
para um dos seguintes valores:
Tipo de junção |
|
---|---|
Junção interna |
|
Junção externa esquerda |
|
Junção externa direita |
|
Junção externa completa |
|
Junção cruzada |
|
Por exemplo:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")), "left");
dfLeftOuterJoin.show();
Isso imprime a seguinte saída:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|40 |A2 |10 |4000 |B1 |40 |10 |
|10 |A1 |5 |4001 |B2 |10 |5 |
|80 |A3 |15 |9000 |B3 |80 |15 |
|90 |A4 |20 |NULL |NULL |NULL |NULL |
----------------------------------------------------------------------
Junção de tabelas múltiplas¶
Para juntar tabelas múltiplas:
Crie um DataFrame para cada tabela.
Chame o método
DataFrame.join
no primeiro DataFrame passando o segundo DataFrame.Usando o DataFrame retornado pelo método
join
, chame o métodojoin
, passando o terceiro DataFrame.
Você pode encadear as chamadas join
como mostrado abaixo:
DataFrame dfFirst = session.table("sample_a");
DataFrame dfSecond = session.table("sample_b");
DataFrame dfThird = session.table("sample_c");
DataFrame dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a").equal_to(dfSecond.col("id_a"))).join(dfThird, dfFirst.col("id_a").equal_to(dfThird.col("id_a")));
dfJoinThreeTables.show();
Isso imprime a seguinte saída:
------------------------------------------------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |"ID_C" |"NAME_C" |"ID_A" |"ID_B" |
------------------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |1012 |C1 |10 |NULL |
|40 |A2 |10 |4000 |B1 |40 |10 |1040 |C2 |40 |4000 |
|40 |A2 |10 |4000 |B1 |40 |10 |1041 |C3 |40 |4001 |
------------------------------------------------------------------------------------------------------------
Como realizar uma autojunção¶
Se você precisar unir uma tabela consigo mesma em diferentes colunas, você não pode realizar a autojunção com um único DataFrame. Os exemplos a seguir que usam um único DataFrame para realizar uma autojunção falham porque as expressões da coluna para "id"
estão presentes nos lados esquerdo e direito da junção:
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, Functions.col("id").equal_to(Functions.col("parent_id")));
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, df.col("id").equal_to(df.col("parent_id")));
Ambos os exemplos falham, com a seguinte exceção:
Exception in thread "main" com.snowflake.snowpark_java.SnowparkClientException:
Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
Instead, join this DataFrame to a clone() of itself.
Em vez disso, use o método clone para criar um clone do objeto DataFrame, e use os dois objetos DataFrame para realizar a junção:
// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
DataFrame dfLhs = session.table("sample_product_data");
// Clone the DataFrame object to use as the right-hand side of the join.
DataFrame dfRhs = dfLhs.clone();
// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id").equal_to(dfRhs.col("parent_id")));
dfJoined.show();
Se você quiser realizar uma autojunção na mesma coluna, chame o método join
que passa no nome da coluna (ou uma array de nomes de colunas) para a cláusula USING
:
// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, "key");
Como executar uma ação para avaliar um DataFrame¶
Como mencionado anteriormente, o DataFrame é avaliado lentamente, o que significa que a instrução SQL não é enviada ao servidor para execução até que você execute uma ação. Uma ação faz com que o DataFrame seja avaliado e envia a instrução SQL correspondente para o servidor para execução.
As seções a seguir explicam como executar uma ação de forma síncrona e assíncrona em um DataFrame:
Como executar uma ação de forma síncrona¶
Para executar uma ação de forma síncrona, chame um dos seguintes métodos de ação:
Método para executar uma ação de forma síncrona |
Descrição |
---|---|
|
Avalia o DataFrame e retorna o conjunto de dados resultante como uma |
|
Avalia o DataFrame e retorna um |
|
Avalia o DataFrame e retorna o número de linhas. |
|
Avalia o DataFrame e imprime as linhas no console. Observe que esse método limita o número de linhas a 10 (por padrão). Consulte Como imprimir as linhas em um DataFrame. |
|
Executa a consulta, cria uma tabela temporária e coloca os resultados na tabela. O método retorna um objeto |
|
Armazena os dados no DataFrame para a tabela especificada. Consulte Como salvar dados em uma tabela. |
|
Copia os dados no DataFrame para a tabela especificada. Consulte Como copiar dados de arquivos para uma tabela. |
|
Deleta linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
|
Atualiza linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
|
Funde linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
Por exemplo, para executar a consulta e retornar o número de resultados, chame o método count
:
// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");
// Send the query to the server for execution and
// print the count of rows in the table.
System.out.println("Rows returned: " + dfProducts.count());
Você também pode chamar métodos de ação para:
Nota: Se você estiver chamando o método schema
para obter as definições das colunas no DataFrame, você não precisa chamar um método de ação.
Como executar uma ação de forma assíncrona¶
Nota
Esse recurso foi introduzido no Snowpark 0.11.0.
Para executar uma ação de forma assíncrona, chame o método async
para retornar um objeto “ator assíncrono” (por exemplo, DataFrameAsyncActor
) e chamar um método de ação assíncrona nesse objeto.
Esses métodos de ação de um objeto de ator assíncrono retornam um objeto TypedAsyncJob
, que você pode usar para verificar o status da ação assíncrona e obter os resultados da ação.
As próximas seções explicam como executar as ações de forma assíncrona e verificar os resultados.
Explicação do fluxo básico de ações assíncronas¶
Você pode usar os seguintes métodos para executar uma ação de forma assíncrona:
Método para realizar uma ação de forma assíncrona |
Descrição |
---|---|
|
Avalia o DataFrame de forma assíncrona para obter o conjunto de dados resultante como um |
|
Avalia o DataFrame de forma assíncrona para obter um |
|
Avalia o DataFrame de forma assíncrona para obter o número de linhas. |
|
Salva os dados na tabela DataFrame na tabela especificada de forma assíncrona. Consulte Como salvar dados em uma tabela. |
|
Copia os dados do DataFrame para a tabela especificada de forma assíncrona. Consulte Como copiar dados de arquivos para uma tabela. |
|
Apaga as linhas na tabela especificada de forma assíncrona. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
|
Atualiza as linhas na tabela especificada de forma assíncrona. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
A partir do objeto TypedAsyncJob retornado, você pode fazer o seguinte:
Para determinar se a ação foi concluída, chame o método
isDone
.Para obter a ID da consulta que corresponde à ação, chame o método
getQueryId
.Para retornar os resultados da ação (por exemplo, o
Array
de objetosRow
para o métodocollect
ou a contagem de linhas para o métodocount
), chame o métodogetResult
.Note que
getResult
é uma chamada com bloqueio.Para cancelar a ação, chame o método
cancel
.
Por exemplo, para executar uma consulta de forma assíncrona e obter os resultados como uma Array
de objetos Row
, chame async().collect()
:
import java.util.Arrays;
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
DataFrame df = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Row[]> asyncJob = df.async().collect();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
Row[] results = asyncJob.getResult();
System.out.println(Arrays.toString(results));
Para executar a consulta de forma assíncrona e obter o número de resultados, chame async().count()
:
// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");
// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Long> asyncJob = dfProducts.async().count();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Print the count of rows in the table.
// Note that getResult is a blocking call.
System.out.println("Rows returned: " + asyncJob.getResult());
Como especificar o número máximo de segundos a esperar¶
Ao chamar o método getResult
, você pode usar o argumento maxWaitTimeInSeconds
para especificar o número máximo de segundos para esperar que a consulta seja concluída antes de tentar obter os resultados. Por exemplo:
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
Row[] results = asyncJob.getResult(10);
Se você omitir esse argumento, o método espera pelo número máximo de segundos especificado pela propriedade de configuração snowpark_request_timeout_in_seconds. (Essa é uma propriedade que você pode definir ao criar o objeto Session).
Como acessar uma consulta assíncrona por ID¶
Se você tiver a ID de uma consulta assíncrona que você enviou anteriormente, você pode chamar o método Session.createAsyncJob
para criar um objeto AsyncJob que você pode usar para verificar o status da consulta, obter os resultados da consulta ou cancelar a consulta.
Note que, ao contrário de TypedAsyncJob
, AsyncJob
não fornece um método getResult
para a obtenção dos resultados. Em vez disso, se você precisar obter os resultados, chame o método getRows
ou getIterator
.
Por exemplo:
import java.util.Arrays;
...
AsyncJob asyncJob = session.createAsyncJob(myQueryId);
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
Row[] rows = asyncJob.getRows();
System.out.println(Arrays.toString(rows));
Como obter linhas em um DataFrame¶
Depois de você especificar como o DataFrame deve ser transformado, você pode chamar um método de ação para executar uma consulta e retornar os resultados. Você pode retornar todas as linhas em um Array
, ou você pode retornar um Iterator
que lhe permite iterar sobre os resultados linha por linha. Neste último caso, se a quantidade de dados for grande, as linhas são carregadas na memória por parte para evitar o carregamento de uma grande quantidade de dados na memória.
Como retornar todas as linhas¶
Para retornar todas as linhas de uma vez, chame o método collect. Esse método retorna uma array de objetos Row. Para obter os valores da linha, chame o método getType
(por exemplo, getString
, getInt
, etc.).
Por exemplo:
Row[] rows = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).collect();
for (Row row : rows) {
System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
Como retornar um iterador para as linhas¶
Se você quiser usar um Iterator
para iterar sobre os objetos Row nos resultados, chame toLocalIterator. Se a quantidade de dados nos resultados for grande, o método carregará as linhas por parte para evitar carregar todas as linhas de uma só vez na memória.
Por exemplo:
import java.util.Iterator;
Iterator<Row> rowIterator = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).toLocalIterator();
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
Como retornar as primeiras n
linhas¶
Para retornar as primeiras n
linhas, chame o método first passando o número de linhas a serem retornadas.
Como explicado em Como limitar o número de linhas em um DataFrame, os resultados são não determinísticos. Se você quiser que os resultados sejam determinísticos, chame esse método em um DataFrame ordenado (df.sort().first()
).
Por exemplo:
import java.util.Arrays;
...
DataFrame df = session.table("sample_product_data");
Row[] rows = df.sort(Functions.col("name")).first(5);
System.out.println(Arrays.toString(rows));
Como imprimir as linhas em um DataFrame¶
Para imprimir as primeiras 10 linhas no DataFrame para o console, chame o método show. Para imprimir um número diferente de linhas, passe o número de linhas a serem impressas.
Como explicado em Como limitar o número de linhas em um DataFrame, os resultados são não determinísticos. Se você quiser que os resultados sejam determinísticos, chame esse método em um DataFrame ordenado (df.sort().show()
).
Por exemplo:
DataFrame df = session.table("sample_product_data");
df.sort(Functions.col("name")).show();
Atualização, eliminação e fusão de linhas em uma tabela¶
Nota
Esse recurso foi introduzido no Snowpark 0.7.0.
Quando você chama Session.table
para criar um objeto DataFrame
para uma tabela, o método retorna um objeto Updatable
, que estende o DataFrame
com métodos adicionais para atualização e exclusão de dados na tabela. (Consulte Updatable.)
Se você precisar atualizar ou excluir linhas em uma tabela, você pode usar os seguintes métodos da classe Updatable
:
Chame
update
ouupdateColumn
para atualizar as linhas existentes na tabela. Consulte Atualização de linhas em uma tabela.Chame
delete
para apagar linhas de uma tabela. Consulte Como excluir linhas de uma tabela.Chame
merge
para inserir, atualizar e excluir linhas de uma tabela, com base nos dados em uma segunda tabela ou subconsulta. (Esse é o equivalente do comando MERGE em SQL). Consulte Fusão de linhas em uma tabela.
Atualização de linhas em uma tabela¶
Para atualizar as linhas em uma tabela, chame o método update
ou updateColumn
, passando um Map
que associa as colunas a serem atualizadas e os valores correspondentes a serem atribuídos a essas colunas:
Para especificar os nomes das colunas como cadeias de caracteres no
Map
, chameupdateColumn
.Para especificar objetos
Column
noMap
, chameupdate
.
Ambos os métodos retornam um objeto UpdateResult
contendo o número de linhas que foram atualizadas. (Consulte UpdateResult.)
Nota
Ambos os métodos são métodos de ação, o que significa que chamar o método envia instruções SQL ao servidor para execução.
Por exemplo, para substituir os valores na coluna chamada count
pelo valor 1
, se você deseja usar um Map
que associa o nome da coluna (a String
) com o valor correspondente, chame updateColumn
:
import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("3rd", Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.updateColumn(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Se você quiser usar um objeto Column
no Map
para identificar a coluna a ser atualizada, chame update
:
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Se a atualização só deve ser feita quando uma condição for cumprida, você pode especificar essa condição como um argumento. Por exemplo, para substituir os valores na coluna chamada count
por 2
para linhas nas quais a coluna category_id
tem o valor 20
:
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(2));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments, Functions.col("category_id").equal_to(Functions.lit(20)));
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Se você precisar basear a condição em uma junção com um objeto DataFrame
diferente, você pode passar esse DataFrame
como argumento e usar o DataFrame
na condição. Por exemplo, para substituir os valores na coluna chamada count
por 3
para linhas nas quais a coluna category_id
corresponde à category_id
no DataFrame
dfParts
:
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(3));
Updatable updatableDf = session.table("sample_product_data");
DataFrame dfParts = session.table("parts");
UpdateResult updateResult = updatableDf.update(assignments, updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Como excluir linhas de uma tabela¶
Para o método delete
, você pode especificar uma condição que identifica as linhas a serem excluídas, e pode basear essa condição em uma junção com outro DataFrame. delete
retorna um objeto DeleteResult
, contendo o número de linhas que foram excluídas. (Consulte DeleteResult.)
Nota
delete
é um método de ação, o que significa que chamar o método envia instruções SQL ao servidor para execução.
Por exemplo, para apagar as linhas que têm o valor 1
na coluna category_id
:
Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(Functions.lit(1)));
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
Se a condição se refere a colunas em um DataFrame diferente, passe esse DataFrame como o segundo argumento. Por exemplo, para excluir as linhas nas quais a coluna category_id
corresponde à category_id
no DataFrame
dfParts
, passe dfParts
como o segundo argumento:
Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
Fusão de linhas em uma tabela¶
Para inserir, atualizar e excluir linhas em uma tabela com base nos valores de uma segunda tabela ou subconsulta (o equivalente ao comando MERGE em SQL), faça o seguinte:
No objeto
Updatable
para a tabela onde você quer fundir os dados, chame o métodomerge
, passando o objetoDataFrame
para a outra tabela e a expressão da coluna para a condição de junção.Isso retorna um objeto
MergeBuilder
que você pode usar para especificar as ações a serem tomadas (por exemplo, inserir, atualizar ou excluir) nas linhas que correspondem e nas que não correspondem aos critérios. (Consulte MergeBuilder.)Usando o objeto
MergeBuilder
:Para especificar a atualização ou exclusão que deve ser feita nas linhas correspondentes, chame o método
whenMatched
.Se você precisar especificar uma condição adicional quando as linhas devem ser atualizadas ou excluídas, você pode passar uma expressão de coluna para essa condição.
Esse método retorna um objeto
MatchedClauseBuilder
que você pode usar para especificar a ação a ser executada. (Consulte MatchedClauseBuilder.)Chame o método
update
oudelete
no objetoMatchedClauseBuilder
para especificar a ação de atualização ou exclusão que deve ser executada nas linhas correspondentes. Esses métodos devolvem um objetoMergeBuilder
que você pode usar para especificar cláusulas adicionais.Para especificar a inserção que deve ser feita quando as linhas não correspondem, chame o método
whenNotMatched
.Se você precisar especificar uma condição adicional quando as linhas devem ser inseridas, você pode passar uma expressão de coluna para essa condição.
Esse método retorna um objeto
NotMatchedClauseBuilder
que você pode usar para especificar a ação a ser executada. (Consulte NotMatchedClauseBuilder.)Chame o método
insert
no objetoNotMatchedClauseBuilder
para especificar a ação de inserção que deve ser executada quando as linhas não correspondem. Esses métodos devolvem um objetoMergeBuilder
que você pode usar para especificar cláusulas adicionais.
Quando terminar de especificar as inserções, atualizações e exclusões que devem ser realizadas, chame o método
collect
do objetoMergeBuilder
para realizar as inserções, atualizações e exclusões especificadas na tabela.collect
retorna um objetoMergeResult
contendo o número de linhas que foram inseridas, atualizadas e excluídas. (Consulte MergeResult.)
O exemplo a seguir insere uma linha com as colunas id
e value
da tabela source
na tabela target
se a tabela target
não contiver uma linha com uma ID correspondente:
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
.whenNotMatched().insert([source.col("id"), source.col("value")])
.collect();
O exemplo a seguir atualiza uma linha na tabela target
com o valor da coluna value
da linha na tabela source
que tem a mesma ID:
import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("value", source.col("value"));
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
.whenMatched().update(assignments)
.collect();
Como salvar dados em uma tabela¶
Você pode salvar o conteúdo de um DataFrame em uma tabela nova ou existente. Para fazer isso, você deve ter os seguintes privilégios:
Privilégios CREATE TABLE sobre o esquema, se a tabela não existir.
Privilégios INSERT sobre a tabela.
Para salvar o conteúdo de um DataFrame em uma tabela:
Chame o método write do DataFrame para obter um objeto DataFrameWriter.
Chame o método mode do objeto
DataFrameWriter
, passando um objeto SaveMode que especifica suas preferências de escrita para a tabela:Para inserir linhas, passe
SaveMode.Append
.Para substituir a tabela existente, passe
SaveMode.Overwrite
.
Esse método retorna o mesmo objeto
DataFrameWriter
configurado com o modo especificado.Se você estiver inserindo linhas em uma tabela existente (
SaveMode.Append
) e os nomes das colunas no DataFrame corresponderem aos nomes das colunas na tabela, chame a opção DataFrameWriter.option, passando"columnOrder"
e"name"
como argumentos.Nota
Esse método foi introduzido no Snowpark 1.4.0.
Por padrão, a opção
columnOrder
é definida como"index"
, o que significa que oDataFrameWriter
insere os valores na ordem em que as colunas aparecem. Por exemplo,DataFrameWriter
insere o valor da primeira coluna do DataFrame na primeira coluna da tabela, a segunda coluna do DataFrame na segunda coluna da tabela, etc.Esse método retorna o mesmo objeto
DataFrameWriter
configurado com a opção especificada.Chame o método saveAsTable do objeto
DataFrameWriter
para salvar o conteúdo do DataFrame em uma tabela especificada.Você não precisa chamar um método separado (por exemplo,
collect
) para executar a instrução SQL que salva os dados na tabela.saveAsTable
é um método de ação que executa a instrução SQL.
O exemplo a seguir substitui uma tabela existente (identificada pela variável tableName
) com o conteúdo do DataFrame df
:
df.write().mode(SaveMode.Overwrite).saveAsTable(tableName);
O exemplo a seguir insere linhas do DataFrame df
em uma tabela existente (identificada pela variável tableName
). Nesse exemplo, a tabela e o DataFrame ambos contêm as colunas c1
e c2
.
O exemplo demonstra a diferença entre configurar a opção columnOrder
para "name"
(que insere valores nas colunas da tabela com os mesmos nomes das colunas do DataFrame) e usar a opção padrão columnOrder
(que insere valores nas colunas da tabela com base na ordem das colunas no DataFrame).
DataFrame df = session.sql("SELECT 1 AS c2, 2 as c1");
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write().mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName);
// With the default value of the columnOrder option ("index"), the DataFrameWriter uses the column positions
// and inserts a row with the values (1, 2).
df.write().mode(SaveMode.Append).saveAsTable(tableName);
Como criar uma exibição a partir de um DataFrame¶
Para criar uma exibição a partir de um DataFrame, chame o método createOrReplaceView:
df.createOrReplaceView("db.schema.viewName");
Note que chamar createOrReplaceView
cria imediatamente a nova exibição. Mais importante, isso não faz com que o DataFrame seja avaliado. (O DataFrame em si não é avaliado até que você execute uma ação).
As exibições que você cria ao chamar createOrReplaceView
são persistentes. Se você não precisar mais dessa exibição, você pode excluir a exibição manualmente.
Se você precisar criar uma exibição temporária apenas para a sessão, chame o método createOrReplaceTempView:
df.createOrReplaceTempView("db.schema.viewName");
Como criar um cache do DataFrame¶
Em alguns casos, pode ser necessário realizar uma consulta complexa e manter os resultados para uso em operações subsequentes (em vez de executar a mesma consulta novamente). Nesses casos, você pode armazenar o conteúdo de um DataFrame chamando o método cacheResult.
Esse método:
Executa a consulta.
Você não precisa chamar um método de ação separado para recuperar os resultados antes de chamar
cacheResult
. OcacheResult
é um método de ação que executa a consulta.Salva os resultados em uma tabela temporária
Como
cacheResult
cria uma tabela temporária, você deve ter o privilégio CREATE TABLE sobre o esquema que está em uso.Retorna um objeto HasCachedResult, que dá acesso aos resultados da tabela temporária.
Como
HasCachedResult
estendeDataFrame
, você pode realizar algumas das mesmas operações sobre estes dados em cache que pode realizar em um DataFrame.
Nota
Como cacheResult
executa a consulta e salva os resultados em uma tabela, o método pode resultar em um aumento dos custos de computação e armazenamento.
Por exemplo:
// Set up a DataFrame to query a table.
DataFrame df = session.table("sample_product_data").filter(Functions.col("category_id").gt(Functions.lit(10)));
// Retrieve the results and cache the data.
HasCachedResult cachedDf = df.cacheResult();
// Create a DataFrame containing a subset of the cached data.
DataFrame dfSubset = cachedDf.filter(Functions.col("category_id").equal_to(Functions.lit(20))).select(Functions.col("name"), Functions.col("category_id"));
dfSubset.show();
Note que o DataFrame original não é afetado quando você chama esse método. Por exemplo, suponha que dfTable
seja um DataFrame para a tabela sample_product_data
:
HasCachedResult dfTempTable = dfTable.cacheResult();
Depois de chamar cacheResult
, dfTable
ainda aponta para a tabela sample_product_data
, e você pode continuar a usar dfTable
para consultar e atualizar essa tabela.
Para usar os dados em cache na tabela temporária, você usa dfTempTable
(o objeto HasCachedResult
retornado por cacheResult
).
Como trabalhar com arquivos em um estágio¶
A biblioteca do Snowpark fornece classes e métodos que você pode usar para carregar dados no Snowflake e descarregar dados do Snowflake usando arquivos em estágios.
Nota
Para utilizar essas classes e métodos em um estágio, você deve ter os privilégios necessários para trabalhar com o estágio.
As próximas seções explicam como utilizar essas classes e métodos:
Carregamento e descarregamento de arquivos em um estágio¶
Para carregar e baixar arquivos em um estágio, use os métodos put
e get
do objeto FileOperation:
Carregamento de arquivos para um estágio¶
Para carregar arquivos para um estágio:
Verifique se você tem os privilégios de carregar arquivos para o estágio.
Use o método file do objeto
Session
para acessar o objeto FileOperation para a sessão.Chame o método put do objeto
FileOperation
para carregar os arquivos para um estágio.Esse método executa um comando SQL PUT.
Para especificar quaisquer parâmetros opcionais para o comando PUT, crie um
Map
dos parâmetros e valores e passe oMap
como o argumentooptions
. Por exemplo:import java.util.HashMap; import java.util.Map; ... // Upload a file to a stage without compressing the file. Map<String, String> putOptions = new HashMap<>(); putOptions.put("AUTO_COMPRESS", "FALSE"); PutResult[] putResults = session.file().put("file:///tmp/myfile.csv", "@myStage", putOptions);
No argumento
localFileName
, você pode usar curingas (*
e?
) para identificar um conjunto de arquivos a ser carregado. Por exemplo:// Upload the CSV files in /tmp with names that start with "file". // You can use the wildcard characters "*" and "?" to match multiple files. PutResult[] putResults = session.file().put("file:///tmp/file*.csv", "@myStage/prefix2")
Verifique o
Array
de objetos PutResult retornados pelo métodoput
para determinar se os arquivos foram carregados com sucesso. Por exemplo, para imprimir o nome do arquivo e o status da operação PUT para esse arquivo:// Print the filename and the status of the PUT operation. for (PutResult result : putResults) { System.out.println(result.getSourceFileName() + ": " + result.getStatus()); }
Como baixar arquivos de um estágio¶
Para baixar arquivos de uma estágio:
Verifique se você tem os privilégios de carregar arquivos para o estágio.
Use o método file do objeto
Session
para acessar o objeto FileOperation para a sessão.Chame o método get do objeto
FileOperation
para baixar os arquivos de uma estágio.Esse método executa um comando SQL GET.
Para especificar quaisquer parâmetros opcionais para o comando GET, crie um
Map
dos parâmetros e valores e passe oMap
como o argumentooptions
. Por exemplo:import java.util.HashMap; import java.util.Map; ... // Upload a file to a stage without compressing the file. // Download files with names that match a regular expression pattern. Map<String, String> getOptions = new HashMap<>(); getOptions.put("PATTERN", "'.*file_.*.csv.gz'"); GetResult[] getResults = session.file().get("@myStage", "file:///tmp", getOptions);
Verifique o
Array
de objetos GetResult retornados pelo métodoget
para determinar se os arquivos foram baixados com sucesso. Por exemplo, para imprimir o nome do arquivo e o status da operação GET para esse arquivo:// Print the filename and the status of the GET operation. for (GetResult result : getResults) { System.out.println(result.getFileName() + ": " + result.getStatus()); }
Como usar fluxos de entrada para carregar e descarregar dados em um estágio¶
Nota
Esse recurso foi introduzido no Snowpark 1.4.0.
Para usar fluxos de entrada para carregar dados em um arquivo em um estágio e baixar dados de um arquivo em um estágio, use os métodos uploadStream
e downloadStream
do objeto FileOperation:
Como usar um fluxo de entrada para carregar dados em um arquivo em um estágio
Como usar um fluxo de entrada para baixar dados de um arquivo em um estágio
Como usar um fluxo de entrada para carregar dados em um arquivo em um estágio¶
Para carregar os dados de um objeto java.io.InputStream em um arquivo em um estágio:
Verifique se você tem os privilégios de carregar arquivos para o estágio.
Use o método file do objeto
Session
para acessar o objeto FileOperation para a sessão.Chame o método uploadStream do objeto
FileOperation
.Passe o caminho completo para o arquivo no estágio onde os dados devem ser escritos e o objeto
InputStream
. Além disso, use o argumentocompress
para especificar se os dados devem ou não ser compactados antes de serem carregados.
Por exemplo:
import java.io.InputStream;
...
boolean compressData = true;
String pathToFileOnStage = "@myStage/path/file";
session.file().uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData);
Como usar um fluxo de entrada para baixar dados de um arquivo em um estágio¶
Para baixar dados de um arquivo em um estágio para um objeto java.io.io.InputStream:
Verifique se você tem os privilégios de carregar arquivos para o estágio.
Use o método file do objeto
Session
para acessar o objeto FileOperation para a sessão.Chame o método downloadStream do objeto
FileOperation
.Passe o caminho completo para o arquivo no estágio contendo os dados a serem baixados. Use o argumento
decompress
para especificar se os dados no arquivo estão ou não compactados.
Por exemplo:
import java.io.InputStream;
...
boolean isDataCompressed = true;
String pathToFileOnStage = "@myStage/path/file";
InputStream is = session.file().downloadStream(pathToFileOnStage, isDataCompressed);
Como configurar um DataFrame para arquivos em um estágio¶
Esta seção explica como configurar um DataFrame para arquivos em um estágio do Snowflake. Após criar esse DataFrame, você pode usar o DataFrame para:
Para configurar um DataFrame para arquivos em um estágio do Snowflake, use a classe DataFrameReader
:
Verifique se você tem os seguintes privilégios:
Um dos seguintes:
privilégios CREATE TABLE no esquema, se você planeja especificar opções de cópia que determinam como os dados são copiados dos arquivos preparados.
Em outros casos, privilégios CREATE FILE FORMAT sobre o esquema.
Chame o método
read
na classeSession
para acessar um objetoDataFrameReader
.Se os arquivos estiverem no formato CSV, descreva os campos no arquivo. Para fazer isso:
Crie um objeto StructType que consiste em uma array de objetos StructField que descrevem os campos no arquivo.
Para cada objeto
StructField
, especifique o seguinte:O nome do campo.
O tipo de dados do campo (especificado como um objeto no pacote
com.snowflake.snowpark_java.types
).Se o campo é ou não anulável.
Por exemplo:
import com.snowflake.snowpark_java.types.*; ... StructType schemaForDataFile = StructType.create( new StructField("id", DataTypes.StringType, true), new StructField("name", DataTypes.StringType, true));
Chame o método
schema
no objetoDataFrameReader
, passando o objetoStructType
.Por exemplo:
DataFrameReader dfReader = session.read().schema(schemaForDataFile);
O método
schema
retorna um objetoDataFrameReader
que é configurado para ler arquivos contendo os campos especificados.Note que você não precisa fazer isso para arquivos em outros formatos (como JSON). Para esses arquivos, o
DataFrameReader
trata os dados como um único campo do tipo VARIANT com o nome de campo$1
.
Se você precisar especificar informações adicionais sobre como os dados devem ser lidos (por exemplo, que os dados estão comprimidos ou que um arquivo CSV usa um ponto-e-vírgula em vez de uma vírgula para delimitar campos), chame o método DataFrameReader.option ou o método DataFrameReader.options.
Passe o nome e valor da opção que você deseja definir. Você pode definir os seguintes tipos de opções:
As opções de formato de arquivo descritas na documentação em CREATE FILE FORMAT.
As opções de cópia descritas na documentação COPY INTO TABLE.
Observe que a definição de opções de cópia pode resultar em uma estratégia de execução mais cara quando você obtém os dados para o DataFrame.
O exemplo a seguir configura o objeto
DataFrameReader
para consultar dados em um arquivo CSV que não é compactado e que usa um ponto-e-vírgula como delimitador de campo.dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE");
O método
option
retorna um objetoDataFrameReader
que é configurado com as opções especificadas.Para definir várias opções, você pode encadear chamadas ao método
option
(como mostrado no exemplo acima) ou chamar o método DataFrameReader.options, passando umMap
dos nomes e valores das opções.Chame o método correspondente ao formato dos arquivos. Você pode chamar um dos seguintes métodos:
Ao chamar esses métodos, passe o local do estágio dos arquivos a serem lidos. Por exemplo:
DataFrame df = dfReader.csv("@mystage/myfile.csv");
Para especificar vários arquivos que começam com o mesmo prefixo, especifique o prefixo após o nome do estágio. Por exemplo, para carregar arquivos que tenham o prefixo
csv_
do estágio@mystage
:DataFrame df = dfReader.csv("@mystage/csv_");
Os métodos correspondentes ao formato de um arquivo retornam um objeto CopyableDataFrame para esse arquivo.
CopyableDataFrame
estendeDataFrame
e fornece métodos adicionais para trabalhar os dados em arquivos preparados.Chame um método de ação para:
Como no caso de DataFrames para tabelas, os dados não são obtidos no DataFrame até você chamar um método de ação.
Carregamento de dados de arquivos em um DataFrame¶
Depois que você configurar um DataFrame para arquivos em um estágio, você pode carregar dados dos arquivos para o DataFrame:
Use os métodos do objeto DataFrame para realizar quaisquer transformações necessárias no conjunto de dados (por exemplo, selecionar campos específicos, filtrar linhas, etc.).
Por exemplo, para extrair o elemento
color
de um arquivo JSON chamadodata.json
no estágio chamadomystage
:DataFrame df = session.read().json("@mystage/data.json").select(Functions.col("$1").subField("color"));
Como explicado anteriormente, para arquivos em formatos diferentes de CSV (por exemplo JSON), o
DataFrameReader
trata os dados no arquivo como uma única coluna VARIANT com o nome$1
.Chame o método
DataFrame.collect
para carregar os dados. Por exemplo:Row[] results = df.collect();
Como copiar dados de arquivos para uma tabela¶
Depois de configurar um DataFrame para arquivos em um estágio, você pode chamar o método copyInto para copiar os dados em uma tabela. Esse método executa o comando COPY INTO <tabela>.
Nota
Você não precisa chamar o método collect
antes de chamar copyInto
. Os dados dos arquivos não precisam estar no DataFrame antes de você chamar copyInto
.
Por exemplo, o seguinte código carrega os dados do arquivo CSV especificado por myFileStage
para a tabela mytable
. Como os dados estão em um arquivo CSV, o código também deve descrever os campos no arquivo. O exemplo faz isso chamando o método schema do objeto DataFrameReader
e passando um objeto StructType (schemaForDataFile
) contendo uma array de objetos StructField que descrevem os campos.
CopyableDataFrame copyableDf = session.read().schema(schemaForDataFile).csv("@mystage/myfile.csv");
copyableDf.copyInto("mytable");
Como salvar um DataFrame para arquivos em um estágio¶
Nota
Esse recurso foi introduzido no Snowpark 1.5.0.
Se você precisar salvar um DataFrame para arquivos em um estágio, você pode chamar o método DataFrameWriter correspondente ao formato do arquivo (por exemplo, o método csv
para escrever em um arquivo CSV), passando o local do estágio onde os arquivos devem ser salvos. Esses métodos DataFrameWriter
executam o comando COPY INTO <local>.
Nota
Você não precisa chamar o método collect
antes de chamar esses métodos DataFrameWriter
. Os dados do arquivo não precisam estar no arquivo DataFrame antes de você chamar esses métodos.
Para salvar o conteúdo de um DataFrame em arquivos em um estágio:
Chame o método write do objeto DataFrame para obter um objeto DataFrameWriter. Por exemplo, para obter o objeto
DataFrameWriter
para um DataFrame que representa a tabela chamadasample_product_data
:DataFrameWriter dfWriter = session.table("sample_product_data").write();
Se você quiser substituir o conteúdo do arquivo (se o arquivo existir), chame o método mode do objeto
DataFrameWriter
, passandoSaveMode.Overwrite
.Caso contrário, por padrão, o
DataFrameWriter
reporta um erro se o arquivo especificado no estágio já existir.O método
mode
retorna o mesmo objetoDataFrameWriter
configurado com o modo especificado.Por exemplo, para especificar que o
DataFrameWriter
deve substituir o arquivo no estágio:dfWriter = dfWriter.mode(SaveMode.Overwrite);
Se você precisar especificar informações adicionais sobre como os dados devem ser salvos (por exemplo, que os dados devem ser compactados ou que você deseja usar um ponto-e-vírgula para delimitar campos em um arquivo CSV), chame o método DataFrameWriter.option ou o método DataFrameWriter.options.
Passe o nome e valor da opção que você deseja definir. Você pode definir os seguintes tipos de opções:
As opções de formato de arquivo descritas na documentação em COPY INTO <location>.
As opções de cópia descritas na documentação em COPY INTO <location>.
Observe que você não pode usar o método
option
para definir as seguintes opções:A opção do tipo de formato TYPE.
A opção de cópia OVERWRITE. Em vez disso, para definir esta opção, chame o método
mode
(como mencionado na etapa anterior).
O exemplo a seguir configura o objeto
DataFrameWriter
para salvar dados em um arquivo CSV sem compressão, usando um ponto-e-vírgula (no lugar de uma vírgula) como delimitador de campo.dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE");
O método
option
retorna um objetoDataFrameWriter
que é configurado com a opção especificada.Para definir várias opções, você pode encadear chamadas ao método
option
(como mostrado no exemplo acima) ou chamar o método DataFrameWriter.options, passando umMap
dos nomes e valores das opções.Para retornar detalhes sobre cada arquivo que foi salvo, defina a
DETAILED_OUTPUT
opção de cópia comoTRUE
.Por padrão,
DETAILED_OUTPUT
éFALSE
, o que significa que o método retorna uma única linha de saída contendo os campos"rows_unloaded"
,"input_bytes"
e"output_bytes"
.Quando você define
DETAILED_OUTPUT
comoTRUE
, o método retorna uma linha de saída para cada arquivo salvo. Cada linha contém os camposFILE_NAME
,FILE_SIZE
eROW_COUNT
.Chame o método correspondente ao formato do arquivo para salvar os dados no arquivo. Você pode chamar um dos seguintes métodos:
Ao chamar esses métodos, passe o local do estágio do arquivo onde os dados devem ser escritos (por exemplo,
@mystage
).Por padrão, o método salva os dados em nomes de arquivos com o prefixo
data_
(por exemplo,@mystage/data_0_0_0.csv
). Se você quiser que os arquivos sejam nomeados com um prefixo diferente, especifique o prefixo após o nome do estágio. Por exemplo:WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data");
Esse exemplo salva o conteúdo do DataFrame em arquivos que começam com o prefixo
saved_data
(por exemplo,@mystage/saved_data_0_0_0.csv
).Verifique o objeto WriteFileResult retornado para obter informações sobre a quantidade de dados escritos no arquivo.
A partir do objeto
WriteFileResult
, você pode acessar a saída produzida pelo comando COPY INTO <location>:Para acessar as linhas de saída como uma array de objetos Row, chame o método
getRows
.Para determinar quais campos estão presentes nas linhas, chame o método
getSchema
, que retorna um StructType que descreve os campos na linha.
Por exemplo, para imprimir os nomes dos campos e valores nas linhas de saída:
WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data"); Row[] rows = writeFileResult.getRows(); StructType schema = writeFileResult.getSchema(); for (int i = 0 ; i < rows.length ; i++) { System.out.println("Row:" + i); Row row = rows[i]; for (int j = 0; j < schema.size(); j++) { System.out.println(schema.get(j).name() + ": " + row.get(j)); } }
O exemplo a seguir usa um DataFrame para salvar o conteúdo da tabela chamada car_sales
para arquivos JSON com o prefixo saved_data
no estágio @mystage
(por exemplo, @mystage/saved_data_0_0_0.json
). O código de exemplo:
Substitui o arquivo se ele já existir no estágio.
Retorna resultados detalhados sobre a operação de salvar.
Salva os dados sem compressão.
Finalmente, o código de exemplo imprime cada campo e valor nas linhas de saída retornadas:
DataFrame df = session.table("car_sales");
WriteFileResult writeFileResult = df.write().mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data");
Row[] rows = writeFileResult.getRows();
StructType schema = writeFileResult.getSchema();
for (int i = 0 ; i < rows.length ; i++) {
System.out.println("Row:" + i);
Row row = rows[i];
for (int j = 0; j < schema.size(); j++) {
System.out.println(schema.get(j).name() + ": " + row.get(j));
}
}
Como trabalhar com dados semiestruturados¶
Usando um DataFrame, você pode consultar e acessar dados semiestruturados (por exemplo, dados de JSON). As próximas seções explicam como trabalhar com dados semiestruturados em um DataFrame.
Nota
Os exemplos nestas seções usam os dados de exemplo em Amostra de dados usados em exemplos.
Como percorrer dados semiestruturados¶
Para se referir a um campo ou elemento específico em dados semiestruturados, use os seguintes métodos do objeto Column:
Use subField(«<field_name>») para retornar um objeto
Column
para um campo em um OBJECT (ou uma VARIANT que contenha um OBJECT).Use subField(<index>) para retornar um objeto
Column
para um elemento em uma ARRAY (ou uma VARIANT que contenha uma ARRAY).
Nota
Se o nome do campo ou elementos no caminho forem irregulares e dificultarem o uso dos métodos Column.apply
, você pode usar Functions.get, Functions.get_ignore_case ou Functions.get_path como alternativa.
Por exemplo, o seguinte código seleciona o campo dealership
nos objetos na coluna src
dos dados de exemplo:
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("dealership")).show();
O código imprime a seguinte saída:
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
Nota
Os valores no DataFrame estão entre aspas duplas porque eles são retornados como literais de cadeias de caracteres. Para converter esses valores para um tipo específico, consulte Conversão explícita de valores em dados semiestruturados.
Você também pode encadear chamadas a métodos para percorrer um caminho para um campo ou elemento específico.
Por exemplo, o código a seguir seleciona o campo name
no objeto salesperson
:
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("name")).show();
O código imprime a seguinte saída:
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
Como outro exemplo, o código a seguir seleciona o primeiro elemento do campo vehicle
, que contém uma array de veículos. O exemplo também seleciona o campo price
do primeiro elemento.
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("vehicle").subField(0)).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("price")).show();
O código imprime a seguinte saída:
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
Como alternativa ao método apply
, você pode usar as funções Functions.get, Functions.get_ignore_case ou Functions.get_path se o nome do campo ou elementos no caminho forem irregulares e dificultarem o uso dos métodos Column.subField
.
Por exemplo, as duas linhas de código a seguir imprimem o valor de um campo especificado em um objeto:
df.select(Functions.get(Functions.col("src"), Functions.lit("dealership"))).show();
df.select(Functions.col("src").subField("dealership")).show();
Da mesma forma, as seguintes linhas de código imprimem o valor de um campo em um caminho especificado em um objeto:
df.select(Functions.get_path(Functions.col("src"), Functions.lit("vehicle[0].make"))).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("make")).show();
Conversão explícita de valores em dados semiestruturados¶
Por padrão, os valores de campos e elementos são retornados como literais de cadeias de caracteres (incluindo as aspas duplas), como mostrado nos exemplos acima.
Para evitar resultados inesperados, chame o método de converter para converter o valor para um tipo específico. Por exemplo, o seguinte código imprime os valores sem e com conversão:
// Import the objects for the data types, including StringType.
import com.snowflake.snowpark_java.types.*;
...
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("id")).show();
df.select(Functions.col("src").subField("salesperson").subField("id").cast(DataTypes.StringType)).show();
O código imprime a seguinte saída:
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
Como achatar uma array de objetos em linhas¶
Se você precisar “achatar” dados semiestruturados em um DataFrame (por exemplo, produzir uma linha para cada objeto de uma array), chame o método flatten. Esse método é equivalente à função FLATTEN do SQL. Se você passar um caminho para um objeto ou array, o método retorna um DataFrame contendo uma linha para cada campo ou elemento no objeto ou array.
Por exemplo, nos dados de exemplo, src:customer
é um conjunto de objetos que contém informações sobre um cliente. Cada objeto contém um campo name
e address
.
Se você passar esse caminho para a função flatten
:
DataFrame df = session.table("car_sales");
df.flatten(Functions.col("src").subField("customer")).show();
o método retornará um DataFrame:
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
A partir desse DataFrame, você poderá selecionar os campos name
e address
de cada objeto no campo VALUE
:
df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name"), Functions.col("value").subField("address")).show();
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
O código a seguir complementa o exemplo anterior ao converter os valores para um tipo específico e mudar os nomes das colunas:
df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name").cast(DataTypes.StringType).as("Customer Name"), Functions.col("value").subField("address").cast(DataTypes.StringType).as("Customer Address")).show();
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
Como executar instruções SQL¶
Para executar uma instrução SQL que você especificar, chame o método sql
na classe Session
e passe a instrução a ser executada. O método retorna um DataFrame.
Note que a instrução SQL não será executada até que você chame um método de ação.
import java.util.Arrays;
// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
DataFrame dfStageFiles = session.sql("ls @myStage");
Row[] files = dfStageFiles.collect();
System.out.println(Arrays.toString(files));
// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect();
DataFrame tableDf = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Get the count of rows from the table.
long numRows = tableDf.count();
System.out.println("Count: " + numRows);
Se você quiser chamar métodos para transformar o DataFrame (por exemplo, filtro, seleção, etc.), observe que estes métodos funcionam somente se a instrução SQL subjacente for uma instrução SELECT. Os métodos de transformação não são suportados para outros tipos de instruções SQL.
import java.util.Arrays;
DataFrame df = session.sql("select id, category_id, name from sample_product_data where id > 10");
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
Row[] results = df.filter(Functions.col("category_id").lt(Functions.lit(10))).select(Functions.col("id")).collect();
System.out.println(Arrays.toString(results));
// In this example, the underlying SQL statement is not a SELECT statement.
DataFrame dfStageFiles = session.sql("ls @myStage");
// Calling the filter method results in an error.
dfStageFiles.filter(...);