Referência rápida: APIs do Snowpark Scala para comandos SQL

Este tópico fornece uma referência rápida de algumas das APIs do Snowpark que correspondem a comandos SQL.

(Note que esta não é uma lista completa das APIs que correspondem a comandos SQL).

Neste tópico:

Como realizar consultas

Como selecionar colunas

Para selecionar colunas específicas, use DataFrame.select.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

SELECT id, name FROM sample_product_data;
Copy
val dfSelectedCols = df.select(col("id"), col("name"))
dfSelectedCols.show()
Copy

Como renomear colunas

Para renomear uma coluna, use Column.as, Column.alias ou Column.name.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

SELECT id AS item_id FROM sample_product_data;
Copy
val dfRenamedCol = df.select(col("id").as("item_id"))
dfRenamedCol.show()
Copy
val dfRenamedCol = df.select(col("id").alias("item_id"))
dfRenamedCol.show()
Copy
val dfRenamedCol = df.select(col("id").name("item_id"))
dfRenamedCol.show()
Copy

Como filtrar dados

Para filtrar dados, use DataFrame.filter ou DataFrame.where.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

SELECT * FROM sample_product_data WHERE id = 1;
Copy
val dfFilteredRows = df.filter((col("id") === 1))
dfFilteredRows.show()
Copy
val dfFilteredRows = df.where((col("id") === 1))
dfFilteredRows.show()
Copy

Como ordenar os dados

Para ordenar os dados, use DataFrame.sort.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

SELECT * FROM sample_product_data ORDER BY category_id;
Copy
val dfSorted = df.sort(col("category_id"))
dfSorted.show()
Copy

Como limitar o número de linhas retornadas

Para limitar o número de linhas retornadas, use DataFrame.limit. Consulte Como limitar o número de linhas em um DataFrame.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

SELECT * FROM sample_product_data
  ORDER BY category_id LIMIT 2;
Copy
val dfSorted = df.sort(col("category_id")).limit(2);
val arrayRows = dfSorted.collect()
Copy

Como realizar junções

Para realizar uma junção, use DataFrame.join ou DataFrame.naturalJoin. Consulte Junção de DataFrames.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

SELECT * FROM sample_a
  INNER JOIN sample_b
  on sample_a.id_a = sample_b.id_a;
Copy
val dfJoined =
  dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Copy
SELECT * FROM sample_a NATURAL JOIN sample_b;
Copy
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Copy

Consulta de dados semiestruturados

Para percorrer dados semiestruturados, use Column.apply(“<field_name>”) e Column.apply(<index>). Consulte Como trabalhar com dados semiestruturados.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

SELECT src:salesperson.name FROM car_sales;
Copy
dfJsonField =
  df.select(col("src")("salesperson")("name"))
dfJsonField.show()
Copy

Agrupamento e agregação de dados

Para agrupar dados, use DataFrame.groupBy. Isso retorna um objeto RelationalGroupedDataFrame que você pode usar para realizar as agregações.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

SELECT category_id, count(*)
  FROM sample_product_data GROUP BY category_id;
Copy
val dfCountPerCategory = df.groupBy(col("category")).count()
dfCountPerCategory.show()
Copy

Como chamar funções de janela

Para chamar uma função de janela, use os métodos do objeto Window para construir um objeto WindowSpec, que por sua vez você pode usar para funções de janela (semelhante ao uso de “<function> OVER … PARTITION BY … ORDER BY”).

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

SELECT category_id, price_date, SUM(amount) OVER
  (PARTITION BY category_id ORDER BY price_date)
  FROM prices ORDER BY price_date;
Copy
val window = Window.partitionBy(
  col("category")).orderBy(col("price_date"))
val dfCumulativePrices = dfPrices.select(
  col("category"), col("price_date"),
  sum(col("amount")).over(window)).sort(col("price_date"))
dfCumulativePrices.show()
Copy

Atualização, exclusão e fusão de linhas

Para atualizar, excluir e fundir linhas em uma tabela, use Updatable. Consulte Atualização, eliminação e fusão de linhas em uma tabela.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

UPDATE sample_product_data
  SET serial_number = 'xyz' WHERE id = 12;
Copy
val updateResult =
  updatableDf.update(
    Map("serial_number" -> lit("xyz")),
    col("id") === 12)
Copy
DELETE FROM sample_product_data
  WHERE category_id = 50;
Copy
val deleteResult =
  updatableDf.delete(updatableDf("category_id") === 50)
Copy
MERGE  INTO target_table USING source_table
  ON target_table.id = source_table.id
  WHEN MATCHED THEN
    UPDATE SET target_table.description =
      source_table.description;
Copy
val mergeResult =
   target.merge(source, target("id") === source("id"))
  .whenMatched.update(Map("description" -> source("description")))
  .collect()
Copy

Como trabalhar com estágios

Para obter mais informações sobre como trabalhar com estágios, consulte Como trabalhar com arquivos em um estágio.

Como carregar e baixar arquivos de um estágio

Para carregar e baixar arquivos de um estágio, use FileOperation. Consulte Carregamento e descarregamento de arquivos em um estágio.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

PUT file:///tmp/*.csv @myStage OVERWRITE = TRUE;
Copy
val putOptions = Map("OVERWRITE" -> "TRUE")
val putResults = session.file.put(
  "file:///tmp/*.csv", "@myStage", putOptions)
Copy
GET @myStage file:///tmp PATTERN = '.*.csv.gz';
Copy
val getOptions = Map("PATTERN" -> s"'.*.csv.gz'")
val getResults = session.file.get(
 "@myStage", "file:///tmp", getOptions)
Copy

Como ler dados de arquivos em um estágio

Para ler dados de arquivos em um estágio, use o DataFrameReader para criar um DataFrame para os dados. Consulte Como configurar um DataFrame para arquivos em um estágio.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

CREATE FILE FORMAT snowpark_temp_format TYPE = JSON;
SELECT "$1"[0]['salesperson']['name'] FROM (
  SELECT $1::VARIANT AS "$1" FROM @mystage/car_sales.json(
    FILE_FORMAT => 'snowpark_temp_format')) LIMIT 10;
DROP FILE FORMAT snowpark_temp_format;
Copy
val df = session.read.json(
  "@mystage/car_sales.json").select(
    col("$1")(0)("salesperson")("name"))
df.show();
Copy

Como copiar dados de arquivos em um estágio para uma tabela

Para copiar dados de arquivos em uma estágio para uma tabela, use DataFrameReader para criar um CopyableDataFrame para os dados e use o método CopyableDataFrame.copyInto para copiar os dados para a tabela. Consulte Como copiar dados de arquivos para uma tabela.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

COPY INTO new_car_sales
  FROM @mystage/car_sales.json
  FILE_FORMAT = (TYPE = JSON);
Copy
val dfCopyableDf = session.read.json("@mystage/car_sales.json")
dfCopyableDf.copyInto("new_car_sales")
Copy

Como salvar um DataFrame para arquivos em um estágio

Para salvar um DataFrame em arquivos em um estágio, use o método DataFrameWriter nomeado com o formato dos arquivos que você deseja usar. Consulte Como salvar um DataFrame para arquivos em um estágio.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

COPY INTO @mystage/saved_data.json
  FROM (  SELECT  *  FROM (car_sales) )
  FILE_FORMAT = ( TYPE = JSON COMPRESSION = 'none' )
  OVERWRITE = TRUE
  DETAILED_OUTPUT = TRUE
Copy
val df = session.table("car_sales")
val writeFileResult = df.write.mode(
  SaveMode.Overwrite).option(
  "DETAILED_OUTPUT", "TRUE").option(
  "compression", "none").json(
  "@mystage/saved_data.json")
Copy

Como criar e chamar funções definidas pelo usuário (UDFs)

Para criar uma função de Scala que serve como UDF (uma UDF anônima), use udf.

Para criar uma UDF temporária ou permanente que você pode chamar pelo nome, use UDFRegistration.registerTemporary ou UDFRegistration.registerPermanent.

Para chamar uma UDF permanente pelo nome, use callUDF.

Para obter mais detalhes, consulte Criação de funções definidas pelo usuário (UDFs) para DataFrames no Scala e Como chamar funções definidas pelo usuário escalares (UDFs).

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(amount) AS doublenum
  FROM sample_product_data;
Copy
val doubleUdf = udf((x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
 "doubleNum", doubleUdf(col("amount")))
dfWithDoubleNum.show()
Copy
CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(amount) AS doublenum
  FROM sample_product_data;
Copy
session.udf.registerTemporary(
  "doubleUdf", (x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
 "doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
Copy
CREATE FUNCTION doubleUdf(arg1 INT)
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  doubleUdf(amount) AS doublenum
  FROM sample_product_data;
Copy
session.udf.registerPermanent(
  "doubleUdf", (x: Int) => x + x, "mystage")
val dfWithDoubleNum = df.withColumn(
 "doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
Copy

Como criar e chamar procedimentos armazenados

Para obter um guia sobre como criar procedimentos armazenados com o Snowpark, consulte Criação de procedimentos armazenados para DataFrames em Scala.

Exemplo de uma instrução SQL

Exemplo de código do Snowpark

CREATE PROCEDURE <temp_procedure_name>(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL <temp_procedure_name>(2, 3);
Copy
StoredProcedure sp =
  session.sproc().registerTemporary((Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(sp, 2, 3).show();
Copy
CREATE PROCEDURE sproc(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL sproc(2, 3);
Copy
String name = "sproc";
StoredProcedure sp =
  session.sproc().registerTemporary(name,
    (Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(name, 2, 3).show();
Copy
CREATE PROCEDURE add_hundred(x INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + 100;
  END
  $$
  ;

CALL add_hundred(3);
Copy
val name: String = "add_hundred"
val stageName: String = "sproc_libs"

val sp: StoredProcedure =
  session.sproc.registerPermanent(
    name,
    (session: Session, x: Int) => x + 100,
    stageName,
    true
  )

session.storedProcedure(name, 3).show
Copy