Como trabalhar com DataFrames no Snowpark Scala¶
No Snowpark, a principal forma de consulta e processamento de dados é por meio de um DataFrame. Este tópico explica como trabalhar com DataFrames.
Neste tópico:
Para recuperar e manipular dados, você usa a classe DataFrame. Um DataFrame representa um conjunto de dados relacional que é avaliado lentamente: ele só é executado quando uma ação específica é acionada. Em certo sentido, um DataFrame é como uma consulta que precisa ser avaliada para recuperar dados.
Para obter dados em um DataFrame:
Construa um DataFrame especificando a fonte dos dados para o conjunto de dados.
Por exemplo, você pode criar um DataFrame para guardar dados de uma tabela, um arquivo CSV externo ou a execução de uma instrução SQL.
Especifique como o conjunto de dados no DataFrame deve ser transformado.
Por exemplo, você pode especificar quais colunas devem ser selecionadas, como as linhas devem ser filtradas, como os resultados devem ser ordenados e agrupados, etc.
Execute a instrução para obter os dados para o DataFrame.
Para obter os dados no DataFrame, você deve invocar um método que execute uma ação (por exemplo, o método
collect()
).
As próximas seções explicam essas etapas com mais detalhes.
Como estabelecer os exemplos para esta seção¶
Alguns dos exemplos desta seção usam um DataFrame para consultar uma tabela chamada sample_product_data
. Se você deseja executar esses exemplos, pode criar esta tabela e preenchê-la com alguns dados executando as seguintes instruções SQL:
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
Para verificar se a tabela foi criada, execute:
SELECT * FROM sample_product_data;
Como criar um DataFrame¶
Para criar um DataFrame, você pode usar métodos na classe Session
. Cada um dos seguintes métodos cria um DataFrame de um tipo diferente de fonte de dados:
Para criar um DataFrame a partir de dados em uma tabela, exibição ou fluxo, chame o método
table
:// Create a DataFrame from the data in the "sample_product_data" table. val dfTable = session.table("sample_product_data") // To print out the first 10 rows, call: // dfTable.show()
Nota
O método
session.table
retorna um objetoUpdatable
.Updatable
estendeDataFrame
e fornece métodos adicionais para trabalhar com dados na tabela (por exemplo, métodos para atualização e exclusão de dados). Consulte Atualização, eliminação e fusão de linhas em uma tabela.Para criar um DataFrame a partir de uma sequência de valores, chame o método
createDataFrame
:// Create a DataFrame containing a sequence of values. // In the DataFrame, name the columns "i" and "s". val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
Nota
As palavras reservadas pelo Snowflake não são válidas como nomes de coluna ao criar um DataFrame. Para obter uma lista de palavras reservadas, consulte Palavras-chave reservadas e limitadas.
Para criar um DataFrame contendo um intervalo de valores, chame o método
range
:// Create a DataFrame from a range val dfRange = session.range(1, 10, 2)
Para criar um DataFrame para um arquivo em um estágio, chame
read
para obter um objetoDataFrameReader
. No objetoDataFrameReader
, chame o método correspondente ao formato dos dados no arquivo:// Create a DataFrame from data in a stage. val dfJson = session.read.json("@mystage2/data1.json")
Para criar um DataFrame para armazenar os resultados de uma consulta SQL, chame o método
sql
:// Create a DataFrame from a SQL query val dfSql = session.sql("SELECT name from products")
Nota: embora você possa usar este método para executar instruções SELECT que recuperam dados de tabelas e arquivos preparados, você deve usar os métodos
table
eread
em vez disso. Métodos comotable
eread
pode fornecer melhor realce de sintaxe, realce de erros e conclusão de código inteligente em ferramentas de desenvolvimento.
Especificação de como o conjunto de dados deve ser transformado¶
Para especificar quais colunas devem ser selecionadas e como os resultados devem ser filtrados, ordenados, agrupados, etc., chame os métodos de DataFrame que transformam o conjunto de dados. Para identificar colunas nestes métodos, use a função col
ou uma expressão que seja avaliada como coluna. (Consulte Como especificar colunas e expressões.)
Por exemplo:
Para especificar quais linhas devem ser retornadas, chame o método
filter
:// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame for the rows with the ID 1 // in the "sample_product_data" table. // // This example uses the === operator of the Column object to perform an // equality check. val df = session.table("sample_product_data").filter(col("id") === 1) df.show()
Para especificar as colunas que devem ser selecionadas, chame o método
select
:// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame that contains the id, name, and serial_number // columns in te "sample_product_data" table. val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) df.show()
Cada método retorna um novo objeto DataFrame que foi transformado. (O método não afeta o objeto original DataFrame.) Isso significa que se você deseja aplicar várias transformações, poderá chamadas de método em cadeia, chamando cada método de transformação subsequente no novo objeto DataFrame retornado pela chamada de método anterior.
Observe que esses métodos de transformação não recuperam dados do banco de dados Snowflake. (Os métodos de ação descritos em Como executar uma ação para avaliar um DataFrame fazem a recuperação de dados.) Os métodos de transformação simplesmente especificam como a instrução SQL deve ser construída.
Como especificar colunas e expressões¶
Ao chamar esses métodos de transformação, pode ser necessário especificar colunas ou expressões que usam colunas. Por exemplo, ao chamar o método select
, você precisa especificar as colunas a serem selecionadas.
Para se referir a uma coluna, crie um objeto Coluna chamando a função col no objeto com.snowflake.snowpark.functions
.
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._
val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
Nota
Para criar um objeto Column
para um literal, consulte Como usar literais como objetos de coluna.
Ao especificar um filtro, projeção, condição de junção etc., você pode usar objetos Column
em uma expressão. Por exemplo:
Você pode usar objetos
Column
com o métodofilter
para especificar uma condição de filtro:// Specify the equivalent of "WHERE id = 20" // in an SQL SELECT statement. df.filter(col("id") === 20)
// Specify the equivalent of "WHERE a + b < 10" // in an SQL SELECT statement. df.filter((col("a") + col("b")) < 10)
Você pode usar objetos
Column
com o métodoselect
para definir um alias:// Specify the equivalent of "SELECT b * 10 AS c" // in an SQL SELECT statement. df.select((col("b") * 10) as "c")
Você pode usar objetos
Column
com o métodojoin
para definir uma condição de junção:// Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" // in an SQL SELECT statement. dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
Como se referir a colunas em diferentes DataFrames¶
Ao se referir a colunas em dois objetos DataFrame diferentes que têm o mesmo nome (por exemplo, ao unir os DataFrames naquela coluna), você pode usar o método DataFrame.col
em um objeto DataFrame para se referir a uma coluna naquele objeto (por exemplo, df1.col("name")
e df2.col("name")
).
O exemplo a seguir demonstra como usar o DataFrame.col
método para se referir a uma coluna em um determinado DataFrame. O exemplo une dois objetos DataFrame que têm uma coluna chamada key
. O exemplo usa o métodos Column.as
para alterar os nomes das colunas no DataFrame recém-criado.
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
Uso do método apply
para se referir a uma coluna¶
Como alternativa ao método DataFrame.col
, você pode usar o método DataFrame.apply
para se referir a uma coluna em um determinado DataFrame. Como o método DataFrame.col
, o método DataFrame.apply
aceita um nome de coluna como entrada e retorna um objeto Column
.
Observe que quando um objeto tem um método apply
no Scala, você pode chamar o método apply
chamando o objeto como se fosse uma função. Por exemplo, para chamar df.apply("column_name")
, você pode simplesmente escrever df("column_name")
. As seguintes chamadas são equivalentes:
df.col("<nome_da_coluna>")
df.apply("<nome_da_coluna>")
df("<nome_da_coluna>")
O exemplo seguinte é o mesmo que o exemplo anterior, mas usa o método DataFrame.apply
para se referir às colunas em uma operação de junção:
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
Usando abreviações para um objeto de coluna¶
Como alternativa ao uso da função col
, você pode se referir a uma coluna de uma destas maneiras:
Usando um cifrão em frente ao nome da coluna cotada (
$"column_name"
).Usando um apóstrofo (aspas simples) em frente ao nome da coluna não citada (
'column_name
).
Para fazer isso, importe os nomes do objeto implicits
depois de criar um objeto Session
:
val session = Session.builder.configFile("/path/to/properties").create
// Import this after you create the session.
import session.implicits._
// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)
// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
Como utilizar aspas duplas em torno de identificadores de objetos (nomes de tabela, nomes de coluna, etc.)¶
Os nomes de bancos de dados, esquemas, tabelas e estágios especificados por você devem estar de acordo com os requisitos de identificadores do Snowflake. Quando você especifica um nome, o Snowflake considera que o nome está em letras maiúsculas. Por exemplo, as seguintes chamadas são equivalentes:
// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
Se o nome não estiver em conformidade com os requisitos do identificador, você deve usar aspas duplas ("
) em volta do nome. Use uma barra invertida (\
) para escapar do caractere de aspas duplas em uma cadeia de caracteres literal do Scala. Por exemplo, o nome da tabela a seguir não começa com uma letra ou sublinhado, então você deve usar aspas duplas ao redor do nome:
val df = session.table("\"10tablename\"")
Observe que ao especificar o nome de uma coluna, você não precisa usar aspas duplas no nome. A biblioteca Snowpark coloca automaticamente o nome da coluna entre aspas duplas para você se o nome não estiver em conformidade com os requisitos do identificador:.
// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))
// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
Se você já tiver acrescentado aspas duplas em torno de um nome de coluna, a biblioteca não inserirá aspas duplas adicionais em torno do nome.
Em alguns casos, o nome da coluna pode conter caracteres de aspas duplas:
describe table quoted;
+------------------------+ ...
| name | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted" | ...
+------------------------+ ...
Como explicado em Requisitos para identificadores, para cada caractere de aspas duplas dentro de um identificador de aspas duplas, deve-se usar dois caracteres de aspas duplas (por exemplo, "name_with_""air""_quotes"
e """column_name_quoted"""
):
val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
Tenha em mente que quando um identificador é incluído entre aspas duplas (se você adicionou explicitamente as aspas ou se a biblioteca adicionou as aspas para você), o Snowflake trata o identificador diferenciando maiúsculas de minúsculas:
// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
Como usar literais como objetos de coluna¶
Para usar um literal em um método que passa em um objeto Column
, crie um objeto Column
para o literal passando o literal para a função lit
no objeto com.snowflake.snowpark.functions
. Por exemplo:
// Import for the lit and col functions.
import com.snowflake.snowpark.functions._
// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
Se o literal é um valor de ponto flutuante ou duplo no Scala (por exemplo, 0.05
é tratado como Double por padrão), a biblioteca do Snowpark gera SQL que implicitamente converte o valor para o tipo de dados correspondente do Snowpark (por exemplo, 0.05::DOUBLE
). Isso pode produzir um valor aproximado que difere do número exato especificado.
Por exemplo, o código a seguir não exibe linhas correspondentes, mesmo que o filtro (que corresponde a valores maiores ou iguais a 0.05
) deva corresponder às linhas do DataFrame:
// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")
// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
O problema é que lit(0.06)
e lit(0.01)
produzem valores aproximados para 0.06
e 0.01
, não os valores exatos.
Para evitar esse problema, você pode usar uma das seguintes abordagens:
Opção 1: converter o literal para o tipo Snowpark que você deseja usar. Por exemplo, para usar um NUMBER com uma precisão de 5 e uma escala de 2:
df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
Opção 2: converter o valor para o tipo que você deseja usar antes de passar o valor para a função
lit
. Por exemplo, se você quiser usar o BigDecimal tipo:df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
Como converter um objeto de coluna para um tipo específico¶
Para converter um objeto Column
em um tipo específico, chame o método Column.cast e passe em um objeto de tipo do pacote com.snowflake.snowpark.types. Por exemplo, para converter um literal como um NUMBER com uma precisão de 5 e uma escala de 2:
// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._
val decimalValue = lit(0.05).cast(new DecimalType(5,2))
Como encadear chamadas a métodos¶
Como cada método que transforma um objeto DataFrame returna um novo objeto DataFrame que tem a transformação aplicada, você pode encadear chamadas a métodos para produzir um novo DataFrame que é transformado de maneiras adicionais.
O exemplo a seguir retorna um DataFrame que está configurado para:
Consultar a tabela
sample_product_data
.Retornar a linha com
id = 1
.Selecionar as colunas
name
eserial_number
.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Neste exemplo:
session.table("sample_product_data")
retorna um DataFrame para a tabelasample_product_data
.Embora o DataFrame ainda não contenha os dados da tabela, o objeto contém as definições das colunas da tabela.
filter(col("id") === 1)
retorna um DataFrame para a tabelasample_product_data
que está preparada para retornar a linha comid = 1
.Observe novamente que o DataFrame ainda não contém a linha correspondente da tabela. A linha correspondente não é recuperada até que você chame um método de ação.
select(col("name"), col("serial_number"))
retorna um DataFrame que contém as colunasname
eserial_number
para a linha da tabelasample_product_data
que temid = 1
.
Ao encadear chamadas de método, lembre-se de que a ordem das chamadas é importante. Cada chamada de método retorna um DataFrame que foi transformado. Certifique-se de que as chamadas subsequentes funcionem com a DataFrame transformado.
Por exemplo, no código abaixo, o método select
retorna um DataFrame que contém apenas duas colunas: name
e serial_number
. A chamada ao método filter
neste DataFrame falha porque usa a coluna id
, que não está no DataFrame transformado.
// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
Em contraste, o seguinte código é executado com sucesso porque o método filter()
é chamado em um DataFrame que contém todas as colunas da tabela sample_product_data
(incluindo a coluna id
):
// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Tenha em mente que talvez seja necessário fazer as chamadas ao método select
e filter
em uma ordem diferente daquela em que você usaria as palavras-chave equivalentes (SELECT e WHERE) em uma instrução SQL.
Como limitar o número de linhas em um DataFrame¶
Para limitar o número de linhas em um DataFrame, você pode usar o método de transformação DataFrame.limit.
A API do Snowpark também fornece métodos de ação para recuperar e imprimir um número limitado de linhas:
o método de ação DataFrame.first (para executar a consulta e retornar as primeiras
n
linhas)o método de ação DataFrame.show (para executar a consulta e imprimir as primeiras
n
linhas)
Esses métodos efetivamente adicionam uma cláusula LIMIT à instrução SQL que é executada.
Como explicado nas notas de uso para LIMIT, os resultados não são determinísticos, a menos que você especifique uma ordem de classificação (ORDER BY) em conjunto com LIMIT.
Para manter a cláusula ORDER BY com a cláusula LIMIT (por exemplo, para que ORDER BY não esteja em uma subconsulta separada), você deve chamar o método que limita os resultados do DataFrame retornado pelo método sort
.
Por exemplo, se você estiver encadeando chamadas a métodos:
// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);
// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)
// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
Como obter definições das colunas¶
Para obter a definição das colunas no conjunto de dados para o DataFrame, chame o método schema
. Esse método retorna um objeto StructType
que contém uma Array
de objetos StructField
. Cada objeto StructField
contém a definição de uma coluna.
// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
No objeto StructType
retornado, os nomes das colunas são sempre normalizados. Os identificadores sem aspas são retornados em letras maiúsculas e os identificadores entre aspas são retornados no formato de letra conforme foram definidos.
O exemplo a seguir cria um DataFrame contendo as colunas chamadas ID
e 3rd
. Para o nome de coluna 3rd
, a biblioteca do Snowpark coloca o nome automaticamente entre aspas duplas ("3rd"
) porque o nome não cumpre os requisitos para um identificador.
O exemplo chama o método schema
e, em seguida, chama o método names
no objeto retornado StructType
para obter um ArraySeq
de nomes de coluna. Os nomes são normalizados no StructType
devolvido pelo método schema
.
// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
// ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
Junção de DataFrames¶
Para juntar objetos DataFrame, chame o método DataFrame.join.
As seções a seguir explicam como usar DataFrames para realizar uma junção:
Configuração dos dados de exemplo para as junções¶
Os exemplos nas próximas seções utilizam dados de exemplo que você pode configurar executando as seguintes instruções SQL:
create or replace table sample_a (
id_a integer,
name_a varchar,
value integer
);
insert into sample_a (id_a, name_a, value) values
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
create or replace table sample_b (
id_b integer,
name_b varchar,
id_a integer,
value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', null, 200)
;
create or replace table sample_c (
id_c integer,
name_c varchar,
id_a integer,
id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
(1012, 'C1', 10, null),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
Como especificar as colunas para a junção¶
Com o método DataFrame.join
, você pode especificar as colunas a serem usadas de uma das seguintes maneiras:
Especifique uma expressão de coluna que descreva a condição de junção.
Especifique uma ou mais colunas que devem ser usadas como colunas comuns na junção.
O exemplo a seguir realiza uma junção interna na coluna chamada id_a
:
// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Observe que o exemplo usa o método DataFrame.col
para especificar a condição a ser usada para a junção. Consulte Como especificar colunas e expressões para saber mais sobre este método.
Isso imprime a seguinte saída:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
----------------------------------------------------------------------
Nomes de colunas idênticos duplicados no resultado da junção¶
No DataFrame resultante de uma junção, a biblioteca Snowpark utiliza os nomes das colunas encontradas nas tabelas que foram unidas, mesmo quando os nomes das colunas são idênticos entre tabelas. Quando isto acontece, estes nomes de coluna são duplicados no DataFrame resultante da junção. Para acessar uma coluna duplicada pelo nome, chame o método col
no DataFrame que representa a tabela original da coluna. (Para obter mais informações sobre como especificar colunas, consulte Como se referir a colunas em diferentes DataFrames.)
O código no exemplo a seguir une dois DataFrames, depois chama o método select
no DataFrame que foi unido. Ele especifica as colunas a serem selecionadas chamando o método col
da variável que representa os respectivos objetos DataFrame: dfRhs
e dfLhs
. Ele usa o método as
para dar às colunas novos nomes no DataFrame que o método select
cria.
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
Isso imprime a seguinte saída:
------------------------------
|"LEFTVALUE" |"RIGHTVALUE" |
------------------------------
|5 |5 |
|10 |10 |
|15 |15 |
------------------------------
Eliminação de duplicação de colunas antes de salvar ou armazenar¶
Observe que quando um DataFrame resultante de uma junção inclui nomes de colunas duplicadas, você deve eliminar a duplicação ou renomear colunas para remover a duplicação no DataFrame antes de salvar o resultado em uma tabela ou armazenar o DataFrame em cache. Para nomes de colunas duplicadas em um DataFrame que você salva em uma tabela ou cache, a biblioteca do Snowpark substituirá os nomes de colunas duplicadas por alias para que não sejam mais duplicadas.
O exemplo seguinte ilustra como a saída de um DataFrame em cache poderia aparecer se os nomes das colunas ID_A
e VALUE
fossem duplicados em uma junção de duas tabelas, e depois não fossem deduplicados ou renomeados antes de armazenar o resultado em cache.
--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A" |"NAME_A" |"l_ZSz7_VALUE" |"ID_B" |"NAME_B" |"r_heec_ID_A" |"r_heec_VALUE" |
--------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
--------------------------------------------------------------------------------------------------
Como realizar uma junção natural¶
Para realizar uma junção natural (onde DataFrames são unidas em colunas que têm o mesmo nome), chame o método DataFrame.naturalJoin.
O exemplo a seguir junta os DataFrames para as tabelas sample_a
e sample_b
nas colunas comuns (a coluna id_a
):
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Isso imprime a seguinte saída:
---------------------------------------------------
|"ID_A" |"VALUE" |"NAME_A" |"ID_B" |"NAME_B" |
---------------------------------------------------
|10 |5 |A1 |4001 |B2 |
|40 |10 |A2 |4000 |B1 |
|80 |15 |A3 |9000 |B3 |
---------------------------------------------------
Como especificar o tipo de junção¶
Por padrão, o método DataFrame.join
cria uma junção interna. Para especificar um tipo diferente de junção, defina o argumento joinType
como um dos seguintes valores:
Tipo de junção |
|
---|---|
Junção interna |
|
Junção externa esquerda |
|
Junção externa direita |
|
Junção externa completa |
|
Junção cruzada |
|
Por exemplo:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
Isso imprime a seguinte saída:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|40 |A2 |10 |4000 |B1 |40 |10 |
|10 |A1 |5 |4001 |B2 |10 |5 |
|80 |A3 |15 |9000 |B3 |80 |15 |
|90 |A4 |20 |NULL |NULL |NULL |NULL |
----------------------------------------------------------------------
Junção de tabelas múltiplas¶
Para juntar tabelas múltiplas:
Crie um DataFrame para cada tabela.
Chame o método
DataFrame.join
no primeiro DataFrame passando o segundo DataFrame.Usando o DataFrame retornado pelo método
join
, chame o métodojoin
, passando o terceiro DataFrame.
Você pode encadear as chamadas join
como mostrado abaixo:
val dfFirst = session.table("sample_a")
val dfSecond = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
Isso imprime a seguinte saída:
------------------------------------------------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |"ID_C" |"NAME_C" |"ID_A" |"ID_B" |
------------------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |1012 |C1 |10 |NULL |
|40 |A2 |10 |4000 |B1 |40 |10 |1040 |C2 |40 |4000 |
|40 |A2 |10 |4000 |B1 |40 |10 |1041 |C3 |40 |4001 |
------------------------------------------------------------------------------------------------------------
Como realizar uma autojunção¶
Se você precisar unir uma tabela com ela mesma em diferentes colunas, não poderá realizar a autojunção com um único DataFrame. Os exemplos a seguir usam um único DataFrame para executar uma junção automática, que falha pois as expressões de coluna para "id"
estão presentes nos lados esquerdo e direito da junção:
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
Ambos os exemplos falham, com a seguinte exceção:
Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
Instead, join this DataFrame to a clone() of itself.
Em vez disso, use o método DataFrame.clone para criar um clone do objeto DataFrame e use os dois objetos DataFrame para realizar a junção:
// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()
// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
Se você quiser realizar uma autojunção na mesma coluna, chame o método join
que passa um Seq
de expressões de coluna para a cláusula USING
:
// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
Como executar uma ação para avaliar um DataFrame¶
Como mencionado anteriormente, o DataFrame é avaliado lentamente, o que significa que a instrução SQL não é enviada ao servidor para execução até que você execute uma ação. Uma ação faz com que o DataFrame seja avaliado e envia a instrução SQL correspondente para o servidor para execução.
As seções a seguir explicam como executar uma ação de forma síncrona e assíncrona em um DataFrame:
Como executar uma ação de forma síncrona¶
Para executar uma ação de forma síncrona, chame um dos seguintes métodos de ação:
Método para executar uma ação de forma síncrona |
Descrição |
---|---|
|
Avalia o DataFrame e retorna o conjunto de dados resultante como uma |
|
Avalia o DataFrame e devolve um Iterator de objetos Row. Se o conjunto de resultados for grande, use esse método para evitar carregar todos os resultados na memória de uma só vez. Consulte Como retornar um iterador para as linhas. |
|
Avalia o DataFrame e retorna o número de linhas. |
|
Avalia o DataFrame e imprime as linhas no console. Observe que este método limita o número de linhas a 10 (por padrão). Consulte Como imprimir as linhas em um DataFrame. |
|
Executa a consulta, cria uma tabela temporária e coloca os resultados na tabela. O método retorna um objeto |
|
Salva os dados no DataFrame para a tabela especificada. Consulte Como salvar dados em uma tabela. |
|
Salva um DataFrame para um arquivo especificado em um estágio. Consulte Como salvar um DataFrame para arquivos em um estágio. |
|
Copia os dados no DataFrame para a tabela especificada. Consulte Como copiar dados de arquivos para uma tabela. |
|
Exclui linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
|
Atualiza linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
|
Mescla as linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
Para executar a consulta e retornar o número de resultados, chame o método count
:
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
Você também pode chamar métodos de ação para:
Nota: Se você estiver chamando o método schema
para obter as definições das colunas no DataFrame, você não precisa chamar um método de ação.
Como executar uma ação de forma assíncrona¶
Nota
Esse recurso foi introduzido no Snowpark 0.11.0.
Para executar uma ação de forma assíncrona, chame o método async
para retornar um objeto “ator assíncrono” (por exemplo, DataFrameAsyncActor
) e chamar um método de ação assíncrona nesse objeto.
Esses métodos de ação de um objeto de ator assíncrono retornam um objeto TypedAsyncJob
, que você pode usar para verificar o status da ação assíncrona e obter os resultados da ação.
As próximas seções explicam como executar as ações de forma assíncrona e verificar os resultados.
Explicação do fluxo básico de ações assíncronas¶
Você pode usar os seguintes métodos para executar uma ação de forma assíncrona:
Método para realizar uma ação de forma assíncrona |
Descrição |
---|---|
|
Avalia o DataFrame de forma assíncrona para obter o conjunto de dados resultante como um |
|
Avalia de forma assíncrona o DataFrame para recuperar um Iterator de objetos Row. Se o conjunto de resultados for grande, use este método para evitar carregar todos os resultados na memória de uma só vez. Consulte Como retornar um iterador para as linhas. |
|
Avalia o DataFrame de forma assíncrona para obter o número de linhas. |
|
Salva de forma assíncrona os dados no DataFrame para a tabela especificada. Consulte Como salvar dados em uma tabela. |
|
Salva um DataFrame para um arquivo especificado em um estágio. Consulte Como salvar um DataFrame para arquivos em um estágio. |
|
Copia de forma assíncrona os dados no DataFrame para a tabela especificada. Consulte Como copiar dados de arquivos para uma tabela. |
|
Exclui linhas de forma assíncrona na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
|
Atualiza linhas de forma assíncrona na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
|
Mescla assincronamente as linhas na tabela especificada. Compatível com a versão 1.3.0 ou posterior. Consulte Atualização, eliminação e fusão de linhas em uma tabela. |
A partir do objeto TypedAsyncJob retornado, você pode fazer o seguinte:
Para determinar se a ação foi concluída, chame o método
isDone
.Para obter a ID da consulta que corresponde à ação, chame o método
getQueryId
.Para retornar os resultados da ação (por exemplo, o
Array
de objetosRow
para o métodocollect
ou a contagem de linhas para o métodocount
), chame o métodogetResult
.Note que
getResult
é uma chamada com bloqueio.Para cancelar a ação, chame o método
cancel
.
Por exemplo, para executar uma consulta de forma assíncrona e obter os resultados como uma Array
de objetos Row
, chame DataFrame.async.collect
:
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
Para executar a consulta de forma assíncrona e obter o número de resultados, chame DataFrame.async.count
:
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
Como especificar o número máximo de segundos a esperar¶
Ao chamar o método getResult
, você pode usar o argumento maxWaitTimeInSeconds
para especificar o número máximo de segundos para esperar que a consulta seja concluída antes de tentar obter os resultados. Por exemplo:
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
Se você omitir esse argumento, o método aguardará o número máximo de segundos especificado pela propriedade de configuração snowpark_request_timeout_in_seconds. (Esta é uma propriedade que você pode definir ao criar o objeto Session.)
Como acessar uma consulta assíncrona por ID¶
Se você tiver a ID de uma consulta assíncrona que você enviou anteriormente, você pode chamar o método Session.createAsyncJob
para criar um objeto AsyncJob que você pode usar para verificar o status da consulta, obter os resultados da consulta ou cancelar a consulta.
Note que ao contrário de TypedAsyncJob
, AsyncJob
não fornece um método getResult
para obter os resultados. Se você precisar recuperar os resultados, chame o getRows
ou getIterator
em vez disso.
Por exemplo:
val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
Como obter linhas em um DataFrame¶
Depois de você especificar como o DataFrame deve ser transformado, você pode chamar um método de ação para executar uma consulta e retornar os resultados. Você pode retornar todas as linhas em um Array
, ou você pode retornar um Iterator que permite iterar os resultados, linha por linha. No último caso, se a quantidade de dados for grande, as linhas são carregadas na memória por parte para evitar o carregamento de uma grande quantidade de dados na memória.
Como retornar todas as linhas¶
Para retornar todas as linhas de uma vez, chame o método DataFrame.collect. Este método retorna uma matriz de objetos Row. Para recuperar os valores da linha, chame o método getType
(por exemplo getString
, getInt
etc).
Por exemplo:
import com.snowflake.snowpark.functions_
val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Como retornar um iterador para as linhas¶
Se você quiser usar um Iterator para iterar sobre objetos Row nos resultados, chame DataFrame.toLocalIterator. Se a quantidade de dados nos resultados for grande, o método carrega as linhas por parte para evitar carregar todas as linhas na memória de uma vez.
Por exemplo:
import com.snowflake.snowpark.functions_
while (rowIterator.hasNext) {
val row = rowIterator.next()
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Como retornar as primeiras n
linhas¶
Para retornar as primeiras n
linhas, chame o método DataFrame.first, passando o número de linhas a retornar.
Conforme explicado em Como limitar o número de linhas em um DataFrame, os resultados são não determinísticos. Se você deseja que os resultados sejam determinísticos, chame este método em um DataFrame (df.sort().first()
).
Por exemplo:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
Como imprimir as linhas em um DataFrame¶
Para imprimir as primeiras 10 linhas no DataFrame para o console, chame o método DataFrame.show. Para imprimir um número diferente de linhas, passe o número de linhas a imprimir.
Conforme explicado em Como limitar o número de linhas em um DataFrame, os resultados são não determinísticos. Se você deseja que os resultados sejam determinísticos, chame este método em um DataFrame (df.sort().show()
).
Por exemplo:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
df.sort(col("name")).show()
Atualização, eliminação e fusão de linhas em uma tabela¶
Nota
Esse recurso foi introduzido no Snowpark 0.7.0.
Quando você chama Session.table
para criar um objeto DataFrame
para uma tabela, o método retorna um objeto Updatable
, que estende o DataFrame
com métodos adicionais para atualização e exclusão de dados na tabela. (Consulte Updatable.)
Se você precisar atualizar ou excluir linhas em uma tabela, você pode usar os seguintes métodos da classe Updatable
:
Chame
update
para atualizar as linhas existentes na tabela. Consulte Atualização de linhas em uma tabela.Chame
delete
para excluir linhas de uma tabela. Consulte Como excluir linhas de uma tabela.Chame
merge
para inserir, atualizar e excluir linhas em uma tabela, com base nos dados de uma segunda tabela ou subconsulta. (Isto é o equivalente ao comando MERGE em SQL.) Consulte Fusão de linhas em uma tabela.
Atualização de linhas em uma tabela¶
Para o método update
, passe um Map
que associe as colunas a serem atualizadas e os valores correspondentes a serem atribuídos a essas colunas. update
retorna um objeto UpdateResult
, que contém o número de linhas que foram atualizadas. (Consulte UpdateResult.)
Nota
update
é um método de ação, o que significa que chamar o método envia instruções SQL ao servidor para execução.
Por exemplo, para substituir os valores na coluna chamada count
pelo valor 1
:
val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
O exemplo acima usa o nome da coluna para identificar a coluna. Você também pode usar uma expressão de coluna:
val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
Se a atualização deve ser feita apenas quando uma condição for atendida, você pode especificar essa condição como um argumento. Por exemplo, para substituir os valores na coluna chamada count
para linhas em que a coluna category_id
tem o valor 20
:
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
Se você precisar basear a condição em uma junção com um objeto DataFrame
, você pode passar isso DataFrame
como um argumento e usar isso DataFrame
na condição. Por exemplo, para substituir os valores na coluna chamada count
para linhas em que a coluna category_id
corresponde a category_id
no DataFrame
dfParts
:
val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
Como excluir linhas de uma tabela¶
Para o método delete
, você pode especificar uma condição que identifica as linhas a serem excluídas, e pode basear essa condição em uma junção com outro DataFrame. delete
retorna um objeto DeleteResult
, contendo o número de linhas que foram excluídas. (Consulte DeleteResult.)
Nota
delete
é um método de ação, o que significa que chamar o método envia instruções SQL ao servidor para execução.
Por exemplo, para apagar as linhas que têm o valor 1
na coluna category_id
:
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Se a condição se referir a colunas em um DataFrame diferente, passe esse DataFrame como o segundo argumento. Por exemplo, para excluir as linhas nas quais a coluna category_id
corresponde ao category_id
no DataFrame
dfParts
, passe dfParts
como o segundo argumento:
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Fusão de linhas em uma tabela¶
Para inserir, atualizar e excluir linhas em uma tabela com base nos valores de uma segunda tabela ou subconsulta (o equivalente ao comando MERGE em SQL), faça o seguinte:
No objeto
Updatable
para a tabela onde você quer fundir os dados, chame o métodomerge
, passando o objetoDataFrame
para a outra tabela e a expressão da coluna para a condição de junção.Isso retorna um objeto
MergeBuilder
que você pode usar para especificar as ações a serem tomadas (por exemplo, inserir, atualizar ou excluir) nas linhas que correspondem e nas que não correspondem aos critérios. (Consulte MergeBuilder.)Usando o objeto
MergeBuilder
:Para especificar a atualização ou exclusão que deve ser feita nas linhas correspondentes, chame o método
whenMatched
.Se você precisar especificar uma condição adicional quando as linhas devem ser atualizadas ou excluídas, você pode passar uma expressão de coluna para essa condição.
Esse método retorna um objeto
MatchedClauseBuilder
que você pode usar para especificar a ação a ser executada. (Consulte MatchedClauseBuilder.)Chame o método
update
oudelete
no objetoMatchedClauseBuilder
para especificar a ação de atualização ou exclusão que deve ser executada nas linhas correspondentes. Esses métodos retornam um objetoMergeBuilder
que você pode usar para especificar cláusulas adicionais.Para especificar a inserção que deve ser feita quando as linhas não correspondem, chame o método
whenNotMatched
.Se você precisar especificar uma condição adicional quando as linhas devem ser inseridas, você pode passar uma expressão de coluna para essa condição.
Esse método retorna um objeto
NotMatchedClauseBuilder
que você pode usar para especificar a ação a ser executada. (Consulte NotMatchedClauseBuilder.)Chame o método
insert
no objetoNotMatchedClauseBuilder
para especificar a ação de inserção que deve ser executada quando as linhas não correspondem. Esses métodos devolvem um objetoMergeBuilder
que você pode usar para especificar cláusulas adicionais.
Quando terminar de especificar as inserções, atualizações e exclusões que devem ser realizadas, chame o método
collect
do objetoMergeBuilder
para realizar as inserções, atualizações e exclusões especificadas na tabela.collect
retorna um objetoMergeResult
contendo o número de linhas que foram inseridas, atualizadas e excluídas. (Consulte MergeResult.)
O exemplo a seguir insere uma linha com as colunas id
e value
da tabela source
na tabela target
se a tabela target
não contiver uma linha com uma ID correspondente:
val mergeResult = target.merge(source, target("id") === source("id"))
.whenNotMatched.insert(Seq(source("id"), source("value")))
.collect()
O exemplo a seguir atualiza uma linha na tabela target
com o valor da coluna value
da linha na tabela source
que tem a mesma ID:
val mergeResult = target.merge(source, target("id") === source("id"))
.whenMatched.update(Map("value" -> source("value")))
.collect()
Como salvar dados em uma tabela¶
Você pode salvar o conteúdo de um DataFrame para uma tabela nova ou existente. Para fazer isso, você deve ter os seguintes privilégios:
Privilégios CREATE TABLE sobre o esquema, se a tabela não existir.
Privilégios INSERT sobre a tabela.
Para salvar o conteúdo de um DataFrame em uma tabela:
Chame o método DataFrame.write para obter um objeto DataFrameWriter.
Chame o método DataFrameWriter.mode, passando um objeto SaveMode que especifica suas preferências para escrever na tabela:
Para inserir linhas, passe
SaveMode.Append
.Para substituir a tabela existente, passe
SaveMode.Overwrite
.
Esse método retorna o mesmo objeto
DataFrameWriter
configurado com o modo especificado.Se você estiver inserindo linhas em uma tabela existente (
SaveMode.Append
) e os nomes das colunas no DataFrame corresponderem aos nomes das colunas na tabela, chame o método DataFrameWriter.option passando"columnOrder"
e"name"
como argumentos.Nota
Esse método foi introduzido no Snowpark 1.4.0.
Por padrão, a opção
columnOrder
é definida como"index"
, o que significa que oDataFrameWriter
insere os valores na ordem em que as colunas aparecem. Por exemplo,DataFrameWriter
insere o valor da primeira coluna do DataFrame na primeira coluna da tabela, a segunda coluna do DataFrame na segunda coluna da tabela etc.Esse método retorna o mesmo objeto
DataFrameWriter
configurado com a opção especificada.Chame o DataFrameWriter.saveAsTable para salvar o conteúdo do DataFrame em uma tabela especificada.
Você não precisa chamar um método separado (por exemplo,
collect
) para executar a instrução SQL que salva os dados na tabela.saveAsTable
é um método de ação que executa a instrução SQL.
O exemplo a seguir substitui uma tabela existente (identificada pela variável tableName
) com o conteúdo do DataFrame df
:
df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
O exemplo a seguir insere linhas do DataFrame df
em uma tabela existente (identificada pela variável tableName
). Nesse exemplo, a tabela e o DataFrame ambos contêm as colunas c1
e c2
.
O exemplo demonstra a diferença entre configurar a opção columnOrder
para "name"
(que insere valores nas colunas da tabela com os mesmos nomes das colunas do DataFrame) e usar a opção padrão columnOrder
(que insere valores nas colunas da tabela com base na ordem das colunas no DataFrame).
val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
Como criar uma exibição a partir de um DataFrame¶
Para criar uma exibição a partir de um DataFrame, chame o método DataFrame.createOrReplaceView:
df.createOrReplaceView("db.schema.viewName")
Note que chamando createOrReplaceView
cria imediatamente a nova exibição. Mais importante ainda, não faz com que o DataFrame seja avaliado. (O DataFrame em si não é avaliado até você executar uma ação.)
As exibições que você cria chamando createOrReplaceView
são persistentes. Se você não precisar mais dessa exibição, você pode descartar a exibição manualmente.
Se você precisar criar uma exibição temporária apenas para a sessão, chame o método DataFrame.createOrReplaceTempView em vez disso:
df.createOrReplaceTempView("db.schema.viewName")
Como criar um cache do DataFrame¶
Em alguns casos, pode ser necessário executar uma consulta complexa e manter os resultados para uso em operações subsequentes (em vez de executar a mesma consulta novamente). Nesses casos, você pode armazenar em cache o conteúdo de um DataFrame chamando o método DataFrame.cacheResult.
Esse método:
Executa a consulta.
Você não precisa chamar um método de ação separado para recuperar os resultados antes de chamar
cacheResult
.cacheResult
é um método de ação que executa a consulta.Salva os resultados em uma tabela temporária
Como
cacheResult
cria uma tabela temporária, você deve ter o privilégio CREATE TABLE sobre o esquema que está em uso.Retorna um objeto HasCachedResult, que dá acesso aos resultados da tabela temporária.
Como
HasCachedResult
estendeDataFrame
, você pode realizar algumas das mesmas operações sobre estes dados em cache que pode realizar em um DataFrame.
Nota
Como cacheResult
executa a consulta e salva os resultados em uma tabela, o método pode resultar em um aumento dos custos de computação e armazenamento.
Por exemplo:
import com.snowflake.snowpark.functions_
// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
Note que o DataFrame original não é afetado quando você chama esse método. Por exemplo, suponha que dfTable
seja um DataFrame para a tabela sample_product_data
:
val dfTempTable = dfTable.cacheResult()
Depois de chamar cacheResult
, dfTable
ainda aponta para a tabela sample_product_data
, e você pode continuar a usar dfTable
para consultar e atualizar essa tabela.
Para usar os dados em cache na tabela temporária, você usa dfTempTable
(o objeto HasCachedResult
retornado por cacheResult
).
Como trabalhar com arquivos em um estágio¶
A biblioteca do Snowpark fornece classes e métodos que você pode usar para carregar dados no Snowflake e descarregar dados do Snowflake usando arquivos em estágios.
Nota
Para utilizar essas classes e métodos em um estágio, você deve ter os privilégios necessários para trabalhar com o estágio.
As próximas seções explicam como utilizar essas classes e métodos:
Carregamento e descarregamento de arquivos em um estágio¶
Para carregar e baixar arquivos em um estágio, use o objeto FileOperation:
Carregamento de arquivos para um estágio¶
Para carregar arquivos para um estágio:
Verifique se você tem os privilégios de carregar arquivos para o estágio.
Use Session.file para acessar o objeto FileOperation para a sessão.
Chame o método FileOperation.put para carregar os arquivos em um estágio.
Esse método executa um comando SQL PUT.
Para especificar quaisquer parâmetros opcionais para o comando PUT, crie um
Map
dos parâmetros e valores e passe oMap
como o argumentooptions
. Por exemplo:// Upload a file to a stage without compressing the file. val putOptions = Map("AUTO_COMPRESS" -> "FALSE") val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
No argumento
localFilePath
, você pode usar curingas (*
e?
) para identificar um conjunto de arquivos a ser carregado. Por exemplo:// Upload the CSV files in /tmp with names that start with "file". // You can use the wildcard characters "*" and "?" to match multiple files. val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
Verifique o
Array
de objetos PutResult retornados pelo métodoput
para determinar se os arquivos foram carregados com sucesso. Por exemplo, para imprimir o nome do arquivo e o status da operação PUT para esse arquivo:// Print the filename and the status of the PUT operation. putResults.foreach(r => println(s" ${r.sourceFileName}: ${r.status}"))
Como baixar arquivos de um estágio¶
Para baixar arquivos de uma estágio:
Verifique se você tem os privilégios de carregar arquivos para o estágio.
Use Session.file para acessar o objeto FileOperation para a sessão.
Chame o método FileOperation.get para baixar os arquivos de um estágio.
Esse método executa um comando SQL GET.
Para especificar quaisquer parâmetros opcionais para o comando GET, crie um
Map
dos parâmetros e valores e passe oMap
como o argumentooptions
. Por exemplo:// Download files with names that match a regular expression pattern. val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'") val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
Verifique o
Array
de objetos GetResult retornados pelo métodoget
para determinar se os arquivos foram baixados com sucesso. Por exemplo, para imprimir o nome do arquivo e o status da operação GET para esse arquivo:// Print the filename and the status of the GET operation. getResults.foreach(r => println(s" ${r.fileName}: ${r.status}"))
Como usar fluxos de entrada para carregar e descarregar dados em um estágio¶
Nota
Esse recurso foi introduzido no Snowpark 1.4.0.
Para usar fluxos de entrada para carregar dados em um arquivo em um estágio e baixar dados de um arquivo em um estágio, use os métodos uploadStream
e downloadStream
do objeto FileOperation:
Como usar um fluxo de entrada para carregar dados em um arquivo em um estágio
Como usar um fluxo de entrada para baixar dados de um arquivo em um estágio
Como usar um fluxo de entrada para carregar dados em um arquivo em um estágio¶
Para carregar os dados de um objeto java.io.InputStream em um arquivo em um estágio:
Verifique se você tem os privilégios de carregar arquivos para o estágio.
Use Session.file para acessar o objeto FileOperation para a sessão.
Chame o método FileOperation.uploadStream.
Passe o caminho completo para o arquivo no estágio onde os dados devem ser escritos e o objeto
InputStream
. Além disso, use o argumentocompress
para especificar se os dados devem ou não ser compactados antes de serem carregados.
Por exemplo:
import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
Como usar um fluxo de entrada para baixar dados de um arquivo em um estágio¶
Para baixar dados de um arquivo em um estágio para um objeto java.io.io.InputStream:
Verifique se você tem os privilégios de carregar arquivos para o estágio.
Use Session.file para acessar o objeto FileOperation para a sessão.
Chame o método FileOperation.downloadStream.
Passe o caminho completo para o arquivo no estágio contendo os dados a serem baixados. Use o argumento
decompress
para especificar se os dados no arquivo estão ou não compactados.
Por exemplo:
import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
Como configurar um DataFrame para arquivos em um estágio¶
Esta seção explica como configurar um DataFrame para arquivos em um estágio do Snowflake. Depois de criar esse DataFrame, você pode usar o DataFrame para:
Para configurar um DataFrame para arquivos em um estágio do Snowflake, use a classe DataFrameReader
:
Verifique se você tem os seguintes privilégios:
Um dos seguintes:
privilégios CREATE TABLE no esquema, se você planeja especificar opções de cópia que determinam como os dados são copiados dos arquivos preparados.
Em outros casos, privilégios CREATE FILE FORMAT sobre o esquema.
Chame o método
read
na classeSession
para acessar um objetoDataFrameReader
.Se os arquivos estiverem no formato CSV, descreva os campos no arquivo. Para fazer isso:
crie um objeto StructType que consiste em uma sequência de objetos StructField descrevendo os campos no arquivo.
Para cada objeto
StructField
, especifique o seguinte:O nome do campo.
O tipo de dados do campo (especificado como um objeto no pacote
com.snowflake.snowpark.types
).Se o campo é ou não anulável.
Por exemplo:
import com.snowflake.snowpark.types._ val schemaForDataFile = StructType( Seq( StructField("id", StringType, true), StructField("name", StringType, true)))
Chame o método
schema
no objetoDataFrameReader
, passando o objetoStructType
.Por exemplo:
var dfReader = session.read.schema(schemaForDataFile)
O método
schema
retorna um objetoDataFrameReader
que é configurado para ler arquivos contendo os campos especificados.Observe que você não precisa fazer isso para arquivos em outros formatos (como JSON). Para esses arquivos, o
DataFrameReader
trata os dados como um único campo do tipo VARIANT com o nome do campo$1
.
Se você precisar especificar informações adicionais sobre como os dados devem ser lidos (por exemplo, que os dados estão comprimidos ou que um arquivo CSV usa um ponto-e-vírgula em vez de uma vírgula para delimitar campos), chame o método DataFrameReader.option ou o método DataFrameReader.options.
Passe o nome e o valor da opção que deseja definir. Você pode definir os seguintes tipos de opções:
As opções de formato de arquivo descritas na documentação em CREATE FILE FORMAT.
As opções de cópia descritas na documentação COPY INTO TABLE.
Observe que a definição de opções de cópia pode resultar em uma estratégia de execução mais cara quando você obtém os dados para o DataFrame.
O exemplo a seguir configura o objeto
DataFrameReader
para consultar dados em um arquivo CSV que não é compactado e que usa um ponto-e-vírgula como delimitador de campo.dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
O método
option
retorna um objetoDataFrameReader
que é configurado com a opção especificada.Para definir várias opções, você pode encadear chamadas ao método
option
(como mostrado no exemplo acima) ou chamar o método DataFrameReader.options, passando umMap
dos nomes e valores das opções.Chame o método correspondente ao formato dos arquivos. Você pode chamar um dos seguintes métodos:
Ao chamar esses métodos, passe o local do estágio dos arquivos a serem lidos. Por exemplo:
val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
Para especificar vários arquivos que começam com o mesmo prefixo, especifique o prefixo após o nome do estágio. Por exemplo, para carregar arquivos com o prefixo
csv_
do estágio@mystage
:val df = dfReader.csv("@mystage/csv_")
Os métodos correspondentes ao formato de um arquivo retornam um objeto CopyableDataFrame para esse arquivo.
CopyableDataFrame
estendeDataFrame
e fornece métodos adicionais para trabalhar os dados em arquivos preparados.Chame um método de ação para:
Como no caso de DataFrames para tabelas, os dados não são obtidos no DataFrame até você chamar um método de ação.
Carregamento de dados de arquivos em um DataFrame¶
Depois que você configurar um DataFrame para arquivos em um estágio, você pode carregar dados dos arquivos para o DataFrame:
Use os métodos do objeto DataFrame para realizar quaisquer transformações necessárias no conjunto de dados (por exemplo, selecionar campos específicos, filtrar linhas, etc.).
Por exemplo, para extrair o elemento
color
de um arquivo JSON chamadodata.json
no estágio chamadomystage
:val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
Como explicado anteriormente, para arquivos em formatos diferentes de CSV (por exemplo JSON), o
DataFrameReader
trata os dados no arquivo como uma única coluna VARIANT com o nome$1
.Chame o método
DataFrame.collect
para carregar os dados. Por exemplo:val results = df.collect()
Como copiar dados de arquivos para uma tabela¶
Depois de configurar um DataFrame para arquivos em um estágio, você pode chamar o método CopyableDataFrame.copyInto para copiar os dados em uma tabela. Este método executa o comando COPY INTO <tabela>.
Nota
Você não precisa chamar o método collect
antes de chamar copyInto
. Os dados do arquivo não precisam estar no DataFrame antes de chamar copyInto
.
Por exemplo, o código a seguir carrega dados do arquivo CSV especificado por myFileStage
na tabela mytable
. Como os dados estão em um arquivo CSV, o código também deve descrever os campos no arquivo. O exemplo faz isso chamando o método DataFrameReader.schema e passando em um objeto StructType (csvFileSchema
) que contém uma sequência de objetos StructField que descrevem os campos.
val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
Como salvar um DataFrame para arquivos em um estágio¶
Nota
Esse recurso foi introduzido no Snowpark 1.5.0.
Se você precisar salvar um DataFrame para arquivos em um estágio, você pode chamar o método DataFrameWriter correspondente ao formato do arquivo (por exemplo, o método csv
para escrever em um arquivo CSV), passando o local do estágio onde os arquivos devem ser salvos. Esses métodos DataFrameWriter
executam o comando COPY INTO <local>.
Nota
Você não precisa chamar o método collect
antes de chamar esses métodos DataFrameWriter
. Os dados do arquivo não precisam estar no DataFrame antes de chamar esses métodos.
Para salvar o conteúdo de um DataFrame em arquivos em um estágio:
Chame o método DataFrame.write para obter um objeto DataFrameWriter. Por exemplo, para obter o objeto
DataFrameWriter
para um DataFrame que representa a tabela chamadasample_product_data
:dfWriter = session.table("sample_product_data").write
Se você quiser sobrescrever o conteúdo do arquivo (se o arquivo existir), chame o método DataFrameWriter.mode, passando
SaveMode.Overwrite
.Caso contrário, por padrão, o
DataFrameWriter
reporta um erro se o arquivo especificado no estágio já existir.O método
mode
retorna o mesmo objetoDataFrameWriter
configurado com o modo especificado.Por exemplo, para especificar que o
DataFrameWriter
deve substituir o arquivo no estágio:dfWriter = dfWriter.mode(SaveMode.Overwrite)
Se você precisar especificar informações adicionais sobre como os dados devem ser salvos (por exemplo, que os dados devem ser compactados ou que você deseja usar um ponto-e-vírgula para delimitar campos em um arquivo CSV), chame o método DataFrameWriter.option ou o método DataFrameWriter.options.
Passe o nome e o valor da opção que deseja definir. Você pode definir os seguintes tipos de opções:
As opções de formato de arquivo descritas na documentação em COPY INTO <location>.
As opções de cópia descritas na documentação em COPY INTO <location>.
Observe que você não pode usar o método
option
para definir as seguintes opções:A opção do tipo de formato TYPE.
A opção de cópia OVERWRITE. Para definir esta opção, chame o método
mode
em vez disso (conforme mencionado na etapa anterior).
O exemplo a seguir configura o objeto
DataFrameWriter
para salvar dados em um arquivo CSV sem compressão, usando um ponto-e-vírgula (no lugar de uma vírgula) como delimitador de campo.dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
O método
option
retorna um objetoDataFrameWriter
que é configurado com a opção especificada.Para definir várias opções, você pode encadear chamadas ao método
option
(como mostrado no exemplo acima) ou chamar o método DataFrameWriter.options, passando umMap
dos nomes e valores das opções.Para retornar detalhes sobre cada arquivo que foi salvo, defina a
DETAILED_OUTPUT
opção de cópia comoTRUE
.Por padrão,
DETAILED_OUTPUT
éFALSE
, o que significa que o método retorna uma única linha de saída contendo os campos"rows_unloaded"
,"input_bytes"
e"output_bytes"
.Quando você define
DETAILED_OUTPUT
comoTRUE
, o método retorna uma linha de saída para cada arquivo salvo. Cada linha contém os camposFILE_NAME
,FILE_SIZE
eROW_COUNT
.Chame o método correspondente ao formato do arquivo para salvar os dados no arquivo. Você pode chamar um dos seguintes métodos:
Ao chamar esses métodos, passe o local do estágio do arquivo onde os dados devem ser escritos (por exemplo,
@mystage
).Por padrão, o método salva os dados em nomes de arquivos com o prefixo
data_
(por exemplo:@mystage/data_0_0_0.csv
). Se você deseja que os arquivos sejam nomeados com um prefixo diferente, especifique o prefixo após o nome do estágio. Por exemplo:val writeFileResult = dfWriter.csv("@mystage/saved_data")
Esse exemplo salva o conteúdo do DataFrame em arquivos que começam com o prefixo
saved_data
(por exemplo,@mystage/saved_data_0_0_0.csv
).Verifique o objeto WriteFileResult retornado para obter informações sobre a quantidade de dados escritos no arquivo.
A partir do objeto
WriteFileResult
, você pode acessar a saída produzida pelo comando COPY INTO <location>:Para acessar as linhas de saída como uma array de objetos Row, use o membro de valor
rows
.Para determinar quais campos estão presentes nas linhas, use o membro de valor
schema
, que é um StructType que descreve os campos na linha.
Por exemplo, para imprimir os nomes dos campos e valores nas linhas de saída:
val writeFileResult = dfWriter.csv("@mystage/saved_data") for ((row, index) <- writeFileResult.rows.zipWithIndex) { (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach { (structField, element) => println(s"${structField.name}: $element") } }
O exemplo a seguir usa um DataFrame para salvar o conteúdo da tabela chamada car_sales
para arquivos JSON com o prefixo saved_data
no estágio @mystage
(por exemplo, @mystage/saved_data_0_0_0.json
). O código de exemplo:
Substitui o arquivo se ele já existir no estágio.
Retorna resultados detalhados sobre a operação de salvar.
Salva os dados sem compressão.
Finalmente, o código de exemplo imprime cada campo e valor nas linhas de saída retornadas:
val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
println(s"Row: $index")
(writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
(structField, element) => println(s"${structField.name}: $element")
}
}
Como trabalhar com dados semiestruturados¶
Usando um DataFrame, você pode consultar e acessar dados semiestruturados (por exemplo: dados JSON). As próximas seções explicam como trabalhar com dados semiestruturados em um DataFrame.
Nota
Os exemplos nestas seções usam os dados de exemplo em Amostra de dados usados em exemplos.
Como percorrer dados semiestruturados¶
Para se referir a um campo ou elemento específico em dados semiestruturados, use os seguintes métodos do objeto Column:
Use Column.apply(«<field_name>») para retornar um objeto
Column
para um campo em um OBJECT (ou um VARIANT que contenha um OBJECT).Use Column.apply(<index>) para retornar um objeto
Column
para um elemento em uma ARRAY (ou um VARIANT que contenha uma ARRAY).
Nota
Se o nome do campo ou elementos no caminho forem irregulares e dificultarem o uso dos métodos Column.apply
, você pode usar as funções get, get_ignore_case ou get_path como alternativa.
Como mencionado em Uso do método apply para se referir a uma coluna, você pode omitir o nome do método apply
:
col("column_name")("field_name")
col("column_name")(index)
Por exemplo, o seguinte código seleciona o campo dealership
nos objetos na coluna src
dos dados de exemplo:
val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
O código imprime a seguinte saída:
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
Nota
Os valores no DataFrame estão entre aspas porque esses valores são retornados como cadeias de caracteres literais. Para converter esses valores em um tipo específico, consulte Conversão explícita de valores em dados semiestruturados.
Você também pode encadear chamadas a métodos para percorrer um caminho para um campo ou elemento específico.
Por exemplo, o código a seguir seleciona o campo name
no objeto salesperson
:
val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
O código imprime a seguinte saída:
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
Como outro exemplo, o código a seguir seleciona o primeiro elemento do campo vehicle
, que contém uma série de veículos. O exemplo também seleciona o campo price
do primeiro elemento.
val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
O código imprime a seguinte saída:
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
Como alternativa ao método apply
, você pode usar as funções get, get_ignore_case ou get_path se o nome do campo ou elementos no caminho forem irregulares e dificultarem o uso dos métodos Column.apply
.
Por exemplo, as duas linhas de código a seguir imprimem o valor de um campo especificado em um objeto:
df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
Da mesma forma, as seguintes linhas de código imprimem o valor de um campo em um caminho especificado em um objeto:
df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
Conversão explícita de valores em dados semiestruturados¶
Por padrão, os valores de campos e elementos são retornados como literais de cadeias de caracteres (incluindo as aspas duplas), como mostrado nos exemplos acima.
Para evitar resultados inesperados, chame o método de converter para converter o valor para um tipo específico. Por exemplo, o seguinte código imprime os valores sem e com conversão:
// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
O código imprime a seguinte saída:
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
Como achatar uma array de objetos em linhas¶
Se você precisar «achatar» dados semiestruturados em um DataFrame (por exemplo, produzindo uma linha para cada objeto em uma matriz), chame o método DataFrame.flatten. Este método é equivalente à função FLATTEN SQL. Se você passar um caminho para um objeto ou matriz, o método retorna um DataFrame que contém uma linha para cada campo ou elemento no objeto ou matriz.
Por exemplo, nos dados de exemplo, src:customer
é um conjunto de objetos que contém informações sobre um cliente. Cada objeto contém um campo name
e address
.
Se você passar esse caminho para a função flatten
:
val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
o método retornará um DataFrame:
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
A partir desse DataFrame, você poderá selecionar os campos name
e address
de cada objeto no campo VALUE
:
df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
O código a seguir complementa o exemplo anterior ao converter os valores para um tipo específico e mudar os nomes das colunas:
df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
Como executar instruções SQL¶
Para executar uma instrução SQL que você especificar, chame o método sql
na classe Session
e passe a instrução a ser executada. O método retorna um DataFrame.
Note que a instrução SQL não será executada até que você chame um método de ação.
// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)
// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()
val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
Se você quiser chamar métodos para transformar o DataFrame (por exemplo, filtro, seleção etc.), observe que estes métodos funcionam somente se a instrução SQL subjacente for uma instrução SELECT. Os métodos de transformação não são suportados para outros tipos de instruções SQL.
val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)
// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)