Snowpark Connect for Spark package reference

This page documents the public Python API of the snowpark-connect package. It covers server startup, session initialization, and Snowflake-specific session helpers.

Install the package from PyPI:

pip install snowpark-connect

Package exports

The snowflake.snowpark_connect package exports the following symbols:

Export | Description
init_spark_session | Start the server and return a ready-to-use SparkSession. This is the most common entry point.
start_session | Start the gRPC server without creating a client session.
get_session | Return a SparkSession from an already-running server.
skip_session_configuration | Skip the automatic ALTER SESSION SET parameter bundle that Snowpark Connect for Spark applies at startup.

Server lifecycle

init_spark_session

Initialize and return a SparkSession connected to Snowflake. This is the most common entry point. It starts the Snowpark Connect for Spark server (if it isn’t already running) and returns a ready-to-use session.

def init_spark_session(
    conf: SparkConf = None,
    connection_parameters: dict[str, str] | None = None,
    app_name: str | None = None,
) -> SparkSession

Parameter | Type | Default | Description
conf | SparkConf | None | Optional Spark configuration object.
connection_parameters | dict | None | Connection parameters for the Snowpark session (for example, connection_name, account, user, password, host, warehouse, database, schema). If not provided, the connection resolver determines which connection to use from connections.toml. Not supported inside snowpark-submit jobs.
app_name | str | None | Application name for the Snowflake session. If not provided, a default is derived from the caller’s filename and a timestamp.

Returns: SparkSession connected to Snowflake.

start_session

Start the Snowpark Connect for Spark gRPC server. This is a no-op if the server is already running. Use this when you need to control the server lifecycle separately from session creation, for example when running a long-lived server process for Scala clients.

def start_session(
    is_daemon: bool = True,
    remote_url: str | None = None,
    tcp_port: int | None = None,
    unix_domain_socket: str | None = None,
    stop_event: threading.Event = None,
    snowpark_session: snowpark.Session | None = None,
    connection_parameters: dict[str, str] | None = None,
    max_grpc_message_size: int = 134217728,
    app_name: str | None = None,
) -> threading.Thread | None

Parameter | Type | Default | Description
is_daemon | bool | True | When True, the server shuts down automatically when the main program exits. Set to False for a standalone, long-running server.
remote_url | str | None | A sc:// URL to start the server on. Mutually exclusive with tcp_port and unix_domain_socket.
tcp_port | int | None | TCP port for the gRPC server. Mutually exclusive with remote_url and unix_domain_socket.
unix_domain_socket | str | None | Path to a Unix domain socket for the gRPC server. Mutually exclusive with remote_url and tcp_port.
stop_event | threading.Event | None | When set() is called, the server shuts down. Only works when is_daemon=True.
snowpark_session | snowpark.Session | None | An existing Snowpark session to reuse (for example, from a stored procedure environment). Can’t be used together with connection_parameters.
connection_parameters | dict | None | Connection parameters for creating a Snowpark session. Can’t be used together with snowpark_session.
max_grpc_message_size | int | 134217728 | Maximum gRPC message size in bytes (default 128 MiB).
app_name | str | None | Application name registered with the Snowflake session.

Returns: threading.Thread when is_daemon=True, or None when is_daemon=False (blocks until the server stops).
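
Because remote_url, tcp_port, and unix_domain_socket are mutually exclusive, it can help to validate your own settings before calling start_session. The following is a minimal sketch, not part of the package: pick_endpoint is a hypothetical helper name, and the library performs its own checks regardless.

```python
from urllib.parse import urlparse


def pick_endpoint(remote_url=None, tcp_port=None, unix_domain_socket=None):
    """Hypothetical helper (not part of snowpark-connect).

    Returns a dict holding at most one endpoint keyword argument,
    suitable for forwarding to start_session.
    """
    options = {
        "remote_url": remote_url,
        "tcp_port": tcp_port,
        "unix_domain_socket": unix_domain_socket,
    }
    # Keep only the options the caller actually supplied.
    chosen = {name: value for name, value in options.items() if value is not None}
    if len(chosen) > 1:
        raise ValueError(
            "mutually exclusive options given: " + ", ".join(sorted(chosen))
        )
    # The reference above describes remote_url as an sc:// URL.
    if remote_url is not None and urlparse(remote_url).scheme != "sc":
        raise ValueError("remote_url must use the sc:// scheme")
    return chosen
```

With the package installed, the result could be forwarded as start_session(**pick_endpoint(tcp_port=15002)).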

get_session

Return a SparkSession connected to a running Snowpark Connect for Spark server. The server must already be started via start_session or init_spark_session.

def get_session(
    url: str | None = None,
    conf: SparkConf = None,
) -> SparkSession

Parameter | Type | Default | Description
url | str | None | Spark Connect server URL. Uses the default server URL if not provided.
conf | SparkConf | None | Optional Spark configuration object.

Returns: SparkSession

Raises: RuntimeError if the server hasn’t been started.
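
One way to handle the RuntimeError is to start the server on demand and retry. The sketch below assumes only the behavior described above; get_or_start_session is a hypothetical helper, and the two functions are passed in as callables so the example does not depend on the installed package.

```python
def get_or_start_session(get_session, start_session, **start_kwargs):
    """Return a session, starting the server first if none is running.

    Hypothetical helper: get_session and start_session are expected to be
    the functions exported by snowflake.snowpark_connect.
    """
    try:
        return get_session()
    except RuntimeError:
        # Per the reference above, RuntimeError means the server
        # hasn't been started yet.
        start_session(**start_kwargs)
        return get_session()
```

A call might look like get_or_start_session(get_session, start_session, tcp_port=15002).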

execute_jar

Run a Java or Scala JAR inside the Snowpark Connect for Spark server process. This function manages the full lifecycle: it sets up the classpath, starts the server and JVM, executes the JAR’s main class, and shuts everything down when the application finishes.

from snowflake.snowpark_connect.server import execute_jar

execute_jar(
    jar_path: str,
    main_class: str,
    jar_args: list[str] | None = None,
    additional_jars: list[str] | None = None,
    tcp_port: int | None = None,
    jvm_options: list[str] | None = None,
) -> None

Parameter | Type | Default | Description
jar_path | str | (required) | Path to the application JAR file.
main_class | str | (required) | Fully qualified class name (for example, com.example.MyApp).
jar_args | list[str] | None | Arguments forwarded to the application’s main method.
additional_jars | list[str] | None | Dependency JARs or globs added to the classpath (for example, ["/path/to/gson.jar", "/path/to/lib/*.jar"]).
tcp_port | int | None | gRPC server port (defaults to 15002).
jvm_options | list[str] | None | JVM flags (for example, ["-Xmx4g", "-Xms1g"]).

Note

execute_jar isn’t exported from the top-level package. Import it directly from snowflake.snowpark_connect.server.
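
Since additional_jars accepts globs, it can be useful to preview which files a pattern matches before launching the JVM. The following sketch uses only the Python standard library; expand_classpath is a hypothetical helper, and how execute_jar resolves globs internally may differ.

```python
import glob


def expand_classpath(entries):
    """Illustration only: expand glob patterns into a de-duplicated list.

    Plain paths are kept as-is; each pattern is replaced by its matches
    in sorted order.
    """
    jars = []
    for entry in entries:
        is_pattern = any(ch in entry for ch in "*?[")
        matches = sorted(glob.glob(entry)) if is_pattern else [entry]
        for path in matches:
            if path not in jars:
                jars.append(path)
    return jars
```

Printing expand_classpath(["/path/to/gson.jar", "/path/to/lib/*.jar"]) before calling execute_jar shows whether a pattern matches nothing, which would otherwise surface later as a missing-class error.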

skip_session_configuration

Control whether Snowpark Connect for Spark runs ALTER SESSION SET for its standard parameter bundle at startup. When skip is True, the bundle isn’t applied and you’re responsible for setting the required session parameters manually. This is useful in restricted environments such as some Native App stored procedures.

def skip_session_configuration(skip: bool) -> None

SnowflakeSession

The SnowflakeSession class wraps a SparkSession to provide Snowflake SQL pass-through and helper methods for switching database, schema, role, and warehouse. Use it when you need to run Snowflake-specific SQL that Spark’s parser doesn’t support.

from snowflake.snowpark_connect.snowflake_session import SnowflakeSession

Constructor

sf = SnowflakeSession(spark_session: SparkSession)

Parameter | Type | Description
spark_session | SparkSession | The Spark Connect session to wrap.

sql

Execute Snowflake-specific SQL directly against Snowflake. This bypasses the Spark SQL parser and sends the statement directly to Snowflake, allowing Snowflake-specific syntax that Spark doesn’t support.

sf.sql(sql_stmt: str) -> DataFrame

Parameter | Type | Description
sql_stmt | str | The Snowflake SQL statement to execute.

Returns: pyspark.sql.DataFrame

use_database

Switch the active database for the Snowflake session.

sf.use_database(database: str, preserve_case: bool = False) -> DataFrame

use_schema

Switch the active schema for the Snowflake session.

sf.use_schema(schema: str, preserve_case: bool = False) -> DataFrame

use_role

Switch the active role for the Snowflake session.

sf.use_role(role: str, preserve_case: bool = False) -> DataFrame

use_warehouse

Switch the active warehouse for the Snowflake session.

sf.use_warehouse(warehouse: str, preserve_case: bool = False) -> DataFrame

All four use_* methods accept a preserve_case parameter. When set to True, the identifier is wrapped in double quotes to preserve its original casing. By default, Snowflake uppercases unquoted identifiers.
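
The effect of preserve_case can be illustrated with a small model of the quoting rule. This is a sketch for intuition only: render_identifier and resolved_name are hypothetical functions, not the library's implementation.

```python
def render_identifier(name, preserve_case=False):
    """Illustration only: how a name might appear in the generated statement."""
    if preserve_case:
        # Wrapping in double quotes makes the identifier case-sensitive.
        return '"' + name + '"'
    return name


def resolved_name(rendered):
    """Model of how Snowflake resolves an identifier.

    Quoted identifiers keep their casing; unquoted ones are uppercased.
    """
    if len(rendered) >= 2 and rendered.startswith('"') and rendered.endswith('"'):
        return rendered[1:-1]
    return rendered.upper()
```

Under this model, "MyMixedCaseDB" resolves to MYMIXEDCASEDB without preserve_case and to MyMixedCaseDB with preserve_case=True.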

Examples

Minimal setup

If your ~/.snowflake/connections.toml has a default connection configured, no parameters are needed:

from snowflake.snowpark_connect import init_spark_session

spark = init_spark_session()
df = spark.sql("SELECT 1 AS value")
df.show()

Connecting with explicit credentials

from snowflake.snowpark_connect import init_spark_session

spark = init_spark_session(connection_parameters={
    "account": "myaccount",
    "user": "myuser",
    "password": "mypassword",
    "warehouse": "my_wh",
    "database": "my_db",
    "schema": "my_schema",
})
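
To keep credentials out of source code, the same dictionary can be assembled from environment variables. A minimal sketch follows; the SNOWFLAKE_* variable names are a naming convention assumed for this example, and connection_parameters_from_env is a hypothetical helper rather than part of the package.

```python
import os


def connection_parameters_from_env(env=None, prefix="SNOWFLAKE_"):
    """Build a connection_parameters dict from environment variables.

    Assumption: variables are named SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER,
    and so on. Unset or empty variables are left out of the result.
    """
    env = os.environ if env is None else env
    keys = ("account", "user", "password", "warehouse", "database", "schema")
    params = {}
    for key in keys:
        value = env.get(prefix + key.upper())
        if value:
            params[key] = value
    return params
```

The result could then be passed as init_spark_session(connection_parameters=connection_parameters_from_env()).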

Connecting with a named connection

from snowflake.snowpark_connect import init_spark_session

spark = init_spark_session(connection_parameters={
    "connection_name": "my_dev_connection",
})

Setting an application name

The app name is registered as a query tag in Snowflake (Spark-Connect-App-Name=my-etl-pipeline), making it easy to identify queries in the query history.

from snowflake.snowpark_connect import init_spark_session

spark = init_spark_session(app_name="my-etl-pipeline")
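
When scanning query history, the app name can be recovered from the tag text. The sketch below assumes the tag has exactly the key=value form shown above; app_name_from_query_tag is a hypothetical helper, and real tags may carry additional content that needs different parsing.

```python
TAG_KEY = "Spark-Connect-App-Name"


def app_name_from_query_tag(query_tag):
    """Return the app name if the tag matches the key=value form, else None."""
    key, sep, value = query_tag.partition("=")
    if sep and key.strip() == TAG_KEY and value.strip():
        return value.strip()
    return None
```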

Passing Spark configuration

from pyspark import SparkConf
from snowflake.snowpark_connect import init_spark_session

conf = SparkConf()
conf.set("spark.sql.session.timeZone", "UTC")
conf.set("spark.sql.ansi.enabled", "true")

spark = init_spark_session(conf=conf)

Separate server and session lifecycle

Use this pattern when you want the server to outlive individual sessions, or when multiple sessions share the same server:

from snowflake.snowpark_connect import start_session, get_session

start_session(is_daemon=True, tcp_port=15002)

spark = get_session()
spark.sql("SELECT current_timestamp()").show()

Long-running standalone server

With is_daemon=False, start_session blocks the calling thread until the server stops, which is useful for running the server as a service:

from snowflake.snowpark_connect import start_session

start_session(is_daemon=False, tcp_port=15002)

Graceful shutdown with stop_event

import threading
from snowflake.snowpark_connect import start_session, get_session

stop = threading.Event()
start_session(is_daemon=True, tcp_port=15002, stop_event=stop)

spark = get_session()
spark.sql("SELECT 1").show()

stop.set()
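
The shutdown signal relies on the standard threading.Event mechanism. The following self-contained sketch shows that mechanism with a stand-in worker thread in place of the real server; serve_until_stopped is hypothetical and only models a loop that exits once the event is set.

```python
import threading


def serve_until_stopped(stop_event, log):
    """Stand-in for a server loop: run until stop_event is set."""
    log.append("started")
    # wait() returns True as soon as the event is set, ending the loop.
    while not stop_event.wait(timeout=0.05):
        pass  # a real server would be handling requests here
    log.append("stopped")


log = []
stop = threading.Event()
worker = threading.Thread(target=serve_until_stopped, args=(stop, log), daemon=True)
worker.start()

stop.set()              # request shutdown, as in the example above
worker.join(timeout=2)  # wait for the loop to exit
```

Joining the thread after stop.set() is a design choice that lets the caller confirm shutdown has finished before the program continues; start_session returns a threading.Thread when is_daemon=True, so the same pattern may apply there.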

Using an existing Snowpark session

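In stored procedure environments where a Snowpark session is already available, pass it to start_session instead of connection parameters:

from snowflake.snowpark_connect import start_session, get_session

# existing_snowpark_session is a placeholder for the snowpark.Session that the environment provides.
start_session(snowpark_session=existing_snowpark_session)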
spark = get_session()

Snowflake SQL pass-through

Execute Snowflake-specific SQL that Spark’s parser doesn’t support:

from snowflake.snowpark_connect import init_spark_session
from snowflake.snowpark_connect.snowflake_session import SnowflakeSession

spark = init_spark_session()
sf = SnowflakeSession(spark)

sf.sql("CREATE OR REPLACE TABLE test_table (id INT, name STRING)")
sf.sql("SHOW TABLES IN SCHEMA public").show()
sf.sql("DESCRIBE TABLE test_table").show()

Switching database, schema, role, and warehouse

from snowflake.snowpark_connect import init_spark_session
from snowflake.snowpark_connect.snowflake_session import SnowflakeSession

spark = init_spark_session()
sf = SnowflakeSession(spark)

sf.use_database("analytics")
sf.use_schema("public")
sf.use_role("analyst_role")
sf.use_warehouse("compute_wh")

spark.sql("SELECT * FROM my_table").show()

Preserving case in identifiers

Snowflake uppercases unquoted identifiers by default. Use preserve_case=True to wrap names in double quotes. This snippet reuses the sf object created in the previous example:

sf.use_database("MyMixedCaseDB", preserve_case=True)
sf.use_schema("camelCaseSchema", preserve_case=True)

Running a Java or Scala JAR

from snowflake.snowpark_connect.server import execute_jar

execute_jar(
    jar_path="my-spark-app.jar",
    main_class="com.example.MySparkApp",
    jar_args=["--input", "@my_stage/data", "--output", "@my_stage/results"],
    additional_jars=["/path/to/deps/*.jar"],
    jvm_options=["-Xmx4g", "-Xms1g"],
)

Skipping automatic session configuration

Use this when you need full control over the Snowflake session parameters:

from snowflake.snowpark_connect import skip_session_configuration, init_spark_session

skip_session_configuration(True)
spark = init_spark_session()