Python UDF 만들기

이 항목에서는 Python UDF(사용자 정의 함수)를 만들고 설치하는 방법을 보여줍니다.

이 항목의 내용:

Python 코드 쓰기

Python 모듈 및 함수 쓰기

아래 사양을 따르는 모듈을 쓰십시오.

  • 모듈을 정의합니다. 모듈은 Python 정의 및 문을 포함하는 파일입니다.

  • 모듈 내부에 함수를 정의합니다.

  • 함수가 인자를 허용하는 경우, 각 인자는 SQL-Python 형식 매핑 테이블Python Data Type 열에 지정된 데이터 타입 중 하나여야 합니다.

    함수 인자는 이름이 아닌 위치로 바인딩됩니다. UDF에 전달된 첫 번째 인자는 Python 함수에서 수신한 첫 번째 인자입니다.

  • 적절한 반환 값을 지정하십시오. Python UDF는 스칼라 함수여야 하므로 호출될 때마다 하나의 값을 반환해야 합니다. 반환 값의 형식은 SQL-Python 형식 매핑 테이블Python Data Type 열에 지정된 데이터 타입 중 하나여야 합니다. 반환 값의 형식은 CREATE FUNCTION 문의 RETURNS 절에 지정된 SQL 데이터 타입과 호환되어야 합니다.

  • 모듈에는 둘 이상의 함수가 포함될 수 있습니다. Snowflake에서 호출하는 함수는 동일한 모듈의 다른 함수 또는 다른 모듈의 다른 함수를 호출할 수 있습니다.

  • 함수(및 해당 함수에서 호출하는 모든 함수)는 Python UDF에 대해 Snowflake가 부과한 제약 조건 을 준수해야 합니다.

참고

Python UDF 배치 API가 있으며, 이를 사용하면 입력 행 배치를 Pandas DataFrames 로 수신하고 결과 배치를 Pandas 배열 또는 Series 로 반환하는 Python 함수를 정의할 수 있습니다. 자세한 내용은 Python UDF Batch API 섹션을 참조하십시오.

UDF 핸들러로 파일 읽기 및 쓰기

UDF 핸들러 코드로 파일을 읽고 쓸 수 있습니다. Snowflake가 UDF를 실행하는 제한된 엔진 내에서 이 작업을 안전하게 수행하려면 여기에 설명된 지침을 따르십시오.

UDF 핸들러로 파일 읽기

UDF 핸들러는 Snowflake 스테이지에 업로드된 파일을 읽을 수 있습니다. 파일을 호스팅하는 스테이지는 UDF 소유자가 읽을 수 있어야 합니다.

CREATE FUNCTION의 IMPORTS 절에 파일의 스테이지 위치를 지정하면 Snowflake는 특별히 UDF에서 사용할 수 있는 가져오기 디렉터리에 스테이징된 파일을 복사합니다. 핸들러 코드는 그곳에서 파일을 읽을 수 있습니다.

Snowflake는 아마도 여러 스테이지에서 가져왔을 수 있는 모든 파일을 단일 가져오기 디렉터리로 복사합니다. 이러한 이유로, IMPORTS 절에 지정된 파일의 이름은 서로 상대적으로 구별해야 합니다.

예제 코드는 이 항목의 스테이지에서 Python UDF로 파일 로딩하기 섹션을 참조하십시오.

UDF 핸들러 코드로 파일을 읽는 방법은 다음과 같습니다.

  1. 파일을 Snowflake 스테이지에 복사합니다.

    PUT 명령을 사용하여 클라이언트 컴퓨터의 로컬 디렉터리에서 파일을 업로드할 수 있습니다. 자세한 내용은 PUT 섹션을 참조하십시오. 스테이지에 파일을 로딩하는 방법에 대한 더 포괄적인 내용은 데이터 로딩 개요 섹션을 참조하십시오.

  2. CREATE FUNCTION을 사용하여 UDF를 만들 때 IMPORTS 절에 파일의 위치를 지정합니다.

    다음 예제의 코드는 my_stage 라는 스테이지에서 file.txt 파일을 지정합니다.

    create or replace function my_udf()
       ...
       imports=('@my_stage/file.txt')
       ...
    
    Copy
  3. 핸들러 코드에서 가져오기 디렉터리의 파일을 읽습니다.

    Snowflake는 스테이징된 파일을 UDF의 가져오기 디렉터리에 복사합니다. snowflake_import_directory 시스템 옵션을 사용하여 디렉터리의 위치를 검색할 수 있습니다.

    Python 코드에서는 다음 예제와 같이 Python sys._xoptions 메서드를 사용하여 디렉터리의 위치를 검색할 수 있습니다.

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    def compute():
       with open(import_dir + 'file.txt', 'r') as file:
          return file.read()
    
    Copy

UDF 핸들러로 파일 쓰기

UDF 핸들러는 UDF를 호출하는 쿼리에 대해 생성된 /tmp 디렉터리에 파일을 쓸 수 있습니다.

/tmp 디렉터리는 단일 호출 쿼리용으로 따로 마련된 것이지만, 여러 Python 작업자 프로세스가 동시에 실행 중일 수도 있습니다. 충돌을 방지하려면 /tmp 디렉터리에 대한 액세스가 다른 Python 작업자 프로세스와 동기화되거나 /tmp에 기록된 파일의 이름이 고유하도록 해야 합니다.

예제 코드는 이 항목의 스테이징된 파일의 압축 풀기 섹션을 참조하십시오.

다음 예제의 코드를 실행하면 입력 text 가 /tmp 디렉터리에 작성되어 파일 위치의 고유성을 보장하도록 함수의 프로세스 ID를 추가합니다.

def func(text):
   # Ensure the file name's uniqueness by appending the function's process ID.
   file_path = '/tmp/content' + str(os.getpid())
   with open(file_path, "w") as file:
      file.write(text)
Copy

Snowflake에서 함수 만들기

다음을 지정하려면 CREATE FUNCTION 문을 실행해야 합니다.

  • 사용할 SQL 함수 이름입니다.

  • Python UDF가 호출될 때 호출할 Python 함수의 이름입니다.

UDF의 이름은 Python으로 작성된 핸들러 함수의 이름과 일치할 필요가 없습니다. CREATE FUNCTION 문은 UDF 이름을 Python 함수와 연관시킵니다.

UDF의 이름을 선택할 때:

  • 오브젝트 식별자 에 대한 규칙을 따르십시오.

  • 고유한 이름을 선택하거나 오버로딩 에 대한 규칙을 따르십시오.

    중요

    인자의 수 및 데이터 타입 둘 다를 기반으로 함수를 구별하는 SQL UDF의 오버로드와 달리 Python UDF는 오직 인자 수만을 기반으로 함수를 구별합니다.

함수 인자는 이름이 아닌 위치로 바인딩됩니다. UDF에 전달된 첫 번째 인자는 Python 함수에서 수신한 첫 번째 인자입니다.

인자의 데이터 타입에 대한 자세한 내용은 SQL-Python 데이터 타입 매핑 을 참조하십시오.

인라인 코드가 있는 UDF와 스테이지에서 업로드된 코드가 있는 UDF

Python UDF에 대한 코드는 다음 방법 중 하나로 지정할 수 있습니다.

  • 스테이지에서 업로드됨: CREATE FUNCTION 문은 스테이지 에서 기존 Python 소스 코드의 위치를 지정합니다.

  • 인라인: CREATE FUNCTION 문은 Python 소스 코드를 지정합니다.

인라인 Python UDF 만들기

인라인 UDF의 경우, CREATE FUNCTION 문의 일부로서 Python 소스 코드를 제공합니다.

예를 들어, 다음 문은 주어진 정수에 1을 더하는 인라인 Python UDF를 생성합니다.

create or replace function addone(i int)
returns int
language python
runtime_version = '3.8'
handler = 'addone_py'
as
$$
def addone_py(i):
  return i+1
$$;
Copy

Python 소스 코드는 AS 절에 지정됩니다. 소스 코드는 작은따옴표나 한 쌍의 달러 기호($$)로 묶을 수 있습니다. 소스 코드에 작은따옴표가 포함되어 있는 경우, 일반적으로 이중 달러 기호를 사용하는 것이 더 쉽습니다.

UDF를 호출합니다.

select addone(10);
Copy

출력은 다음과 같습니다.

+------------+
| ADDONE(10) |
|------------|
|         11 |
+------------+
Copy

Python 소스 코드는 둘 이상의 모듈, 그리고 한 모듈에 둘 이상의 함수를 포함할 수 있으므로 HANDLER 절은 호출할 모듈과 함수를 지정합니다.

인라인 Python UDF는 IMPORTS 절에 포함된 모듈의 코드를 호출할 수 있습니다.

CREATE FUNCTION 문의 구문에 대한 자세한 내용은 CREATE FUNCTION 을 참조하십시오.

더 많은 예제는 인라인 Python UDF 예제 를 참조하십시오.

스테이지에서 업로드된 코드로 Python UDF 만들기

다음 문은 스테이지 에서 업로드된 코드를 사용하여 간단한 Python UDF를 만듭니다. 파일을 호스팅하는 스테이지는 UDF 소유자 가 읽을 수 있어야 합니다. 또한 ZIP 파일이 자체 포함되어야 하며 실행할 추가 설정 스크립트에 의존하면 안 됩니다.

소스 코드가 포함된 sleepy.py 라는 Python 파일을 다음과 같이 만듭니다.

def snore(n):   # return a series of n snores
    result = []
    for a in range(n):
        result.append("Zzz")
    return result
Copy

SnowSQL(CLI 클라이언트) 를 시작하고 PUT 명령을 사용하여 파일을 로컬 파일 시스템에서 @~ 라는 기본 사용자 스테이지로 복사합니다. (PUT 명령은 Snowflake GUI를 통해 실행할 수 없습니다.)

put
file:///Users/Me/sleepy.py
@~/
auto_compress = false
overwrite = true
;
Copy

파일을 삭제하거나 이름을 바꾸면 더 이상 UDF를 호출할 수 없습니다. 파일을 업데이트해야 하는 경우에는 UDF를 호출할 수 없는 동안 업데이트하십시오. 이전 파일이 아직 스테이지에 있는 경우, PUT 명령에 OVERWRITE=TRUE 절이 포함되어야 합니다.

UDF를 만듭니다. 이 핸들러는 모듈과 함수를 지정합니다.

create or replace function dream(i int)
returns variant
language python
runtime_version = '3.8'
handler = 'sleepy.snore'
imports = ('@~/sleepy.py')
Copy

UDF를 호출합니다.

select dream(3);

+----------+
| DREAM(3) |
|----------|
| [        |
|   "Zzz", |
|   "Zzz", |
|   "Zzz"  |
| ]        |
+----------+
Copy

여러 가져오기 파일 지정하기

다음은 여러 가져오기 파일을 지정하는 방법을 보여주는 예제입니다.

create or replace function multiple_import_files(s string)
returns string
language python
runtime_version=3.8
imports=('@python_udf_dep/bar/python_imports_a.zip', '@python_udf_dep/foo/python_imports_b.zip')
handler='compute'
as
$$
def compute(s):
  return s
$$;
Copy

참고

지정된 가져오기 파일 이름은 서로 달라야 합니다. 예를 들어 imports=('@python_udf_dep/bar/python_imports.zip', '@python_udf_dep/foo/python_imports.zip') 은 작동하지 않습니다.

함수에 대한 권한 부여

함수 소유자 이외의 역할이 함수를 호출하려면 소유자가 역할에 적절한 권한을 부여해야 합니다.

Python UDF에 대한 GRANT 문은 JavaScript UDF와 같은 다른 UDF에 대한 GRANT 문과 본질적으로 동일합니다.

예:

GRANT USAGE ON FUNCTION my_python_udf(number, number) TO my_role;
Copy

인라인 Python UDF에서 가져온 패키지 사용하기

Anaconda에서 선별된 서드 파티 패키지 목록을 제공합니다. 자세한 내용은 서드 파티 패키지 사용하기 섹션을 참조하십시오.

참고

Snowflake 조직 관리자가 Snowflake 서드 파티 약관을 승인해야 Anaconda에서 제공하는 패키지를 사용할 수 있습니다. 자세한 내용은 시작하기 섹션을 참조하십시오.

다음 코드에서는 패키지를 가져오고 패키지의 버전을 반환하는 방법을 보여줍니다.

UDF를 만듭니다.

create or replace function py_udf()
returns variant
language python
runtime_version = 3.8
packages = ('numpy','pandas','xgboost==1.5.0')
handler = 'udf'
as $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
    return [np.__version__, pd.__version__, xgb.__version__]
$$;
Copy

UDF를 호출합니다.

select py_udf();
Copy

출력은 다음과 같습니다.

+-------------+
| PY_UDF()    |
|-------------|
| [           |
|   "1.19.2", |
|   "1.4.0",  |
|   "1.5.0"   |
| ]           |
+-------------+
Copy

스테이지에서 Python UDF로 파일 로딩하기

이 예제에서는 비 코드 파일을 스테이지에서 Python UDF로 가져오는 방법을 보여줍니다. 이 파일은 UDF 생성 중에 한 번만 읽히며 파일 읽기가 대상 처리기 외부에서 발생하는 경우 UDF 실행 중에 다시 읽히지 않습니다. 파일 읽기에 대한 자세한 내용은 UDF 핸들러로 파일 읽기 섹션을 참조하십시오.

참고

하위 폴더가 아닌 스테이지의 최상위 디렉터리에서만 파일을 가져올 수 있습니다.

UDF를 만듭니다.

create or replace function my_udf()
  returns string
  language python
  runtime_version=3.8
  imports=('@my_stage/file.txt')
  handler='compute'
as
$$
import sys
import os

with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
    s = f.read()

def compute():
    return s
$$;
Copy

스테이징된 파일의 압축 풀기

.zip 파일을 스테이지에 저장한 다음, Python zipfile 모듈을 사용하여 UDF 형식으로 압축을 풀 수 있습니다.

예를 들어 .zip 파일을 스테이지에 업로드한 다음, UDF를 만들 때 IMPORTS 절의 스테이징된 위치에서 .zip 파일을 참조할 수 있습니다. 런타임에 Snowflake는 스테이징된 파일을 코드에서 액세스할 수 있는 가져오기 디렉터리로 복사합니다.

파일 읽기 및 쓰기에 대한 자세한 내용은 UDF 핸들러로 파일 읽기 및 쓰기 섹션을 참조하십시오.

다음 예제에서 UDF 코드는 NLP 모델을 사용하여 텍스트에서 엔터티를 검색합니다. 코드는 이러한 엔터티로 구성된 배열을 반환합니다. 텍스트 처리를 위해 NLP 모델을 설정하기 위해, 코드는 먼저 zipfile 모듈을 사용하여 .zip 파일에서 모델(en_core_web_sm-2.3.1)을 위한 파일을 추출합니다. 그런 다음 이 코드는 spaCy 모듈을 사용하여 파일에서 모델을 로딩합니다.

이 코드는 이 함수를 호출하는 쿼리를 위해 생성된 /tmp 디렉터리에 추출된 파일 내용을 씁니다. 이 코드는 파일 잠금을 사용하여 추출이 Python 작업자 프로세스 간에 동기화되도록 합니다. 이와 같이, 내용은 한 번만 압축이 풀립니다. 파일 쓰기에 대한 자세한 내용은 UDF 핸들러로 파일 쓰기 섹션을 참조하십시오.

zipfile 모듈에 대한 자세한 내용은 zipfile 참조 를 확인해 보십시오. spaCy 모듈에 대한 자세한 내용은 spaCy API 설명서 를 참조하십시오.

UDF를 만듭니다.

create or replace function py_spacy(str string)
   returns array
   language python
   runtime_version = 3.8
   handler = 'func'
   packages = ('spacy')
   imports = ('@spacy_stage/spacy_en_core_web_sm.zip')
   as
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile

# File lock class for synchronizing write access to /tmp
class FileLock:
   def __enter__(self):
      self._lock = threading.Lock()
      self._lock.acquire()
      self._fd = open('/tmp/lockfile.LOCK', 'w+')
      fcntl.lockf(self._fd, fcntl.LOCK_EX)

   def __exit__(self, type, value, traceback):
      self._fd.close()
      self._lock.release()

# Get the location of the import directory. Snowflake sets the import
# directory location so code can retrieve the location via sys._xoptions.
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

# Get the path to the ZIP file and set the location to extract to.
zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
extracted = '/tmp/en_core_web_sm'

# Extract the contents of the ZIP. This is done under the file lock
# to ensure that only one worker process unzips the contents.
with FileLock():
   if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
      with zipfile.ZipFile(zip_file_path, 'r') as myzip:
         myzip.extractall(extracted)

# Load the model from the extracted file.
nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

def func(text):
   doc = nlp(text)
   result = []

   for ent in doc.ents:
      result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
   return result
$$;
Copy

Python UDF에서의 NULL 처리

다음 코드에서는 NULL 값이 처리되는 방식을 보여줍니다. 자세한 내용은 NULL 값 섹션을 참조하십시오.

UDF를 만듭니다.

create or replace function py_udf_null(a variant)
returns string
language python
runtime_version = 3.8
handler = 'udf'
as $$

def udf(a):
    if not a:
        return 'JSON null'
    elif getattr(a, "is_sql_null", False):
        return 'SQL null'
    else:
        return 'not null'
$$;
Copy

UDF를 호출합니다.

select py_udf_null(null);
select py_udf_null(parse_json('null'));
select py_udf_null(10);
Copy

출력은 다음과 같습니다.

+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null          |
+-------------------+

+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null                       |
+---------------------------------+

+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null        |
+-----------------+
Copy