Python으로 저장 프로시저 작성하기

이 항목에서는 Python에서 저장 프로시저를 작성하는 방법에 대해 설명합니다. 저장 프로시저 내에서 Snowpark 라이브러리를 사용하여 Snowflake의 테이블에 대한 쿼리, 업데이트 및 기타 작업을 수행할 수 있습니다.

이 항목의 내용:

소개

Snowpark 저장 프로시저를 사용하면 Snowflake 웨어하우스를 컴퓨팅 프레임워크로 사용하여 Snowflake 내에서 데이터 파이프라인을 구축하고 실행할 수 있습니다. Python용 Snowpark API를 사용하여 데이터 파이프라인을 만들어 저장 프로시저를 작성합니다. 이러한 저장 프로시저의 실행을 예약하려면 작업 을 사용합니다.

머신 러닝 모델과 Snowpark Python에 대한 자세한 내용은 Snowpark Python으로 머신 러닝 모델 학습시키기 섹션을 참조하십시오.

Python 워크시트를 사용 하거나 로컬 개발 환경을 사용 하여 Python용 Snowpark 저장 프로시저를 작성할 수 있습니다.

처리기 코드가 실행될 때 로그 및 추적 데이터를 캡처할 수 있습니다. 자세한 내용은 로깅 및 추적 개요 섹션을 참조하십시오.

참고

익명 프로시저를 만들기도 하고 호출도 하려면 CALL(익명 프로시저 사용) 를 사용하십시오. 익명 프로시저를 만들고 호출하는 데는 CREATE PROCEDURE 스키마 권한이 있는 역할이 필요하지 않습니다.

저장 프로시저를 로컬에서 작성하기 위한 전제 조건

로컬 개발 환경에서 Python 저장 프로시저를 작성하려면 다음 전제 조건을 충족해야 합니다.

  • 버전 0.4.0 또는 최신 버전의 Snowpark 라이브러리를 사용해야 합니다.

  • Snowpark Python이 필요한 서드 파티 종속성을 로드할 수 있도록 Anaconda 패키지를 활성화합니다. Anaconda의 서드 파티 패키지 사용하기 섹션을 참조하십시오.

  • 지원되는 Python 버전은 다음과 같습니다.

    • 3.8

    • 3.9

    • 3.10

    • 3.11

Snowpark 라이브러리를 사용하기 위한 개발 환경을 설정해야 합니다. Snowpark를 위한 개발 환경 설정하기 를 참조하십시오.

저장 프로시저에 대한 Python 코드 작성하기

프로시저 논리의 경우, 프로시저가 호출될 때 실행되는 처리기 코드를 작성합니다. 이 섹션에서는 처리기의 설계에 대해 설명합니다.

여러 가지 방법으로 처리기 코드에서 저장 프로시저를 만들 수 있습니다.

제한 사항

Snowpark 저장 프로시저에는 다음과 같은 제한 사항이 있습니다.

  • 저장 프로시저에서는 프로세스 생성이 지원되지 않습니다.

  • 저장 프로시저에서는 동시 쿼리 실행이 지원되지 않습니다.

  • PUT 및 GET 명령을 실행하는 API(Session.sql("PUT ...")Session.sql("GET ...") 포함)를 사용할 수 없습니다.

  • session.file.get 을 사용하여 스테이지에서 파일을 다운로드할 때는 패턴 일치가 지원되지 않습니다.

  • 작업에서 저장 프로시저를 실행하는 경우, 작업을 생성할 때 웨어하우스를 지정해야 합니다. 사용자는 Snowflake가 관리하는 컴퓨팅 리소스를 사용하여 작업을 실행할 수 없습니다.

  • 명명된 임시 오브젝트 만들기는 소유자의 권한 저장 프로시저에서 지원되지 않습니다. 소유자의 권한 저장 프로시저는 저장 프로시저 소유자의 권한으로 실행되는 저장 프로시저입니다. 자세한 내용은 호출자 권한 또는 소유자 권한 을 참조하십시오.

저장 프로시저 작성 계획하기

저장 프로시저는 Snowflake 내부에서 실행되므로 그 점을 염두에 두고 작성하는 코드를 계획해야 합니다.

  • 사용되는 메모리양을 제한합니다. Snowflake는 필요한 메모리 양 측면에서 메서드에 제한을 둡니다. 지침은 Snowflake에서 적용한 제약 조건 내에서 유지되는 처리기 설계하기 섹션을 참조하십시오.

  • 처리기 메서드 또는 함수가 스레드로부터 안전한지 확인하십시오.

  • 규칙 및 보안 제한 사항을 따르십시오. UDF 및 프로시저의 보안 모범 사례 섹션을 참조하십시오.

  • 저장 프로시저를 호출자의 권한으로 실행할지 소유자의 권한으로 실행할지 여부를 결정합니다.

  • 저장 프로시저를 실행하는 데 사용되는 snowflake-snowpark-python 버전을 고려하십시오. 저장 프로시저 출시 프로세스의 제한 사항으로 인해, Python 저장 프로시저 환경에서 사용할 수 있는 snowflake-snowpark-python 라이브러리는 보통 공개적으로 출시된 버전보다 한 버전 뒤처진 버전입니다. 다음 SQL을 사용하여 사용 가능한 최신 버전을 찾습니다.

    select * from information_schema.packages where package_name = 'snowflake-snowpark-python' order by version desc;
    
    Copy

메서드 또는 함수 작성하기

저장 프로시저에 대한 메서드 또는 함수를 작성할 때 다음 사항에 유의하십시오.

  • Snowpark Session 오브젝트를 메서드 또는 함수의 첫 번째 인자로 지정합니다. 저장 프로시저를 호출하면 Snowflake는 자동으로 Session 오브젝트를 생성하여 저장 프로시저에 전달합니다. (Session Session 오브젝트를 직접 만들 수는 없습니다.)

  • 나머지 인자와 반환 값의 경우, Snowflake 데이터 타입 에 해당하는 Python 타입을 사용합니다. Snowflake는 매개 변수 및 반환 형식에 대한 SQL-Python 데이터 타입 매핑 에 나열된 Python 데이터 타입을 지원합니다.

오류 처리

일반적인 Python 예외 처리 기법을 사용하여 프로시저 내에서 오류를 포착할 수 있습니다.

메서드 내에서 포착되지 않은 예외가 발생하면 Snowflake는 예외에 대한 스택 추적을 포함하는 오류를 발생시킵니다. 처리되지 않은 예외 로깅 이 활성화되면 Snowflake는 이벤트 테이블에 처리되지 않은 예외에 대한 데이터를 기록합니다.

코드에 종속성을 사용할 수 있도록 만들기

처리기 코드가 처리기 자체 외부에 정의된 코드(예: 모듈에 정의된 코드) 또는 리소스 파일에 의존하는 경우 이러한 종속성을 스테이지에 업로드하여 코드에서 사용 가능하게 만들 수 있습니다. 코드에 종속성을 사용할 수 있도록 만들기 섹션을 참조하시고, Python 워크시트에 대해서는 스테이지의 Python 파일을 워크시트에 추가하기 섹션을 참조하십시오.

SQL을 사용하여 저장 프로시저를 만드는 경우 CREATE PROCEDURE 문 을 작성할 때 IMPORTS 절을 사용하여 종속성 파일을 가리킵니다.

저장 프로시저에서 Snowflake의 데이터에 액세스하기

Snowflake의 데이터에 액세스하려면 Snowpark 라이브러리 API를 사용합니다.

Python 저장 프로시저에 대한 호출을 처리할 때 Snowflake는 Snowpark Session 오브젝트를 생성하고 저장 프로시저에 대한 메서드나 함수에 오브젝트를 전달합니다.

다른 언어의 저장 프로시저의 경우와 마찬가지로 세션의 컨텍스트(예: 권한, 현재 데이터베이스, 스키마 등)는 저장 프로시저가 호출자의 권한으로 실행되는지 소유자의 권한으로 실행되는지에 따라 결정됩니다. 자세한 내용은 Accessing and Setting the Session State 섹션을 참조하십시오.

Session 오브젝트를 사용하여 Snowpark 라이브러리 에서 API를 호출할 수 있습니다. 예를 들어 테이블에 대한 DataFrame을 만들거나 SQL 문을 실행할 수 있습니다.

자세한 내용은 Snowpark 개발자 가이드 를 참조하십시오.

데이터 액세스 예

다음은 지정된 수의 행을 한 테이블에서 다른 테이블로 복사하는 Python 메서드의 예입니다. 이 메서드는 다음 인자를 사용합니다.

  • Snowpark Session 오브젝트

  • 행을 복사할 출처가 되는 테이블의 이름

  • 행이 저장될 테이블의 이름

  • 복사할 행의 수

이 예의 메서드는 문자열을 반환합니다. Python 워크시트 에서 이 예제 코드를 실행하는 경우 워크시트의 반환 유형String 으로 변경합니다.

def run(session, from_table, to_table, count):

  session.table(from_table).limit(count).write.save_as_table(to_table)

  return "SUCCESS"
Copy

파일 읽기

Snowpark snowflake.snowpark.files 모듈의 SnowflakeFile 클래스를 사용하여 Python 처리기의 스테이지에서 파일을 동적으로 읽을 수 있습니다.

Snowflake는 저장 프로시저와 사용자 정의 함수 모두에 대해 SnowflakeFile 을 사용하여 파일 읽기를 지원합니다. 처리기 코드에서 파일 읽기에 대한 자세한 내용과 더 많은 예제는 Python UDF 처리기로 파일 읽기 를 참조하십시오.

이 예는 SnowflakeFile 클래스를 사용하여 파일을 읽는 소유자의 권한 저장 프로시저를 만들고 호출하는 방법을 보여줍니다.

mode 인자에 대해 rb 를 전달하여 입력 모드를 바이너리로 지정하여 인라인 처리기로 저장 프로시저를 만듭니다.

CREATE OR REPLACE PROCEDURE calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(ignored_session, file_path):
    with SnowflakeFile.open(file_path, 'rb') as f:
        return imagehash.average_hash(Image.open(f))
$$;
Copy

저장 프로시저를 호출합니다.

CALL calc_phash(build_scoped_file_url(@my_files, 'my_image.jpg'));
Copy

Anaconda의 서드 파티 패키지 사용하기

Python 저장 프로시저를 만들 때 설치할 Anaconda 패키지를 지정할 수 있습니다. Anaconda의 서드 파티 패키지 목록을 보려면 Anaconda Snowflake 채널 을 참조하십시오. 이러한 서드 파티 패키지는 Anaconda에서 만들어 제공합니다. Anaconda의 서비스 약관에 대한 추가 포함 소프트웨어 약관에 따라 현지 테스트 및 개발을 위한 Snowflake conda 채널을 무료로 사용할 수 있습니다.

제한 사항은 제한 사항 섹션을 참조하십시오.

시작하기

Snowflake 내부에서 Anaconda가 제공하는 패키지를 처음 사용하기 전에 Snowflake 서드 파티 약관 에 동의해야 합니다.

참고

조직 관리자(ORGADMIN 역할 사용)가 약관을 수락할 수 있습니다. Snowflake 계정에 대해 약관을 한 번만 수락하면 됩니다. 계정에서 ORGADMIN 역할 활성화하기 섹션을 참조하십시오.

  1. Snowsight 에 로그인합니다.

  2. Admin » Billing & Terms 를 선택합니다.

  3. Anaconda 섹션에서 Enable 을 선택합니다.

  4. Anaconda Packages 대화 상자에서 링크를 클릭하여 Snowflake 서드 파티 약관 페이지 를 검토하십시오.

  5. 약관에 동의하면 Acknowledge & Continue 를 선택합니다.

서비스 약관을 수락하려고 할 때 오류가 나타날 경우 사용자 프로필에 이름, 성 또는 이메일 주소가 누락되었기 때문일 수 있습니다. 관리자 역할이 있는 경우 사용자 프로필에 사용자 세부 정보 추가하기 섹션을 참조하여 Snowsight 를 사용해 프로필을 업데이트하십시오. 그렇지 않으면 관리자에게 문의하여 계정을 업데이트하십시오.

참고

위에 설명된 대로 Snowflake 서드 파티 약관에 동의하지 않더라도 저장 프로시저를 계속 사용할 수 있지만, 다음과 같은 제한 사항이 따릅니다.

  • Anaconda의 서드 파티 패키지는 사용할 수 없습니다.

  • Snowpark Python을 저장 프로시저의 패키지로 계속 지정할 수 있지만, 특정 버전을 지정할 수는 없습니다.

  • DataFrame 오브젝트와 상호 작용할 때는 to_pandas 메서드를 사용할 수 없습니다.

패키지 표시 및 사용하기

Information Schema에서 PACKAGES 뷰를 쿼리하여 사용 가능한 모든 패키지와 해당 버전 정보를 표시할 수 있습니다.

select * from information_schema.packages where language = 'python';
Copy

자세한 내용은 Snowflake Python UDF 설명서에서 서드 파티 패키지 사용 을 참조하십시오.

저장 프로시저 만들기

저장 프로시저는 Python 워크시트에서 만들거나 SQL을 사용하여 만들 수 있습니다.

Python 워크시트 코드를 자동화하는 Python 저장 프로시저 만들기

Python 워크시트에서 코드를 자동화하는 Python 저장 프로시저를 만듭니다. Python 워크시트 작성에 대한 자세한 내용은 Python 워크시트에 Snowpark 코드 작성하기 섹션을 참조하십시오.

전제 조건

자신의 역할에 저장 프로시저로 배포하기 위해 Python 워크시트를 실행하는 데이터베이스 스키마에 대한 OWNERSHIP 또는 CREATE PROCEDURE 권한이 있어야 합니다.

Python 워크시트를 저장 프로시저로 배포하기

Python 워크시트의 코드를 자동화하는 Python 저장 프로시저를 만들려면 다음을 수행하십시오.

  1. Snowsight 에 로그인합니다.

  2. Worksheets 를 엽니다.

  3. 저장 프로시저로 배포하려는 Python 워크시트를 엽니다.

  4. Deploy 를 선택합니다.

  5. 저장 프로시저의 이름을 입력합니다.

  6. (선택 사항) 저장 프로시저에 대한 세부 정보가 포함된 설명을 입력합니다.

  7. (선택 사항) Replace if exists 를 선택하여 같은 이름의 기존 저장 프로시저를 바꿉니다.

  8. Handler 의 경우 저장 프로시저의 처리기 함수를 선택합니다. 예: main.

  9. 처리기 함수에서 사용하는 인자를 검토하고 필요한 경우 형식화된 인자에 대한 SQL 데이터 타입 매핑을 재정의합니다. Python 유형이 SQL 유형에 매핑되는 방법에 대한 자세한 내용은 SQL-Python 데이터 타입 매핑 섹션을 참조하십시오.

  10. (선택 사항) Open in Worksheets 를 선택하여 SQL 워크시트에서 저장 프로시저 정의를 엽니다.

  11. Deploy 를 선택하여 저장 프로시저를 만듭니다.

  12. 저장 프로시저가 생성된 후 프로시저 세부 정보로 이동하거나 Done 을 선택할 수 있습니다.

하나의 Python 워크시트에서 여러 저장 프로시저를 만들 수 있습니다.

저장 프로시저를 만든 후 작업의 일부로 자동화할 수 있습니다. 작업을 사용하여 일정에 따라 SQL 문 실행 섹션을 참조하십시오.

테이블 형식 데이터 반환하기

테이블 형식으로 데이터를 반환하는 프로시저를 작성할 수 있습니다. 테이블 형식 데이터를 반환하는 프로시저를 작성하려면 다음을 수행하십시오.

  • CREATE PROCEDURE 문에서 프로시저의 반환 유형으로 TABLE(...) 을 지정합니다.

    반환된 데이터의 열 이름과 유형 을 알고 있는 경우 이들을 TABLE 매개 변수로 지정할 수 있습니다. 프로시저를 정의할 때 반환된 열을 모르는 경우(예: 런타임에 지정된 경우) TABLE 매개 변수를 생략할 수 있습니다. 그렇게 하면 프로시저의 반환 값 열이 해당 처리기에서 반환된 DataFrame의 열에서 변환됩니다. 열 데이터 타입은 SQL-Python 데이터 타입 매핑 에 지정된 매핑에 따라 SQL로 변환됩니다.

  • Snowpark DataFrame에서 테이블 형식 결과를 반환하도록 처리기를 작성합니다.

    데이터 프레임에 대한 자세한 내용은 Snowpark Python에서 DataFrame으로 작업하기 섹션을 참조하십시오.

이 섹션의 예에서는 열이 문자열과 일치하는 행을 필터링하는 프로시저에서 테이블 형식 값을 반환하는 방법을 보여줍니다.

데이터 정의하기

다음은 직원 테이블을 생성하는 코드 예제입니다.

CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Copy

반환 열 이름 및 유형 지정하기

이 예에서는 RETURNS TABLE() 문에 열 이름과 유형을 지정합니다.

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
   df = session.table(table_name)
   return df.filter(col("role") == role)
$$;
Copy

반환 열 이름 및 유형 생략하기

다음 예제의 코드에서는 처리기의 반환 값에 있는 열에서 반환 값 열 이름 및 유형을 추정할 수 있도록 하는 프로시저를 선언합니다. 이 코드는 RETURNS TABLE() 문에서 열 이름과 유형을 생략합니다.

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
  df = session.table(table_name)
  return df.filter(col("role") == role)
$$;
Copy

프로시저 호출하기

다음 예에서는 저장 프로시저를 호출합니다.

CALL filterByRole('employees', 'dev');
Copy

프로시저를 호출하면 다음 출력이 생성됩니다.

+----+-------+------+
| ID | NAME  | ROLE |
+----+-------+------+
| 2  | Bob   | dev  |
| 3  | Cindy | dev  |
+----+-------+------+

저장 프로시저 호출하기

저장 프로시저를 만든 후 SQL에서 호출하거나 예약된 작업의 일부로 호출할 수 있습니다.

작업자 프로세스로 동시 작업 실행하기

Python 작업자 프로세스를 사용하여 동시 작업을 실행할 수 있습니다. 웨어하우스 노드에서 여러 CPU 코어를 활용하는 병렬 작업을 실행해야 할 때 이 기능이 유용할 수 있습니다.

참고

기본 제공된 Python 다중 처리 모듈을 사용하지 않는 것이 좋습니다.

Python Global Interpreter Lock 으로 인해 멀티태스킹 접근 방식이 모든 CPU 코어에서 확장되지 못하는 문제를 해결하려면 스레드가 아닌 별도의 작업자 프로세스를 사용하여 동시 작업을 실행할 수 있습니다.

다음 예에서처럼 joblib 라이브러리의 Parallel 클래스를 사용하여 Snowflake 웨어하우스에서 이 작업을 수행할 수 있습니다.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

참고

joblib.Parallel 에 사용되는 기본 백엔드는 Snowflake 표준과 Snowpark 최적화 웨어하우스 간에 다릅니다.

  • 표준 웨어하우스 기본값: threading

  • Snowpark 최적화 웨어하우스 기본값: loky (다중 처리)

다음 예에서처럼 joblib.parallel_backend 함수를 호출하여 기본 백엔드 설정을 재정의할 수 있습니다.

import joblib
joblib.parallel_backend('loky')
Copy