Como trabalhar com DataFrames no Snowpark Python¶
No Snowpark, a principal forma de consultar e processar dados é através de um DataFrame. Este tópico explica como trabalhar com DataFrames.
Neste tópico:
Para obter e manipular dados, use a classe DataFrame
. Um DataFrame representa um conjunto de dados relacionais que é avaliado lentamente: ele só é executado quando uma ação específica é acionada. Em certo sentido, um DataFrame é como uma consulta que precisa ser avaliada para obter dados.
Para obter dados em um DataFrame:
Construa um DataFrame especificando a fonte dos dados para o conjunto de dados.
Por exemplo, você pode criar um DataFrame para armazenar dados de uma tabela, um arquivo CSV externo, de dados locais ou a execução de uma instrução SQL.
Especifique como o conjunto de dados no DataFrame deve ser transformado.
Por exemplo, você pode especificar quais colunas devem ser selecionadas, como as linhas devem ser filtradas, como os resultados devem ser ordenados e agrupados, etc.
Execute a instrução para obter os dados para o DataFrame.
Para obter os dados no DataFrame, você deve invocar um método que execute uma ação (por exemplo, o método
collect()
).
As próximas seções explicam essas etapas com mais detalhes.
Como estabelecer os exemplos para esta seção¶
Alguns exemplos desta seção utilizam um DataFrame para consultar uma tabela chamada sample_product_data
. Se você quiser executar esses exemplos, você pode criar essa tabela e preenchê-la com alguns dados executando as seguintes instruções SQL:
>>> session.sql('CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT)').collect()
[Row(status='Table SAMPLE_PRODUCT_DATA successfully created.')]
>>> session.sql("""
... INSERT INTO sample_product_data VALUES
... (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
... (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
... (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
... (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
... (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
... (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
... (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
... (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
... (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
... (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
... (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
... (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100)
... """).collect()
[Row(number of rows inserted=12)]
Para verificar se a tabela foi criada, execute:
>>> session.sql("SELECT count(*) FROM sample_product_data").collect()
[Row(COUNT(*)=12)]
Como criar um DataFrame¶
Para criar um DataFrame, você pode usar os métodos e propriedades da classe Session
. Cada um dos métodos a seguir constrói um DataFrame a partir de um tipo diferente de fonte de dados:
Para criar um DataFrame a partir de dados em uma tabela, exibição ou fluxo, chame o método
table
:>>> # Create a DataFrame from the data in the "sample_product_data" table. >>> df_table = session.table("sample_product_data") # To print out the first 10 rows, call df_table.show()
Para criar um DataFrame a partir de valores especificados, chame o método
create_dataframe
:>>> # Create a DataFrame from specified values. >>> df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a") # one column, named "a" >>> df1.show() ------- |"A" | ------- |1 | |2 | |3 | |4 | ------- >>> # Create a DataFrame with 4 columns, "a", "b", "c" and "d". >>> df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]) >>> df2.show() ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | ------------------------- >>> # Create another DataFrame with 4 columns, "a", "b", "c" and "d". >>> from snowflake.snowpark import Row >>> df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)]) >>> df3.show() ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | ------------------------- >>> # Create a DataFrame and specify a schema >>> from snowflake.snowpark.types import IntegerType, StringType, StructType, StructField >>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())]) >>> df4 = session.create_dataframe([[1, "snow"], [3, "flake"]], schema) >>> df4.show() --------------- |"A" |"B" | --------------- |1 |snow | |3 |flake | ---------------
Para criar um DataFrame contendo um intervalo de valores, chame o método
range
:>>> # Create a DataFrame from a range >>> # The dataframe will contain rows with values 1, 3, 5, 7, and 9 respectively. >>> df_range = session.range(1, 10, 2).to_df("a") >>> df_range.show() ------- |"A" | ------- |1 | |3 | |5 | |7 | |9 | -------
Para criar um DataFrame para armazenar os dados de um arquivo em um estágio, use a propriedade
read
para obter um objetoDataFrameReader
. No objetoDataFrameReader
, chame o método correspondente ao formato dos dados no arquivo:>>> from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType >>> # Create DataFrames from data in a stage. >>> df_json = session.read.json("@my_stage2/data1.json") >>> df_catalog = session.read.schema(StructType([StructField("name", StringType()), StructField("age", IntegerType())])).csv("@stage/some_dir")
Para criar um DataFrame para armazenar os resultados de uma consulta SQL, chame o método
sql
:>>> # Create a DataFrame from a SQL query >>> df_sql = session.sql("SELECT name from sample_product_data") >>> df_sql.show() -------------- |"NAME" | -------------- |Product 1 | |Product 1A | |Product 1B | |Product 2 | |Product 2A | |Product 2B | |Product 3 | |Product 3A | |Product 3B | |Product 4 | --------------
Embora você possa usar esse método para executar instruções SELECT que recuperam dados de tabelas e arquivos preparados, em vez disso, você deve usar o método
table
e a propriedaderead
, que pode fornecer melhor realce de sintaxe, realce de erros e conclusão de código inteligente em ferramentas de desenvolvimento.
Especificação de como o conjunto de dados deve ser transformado¶
Para especificar quais colunas devem ser selecionadas e como os resultados devem ser filtrados, ordenados, agrupados, etc., chame os métodos de DataFrame que transformam o conjunto de dados. Para identificar colunas nestes métodos, use a função col
ou uma expressão que seja avaliada como coluna. (Consulte Como especificar colunas e expressões.)
Por exemplo:
Para especificar quais linhas devem ser retornadas, chame o método
filter
:>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> # Create a DataFrame for the rows with the ID 1 >>> # in the "sample_product_data" table. >>> # This example uses the == operator of the Column object to perform an >>> # equality check. >>> df = session.table("sample_product_data").filter(col("id") == 1) >>> df.show() ------------------------------------------------------------------------------------ |"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" | ------------------------------------------------------------------------------------ |1 |0 |5 |Product 1 |prod-1 |1 |10 | ------------------------------------------------------------------------------------
Para especificar as colunas que devem ser selecionadas, chame o método
select
:>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> # Create a DataFrame that contains the id, name, and serial_number >>> # columns in the "sample_product_data" table. >>> df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) >>> df.show() --------------------------------------- |"ID" |"NAME" |"SERIAL_NUMBER" | --------------------------------------- |1 |Product 1 |prod-1 | |2 |Product 1A |prod-1-A | |3 |Product 1B |prod-1-B | |4 |Product 2 |prod-2 | |5 |Product 2A |prod-2-A | |6 |Product 2B |prod-2-B | |7 |Product 3 |prod-3 | |8 |Product 3A |prod-3-A | |9 |Product 3B |prod-3-B | |10 |Product 4 |prod-4 | ---------------------------------------
Você também pode consultar colunas como esta:
>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> df_product_info = session.table("sample_product_data") >>> df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"]) >>> df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number) >>> df3 = df_product_info.select("id", "name", "serial_number")
Cada método retorna um novo objeto DataFrame que foi transformado. (O método não afeta o objeto DataFrame original). Isso significa que se você quiser aplicar múltiplas transformações, você pode encadear chamadas a métodos, chamando cada método de transformação subsequente sobre o novo objeto DataFrame retornado pela chamada ao método anterior.
Observe que esses métodos de transformação não obtêm dados do banco de dados Snowflake. (Os métodos de ação descritos em Como executar uma ação para avaliar um DataFrame realizam a recuperação de dados). Os métodos de transformação simplesmente especificam como a instrução SQL deve ser construída.
Junção de DataFrames¶
Para juntar objetos DataFrame, chame o método join
:
>>> # Create two DataFrames to join >>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"]) >>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"]) >>> # Create a DataFrame that joins the two DataFrames >>> # on the column named "key". >>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show() ------------------------------- |"KEY" |"VALUE1" |"VALUE2" | ------------------------------- |a |1 |3 | |b |2 |4 | ------------------------------- >>> # Both dataframes have the same column "key", the following is more convenient. >>> df_lhs.join(df_rhs, ["key"]).show() ------------------------------- |"KEY" |"VALUE1" |"VALUE2" | ------------------------------- |a |1 |3 | |b |2 |4 | ------------------------------- >>> # Use & operator connect join expression. '|' and ~ are similar. >>> df_joined_multi_column = df_lhs.join(df_rhs, (df_lhs.col("key") == df_rhs.col("key")) & (df_lhs.col("value1") < df_rhs.col("value2"))).select(df_lhs["key"].as_("key"), "value1", "value2") >>> df_joined_multi_column.show() ------------------------------- |"KEY" |"VALUE1" |"VALUE2" | ------------------------------- |a |1 |3 | |b |2 |4 | ------------------------------- >>> # copy the DataFrame if you want to do a self-join >>> from copy import copy >>> df_lhs_copied = copy(df_lhs) >>> df_self_joined = df_lhs.join(df_lhs_copied, (df_lhs.col("key") == df_lhs_copied.col("key")) & (df_lhs.col("value1") == df_lhs_copied.col("value1")))
Observe que quando há colunas sobrepostas nos Dataframes, o Snowpark irá anexar um prefixo gerado aleatoriamente para as colunas no resultado da junção:
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).show() ----------------------------------------------------- |"l_av5t_KEY" |"VALUE1" |"r_1p6k_KEY" |"VALUE2" | ----------------------------------------------------- |a |1 |a |3 | |b |2 |b |4 | -----------------------------------------------------
Você pode referenciar as colunas sobrepostas usando Column.alias
:
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2").show() ----------------------------------------- |"KEY1" |"KEY2" |"VALUE1" |"VALUE2" | ----------------------------------------- |a |a |1 |3 | |b |b |2 |4 | -----------------------------------------
Para evitar prefixos aleatórios, você poderia especificar um sufixo para anexar às colunas sobrepostas:
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right").show() -------------------------------------------------- |"KEY_LEFT" |"VALUE1" |"KEY_RIGHT" |"VALUE2" | -------------------------------------------------- |a |1 |a |3 | |b |2 |b |4 | --------------------------------------------------
Observe que estes exemplos usam DataFrame.col
para especificar as colunas a serem usadas na junção. Consulte Como especificar colunas e expressões para mais maneiras de fazer isto.
Se você precisar unir uma tabela consigo mesma em diferentes colunas, você não pode realizar a autojunção com um único DataFrame. Os exemplos a seguir que usam um único DataFrame para realizar uma autojunção falham porque as expressões da coluna para "id"
estão presentes nos lados esquerdo e direito da junção:
>>> from snowflake.snowpark.exceptions import SnowparkJoinException >>> df = session.table("sample_product_data") >>> # This fails because columns named "id" and "parent_id" >>> # are in the left and right DataFrames in the join. >>> try: ... df_joined = df.join(df, col("id") == col("parent_id")) # fails ... except SnowparkJoinException as e: ... print(e.message) You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy. >>> # This fails because columns named "id" and "parent_id" >>> # are in the left and right DataFrames in the join. >>> try: ... df_joined = df.join(df, df["id"] == df["parent_id"]) # fails ... except SnowparkJoinException as e: ... print(e.message) You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
Em vez disso, use o método clone copy()
incluído com o Python para criar um clone do objeto DataFrame, e use os dois objetos DataFrame para realizar a junção:
>>> from copy import copy >>> # Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join. >>> df_lhs = session.table("sample_product_data") >>> # Clone the DataFrame object to use as the right-hand side of the join. >>> df_rhs = copy(df_lhs) >>> # Create a DataFrame that joins the two DataFrames >>> # for the "sample_product_data" table on the >>> # "id" and "parent_id" columns. >>> df_joined = df_lhs.join(df_rhs, df_lhs.col("id") == df_rhs.col("parent_id")) >>> df_joined.count() 8
Como especificar colunas e expressões¶
Ao chamar esses métodos de transformação, pode ser necessário especificar colunas ou expressões que utilizam colunas. Por exemplo, ao chamar o método select
, você precisa especificar as colunas que devem ser selecionadas.
Para se referir a uma coluna, crie um objeto Column
chamando a função col
no módulo snowflake.snowpark.functions
.
>>> # Import the col function from the functions module.
>>> from snowflake.snowpark.functions import col
>>> df_product_info = session.table("sample_product_data").select(col("id"), col("name"))
>>> df_product_info.show()
---------------------
|"ID" |"NAME" |
---------------------
|1 |Product 1 |
|2 |Product 1A |
|3 |Product 1B |
|4 |Product 2 |
|5 |Product 2A |
|6 |Product 2B |
|7 |Product 3 |
|8 |Product 3A |
|9 |Product 3B |
|10 |Product 4 |
---------------------
Nota
Para criar um objeto Column
para um literal, consulte Como usar literais como objetos de coluna.
Ao especificar um filtro, projeção, condição de junção, etc., você pode usar objetos Column
em uma expressão. Por exemplo:
Você pode usar objetos
Column
com o métodofilter
para especificar uma condição de filtro:>>> # Specify the equivalent of "WHERE id = 20" >>> # in a SQL SELECT statement. >>> df_filtered = df.filter(col("id") == 20)
>>> df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) >>> # Specify the equivalent of "WHERE a + b < 10" >>> # in a SQL SELECT statement. >>> df_filtered = df.filter((col("a") + col("b")) < 10) >>> df_filtered.show() ------------- |"A" |"B" | ------------- |1 |3 | -------------
Você pode usar objetos
Column
com o métodoselect
para definir um alias:>>> # Specify the equivalent of "SELECT b * 10 AS c" >>> # in a SQL SELECT statement. >>> df_selected = df.select((col("b") * 10).as_("c")) >>> df_selected.show() ------- |"C" | ------- |30 | |100 | -------
Você pode usar objetos
Column
com o métodojoin
para definir uma condição de junção:>>> dfX = session.create_dataframe([[1], [2]], schema=["a_in_X"]) >>> dfY = session.create_dataframe([[1], [3]], schema=["b_in_Y"]) >>> # Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" >>> # in a SQL SELECT statement. >>> df_joined = dfX.join(dfY, col("a_in_X") == col("b_in_Y")).select(dfX["a_in_X"].alias("the_joined_column")) >>> df_joined.show() ----------------------- |"THE_JOINED_COLUMN" | ----------------------- |1 | -----------------------
Ao se referir a colunas em dois objetos DataFrame diferentes que têm o mesmo nome (por exemplo, ao unir os DataFrames naquela coluna), você pode usar o método DataFrame.col
em um objeto DataFrame para se referir a uma coluna naquele objeto (por exemplo, df1.col("name")
e df2.col("name")
).
O exemplo a seguir demonstra como usar o método DataFrame.col
para se referir a uma coluna em um DataFrame específico. O exemplo une dois objetos DataFrame, ambos contendo uma coluna chamada key
. O exemplo usa o método Column.as
para mudar os nomes das colunas no DataFrame recém-criado.
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value"])
>>> # Create a DataFrame that joins two other DataFrames (df_lhs and df_rhs).
>>> # Use the DataFrame.col method to refer to the columns used in the join.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs.col("key").as_("key"), df_lhs.col("value").as_("L"), df_rhs.col("value").as_("R"))
>>> df_joined.show()
---------------------
|"KEY" |"L" |"R" |
---------------------
|a |1 |3 |
|b |2 |4 |
---------------------
Como utilizar aspas duplas em torno de identificadores de objetos (nomes de tabela, nomes de coluna, etc.)¶
Os nomes de bancos de dados, esquemas, tabelas e estágios que você especificar devem estar em conformidade com requisitos de identificadores do Snowflake.
Crie uma tabela sem colunas que diferenciam maiúsculas de minúsculas.
>>> session.sql("""
... create or replace temp table "10tablename"(
... id123 varchar, -- case insensitive because it's not quoted.
... "3rdID" varchar, -- case sensitive.
... "id with space" varchar -- case sensitive.
... )""").collect()
[Row(status='Table 10tablename successfully created.')]
>>> session.sql("""insert into "10tablename" (id123, "3rdID", "id with space") values ('a', 'b', 'c')""").collect()
[Row(number of rows inserted=1)]
>>> df = session.table('"10tablename"')
>>> df.show()
---------------------------------------
|"ID123" |"3rdID" |"id with space" |
---------------------------------------
|a |b |c |
---------------------------------------
Quando você especifica um nome, o Snowflake considera que o nome está em maiúsculas. Por exemplo, as chamadas a seguir são equivalentes:
>>> # The following calls are equivalent:
>>> df.select(col("id123")).collect()
[Row(ID123='a')]
>>> df.select(col("ID123")).collect()
[Row(ID123='a')]
Se o nome não estiver de acordo com os requisitos de identificador, deve-se usar aspas duplas ("
) em torno do nome. Use uma barra invertida (\
) para escapar o caractere de aspas duplas dentro de uma cadeia de caracteres literal. Por exemplo, como o nome da tabela a seguir não começa com uma letra ou um sublinhado, você deve usar aspas duplas em torno do nome:
>>> df = session.table("\"10tablename\"")
Alternativamente, você pode usar aspas simples em vez de barras invertidas para escapar o caractere de aspas duplas dentro de uma cadeia de caracteres literal.
>>> df = session.table('"10tablename"')
Observe que ao especificar o nome de uma coluna, você não precisa usar aspas duplas em torno do nome. A biblioteca do Snowpark coloca automaticamente o nome da coluna entre aspas duplas caso o nome não cumpra com os requisitos de identificador:
>>> # The following calls are equivalent:
>>> df.select(col("3rdID")).collect()
[Row(3rdID='b')]
>>> df.select(col("\"3rdID\"")).collect()
[Row(3rdID='b')]
>>> # The following calls are equivalent:
>>> df.select(col("id with space")).collect()
[Row(id with space='c')]
>>> df.select(col("\"id with space\"")).collect()
[Row(id with space='c')]
Se você já tiver acrescentado aspas duplas em torno de um nome de coluna, a biblioteca não inserirá aspas duplas adicionais em torno do nome.
Em alguns casos, o nome da coluna pode conter caracteres de aspas duplas:
>>> session.sql('''
... create or replace temp table quoted(
... "name_with_""air""_quotes" varchar,
... """column_name_quoted""" varchar
... )''').collect()
[Row(status='Table QUOTED successfully created.')]
>>> session.sql('''insert into quoted ("name_with_""air""_quotes", """column_name_quoted""") values ('a', 'b')''').collect()
[Row(number of rows inserted=1)]
Como explicado em Requisitos para identificadores, para cada caractere de aspas duplas dentro de um identificador de aspas duplas, deve-se usar dois caracteres de aspas duplas (por exemplo, "name_with_""air""_quotes"
e """column_name_quoted"""
):
>>> df_table = session.table("quoted")
>>> df_table.select("\"name_with_\"\"air\"\"_quotes\"").collect()
[Row(name_with_"air"_quotes='a')]
>>> df_table.select("\"\"\"column_name_quoted\"\"\"").collect()
[Row("column_name_quoted"='b')]
Tenha em mente que quando um identificador é incluído entre aspas duplas (se você adicionou explicitamente as aspas ou se a biblioteca adicionou as aspas para você), o Snowflake trata o identificador diferenciando maiúsculas de minúsculas:
>>> # The following calls are NOT equivalent!
>>> # The Snowpark library adds double quotes around the column name,
>>> # which makes Snowflake treat the column name as case-sensitive.
>>> df.select(col("id with space")).collect()
[Row(id with space='c')]
>>> from snowflake.snowpark.exceptions import SnowparkSQLException
>>> try:
... df.select(col("ID WITH SPACE")).collect()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 7
invalid identifier '"ID WITH SPACE"'
Como usar literais como objetos de coluna¶
Para usar um literal em um método que toma um objeto Column
como argumento, crie um objeto Column
para o literal passando o literal para a função lit
no módulo snowflake.snowpark.functions
. Por exemplo:
>>> # Import for the lit and col functions.
>>> from snowflake.snowpark.functions import col, lit
>>> # Show the first 10 rows in which num_items is greater than 5.
>>> # Use `lit(5)` to create a Column object for the literal 5.
>>> df_filtered = df.filter(col("num_items") > lit(5))
Como converter um objeto de coluna para um tipo específico¶
Para converter um objeto Column
a um tipo específico, chame o método cast
e passe um objeto do módulo snowflake.snowpark.types
. Por exemplo, para converter um literal para um NUMBER com uma precisão de 5 e escala de 2:
>>> # Import for the lit function.
>>> from snowflake.snowpark.functions import lit
>>> # Import for the DecimalType class.
>>> from snowflake.snowpark.types import DecimalType
>>> decimal_value = lit(0.05).cast(DecimalType(5,2))
Como encadear chamadas a métodos¶
Como cada método que transforma um objeto DataFrame returna um novo objeto DataFrame que tem a transformação aplicada, você pode encadear chamadas a métodos para produzir um novo DataFrame que é transformado de maneiras adicionais.
O exemplo a seguir retorna um DataFrame que está configurado para:
Consultar a tabela
sample_product_data
.Retornar a linha com
id = 1
.Selecionar as colunas
name
eserial_number
.>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number")) >>> df_product_info.show() ------------------------------- |"NAME" |"SERIAL_NUMBER" | ------------------------------- |Product 1 |prod-1 | -------------------------------
Neste exemplo:
session.table("sample_product_data")
retorna um DataFrame para a tabelasample_product_data
.Embora o DataFrame ainda não contenha os dados da tabela, o objeto contém as definições das colunas da tabela.
filter(col("id") == 1)
retorna um DataFrame para a tabelasample_product_data
que está preparada para retornar a linha comid = 1
.Observe novamente que o DataFrame ainda não contém a linha correspondente da tabela. A linha correspondente não é obtida até que você chame um método de ação.
select(col("name"), col("serial_number"))
retorna um DataFrame que contém as colunasname
eserial_number
para a linha da tabelasample_product_data
que temid = 1
.
Quando você fizer chamadas a métodos encadeadas, tenha em mente que a ordem das chamadas é importante. Cada chamada de método retorna um DataFrame que foi transformado. Certifique-se de que as chamadas subsequentes funcionem com os DataFrame transformados.
Por exemplo, no código abaixo, o método select
retorna um DataFrame que contém apenas duas colunas: name
e serial_number
. A chamada ao método filter
nesse DataFrame falha porque usa a coluna id
, que não está no DataFrame transformado.
>>> # This fails with the error "invalid identifier 'ID'."
>>> df_product_info = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") == 1)
>>> try:
... df_product_info.show()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 121
invalid identifier 'ID'
Em contraste, o seguinte código é executado com sucesso porque o método filter()
é chamado em um DataFrame que contém todas as colunas da tabela sample_product_data
(incluindo a coluna id
):
>>> # This succeeds because the DataFrame returned by the table() method
>>> # includes the "id" column.
>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number"))
>>> df_product_info.show()
-------------------------------
|"NAME" |"SERIAL_NUMBER" |
-------------------------------
|Product 1 |prod-1 |
-------------------------------
Tenha em mente que talvez seja necessário fazer as chamadas ao método select
e filter
em uma ordem diferente daquela em que você usaria as palavras-chave equivalentes (SELECT e WHERE) em uma instrução SQL.
Como obter definições das colunas¶
Para obter a definição das colunas no conjunto de dados para o DataFrame, chame a propriedade schema
. Esse método retorna um objeto StructType
que contém uma list
de StructField
objetos. Cada objeto StructField
contém a definição de uma coluna.
>>> # Get the StructType object that describes the columns in the
>>> # underlying rowset.
>>> table_schema = session.table("sample_product_data").schema
>>> table_schema
StructType([StructField('ID', LongType(), nullable=True), StructField('PARENT_ID', LongType(), nullable=True), StructField('CATEGORY_ID', LongType(), nullable=True), StructField('NAME', StringType(), nullable=True), StructField('SERIAL_NUMBER', StringType(), nullable=True), StructField('KEY', LongType(), nullable=True), StructField('"3rd"', LongType(), nullable=True)])
No objeto devolvido StructType
, os nomes das colunas são sempre normalizados. Identificadores sem aspas são retornados em maiúsculas, e identificadores entre aspas são devolvidos na caixa em que foram definidos.
O exemplo a seguir cria um DataFrame contendo as colunas ID
e 3rd
. Para o nome de coluna 3rd
, a biblioteca do Snowpark coloca o nome automaticamente entre aspas duplas ("3rd"
) porque o nome não cumpre os requisitos para um identificador.
O exemplo chama a propriedade schema
e depois chama a propriedade names
no objeto StructType
retornado para obter uma list
de nomes de coluna. Os nomes são normalizados no StructType
retornado pela propriedade schema
.
>>> # Create a DataFrame containing the "id" and "3rd" columns.
>>> df_selected_columns = session.table("sample_product_data").select(col("id"), col("3rd"))
>>> # Print out the names of the columns in the schema. This prints out:
>>> # This prints List["ID", "\"3rd\""]
>>> df_selected_columns.schema.names
['ID', '"3rd"']
Como executar uma ação para avaliar um DataFrame¶
Como mencionado anteriormente, o DataFrame é avaliado lentamente, o que significa que a instrução SQL não é enviada ao servidor para execução até que você execute uma ação. Uma ação faz com que o DataFrame seja avaliado e envia a instrução SQL correspondente para o servidor para execução.
Os seguintes métodos executam uma ação:
Classe |
Método |
Descrição |
---|---|---|
|
|
Avalia o DataFrame e retorna o conjunto de dados resultante como uma |
|
|
Avalia o DataFrame e retorna o número de linhas. |
|
|
Avalia o DataFrame e imprime as linhas no console. Observe que esse método limita o número de linhas a 10 (por padrão). |
|
|
Armazena os dados no DataFrame para a tabela especificada. Consulte Como salvar dados em uma tabela. |
Por exemplo, para executar uma consulta em uma tabela e retornar os resultados, chame o método collect
:
>>> # Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
>>> # This does not execute the query.
>>> df = session.table("sample_product_data").select(col("id"), col("name"))
>>> # Send the query to the server for execution and
>>> # return a list of Rows containing the results.
>>> results = df.collect()
Para executar a consulta e retornar o número de resultados, chame o método count
:
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Send the query to the server for execution and
>>> # print the count of rows in the table.
>>> print(df_products.count())
12
Para executar uma consulta e imprimir os resultados para o console, chame o método show
:
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Send the query to the server for execution and
>>> # print the results to the console.
>>> # The query limits the number of rows to 10 by default.
>>> df_products.show()
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
-------------------------------------------------------------------------------------
>>> # Limit the number of rows to 20, rather than 10.
>>> df_products.show(20)
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
|11 |10 |50 |Product 4A |prod-4-A |4 |100 |
|12 |10 |50 |Product 4B |prod-4-B |4 |100 |
-------------------------------------------------------------------------------------
Nota: Se você estiver chamando a propriedade schema
para obter as definições das colunas no DataFrame, você não precisa chamar um método de ação.
Como retornar o conteúdo de um DataFrame como um DataFrame do Pandas¶
Para retornar o conteúdo de um DataFrame como um DataFrame do Pandas, use o método to_pandas
.
Por exemplo:
>>> python_df = session.create_dataframe(["a", "b", "c"])
>>> pandas_df = python_df.to_pandas()
Como salvar dados em uma tabela¶
Para salvar o conteúdo de um DataFrame em uma tabela:
Chame a propriedade
write
para obter um objetoDataFrameWriter
.Chame o método
mode
no objetoDataFrameWriter
e especifique se você deseja inserir linhas ou atualizar linhas na tabela. Esse método retorna um novo objetoDataFrameWriter
que é configurado com o modo especificado.Chame o método
save_as_table
no objetoDataFrameWriter
para salvar o conteúdo do DataFrame em uma tabela especificada.
Observe que não é necessário chamar um método separado (por exemplo, collect
) para executar a instrução SQL que salva os dados na tabela.
Por exemplo:
>>> df.write.mode("overwrite").save_as_table("table1")
Como criar uma exibição a partir de um DataFrame¶
Para criar uma exibição a partir de um DataFrame, chame o método create_or_replace_view
, que imediatamente cria a nova exibição:
>>> import os
>>> database = os.environ["snowflake_database"] # use your own database and schema
>>> schema = os.environ["snowflake_schema"]
>>> view_name = "my_view"
>>> df.create_or_replace_view(f"{database}.{schema}.{view_name}")
[Row(status='View MY_VIEW successfully created.')]
As exibições que você cria ao chamar create_or_replace_view
são persistentes. Se você não precisar mais dessa exibição, você pode excluir a exibição manualmente.
Alternativamente, use o método create_or_replace_temp_view
, que cria uma exibição temporária. A exibição temporária só fica disponível na sessão em que ela é criada.
Como trabalhar com arquivos em um estágio¶
Esta seção explica como consultar dados em um arquivo em um estágio do Snowflake. Para outras operações em arquivos, use instruções SQL.
Para consultar dados em arquivos em um estágio do Snowflake, use a classe DataFrameReader
:
Chame o método
read
na classeSession
para acessar um objetoDataFrameReader
.Se os arquivos estiverem no formato CSV, descreva os campos no arquivo. Para fazer isso:
Crie um objeto
StructType
que consiste em umalist
de objetosStructField
descrevendo os campos no arquivo.Para cada objeto
StructField
, especifique o seguinte:O nome do campo.
O tipo de dados do campo (especificado como um objeto no módulo
snowflake.snowpark.types
).Se o campo é ou não anulável.
Por exemplo:
>>> from snowflake.snowpark.types import * >>> schema_for_data_file = StructType([ ... StructField("id", StringType()), ... StructField("name", StringType()) ... ])
Chame a propriedade
schema
no objetoDataFrameReader
, passando o objetoStructType
.Por exemplo:
>>> df_reader = session.read.schema(schema_for_data_file)
A propriedade
schema
retorna um objetoDataFrameReader
que é configurado para ler arquivos contendo os campos especificados.Note que você não precisa fazer isso para arquivos em outros formatos (como JSON). Para esses arquivos, o
DataFrameReader
trata os dados como um único campo do tipo VARIANT com o nome de campo$1
.
Se você precisar especificar informações adicionais sobre como os dados devem ser lidos (por exemplo, que os dados estão comprimidos ou que um arquivo CSV usa um ponto-e-vírgula em vez de uma vírgula para delimitar campos), chame os métodos
option
ouoptions
do objetoDataFrameReader
.O método
option
obtém um nome e um valor da opção que você deseja definir e permite combinar múltiplas chamadas encadeadas, enquanto o métodooptions
obtém um dicionário dos nomes das opções e seus valores correspondentes.Para os nomes e valores das opções de formato do arquivo, veja a documentação documentação sobre CREATE FILE FORMAT.
Você também pode definir as opções de cópia descritas na documentação COPY INTO TABLE. Observe que a definição de opções de cópia pode resultar em uma estratégia de execução mais cara quando você obtém os dados para o DataFrame.
O exemplo a seguir configura o objeto
DataFrameReader
para consultar dados em um arquivo CSV que não é compactado e que usa um ponto-e-vírgula como delimitador de campo.>>> df_reader = df_reader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
Os métodos
option
eoptions
retornam um objetoDataFrameReader
que é configurado com as opções especificadas.Chame o método correspondente ao formato do arquivo (por exemplo, o método
csv
) passando o local do arquivo.>>> df = df_reader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
Os métodos correspondentes ao formato de um arquivo retornam um objeto DataFrame que é configurado para manter os dados nesse arquivo.
Use os métodos do objeto DataFrame para realizar quaisquer transformações necessárias no conjunto de dados (por exemplo, selecionar campos específicos, filtrar linhas, etc.).
Por exemplo, para extrair o elemento
color
de um arquivo JSON no estágio chamadomy_stage
:>>> # Import the sql_expr function from the functions module. >>> from snowflake.snowpark.functions import sql_expr >>> df = session.read.json("@my_stage").select(sql_expr("$1:color"))
Como explicado anteriormente, para arquivos em formatos diferentes de CSV (por exemplo JSON), o
DataFrameReader
trata os dados no arquivo como uma única coluna VARIANT com o nome$1
.Esse exemplo usa a função
sql_expr
no módulosnowflake.snowpark.functions
para especificar o caminho para o elementocolor
.Observe que a função
sql_expr
não interpreta ou modifica o argumento de entrada. A função só permite formar expressões e trechos em SQL que ainda não são suportados pela API do Snowpark.Chame um método de ação para consultar os dados no arquivo.
Como é o caso de DataFrames para tabelas, os dados não são obtidos no DataFrame até que você chame um método de ação.
Como executar instruções SQL¶
Para executar uma instrução SQL que você especificar, chame o método sql
na classe Session
e passe a instrução a ser executada. O método retorna um DataFrame.
Note que a instrução SQL não será executada até que você chame um método de ação.
>>> # Get the list of the files in a stage.
>>> # The collect() method causes this SQL statement to be executed.
>>> session.sql("create or replace temp stage my_stage").collect()
[Row(status='Stage area MY_STAGE successfully created.')]
>>> stage_files_df = session.sql("ls @my_stage").collect()
>>> # Resume the operation of a warehouse.
>>> # Note that you must call the collect method in order to execute
>>> # the SQL statement.
>>> session.sql("alter warehouse if exists my_warehouse resume if suspended").collect()
[Row(status='Statement executed successfully.')]
>>> # Set up a SQL statement to copy data from a stage to a table.
>>> session.sql("copy into sample_product_data from @my_stage file_format=(type = csv)").collect()
[Row(status='Copy executed with 0 files processed.')]
Se você quiser chamar métodos para transformar o DataFrame (por exemplo, filter
, select
, etc.), observe que esses métodos só funcionam se a instrução SQL subjacente for uma instrução SELECT. Os métodos de transformação não são suportados para outros tipos de instruções SQL.
>>> df = session.sql("select id, parent_id from sample_product_data where id < 10")
>>> # Because the underlying SQL statement for the DataFrame is a SELECT statement,
>>> # you can call the filter method to transform this DataFrame.
>>> results = df.filter(col("id") < 3).select(col("id")).collect()
>>> # In this example, the underlying SQL statement is not a SELECT statement.
>>> df = session.sql("ls @my_stage")
>>> # Calling the filter method results in an error.
>>> try:
... df.filter(col("size") > 50).collect()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 104
invalid identifier 'SIZE'