Exemplos Snowpark Submit¶
Este tópico inclui exemplos que usam Snowpark Submit para enviar aplicativos Spark prontos para produção.
Escrever e enviar um aplicativo Spark simples¶
O exemplo a seguir mostra como escrever e enviar um aplicativo Spark simples sem dependências.
Em seu IDE local, crie um novo arquivo Python chamado
app.pycom o seguinte conteúdo:from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit, upper, concat # Create Spark session spark = SparkSession.builder.appName("SimpleSession").getOrCreate() # Create a DataFrame from inline data data = [ (1, "alice", "engineering", 95000), (2, "bob", "marketing", 72000), (3, "carol", "engineering", 105000), (4, "david", "sales", 68000), (5, "eva", "engineering", 88000), ] df = spark.createDataFrame(data, ["id", "name", "department", "salary"]) # Add a new column df_with_bonus = df.withColumn("bonus", col("salary") * 0.1) df_with_bonus.show() # Filter and transform engineers = df.filter(col("department") == "engineering") \ .withColumn("name_upper", upper(col("name"))) \ .withColumn("greeting", concat(lit("Hello, "), col("name"))) engineers.show() # Aggregate df.groupBy("department").avg("salary").show() # Stop the Spark session spark.stop()
Para enviar o aplicativo, use o seguinte comando:
snowpark-submit \ --snowflake-workload-name MY_JOB \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
Você pode usar a opção
--wait-for-completionpara aguardar a conclusão da tarefa,--workload-statuspara verificar o status da tarefa e--display-logspara exibir os logs da tarefa. Para obter uma lista completa de opções, consulte Referência de Snowpark Submit.
Como implementar um aplicativo de um estágio do Snowflake¶
Se o aplicativo tiver dependências, como arquivos que ele precisa ler, você pode implantá-las a partir de uma área de preparação do Snowflake. O exemplo a seguir mostra como implantar um aplicativo e as dependências dele a partir de uma área de preparação do Snowflake.
Para carregar arquivos para uma área de preparação a partir do terminal, você pode usar o Snowflake CLI. Observe que o SnowSQL é a CLI legada e, se você já o estiver usando, também poderá usá-lo para carregar arquivos para uma área de preparação. Se você ainda não instalou o Snowflake CLI, pode instalá-lo seguindo as instruções em Instalação do Snowflake CLI.
Crie um novo arquivo CSV em seu IDE local chamado
sample_employees.csvcom o seguinte conteúdo:employee_id,name,department,salary,years_employed 1,Alice Johnson,Engineering,95000,5 2,Bob Smith,Marketing,72000,3 3,Carol Williams,Engineering,105000,8 4,David Brown,Sales,68000,2 5,Eva Martinez,Engineering,88000,4 6,Frank Wilson,Marketing,75000,6 7,Grace Lee,Sales,92000,7 8,Henry Taylor,Engineering,110000,10 9,Ivy Chen,Marketing,65000,1 10,Jack Davis,Sales,78000,4 11,Karen White,Engineering,98000,6 12,Leo Harris,Marketing,71000,3 13,Maria Garcia,Sales,85000,5 14,Nathan Clark,Engineering,102000,9 15,Olivia Moore,Marketing,69000,2
Carregue os seus arquivos de dependência para uma área de preparação usando o seguinte comando, em que
my_stageé o nome de uma área de preparação em sua conta. (Se você não tiver uma área de preparação criada, poderá usar [snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create).)snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
Para verificar se o arquivo foi carregado com sucesso, use o seguinte comando para listar os arquivos na área de preparação:
snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
Você deverá ver o arquivo
sample_employees.csvna lista.Em seu IDE local, crie um novo arquivo Python chamado
app.pycom o seguinte conteúdo:from pyspark.sql import SparkSession # Create Spark session spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate() # Load data from stage (adjust stage name to match yours) df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True) df.show() # Filter: Engineering department only engineers = df.filter(df["department"] == "Engineering") engineers.show() # Filter: Salary > 80000 and years_employed > 3 senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3)) senior_high_earners.show() # Aggregate: Average salary by department df.groupBy("department").avg("salary").show() # Select specific columns result = senior_high_earners.select("name", "department", "salary") result.show() # Stop the Spark session spark.stop()
Para enviar o aplicativo que usa os arquivos que você carregou para a área de preparação, use o seguinte comando:
snowpark-submit \ --snowflake-connection-name MY_CONNECTION \ --snowflake-workload-name MY_JOB \ --snowflake-stage @<database>.<schema>.<stage> \ /path/to/app.py
Observe que um pool de computação é necessário para executar o aplicativo e deve ser especificado no arquivo
connections.tomlou na linha de comando usando a opção--compute-pool. Para obter mais informações, consulte Referência de Snowpark Submit.
Monitoramento com espera e logs¶
O exemplo a seguir mostra como enviar um trabalho, aguardar sua conclusão e depois recuperar os logs.
Envie o trabalho e aguarde a conclusão usando o seguinte comando:
snowpark-submit \ --snowflake-workload-name MY_JOB \ --wait-for-completion \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
Se o trabalho falhar, verifique os logs detalhados usando o seguinte comando:
snowpark-submit --snowflake-workload-name MY_JOB \ --workload-status \ --display-logs \ --snowflake-connection-name MY_CONNECTION
Usar o Snowpark Submit em um Apache Airflow DAG¶
Você pode enviar um trabalho do Spark ao Snowflake via Snowpark Connect for Spark. Você pode usar o :comando:`snowpark-submit` no modo de cluster para aproveitar um pool de computação e executar o trabalho.
Ao usar o Apache Airflow dessa forma, certifique-se de que o serviço Docker ou o contêiner do Snowpark Container Services que executa o Apache Airflow tenha acesso adequado ao Snowflake e aos arquivos necessários no estágio do Snowflake.
O código no exemplo a seguir executa as seguintes tarefas:
Cria um ambiente virtual Python em
/tmp/myenv.Na tarefa
create_venv, o código usapippara instalar o pacotesnowpark-submitusando um arquivo.whl.Gera um arquivo
connections.tomlseguro com credenciais de conexão do Snowflake e um token OAuth.Na tarefa
create_connections_toml, o código cria o diretório/app/.snowflake, cria o arquivo.tomle depois altera as permissões do arquivo para permitir que somente o proprietário (usuário) tenha acesso de leitura e gravação.Executa um trabalho Spark usando o comando:
snowpark-submitcomando.Na tarefa
run_snowpark_script, o código faz o seguinte:Ativa o ambiente virtual.
Executa o trabalho Spark usando o comando:
snowpark-submitcomando.Implanta no Snowflake usando o modo de cluster.
Usa a Snowpark Connect for Spark remoto URI s://localhost:15002.
Especifica a classe do aplicativo Spark
org.example.SnowparkConnectApp.Extrai o script do estágio @snowflake_stage.
Bloqueia a implantação até que o trabalho termine usando
--wait-for-completion.
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
default_args = {
'start_date': airflow.utils.dates.days_ago(1),
'retries': 0,
}
with DAG(
'run_sparkconnect_python_script',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
create_venv = BashOperator(
task_id='create_venv',
bash_command="""
python3 -m venv /tmp/myenv &&
source /tmp/myenv/bin/activate &&
export PIP_USER=false &&
pip install --upgrade pip &&
pip install --no-cache-dir grpcio-tools>=1.48.1 &&
pip install /app/snowpark_submit-<version>.whl
"""
)
create_connections_toml = BashOperator(
task_id='create_connections_toml',
bash_command="""
mkdir -p /app/.snowflake
echo "${SNOWFLAKE_USER}"
cat <<EOF > /app/.snowflake/connections.toml
[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
chmod 600 /app/.snowflake/connections.toml
"""
)
run_script = BashOperator(
task_id='run_snowpark_script',
bash_command="""
set -e
echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"
echo "Running Python script with Snowpark..."
source /tmp/myenv/bin/activate &&
snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
""",
env={
'SNOWFLAKE_HOME': '/app/.snowflake'
}
)
create_venv >> create_connections_toml >> run_script
Você pode monitorar o DAG usando a exibição em gráfico ou a exibição em árvore da interface do usuário do Apache Airflow. Inspecione os logs de tarefas para os seguintes itens:
Configuração do ambiente
Status de Snowpark Connect for Spark
:comando:`snowpark-submit` saída do trabalho
Você também pode monitorar os trabalhos executados no Snowflake por meio dos logs armazenados no estágio Snowflake ou das tabelas de eventos.