Tutorial: Get started with Snowpark Connect for Spark¶
This tutorial walks you through a complete Snowpark Connect for Spark workflow using a local IDE. You’ll create a
source table, read the data into a Spark DataFrame, apply transformations with user-defined functions,
save the results to a table and to Parquet files on a stage, and verify the output using the SnowflakeSession class.
Choose the tab for your language in each step to follow along in Python, Java, or Scala.
Note
Each step builds on the previous one to form a single end-to-end example. Pick one language tab
and follow it through all steps. For Python, you can run each step individually in a REPL or
notebook. For Java or Scala, combine all steps into one Tutorial.java or Tutorial.scala
file before running.
Complete the
environment setup
for your language before starting this tutorial. You need a Snowflake account with access to
Snowpark Connect for Spark and a database and schema to work in.
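Step 1: Create the source table¶
Create a sales table with product, region, and amount columns and load it with a few sample rows. The rest of the tutorial reads from this table.
In Python, a minimal sketch of this step runs the same SQL through your Spark session. It assumes that session is already created and named spark, as described in the environment setup and as used in the verification step later in this tutorial:

# Create the source table and load the sample rows used throughout the tutorial.
spark.sql("CREATE OR REPLACE TABLE sales (product STRING, region STRING, amount DOUBLE)")
spark.sql("""
    INSERT INTO sales VALUES
        ('Widget', 'north', 120.50),
        ('Gadget', 'south', 85.00),
        ('Widget', 'south', 200.75),
        ('Gizmo', 'north', 45.30),
        ('Gadget', 'north', 150.00),
        ('Gizmo', 'south', 60.00)
""")

The Java and Scala versions below additionally show session creation and class finder registration, because the compiled examples run as a standalone program.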
Note
The Java client for Snowpark Connect for Spark is a preview feature.
import com.snowflake.snowpark_connect.client.SnowparkConnectSession;
import com.snowflake.snowpark_connect.client.SnowflakeSession;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.connect.client.REPLClassDirMonitor;
import static org.apache.spark.sql.functions.*;

public class Tutorial {
  public static void main(String[] args) {
    SparkSession spark = SnowparkConnectSession.builder()
        .appName("Tutorial")
        .getOrCreate();

    // Register a class finder so Spark can discover compiled UDF classes
    REPLClassDirMonitor classFinder = new REPLClassDirMonitor("/path/to/target/classes");
    spark.registerClassFinder(classFinder);

    spark.sql("CREATE OR REPLACE TABLE sales (product STRING, region STRING, amount DOUBLE)");
    spark.sql("INSERT INTO sales VALUES " +
        "('Widget', 'north', 120.50), " +
        "('Gadget', 'south', 85.00), " +
        "('Widget', 'south', 200.75), " +
        "('Gizmo', 'north', 45.30), " +
        "('Gadget', 'north', 150.00), " +
        "('Gizmo', 'south', 60.00)");
Note
The Scala client for Snowpark Connect for Spark is a preview feature.
import com.snowflake.snowpark_connect.client.{SnowparkConnectSession, SnowflakeSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.connect.client.REPLClassDirMonitor

object Tutorial {
  def main(args: Array[String]): Unit = {
    val spark = SnowparkConnectSession.builder()
      .appName("Tutorial")
      .getOrCreate()

    // Register a class finder so Spark can discover compiled UDF classes
    val classFinder = new REPLClassDirMonitor("/path/to/target/scala-2.12/classes")
    spark.registerClassFinder(classFinder)

    spark.sql("CREATE OR REPLACE TABLE sales (product STRING, region STRING, amount DOUBLE)")
    spark.sql("""INSERT INTO sales VALUES
      ('Widget', 'north', 120.50),
      ('Gadget', 'south', 85.00),
      ('Widget', 'south', 200.75),
      ('Gizmo', 'north', 45.30),
      ('Gadget', 'north', 150.00),
      ('Gizmo', 'south', 60.00)""")
Step 2: Read from the table and apply transformations¶
Read the data into a DataFrame, normalize the region column to uppercase, and apply a tiered
tax rate based on the sale amount. Orders under $50 are taxed at 5%, orders between $50 and $150
at 10%, and orders over $150 at 15%. This kind of multi-bracket business logic is a natural fit for
a UDF. The Python example registers a UDF, while Java and Scala use typed Dataset map operations.
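In Python, a minimal sketch of this step registers the tax logic as a UDF and applies it with withColumn. The function name apply_tiered_tax and the rounding to two decimal places are illustrative choices here; spark and the sales table come from Step 1:

from pyspark.sql.functions import col, udf, upper
from pyspark.sql.types import DoubleType

# UDF that applies the tiered tax rate described above to a single sale amount.
@udf(returnType=DoubleType())
def apply_tiered_tax(amount):
    if amount < 50:
        rate = 0.05
    elif amount <= 150:
        rate = 0.10
    else:
        rate = 0.15
    return round(amount * (1 + rate), 2)

# Normalize the region to uppercase and add the computed column.
sales = spark.table("sales")
result = (
    sales
    .withColumn("region", upper(col("region")))
    .withColumn("amount_with_tax", apply_tiered_tax(col("amount")))
)
result.show()

The result DataFrame keeps the product, region, amount, and amount_with_tax columns that the later steps expect.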
Note
The Java client for Snowpark Connect for Spark is a preview feature.
import org.apache.spark.api.java.function.MapFunction;

// POJO that mirrors the source table schema.
// Encoders.bean() maps DataFrame columns to getter/setter pairs,
// giving you a typed Dataset<Sale> instead of raw Row objects.
public static class Sale implements java.io.Serializable {
  private String product;
  private String region;
  private double amount;

  public Sale() {}
  public String getProduct() { return product; }
  public void setProduct(String p) { this.product = p; }
  public String getRegion() { return region; }
  public void setRegion(String r) { this.region = r; }
  public double getAmount() { return amount; }
  public void setAmount(double a) { this.amount = a; }
}

// Output POJO with the computed column. A separate class lets the
// encoder infer the result schema automatically from the bean fields.
public static class SaleWithTax implements java.io.Serializable {
  private String product;
  private String region;
  private double amount;
  private double amount_with_tax;

  public SaleWithTax() {}
  public String getProduct() { return product; }
  public void setProduct(String p) { this.product = p; }
  public String getRegion() { return region; }
  public void setRegion(String r) { this.region = r; }
  public double getAmount() { return amount; }
  public void setAmount(double a) { this.amount = a; }
  public double getAmount_with_tax() { return amount_with_tax; }
  public void setAmount_with_tax(double v) { this.amount_with_tax = v; }
}

static double applyTieredTax(double amount) {
  double rate = amount < 50 ? 0.05 : amount <= 150 ? 0.10 : 0.15;
  return Math.round(amount * (1 + rate) * 100.0) / 100.0;
}

Dataset<Sale> sales = spark.table("sales").as(Encoders.bean(Sale.class));

Dataset<Row> result = sales.map(
    (MapFunction<Sale, SaleWithTax>) s -> {
      SaleWithTax out = new SaleWithTax();
      out.setProduct(s.getProduct());
      out.setRegion(s.getRegion().toUpperCase());
      out.setAmount(s.getAmount());
      out.setAmount_with_tax(applyTieredTax(s.getAmount()));
      return out;
    },
    Encoders.bean(SaleWithTax.class)).toDF();

result.show();
Note
The Scala client for Snowpark Connect for Spark is a preview feature.
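Step 3: Save the results to a table and a stage¶
Save the transformed results twice: as a Snowflake table named sales_with_tax, and as Parquet files under tutorial_output/sales_with_tax on your user stage. Step 4 verifies both outputs. The following Python sketch assumes the result DataFrame from Step 2; the overwrite save mode and writing Parquet by passing the stage path directly to the DataFrame writer are illustrative choices here, not the only way to produce these outputs:

# Save the transformed rows as a Snowflake table for the SQL queries in Step 4.
result.write.mode("overwrite").saveAsTable("sales_with_tax")

# Also write the rows as Parquet files to a path on the user stage.
result.write.mode("overwrite").parquet("@~/tutorial_output/sales_with_tax")

Step 4: Verify the output with SnowflakeSession¶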
Use SnowflakeSession to run a Snowflake-native aggregation query against the output table and
list the staged Parquet files written in Step 3. This verifies that both outputs were created and
demonstrates how to use SnowflakeSession for Snowflake-specific SQL.
sf = SnowflakeSession(spark)

totals = sf.sql("""
    SELECT product,
           SUM(amount_with_tax) AS total_with_tax,
           COUNT(*) AS num_sales
    FROM sales_with_tax
    GROUP BY product
    ORDER BY total_with_tax DESC
""")
totals.show()

staged_files = sf.sql("LIST @~/tutorial_output/sales_with_tax")
staged_files.show(truncate=False)

spark.stop()
Note
The Java client for Snowpark Connect for Spark is a preview feature.
SnowflakeSession sf = new SnowflakeSession(spark);

Dataset<Row> totals = sf.sql(
    "SELECT product, " +
    "SUM(amount_with_tax) AS total_with_tax, " +
    "COUNT(*) AS num_sales " +
    "FROM sales_with_tax " +
    "GROUP BY product " +
    "ORDER BY total_with_tax DESC");
totals.show();

Dataset<Row> stagedFiles = sf.sql("LIST @~/tutorial_output/sales_with_tax");
stagedFiles.show(false);

spark.stop();
  }
}
Note
The Scala client for Snowpark Connect for Spark is a preview feature.
val sf = new SnowflakeSession(spark)

val totals = sf.sql("""SELECT product,
    |       SUM(amount_with_tax) AS total_with_tax,
    |       COUNT(*) AS num_sales
    |FROM sales_with_tax
    |GROUP BY product
    |ORDER BY total_with_tax DESC""".stripMargin)
totals.show()

val stagedFiles = sf.sql("LIST @~/tutorial_output/sales_with_tax")
stagedFiles.show(truncate = false)

spark.stop()
  }
}
The staged file listing shows the Parquet files written to the stage:
+------------------------------------------------------------------------+----+--------------------+-----------------------------+
|name                                                                    |size|md5                 |last_modified                |
+------------------------------------------------------------------------+----+--------------------+-----------------------------+
|tutorial_output/sales_with_tax/part-00000-...-c000_0_2_0.snappy.parquet |976 |694064b6ca5a43c9... |Mon, 01 Jan 2024 00:00:00 GMT|
|tutorial_output/sales_with_tax/part-00000-...-c000_0_3_0.snappy.parquet |976 |a41ee22852932...    |Mon, 01 Jan 2024 00:00:00 GMT|
|tutorial_output/sales_with_tax/part-00000-...-c000_0_4_0.snappy.parquet |976 |abdc796ac22cb...    |Mon, 01 Jan 2024 00:00:00 GMT|
|tutorial_output/sales_with_tax/part-00000-...-c000_0_5_0.snappy.parquet |976 |6efec381920c9...    |Mon, 01 Jan 2024 00:00:00 GMT|
|tutorial_output/sales_with_tax/part-00000-...-c000_0_6_0.snappy.parquet |976 |a63c1fabd8b41...    |Mon, 01 Jan 2024 00:00:00 GMT|
|tutorial_output/sales_with_tax/part-00000-...-c000_0_7_0.snappy.parquet |976 |22825d9f598b4...    |Mon, 01 Jan 2024 00:00:00 GMT|
+------------------------------------------------------------------------+----+--------------------+-----------------------------+