Executar cargas de trabalho do Spark a partir de VS Code, Jupyter Notebooks ou um terminal.

Você pode executar cargas de trabalho do Spark interativamente por meio de notebooks Jupyter, VS Code ou qualquer interface baseada em Python sem precisar gerenciar um cluster Spark. As cargas de trabalho são executadas na infraestrutura do Snowflake.

Por exemplo, você pode executar as seguintes tarefas:

  1. Confirme que você tem os pré-requisitos.

  2. Configuração de seu ambiente para se conectar com Snowpark Connect for Spark no Snowflake.

  3. Instale Snowpark Connect for Spark.

  4. Execute código PySpark de seu cliente para executar no Snowflake.

Pré-requisitos

Confirme se suas instalações do Python e do Java são baseadas na mesma arquitetura de computador. Por exemplo, se Python for baseado em ARM64, Java também deverá ser ARM64 (e não x86_64, por exemplo).

Configure seu ambiente

Você pode configurar seu ambiente de desenvolvimento garantindo que seu código possa se conectar ao Snowpark Connect for Spark no Snowflake. Para conectar-se ao código do cliente Snowflake, você usará um arquivo .toml contendo detalhes da conexão.

Se você tiver Snowflake CLI instalado, você pode usá-lo para definir uma conexão. Caso contrário, você pode escrever manualmente os parâmetros de conexão em um arquivo config.toml.

Adicione uma conexão usando Snowflake CLI

Você pode usar Snowflake CLI para adicionar propriedades de conexão que Snowpark Connect for Spark pode usar para se conectar ao Snowflake. Suas alterações são salvas em um arquivo config.toml.

  1. Execute o seguinte comando para adicionar uma conexão usando o comando de conexão snow: comado add.

    snow connection add
    
    Copy
  2. Siga as instruções para definir uma conexão.

    Certifique-se de especificar spark-connect como o nome da conexão.

    Este comando adiciona uma conexão ao seu arquivo config.toml, como no exemplo a seguir:

    [connections.spark-connect]
    host = "example.snowflakecomputing.com"
    port = 443
    account = "example"
    user = "test_example"
    password = "password"
    protocol = "https"
    warehouse = "example_wh"
    database = "example_db"
    schema = "public"
    
    Copy
  3. Execute o seguinte comando para confirmar que a conexão funciona.

    Você poderá testar a conexão dessa maneira quando a adicionar usando Snowflake CLI.

    snow connection list
    snow connection test --connection spark-connect
    
    Copy

Adição de uma conexão gravando manualmente um arquivo de conexão

Você pode escrever ou atualizar manualmente um arquivo connections.toml para que seu código possa se conectar ao Snowpark Connect for Spark no Snowflake.

  1. Execute o seguinte comando para garantir que seu arquivo connections.toml permita que somente o proprietário (usuário) tenha acesso de leitura e gravação.

    chmod 0600 "~/.snowflake/connections.toml"
    
    Copy
  2. Edite o arquivo connections.toml para que ele contenha uma conexão [spark-connect] com as propriedades de conexão no exemplo a seguir.

    Certifique-se de substituir os valores por suas próprias especificações de conexão.

    [spark-connect]
    host="my_snowflake_account.snowflakecomputing.com"
    account="my_snowflake_account"
    user="my_user"
    password="&&&&&&&&"
    warehouse="my_wh"
    database="my_db"
    schema="public"
    
    Copy

Instalação do Snowpark Connect for Spark

Você pode instalar Snowpark Connect for Spark como um pacote Python.

  1. Crie um ambiente virtual Python.

    Confirme se a sua versão do Python é 3.10 ou posterior e anterior a 3.13 executando python3 --version.

    python3 -m venv .venv
    source .venv/bin/activate
    
    Copy
  2. Instale o pacote Snowpark Connect for Spark.

    pip install --upgrade --force-reinstall 'snowpark-connect[jdk]'
    
    Copy
  3. Adicione o código Python para iniciar um Snowpark Connect for Spark e crie uma sessão de Snowpark Connect for Spark.

    from snowflake import snowflake.snowpark_connect
    
    # Import snowpark_connect *before* importing pyspark libraries
    from pyspark.sql.types import Row
    
    spark = snowflake.snowpark_connect.server.init_spark_session()
    
    Copy

Execução de código Python do seu cliente

Quando você já tem uma conexão autenticada, pode escrever um código como de costume.

Você pode executar o código PySpark que se conecta a Snowpark Connect for Spark usando a biblioteca de cliente PySpark.

from pyspark.sql import Row

  df = spark.createDataFrame([
      Row(a=1, b=2.),
      Row(a=2, b=3.),
      Row(a=4, b=5.),])

  print(df.count())
Copy

Executar código Scala a partir do seu cliente

Você pode executar aplicativos Scala que se conectam ao Snowpark Connect for Spark usando a biblioteca cliente do Spark Connect.

Este guia orienta você na configuração do Snowpark Connect e na conexão de seus aplicativos Scala ao servidor Snowpark Connect for Spark.

Etapa 1: Configurar seu ambiente Snowpark Connect for Spark

Configure seu ambiente usando as etapas descritas nos tópicos a seguir:

  1. Crie um ambiente virtual Python e instale o Snowpark Connect.

  2. Configure uma conexão.

Etapa 2: Criar um script de servidor Snowpark Connect for Spark e iniciar o servidor

  1. Criar um script Python para iniciar o servidor Snowpark Connect for Spark.

    # launch-snowpark-connect.py
    
    from snowflake import snowpark_connect
    
    def main():
        snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002")
        print("SAS started on port 15002")
    
    if __name__ == "__main__":
        main()
    
    Copy
  2. Inicie o servidor Snowpark Connect for Spark.

    # Make sure you're in the correct Python environment
    pyenv activate your-snowpark-connect-env
    
    # Run the server script
    python launch-snowpark-connect.py
    
    Copy

Etapa 3: Configurar seu aplicativo Scala

  1. Adicione a dependência do cliente Spark Connect ao seu arquivo build.sbt.

    libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.6"
    
    // Add JVM options for Java 9+ module system compatibility
    javaOptions ++= Seq(
      "--add-opens=java.base/java.nio=ALL-UNNAMED"
    )
    
    Copy
  2. Execute o código do Scala para conectar-se ao servidor Snowpark Connect for Spark.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    object SnowparkConnectExample {
      def main(args: Array[String]): Unit = {
        // Create Spark session with Snowpark Connect
        val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
    
        // Register ClassFinder for UDF support (if needed)
        // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes")
        // spark.registerClassFinder(classFinder)
    
        try {
          // Simple DataFrame operations
          import spark.implicits._
    
          val data = Seq(
            (1, "Alice", 25),
            (2, "Bob", 30),
            (3, "Charlie", 35)
          )
    
          val df = spark.createDataFrame(data).toDF("id", "name", "age")
    
          println("Original DataFrame:")
          df.show()
    
          println("Filtered DataFrame (age > 28):")
          df.filter($"age" > 28).show()
    
          println("Aggregated result:")
          df.groupBy().avg("age").show()
    
        } finally {
          spark.stop()
        }
      }
    }
    
    Copy
  3. Compile e execute o aplicativo.

    # Compile your Scala application
    sbt compile
    
    # Run the application
    sbt "runMain SnowparkConnectExample"
    
    Copy

Compatibilidade com UDF do Scala no Snowpark Connect for Spark

Ao usar funções definidas pelo usuário ou código personalizado, siga um destes procedimentos:

  • Registre um localizador de classes para monitorar e enviar arquivos de classe.

    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes")
    spark.registerClassFinder(classFinder)
    
    Copy
  • Carregue as dependências de JAR, se necessário. Você poderá incluir o próprio JAR da carga de trabalho se um localizador de classe não for usado.

    spark.addArtifact("/absolute/path/to/dependency.jar")
    
    Copy
  • Use um JAR preparado.

    spark.conf.set("snowpark.connect.udf.java.imports", "[@mystage/dependency.jar, @db.schema.stage/other_dependency.jar]")
    
    Copy

Usando o Scala 2.13

Por padrão, o Snowpark Connect for Spark usa o Scala 2.12. As cargas de trabalho criadas com o Scala 2.13 devem especificar a versão do Scala usando a opção de configuração «snowpark.connect.scala.version».

// Directly in the session builder
val spark = SparkSession.builder()
  .remote("sc://localhost:15002")
  .config("snowpark.connect.scala.version", "2.13")
  .getOrCreate()

// Or via session configuration
spark.conf.set("snowpark.connect.scala.version", "2.13")
Copy

Solução de problemas de instalação de Snowpark Connect for Spark

Com a lista de verificações a seguir, você pode solucionar problemas de instalação e de uso do Snowpark Connect for Spark.

  • Certifique-se de que Java e Python sejam baseados na mesma arquitetura.

  • Use o arquivo do pacote Snowpark Connect for Spark mais recente conforme descrito em Instalação do Snowpark Connect for Spark.

  • Confirme que o comando :com:`Python` Comando com o código PySpark está funcionando corretamente para execução local, ou seja, sem conectividade do Snowflake.

    Por exemplo, execute um comando como o seguinte:

    python your_pyspark_file.py
    
    Copy

Clientes de código aberto

É possível usar softwares de código aberto padrão (OSS) e pacotes de clientes Spark (como PySpark e clientes Spark para Java ou Scala) em seus ambientes locais preferidos, incluindo Jupyter Notebooks e VS Code. Dessa forma, você pode evitar a instalação de pacotes específicos do Snowflake.

Isso pode ser útil se você quiser escrever código Spark localmente e usar os recursos de computação e governança corporativa do Snowflake. Neste cenário, você realiza autenticação e autorização por meio de tokens de acesso programáticos (PATs).

As seções a seguir abordam a instalação, a configuração e a autenticação. Você também encontrará um exemplo simples de PySpark para validar sua conexão.

Etapa 1: Instalar os pacotes necessários

  • Instale pyspark. Você não precisa instalar nenhum pacote Snowflake.

    pip install "pyspark[connect]>=3.5.0,<4"
    
    Copy

Etapa 2: Configuração e autenticação

  1. Gere um token de acesso programático (PAT).

    Para obter mais informações, consulte os seguintes tópicos:

    O exemplo a seguir adiciona um PAT chamado TEST_PAT para o usuário sysadmin e define a expiração para 30 dias.

    ALTER USER add PAT TEST_PAT ROLE_RESTRICTION = sysadmin DAYS_TO_EXPIRY = 30;
    
    Copy
  2. Encontre o URL do host do Snowflake Spark Connect.

    Execute o seguinte SQL no Snowflake para encontrar o nome do host da sua conta:

    SELECT t.VALUE:type::VARCHAR as type,
           t.VALUE:host::VARCHAR as host,
           t.VALUE:port as port
      FROM TABLE(FLATTEN(input => PARSE_JSON(SYSTEM$ALLOWLIST()))) AS t where type = 'SNOWPARK_CONNECT';
    
    Copy

Etapa 3: Conectar-se ao servidor Spark Connect

  • Para se conectar ao servidor Spark Connect, use um código como o seguinte:

    from pyspark.sql import SparkSession
    import urllib.parse
    
    # Replace with your actual PAT.
    pat = urllib.parse.quote("<pat>", safe="")
    
    # Replace with your Snowpark Connect host from the above SQL query.
    snowpark_connect_host = ""
    
    # Define database/schema/warehouse for executing your Spark session in Snowflake (recommended); otherwise, it will be resolved from your default_namespace and default_warehouse
    
    db_name = urllib.parse.quote("TESTDB", safe="")
    schema_name = urllib.parse.quote("TESTSCHEMA", safe="")
    warehouse_name = urllib.parse.quote("TESTWH", safe="")
    
    spark = SparkSession.builder.remote(f"sc://{snowpark_connect_host}/;token={pat};token_type=PAT;database={db_name};schema={schema_name};warehouse={warehouse_name}").getOrCreate()
    
    # Spark session is ready to use. You can write regular Spark DataFrame code, as in the following example:
    
    from pyspark.sql import Row
    
    df = spark.createDataFrame([
        Row(a=1, b=2.),
        Row(a=2, b=3.),
        Row(a=4, b=5.),])
    print(df.count())
    
    Copy