Spark 커넥터 사용하기

커넥터는 표준 Spark API를 준수하지만, 이 항목에서는 추가된 Snowflake 관련 옵션에 대한 설명을 제공합니다.

이 항목에서 COPY라는 용어는 다음을 모두 의미합니다.

  • COPY INTO <테이블> (내부 또는 외부 스테이지에서 테이블로 데이터를 전송하기 위해 사용).

  • COPY INTO <위치> (테이블에서 내부 또는 외부 스테이지로 데이터를 전송하기 위해 사용).

이 항목의 내용:

SnowCD를 사용한 Snowflake로의 네트워크 연결 확인하기

드라이버를 구성한 후에는 SnowCD 를 사용하여 Snowflake로의 네트워크 연결을 평가하고 문제를 해결할 수 있습니다.

초기 구성 프로세스 및 언제라도 필요할 때 SnowCD를 사용하여 Snowflake로의 네트워크 연결을 평가하고 문제를 해결할 수 있습니다.

푸시다운

Spark Connector는 SQL 작업에 대한 Spark 논리적 계획을 캡처 및 분석하여 조건자 및 쿼리 푸시다운을 적용합니다. 데이터 소스가 Snowflake인 경우 작업이 SQL 쿼리로 변환된 후 Snowflake에서 실행되어 성능이 향상됩니다.

그러나 이 변환의 경우 거의 일대일 방식으로 Spark SQL 연산자를 Snowflake 식으로 변환해야 하므로 모든 Spark SQL 연산자를 푸시다운하는 것은 불가능합니다. 푸시다운이 실패하면 커넥터는 덜 최적화된 실행 계획으로 대체됩니다. 지원되지 않는 작업은 Spark에서 대신 수행됩니다.

참고

모든 작업에 대해 푸시다운이 필요한 경우 대신 Snowpark 를 사용하는 코드를 작성해 보십시오.

푸시다운이 지원되는 작업 목록은 아래와 같습니다(아래의 모든 함수는 Spark의 해당 이름을 사용). 함수가 이 목록에 없으면 이를 사용하는 Spark 계획이 Snowflake로 푸시다운되지 않고 Spark에서 실행될 수 있습니다.

  • 집계 함수

    • Average

    • Corr

    • CovPopulation

    • CovSample

    • Count

    • Max

    • Min

    • StddevPop

    • StddevSamp

    • Sum

    • VariancePop

    • VarianceSamp

  • 부울 연산자

    • And

    • Between

    • Contains

    • EndsWith

    • EqualTo

    • GreaterThan

    • GreaterThanOrEqual

    • In

    • IsNull

    • IsNotNull

    • LessThan

    • LessThanOrEqual

    • Not

    • Or

    • StartsWith

  • 날짜, 시간 및 타임스탬프 함수

    • DateAdd

    • DateSub

    • Month

    • Quarter

    • TruncDate

    • TruncTimestamp

    • Year

  • 수학 함수

    • 산술 연산자 〈+〉(더하기), 〈-〈(빼기), 〈*〉(곱하기), 〈/〉(나누기) 및 〈-〈(단항 부정).

    • Abs

    • Acos

    • Asin

    • Atan

    • Ceil

    • CheckOverflow

    • Cos

    • Cosh

    • Exp

    • Floor

    • Greatest

    • Least

    • Log

    • Pi

    • Pow

    • PromotePrecision

    • Rand

    • Round

    • Sin

    • Sinh

    • Sqrt

    • Tan

    • Tanh

  • 기타 연산자

    • Alias(AS 식)

    • BitwiseAnd

    • BitwiseNot

    • BitwiseOr

    • BitwiseXor

    • CaseWhen

    • Cast(child, t, _)

    • Coalesce

    • If

    • MakeDecimal

    • ScalarSubquery

    • ShiftLeft

    • ShiftRight

    • SortOrder

    • UnscaledValue

  • 관계형 연산자

    • 집계 함수 및 그룹화 방법 절

    • Distinct

    • Filters

    • In

    • InSet

    • Joins

    • Limits

    • Projections

    • Sorts (ORDER BY)

    • Union 및 Union All

    • 윈도우 함수 및 윈도우 절

  • 문자열 함수

    • Ascii

    • Concat(어린이용)

    • Length

    • Like

    • Lower

    • StringLPad

    • StringRPad

    • StringTranslate

    • StringTrim

    • StringTrimLeft

    • StringTrimRight

    • Substring

    • Upper

  • 윈도우 함수(참고: Spark 2.2에서는 작동하지 않음)

    • DenseRank

    • Rank

    • RowNumber

Scala에서 커넥터 사용하기

데이터 소스 클래스 이름 지정하기

Spark에서 Snowflake를 데이터 소스로 사용하려면 .format 옵션을 사용하여 데이터 소스를 정의하는 Snowflake 커넥터 클래스 이름을 제공합니다.

net.snowflake.spark.snowflake

컴파일 시간에 클래스 이름을 확인할 수 있도록 Snowflake는 클래스 이름에 대한 변수를 정의할 것을 적극 권장합니다. 예:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

또한, 편의를 위해 Utils 클래스는 변수를 제공하며 이 변수는 다음과 같이 가져올 수 있습니다.

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

참고

이 항목에서 모든 예는 클래스 정의에서와 같이 SNOWFLAKE_SOURCE_NAME 을 사용합니다.

세션에서 푸시다운 활성화/비활성화하기

커넥터 버전 2.1.0 이상은 쿼리 푸시다운을 지원하므로 Snowflake가 Spark 데이터 소스인 경우에는 Snowflake로 쿼리 처리를 푸시하여 성능을 크게 향상할 수 있습니다.

푸시다운은 기본적으로 활성화됩니다.

주어진 DataFrame에 대해 Spark 세션 내에서 푸시다운을 비활성화하는 방법은 다음과 같습니다.

  1. SparkSession 오브젝트를 인스턴스화한 후 SnowflakeConnectorUtils.disablePushdownSession 정적 메서드를 호출하여 SparkSession 오브젝트를 전달합니다. 예:

    SnowflakeConnectorUtils.disablePushdownSession(spark)
    

    여기서, sparkSparkSession 오브젝트입니다.

  2. autopushdown 옵션을 off 로 설정하여 DataFrame을 만듭니다. 예:

    val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("query", query)
      .option("autopushdown", "off")
      .load()
    

    options 메서드에 전달하는 Map 에서 autopushdown 옵션을 설정할 수도 있습니다(예: 위의 예에서는 sfOptions 에서).

푸시다운을 비활성화한 후 다시 활성화하려면 SnowflakeConnectorUtils.enablePushdownSession 정적 메서드를 호출하고(SparkSession 오브젝트 전달), autopushdown 이 활성화된 DataFrame을 만드십시오.

Snowflake에서 Spark로 데이터 이동하기

참고

DataFrames을 사용하는 경우 Snowflake 커넥터는 SELECT 쿼리만을 지원합니다.

Snowflake에서 Spark DataFrame으로 데이터를 읽으려면:

  1. SqlContext 오브젝트의 DataFrameReader 메서드를 사용하여 read() 를 구성합니다.

  2. format() 메서드를 사용하여 SNOWFLAKE_SOURCE_NAME 을 지정합니다. 해당 정의는 이 항목의 데이터 소스 클래스 이름 지정하기 섹션을 참조하십시오.

  3. option() 또는 options() 메서드를 사용하여 커넥터 옵션을 지정합니다. 자세한 내용은 이 항목의 커넥터에 대한 구성 옵션 설정하기 섹션을 참조하십시오.

  4. 읽을 테이블 데이터에 대해 다음 옵션 중 1개를 지정합니다.

    • dbtable: 읽을 테이블의 이름입니다. 모든 열과 레코드가 검색됩니다(즉, SELECT * FROM db_table 에 해당).

    • query: 실행할 정확한 쿼리(SELECT 문)입니다.

사용법 노트

  • 현재 커넥터는 DataFrames을 사용하는 경우 다른 타입의 쿼리(예: SHOW 또는 DESC 또는 DML 문)를 지원하지 않습니다.

  • 개별 행의 크기에는 상한이 있습니다. 자세한 내용은 쿼리 텍스트 크기 제한 섹션을 참조하십시오.

성능 고려 사항

Snowflake와 Spark 사이에서 데이터를 전송하는 경우에는 다음 방법을 사용하여 성능을 분석/향상할 수 있습니다.

  • net.snowflake.spark.snowflake.Utils.getLastSelect() 메서드를 사용하여 Snowflake에서 Spark로 데이터를 이동할 때 발생하는 실제 쿼리를 확인합니다.

  • Spark DataFrame의 filter 또는 where 기능을 사용하는 경우, 발급된 SQL 쿼리에 해당 필터가 있는지 확인합니다. Snowflake 커넥터는 Spark에서 요청한 모든 필터를 SQL로 변환하려고 시도합니다.

    그러나 최근 Spark 인프라에는 Snowflake 커넥터에 전달되지 않는 필터 형식이 있습니다. 그러므로 상황에 따라 Snowflake가 많은 수의 불필요한 레코드를 요청하게 됩니다.

  • 열의 하위 세트만 필요한 경우에는 SQL 쿼리에 하위 세트를 반영해야 합니다.

  • 일반적으로, 발급된 SQL 쿼리가 DataFrame 작업에 따라 예상한 쿼리와 일치하지 않는 경우 query 옵션을 사용하여 원하는 정확한 SQL 구문을 제공합니다.

전체 테이블 읽기:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

쿼리 결과 읽기:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()

Spark에서 Snowflake로 데이터 이동하기

DataFrame의 내용을 Snowflake 테이블에 저장하는 단계는 Snowflake에서 Spark로 작성하는 단계와 유사합니다.

  1. DataFrameDataFrameWriter 메서드를 사용하여 write() 를 구성합니다.

  2. format() 메서드를 사용하여 SNOWFLAKE_SOURCE_NAME 을 지정합니다. 해당 정의는 이 항목의 데이터 소스 클래스 이름 지정하기 섹션을 참조하십시오.

  3. option() 또는 options() 메서드를 사용하여 커넥터 옵션을 지정합니다. 자세한 내용은 이 항목의 커넥터에 대한 구성 옵션 설정하기 섹션을 참조하십시오.

  4. dbtable 옵션을 사용하여 데이터가 기록될 테이블을 지정합니다.

  5. mode() 메서드를 사용하여 내용을 저장할 모드를 지정합니다.

    자세한 내용은 Spark 설명서의 SaveMode 를 참조하십시오.

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Spark에서 Snowflake로 JSON 내보내기

Spark DataFrames에는 문자열로 직렬화된 JSON 오브젝트가 포함될 수 있습니다. 다음 코드는 일반 DataFrame을 JSON 데이터가 포함된 DataFrame으로 변환하는 예를 제공합니다.

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)

결과 jsonDataFrame 에는 StringType 타입의 단일 열이 포함됨에 유의하십시오. 결과적으로 이 공통 SaveMode.Overwrite 모드를 사용하여 DataFrame을 Snowflake로 내보내면 Snowflake의 새 테이블이 VARCHAR 타입의 단일 열로 생성됩니다.

jsonDataFrameVARIANT 열로 로드하려면:

  1. Snowflake 테이블을 만듭니다(Snowflake JDBC 드라이버를 사용하여 Java의 Snowflake에 연결). 예제에 사용된 연결 매개 변수에 대한 설명은 JDBC 드라이버 연결 매개 변수 참조 섹션을 참조하십시오.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/";
    
        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "myorganization-myaccount");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
  2. SaveMode.Overwrite 를 사용하는 대신 SaveMode.Append 를 사용하여 기존 테이블을 재사용하십시오. JSON을 나타내는 문자열 값이 Snowflake에 로드될 때 대상 열의 타입이 VARIANT이므로 JSON으로 구문 분석됩니다. 예:

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    

DDL/DML SQL 문 실행하기

Utils 오브젝트의 runQuery() 메서드를 사용하여 쿼리 외에 DDL/DML SQL 문을 실행합니다. 이와 관련한 예는 다음과 같습니다.

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")

여기서, sfOptions 은 DataFrames을 읽고 쓰기 위해 사용되는 매개 변수 맵입니다.

runQuery 메서드에서는 TRUE 또는 FALSE만을 반환합니다. 이 메서드는 예를 들어 CREATE TABLE 과 같은 DDL 문 및 INSERT, UPDATEDELETE 와 같은 DML 문과 같이 결과 세트를 반환하지 않는 문에서 사용됩니다. SELECT 또는 SHOW 와 같이 결과 세트를 반환하는 문에는 유용하지 않습니다.

타임스탬프 및 타임존 관련 작업하기

Spark는 Scala/Java Timestamp 타입과 동일한 1가지 타입의 타임스탬프만 제공합니다. 이 타임스탬프는 Snowflake의 TIMESTAMP_LTZ(현지 타임존) 데이터 타입과 거의 동일하게 동작합니다. 그러므로 Spark와 Snowflake 사이에서 데이터를 전송할 때 Snowflake는 타임존을 기준으로 시간을 올바르게 보존할 수 있도록 다음 방식을 사용할 것을 권장합니다.

  • Snowflake에서는 TIMESTAMP_LTZ 데이터 타입만 사용하십시오.

    참고

    기본 타임스탬프 데이터 타입 매핑은 TIMESTAMP_NTZ(타임존 없음)이므로 TIMESTAMP_LTZ를 사용하려면 TIMESTAMP_TYPE_MAPPING 매개 변수를 반드시 명시적으로 설정해야 합니다.

  • Spark 타임존을 UTC 로 설정하고 Snowflake에서 이 타임존을 사용합니다(즉, 커넥터에 sfTimezone 옵션을 설정하지 않고 Snowflake에서 타임존을 명시적으로 설정하지 않음). 이러한 상황에는 TIMESTAMP_LTZ와 TIMESTAMP_NTZ가 사실상 동일합니다.

    타임존을 설정하려면 Spark 코드에 다음 라인을 추가합니다.

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
    

이러한 방식으로 구현하지 않으면 원치 않는 시간 수정이 발생할 수 있습니다. 예를 들어, 다음 시나리오를 생각해 보겠습니다.

  • Spark의 타임존이 America/New_York 으로 설정되었습니다.

  • Snowflake의 타임존이 Europe/Warsaw 로 설정되면 다음 중 하나가 발생할 수 있습니다.

    • 커넥터용으로 sfTimezoneEurope/Warsaw 설정.

    • 커넥터용으로 sfTimezonesnowflake 로 설정하고 Snowflake의 TIMEZONE 세션 매개 변수를 Europe/Warsaw 로 설정.

  • TIMESTAMP_NTZ 및 TIMESTAMP_LTZ가 모두 Snowflake에서 사용됩니다.

이 시나리오에서:

  1. Snowflake의 TIMESTAMP_NTZ 열에서 12:00:00 를 나타내는 값이 Spark로 전송되면 이 값에는 타임존 정보가 포함되지 않습니다. Spark는 값을 뉴욕의 12:00:00 로 취급합니다.

  2. Spark가 이 값 12:00:00 (뉴욕)을 Snowflake로 다시 전송하여 TIMESTAMP_LTZ 열에 로드되면 자동으로 변환되어 18:00:00 (바르샤바 타임존)로 로드됩니다.

  3. 그리고 이 값을 Snowflake에서 TIMESTAMP_NTZ로 변환하면 사용자에게는 원래 값인 12:00:00 이 아닌 18:00:00 가 표시됩니다.

요약하자면 Snowflake는 다음 규칙 중 적어도 1개를 엄격하게 준수할 것을 권장합니다.

  • Spark와 Snowflake 모두에서 동일한 타임존을, 이상적으로는 UTC 를 사용하는 것이 가장 좋습니다.

  • Spark와 Snowflake 사이에서 데이터를 전송하려면 TIMESTAMP_LTZ 데이터 타입만 사용합니다.

샘플 Scala 프로그램

중요

이 샘플 프로그램에서는 임시 데이터를 저장하기 위해 Snowflake 내부 스테이지를 사용하므로, 임시 데이터를 저장하기 위한 S3의 위치가 필요하지 않은 커넥터 버전 2.2.0 이상을 사용하는 것으로 가정합니다. 이전 버전을 사용하는 경우에는 기존 S3 위치가 있어야 하며 sfOptions 에 대한 tempdir, awsAccessKey, awsSecretKey 값이 포함되어야 합니다. 자세한 내용은 이 항목의 외부 데이터 전송을 위한 AWS 옵션 섹션을 참조하십시오.

다음 Scala 프로그램은 Spark용 Snowflake Connector의 전체 사용 사례를 제공합니다. 코드를 사용하기 전, 이 항목에서 커넥터에 대한 구성 옵션 설정하기 의 설명과 같이 다음 문자열을 적절한 값으로 바꾸십시오.

  • <계정_식별자>: 사용자의 계정 식별자.

  • <사용자_이름> , <비밀번호>: Snowflake 사용자의 로그인 자격 증명.

  • <데이터베이스> , <스키마> , <웨어하우스>: Snowflake 세션의 기본값.

샘플 Scala 프로그램에서는 기본 인증(즉, 사용자 이름 및 비밀번호)을 사용합니다. OAuth를 사용하여 인증하려면 이 항목의 외부 OAuth 사용하기 를 참조하십시오.

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Python 커넥터 사용하기

Python에서 커넥터를 사용하는 방법은 Scala 사용법과 매우 유사합니다.

Spark 배포판에 포함된 bin/pyspark 스크립트를 사용하는 것이 좋습니다.

pyspark 스크립트 구성하기

pyspark 스크립트는 --packages 또는 --jars 옵션을 사용하여 spark-shell 스크립트와 유사하게 구성해야 합니다. 예:

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3

CLASSPATH 환경 변수에 Snowflake Spark Connector 및 JDBC Connector .jar 파일을 포함하는 것을 잊지 마십시오.

spark-shell 스크립트 구성에 대한 자세한 내용은 4단계: 로컬 Spark 클러스터 또는 Amazon EMR 호스팅 Spark 환경 구성하기 를 참조하십시오.

세션에서 푸시다운 활성화/비활성화하기

커넥터 버전 2.1.0 이상은 쿼리 푸시다운을 지원하므로 Snowflake가 Spark 데이터 소스인 경우에는 Snowflake로 쿼리 처리를 푸시하여 성능을 크게 향상할 수 있습니다.

푸시다운은 기본적으로 활성화됩니다.

주어진 DataFrame에 대해 Spark 세션 내에서 푸시다운을 비활성화하는 방법은 다음과 같습니다.

  1. SparkSession 오브젝트를 인스턴스화한 후 SnowflakeConnectorUtils.disablePushdownSession 정적 메서드를 호출하여 SparkSession 오브젝트를 전달합니다. 예:

    sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
    
  2. autopushdown 옵션을 off 로 설정하여 DataFrame을 만듭니다. 예:

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("query",  query) \
      .option("autopushdown", "off") \
      .load()
    

    options 메서드에 전달하는 Dictionary 에서 autopushdown 옵션을 설정할 수도 있습니다(예: 위의 예에서는 sfOptions 에서).

푸시다운을 비활성화한 후 다시 활성화하려면 SnowflakeConnectorUtils.enablePushdownSession 정적 메서드를 호출하고(SparkSession 오브젝트 전달), autopushdown 이 활성화된 DataFrame을 만드십시오.

샘플 Python 스크립트

중요

이 샘플 스크립트에서는 임시 데이터를 저장하기 위해 Snowflake 내부 스테이지를 사용하므로, 이 데이터를 저장하기 위한 S3의 위치가 필요하지 않은 커넥터 버전 2.2.0 이상을 사용하는 것으로 가정합니다. 이전 버전을 사용하는 경우에는 기존 S3 위치가 있어야 하며 sfOptions 에 대한 tempdir, awsAccessKey, awsSecretKey 값이 포함되어야 합니다. 자세한 내용은 이 항목의 외부 데이터 전송을 위한 AWS 옵션 섹션을 참조하십시오.

pyspark 스크립트가 구성되면 SQL 쿼리 및 기타 작업을 수행할 수 있습니다. 간단한 SQL 쿼리를 수행하는 Python 스크립트의 예는 다음과 같습니다. 이 스크립트는 기본적인 커넥터 사용법을 설명합니다. 이 설명서에서 제공되는 대부분의 Scala 예제는 Python과 함께 사용할 수 있도록 최소한의 작업/변경을 수행하여 사용할 수 있습니다.

샘플 Python 스크립트에서는 기본 인증(즉, 사용자 이름 및 비밀번호)을 사용합니다. OAuth를 사용하여 인증하려면 이 항목의 외부 OAuth 사용하기 를 참조하십시오.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

sfOptionsSNOWFLAKE_SOURCE_NAME 사용법을 참고하십시오. 이를 통해 코드가 간소화되고 오류 발생 가능성이 감소합니다.

sfOptions 에서 지원되는 옵션에 대한 자세한 내용은 이 항목의 커넥터에 대한 구성 옵션 설정하기 섹션을 참조하십시오.

데이터 타입 매핑

Spark Connector는 다양한 공통 데이터 타입 사이에서의 변환을 지원합니다.

Spark SQL에서 Snowflake로

Spark 데이터 타입

Snowflake 데이터 타입

ArrayType

VARIANT

BinaryType

지원되지 않음

BooleanType

BOOLEAN

ByteType

INTEGER. Snowflake는 BYTE 데이터 타입을 지원하지 않습니다.

DateType

DATE

DecimalType

DECIMAL

DoubleType

DOUBLE

FloatType

FLOAT

IntegerType

INTEGER

LongType

INTEGER

MapType

VARIANT

ShortType

INTEGER

StringType

길이가 지정된 경우 VARCHAR(N), 그렇지 않은 경우 VARCHAR

StructType

VARIANT

TimestampType

TIMESTAMP

Snowflake에서 Spark SQL로

Snowflake 데이터 타입

Spark 데이터 타입

ARRAY

StringType

BIGINT

DecimalType(38, 0)

BINARY

지원되지 않음

BLOB

지원되지 않음

BOOLEAN

BooleanType

CHAR

StringType

CLOB

StringType

DATE

DateType

DECIMAL

DecimalType

DOUBLE

DoubleType

FLOAT

DoubleType

INTEGER

DecimalType(38, 0)

OBJECT

StringType

TIMESTAMP

TimestampType

TIME

StringType (Spark Connector 버전 2.4.14 이상)

VARIANT

StringType

DataFrame.show 메서드 호출하기

DataFrame.show 메서드를 호출하고 DataFrame의 행 수보다 적은 수의 행을 전달하는 경우, 정렬된 순서로 표시할 행만 포함하는 DataFrame을 생성합니다.

이를 위해 다음을 수행하십시오.

  1. 먼저 sort 메서드를 호출하여 정렬된 행을 포함하는 DataFrame을 반환합니다.

  2. 그 DataFrame에서 limit 메서드를 호출하여 표시하려는 행만 포함하는 DataFrame을 반환합니다.

  3. 반환된 DataFrame에서 show 메서드를 호출합니다.

예를 들어, 5개의 행을 표시하고 열 my_col 을 기준으로 결과를 정렬하려는 경우 다음과 같이 하십시오.

val dfWithRowsToShow = originalDf.sort("my_col").limit(5)
dfWithRowsToShow.show(5)

그렇지 않고 show 를 호출하여 DataFrame에 있는 행 중 일부를 표시하는 경우 코드의 다른 실행에서는 다른 행이 표시될 수 있습니다.

커넥터에 대한 구성 옵션 설정하기

다음 섹션에는 커넥터의 동작을 구성하기 위해 설정하는 옵션이 나와 있습니다.

이러한 옵션을 설정하려면 Spark DataframeReader 클래스의 .option(<키>, <값>) 또는 .options(<맵>) 메서드를 호출하십시오.

옵션을 수월하게 사용하려면 단일 Map 오브젝트에서 옵션을 지정하고 .options(<맵>) 을 호출하여 옵션을 설정하는 것이 좋습니다.

필수 연결 옵션

Snowflake에 연결하기 위해 필요한 옵션은 다음과 같습니다.

sfUrl

계정의 호스트 이름 을 다음 형식으로 지정합니다.

account_identifier.snowflakecomputing.com

account_identifier계정 식별자 입니다.

sfUser

Snowflake 사용자의 로그인 이름입니다.

또한 다음 옵션 중 하나를 사용해 인증해야 합니다.

  • sfPassword

    Snowflake 사용자의 비밀번호입니다.

  • pem_private_key

    키 페어 인증을 위한 개인 키(PEM 형식)입니다. 자세한 지침은 키 페어 인증 & 키 페어 순환 섹션을 참조하십시오.

  • sfAuthenticator

    외부 OAuth 를 사용하여 Snowflake에 인증하도록 지정합니다. 값을 oauth 로 설정합니다.

    외부 OAuth를 사용하려면 sfToken 매개 변수를 설정해야 합니다.

sfToken

(External OAuth를 사용하는 경우 필수) External OAuth 액세스 토큰에 값을 설정합니다.

이 연결 매개 변수에서는 sfAuthenticator 매개 변수 값을 oauth 로 설정해야 합니다.

기본값은 없음입니다.

필수 컨텍스트 옵션

세션의 데이터베이스 및 스키마 컨텍스트를 설정하기 위한 필수 옵션은 다음과 같습니다.

sfDatabase

연결 후 세션에서 사용할 데이터베이스입니다.

sfSchema

연결 후 세션에서 사용할 스키마입니다.

추가 컨텍스트 옵션

이 섹션에 나열된 옵션은 필수 사항이 아닙니다.

sfAccount

계정 식별자(예: myorganization-myaccount). sfUrl 에 계정 식별자가 지정되었으므로 이 옵션은 더 이상 필요하지 않습니다. 여기에는 이전 버전과의 호환성을 위해서만 설명이 제공됩니다.

sfWarehouse

연결 후 세션에서 사용한 기분 가상 웨어하우스입니다.

sfRole

연결 후에 세션에서 사용할 기본 보안 역할입니다.

프록시 옵션

이 섹션에 나열된 옵션은 필수 사항이 아닙니다.

use_proxy

커넥터에서 프록시를 사용해야 하는지 여부를 지정합니다.

  • true 는 커넥터에서 프록시를 사용해야 한다고 지정하는 것입니다.

  • false 는 커넥터에서 프록시를 사용하면 안 된다고 지정하는 것입니다.

기본값은 false 입니다.

proxy_host

(use_proxytrue 인 경우 필수) 사용할 프록시 서버의 호스트 이름을 지정합니다.

proxy_port

(use_proxytrue 인 경우 필수) 사용할 프록시 서버의 포트 번호를 지정합니다.

proxy_user

프록시 서버의 인증에서 사용할 사용자 이름을 지정합니다. 프록시 서버에 인증이 필요한 경우 이를 설정합니다.

이것은 AWS에서 Snowflake에만 지원됩니다.

proxy_password

프록시 서버의 인증을 위한 proxy_user 의 비밀번호를 지정합니다. 프록시 서버에 인증이 필요한 경우 이를 설정합니다.

이것은 AWS에서 Snowflake에만 지원됩니다.

non_proxy_hosts

커넥터가 프록시 서버를 우회하여 직접 연결해야 하는 호스트 목록을 지정합니다.

URL로 이스케이프된 파이프 기호(%7C)로 호스트 이름을 구분합니다. 와일드카드로 별표(*)를 사용할 수도 있습니다.

이것은 AWS에서 Snowflake에만 지원됩니다.

추가 옵션

이 섹션에 나열된 옵션은 필수 사항이 아닙니다.

sfTimezone

Spark 관련 작업을 수행할 때 Snowflake에서 사용할 타임존입니다. 매개 변수는 Snowflake의 타임존만 설정하며, Spark 환경은 수정되지 않은 상태로 유지된다는 점에 유의하십시오. 지원되는 값은 다음과 같습니다.

  • spark: Spark의 타임존을 사용합니다(기본값).

  • snowflake: Snowflake의 현재 타임존을 사용합니다.

  • sf_default: 연결 중인 Snowflake 사용자의 기본 타임존을 사용합니다.

  • time_zone: 유효한 경우, 특정 타임존(예: America/New_York)을 사용합니다.

    이 옵션을 설정하는 것의 영향에 대한 자세한 내용은 이 항목의 타임스탬프 및 타임존 관련 작업하기 섹션을 참조하십시오.

sfCompress

on (기본값)으로 설정하면 Snowflake와 Spark 사이에서 전달되는 데이터가 압축됩니다.

s3MaxFileSize

Snowflake에서 Spark로 데이터를 이동할 때 사용되는 파일의 크기입니다. 기본값은 10MB입니다.

preactions

Spark와 Snowflake 사이에서 데이터를 전송하기 전에 실행되는 세미콜론으로 구분된 SQL 명령 목록입니다.

SQL 명령에 %s 가 포함된 경우 %s 가 작업에서 참조되는 테이블의 이름으로 바뀝니다.

postactions

Spark와 Snowflake 사이에서 데이터를 전송한 후에 실행되는 세미콜론으로 구분된 SQL 명령 목록입니다.

SQL 명령에 %s 가 포함된 경우, 작업에서 참조되는 테이블의 이름으로 바뀝니다.

truncate_columns

on (기본값)으로 설정하면 COPY 명령이 대상 열 길이를 초과하는 텍스트 문자열을 자동으로 자릅니다. off 로 설정되면, 명령 실행 시 로드된 문자열이 대상 열 길이를 초과하면 오류가 발생합니다.

truncate_table

이 매개 변수는 Snowflake가 해당 테이블을 덮어쓸 때 Snowflake 대상 테이블의 스키마를 유지할지 여부를 제어합니다.

기본적으로, Snowflake의 대상 테이블을 덮어쓰면 해당 대상 테이블의 스키마도 덮어써지며, 새 스키마는 소스 테이블(Spark datafram)의 스키마를 기반으로 합니다.

그러나 일부 경우 소스의 스키마가 가장 적합한 것은 아닙니다. 예를 들어, 초기 소스 열의 데이터 타입은 FLOAT이지만 사용자는 향후 Snowflake 대상 테이블에 INTEGER 값을 저장할 수 있기를 원할 수 있습니다. 이러한 경우에는 Snowflake 테이블의 스키마를 덮어쓰는 대신, Snowflake 테이블을 자른 후 현재 스키마와 함께 다시 사용해야 합니다.

이 매개 변수에서 사용할 수 있는 값은 다음과 같습니다.

  • on

  • off

이 매개 변수가 on 이면 대상 테이블의 원래 스키마가 유지됩니다. 이 매개 변수가 off 이면 테이블의 기존 스키마가 무시되고 소스의 스키마를 기반으로 새 스키마가 생성됩니다.

이 매개 변수는 선택 사항입니다.

이 매개 변수의 기본값은 off 입니다(즉, 기본적으로 원래 테이블 스키마 덮어쓰기).

Spark 데이터 타입을 Snowflake 데이터 타입으로 매핑하는 방법(또는 그 반대로)에 대한 자세한 내용은 이 항목의 데이터 타입 매핑 섹션을 참조하십시오.

continue_on_error

이 변수는 사용자가 유효하지 않은 데이터(예: 베리언트 데이터 타입 열에 유효하지 않은 COPY 타입)를 입력하는 경우 JSON 명령을 중단할지 여부를 제어합니다.

사용할 수 있는 값은 다음과 같습니다.

  • on

  • off

on 값은 오류가 발생해도 계속 진행함을 의미합니다. off 값은 오류가 발생하면 중단함을 의미합니다.

이 매개 변수는 선택 사항입니다.

이 매개 변수의 기본값은 off 입니다.

이 옵션을 끄는 것은 권장되지 않습니다. Spark 커넥터를 사용하여 Snowflake에 COPYing을 수행하는 동안 오류가 보고되면 데이터가 누락될 수 있습니다.

참고

행이 거부 또는 누락되었지만 입력 소스에서 해당 행에 명확하게 결함이 없는 경우 Snowflake에 보고하십시오.

usestagingtable

이 매개 변수는 데이터 로딩이 스테이징 테이블을 사용하는지 여부를 제어합니다.

스테이징 테이블은 커넥터에 의해 생성되는 일반 테이블(임시 이름 포함)로, 데이터 로딩 작업이 성공하면 원래 대상 테이블이 삭제되고 스테이징 테이블의 이름이 원래 대상 테이블의 이름으로 바뀝니다. 데이터 로딩 작업이 실패하면 스테이징 테이블이 삭제되고 대상 테이블에는 작업 직전의 데이터가 유지됩니다. 그러므로 스테이징 테이블을 사용하면 작업이 실패할 경우 원래 대상 테이블의 데이터를 유지할 수 있습니다. 안전을 위해 Snowflake는 대부분의 상황에서 스테이징 테이블을 사용할 것을 강력하게 권장합니다.

커넥터가 스테이징 테이블을 생성하려면 Spark 커넥터를 통해 COPY를 실행하는 사용자에게 충분한 테이블 생성 권한이 있어야 합니다. 사용자에게 테이블 생성 권한이 없는 경우에는 직접 로드(즉, 스테이징 테이블을 사용하지 않고 로드) 방식이 유용합니다.

이 매개 변수에서 사용할 수 있는 값은 다음과 같습니다.

  • on

  • off

이 매개 변수가 on 이면 스테이징 테이블이 사용됩니다. 이 매개 변수가 off 이면 데이터가 대상 테이블에 직접 로드됩니다.

이 매개 변수는 선택 사항입니다.

이 매개 변수의 기본값은 on 입니다(즉, 스테이징 테이블 사용).

autopushdown

이 매개 변수는 자동 쿼리 푸시다운의 활성화 여부를 제어합니다.

푸시다운이 활성화된 경우 Spark에서 쿼리가 실행될 때 쿼리의 일부를 Snowflake 서버로 《푸시다운》할 수 있으면 해당 쿼리가 푸시다운됩니다. 이를 통해 일부 쿼리의 성능이 향상됩니다.

이 매개 변수는 선택 사항입니다.

커넥터가 호환되는 Spark 버전에 연결된 경우 기본값은 on 입니다. 그렇지 않으면 기본값은 off 입니다.

If the connector is plugged into a different version of Spark than the connector is intended for (e.g. if version 3.2 of the connector is plugged into version 3.3 of Spark), then auto-pushdown is disabled even if this parameter is set to on.

purge

이 매개 변수를 on 으로 설정하면 외부 데이터 전송을 통해 Spark에서 Snowflake로 전송할 때 커넥터가 생성된 임시 파일을 삭제합니다. 이 매개 변수를 off 로 설정하면 해당 파일이 커넥터에 의해 자동으로 삭제되지 않습니다.

파일 제거는 Snowflake에서 Spark로의 전송이 아닌 Spark에서 Snowflake로의 전송에만 적용됩니다.

사용할 수 있는 값은 다음과 같습니다.

  • on

  • off

기본값은 off 입니다.

columnmap

이 매개 변수는 Spark에서 Snowflake로 데이터를 쓰고 Snowflake 테이블의 열 이름이 Spark 테이블의 열 이름과 일치하지 않을 때 유용합니다. 사용자는 각 Snowflake 대상 열에 해당하는 Spark 소스 열을 나타내는 맵을 생성할 수 있습니다.

매개 변수는 다음 형식을 갖는 단일 문자열 리터럴입니다.

"Map(col_2 -> col_b, col_3 -> col_a)"

예를 들어, 다음 시나리오를 생각해 보겠습니다.

  • Spark의 df Dataframe에는 다음과 같이 열이 3개 있습니다.

    col_1 , col_2 , col_3

  • Snowflake의 tb 테이블에는 다음과 같이 열이 2개 있습니다.

    col_a , col_b

  • 다음과 같이 값을 복사하려고 합니다.

    • df.col_2 에서 tb.col_b 로.

    • df.col_3 에서 tb.col_a 로.

columnmap 매개 변수의 값은:

Map(col_2 -> col_b, col_3 -> col_a)

다음 Scala 코드를 실행하여 이 값을 생성할 수 있습니다.

Map("col_2"->"col_b","col_3"->"col_a").toString()

이 매개 변수의 기본값은 null입니다. 즉, 기본적으로 원본 테이블과 대상 테이블의 열 이름이 일치해야 합니다.

이 매개 변수는 Spark에서 Snowflake로 쓸 때만 사용되며, Snowflake에서 Spark로 쓸 때는 적용되지 않습니다.

keep_column_case

열 이름이 큰따옴표로 묶인 경우를 제외하고, Spark에서 Snowflake로 테이블을 작성할 때 Spark 커넥터는 기본적으로 열 이름의 문자를 대문자로 변환합니다.

Snowflake에서 Spark로 테이블을 작성할 때 Spark 커넥터는 기본적으로 대문자, 밑줄 및 숫자를 제외한 모든 문자가 포함된 열 이름을 큰따옴표로 묶습니다.

열_대소문자_유지를 on 으로 설정하면 Spark는 이러한 변경을 수행하지 않습니다.

사용할 수 있는 값은 다음과 같습니다.

  • on

  • off

기본값은 off 입니다.

column_mapping

커넥터는 Spark 데이터 프레임의 열을 Snowflake 테이블에 매핑해야 합니다. 이 작업은 열 이름(순서에 관계없이) 또는 열 순서(즉, 열 이름에 관계없이 데이터 프레임의 첫 번째 열이 테이블의 첫 번째 열에 매핑됨)를 기반으로 수행할 수 있습니다.

기본적으로 매핑은 순서에 따라 수행됩니다. 이 매개 변수를 name 으로 설정하여 이를 무시할 수 있으며, 이 경우 커넥터가 열 이름을 기반으로 열을 매핑하도록 지시합니다. (이름 매핑에서는 대/소문자를 구분하지 않습니다.)

이 매개 변수에서 사용할 수 있는 값은 다음과 같습니다.

  • order

  • name

기본값은 order 입니다.

column_mismatch_behavior

이 매개 변수는 column_mapping 매개 변수가 name 으로 설정된 경우에만 적용됩니다.

Spark 데이터 프레임과 Snowflake 테이블의 열 이름이 일치하지 않는 경우:

  • column_mismatch_behaviorerror 이면, Spark Connector에서 오류를 보고합니다.

  • column_mismatch_behaviorignore 이면, Spark Connector는 오류를 무시합니다.

    • 드라이버는 Snowflake 테이블에 해당 열이 없는 Spark 데이터 프레임의 모든 열을 삭제합니다.

    • 드라이버는 Spark 데이터 프레임에 해당 열이 없는 Snowflake 테이블의 열에 NULLs을 삽입합니다.

가능한 오류는 다음과 같습니다.

  • Spark 데이터 프레임에 대/소문자(대문자/소문자)를 제외하고 동일한 열이 포함될 수 있습니다. 열 이름 매핑에서는 대/소문자를 구분하지 않기 때문에 데이터 프레임에서 테이블로의 올바른 매핑을 결정할 수 없습니다.

  • Snowflake 테이블에 대/소문자(대문자/소문자)를 제외하고 동일한 열이 포함될 수 있습니다. 열 이름 매핑에서는 대/소문자를 구분하지 않기 때문에 데이터 프레임에서 테이블로의 올바른 매핑을 결정할 수 없습니다.

  • Spark 데이터 프레임과 Snowflake 테이블에 공통되는 열 이름이 없을 수 있습니다. 이론적으로 Spark Connector는 모든 행의 모든 열에 NULLs을 삽입할 수 있지만, 이는 일반적으로 무의미하므로 column_mismatch_behaviorignore 로 설정된 경우에도 커넥터에서 오류가 throw됩니다.

이 매개 변수에서 사용할 수 있는 값은 다음과 같습니다.

  • error

  • ignore

기본값은 error 입니다.

time_output_format

이 매개 변수를 사용하면 반환되는 TIME 데이터의 타입을 지정할 수 있습니다.

이 매개 변수에서 사용할 수 있는 값은 시간 형식 에 지정된 시간 타입에서 사용할 수 있는 값입니다.

이 매개 변수는 입력이 아닌 출력에만 영향을 줍니다.

timestamp_ntz_output_format, . timestamp_ltz_output_format, . timestamp_tz_output_format

이러한 옵션은 타임스탬프 값의 출력 형식을 지정합니다. 이러한 옵션의 기본값은 다음과 같습니다.

구성 옵션

기본값

timestamp_ntz_output_format

"YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_ltz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_tz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

이러한 옵션이 "sf_current" 로 설정되면 커넥터는 세션에 대해 지정된 형식을 사용합니다.

partition_size_in_mb

이 매개 변수는 쿼리 결과 세트의 크기가 매우 커 여러 DataFrame 파티션으로 분할해야 하는 경우에 사용됩니다. 이 매개 변수는 각 DataFrame 파티션에 권장되는 압축되지 않은 크기를 지정합니다. 파티션 수를 줄이려면 이 크기를 늘립니다.

이 크기가 권장 크기로 사용되지만, 파티션의 실제 크기는 더 작거나 클 수 있습니다.

이 옵션은 use_copy_unload 매개 변수가 FALSE로 설정된 경우에만 적용됩니다.

이 매개 변수는 선택 사항입니다.

기본값은 100 (MB)입니다.

use_copy_unload

이 매개 변수가 FALSE 이면 Snowflake는 SELECTing 데이터인 경우 화살표 데이터 타입을 사용합니다. 이 값이 TRUE 로 설정된 경우, Snowflake는 COPY UNLOAD 명령을 사용하여 선택된 데이터를 전송하는 이전 동작이 수행됩니다.

이 매개 변수는 선택 사항입니다.

기본값은 FALSE 입니다.

키 페어 인증 & 키 페어 순환 사용하기

Spark 커넥터는 키 페어 인증 및 키 순환을 지원합니다.

  1. 시작하려면, 키 페어 인증 & 키 페어 순환 에서와 같이 키 페어 인증을 위한 초기 구성을 수행합니다.

  2. pem_private_key 연결 옵션을 사용하여 개인 키의 암호화되지 않은 복사본을 전송합니다.

주의

보안상의 이유로 애플리케이션에서 pem_private_key 를 하드코딩하는 것보다 보안 소스에서 키를 읽은 후 해당 매개 변수를 동적으로 설정해야 합니다. 키가 암호화된 경우 암호를 해독하고 해독된 버전을 전송합니다.

Python 예에서 pem_private_key 파일인 rsa_key.p8 은:

  • PRIVATE_KEY_PASSPHRASE 환경 변수를 사용하여 비밀번호로 보호된 파일에서 직접 읽습니다.

  • sfOptions 문자열에서 pkb 식을 사용합니다.

연결하려면 Python 예제를 파일(즉, <file.py>)에 저장하고 다음 명령을 실행합니다.

spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>

Python

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')

sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "schema",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "COLORS") \
    .load()

df.show()

External OAuth 사용하기

Spark Connector 버전 2.7.0부터 외부 OAuth 를 사용하여 샘플 Scala 프로그램 또는 샘플 Python 스크립트를 사용하여 Snowflake에 인증할 수 있습니다.

External OAuth 및 Spark 커넥터를 사용하여 Snowflake에 인증하기 전, 지원되는 External OAuth 인증 서버 중 1개 또는 External OAuth 사용자 지정 클라이언트 에 External OAuth 보안 통합을 구성해야 합니다.

Scala 및 Python 예제에서 sfPassword 매개 변수를 sfAuthenticatorsfToken 매개 변수로 대체했음에 유의하십시오.

Scala:

// spark connector version

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Python:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

외부 데이터 전송을 위한 AWS 옵션

이러한 옵션은 임시 데이터가 저장되는 Amazon S3 위치를 지정하고 해당 위치에 액세스하기 위한 인증 세부 정보를 제공하기 위해 사용됩니다. 그리고 외부 데이터 전송을 수행하는 경우에만 필요합니다. 다음 중 1개에 해당하는 경우 외부 데이터 전송이 필요합니다.

  • Spark Connector의 버전 2.1.x 이하(내부 전송을 지원하지 않음)를 사용하는 중입니다. 또는

  • 전송에 36시간 이상이 걸릴 수 있습니다(내부 전송에서는 36시간 후에 만료되는 임시 자격 증명이 사용됨).

tempDir

임시 데이터가 저장된 S3 위치(예: s3n://xy12345-bucket/spark-snowflake-tmp/)입니다.

tempDir 이 지정된 경우에는 다음 중 1개도 지정해야 합니다.

  • awsAccessKey , awsSecretKey . 또는

  • temporary_aws_access_key_id , temporary_aws_secret_access_key, temporary_aws_session_token

awsAccessKey , awsSecretKey

이 옵션은 tempDir 에 지정된 위치에 대한 액세스를 허용하는 표준 AWS 자격 증명입니다. 이러한 두 옵션을 함께 설정해야 한다는 점에 유의하십시오.

설정된 경우, 기존 SparkContext 오브젝트에서 검색할 수 있습니다.

이러한 변수를 지정하는 경우에는 tempDir 도 지정해야 합니다.

이러한 자격 증명은 Hadoop 클러스터에 대해서도 설정되어야 합니다.

temporary_aws_access_key_id , temporary_aws_secret_access_key, temporary_aws_session_token

이 옵션은 tempDir 에 지정된 위치에 대한 액세스를 허용하는 임시 AWS 자격 증명입니다. 이러한 세 옵션을 모두 설정해야 한다는 점에 유의하십시오.

또한, 이러한 옵션이 설정된 경우 awsAccessKeyawsSecretKey 옵션보다 우선 적용됩니다.

temporary_aws_access_key_id , temporary_aws_secret_access_keytemporary_aws_session_token 을 지정하면 tempDir 도 지정해야 합니다. 그렇지 않으면, 이러한 매개 변수가 무시됩니다.

check_bucket_configuration

on (기본값)으로 설정하면 커넥터는 데이터 전송에 사용되는 버킷에 수명 주기 정책이 구성되어 있는지 확인합니다(자세한 내용은 AWS 외부 S3 버킷 준비하기 참조). 수명 주기 정책이 없는 경우 경고가 로그에 기록됩니다.

이 옵션을 비활성화하면(off 로 설정하여) 이 검사를 건너뜁니다. 이는 사용자가 버킷 데이터 작업에 액세스할 수 있지만, 버킷 수명 주기 정책에는 액세스할 수 없는 경우에 유용할 수 있습니다. 옵션을 비활성화하면 쿼리가 약간 빠르게 실행될 수 있습니다.

자세한 내용은 이 항목의 데이터 교환을 위해 S3 인증하기 섹션을 참조하십시오.

외부 데이터 전송을 위한 Azure 옵션

이 섹션에서는 외부 데이터 전송을 수행할 때 Azure Blob 저장소에 적용되는 매개 변수에 대해 설명합니다. 다음 중 1개에 해당하는 경우 외부 데이터 전송이 필요합니다.

  • Spark Connector의 버전 2.1.x 이하(내부 전송을 지원하지 않음)를 사용하는 중입니다. 또는

  • 전송에 36시간 이상이 걸릴 수 있습니다(내부 전송에서는 36시간 후에 만료되는 임시 자격 증명이 사용됨).

Azure Blob 저장소에서 외부 전송을 사용하는 경우 아래에 설명된 매개 변수를 사용하여 Azure 컨테이너의 위치와 해당 컨테이너에 대한 SAS(공유 액세스 서명)를 지정합니다.

tempDir

임시 데이터가 저장되는 Azure Blob 저장소 컨테이너입니다. 이는 URL의 형식이며, 예를 들면 다음과 같습니다.

wasb://<azure_컨테이너>@<azure_계정>.<azure_엔드포인트>/

temporary_azure_sas_token

Azure Blob 저장소에 대한 SAS 토큰을 지정합니다.

자세한 내용은 이 항목의 데이터 교환을 위해 Azure 인증하기 섹션을 참조하십시오.

Spark에서 임시 저장소에 대한 Azure 정보 지정하기

Azure Blob 저장소를 사용하여 Spark와 Snowflake 사이에서 데이터를 전송하기 위한 임시 저장소를 제공하는 경우, Spark 및 Snowflake Spark Connector에 임시 저장소의 위치 및 자격 증명을 제공해야 합니다.

Spark에 임시 저장 위치를 제공하려면 Spark 클러스터에서 다음과 유사한 명령을 실행합니다.

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

마지막 명령에 포함된 변수는 다음과 같음에 유의하십시오.

  • <컨테이너><계정>: Azure 배포에 대한 컨테이너 및 계정의 이름입니다.

  • <azure_엔드포인트>: Azure 배포 위치에 대한 엔드포인트입니다. 예를 들어, Azure US 배포를 사용하는 경우 엔드포인트은 blob.core.windows.net 일 가능성이 높습니다.

  • <azure_sas>: Shared Access Signature 보안 토큰입니다.

이러한 각 변수를 Azure Blob 저장소 계정에 대한 적절한 정보로 바꿉니다.

커넥터에 대한 옵션으로 Snowflake 세션 매개 변수 전달하기

Spark용 Snowflake Connector는 임의의 세션 수준 매개 변수를 Snowflake로 전송하는 기능을 지원합니다(자세한 내용은 세션 매개 변수 참조). 이를 위해서는 ("<키>" -> "<값>") 페어를 options 오브젝트에 추가합니다. 여기서, <키> 는 세션 매개 변수의 이름이고 <값> 은 값입니다.

참고

<값> 은 숫자 또는 부울 값(예: "1" 또는 "true")을 허용하는 매개 변수의 경우에도 큰따옴표로 묶인 문자열이어야 합니다.

예를 들어, 다음 코드 샘플은 이전에 실행된 쿼리의 결과를 사용하지 않도록 설정하는 "false" 값으로 USE_CACHED_RESULT 세션 매개 변수를 전달합니다.

// ... assuming sfOptions contains Snowflake connector options

// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method

보안 고려 사항

고객은 멀티 노드 Spark 시스템에서 노드 간 통신이 안전한지 확인해야 합니다. Spark 마스터는 Snowflake 자격 증명을 Spark 작업자에게 전송하여 해당 작업자가 Snowflake 스테이지에 액세스할 수 있도록 합니다. Spark 마스터와 Spark 작업자 간 통신이 안전하지 않은 경우, 권한이 없는 제3자가 해당 자격 증명을 읽을 수 있습니다.

데이터 교환을 위해 S3 인증하기

이 섹션에서는 데이터를 교환하기 위해 S3를 사용할 때 인증하는 방법을 설명합니다.

이 작업은 다음 상황 중 1개에 해당하는 경우에만 필요합니다.

  • Spark용 Snowflake Connector의 버전이 2.1.x 이하입니다. v2.2.0부터 커넥터는 Snowflake 내부 임시 스테이지를 사용하여 데이터를 교환합니다. 현재 2.2.0 이상 버전을 사용하지 않는 경우 Snowflake는 최신 버전으로 업그레이드할 것을 강력하게 권장합니다.

  • Spark용 Snowflake Connector 버전이 2.2.0 이상이지만, 작업 실행 시간이 정기적으로 36시간을 초과합니다. 이 시간은 커넥터가 데이터를 교환하기 위해 내부 스테이지에 액세스하기 위해 사용하는 AWS 토큰의 최대 기간입니다.

이전 버전의 커넥터를 사용하는 경우 커넥터가 Snowflake와 Spark 간에 데이터를 교환하기 위해 사용할 수 있는 S3 위치를 준비해야 합니다.

Spark와 Snowflake 사이에서 데이터를 교환하기 위해 사용되는 S3 버킷/디렉터리에 대한 액세스할 수 있도록(tempDir 에 지정된 대로) 다음과 같은 2가지 인증 방법이 지원됩니다.

  • 영구 AWS 자격 증명(S3에 액세스하기 위한 Hadoop/Spark 인증 구성에서도 사용됨)

  • 임시 AWS 자격 증명

영구 AWS 자격 증명 사용하기

표준 AWS 인증 방법입니다. awsAccessKeyawsSecretKey 값의 페어가 필요합니다.

참고

이 값은 S3에 액세스하기 위해 Hadoop/Spark를 구성하기 위해서도 사용해야 합니다. 예를 포함한 자세한 내용은 이 항목의 S3A 또는 S3N을 사용하여 Hadoop/Spark 인증하기 섹션을 참조하십시오.

예:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)

sfOptions 에서 지원되는 옵션에 대한 자세한 내용은 이 항목의 외부 데이터 전송을 위한 AWS 옵션 섹션을 참조하십시오.

S3A 또는 S3N을 사용하여 Hadoop/Spark 인증하기

Hadoop/Spark 에코시스템에서는 S3에 액세스 하기 위한 2가지 URI 방식을 지원합니다.

s3a://

새로운 권장 방법(Hadoop 2.7 이상)

이 방법을 사용하려면 이 항목의 Scala 예제를 수정하여 다음 Hadoop 구성 옵션을 추가해야 합니다.

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)

tempdir 옵션에서 s3a:// 도 사용하는지 확인하십시오.

s3n://

기존 방법(Hadoop 2.6 이하)

일부 시스템에서는 다음 Scala 예제와 같이 명시적으로 지정해야 합니다.

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)

임시 AWS 자격 증명 사용하기

이 방법에서는 커넥터에 대한 temporary_aws_access_key_id, temporary_aws_secret_access_keytemporary_aws_session_token 구성 옵션을 사용합니다.

이 방법에서는 데이터 교환에 사용되는 S3 버킷/디렉터리에 대한 임시 액세스 권한만을 Snowflake에 제공하여 추가 보안을 허용합니다.

참고

임시 자격 증명은 커넥터에 대한 S3 인증을 구성하기 위해서만 사용할 수 있으며, Hadoop/Spark 인증을 구성하기 위해서는 사용할 수 없습니다.

또한, 임시 자격 증명을 제공하면 제공된 영구 자격 증명보다 우선 적용됩니다.

다음 Scala 코드 샘플은 임시 자격 증명을 사용하여 인증하는 예를 제공합니다.

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())

이제 sfOptions2options() DataFrame 메서드와 함께 사용할 수 있습니다.

데이터 교환을 위해 Azure 인증하기

이 섹션에서는 데이터를 교환하기 위해 Azure Blob 저장소를 사용할 때 인증하는 방법을 설명합니다.

다음 상황 중 1개에 해당하는 경우에만 이 방법으로 인증해야 합니다.

  • Spark용 Snowflake Connector의 버전이 2.1.x 이하입니다. v2.2.0부터 커넥터는 Snowflake 내부 임시 스테이지를 사용하여 데이터를 교환합니다. 현재 2.2.0 이상 버전을 사용하지 않는 경우 Snowflake는 최신 버전으로 업그레이드할 것을 강력하게 권장합니다.

  • Spark용 Snowflake Connector 버전이 2.2.0 이상이지만, 작업 실행 시간이 정기적으로 36시간을 초과합니다. 이 시간은 커넥터가 데이터 교환을 위한 내부 스테이지에 액세스하기 위해 사용하는 Azure 토큰의 최대 기간입니다.

커넥터가 Snowflake와 Spark 사이에서 데이터를 교환하기 위해 사용할 수 있는 Azure Blob 저장소 컨테이터를 준비합니다.

Azure 자격 증명 사용하기

표준 Azure Blob 저장소 인증 방법입니다. tempDir (URL) 및 temporary_azure_sas_token 값의 값 페어가 필요합니다.

참고

이 값은 Azure Blob 저장소에 액세스하기 위해 Hadoop/Spark를 구성하기 위해서도 사용해야 합니다. 예를 포함한 자세한 내용은 이 항목의 Azure를 사용하여 Hadoop/Spark 인증하기 섹션을 참조하십시오.

예:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_identifier>.snowflakecomputing.com",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)

sfOptions 에서 지원되는 옵션에 대한 자세한 내용은 이 항목의 외부 데이터 전송을 위한 Azure 옵션 섹션을 참조하십시오.

Azure를 사용하여 Hadoop/Spark 인증하기

이 방법을 사용하려면 이 항목의 Scala 예제를 수정하여 다음 Hadoop 구성 옵션을 추가해야 합니다.

val hadoopConf = sc.hadoopConfiguration
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

tempdir 옵션에서 wasb:// 도 사용하는지 확인하십시오.

브라우저를 통한 인증은 지원되지 않음

Spark Connector를 사용할 때 사용자에게 자격 증명을 요청하기 위해 브라우저 창을 여는 인증 형식을 사용하는 것은 비현실적입니다. 클라이언트 시스템에서 창이 반드시 표시되어야 하는 것은 아닙니다. 그러므로 Spark Connector는 MFA(다단계 인증) 또는 SSO(Single Sign-On) 등 브라우저 창을 호출하는 인증 타입을 지원하지 않습니다.

맨 위로 이동