Python 커넥터와 함께 Snowflake SQLAlchemy 도구 키트 사용하기

Python용 Snowflake 커넥터 뿐만 아니라 Snowflake SQLAlchemy는 Snowflake 데이터베이스와 SQLAlchemy 애플리케이션을 연결하는 언어 로 실행됩니다.

이 항목의 내용:

전제 조건

Python용 Snowflake 커넥터

Snowflake SQLAlchemy의 유일한 요구 사항은 Python용 Snowflake Connector이지만, Snowflake SQLAlchemy를 설치하면 커넥터가 자동으로 설치되므로 커넥터를 설치하지 않아도 됩니다.

데이터 분석 및 웹 애플리케이션 프레임워크(선택 사항)

Snowflake SQLAlchemy는 pandas, JupyterPyramid 와 함께 사용할 수 있으며, 이를 통해 데이터 분석 및 웹 애플리케이션을 위한 더 높은 수준의 애플리케이션 프레임워크를 제공합니다. 그러나 특히 초보 사용자의 경우 처음부터 작업 환경을 구축하는 것이 쉬운 일이 아닙니다. 프레임워크를 설치하려면 C 컴파일러 및 도구가 필요하며, 올바른 도구와 버전을 선택하는 것은 사용자의 Python 애플리케이션 사용에 방해가 되는 요소가 됩니다.

보다 편리한 환경 구축 방법은 Anaconda 를 사용하는 것이며, 이 애플리케이션은 데이터 분석가 및 학생과 같은 비 Python 전문가 등 모든 사용자에게 사전 컴파일된 완전한 기술 스택을 제공합니다. Anaconda 설치 지침은 Anaconda 설치 설명서 를 참조하십시오. Snowflake SQLAlchemy 패키지는 pip 를 사용하여 Anaconda의 상단에 설치할 수 있습니다.

Snowflake SQLAlchemy 설치하기

Snowflake SQLAlchemy 패키지는 pip 를 사용하여 공용 PyPI 리포지토리에서 설치할 수 있습니다.

pip install --upgrade snowflake-sqlalchemy
Copy

pip 는 Python용 Snowflake Connector 등 필요한 모든 모듈을 자동으로 설치합니다.

개발자 노트는 GitHub 에서 소스 코드와 함께 호스팅됩니다.

설치 확인하기

  1. 다음 Python 샘플 코드가 포함된 파일(예: validate.py)을 생성합니다. 이 파일은 Snowflake에 연결하고 Snowflake 버전을 표시합니다.

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/'.format(
            user='<user_login_name>',
            password='<password>',
            account_identifier='<account_identifier>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
    Copy
  2. <사용자_로그인_이름>, <비밀번호><계정_식별자> 를 Snowflake 계정 및 사용자에 대한 적절한 값으로 바꿉니다. 자세한 내용은 이 항목의 연결 매개 변수 섹션을 참조하십시오.

  3. 샘플 코드를 실행합니다. 예를 들어, validate.py 파일을 생성한 경우:

    python validate.py
    
    Copy

Snowflake 버전(예: 1.48.0)이 표시되어야 합니다.

Snowflake 고유 매개 변수 및 동작

Snowflake SQLAlchemy는 SQLAlchemy 애플리케이션을 위한 호환 기능을 최대한 많이 제공합니다. SQLAlchemy 사용에 대한 정보는 SQLAlchemy 설명서 를 참조하십시오.

그러나 Snowflake SQLAlchemy는 다음 섹션에서 설명하는 Snowflake 관련 매개 변수 및 동작도 제공합니다.

연결 매개 변수

필수 매개 변수

Snowflake SQLAlchemy는 다음 연결 문자열 구문을 사용하여 Snowflake에 연결하고 세션을 시작합니다.

'snowflake://<user_login_name>:<password>@<account_identifier>'
Copy

여기서:

  • <사용자_로그인_이름> 은 Snowflake 사용자의 로그인 이름입니다.

  • <비밀번호> 는 Snowflake 사용자의 비밀번호입니다.

  • <계정_식별자> 는 계정 식별자입니다. 계정 식별자 섹션을 참조하십시오.

    참고

    snowflakecomputing.com 도메인 이름을 계정 식별자의 일부로 포함하지 마십시오. Snowflake는 계정 식별자에 도메인 이름을 자동으로 추가하여 필요한 연결을 생성합니다.

추가 연결 매개 변수

연결 문자열의 마지막(<계정_이름> 뒤)에 다음 추가 정보를 선택적으로 포함할 수 있습니다.

'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Copy

여기서:

  • <데이터베이스_이름><스키마_이름> 은 슬래시(/)로 구분된 Snowflake 세션의 초기 데이터베이스 및 스키마입니다.

  • warehouse=<웨어하우스_이름>role=<역할_이름>' 은 세션의 초기 웨어하우스 및 역할이며, 물음표(?)로 구분된 매개 변수 문자열로 지정됩니다.

참고

로그인한 후에는, 연결 문자열에 지정된 세션에 대한 초기 데이터베이스, 스키마, 웨어하우스 및 역할을 언제라도 변경할 수 있습니다.

프록시 서버 구성

프록시 서버 매개 변수는 지원되지 않습니다. 대신, 지원되는 환경 변수를 사용하여 프록시 서버를 구성할 수 있습니다. 자세한 내용은 프록시 서버 사용하기 섹션을 참조하십시오.

연결 문자열의 예

다음 예에서는 사용자 이름 testuser1, 비밀번호 0123456, 계정 식별자 myorganization-myaccount, 데이터베이스 testdb, 스키마 public, 웨어하우스 testwh 및 역할 myrolecreate_engine 메서드를 호출합니다.

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole'
)
Copy

편의를 위해, snowflake.sqlalchemy.URL 메서드를 사용하여 연결 문자열을 구성하고 데이터베이스에 연결할 수 있습니다. 다음 예에서는 이전 예에서와 동일한 연결 문자열을 구성합니다.

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))
Copy

연결하기 및 연결 종료하기

engine.connect() 를 실행하여 연결을 엽니다. engine.execute() 를 사용하지 마십시오.

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()
Copy

참고

connection.close() 후에 engine.dispose() 를 실행하여 연결을 종료해야 합니다. 그렇지 않으면, Python Garbage 수집기가 Snowflake와의 통신에 필요한 리소스를 제거하여 Python 커넥터가 세션을 올바르게 종료할 수 없습니다.

명시적 트랜잭션을 사용하려면 SQLAlchemy 에서 AUTOCOMMIT 실행 옵션 을 비활성화해야 합니다.

기본적으로 SQLAlchemy에서는 이 옵션이 활성화됩니다. 이 옵션이 활성화되면 INSERT, UPDATE 및 DELETE 문이 명시적 트랜잭션 내에서 실행되는 경우에도 실행 시 자동으로 커밋됩니다.

AUTOCOMMIT를 비활성화하려면, autocommit=FalseConnection.execution_options() 메서드로 전달합니다. 예:

# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:

  try:
    connection.execute("BEGIN")
    connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
    connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
    connection.execute("COMMIT")
  except Exception as e:
    connection.execute("ROLLBACK")
  finally:
    connection.close()

engine.dispose()
Copy

자동 증분 동작

값의 자동 증가를 위해서는 Sequence 오브젝트가 필요합니다. 기본 키 열에 Sequence 오브젝트를 포함하면 새 레코드가 삽입될 때마다 값이 자동으로 증가합니다. 예:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)
Copy

오브젝트 이름 대/소문자 처리

Snowflake에서는 대/소문자를 구분하지 않는 모든 오브젝트 이름을 대문자로 저장합니다. 대조적으로, SQLAlchemy에서는 모든 소문자 오브젝트 이름에서 대/소문자를 구분하지 않는 것으로 간주합니다. Snowflake SQLAlchemy는 스키마 수준 통신 중에(예: 테이블 및 인덱스 반영 중에) 오브젝트 이름 대/소문자 변환을 수행합니다. 대문자 오브젝트 이름을 사용하는 경우 SQLAlchemy는 오브젝트에서 대/소문자를 구분하는 것으로 가정하고 이름을 따옴표로 묶습니다. 이 동작은 Snowflake에 수신된 데이터 딕셔너리 데이터와 일치하지 않을 수 있으므로, 식별자 이름이 따옴표(예: "TestDb")를 사용하여 대/소문자를 구분하도록 생성되지 않은 경우 모든 소문자 이름을 SQLAlchemy 측에서 사용해야 합니다.

인덱스 지원

Snowflake에서는 인덱스를 사용하지 않으므로 Snowflake SQLAlchemy에서도 사용하지 않습니다.

Numpy 데이터 타입 지원

Snowflake SQLAlchemy는 NumPy 데이터 타입의 바인딩 및 가져오기를 지원합니다. 바인딩은 항상 지원됩니다. NumPy 데이터 타입 가져오기를 활성화하려면, numpy=True 를 연결 매개 변수에 추가합니다.

지원되는 NumPy 데이터 타입은 다음과 같습니다.

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

다음 예는 numpy.datetime64 데이터의 왕복을 보여줍니다.

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

connection = engine.connect()
connection.execute(
    "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.execute(
    "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", engine)
assert df.c1.values[0] == specific_date
Copy

열 메타데이터 캐시하기

SQLAlchemy는 런타임 검사 API 를 제공하여 다양한 오브젝트에 대한 런타임 정보를 가져올 수 있습니다. 일반적인 사용 사례 중 하나는 스키마 카탈로그를 구성하기 위해 스키마의 모든 테이블과 해당 열 메타데이터를 가져오는 것입니다. 예를 들어, SQLAlchemy 상단의 alembic 은 데이터베이스 스키마 마이그레이션을 관리합니다. 의사 코드 흐름은 다음과 같습니다.

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...
Copy

이 흐름에서 잠재적인 문제는 쿼리가 각 테이블에서 실행될 때 꽤 오랜 시간이 걸릴 수 있다는 점입니다. 결과는 캐시되지만 열 메타데이터를 가져오기 위한 비용이 많이 소요됩니다.

이러한 문제를 완화하기 위해 Snowflake SQLAlchemy에서는 cache_column_metadata=True 플래그를 가져와 get_table_names 이 호출될 때 모든 테이블의 모든 열 메타데이터가 캐시되고 나머지 get_columns, get_primary_keysget_foreign_keys 가 캐시를 사용할 수 있도록 합니다.

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))
Copy

참고

Inspector 오브젝트 관련된 모든 열 메타 데이터가 캐시에 저장되므로 메모리 사용량이 향상됩니다. 플래그는 모든 열 메타데이터를 가져와야 하는 경우에만 사용해야 합니다.

VARIANT, ARRAY 및 OBJECT 지원

Snowflake SQLAlchemy 는 VARIANT, ARRAYOBJECT 데이터 타입의 가져오기를 지원합니다. 모든 타입은 Python에서 str 로 변환되므로 json.loads 를 사용하여 기본 데이터 타입으로 변환할 수 있습니다.

이 예에서는 VARIANT, ARRAYOBJECT 데이터 타입 열을 포함하는 테이블을 생성하는 방법을 보여줍니다.

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metdata.create_all(engine)
Copy

VARIANT, ARRAYOBJECT 데이터 타입 열을 검색하고 이를 기본 Python 데이터 타입으로 변환하려면 데이터를 가져온 후 다음과 같이 json.loads 메서드를 호출합니다.

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])
Copy

CLUSTER BY 지원

Snowflake SQLAlchemy는 테이블에 대한 CLUSTER BY 매개 변수를 지원합니다. 이 매개 변수에 대한 자세한 내용은 CREATE TABLE 를 참조하십시오.

이 예에서는 2개의 idname 열이 클러스터링 키로 포함된 테이블을 생성하는 방법을 보여줍니다.

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)
Copy

Alembic 지원

AlembicSQLAlchemy 상단의 데이터에비스 마이그레이션 도구입니다. Snowflake SQLAlchemy는 Alembic이 Snowflake SQLAlchemy를 인식할 수 있도록 alembic/env.py 에 다음 코드를 추가하여 작동합니다.

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'
Copy

일반적인 사용법은 Alembic 설명서 를 참조하십시오.

키 페어 인증 지원

Snowflake SQLAlchemy는 Python용 Snowflake 커넥터의 기능을 활용하여 키 페어 인증을 지원합니다. 개인 키와 공개 키를 만드는 단계는 키 페어 인증 및 키 페어 순환 사용하기 섹션을 참조하십시오.

개인 키 매개 변수는 다음과 같이 connect_args 를 통해 전달됩니다.

...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

engine = create_engine(URL(
    account='abc123',
    user='testuser1',
    ),
    connect_args={
        'private_key': pkb,
        },
    )
Copy

여기서 PRIVATE_KEY_PASSPHRASE 는 개인 키 파일 rsa_key.p8 을 암호 해독하기 위한 암호 구문입니다.

snowflake.sqlalchemy.URL 메서드는 개인 키 매개 변수를 지원하지 않습니다.

병합 명령 지원

Snowflake SQLAlchemy는 MergeInto 사용자 지정 식을 사용한 upsert 수행을 지원합니다. 전체 설명서는 MERGE 섹션을 참조하십시오.

다음과 같이 사용하십시오.

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']

merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)
Copy

CopyIntoStorage 지원

Snowflake SQLAlchemy는 사용자 지정 CopyIntoStorage 식을 사용하여 테이블과 쿼리 결과를 다양한 Snowflake 스테이지, Azure Container, AWS 버킷에 저장하는 기능을 지원합니다. 전체 설명서는 COPY INTO <위치> 섹션을 참조하십시오.

다음과 같이 사용하십시오.

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']

copy_into = CopyIntoStorage(from_=users,
                            into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
                            formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)
Copy