Como trabalhar com DataFrames no Snowpark Scala

No Snowpark, a principal forma de consulta e processamento de dados é por meio de um DataFrame. Este tópico explica como trabalhar com DataFrames.

Neste tópico:

Para recuperar e manipular dados, você usa a classe DataFrame. Um DataFrame representa um conjunto de dados relacional que é avaliado lentamente: ele só é executado quando uma ação específica é acionada. Em certo sentido, um DataFrame é como uma consulta que precisa ser avaliada para recuperar dados.

Para obter dados em um DataFrame:

  1. Construa um DataFrame especificando a fonte dos dados para o conjunto de dados.

    Por exemplo, você pode criar um DataFrame para guardar dados de uma tabela, um arquivo CSV externo ou a execução de uma instrução SQL.

  2. Especifique como o conjunto de dados no DataFrame deve ser transformado.

    Por exemplo, você pode especificar quais colunas devem ser selecionadas, como as linhas devem ser filtradas, como os resultados devem ser ordenados e agrupados, etc.

  3. Execute a instrução para obter os dados para o DataFrame.

    Para obter os dados no DataFrame, você deve invocar um método que execute uma ação (por exemplo, o método collect()).

As próximas seções explicam essas etapas com mais detalhes.

Como estabelecer os exemplos para esta seção

Alguns dos exemplos desta seção usam um DataFrame para consultar uma tabela chamada sample_product_data. Se você deseja executar esses exemplos, pode criar esta tabela e preenchê-la com alguns dados executando as seguintes instruções SQL:

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
Copy

Para verificar se a tabela foi criada, execute:

SELECT * FROM sample_product_data;
Copy

Como criar um DataFrame

Para criar um DataFrame, você pode usar métodos na classe Session. Cada um dos seguintes métodos cria um DataFrame de um tipo diferente de fonte de dados:

  • Para criar um DataFrame a partir de dados em uma tabela, exibição ou fluxo, chame o método table:

    // Create a DataFrame from the data in the "sample_product_data" table.
    val dfTable = session.table("sample_product_data")
    
    // To print out the first 10 rows, call:
    //   dfTable.show()
    
    Copy

    Nota

    O método session.table retorna um objeto Updatable. Updatable estende DataFrame e fornece métodos adicionais para trabalhar com dados na tabela (por exemplo, métodos para atualização e exclusão de dados). Consulte Atualização, eliminação e fusão de linhas em uma tabela.

  • Para criar um DataFrame a partir de uma sequência de valores, chame o método createDataFrame:

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
    Copy

    Nota

    As palavras reservadas pelo Snowflake não são válidas como nomes de coluna ao criar um DataFrame. Para obter uma lista de palavras reservadas, consulte Palavras-chave reservadas e limitadas.

  • Para criar um DataFrame contendo um intervalo de valores, chame o método range:

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
    Copy
  • Para criar um DataFrame para um arquivo em um estágio, chame read para obter um objeto DataFrameReader. No objeto DataFrameReader, chame o método correspondente ao formato dos dados no arquivo:

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
    Copy
  • Para criar um DataFrame para armazenar os resultados de uma consulta SQL, chame o método sql:

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    
    Copy

    Nota: embora você possa usar este método para executar instruções SELECT que recuperam dados de tabelas e arquivos preparados, você deve usar os métodos table e read em vez disso. Métodos como table e read pode fornecer melhor realce de sintaxe, realce de erros e conclusão de código inteligente em ferramentas de desenvolvimento.

Especificação de como o conjunto de dados deve ser transformado

Para especificar quais colunas devem ser selecionadas e como os resultados devem ser filtrados, ordenados, agrupados, etc., chame os métodos de DataFrame que transformam o conjunto de dados. Para identificar colunas nestes métodos, use a função col ou uma expressão que seja avaliada como coluna. (Consulte Como especificar colunas e expressões.)

Por exemplo:

  • Para especificar quais linhas devem ser retornadas, chame o método filter:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame for the rows with the ID 1
    // in the "sample_product_data" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val df = session.table("sample_product_data").filter(col("id") === 1)
    df.show()
    
    Copy
  • Para especificar as colunas que devem ser selecionadas, chame o método select:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns in te "sample_product_data" table.
    val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
    df.show()
    
    Copy

Cada método retorna um novo objeto DataFrame que foi transformado. (O método não afeta o objeto original DataFrame.) Isso significa que se você deseja aplicar várias transformações, poderá chamadas de método em cadeia, chamando cada método de transformação subsequente no novo objeto DataFrame retornado pela chamada de método anterior.

Observe que esses métodos de transformação não recuperam dados do banco de dados Snowflake. (Os métodos de ação descritos em Como executar uma ação para avaliar um DataFrame fazem a recuperação de dados.) Os métodos de transformação simplesmente especificam como a instrução SQL deve ser construída.

Como especificar colunas e expressões

Ao chamar esses métodos de transformação, pode ser necessário especificar colunas ou expressões que usam colunas. Por exemplo, ao chamar o método select, você precisa especificar as colunas a serem selecionadas.

Para se referir a uma coluna, crie um objeto Coluna chamando a função col no objeto com.snowflake.snowpark.functions.

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
Copy

Nota

Para criar um objeto Column para um literal, consulte Como usar literais como objetos de coluna.

Ao especificar um filtro, projeção, condição de junção etc., você pode usar objetos Column em uma expressão. Por exemplo:

  • Você pode usar objetos Column com o método filter para especificar uma condição de filtro:

    // Specify the equivalent of "WHERE id = 20"
    // in an SQL SELECT statement.
    df.filter(col("id") === 20)
    
    Copy
    // Specify the equivalent of "WHERE a + b < 10"
    // in an SQL SELECT statement.
    df.filter((col("a") + col("b")) < 10)
    
    Copy
  • Você pode usar objetos Column com o método select para definir um alias:

    // Specify the equivalent of "SELECT b * 10 AS c"
    // in an SQL SELECT statement.
    df.select((col("b") * 10) as "c")
    
    Copy
  • Você pode usar objetos Column com o método join para definir uma condição de junção:

    // Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y"
    // in an SQL SELECT statement.
    dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
    
    Copy

Como se referir a colunas em diferentes DataFrames

Ao se referir a colunas em dois objetos DataFrame diferentes que têm o mesmo nome (por exemplo, ao unir os DataFrames naquela coluna), você pode usar o método DataFrame.col em um objeto DataFrame para se referir a uma coluna naquele objeto (por exemplo, df1.col("name") e df2.col("name")).

O exemplo a seguir demonstra como usar o DataFrame.col método para se referir a uma coluna em um determinado DataFrame. O exemplo une dois objetos DataFrame que têm uma coluna chamada key. O exemplo usa o métodos Column.as para alterar os nomes das colunas no DataFrame recém-criado.

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
Copy

Uso do método apply para se referir a uma coluna

Como alternativa ao método DataFrame.col, você pode usar o método DataFrame.apply para se referir a uma coluna em um determinado DataFrame. Como o método DataFrame.col, o método DataFrame.apply aceita um nome de coluna como entrada e retorna um objeto Column.

Observe que quando um objeto tem um método apply no Scala, você pode chamar o método apply chamando o objeto como se fosse uma função. Por exemplo, para chamar df.apply("column_name"), você pode simplesmente escrever df("column_name"). As seguintes chamadas são equivalentes:

  • df.col("<nome_da_coluna>")

  • df.apply("<nome_da_coluna>")

  • df("<nome_da_coluna>")

O exemplo seguinte é o mesmo que o exemplo anterior, mas usa o método DataFrame.apply para se referir às colunas em uma operação de junção:

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
Copy

Usando abreviações para um objeto de coluna

Como alternativa ao uso da função col, você pode se referir a uma coluna de uma destas maneiras:

  • Usando um cifrão em frente ao nome da coluna cotada ($"column_name").

  • Usando um apóstrofo (aspas simples) em frente ao nome da coluna não citada ('column_name).

Para fazer isso, importe os nomes do objeto implicits depois de criar um objeto Session:

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
Copy

Como utilizar aspas duplas em torno de identificadores de objetos (nomes de tabela, nomes de coluna, etc.)

Os nomes de bancos de dados, esquemas, tabelas e estágios especificados por você devem estar de acordo com os requisitos de identificadores do Snowflake. Quando você especifica um nome, o Snowflake considera que o nome está em letras maiúsculas. Por exemplo, as seguintes chamadas são equivalentes:

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
Copy

Se o nome não estiver em conformidade com os requisitos do identificador, você deve usar aspas duplas (") em volta do nome. Use uma barra invertida (\) para escapar do caractere de aspas duplas em uma cadeia de caracteres literal do Scala. Por exemplo, o nome da tabela a seguir não começa com uma letra ou sublinhado, então você deve usar aspas duplas ao redor do nome:

val df = session.table("\"10tablename\"")
Copy

Observe que ao especificar o nome de uma coluna, você não precisa usar aspas duplas no nome. A biblioteca Snowpark coloca automaticamente o nome da coluna entre aspas duplas para você se o nome não estiver em conformidade com os requisitos do identificador:.

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
Copy

Se você já tiver acrescentado aspas duplas em torno de um nome de coluna, a biblioteca não inserirá aspas duplas adicionais em torno do nome.

Em alguns casos, o nome da coluna pode conter caracteres de aspas duplas:

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...
Copy

Como explicado em Requisitos para identificadores, para cada caractere de aspas duplas dentro de um identificador de aspas duplas, deve-se usar dois caracteres de aspas duplas (por exemplo, "name_with_""air""_quotes" e """column_name_quoted"""):

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
Copy

Tenha em mente que quando um identificador é incluído entre aspas duplas (se você adicionou explicitamente as aspas ou se a biblioteca adicionou as aspas para você), o Snowflake trata o identificador diferenciando maiúsculas de minúsculas:

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
Copy

Como usar literais como objetos de coluna

Para usar um literal em um método que passa em um objeto Column, crie um objeto Column para o literal passando o literal para a função lit no objeto com.snowflake.snowpark.functions. Por exemplo:

// Import for the lit and col functions.
import com.snowflake.snowpark.functions._

// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
Copy

Se o literal é um valor de ponto flutuante ou duplo no Scala (por exemplo, 0.05 é tratado como Double por padrão), a biblioteca do Snowpark gera SQL que implicitamente converte o valor para o tipo de dados correspondente do Snowpark (por exemplo, 0.05::DOUBLE). Isso pode produzir um valor aproximado que difere do número exato especificado.

Por exemplo, o código a seguir não exibe linhas correspondentes, mesmo que o filtro (que corresponde a valores maiores ou iguais a 0.05) deva corresponder às linhas do DataFrame:

// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")

// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
Copy

O problema é que lit(0.06) e lit(0.01) produzem valores aproximados para 0.06 e 0.01, não os valores exatos.

Para evitar esse problema, você pode usar uma das seguintes abordagens:

  • Opção 1: converter o literal para o tipo Snowpark que você deseja usar. Por exemplo, para usar um NUMBER com uma precisão de 5 e uma escala de 2:

    df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
    
    Copy
  • Opção 2: converter o valor para o tipo que você deseja usar antes de passar o valor para a função lit. Por exemplo, se você quiser usar o BigDecimal tipo:

    df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
    
    Copy

Como converter um objeto de coluna para um tipo específico

Para converter um objeto Column em um tipo específico, chame o método Column.cast e passe em um objeto de tipo do pacote com.snowflake.snowpark.types. Por exemplo, para converter um literal como um NUMBER com uma precisão de 5 e uma escala de 2:

// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))
Copy

Como encadear chamadas a métodos

Como cada método que transforma um objeto DataFrame returna um novo objeto DataFrame que tem a transformação aplicada, você pode encadear chamadas a métodos para produzir um novo DataFrame que é transformado de maneiras adicionais.

O exemplo a seguir retorna um DataFrame que está configurado para:

  • Consultar a tabela sample_product_data.

  • Retornar a linha com id = 1.

  • Selecionar as colunas name e serial_number.

val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

Neste exemplo:

  • session.table("sample_product_data") retorna um DataFrame para a tabela sample_product_data.

    Embora o DataFrame ainda não contenha os dados da tabela, o objeto contém as definições das colunas da tabela.

  • filter(col("id") === 1) retorna um DataFrame para a tabela sample_product_data que está preparada para retornar a linha com id = 1.

    Observe novamente que o DataFrame ainda não contém a linha correspondente da tabela. A linha correspondente não é recuperada até que você chame um método de ação.

  • select(col("name"), col("serial_number")) retorna um DataFrame que contém as colunas name e serial_number para a linha da tabela sample_product_data que tem id = 1.

Ao encadear chamadas de método, lembre-se de que a ordem das chamadas é importante. Cada chamada de método retorna um DataFrame que foi transformado. Certifique-se de que as chamadas subsequentes funcionem com a DataFrame transformado.

Por exemplo, no código abaixo, o método select retorna um DataFrame que contém apenas duas colunas: name e serial_number. A chamada ao método filter neste DataFrame falha porque usa a coluna id, que não está no DataFrame transformado.

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
Copy

Em contraste, o seguinte código é executado com sucesso porque o método filter() é chamado em um DataFrame que contém todas as colunas da tabela sample_product_data (incluindo a coluna id):

// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

Tenha em mente que talvez seja necessário fazer as chamadas ao método select e filter em uma ordem diferente daquela em que você usaria as palavras-chave equivalentes (SELECT e WHERE) em uma instrução SQL.

Como limitar o número de linhas em um DataFrame

Para limitar o número de linhas em um DataFrame, você pode usar o método de transformação DataFrame.limit.

A API do Snowpark também fornece métodos de ação para recuperar e imprimir um número limitado de linhas:

  • o método de ação DataFrame.first (para executar a consulta e retornar as primeiras n linhas)

  • o método de ação DataFrame.show (para executar a consulta e imprimir as primeiras n linhas)

Esses métodos efetivamente adicionam uma cláusula LIMIT à instrução SQL que é executada.

Como explicado nas notas de uso para LIMIT, os resultados não são determinísticos, a menos que você especifique uma ordem de classificação (ORDER BY) em conjunto com LIMIT.

Para manter a cláusula ORDER BY com a cláusula LIMIT (por exemplo, para que ORDER BY não esteja em uma subconsulta separada), você deve chamar o método que limita os resultados do DataFrame retornado pelo método sort.

Por exemplo, se você estiver encadeando chamadas a métodos:

// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);

// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)

// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
Copy

Como obter definições das colunas

Para obter a definição das colunas no conjunto de dados para o DataFrame, chame o método schema. Esse método retorna um objeto StructType que contém uma Array de objetos StructField. Cada objeto StructField contém a definição de uma coluna.

// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
Copy

No objeto StructType retornado, os nomes das colunas são sempre normalizados. Os identificadores sem aspas são retornados em letras maiúsculas e os identificadores entre aspas são retornados no formato de letra conforme foram definidos.

O exemplo a seguir cria um DataFrame contendo as colunas chamadas ID e 3rd. Para o nome de coluna 3rd, a biblioteca do Snowpark coloca o nome automaticamente entre aspas duplas ("3rd") porque o nome não cumpre os requisitos para um identificador.

O exemplo chama o método schema e, em seguida, chama o método names no objeto retornado StructType para obter um ArraySeq de nomes de coluna. Os nomes são normalizados no StructType devolvido pelo método schema.

// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
//   ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
Copy

Junção de DataFrames

Para juntar objetos DataFrame, chame o método DataFrame.join.

As seções a seguir explicam como usar DataFrames para realizar uma junção:

Configuração dos dados de exemplo para as junções

Os exemplos nas próximas seções utilizam dados de exemplo que você pode configurar executando as seguintes instruções SQL:

create or replace table sample_a (
  id_a integer,
  name_a varchar,
  value integer
);
insert into sample_a (id_a, name_a, value) values
  (10, 'A1', 5),
  (40, 'A2', 10),
  (80, 'A3', 15),
  (90, 'A4', 20)
;
create or replace table sample_b (
  id_b integer,
  name_b varchar,
  id_a integer,
  value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
  (4000, 'B1', 40, 10),
  (4001, 'B2', 10, 5),
  (9000, 'B3', 80, 15),
  (9099, 'B4', null, 200)
;
create or replace table sample_c (
  id_c integer,
  name_c varchar,
  id_a integer,
  id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
  (1012, 'C1', 10, null),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;
Copy

Como especificar as colunas para a junção

Com o método DataFrame.join, você pode especificar as colunas a serem usadas de uma das seguintes maneiras:

  • Especifique uma expressão de coluna que descreva a condição de junção.

  • Especifique uma ou mais colunas que devem ser usadas como colunas comuns na junção.

O exemplo a seguir realiza uma junção interna na coluna chamada id_a:

// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Copy

Observe que o exemplo usa o método DataFrame.col para especificar a condição a ser usada para a junção. Consulte Como especificar colunas e expressões para saber mais sobre este método.

Isso imprime a seguinte saída:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |
|40      |A2        |10       |4000    |B1        |40      |10       |
|80      |A3        |15       |9000    |B3        |80      |15       |
----------------------------------------------------------------------
Copy
Nomes de colunas idênticos duplicados no resultado da junção

No DataFrame resultante de uma junção, a biblioteca Snowpark utiliza os nomes das colunas encontradas nas tabelas que foram unidas, mesmo quando os nomes das colunas são idênticos entre tabelas. Quando isto acontece, estes nomes de coluna são duplicados no DataFrame resultante da junção. Para acessar uma coluna duplicada pelo nome, chame o método col no DataFrame que representa a tabela original da coluna. (Para obter mais informações sobre como especificar colunas, consulte Como se referir a colunas em diferentes DataFrames.)

O código no exemplo a seguir une dois DataFrames, depois chama o método select no DataFrame que foi unido. Ele especifica as colunas a serem selecionadas chamando o método col da variável que representa os respectivos objetos DataFrame: dfRhs e dfLhs. Ele usa o método as para dar às colunas novos nomes no DataFrame que o método select cria.

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
Copy

Isso imprime a seguinte saída:

------------------------------
|"LEFTVALUE"  |"RIGHTVALUE"  |
------------------------------
|5            |5             |
|10           |10            |
|15           |15            |
------------------------------
Copy
Eliminação de duplicação de colunas antes de salvar ou armazenar

Observe que quando um DataFrame resultante de uma junção inclui nomes de colunas duplicadas, você deve eliminar a duplicação ou renomear colunas para remover a duplicação no DataFrame antes de salvar o resultado em uma tabela ou armazenar o DataFrame em cache. Para nomes de colunas duplicadas em um DataFrame que você salva em uma tabela ou cache, a biblioteca do Snowpark substituirá os nomes de colunas duplicadas por alias para que não sejam mais duplicadas.

O exemplo seguinte ilustra como a saída de um DataFrame em cache poderia aparecer se os nomes das colunas ID_A e VALUE fossem duplicados em uma junção de duas tabelas, e depois não fossem deduplicados ou renomeados antes de armazenar o resultado em cache.

--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A"  |"NAME_A"  |"l_ZSz7_VALUE"  |"ID_B"  |"NAME_B"  |"r_heec_ID_A"  |"r_heec_VALUE"  |
--------------------------------------------------------------------------------------------------
|10             |A1        |5               |4001    |B2        |10             |5               |
|40             |A2        |10              |4000    |B1        |40             |10              |
|80             |A3        |15              |9000    |B3        |80             |15              |
--------------------------------------------------------------------------------------------------
Copy

Como realizar uma junção natural

Para realizar uma junção natural (onde DataFrames são unidas em colunas que têm o mesmo nome), chame o método DataFrame.naturalJoin.

O exemplo a seguir junta os DataFrames para as tabelas sample_a e sample_b nas colunas comuns (a coluna id_a):

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Copy

Isso imprime a seguinte saída:

---------------------------------------------------
|"ID_A"  |"VALUE"  |"NAME_A"  |"ID_B"  |"NAME_B"  |
---------------------------------------------------
|10      |5        |A1        |4001    |B2        |
|40      |10       |A2        |4000    |B1        |
|80      |15       |A3        |9000    |B3        |
---------------------------------------------------
Copy

Como especificar o tipo de junção

Por padrão, o método DataFrame.join cria uma junção interna. Para especificar um tipo diferente de junção, defina o argumento joinType como um dos seguintes valores:

Tipo de junção

joinType

Junção interna

inner (padrão)

Junção externa esquerda

left

Junção externa direita

right

Junção externa completa

full

Junção cruzada

cross

Por exemplo:

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
Copy

Isso imprime a seguinte saída:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|40      |A2        |10       |4000    |B1        |40      |10       |
|10      |A1        |5        |4001    |B2        |10      |5        |
|80      |A3        |15       |9000    |B3        |80      |15       |
|90      |A4        |20       |NULL    |NULL      |NULL    |NULL     |
----------------------------------------------------------------------
Copy

Junção de tabelas múltiplas

Para juntar tabelas múltiplas:

  1. Crie um DataFrame para cada tabela.

  2. Chame o método DataFrame.join no primeiro DataFrame passando o segundo DataFrame.

  3. Usando o DataFrame retornado pelo método join, chame o método join, passando o terceiro DataFrame.

Você pode encadear as chamadas join como mostrado abaixo:

val dfFirst = session.table("sample_a")
val dfSecond  = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
Copy

Isso imprime a seguinte saída:

------------------------------------------------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |"ID_C"  |"NAME_C"  |"ID_A"  |"ID_B"  |
------------------------------------------------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |1012    |C1        |10      |NULL    |
|40      |A2        |10       |4000    |B1        |40      |10       |1040    |C2        |40      |4000    |
|40      |A2        |10       |4000    |B1        |40      |10       |1041    |C3        |40      |4001    |
------------------------------------------------------------------------------------------------------------
Copy

Como realizar uma autojunção

Se você precisar unir uma tabela com ela mesma em diferentes colunas, não poderá realizar a autojunção com um único DataFrame. Os exemplos a seguir usam um único DataFrame para executar uma junção automática, que falha pois as expressões de coluna para "id" estão presentes nos lados esquerdo e direito da junção:

// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
Copy
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
Copy

Ambos os exemplos falham, com a seguinte exceção:

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.
Copy

Em vez disso, use o método DataFrame.clone para criar um clone do objeto DataFrame e use os dois objetos DataFrame para realizar a junção:

// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
Copy

Se você quiser realizar uma autojunção na mesma coluna, chame o método join que passa um Seq de expressões de coluna para a cláusula USING:

// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
Copy

Como executar uma ação para avaliar um DataFrame

Como mencionado anteriormente, o DataFrame é avaliado lentamente, o que significa que a instrução SQL não é enviada ao servidor para execução até que você execute uma ação. Uma ação faz com que o DataFrame seja avaliado e envia a instrução SQL correspondente para o servidor para execução.

As seções a seguir explicam como executar uma ação de forma síncrona e assíncrona em um DataFrame:

Como executar uma ação de forma síncrona

Para executar uma ação de forma síncrona, chame um dos seguintes métodos de ação:

Método para executar uma ação de forma síncrona

Descrição

DataFrame.collect

Avalia o DataFrame e retorna o conjunto de dados resultante como uma Array de objetos Row. Consulte Como retornar todas as linhas.

DataFrame.toLocalIterator

Avalia o DataFrame e devolve um Iterator de objetos Row. Se o conjunto de resultados for grande, use esse método para evitar carregar todos os resultados na memória de uma só vez. Consulte Como retornar um iterador para as linhas.

DataFrame.count

Avalia o DataFrame e retorna o número de linhas.

DataFrame.show

Avalia o DataFrame e imprime as linhas no console. Observe que este método limita o número de linhas a 10 (por padrão). Consulte Como imprimir as linhas em um DataFrame.

DataFrame.cacheResult

Executa a consulta, cria uma tabela temporária e coloca os resultados na tabela. O método retorna um objeto HasCachedResult que você pode usar para acessar os dados nesta tabela temporária. Consulte Como criar um cache do DataFrame.

DataFrame.write.saveAsTable

Salva os dados no DataFrame para a tabela especificada. Consulte Como salvar dados em uma tabela.

DataFrame.write.(csv |json| parquet)

Salva um DataFrame para um arquivo especificado em um estágio. Consulte Como salvar um DataFrame para arquivos em um estágio.

DataFrame.read.fileformat.copyInto('tableName')

Copia os dados no DataFrame para a tabela especificada. Consulte Como copiar dados de arquivos para uma tabela.

Session.table('tableName').delete

Exclui linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Session.table('tableName').update

Atualiza linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Session.table('tableName').merge.methods.collect

Mescla as linhas na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Para executar a consulta e retornar o número de resultados, chame o método count:

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
Copy

Você também pode chamar métodos de ação para:

Nota: Se você estiver chamando o método schema para obter as definições das colunas no DataFrame, você não precisa chamar um método de ação.

Como executar uma ação de forma assíncrona

Nota

Esse recurso foi introduzido no Snowpark 0.11.0.

Para executar uma ação de forma assíncrona, chame o método async para retornar um objeto “ator assíncrono” (por exemplo, DataFrameAsyncActor) e chamar um método de ação assíncrona nesse objeto.

Esses métodos de ação de um objeto de ator assíncrono retornam um objeto TypedAsyncJob, que você pode usar para verificar o status da ação assíncrona e obter os resultados da ação.

As próximas seções explicam como executar as ações de forma assíncrona e verificar os resultados.

Explicação do fluxo básico de ações assíncronas

Você pode usar os seguintes métodos para executar uma ação de forma assíncrona:

Método para realizar uma ação de forma assíncrona

Descrição

DataFrame.async.collect

Avalia o DataFrame de forma assíncrona para obter o conjunto de dados resultante como um Array de objetos Row. Consulte Como retornar todas as linhas.

DataFrame.async.toLocalIterator

Avalia de forma assíncrona o DataFrame para recuperar um Iterator de objetos Row. Se o conjunto de resultados for grande, use este método para evitar carregar todos os resultados na memória de uma só vez. Consulte Como retornar um iterador para as linhas.

DataFrame.async.count

Avalia o DataFrame de forma assíncrona para obter o número de linhas.

DataFrame.write.async.saveAsTable

Salva de forma assíncrona os dados no DataFrame para a tabela especificada. Consulte Como salvar dados em uma tabela.

DataFrame.write.async.(csv |json| parquet)

Salva um DataFrame para um arquivo especificado em um estágio. Consulte Como salvar um DataFrame para arquivos em um estágio.

DataFrame.read.fileformat.async.copyInto('tableName')

Copia de forma assíncrona os dados no DataFrame para a tabela especificada. Consulte Como copiar dados de arquivos para uma tabela.

Session.table('tableName').async.delete

Exclui linhas de forma assíncrona na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Session.table('tableName').async.update

Atualiza linhas de forma assíncrona na tabela especificada. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Session.table('tableName').merge.methods.async.collect

Mescla assincronamente as linhas na tabela especificada. Compatível com a versão 1.3.0 ou posterior. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

A partir do objeto TypedAsyncJob retornado, você pode fazer o seguinte:

  • Para determinar se a ação foi concluída, chame o método isDone.

  • Para obter a ID da consulta que corresponde à ação, chame o método getQueryId.

  • Para retornar os resultados da ação (por exemplo, o Array de objetos Row para o método collect ou a contagem de linhas para o método count), chame o método getResult.

    Note que getResult é uma chamada com bloqueio.

  • Para cancelar a ação, chame o método cancel.

Por exemplo, para executar uma consulta de forma assíncrona e obter os resultados como uma Array de objetos Row, chame DataFrame.async.collect:

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
Copy

Para executar a consulta de forma assíncrona e obter o número de resultados, chame DataFrame.async.count:

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
Copy

Como especificar o número máximo de segundos a esperar

Ao chamar o método getResult, você pode usar o argumento maxWaitTimeInSeconds para especificar o número máximo de segundos para esperar que a consulta seja concluída antes de tentar obter os resultados. Por exemplo:

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
Copy

Se você omitir esse argumento, o método aguardará o número máximo de segundos especificado pela propriedade de configuração snowpark_request_timeout_in_seconds. (Esta é uma propriedade que você pode definir ao criar o objeto Session.)

Como acessar uma consulta assíncrona por ID

Se você tiver a ID de uma consulta assíncrona que você enviou anteriormente, você pode chamar o método Session.createAsyncJob para criar um objeto AsyncJob que você pode usar para verificar o status da consulta, obter os resultados da consulta ou cancelar a consulta.

Note que ao contrário de TypedAsyncJob, AsyncJob não fornece um método getResult para obter os resultados. Se você precisar recuperar os resultados, chame o getRows ou getIterator em vez disso.

Por exemplo:

val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
Copy

Como obter linhas em um DataFrame

Depois de você especificar como o DataFrame deve ser transformado, você pode chamar um método de ação para executar uma consulta e retornar os resultados. Você pode retornar todas as linhas em um Array, ou você pode retornar um Iterator que permite iterar os resultados, linha por linha. No último caso, se a quantidade de dados for grande, as linhas são carregadas na memória por parte para evitar o carregamento de uma grande quantidade de dados na memória.

Como retornar todas as linhas

Para retornar todas as linhas de uma vez, chame o método DataFrame.collect. Este método retorna uma matriz de objetos Row. Para recuperar os valores da linha, chame o método getType (por exemplo getString, getInt etc).

Por exemplo:

import com.snowflake.snowpark.functions_

val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

Como retornar um iterador para as linhas

Se você quiser usar um Iterator para iterar sobre objetos Row nos resultados, chame DataFrame.toLocalIterator. Se a quantidade de dados nos resultados for grande, o método carrega as linhas por parte para evitar carregar todas as linhas na memória de uma vez.

Por exemplo:

import com.snowflake.snowpark.functions_

while (rowIterator.hasNext) {
  val row = rowIterator.next()
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

Como retornar as primeiras n linhas

Para retornar as primeiras n linhas, chame o método DataFrame.first, passando o número de linhas a retornar.

Conforme explicado em Como limitar o número de linhas em um DataFrame, os resultados são não determinísticos. Se você deseja que os resultados sejam determinísticos, chame este método em um DataFrame (df.sort().first()).

Por exemplo:

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
Copy

Como imprimir as linhas em um DataFrame

Para imprimir as primeiras 10 linhas no DataFrame para o console, chame o método DataFrame.show. Para imprimir um número diferente de linhas, passe o número de linhas a imprimir.

Conforme explicado em Como limitar o número de linhas em um DataFrame, os resultados são não determinísticos. Se você deseja que os resultados sejam determinísticos, chame este método em um DataFrame (df.sort().show()).

Por exemplo:

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
df.sort(col("name")).show()
Copy

Atualização, eliminação e fusão de linhas em uma tabela

Nota

Esse recurso foi introduzido no Snowpark 0.7.0.

Quando você chama Session.table para criar um objeto DataFrame para uma tabela, o método retorna um objeto Updatable, que estende o DataFrame com métodos adicionais para atualização e exclusão de dados na tabela. (Consulte Updatable.)

Se você precisar atualizar ou excluir linhas em uma tabela, você pode usar os seguintes métodos da classe Updatable:

Atualização de linhas em uma tabela

Para o método update, passe um Map que associe as colunas a serem atualizadas e os valores correspondentes a serem atribuídos a essas colunas. update retorna um objeto UpdateResult, que contém o número de linhas que foram atualizadas. (Consulte UpdateResult.)

Nota

update é um método de ação, o que significa que chamar o método envia instruções SQL ao servidor para execução.

Por exemplo, para substituir os valores na coluna chamada count pelo valor 1:

val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
Copy

O exemplo acima usa o nome da coluna para identificar a coluna. Você também pode usar uma expressão de coluna:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
Copy

Se a atualização deve ser feita apenas quando uma condição for atendida, você pode especificar essa condição como um argumento. Por exemplo, para substituir os valores na coluna chamada count para linhas em que a coluna category_id tem o valor 20:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
Copy

Se você precisar basear a condição em uma junção com um objeto DataFrame, você pode passar isso DataFrame como um argumento e usar isso DataFrame na condição. Por exemplo, para substituir os valores na coluna chamada count para linhas em que a coluna category_id corresponde a category_id no DataFrame dfParts:

val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
Copy

Como excluir linhas de uma tabela

Para o método delete, você pode especificar uma condição que identifica as linhas a serem excluídas, e pode basear essa condição em uma junção com outro DataFrame. delete retorna um objeto DeleteResult, contendo o número de linhas que foram excluídas. (Consulte DeleteResult.)

Nota

delete é um método de ação, o que significa que chamar o método envia instruções SQL ao servidor para execução.

Por exemplo, para apagar as linhas que têm o valor 1 na coluna category_id:

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

Se a condição se referir a colunas em um DataFrame diferente, passe esse DataFrame como o segundo argumento. Por exemplo, para excluir as linhas nas quais a coluna category_id corresponde ao category_id no DataFrame dfParts, passe dfParts como o segundo argumento:

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

Fusão de linhas em uma tabela

Para inserir, atualizar e excluir linhas em uma tabela com base nos valores de uma segunda tabela ou subconsulta (o equivalente ao comando MERGE em SQL), faça o seguinte:

  1. No objeto Updatable para a tabela onde você quer fundir os dados, chame o método merge, passando o objeto DataFrame para a outra tabela e a expressão da coluna para a condição de junção.

    Isso retorna um objeto MergeBuilder que você pode usar para especificar as ações a serem tomadas (por exemplo, inserir, atualizar ou excluir) nas linhas que correspondem e nas que não correspondem aos critérios. (Consulte MergeBuilder.)

  2. Usando o objeto MergeBuilder:

    • Para especificar a atualização ou exclusão que deve ser feita nas linhas correspondentes, chame o método whenMatched.

      Se você precisar especificar uma condição adicional quando as linhas devem ser atualizadas ou excluídas, você pode passar uma expressão de coluna para essa condição.

      Esse método retorna um objeto MatchedClauseBuilder que você pode usar para especificar a ação a ser executada. (Consulte MatchedClauseBuilder.)

      Chame o método update ou delete no objeto MatchedClauseBuilder para especificar a ação de atualização ou exclusão que deve ser executada nas linhas correspondentes. Esses métodos retornam um objeto MergeBuilder que você pode usar para especificar cláusulas adicionais.

    • Para especificar a inserção que deve ser feita quando as linhas não correspondem, chame o método whenNotMatched.

      Se você precisar especificar uma condição adicional quando as linhas devem ser inseridas, você pode passar uma expressão de coluna para essa condição.

      Esse método retorna um objeto NotMatchedClauseBuilder que você pode usar para especificar a ação a ser executada. (Consulte NotMatchedClauseBuilder.)

      Chame o método insert no objeto NotMatchedClauseBuilder para especificar a ação de inserção que deve ser executada quando as linhas não correspondem. Esses métodos devolvem um objeto MergeBuilder que você pode usar para especificar cláusulas adicionais.

  3. Quando terminar de especificar as inserções, atualizações e exclusões que devem ser realizadas, chame o método collect do objeto MergeBuilder para realizar as inserções, atualizações e exclusões especificadas na tabela.

    collect retorna um objeto MergeResult contendo o número de linhas que foram inseridas, atualizadas e excluídas. (Consulte MergeResult.)

O exemplo a seguir insere uma linha com as colunas id e value da tabela source na tabela target se a tabela target não contiver uma linha com uma ID correspondente:

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()
Copy

O exemplo a seguir atualiza uma linha na tabela target com o valor da coluna value da linha na tabela source que tem a mesma ID:

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()
Copy

Como salvar dados em uma tabela

Você pode salvar o conteúdo de um DataFrame para uma tabela nova ou existente. Para fazer isso, você deve ter os seguintes privilégios:

  • Privilégios CREATE TABLE sobre o esquema, se a tabela não existir.

  • Privilégios INSERT sobre a tabela.

Para salvar o conteúdo de um DataFrame em uma tabela:

  1. Chame o método DataFrame.write para obter um objeto DataFrameWriter.

  2. Chame o método DataFrameWriter.mode, passando um objeto SaveMode que especifica suas preferências para escrever na tabela:

    • Para inserir linhas, passe SaveMode.Append.

    • Para substituir a tabela existente, passe SaveMode.Overwrite.

    Esse método retorna o mesmo objeto DataFrameWriter configurado com o modo especificado.

  3. Se você estiver inserindo linhas em uma tabela existente (SaveMode.Append) e os nomes das colunas no DataFrame corresponderem aos nomes das colunas na tabela, chame o método DataFrameWriter.option passando "columnOrder" e "name" como argumentos.

    Nota

    Esse método foi introduzido no Snowpark 1.4.0.

    Por padrão, a opção columnOrder é definida como "index", o que significa que o DataFrameWriter insere os valores na ordem em que as colunas aparecem. Por exemplo, DataFrameWriter insere o valor da primeira coluna do DataFrame na primeira coluna da tabela, a segunda coluna do DataFrame na segunda coluna da tabela etc.

    Esse método retorna o mesmo objeto DataFrameWriter configurado com a opção especificada.

  4. Chame o DataFrameWriter.saveAsTable para salvar o conteúdo do DataFrame em uma tabela especificada.

    Você não precisa chamar um método separado (por exemplo, collect) para executar a instrução SQL que salva os dados na tabela. saveAsTable é um método de ação que executa a instrução SQL.

O exemplo a seguir substitui uma tabela existente (identificada pela variável tableName) com o conteúdo do DataFrame df:

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
Copy

O exemplo a seguir insere linhas do DataFrame df em uma tabela existente (identificada pela variável tableName). Nesse exemplo, a tabela e o DataFrame ambos contêm as colunas c1 e c2.

O exemplo demonstra a diferença entre configurar a opção columnOrder para "name" (que insere valores nas colunas da tabela com os mesmos nomes das colunas do DataFrame) e usar a opção padrão columnOrder (que insere valores nas colunas da tabela com base na ordem das colunas no DataFrame).

val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
Copy

Como criar uma exibição a partir de um DataFrame

Para criar uma exibição a partir de um DataFrame, chame o método DataFrame.createOrReplaceView:

df.createOrReplaceView("db.schema.viewName")
Copy

Note que chamando createOrReplaceView cria imediatamente a nova exibição. Mais importante ainda, não faz com que o DataFrame seja avaliado. (O DataFrame em si não é avaliado até você executar uma ação.)

As exibições que você cria chamando createOrReplaceView são persistentes. Se você não precisar mais dessa exibição, você pode descartar a exibição manualmente.

Se você precisar criar uma exibição temporária apenas para a sessão, chame o método DataFrame.createOrReplaceTempView em vez disso:

df.createOrReplaceTempView("db.schema.viewName")
Copy

Como criar um cache do DataFrame

Em alguns casos, pode ser necessário executar uma consulta complexa e manter os resultados para uso em operações subsequentes (em vez de executar a mesma consulta novamente). Nesses casos, você pode armazenar em cache o conteúdo de um DataFrame chamando o método DataFrame.cacheResult.

Esse método:

  • Executa a consulta.

    Você não precisa chamar um método de ação separado para recuperar os resultados antes de chamar cacheResult. cacheResult é um método de ação que executa a consulta.

  • Salva os resultados em uma tabela temporária

    Como cacheResult cria uma tabela temporária, você deve ter o privilégio CREATE TABLE sobre o esquema que está em uso.

  • Retorna um objeto HasCachedResult, que dá acesso aos resultados da tabela temporária.

    Como HasCachedResult estende DataFrame, você pode realizar algumas das mesmas operações sobre estes dados em cache que pode realizar em um DataFrame.

Nota

Como cacheResult executa a consulta e salva os resultados em uma tabela, o método pode resultar em um aumento dos custos de computação e armazenamento.

Por exemplo:

import com.snowflake.snowpark.functions_

// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
Copy

Note que o DataFrame original não é afetado quando você chama esse método. Por exemplo, suponha que dfTable seja um DataFrame para a tabela sample_product_data:

val dfTempTable = dfTable.cacheResult()
Copy

Depois de chamar cacheResult, dfTable ainda aponta para a tabela sample_product_data, e você pode continuar a usar dfTable para consultar e atualizar essa tabela.

Para usar os dados em cache na tabela temporária, você usa dfTempTable (o objeto HasCachedResult retornado por cacheResult).

Como trabalhar com arquivos em um estágio

A biblioteca do Snowpark fornece classes e métodos que você pode usar para carregar dados no Snowflake e descarregar dados do Snowflake usando arquivos em estágios.

Nota

Para utilizar essas classes e métodos em um estágio, você deve ter os privilégios necessários para trabalhar com o estágio.

As próximas seções explicam como utilizar essas classes e métodos:

Carregamento e descarregamento de arquivos em um estágio

Para carregar e baixar arquivos em um estágio, use o objeto FileOperation:

Carregamento de arquivos para um estágio

Para carregar arquivos para um estágio:

  1. Verifique se você tem os privilégios de carregar arquivos para o estágio.

  2. Use Session.file para acessar o objeto FileOperation para a sessão.

  3. Chame o método FileOperation.put para carregar os arquivos em um estágio.

    Esse método executa um comando SQL PUT.

    • Para especificar quaisquer parâmetros opcionais para o comando PUT, crie um Map dos parâmetros e valores e passe o Map como o argumento options. Por exemplo:

      // Upload a file to a stage without compressing the file.
      val putOptions = Map("AUTO_COMPRESS" -> "FALSE")
      val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
      
      Copy
    • No argumento localFilePath, você pode usar curingas (* e ?) para identificar um conjunto de arquivos a ser carregado. Por exemplo:

      // Upload the CSV files in /tmp with names that start with "file".
      // You can use the wildcard characters "*" and "?" to match multiple files.
      val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
      
      Copy
  4. Verifique o Array de objetos PutResult retornados pelo método put para determinar se os arquivos foram carregados com sucesso. Por exemplo, para imprimir o nome do arquivo e o status da operação PUT para esse arquivo:

    // Print the filename and the status of the PUT operation.
    putResults.foreach(r => println(s"  ${r.sourceFileName}: ${r.status}"))
    
    Copy

Como baixar arquivos de um estágio

Para baixar arquivos de uma estágio:

  1. Verifique se você tem os privilégios de carregar arquivos para o estágio.

  2. Use Session.file para acessar o objeto FileOperation para a sessão.

  3. Chame o método FileOperation.get para baixar os arquivos de um estágio.

    Esse método executa um comando SQL GET.

    Para especificar quaisquer parâmetros opcionais para o comando GET, crie um Map dos parâmetros e valores e passe o Map como o argumento options. Por exemplo:

    // Download files with names that match a regular expression pattern.
    val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'")
    val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
    
    Copy
  4. Verifique o Array de objetos GetResult retornados pelo método get para determinar se os arquivos foram baixados com sucesso. Por exemplo, para imprimir o nome do arquivo e o status da operação GET para esse arquivo:

    // Print the filename and the status of the GET operation.
    getResults.foreach(r => println(s"  ${r.fileName}: ${r.status}"))
    
    Copy

Como usar fluxos de entrada para carregar e descarregar dados em um estágio

Nota

Esse recurso foi introduzido no Snowpark 1.4.0.

Para usar fluxos de entrada para carregar dados em um arquivo em um estágio e baixar dados de um arquivo em um estágio, use os métodos uploadStream e downloadStream do objeto FileOperation:

Como usar um fluxo de entrada para carregar dados em um arquivo em um estágio

Para carregar os dados de um objeto java.io.InputStream em um arquivo em um estágio:

  1. Verifique se você tem os privilégios de carregar arquivos para o estágio.

  2. Use Session.file para acessar o objeto FileOperation para a sessão.

  3. Chame o método FileOperation.uploadStream.

    Passe o caminho completo para o arquivo no estágio onde os dados devem ser escritos e o objeto InputStream. Além disso, use o argumento compress para especificar se os dados devem ou não ser compactados antes de serem carregados.

Por exemplo:

import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
Copy

Como usar um fluxo de entrada para baixar dados de um arquivo em um estágio

Para baixar dados de um arquivo em um estágio para um objeto java.io.io.InputStream:

  1. Verifique se você tem os privilégios de carregar arquivos para o estágio.

  2. Use Session.file para acessar o objeto FileOperation para a sessão.

  3. Chame o método FileOperation.downloadStream.

    Passe o caminho completo para o arquivo no estágio contendo os dados a serem baixados. Use o argumento decompress para especificar se os dados no arquivo estão ou não compactados.

Por exemplo:

import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
Copy

Como configurar um DataFrame para arquivos em um estágio

Esta seção explica como configurar um DataFrame para arquivos em um estágio do Snowflake. Depois de criar esse DataFrame, você pode usar o DataFrame para:

Para configurar um DataFrame para arquivos em um estágio do Snowflake, use a classe DataFrameReader:

  1. Verifique se você tem os seguintes privilégios:

  2. Chame o método read na classe Session para acessar um objeto DataFrameReader.

  3. Se os arquivos estiverem no formato CSV, descreva os campos no arquivo. Para fazer isso:

    1. crie um objeto StructType que consiste em uma sequência de objetos StructField descrevendo os campos no arquivo.

    2. Para cada objeto StructField, especifique o seguinte:

      • O nome do campo.

      • O tipo de dados do campo (especificado como um objeto no pacote com.snowflake.snowpark.types).

      • Se o campo é ou não anulável.

      Por exemplo:

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
      Copy
    3. Chame o método schema no objeto DataFrameReader, passando o objeto StructType.

      Por exemplo:

      var dfReader = session.read.schema(schemaForDataFile)
      
      Copy

      O método schema retorna um objeto DataFrameReader que é configurado para ler arquivos contendo os campos especificados.

      Observe que você não precisa fazer isso para arquivos em outros formatos (como JSON). Para esses arquivos, o DataFrameReader trata os dados como um único campo do tipo VARIANT com o nome do campo $1.

  4. Se você precisar especificar informações adicionais sobre como os dados devem ser lidos (por exemplo, que os dados estão comprimidos ou que um arquivo CSV usa um ponto-e-vírgula em vez de uma vírgula para delimitar campos), chame o método DataFrameReader.option ou o método DataFrameReader.options.

    Passe o nome e o valor da opção que deseja definir. Você pode definir os seguintes tipos de opções:

    O exemplo a seguir configura o objeto DataFrameReader para consultar dados em um arquivo CSV que não é compactado e que usa um ponto-e-vírgula como delimitador de campo.

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    O método option retorna um objeto DataFrameReader que é configurado com a opção especificada.

    Para definir várias opções, você pode encadear chamadas ao método option (como mostrado no exemplo acima) ou chamar o método DataFrameReader.options, passando um Map dos nomes e valores das opções.

  5. Chame o método correspondente ao formato dos arquivos. Você pode chamar um dos seguintes métodos:

    Ao chamar esses métodos, passe o local do estágio dos arquivos a serem lidos. Por exemplo:

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    
    Copy

    Para especificar vários arquivos que começam com o mesmo prefixo, especifique o prefixo após o nome do estágio. Por exemplo, para carregar arquivos com o prefixo csv_ do estágio @mystage:

    val df = dfReader.csv("@mystage/csv_")
    
    Copy

    Os métodos correspondentes ao formato de um arquivo retornam um objeto CopyableDataFrame para esse arquivo. CopyableDataFrame estende DataFrame e fornece métodos adicionais para trabalhar os dados em arquivos preparados.

  6. Chame um método de ação para:

    Como no caso de DataFrames para tabelas, os dados não são obtidos no DataFrame até você chamar um método de ação.

Carregamento de dados de arquivos em um DataFrame

Depois que você configurar um DataFrame para arquivos em um estágio, você pode carregar dados dos arquivos para o DataFrame:

  1. Use os métodos do objeto DataFrame para realizar quaisquer transformações necessárias no conjunto de dados (por exemplo, selecionar campos específicos, filtrar linhas, etc.).

    Por exemplo, para extrair o elemento color de um arquivo JSON chamado data.json no estágio chamado mystage:

    val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
    
    Copy

    Como explicado anteriormente, para arquivos em formatos diferentes de CSV (por exemplo JSON), o DataFrameReader trata os dados no arquivo como uma única coluna VARIANT com o nome $1.

  2. Chame o método DataFrame.collect para carregar os dados. Por exemplo:

    val results = df.collect()
    
    Copy

Como copiar dados de arquivos para uma tabela

Depois de configurar um DataFrame para arquivos em um estágio, você pode chamar o método CopyableDataFrame.copyInto para copiar os dados em uma tabela. Este método executa o comando COPY INTO <tabela>.

Nota

Você não precisa chamar o método collect antes de chamar copyInto. Os dados do arquivo não precisam estar no DataFrame antes de chamar copyInto.

Por exemplo, o código a seguir carrega dados do arquivo CSV especificado por myFileStage na tabela mytable. Como os dados estão em um arquivo CSV, o código também deve descrever os campos no arquivo. O exemplo faz isso chamando o método DataFrameReader.schema e passando em um objeto StructType (csvFileSchema) que contém uma sequência de objetos StructField que descrevem os campos.

val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
Copy

Como salvar um DataFrame para arquivos em um estágio

Nota

Esse recurso foi introduzido no Snowpark 1.5.0.

Se você precisar salvar um DataFrame para arquivos em um estágio, você pode chamar o método DataFrameWriter correspondente ao formato do arquivo (por exemplo, o método csv para escrever em um arquivo CSV), passando o local do estágio onde os arquivos devem ser salvos. Esses métodos DataFrameWriter executam o comando COPY INTO <local>.

Nota

Você não precisa chamar o método collect antes de chamar esses métodos DataFrameWriter. Os dados do arquivo não precisam estar no DataFrame antes de chamar esses métodos.

Para salvar o conteúdo de um DataFrame em arquivos em um estágio:

  1. Chame o método DataFrame.write para obter um objeto DataFrameWriter. Por exemplo, para obter o objeto DataFrameWriter para um DataFrame que representa a tabela chamada sample_product_data:

    dfWriter = session.table("sample_product_data").write
    
    Copy
  2. Se você quiser sobrescrever o conteúdo do arquivo (se o arquivo existir), chame o método DataFrameWriter.mode, passando SaveMode.Overwrite.

    Caso contrário, por padrão, o DataFrameWriter reporta um erro se o arquivo especificado no estágio já existir.

    O método mode retorna o mesmo objeto DataFrameWriter configurado com o modo especificado.

    Por exemplo, para especificar que o DataFrameWriter deve substituir o arquivo no estágio:

    dfWriter = dfWriter.mode(SaveMode.Overwrite)
    
    Copy
  3. Se você precisar especificar informações adicionais sobre como os dados devem ser salvos (por exemplo, que os dados devem ser compactados ou que você deseja usar um ponto-e-vírgula para delimitar campos em um arquivo CSV), chame o método DataFrameWriter.option ou o método DataFrameWriter.options.

    Passe o nome e o valor da opção que deseja definir. Você pode definir os seguintes tipos de opções:

    Observe que você não pode usar o método option para definir as seguintes opções:

    • A opção do tipo de formato TYPE.

    • A opção de cópia OVERWRITE. Para definir esta opção, chame o método mode em vez disso (conforme mencionado na etapa anterior).

    O exemplo a seguir configura o objeto DataFrameWriter para salvar dados em um arquivo CSV sem compressão, usando um ponto-e-vírgula (no lugar de uma vírgula) como delimitador de campo.

    dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    O método option retorna um objeto DataFrameWriter que é configurado com a opção especificada.

    Para definir várias opções, você pode encadear chamadas ao método option (como mostrado no exemplo acima) ou chamar o método DataFrameWriter.options, passando um Map dos nomes e valores das opções.

  4. Para retornar detalhes sobre cada arquivo que foi salvo, defina a DETAILED_OUTPUT opção de cópia como TRUE.

    Por padrão, DETAILED_OUTPUT é FALSE, o que significa que o método retorna uma única linha de saída contendo os campos "rows_unloaded", "input_bytes" e "output_bytes".

    Quando você define DETAILED_OUTPUT como TRUE, o método retorna uma linha de saída para cada arquivo salvo. Cada linha contém os campos FILE_NAME, FILE_SIZE e ROW_COUNT.

  5. Chame o método correspondente ao formato do arquivo para salvar os dados no arquivo. Você pode chamar um dos seguintes métodos:

    Ao chamar esses métodos, passe o local do estágio do arquivo onde os dados devem ser escritos (por exemplo, @mystage).

    Por padrão, o método salva os dados em nomes de arquivos com o prefixo data_ (por exemplo: @mystage/data_0_0_0.csv). Se você deseja que os arquivos sejam nomeados com um prefixo diferente, especifique o prefixo após o nome do estágio. Por exemplo:

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    
    Copy

    Esse exemplo salva o conteúdo do DataFrame em arquivos que começam com o prefixo saved_data (por exemplo, @mystage/saved_data_0_0_0.csv).

  6. Verifique o objeto WriteFileResult retornado para obter informações sobre a quantidade de dados escritos no arquivo.

    A partir do objeto WriteFileResult, você pode acessar a saída produzida pelo comando COPY INTO <location>:

    • Para acessar as linhas de saída como uma array de objetos Row, use o membro de valor rows.

    • Para determinar quais campos estão presentes nas linhas, use o membro de valor schema, que é um StructType que descreve os campos na linha.

    Por exemplo, para imprimir os nomes dos campos e valores nas linhas de saída:

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    for ((row, index) <- writeFileResult.rows.zipWithIndex) {
      (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
        (structField, element) => println(s"${structField.name}: $element")
      }
    }
    
    Copy

O exemplo a seguir usa um DataFrame para salvar o conteúdo da tabela chamada car_sales para arquivos JSON com o prefixo saved_data no estágio @mystage (por exemplo, @mystage/saved_data_0_0_0.json). O código de exemplo:

  • Substitui o arquivo se ele já existir no estágio.

  • Retorna resultados detalhados sobre a operação de salvar.

  • Salva os dados sem compressão.

Finalmente, o código de exemplo imprime cada campo e valor nas linhas de saída retornadas:

val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
  println(s"Row: $index")
  (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
    (structField, element) => println(s"${structField.name}: $element")
  }
}
Copy

Como trabalhar com dados semiestruturados

Usando um DataFrame, você pode consultar e acessar dados semiestruturados (por exemplo: dados JSON). As próximas seções explicam como trabalhar com dados semiestruturados em um DataFrame.

Nota

Os exemplos nestas seções usam os dados de exemplo em Amostra de dados usados em exemplos.

Como percorrer dados semiestruturados

Para se referir a um campo ou elemento específico em dados semiestruturados, use os seguintes métodos do objeto Column:

  • Use Column.apply(«<field_name>») para retornar um objeto Column para um campo em um OBJECT (ou um VARIANT que contenha um OBJECT).

  • Use Column.apply(<index>) para retornar um objeto Column para um elemento em uma ARRAY (ou um VARIANT que contenha uma ARRAY).

Nota

Se o nome do campo ou elementos no caminho forem irregulares e dificultarem o uso dos métodos Column.apply, você pode usar as funções get, get_ignore_case ou get_path como alternativa.

Como mencionado em Uso do método apply para se referir a uma coluna, você pode omitir o nome do método apply:

col("column_name")("field_name")
col("column_name")(index)
Copy

Por exemplo, o seguinte código seleciona o campo dealership nos objetos na coluna src dos dados de exemplo:

val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
Copy

O código imprime a seguinte saída:

----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------
Copy

Nota

Os valores no DataFrame estão entre aspas porque esses valores são retornados como cadeias de caracteres literais. Para converter esses valores em um tipo específico, consulte Conversão explícita de valores em dados semiestruturados.

Você também pode encadear chamadas a métodos para percorrer um caminho para um campo ou elemento específico.

Por exemplo, o código a seguir seleciona o campo name no objeto salesperson:

val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
Copy

O código imprime a seguinte saída:

------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------
Copy

Como outro exemplo, o código a seguir seleciona o primeiro elemento do campo vehicle, que contém uma série de veículos. O exemplo também seleciona o campo price do primeiro elemento.

val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
Copy

O código imprime a seguinte saída:

---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------

------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------
Copy

Como alternativa ao método apply, você pode usar as funções get, get_ignore_case ou get_path se o nome do campo ou elementos no caminho forem irregulares e dificultarem o uso dos métodos Column.apply.

Por exemplo, as duas linhas de código a seguir imprimem o valor de um campo especificado em um objeto:

df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
Copy

Da mesma forma, as seguintes linhas de código imprimem o valor de um campo em um caminho especificado em um objeto:

df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
Copy

Conversão explícita de valores em dados semiestruturados

Por padrão, os valores de campos e elementos são retornados como literais de cadeias de caracteres (incluindo as aspas duplas), como mostrado nos exemplos acima.

Para evitar resultados inesperados, chame o método de converter para converter o valor para um tipo específico. Por exemplo, o seguinte código imprime os valores sem e com conversão:

// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
Copy

O código imprime a seguinte saída:

----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------

---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------
Copy

Como achatar uma array de objetos em linhas

Se você precisar «achatar» dados semiestruturados em um DataFrame (por exemplo, produzindo uma linha para cada objeto em uma matriz), chame o método DataFrame.flatten. Este método é equivalente à função FLATTEN SQL. Se você passar um caminho para um objeto ou matriz, o método retorna um DataFrame que contém uma linha para cada campo ou elemento no objeto ou matriz.

Por exemplo, nos dados de exemplo, src:customer é um conjunto de objetos que contém informações sobre um cliente. Cada objeto contém um campo name e address.

Se você passar esse caminho para a função flatten:

val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
Copy

o método retornará um DataFrame:

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

A partir desse DataFrame, você poderá selecionar os campos name e address de cada objeto no campo VALUE:

df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
Copy
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------
Copy

O código a seguir complementa o exemplo anterior ao converter os valores para um tipo específico e mudar os nomes das colunas:

df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
Copy
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
Copy

Como executar instruções SQL

Para executar uma instrução SQL que você especificar, chame o método sql na classe Session e passe a instrução a ser executada. O método retorna um DataFrame.

Note que a instrução SQL não será executada até que você chame um método de ação.

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
Copy

Se você quiser chamar métodos para transformar o DataFrame (por exemplo, filtro, seleção etc.), observe que estes métodos funcionam somente se a instrução SQL subjacente for uma instrução SELECT. Os métodos de transformação não são suportados para outros tipos de instruções SQL.

val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)

// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)
Copy