스칼라 Scala UDF에서 전역 상태 제어하기

공유 상태에 대한 액세스가 필요한 UDF와 처리기를 설계할 때 Snowflake가 UDF를 실행하여 행을 처리하는 방식을 설명해야 합니다.

대부분의 처리기는 다음 지침을 따라야 합니다.

  • 행 간에 변경되지 않는 공유 상태를 초기화해야 하는 경우 처리기 함수 외부에서(예: 생성자에서) 초기화하십시오.

  • 스레드로부터 안전하도록 핸들러 메서드를 작성하십시오.

  • 행 간에 동적 상태를 저장 및 공유하지 마십시오.

UDF가 이러한 지침을 따를 수 없거나 이러한 지침의 이유를 더 깊이 이해하려는 경우, 다음 몇 가지 하위 섹션을 읽으십시오.

호출 간 상태 공유하기

Snowflake는 스칼라 UDF가 독립적으로 처리될 것으로 예상합니다. 호출 간에 공유되는 상태에 의존하면 예기치 않은 동작이 발생할 수 있습니다. 이는 시스템에서 행을 임의의 순서로 처리하고 여러 JVM(Java 또는 Scala로 작성된 처리기의 경우)에 걸쳐 이러한 호출을 분산시킬 수 있기 때문입니다.

UDF는 핸들러 메서드에 대한 호출에서 공유 상태에 의존하는 것을 피해야 합니다. 그러나 사용자가 UDF에서 공유 상태를 저장하려 할 수 있는 다음 두 가지 상황이 있습니다.

  • 각 행에 대해 반복하고 싶지 않은, 부담이 큰 초기화 논리가 포함된 코드입니다.

  • 캐시와 같이 행 간에 공유 상태를 활용하는 코드입니다.

여러 행에서 상태를 공유해야 하고 해당 상태가 시간이 지나도 변경되지 않는 경우, 생성자를 사용하여 인스턴스 수준 변수를 설정해 공유 상태를 만드십시오. 생성자는 인스턴스당 한 번만 실행되는 반면 핸들러는 행당 한 번 호출되므로 핸들러가 여러 행을 처리할 때 생성자에서 초기화하는 것이 부담이 더 적습니다. 그리고 생성자는 한 번만 호출되기 때문에 생성자는 스레드로부터 안전하도록 작성할 필요가 없습니다.

변경되는 공유 상태를 UDF가 저장하는 경우, 해당 상태에 대한 동시 액세스를 처리할 수 있도록 코드를 준비해야 합니다.

병렬 처리 및 공유 상태에 대한 자세한 내용은 이 항목의 병렬 처리 이해하기JVM 상태 정보 저장 섹션을 참조하십시오.

병렬 처리 이해하기

성능을 향상시키기 위해 Snowflake는 JVM 전체에 걸쳐, 그리고 JVM 내에서 병렬 처리를 합니다.

JVM 전체에 걸쳐 병렬 처리하기

Snowflake는 웨어하우스 의 작업자 전체에 걸쳐 병렬 처리를 합니다. 각 작업자는 하나 이상의 JVM을 실행합니다. 이는 전역 공유 상태가 없음을 의미합니다. 기껏해야 단일 JVM 내에서만 상태를 공유할 수 있습니다.

JVM 내에서 병렬 처리하기

  • 각 JVM은 동일한 인스턴스의 핸들러 메서드를 병렬로 호출할 수 있는 여러 스레드를 실행할 수 있습니다. 이는 각 핸들러 메서드가 스레드로부터 안전해야 함을 의미합니다.

  • UDF가 IMMUTABLE이고 SQL 문이 동일 행에 대해 동일 인자를 사용하여 UDF를 두 번 이상 호출하는 경우, UDF는 해당 행의 각 호출에 대해 동일 값을 반환합니다.

    예를 들어, UDF가 IMMUTABLE인 경우 다음은 각 행에 대해 동일 값을 두 번 반환합니다.

    SELECT my_scala_udf(42), my_scala_udf(42) FROM table1;
    
    Copy

    동일 인자가 전달된 경우에도 여러 호출이 독립 값을 반환하고 함수 VOLATILE을 선언하지 않으려면 여러 개별 UDF를 동일 핸들러 메서드에 바인딩하십시오.

    다음 단계에 따라 이 작업을 수행할 수 있습니다.

    1. 다음 코드를 사용하여 @udf_libs/rand.jar 라는 JAR 파일을 만듭니다.

      class MyClass {
      
          var x: Double = 0.0
      
          // Constructor
          def this() = {
              x = Math.random()
          }
      
          // Handler
          def myHandler(): Double = x
      }
      
      Copy
    2. 아래와 같이 Scala UDF를 만듭니다.

      이러한 UDF는 이름이 다르지만, 동일 JAR 파일을 사용하며, 해당 JAR 파일 내에서 동일 핸들러를 사용합니다.

      CREATE FUNCTION my_scala_udf_1()
        RETURNS DOUBLE
        LANGUAGE SCALA
        IMPORTS = ('@udf_libs/rand.jar')
        HANDLER = 'MyClass.myHandler';
      
      CREATE FUNCTION my_scala_udf_2()
        RETURNS DOUBLE
        LANGUAGE SCALA
        IMPORTS = ('@udf_libs/rand.jar')
        HANDLER = 'MyClass.myHandler';
      
      Copy
    3. 다음 코드를 사용하여 두 UDF를 모두 호출합니다.

      UDF는 동일한 JAR 파일 및 핸들러를 가리킵니다. 이러한 호출은 동일한 클래스의 두 인스턴스를 만듭니다. 각 인스턴스는 독립적인 값을 반환하므로 아래 예에서는 동일 값을 두 번 반환하는 대신 두 개의 독립적인 값을 반환합니다.

      SELECT my_scala_udf_1(), my_scala_udf_2() FROM table1;
      
      Copy

JVM 상태 정보 저장

동적 공유 상태에 의존하지 않는 한 가지 이유는 행이 반드시 예측 가능한 순서로 처리되는 것은 아니기 때문입니다. SQL 문이 실행될 때마다 Snowflake는 배치 수, 배치가 처리되는 순서, 배치 내의 행 순서를 변경할 수 있습니다. 한 행이 후속 행의 반환 값에 영향을 미치도록 스칼라 UDF가 설계된 경우, UDF는 UDF가 실행될 때마다 다른 결과를 반환할 수 있습니다.