Exemplos Snowpark Submit

Este tópico inclui exemplos que usam Snowpark Submit para enviar aplicativos Spark prontos para produção.

Escrever e enviar um aplicativo Spark simples

O exemplo a seguir mostra como escrever e enviar um aplicativo Spark simples sem dependências.

  1. Em seu IDE local, crie um novo arquivo Python chamado app.py com o seguinte conteúdo:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, upper, concat
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleSession").getOrCreate()
    
    # Create a DataFrame from inline data
    data = [
        (1, "alice", "engineering", 95000),
        (2, "bob", "marketing", 72000),
        (3, "carol", "engineering", 105000),
        (4, "david", "sales", 68000),
        (5, "eva", "engineering", 88000),
    ]
    df = spark.createDataFrame(data, ["id", "name", "department", "salary"])
    
    # Add a new column
    df_with_bonus = df.withColumn("bonus", col("salary") * 0.1)
    df_with_bonus.show()
    
    # Filter and transform
    engineers = df.filter(col("department") == "engineering") \
        .withColumn("name_upper", upper(col("name"))) \
        .withColumn("greeting", concat(lit("Hello, "), col("name")))
    engineers.show()
    
    # Aggregate
    df.groupBy("department").avg("salary").show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy
  2. Para enviar o aplicativo, use o seguinte comando:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy

    Você pode usar a opção --wait-for-completion para aguardar a conclusão da tarefa, --workload-status para verificar o status da tarefa e --display-logs para exibir os logs da tarefa. Para obter uma lista completa de opções, consulte Referência de Snowpark Submit.

Como implementar um aplicativo de um estágio do Snowflake

Se o aplicativo tiver dependências, como arquivos que ele precisa ler, você pode implantá-las a partir de uma área de preparação do Snowflake. O exemplo a seguir mostra como implantar um aplicativo e as dependências dele a partir de uma área de preparação do Snowflake.

  1. Para carregar arquivos para uma área de preparação a partir do terminal, você pode usar o Snowflake CLI. Observe que o SnowSQL é a CLI legada e, se você já o estiver usando, também poderá usá-lo para carregar arquivos para uma área de preparação. Se você ainda não instalou o Snowflake CLI, pode instalá-lo seguindo as instruções em Instalação do Snowflake CLI.

  2. Crie um novo arquivo CSV em seu IDE local chamado sample_employees.csv com o seguinte conteúdo:

    employee_id,name,department,salary,years_employed
    1,Alice Johnson,Engineering,95000,5
    2,Bob Smith,Marketing,72000,3
    3,Carol Williams,Engineering,105000,8
    4,David Brown,Sales,68000,2
    5,Eva Martinez,Engineering,88000,4
    6,Frank Wilson,Marketing,75000,6
    7,Grace Lee,Sales,92000,7
    8,Henry Taylor,Engineering,110000,10
    9,Ivy Chen,Marketing,65000,1
    10,Jack Davis,Sales,78000,4
    11,Karen White,Engineering,98000,6
    12,Leo Harris,Marketing,71000,3
    13,Maria Garcia,Sales,85000,5
    14,Nathan Clark,Engineering,102000,9
    15,Olivia Moore,Marketing,69000,2
    
    Copy

    Carregue os seus arquivos de dependência para uma área de preparação usando o seguinte comando, em que my_stage é o nome de uma área de preparação em sua conta. (Se você não tiver uma área de preparação criada, poderá usar [snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create).)

    snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
    
    Copy

    Para verificar se o arquivo foi carregado com sucesso, use o seguinte comando para listar os arquivos na área de preparação:

    snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
    
    Copy

    Você deverá ver o arquivo sample_employees.csv na lista.

  3. Em seu IDE local, crie um novo arquivo Python chamado app.py com o seguinte conteúdo:

    from pyspark.sql import SparkSession
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate()
    
    # Load data from stage (adjust stage name to match yours)
    df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True)
    df.show()
    
    # Filter: Engineering department only
    engineers = df.filter(df["department"] == "Engineering")
    engineers.show()
    
    # Filter: Salary > 80000 and years_employed > 3
    senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3))
    senior_high_earners.show()
    
    # Aggregate: Average salary by department
    df.groupBy("department").avg("salary").show()
    
    # Select specific columns
    result = senior_high_earners.select("name", "department", "salary")
    result.show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy

    Para enviar o aplicativo que usa os arquivos que você carregou para a área de preparação, use o seguinte comando:

    snowpark-submit \
      --snowflake-connection-name MY_CONNECTION \
      --snowflake-workload-name MY_JOB \
      --snowflake-stage @<database>.<schema>.<stage> \
      /path/to/app.py
    
    Copy

    Observe que um pool de computação é necessário para executar o aplicativo e deve ser especificado no arquivo connections.toml ou na linha de comando usando a opção --compute-pool. Para obter mais informações, consulte Referência de Snowpark Submit.

Monitoramento com espera e logs

O exemplo a seguir mostra como enviar um trabalho, aguardar sua conclusão e depois recuperar os logs.

  1. Envie o trabalho e aguarde a conclusão usando o seguinte comando:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy
  2. Se o trabalho falhar, verifique os logs detalhados usando o seguinte comando:

    snowpark-submit
      --snowflake-workload-name MY_JOB \
      --workload-status \
      --display-logs \
      --snowflake-connection-name MY_CONNECTION
    
    Copy

Usar o Snowpark Submit em um Apache Airflow DAG

Você pode enviar um trabalho do Spark ao Snowflake via Snowpark Connect for Spark. Você pode usar o :comando:`snowpark-submit` no modo de cluster para aproveitar um pool de computação e executar o trabalho.

Ao usar o Apache Airflow dessa forma, certifique-se de que o serviço Docker ou o contêiner do Snowpark Container Services que executa o Apache Airflow tenha acesso adequado ao Snowflake e aos arquivos necessários no estágio do Snowflake.

O código no exemplo a seguir executa as seguintes tarefas:

  • Cria um ambiente virtual Python em /tmp/myenv.

    Na tarefa create_venv, o código usa pip para instalar o pacote snowpark-submit usando um arquivo .whl.

  • Gera um arquivo connections.toml seguro com credenciais de conexão do Snowflake e um token OAuth.

    Na tarefa create_connections_toml, o código cria o diretório /app/.snowflake, cria o arquivo .toml e depois altera as permissões do arquivo para permitir que somente o proprietário (usuário) tenha acesso de leitura e gravação.

  • Executa um trabalho Spark usando o comando:snowpark-submit comando.

    Na tarefa run_snowpark_script, o código faz o seguinte:

    • Ativa o ambiente virtual.

    • Executa o trabalho Spark usando o comando:snowpark-submit comando.

    • Implanta no Snowflake usando o modo de cluster.

    • Usa a Snowpark Connect for Spark remoto URI s://localhost:15002.

    • Especifica a classe do aplicativo Spark org.example.SnowparkConnectApp.

    • Extrai o script do estágio @snowflake_stage.

    • Bloqueia a implantação até que o trabalho termine usando --wait-for-completion.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

Você pode monitorar o DAG usando a exibição em gráfico ou a exibição em árvore da interface do usuário do Apache Airflow. Inspecione os logs de tarefas para os seguintes itens:

Você também pode monitorar os trabalhos executados no Snowflake por meio dos logs armazenados no estágio Snowflake ou das tabelas de eventos.