Python UDF 처리기 예

이 항목에는 Python으로 작성된 UDF 처리기 코드의 간단한 예가 나와 있습니다.

Python을 사용하여 UDF 처리기를 만드는 방법에 대한 자세한 내용은 Python UDF 만들기 섹션을 참조하십시오.

runtime_version 을 코드에서 필요한 Python 런타임 버전으로 설정합니다. 지원되는 Python 버전은 다음과 같습니다.

  • 3.8

  • 3.9

  • 3.10

  • 3.11

인라인 처리기에서 패키지 가져오기

Anaconda에서 선별된 서드 파티 패키지 목록을 제공합니다. 자세한 내용은 서드 파티 패키지 사용하기 섹션을 참조하십시오.

참고

Snowflake 조직 관리자가 Snowflake 서드 파티 약관을 승인해야 Anaconda에서 제공하는 패키지를 사용할 수 있습니다. 자세한 내용은 Anaconda의 서드 파티 패키지 사용하기 섹션을 참조하십시오.

다음 코드에서는 패키지를 가져오고 패키지의 버전을 반환하는 방법을 보여줍니다.

UDF를 만듭니다.

CREATE OR REPLACE FUNCTION py_udf()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('numpy','pandas','xgboost==1.5.0')
HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
  return [np.__version__, pd.__version__, xgb.__version__]
$$;
Copy

UDF를 호출합니다.

SELECT py_udf();
+-------------+
| PY_UDF()    |
|-------------|
| [           |
|   "1.19.2", |
|   "1.4.0",  |
|   "1.5.0"   |
| ]           |
+-------------+
Copy

파일 읽기

Python UDF 처리기 코드로 파일의 내용을 읽을 수 있습니다. 예를 들어 비정형 데이터를 처리하기 위해 파일을 읽을 수 있습니다.

파일의 내용은 다음과 같은 방법으로 읽을 수 있습니다.

IMPORTS를 사용하여 정적으로 지정된 파일 읽기

CREATE FUNCTION 명령의 IMPORTS 절에 파일 이름과 스테이지 이름을 지정하여 파일을 읽을 수 있습니다.

IMPORTS 절에 파일을 지정하면 Snowflake는 해당 파일을 스테이지에서 UDF의 홈 디렉터리 (가져오기 디렉터리 라고도 함)로 복사합니다. 이 디렉터리는 UDF가 파일을 읽는 디렉터리입니다.

Snowflake는 가져온 파일을 단일 디렉터리에 복사합니다. 해당 디렉터리의 모든 파일은 고유한 이름을 가져야 하므로 IMPORTS 절의 각 파일은 고유한 이름을 가져야 합니다. 이는 파일이 다른 스테이지 또는 스테이지 내의 다른 하위 디렉터리에서 시작하는 경우에도 적용됩니다.

참고

하위 폴더가 아닌 스테이지의 최상위 디렉터리에서만 파일을 가져올 수 있습니다.

다음 예에서는 my_stage 라는 스테이지에서 file.txt 라는 파일을 읽는 인라인 Python 처리기를 사용합니다. 처리기는 snowflake_import_directory 시스템 옵션과 함께 Python sys._xoptions 메서드를 사용하여 UDF의 홈 디렉터리 위치를 검색합니다.

Snowflake는 UDF 생성 중에 한 번만 파일을 읽으며 파일 읽기가 대상 처리기 외부에서 발생하는 경우 UDF 실행 중에 파일을 다시 읽지 않습니다.

인라인 처리기로 UDF를 만듭니다.

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@my_stage/file.txt')
HANDLER='compute'
AS
$$
import sys
import os

with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
  s = f.read()

def compute():
  return s
$$;
Copy

SnowflakeFile 을 사용하여 동적으로 지정된 파일 읽기

Snowpark snowflake.snowpark.files 모듈에서 SnowflakeFile 클래스를 사용하여 스테이지에서 파일을 읽을 수 있습니다. SnowflakeFile 클래스는 모든 크기의 파일을 스트리밍할 수 있는 동적 파일 액세스를 제공합니다. 동적 파일 액세스는 여러 파일에서 반복하려는 경우에도 유용합니다. 예를 들어 여러 파일 처리하기 섹션을 참조하십시오.

SnowflakeFile 클래스에는 파일을 열기 위한 메서드가 한 개 있는데, 바로 open 입니다. open 메서드는 Python의 IOBase 파일 오브젝트를 확장하는 SnowflakeFile 오브젝트를 반환합니다.

SnowflakeFile 오브젝트는 다음 IOBase, BufferedIOBaseRawIOBase 메서드를 지원합니다.

  • IOBase.fileno

  • IOBase.isatty

  • IOBase.readable

  • IOBase.readinto

  • IOBase.readline

  • IOBase.readlines

  • IOBase.seek

  • IOBase.seekable

  • IOBase.tell

  • BufferedIOBase.readinto1

  • RawIOBase.read

  • RawIOBase.readall

자세한 내용은 IOBase의 Python 3.8 설명서 를 참조하십시오. fileno 메서드와 같이 Snowflake 서버에서 지원되지 않는 메서드를 호출하면 오류가 반환됩니다.

참고

기본적으로 SnowflakeFile 을 사용한 파일 액세스에는 코드가 파일 주입 공격에 대한 복원력을 갖도록 하기 위해 범위가 지정된 URL이 필요합니다. 기본 제공 함수 BUILD_SCOPED_FILE_URL 을 사용하여 SQL에서 범위가 지정된 URL을 생성할 수 있습니다. 범위가 지정된 URL에 대한 자세한 내용은 파일 액세스에 사용할 수 있는 URL의 유형 섹션을 참조하십시오. 파일에 대한 액세스 권한이 있는 사용자만 범위가 지정된 URL을 만들 수 있습니다.

이 섹션의 예에서는 SnowflakeFile 을 사용하여 지정된 스테이지 위치에서 하나 이상의 파일을 읽습니다.

전제 조건

다음을 수행하여 코드에 스테이지의 파일을 사용할 수 있도록 해야 Python 처리기 코드가 해당 파일을 읽을 수 있습니다.

  1. 처리기가 사용할 수 있는 스테이지를 만듭니다.

    외부 스테이지 또는 내부 스테이지를 사용할 수 있습니다. 내부 스테이지를 사용하는 경우 호출자의 권한 저장 프로시저를 만들 계획일 때는 사용자 스테이지가 될 수 있습니다. 그렇지 않으면 명명된 스테이지를 사용해야 합니다. Snowflake는 현재 UDF 종속성에 대한 테이블 스테이지 사용을 지원하지 않습니다.

    스테이지 생성에 대한 자세한 내용은 CREATE STAGE 섹션을 참조하십시오. 내부 스테이지 유형 선택에 대한 자세한 내용은 로컬 파일을 위한 내부 스테이지 선택하기 섹션을 참조하십시오.

    사용 사례에 따라 스테이지에 대한 적절한 권한을 다음 역할에 할당해야 합니다.

    사용 사례

    역할

    UDF 또는 소유자의 권한 저장 프로시저

    실행 중인 UDF 또는 저장 프로시저를 소유하는 역할.

    호출자 권한 저장 프로시저

    사용자 역할.

    자세한 내용은 Granting Privileges for User-Defined Functions 섹션을 참조하십시오.

  2. 코드에서 읽을 파일을 스테이지로 복사합니다.

    PUT 명령을 사용하여 로컬 드라이브에서 내부 스테이지로 파일을 복사할 수 있습니다. PUT으로 파일을 스테이징하는 자세한 방법은 로컬 파일 시스템에서 데이터 파일 스테이징하기 섹션을 참조하십시오.

    클라우드 저장소 서비스에서 제공하는 도구를 사용하여 로컬 드라이브에서 외부 스테이지 위치로 파일을 복사할 수 있습니다. 도움말은 클라우드 저장소 서비스 설명서를 참조하십시오.

인라인 Python 처리기로 이미지의 지각 해시 계산하기

이 예에서는 SnowflakeFile 을 사용하여 스테이징된 이미지 파일 쌍을 읽고 각 파일의 지각 해시 (pHash)를 사용하여 이미지가 서로 얼마나 유사한지 확인합니다.

mode 인자에 rb 를 전달하여 입력 모드를 바이너리로 지정하여 이미지의 phash 값을 반환하는 UDF를 만듭니다.

CREATE OR REPLACE FUNCTION calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(file_path):
  with SnowflakeFile.open(file_path, 'rb') as f:
  return imagehash.average_hash(Image.open(f))
$$;
Copy

두 이미지의 phash 값 사이의 거리를 계산하는 두 번째 UDF를 만듭니다.

CREATE OR REPLACE FUNCTION calc_phash_distance(h1 string, h2 string)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('imagehash')
HANDLER = 'run'
as
$$
import imagehash

def run(h1, h2):
  return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
Copy

이미지 파일을 스테이징하고 디렉터리 테이블을 새로 고칩니다.

PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;

ALTER STAGE images REFRESH;
Copy

UDFs를 호출합니다.

SELECT
  calc_phash_distance(
    calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
    calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
  ) ;
Copy

UDTF로 CSV 파일 처리하기

이 예에서는 SnowflakeFile 을 사용하여 CSV 파일의 내용을 추출하고 테이블의 행을 반환하는 UDTF를 생성합니다.

인라인 처리기로 UDTF를 만듭니다.

CREATE FUNCTION parse_csv(file_path string)
RETURNS TABLE (col1 string, col2 string, col3 string)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'csvparser'
AS
$$
from snowflake.snowpark.files import SnowflakeFile

class csvparser:
  def process(self, stagefile):
    with SnowflakeFile.open(stagefile) as f:
      for line in f.readlines():
        lineStr = line.strip()
        row = lineStr.split(",")
        try:
          # Read the columns from the line.
          yield (row[1], row[0], row[2], )
        except:
          pass
$$;
Copy

CSV 파일을 스테이징하고 디렉터리 테이블을 새로 고칩니다.

PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;

ALTER STAGE data_stage REFRESH;
Copy

UDTF를 호출하여 파일 URL을 전달합니다.

SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
Copy

여러 파일 처리하기

디렉터리 테이블의 RELATIVE_PATH 열을 처리기에 전달하여 여러 파일을 읽고 처리할 수 있습니다. RELATIVE_PATH 열에 대한 자세한 내용은 디렉터리 테이블 쿼리의 출력 을 참조하십시오.

참고

파일 크기 및 컴퓨팅 요구 사항에 따라 여러 파일을 읽고 처리하는 문을 실행하기 전에 ALTER WAREHOUSE 를 사용하여 웨어하우스를 확장할 수 있습니다.

여러 파일을 처리하는 UDF를 호출합니다.

다음 예에서는 CREATE TABLE 문 내에서 UDF를 호출하여 스테이지의 각 파일을 처리한 다음 결과를 새 테이블에 저장합니다.

데모 목적으로, 예제에서는 다음을 가정합니다.

  • my_stage 라는 스테이지에 여러 텍스트 파일이 있습니다.

  • 비정형 텍스트에 대한 감정 분석을 수행하는 get_sentiment 라는 기존 UDF가 있습니다. UDF는 텍스트 파일의 경로를 입력값으로 받아 감정을 나타내는 값을 반환합니다.

CREATE OR REPLACE TABLE sentiment_results AS
SELECT
  relative_path
  , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment
FROM directory(@my_stage);
Copy
여러 파일을 처리하는 UDTF를 호출합니다.

다음 예에서는 parse_excel_udtf 라는 UDTF를 호출합니다. 이 예에서는 my_excel_stage 라는 스테이지의 디렉터리 테이블에서 relative_path 를 전달합니다.

SELECT t.*
FROM directory(@my_stage) d,
table(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
Copy

스테이지 URI와 URL이 있는 파일 읽기

SnowflakeFile 을 사용한 파일 액세스에는 기본적으로 범위가 지정된 URL이 필요합니다. 이를 통해 코드를 파일 주입 공격에 대해 복원력 있게 만들 수 있습니다. 그러나 대신 스테이지 URI 또는 스테이지 URL을 사용하여 파일 위치를 참조할 수 있습니다. 그러려면 require_scoped_url = False 키워드 인자로 SnowflakeFile.open 메서드를 호출해야 합니다.

이 옵션은 호출자가 UDF 소유자만 액세스할 수 있는 URI를 제공할 수 있도록 하려는 경우에 유용합니다. 예를 들어 UDF를 소유하고 구성 파일 또는 머신 러닝 모델을 읽으려는 경우 파일 액세스를 위해 스테이지 URI를 사용할 수 있습니다. 사용자 입력을 기반으로 생성된 파일과 같이 이름을 예측할 수 없는 파일로 작업할 때는 이 옵션을 권장하지 않습니다.

이 예제에서는 파일에서 머신 러닝 모델을 읽고 함수에서 이 모델을 사용하여 감정 분석을 위한 자연어 처리를 수행합니다. 이 예에서는 require_scoped_url = Falseopen 을 호출합니다. 두 파일 위치 형식(스테이지 URI 및 스테이지 URL)에서 모두 UDF 소유자는 모델 파일에 액세스할 수 있어야 합니다.

인라인 처리기로 UDF를 만듭니다.

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = '@models/NLP_model.pickle'
  # Specify 'mode = rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

모델 파일을 스테이징하고 디렉터리 테이블을 새로 고칩니다.

PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;

ALTER STAGE models REFRESH;
Copy

또는 모델의 스테이지 URL로 UDF를 지정하여 감정을 추출할 수 있습니다.

예를 들어 스테이지 URL을 사용하여 파일을 지정하는 인라인 처리기가 있는 UDF를 생성합니다.

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
  # Specify 'rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

입력 데이터로 UDF를 호출합니다.

SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
Copy

파일 쓰기

UDF 처리기는 UDF를 호출하는 쿼리에 대해 생성된 /tmp 디렉터리에 파일을 쓸 수 있습니다.

/tmp 디렉터리는 단일 호출 쿼리용으로 따로 마련된 것이지만, 여러 Python 작업자 프로세스가 동시에 실행 중일 수도 있습니다. 충돌을 방지하려면 /tmp 디렉터리에 대한 액세스가 다른 Python 작업자 프로세스와 동기화되거나 /tmp에 기록된 파일의 이름이 고유하도록 해야 합니다.

예제 코드는 이 항목의 스테이징된 파일의 압축 풀기 섹션을 참조하십시오.

다음 예제의 코드는 입력 text/tmp 디렉터리에 씁니다. 또한 파일 위치의 고유성을 보장하도록 함수의 프로세스 ID를 추가합니다.

def func(text):
   # Append the function's process ID to ensure the file name's uniqueness.
   file_path = '/tmp/content' + str(os.getpid())
   with open(file_path, "w") as file:
      file.write(text)
Copy

스테이징된 파일의 압축 풀기

.zip 파일을 스테이지에 저장한 다음, Python zipfile 모듈을 사용하여 UDF 형식으로 압축을 풀 수 있습니다.

예를 들어 .zip 파일을 스테이지에 업로드한 다음, UDF를 만들 때 IMPORTS 절의 스테이징된 위치에서 .zip 파일을 참조할 수 있습니다. 런타임에 Snowflake는 스테이징된 파일을 코드에서 액세스할 수 있는 가져오기 디렉터리로 복사합니다.

파일 읽기 및 쓰기에 대한 자세한 내용은 파일 읽기파일 쓰기 섹션을 참조하십시오.

다음 예제에서 UDF 코드는 NLP 모델을 사용하여 텍스트에서 엔터티를 검색합니다. 코드는 이러한 엔터티로 구성된 배열을 반환합니다. 텍스트 처리를 위해 NLP 모델을 설정하기 위해, 코드는 먼저 zipfile 모듈을 사용하여 .zip 파일에서 모델(en_core_web_sm-2.3.1)을 위한 파일을 추출합니다. 그런 다음 이 코드는 spaCy 모듈을 사용하여 파일에서 모델을 로딩합니다.

이 코드는 이 함수를 호출하는 쿼리를 위해 생성된 /tmp 디렉터리에 추출된 파일 내용을 씁니다. 이 코드는 파일 잠금을 사용하여 추출이 Python 작업자 프로세스 간에 동기화되도록 합니다. 이와 같이, 내용은 한 번만 압축이 풀립니다. 파일 쓰기에 대한 자세한 내용은 파일 쓰기 섹션을 참조하십시오.

zipfile 모듈에 대한 자세한 내용은 zipfile 참조 를 확인해 보십시오. spaCy 모듈에 대한 자세한 내용은 spaCy API 설명서 를 참조하십시오.

인라인 처리기로 UDF를 만듭니다.

CREATE OR REPLACE FUNCTION py_spacy(str string)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'func'
PACKAGES = ('spacy')
IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile

 # File lock class for synchronizing write access to /tmp.
 class FileLock:
   def __enter__(self):
       self._lock = threading.Lock()
       self._lock.acquire()
       self._fd = open('/tmp/lockfile.LOCK', 'w+')
       fcntl.lockf(self._fd, fcntl.LOCK_EX)

    def __exit__(self, type, value, traceback):
       self._fd.close()
       self._lock.release()

 # Get the location of the import directory. Snowflake sets the import
 # directory location so code can retrieve the location via sys._xoptions.
 IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
 import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

 # Get the path to the ZIP file and set the location to extract to.
 zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
 extracted = '/tmp/en_core_web_sm'

 # Extract the contents of the ZIP. This is done under the file lock
 # to ensure that only one worker process unzips the contents.
 with FileLock():
    if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
       with zipfile.ZipFile(zip_file_path, 'r') as myzip:
          myzip.extractall(extracted)

 # Load the model from the extracted file.
 nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

 def func(text):
    doc = nlp(text)
    result = []

    for ent in doc.ents:
       result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
    return result
 $$;
Copy

NULL 값 처리하기

다음 코드에서는 NULL 값이 처리되는 방식을 보여줍니다. 자세한 내용은 NULL 값 섹션을 참조하십시오.

UDF를 만듭니다.

CREATE OR REPLACE FUNCTION py_udf_null(a variant)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'udf'
AS $$

def udf(a):
    if not a:
        return 'JSON null'
    elif getattr(a, "is_sql_null", False):
        return 'SQL null'
    else:
        return 'not null'
$$;
Copy

UDF를 호출합니다.

SELECT py_udf_null(null);
SELECT py_udf_null(parse_json('null'));
SELECT py_udf_null(10);
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null          |
+-------------------+

+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null                       |
+---------------------------------+

+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null        |
+-----------------+
Copy