Exemplos Snowpark Submit

Este tópico inclui exemplos que usam Snowpark Submit para enviar aplicativos Spark prontos para produção.

Como implementar um aplicativo de um estágio do Snowflake

O exemplo a seguir mostra como implantar um aplicativo e suas dependências de um estágio do Snowflake.

  1. Carregue os arquivos do seu aplicativo para um estágio usando o seguinte comando:

    PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    
    Copy
  2. Para enviar o trabalho usando os arquivos que você carregou no estágio, use o seguinte comando:

    snowpark-submit \
      --py-files @my_stage/dependencies.zip \
      --snowflake-stage @my_stage \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION\
      --compute-pool MY_COMPUTE_POOL \
      @my_stage/app.py
    
    Copy

Monitoramento com espera e logs

O exemplo a seguir mostra como enviar um trabalho, aguardar sua conclusão e depois recuperar os logs.

  1. Envie o trabalho e aguarde a conclusão usando o seguinte comando:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      --compute-pool MY_COMPUTE_POOL \
      app.py
    
    Copy
  2. Se o trabalho falhar, verifique os logs detalhados usando o seguinte comando:

    snowpark-submit
      --snowflake-workload-name MY_JOB
      --workload-status
      --display-logs
      --snowflake-connection-name MY_CONNECTION
    
    Copy

Usar o Snowpark Submit em um Apache Airflow DAG

Você pode enviar um trabalho do Spark ao Snowflake via Snowpark Connect for Spark. Você pode usar o :comando:`snowpark-submit` no modo de cluster para aproveitar um pool de computação e executar o trabalho.

Ao usar o Apache Airflow dessa forma, certifique-se de que o serviço Docker ou o contêiner do Snowpark Container Services que executa o Apache Airflow tenha acesso adequado ao Snowflake e aos arquivos necessários no estágio do Snowflake.

O código no exemplo a seguir executa as seguintes tarefas:

  • Cria um ambiente virtual Python em /tmp/myenv.

    Na tarefa create_venv, o código usa pip para instalar o pacote snowpark-submit usando um arquivo .whl.

  • Gera um arquivo connections.toml seguro com credenciais de conexão do Snowflake e um token OAuth.

    Na tarefa create_connections_toml, o código cria o diretório /app/.snowflake, cria o arquivo .toml e depois altera as permissões do arquivo para permitir que somente o proprietário (usuário) tenha acesso de leitura e gravação.

  • Executa um trabalho Spark usando o comando:snowpark-submit comando.

    Na tarefa run_snowpark_script, o código faz o seguinte:

    • Ativa o ambiente virtual.

    • Executa o trabalho Spark usando o comando:snowpark-submit comando.

    • Implanta no Snowflake usando o modo de cluster.

    • Usa a Snowpark Connect for Spark remoto URI s://localhost:15002.

    • Especifica a classe do aplicativo Spark org.example.SnowparkConnectApp.

    • Extrai o script do estágio @snowflake_stage.

    • Bloqueia a implantação até que o trabalho termine usando --wait-for-completion.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

Você pode monitorar o DAG usando a exibição em gráfico ou a exibição em árvore da interface do usuário do Apache Airflow. Inspecione os logs de tarefas para os seguintes itens:

Você também pode monitorar os trabalhos executados no Snowflake por meio dos logs armazenados no estágio Snowflake ou das tabelas de eventos.