Como trabalhar com DataFrames no Snowpark Python

No Snowpark, a principal forma de consultar e processar dados é através de um DataFrame. Este tópico explica como trabalhar com DataFrames.

Neste tópico:

Para obter e manipular dados, use a classe DataFrame. Um DataFrame representa um conjunto de dados relacionais que é avaliado lentamente: ele só é executado quando uma ação específica é acionada. Em certo sentido, um DataFrame é como uma consulta que precisa ser avaliada para obter dados.

Para obter dados em um DataFrame:

  1. Construa um DataFrame especificando a fonte dos dados para o conjunto de dados.

    Por exemplo, você pode criar um DataFrame para armazenar dados de uma tabela, um arquivo CSV externo, de dados locais ou a execução de uma instrução SQL.

  2. Especifique como o conjunto de dados no DataFrame deve ser transformado.

    Por exemplo, você pode especificar quais colunas devem ser selecionadas, como as linhas devem ser filtradas, como os resultados devem ser ordenados e agrupados, etc.

  3. Execute a instrução para obter os dados para o DataFrame.

    Para obter os dados no DataFrame, você deve invocar um método que execute uma ação (por exemplo, o método collect()).

As próximas seções explicam essas etapas com mais detalhes.

Como estabelecer os exemplos para esta seção

Alguns exemplos desta seção utilizam um DataFrame para consultar uma tabela chamada sample_product_data. Se você quiser executar esses exemplos, você pode criar essa tabela e preenchê-la com alguns dados executando as seguintes instruções SQL:

>>> session.sql('CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT)').collect()
[Row(status='Table SAMPLE_PRODUCT_DATA successfully created.')]
>>> session.sql("""
... INSERT INTO sample_product_data VALUES
... (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
... (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
... (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
... (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
... (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
... (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
... (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
... (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
... (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
... (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
... (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
... (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100)
... """).collect()
[Row(number of rows inserted=12)]

Para verificar se a tabela foi criada, execute:

>>> session.sql("SELECT count(*) FROM sample_product_data").collect()
[Row(COUNT(*)=12)]

Como criar um DataFrame

Para criar um DataFrame, você pode usar os métodos e propriedades da classe Session. Cada um dos métodos a seguir constrói um DataFrame a partir de um tipo diferente de fonte de dados:

  • Para criar um DataFrame a partir de dados em uma tabela, exibição ou fluxo, chame o método table:

    >>> # Create a DataFrame from the data in the "sample_product_data" table.
    >>> df_table = session.table("sample_product_data")
    
    # To print out the first 10 rows, call df_table.show()
    
  • Para criar um DataFrame a partir de valores especificados, chame o método create_dataframe:

    >>> # Create a DataFrame from specified values.
    >>> df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a")  # one column, named "a"
    >>> df1.show()
    -------
    |"A"  |
    -------
    |1    |
    |2    |
    |3    |
    |4    |
    -------
    
    
    >>> # Create a DataFrame with 4 columns, "a", "b", "c" and "d".
    >>> df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"])
    >>> df2.show()
    -------------------------
    |"A"  |"B"  |"C"  |"D"  |
    -------------------------
    |1    |2    |3    |4    |
    -------------------------
    
    
    >>> # Create another DataFrame with 4 columns, "a", "b", "c" and "d".
    >>> from snowflake.snowpark import Row
    >>> df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)])
    >>> df3.show()
    -------------------------
    |"A"  |"B"  |"C"  |"D"  |
    -------------------------
    |1    |2    |3    |4    |
    -------------------------
    
    
    >>> # Create a DataFrame and specify a schema
    >>> from snowflake.snowpark.types import IntegerType, StringType, StructType, StructField
    >>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
    >>> df4 = session.create_dataframe([[1, "snow"], [3, "flake"]], schema)
    >>> df4.show()
    ---------------
    |"A"  |"B"    |
    ---------------
    |1    |snow   |
    |3    |flake  |
    ---------------
    
  • Para criar um DataFrame contendo um intervalo de valores, chame o método range:

    >>> # Create a DataFrame from a range
    >>> # The dataframe will contain rows with values 1, 3, 5, 7, and 9 respectively.
    >>> df_range = session.range(1, 10, 2).to_df("a")
    >>> df_range.show()
    -------
    |"A"  |
    -------
    |1    |
    |3    |
    |5    |
    |7    |
    |9    |
    -------
    
  • Para criar um DataFrame para armazenar os dados de um arquivo em um estágio, use a propriedade read para obter um objeto DataFrameReader. No objeto DataFrameReader, chame o método correspondente ao formato dos dados no arquivo:

    >>> from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType
    
    >>> # Create DataFrames from data in a stage.
    >>> df_json = session.read.json("@my_stage2/data1.json")
    >>> df_catalog = session.read.schema(StructType([StructField("name", StringType()), StructField("age", IntegerType())])).csv("@stage/some_dir")
    
  • Para criar um DataFrame para armazenar os resultados de uma consulta SQL, chame o método sql:

    >>> # Create a DataFrame from a SQL query
    >>> df_sql = session.sql("SELECT name from sample_product_data")
    >>> df_sql.show()
    --------------
    |"NAME"      |
    --------------
    |Product 1   |
    |Product 1A  |
    |Product 1B  |
    |Product 2   |
    |Product 2A  |
    |Product 2B  |
    |Product 3   |
    |Product 3A  |
    |Product 3B  |
    |Product 4   |
    --------------
    

    Embora você possa usar esse método para executar instruções SELECT que recuperam dados de tabelas e arquivos preparados, em vez disso, você deve usar o método table e a propriedade read, que pode fornecer melhor realce de sintaxe, realce de erros e conclusão de código inteligente em ferramentas de desenvolvimento.

Especificação de como o conjunto de dados deve ser transformado

Para especificar quais colunas devem ser selecionadas e como os resultados devem ser filtrados, ordenados, agrupados, etc., chame os métodos de DataFrame que transformam o conjunto de dados. Para identificar colunas nestes métodos, use a função col ou uma expressão que seja avaliada como coluna. (Consulte Como especificar colunas e expressões.)

Por exemplo:

  • Para especificar quais linhas devem ser retornadas, chame o método filter:

    >>> # Import the col function from the functions module.
    >>> from snowflake.snowpark.functions import col
    
    >>> # Create a DataFrame for the rows with the ID 1
    >>> # in the "sample_product_data" table.
    
    >>> # This example uses the == operator of the Column object to perform an
    >>> # equality check.
    >>> df = session.table("sample_product_data").filter(col("id") == 1)
    >>> df.show()
    ------------------------------------------------------------------------------------
    |"ID"  |"PARENT_ID"  |"CATEGORY_ID"  |"NAME"     |"SERIAL_NUMBER"  |"KEY"  |"3rd"  |
    ------------------------------------------------------------------------------------
    |1     |0            |5              |Product 1  |prod-1           |1      |10     |
    ------------------------------------------------------------------------------------
    
  • Para especificar as colunas que devem ser selecionadas, chame o método select:

    >>> # Import the col function from the functions module.
    >>> from snowflake.snowpark.functions import col
    
    >>> # Create a DataFrame that contains the id, name, and serial_number
    >>> # columns in the "sample_product_data" table.
    >>> df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
    >>> df.show()
    ---------------------------------------
    |"ID"  |"NAME"      |"SERIAL_NUMBER"  |
    ---------------------------------------
    |1     |Product 1   |prod-1           |
    |2     |Product 1A  |prod-1-A         |
    |3     |Product 1B  |prod-1-B         |
    |4     |Product 2   |prod-2           |
    |5     |Product 2A  |prod-2-A         |
    |6     |Product 2B  |prod-2-B         |
    |7     |Product 3   |prod-3           |
    |8     |Product 3A  |prod-3-A         |
    |9     |Product 3B  |prod-3-B         |
    |10    |Product 4   |prod-4           |
    ---------------------------------------
    
  • Você também pode consultar colunas como esta:

    >>> # Import the col function from the functions module.
    >>> from snowflake.snowpark.functions import col
    
    >>> df_product_info = session.table("sample_product_data")
    >>> df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"])
    >>> df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number)
    >>> df3 = df_product_info.select("id", "name", "serial_number")
    

Cada método retorna um novo objeto DataFrame que foi transformado. (O método não afeta o objeto DataFrame original). Isso significa que se você quiser aplicar múltiplas transformações, você pode encadear chamadas a métodos, chamando cada método de transformação subsequente sobre o novo objeto DataFrame retornado pela chamada ao método anterior.

Observe que esses métodos de transformação não obtêm dados do banco de dados Snowflake. (Os métodos de ação descritos em Como executar uma ação para avaliar um DataFrame realizam a recuperação de dados). Os métodos de transformação simplesmente especificam como a instrução SQL deve ser construída.

Junção de DataFrames

Para juntar objetos DataFrame, chame o método join:

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # Create a DataFrame that joins the two DataFrames
>>> # on the column named "key".
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show()
-------------------------------
|"KEY"  |"VALUE1"  |"VALUE2"  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------


>>> # Both dataframes have the same column "key", the following is more convenient.
>>> df_lhs.join(df_rhs, ["key"]).show()
-------------------------------
|"KEY"  |"VALUE1"  |"VALUE2"  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------


>>> # Use & operator connect join expression. '|' and ~ are similar.
>>> df_joined_multi_column = df_lhs.join(df_rhs, (df_lhs.col("key") == df_rhs.col("key")) & (df_lhs.col("value1") < df_rhs.col("value2"))).select(df_lhs["key"].as_("key"), "value1", "value2")
>>> df_joined_multi_column.show()
-------------------------------
|"KEY"  |"VALUE1"  |"VALUE2"  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------


>>> # copy the DataFrame if you want to do a self-join
>>> from copy import copy
>>> df_lhs_copied = copy(df_lhs)
>>> df_self_joined = df_lhs.join(df_lhs_copied, (df_lhs.col("key") == df_lhs_copied.col("key")) & (df_lhs.col("value1") == df_lhs_copied.col("value1")))

Observe que quando há colunas sobrepostas nos Dataframes, o Snowpark irá anexar um prefixo gerado aleatoriamente para as colunas no resultado da junção:

>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).show()  
-----------------------------------------------------
|"l_av5t_KEY"  |"VALUE1"  |"r_1p6k_KEY"  |"VALUE2"  |
-----------------------------------------------------
|a             |1         |a             |3         |
|b             |2         |b             |4         |
-----------------------------------------------------

Você pode referenciar as colunas sobrepostas usando Column.alias:

>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2").show()
-----------------------------------------
|"KEY1"  |"KEY2"  |"VALUE1"  |"VALUE2"  |
-----------------------------------------
|a       |a       |1         |3         |
|b       |b       |2         |4         |
-----------------------------------------

Para evitar prefixos aleatórios, você poderia especificar um sufixo para anexar às colunas sobrepostas:

>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right").show()
--------------------------------------------------
|"KEY_LEFT"  |"VALUE1"  |"KEY_RIGHT"  |"VALUE2"  |
--------------------------------------------------
|a           |1         |a            |3         |
|b           |2         |b            |4         |
--------------------------------------------------

Observe que estes exemplos usam DataFrame.col para especificar as colunas a serem usadas na junção. Consulte Como especificar colunas e expressões para mais maneiras de fazer isto.

Se você precisar unir uma tabela consigo mesma em diferentes colunas, você não pode realizar a autojunção com um único DataFrame. Os exemplos a seguir que usam um único DataFrame para realizar uma autojunção falham porque as expressões da coluna para "id" estão presentes nos lados esquerdo e direito da junção:

>>> from snowflake.snowpark.exceptions import SnowparkJoinException

>>> df = session.table("sample_product_data")
>>> # This fails because columns named "id" and "parent_id"
>>> # are in the left and right DataFrames in the join.
>>> try:
...     df_joined = df.join(df, col("id") == col("parent_id")) # fails
... except SnowparkJoinException as e:
...     print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.

>>> # This fails because columns named "id" and "parent_id"
>>> # are in the left and right DataFrames in the join.
>>> try:
...     df_joined = df.join(df, df["id"] == df["parent_id"])   # fails
... except SnowparkJoinException as e:
...     print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.

Em vez disso, use o método clone copy() incluído com o Python para criar um clone do objeto DataFrame, e use os dois objetos DataFrame para realizar a junção:

>>> from copy import copy

>>> # Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
>>> df_lhs = session.table("sample_product_data")
>>> # Clone the DataFrame object to use as the right-hand side of the join.
>>> df_rhs = copy(df_lhs)

>>> # Create a DataFrame that joins the two DataFrames
>>> # for the "sample_product_data" table on the
>>> # "id" and "parent_id" columns.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("id") == df_rhs.col("parent_id"))
>>> df_joined.count()
8

Como especificar colunas e expressões

Ao chamar esses métodos de transformação, pode ser necessário especificar colunas ou expressões que utilizam colunas. Por exemplo, ao chamar o método select, você precisa especificar as colunas que devem ser selecionadas.

Para se referir a uma coluna, crie um objeto Column chamando a função col no módulo snowflake.snowpark.functions.

>>> # Import the col function from the functions module.
>>> from snowflake.snowpark.functions import col

>>> df_product_info = session.table("sample_product_data").select(col("id"), col("name"))
>>> df_product_info.show()
---------------------
|"ID"  |"NAME"      |
---------------------
|1     |Product 1   |
|2     |Product 1A  |
|3     |Product 1B  |
|4     |Product 2   |
|5     |Product 2A  |
|6     |Product 2B  |
|7     |Product 3   |
|8     |Product 3A  |
|9     |Product 3B  |
|10    |Product 4   |
---------------------

Nota

Para criar um objeto Column para um literal, consulte Como usar literais como objetos de coluna.

Ao especificar um filtro, projeção, condição de junção, etc., você pode usar objetos Column em uma expressão. Por exemplo:

  • Você pode usar objetos Column com o método filter para especificar uma condição de filtro:

    >>> # Specify the equivalent of "WHERE id = 20"
    >>> # in a SQL SELECT statement.
    >>> df_filtered = df.filter(col("id") == 20)
    
    >>> df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"])
    >>> # Specify the equivalent of "WHERE a + b < 10"
    >>> # in a SQL SELECT statement.
    >>> df_filtered = df.filter((col("a") + col("b")) < 10)
    >>> df_filtered.show()
    -------------
    |"A"  |"B"  |
    -------------
    |1    |3    |
    -------------
    
  • Você pode usar objetos Column com o método select para definir um alias:

    >>> # Specify the equivalent of "SELECT b * 10 AS c"
    >>> # in a SQL SELECT statement.
    >>> df_selected = df.select((col("b") * 10).as_("c"))
    >>> df_selected.show()
    -------
    |"C"  |
    -------
    |30   |
    |100  |
    -------
    
  • Você pode usar objetos Column com o método join para definir uma condição de junção:

    >>> dfX = session.create_dataframe([[1], [2]], schema=["a_in_X"])
    >>> dfY = session.create_dataframe([[1], [3]], schema=["b_in_Y"])
    >>> # Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y"
    >>> # in a SQL SELECT statement.
    >>> df_joined = dfX.join(dfY, col("a_in_X") == col("b_in_Y")).select(dfX["a_in_X"].alias("the_joined_column"))
    >>> df_joined.show()
    -----------------------
    |"THE_JOINED_COLUMN"  |
    -----------------------
    |1                    |
    -----------------------
    

Ao se referir a colunas em dois objetos DataFrame diferentes que têm o mesmo nome (por exemplo, ao unir os DataFrames naquela coluna), você pode usar o método DataFrame.col em um objeto DataFrame para se referir a uma coluna naquele objeto (por exemplo, df1.col("name") e df2.col("name")).

O exemplo a seguir demonstra como usar o método DataFrame.col para se referir a uma coluna em um DataFrame específico. O exemplo une dois objetos DataFrame, ambos contendo uma coluna chamada key. O exemplo usa o método Column.as para mudar os nomes das colunas no DataFrame recém-criado.

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value"])
>>> # Create a DataFrame that joins two other DataFrames (df_lhs and df_rhs).
>>> # Use the DataFrame.col method to refer to the columns used in the join.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs.col("key").as_("key"), df_lhs.col("value").as_("L"), df_rhs.col("value").as_("R"))
>>> df_joined.show()
---------------------
|"KEY"  |"L"  |"R"  |
---------------------
|a      |1    |3    |
|b      |2    |4    |
---------------------

Como utilizar aspas duplas em torno de identificadores de objetos (nomes de tabela, nomes de coluna, etc.)

Os nomes de bancos de dados, esquemas, tabelas e estágios que você especificar devem estar em conformidade com requisitos de identificadores do Snowflake.

Crie uma tabela sem colunas que diferenciam maiúsculas de minúsculas.

>>> session.sql("""
... create or replace temp table "10tablename"(
... id123 varchar, -- case insensitive because it's not quoted.
... "3rdID" varchar, -- case sensitive.
... "id with space" varchar -- case sensitive.
... )""").collect()
[Row(status='Table 10tablename successfully created.')]
>>> session.sql("""insert into "10tablename" (id123, "3rdID", "id with space") values ('a', 'b', 'c')""").collect()
[Row(number of rows inserted=1)]
>>> df = session.table('"10tablename"')
>>> df.show()
---------------------------------------
|"ID123"  |"3rdID"  |"id with space"  |
---------------------------------------
|a        |b        |c                |
---------------------------------------

Quando você especifica um nome, o Snowflake considera que o nome está em maiúsculas. Por exemplo, as chamadas a seguir são equivalentes:

>>> # The following calls are equivalent:
>>> df.select(col("id123")).collect()
[Row(ID123='a')]
>>> df.select(col("ID123")).collect()
[Row(ID123='a')]

Se o nome não estiver de acordo com os requisitos de identificador, deve-se usar aspas duplas (") em torno do nome. Use uma barra invertida (\) para escapar o caractere de aspas duplas dentro de uma cadeia de caracteres literal. Por exemplo, como o nome da tabela a seguir não começa com uma letra ou um sublinhado, você deve usar aspas duplas em torno do nome:

>>> df = session.table("\"10tablename\"")

Alternativamente, você pode usar aspas simples em vez de barras invertidas para escapar o caractere de aspas duplas dentro de uma cadeia de caracteres literal.

>>> df = session.table('"10tablename"')

Observe que ao especificar o nome de uma coluna, você não precisa usar aspas duplas em torno do nome. A biblioteca do Snowpark coloca automaticamente o nome da coluna entre aspas duplas caso o nome não cumpra com os requisitos de identificador:

>>> # The following calls are equivalent:
>>> df.select(col("3rdID")).collect()
[Row(3rdID='b')]
>>> df.select(col("\"3rdID\"")).collect()
[Row(3rdID='b')]

>>> # The following calls are equivalent:
>>> df.select(col("id with space")).collect()
[Row(id with space='c')]
>>> df.select(col("\"id with space\"")).collect()
[Row(id with space='c')]

Se você já tiver acrescentado aspas duplas em torno de um nome de coluna, a biblioteca não inserirá aspas duplas adicionais em torno do nome.

Em alguns casos, o nome da coluna pode conter caracteres de aspas duplas:

>>> session.sql('''
... create or replace temp table quoted(
... "name_with_""air""_quotes" varchar,
... """column_name_quoted""" varchar
... )''').collect()
[Row(status='Table QUOTED successfully created.')]
>>> session.sql('''insert into quoted ("name_with_""air""_quotes", """column_name_quoted""") values ('a', 'b')''').collect()
[Row(number of rows inserted=1)]

Como explicado em Requisitos para identificadores, para cada caractere de aspas duplas dentro de um identificador de aspas duplas, deve-se usar dois caracteres de aspas duplas (por exemplo, "name_with_""air""_quotes" e """column_name_quoted"""):

>>> df_table = session.table("quoted")
>>> df_table.select("\"name_with_\"\"air\"\"_quotes\"").collect()
[Row(name_with_"air"_quotes='a')]
>>> df_table.select("\"\"\"column_name_quoted\"\"\"").collect()
[Row("column_name_quoted"='b')]

Tenha em mente que quando um identificador é incluído entre aspas duplas (se você adicionou explicitamente as aspas ou se a biblioteca adicionou as aspas para você), o Snowflake trata o identificador diferenciando maiúsculas de minúsculas:

>>> # The following calls are NOT equivalent!
>>> # The Snowpark library adds double quotes around the column name,
>>> # which makes Snowflake treat the column name as case-sensitive.
>>> df.select(col("id with space")).collect()
[Row(id with space='c')]

>>> from snowflake.snowpark.exceptions import SnowparkSQLException
>>> try:
...     df.select(col("ID WITH SPACE")).collect()
... except SnowparkSQLException as e:
...     print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 7
invalid identifier '"ID WITH SPACE"'

Como usar literais como objetos de coluna

Para usar um literal em um método que toma um objeto Column como argumento, crie um objeto Column para o literal passando o literal para a função lit no módulo snowflake.snowpark.functions. Por exemplo:

>>> # Import for the lit and col functions.
>>> from snowflake.snowpark.functions import col, lit

>>> # Show the first 10 rows in which num_items is greater than 5.
>>> # Use `lit(5)` to create a Column object for the literal 5.
>>> df_filtered = df.filter(col("num_items") > lit(5))

Como converter um objeto de coluna para um tipo específico

Para converter um objeto Column a um tipo específico, chame o método cast e passe um objeto do módulo snowflake.snowpark.types. Por exemplo, para converter um literal para um NUMBER com uma precisão de 5 e escala de 2:

>>> # Import for the lit function.
>>> from snowflake.snowpark.functions import lit

>>> # Import for the DecimalType class.
>>> from snowflake.snowpark.types import DecimalType

>>> decimal_value = lit(0.05).cast(DecimalType(5,2))

Como encadear chamadas a métodos

Como cada método que transforma um objeto DataFrame returna um novo objeto DataFrame que tem a transformação aplicada, você pode encadear chamadas a métodos para produzir um novo DataFrame que é transformado de maneiras adicionais.

O exemplo a seguir retorna um DataFrame que está configurado para:

  • Consultar a tabela sample_product_data.

  • Retornar a linha com id = 1.

  • Selecionar as colunas name e serial_number.

    >>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number"))
    >>> df_product_info.show()
    -------------------------------
    |"NAME"     |"SERIAL_NUMBER"  |
    -------------------------------
    |Product 1  |prod-1           |
    -------------------------------
    

Neste exemplo:

  • session.table("sample_product_data") retorna um DataFrame para a tabela sample_product_data.

    Embora o DataFrame ainda não contenha os dados da tabela, o objeto contém as definições das colunas da tabela.

  • filter(col("id") == 1) retorna um DataFrame para a tabela sample_product_data que está preparada para retornar a linha com id = 1.

    Observe novamente que o DataFrame ainda não contém a linha correspondente da tabela. A linha correspondente não é obtida até que você chame um método de ação.

  • select(col("name"), col("serial_number")) retorna um DataFrame que contém as colunas name e serial_number para a linha da tabela sample_product_data que tem id = 1.

Quando você fizer chamadas a métodos encadeadas, tenha em mente que a ordem das chamadas é importante. Cada chamada de método retorna um DataFrame que foi transformado. Certifique-se de que as chamadas subsequentes funcionem com os DataFrame transformados.

Por exemplo, no código abaixo, o método select retorna um DataFrame que contém apenas duas colunas: name e serial_number. A chamada ao método filter nesse DataFrame falha porque usa a coluna id, que não está no DataFrame transformado.

>>> # This fails with the error "invalid identifier 'ID'."
>>> df_product_info = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") == 1)
>>> try:
...   df_product_info.show()
... except SnowparkSQLException as e:
...   print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 121
invalid identifier 'ID'

Em contraste, o seguinte código é executado com sucesso porque o método filter() é chamado em um DataFrame que contém todas as colunas da tabela sample_product_data (incluindo a coluna id):

>>> # This succeeds because the DataFrame returned by the table() method
>>> # includes the "id" column.
>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number"))
>>> df_product_info.show()
-------------------------------
|"NAME"     |"SERIAL_NUMBER"  |
-------------------------------
|Product 1  |prod-1           |
-------------------------------

Tenha em mente que talvez seja necessário fazer as chamadas ao método select e filter em uma ordem diferente daquela em que você usaria as palavras-chave equivalentes (SELECT e WHERE) em uma instrução SQL.

Como obter definições das colunas

Para obter a definição das colunas no conjunto de dados para o DataFrame, chame a propriedade schema. Esse método retorna um objeto StructType que contém uma list de StructField objetos. Cada objeto StructField contém a definição de uma coluna.

>>> # Get the StructType object that describes the columns in the
>>> # underlying rowset.
>>> table_schema = session.table("sample_product_data").schema
>>> table_schema
StructType([StructField('ID', LongType(), nullable=True), StructField('PARENT_ID', LongType(), nullable=True), StructField('CATEGORY_ID', LongType(), nullable=True), StructField('NAME', StringType(), nullable=True), StructField('SERIAL_NUMBER', StringType(), nullable=True), StructField('KEY', LongType(), nullable=True), StructField('"3rd"', LongType(), nullable=True)])

No objeto devolvido StructType, os nomes das colunas são sempre normalizados. Identificadores sem aspas são retornados em maiúsculas, e identificadores entre aspas são devolvidos na caixa em que foram definidos.

O exemplo a seguir cria um DataFrame contendo as colunas ID e 3rd. Para o nome de coluna 3rd, a biblioteca do Snowpark coloca o nome automaticamente entre aspas duplas ("3rd") porque o nome não cumpre os requisitos para um identificador.

O exemplo chama a propriedade schema e depois chama a propriedade names no objeto StructType retornado para obter uma list de nomes de coluna. Os nomes são normalizados no StructType retornado pela propriedade schema.

>>> # Create a DataFrame containing the "id" and "3rd" columns.
>>> df_selected_columns = session.table("sample_product_data").select(col("id"), col("3rd"))
>>> # Print out the names of the columns in the schema. This prints out:
>>> # This prints List["ID", "\"3rd\""]
>>> df_selected_columns.schema.names
['ID', '"3rd"']

Como executar uma ação para avaliar um DataFrame

Como mencionado anteriormente, o DataFrame é avaliado lentamente, o que significa que a instrução SQL não é enviada ao servidor para execução até que você execute uma ação. Uma ação faz com que o DataFrame seja avaliado e envia a instrução SQL correspondente para o servidor para execução.

Os seguintes métodos executam uma ação:

Classe

Método

Descrição

DataFrame

collect

Avalia o DataFrame e retorna o conjunto de dados resultante como uma list de objetos Row.

DataFrame

count

Avalia o DataFrame e retorna o número de linhas.

DataFrame

show

Avalia o DataFrame e imprime as linhas no console. Observe que esse método limita o número de linhas a 10 (por padrão).

DataFrameWriter

save_as_table

Armazena os dados no DataFrame para a tabela especificada. Consulte Como salvar dados em uma tabela.

Por exemplo, para executar uma consulta em uma tabela e retornar os resultados, chame o método collect:

>>> # Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
>>> # This does not execute the query.
>>> df = session.table("sample_product_data").select(col("id"), col("name"))

>>> # Send the query to the server for execution and
>>> # return a list of Rows containing the results.
>>> results = df.collect()

Para executar a consulta e retornar o número de resultados, chame o método count:

>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")

>>> # Send the query to the server for execution and
>>> # print the count of rows in the table.
>>> print(df_products.count())
12

Para executar uma consulta e imprimir os resultados para o console, chame o método show:

>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")

>>> # Send the query to the server for execution and
>>> # print the results to the console.
>>> # The query limits the number of rows to 10 by default.
>>> df_products.show()
-------------------------------------------------------------------------------------
|"ID"  |"PARENT_ID"  |"CATEGORY_ID"  |"NAME"      |"SERIAL_NUMBER"  |"KEY"  |"3rd"  |
-------------------------------------------------------------------------------------
|1     |0            |5              |Product 1   |prod-1           |1      |10     |
|2     |1            |5              |Product 1A  |prod-1-A         |1      |20     |
|3     |1            |5              |Product 1B  |prod-1-B         |1      |30     |
|4     |0            |10             |Product 2   |prod-2           |2      |40     |
|5     |4            |10             |Product 2A  |prod-2-A         |2      |50     |
|6     |4            |10             |Product 2B  |prod-2-B         |2      |60     |
|7     |0            |20             |Product 3   |prod-3           |3      |70     |
|8     |7            |20             |Product 3A  |prod-3-A         |3      |80     |
|9     |7            |20             |Product 3B  |prod-3-B         |3      |90     |
|10    |0            |50             |Product 4   |prod-4           |4      |100    |
-------------------------------------------------------------------------------------


>>> # Limit the number of rows to 20, rather than 10.
>>> df_products.show(20)
-------------------------------------------------------------------------------------
|"ID"  |"PARENT_ID"  |"CATEGORY_ID"  |"NAME"      |"SERIAL_NUMBER"  |"KEY"  |"3rd"  |
-------------------------------------------------------------------------------------
|1     |0            |5              |Product 1   |prod-1           |1      |10     |
|2     |1            |5              |Product 1A  |prod-1-A         |1      |20     |
|3     |1            |5              |Product 1B  |prod-1-B         |1      |30     |
|4     |0            |10             |Product 2   |prod-2           |2      |40     |
|5     |4            |10             |Product 2A  |prod-2-A         |2      |50     |
|6     |4            |10             |Product 2B  |prod-2-B         |2      |60     |
|7     |0            |20             |Product 3   |prod-3           |3      |70     |
|8     |7            |20             |Product 3A  |prod-3-A         |3      |80     |
|9     |7            |20             |Product 3B  |prod-3-B         |3      |90     |
|10    |0            |50             |Product 4   |prod-4           |4      |100    |
|11    |10           |50             |Product 4A  |prod-4-A         |4      |100    |
|12    |10           |50             |Product 4B  |prod-4-B         |4      |100    |
-------------------------------------------------------------------------------------

Nota: Se você estiver chamando a propriedade schema para obter as definições das colunas no DataFrame, você não precisa chamar um método de ação.

Como retornar o conteúdo de um DataFrame como um DataFrame do Pandas

Para retornar o conteúdo de um DataFrame como um DataFrame do Pandas, use o método to_pandas.

Por exemplo:

>>> python_df = session.create_dataframe(["a", "b", "c"])
>>> pandas_df = python_df.to_pandas()

Como salvar dados em uma tabela

Para salvar o conteúdo de um DataFrame em uma tabela:

  1. Chame a propriedade write para obter um objeto DataFrameWriter.

  2. Chame o método mode no objeto DataFrameWriter e especifique se você deseja inserir linhas ou atualizar linhas na tabela. Esse método retorna um novo objeto DataFrameWriter que é configurado com o modo especificado.

  3. Chame o método save_as_table no objeto DataFrameWriter para salvar o conteúdo do DataFrame em uma tabela especificada.

Observe que não é necessário chamar um método separado (por exemplo, collect) para executar a instrução SQL que salva os dados na tabela.

Por exemplo:

>>> df.write.mode("overwrite").save_as_table("table1")

Como criar uma exibição a partir de um DataFrame

Para criar uma exibição a partir de um DataFrame, chame o método create_or_replace_view, que imediatamente cria a nova exibição:

>>> import os
>>> database = os.environ["snowflake_database"]  # use your own database and schema
>>> schema = os.environ["snowflake_schema"]
>>> view_name = "my_view"
>>> df.create_or_replace_view(f"{database}.{schema}.{view_name}")
[Row(status='View MY_VIEW successfully created.')]

As exibições que você cria ao chamar create_or_replace_view são persistentes. Se você não precisar mais dessa exibição, você pode excluir a exibição manualmente.

Alternativamente, use o método create_or_replace_temp_view, que cria uma exibição temporária. A exibição temporária só fica disponível na sessão em que ela é criada.

Como trabalhar com arquivos em um estágio

Esta seção explica como consultar dados em um arquivo em um estágio do Snowflake. Para outras operações em arquivos, use instruções SQL.

Para consultar dados em arquivos em um estágio do Snowflake, use a classe DataFrameReader:

  1. Chame o método read na classe Session para acessar um objeto DataFrameReader.

  2. Se os arquivos estiverem no formato CSV, descreva os campos no arquivo. Para fazer isso:

    1. Crie um objeto StructType que consiste em uma list de objetos StructField descrevendo os campos no arquivo.

    2. Para cada objeto StructField, especifique o seguinte:

      • O nome do campo.

      • O tipo de dados do campo (especificado como um objeto no módulo snowflake.snowpark.types).

      • Se o campo é ou não anulável.

      Por exemplo:

      >>> from snowflake.snowpark.types import *
      
      >>> schema_for_data_file = StructType([
      ...                          StructField("id", StringType()),
      ...                          StructField("name", StringType())
      ...                       ])
      
    3. Chame a propriedade schema no objeto DataFrameReader, passando o objeto StructType.

      Por exemplo:

      >>> df_reader = session.read.schema(schema_for_data_file)
      

      A propriedade schema retorna um objeto DataFrameReader que é configurado para ler arquivos contendo os campos especificados.

      Note que você não precisa fazer isso para arquivos em outros formatos (como JSON). Para esses arquivos, o DataFrameReader trata os dados como um único campo do tipo VARIANT com o nome de campo $1.

  3. Se você precisar especificar informações adicionais sobre como os dados devem ser lidos (por exemplo, que os dados estão comprimidos ou que um arquivo CSV usa um ponto-e-vírgula em vez de uma vírgula para delimitar campos), chame os métodos option ou options do objeto DataFrameReader.

    O método option obtém um nome e um valor da opção que você deseja definir e permite combinar múltiplas chamadas encadeadas, enquanto o método options obtém um dicionário dos nomes das opções e seus valores correspondentes.

    Para os nomes e valores das opções de formato do arquivo, veja a documentação documentação sobre CREATE FILE FORMAT.

    Você também pode definir as opções de cópia descritas na documentação COPY INTO TABLE. Observe que a definição de opções de cópia pode resultar em uma estratégia de execução mais cara quando você obtém os dados para o DataFrame.

    O exemplo a seguir configura o objeto DataFrameReader para consultar dados em um arquivo CSV que não é compactado e que usa um ponto-e-vírgula como delimitador de campo.

    >>> df_reader = df_reader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    

    Os métodos option e options retornam um objeto DataFrameReader que é configurado com as opções especificadas.

  4. Chame o método correspondente ao formato do arquivo (por exemplo, o método csv) passando o local do arquivo.

    >>> df = df_reader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    

    Os métodos correspondentes ao formato de um arquivo retornam um objeto DataFrame que é configurado para manter os dados nesse arquivo.

  5. Use os métodos do objeto DataFrame para realizar quaisquer transformações necessárias no conjunto de dados (por exemplo, selecionar campos específicos, filtrar linhas, etc.).

    Por exemplo, para extrair o elemento color de um arquivo JSON no estágio chamado my_stage:

    >>> # Import the sql_expr function from the functions module.
    >>> from snowflake.snowpark.functions import sql_expr
    
    >>> df = session.read.json("@my_stage").select(sql_expr("$1:color"))
    

    Como explicado anteriormente, para arquivos em formatos diferentes de CSV (por exemplo JSON), o DataFrameReader trata os dados no arquivo como uma única coluna VARIANT com o nome $1.

    Esse exemplo usa a função sql_expr no módulo snowflake.snowpark.functions para especificar o caminho para o elemento color.

    Observe que a função sql_expr não interpreta ou modifica o argumento de entrada. A função só permite formar expressões e trechos em SQL que ainda não são suportados pela API do Snowpark.

  6. Chame um método de ação para consultar os dados no arquivo.

    Como é o caso de DataFrames para tabelas, os dados não são obtidos no DataFrame até que você chame um método de ação.

Como executar instruções SQL

Para executar uma instrução SQL que você especificar, chame o método sql na classe Session e passe a instrução a ser executada. O método retorna um DataFrame.

Note que a instrução SQL não será executada até que você chame um método de ação.

>>> # Get the list of the files in a stage.
>>> # The collect() method causes this SQL statement to be executed.
>>> session.sql("create or replace temp stage my_stage").collect()
[Row(status='Stage area MY_STAGE successfully created.')]
>>> stage_files_df = session.sql("ls @my_stage").collect()
>>> # Resume the operation of a warehouse.
>>> # Note that you must call the collect method in order to execute
>>> # the SQL statement.
>>> session.sql("alter warehouse if exists my_warehouse resume if suspended").collect()
[Row(status='Statement executed successfully.')]

>>> # Set up a SQL statement to copy data from a stage to a table.
>>> session.sql("copy into sample_product_data from @my_stage file_format=(type = csv)").collect()
[Row(status='Copy executed with 0 files processed.')]

Se você quiser chamar métodos para transformar o DataFrame (por exemplo, filter, select, etc.), observe que esses métodos só funcionam se a instrução SQL subjacente for uma instrução SELECT. Os métodos de transformação não são suportados para outros tipos de instruções SQL.

>>> df = session.sql("select id, parent_id from sample_product_data where id < 10")
>>> # Because the underlying SQL statement for the DataFrame is a SELECT statement,
>>> # you can call the filter method to transform this DataFrame.
>>> results = df.filter(col("id") < 3).select(col("id")).collect()

>>> # In this example, the underlying SQL statement is not a SELECT statement.
>>> df = session.sql("ls @my_stage")
>>> # Calling the filter method results in an error.
>>> try:
...   df.filter(col("size") > 50).collect()
... except SnowparkSQLException as e:
...   print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 104
invalid identifier 'SIZE'