Python Connector 사용하기

이 항목에서는 Snowflake 커넥터를 사용하여 사용자 로그인, 데이터베이스 및 테이블 생성, 웨어하우스 생성, 데이터 삽입/로드 및 쿼리 등의 표준 Snowflake 작업을 수행하기 위한 방법을 보여주는 일련의 예를 제공합니다.

이 항목의 마지막에 제공되는 샘플 코드는 예시를 작동하는 단일 Python 프로그램에 통합합니다.

이 항목의 내용:

SnowCD를 사용한 Snowflake로의 네트워크 연결 확인하기

드라이버를 구성한 후에는 SnowCD 를 사용하여 Snowflake로의 네트워크 연결을 평가하고 문제를 해결할 수 있습니다.

초기 구성 프로세스 및 언제라도 필요할 때 SnowCD를 사용하여 Snowflake로의 네트워크 연결을 평가하고 문제를 해결할 수 있습니다.

Snowflake에 연결하기

snowflake.connector 모듈 가져오기:

import snowflake.connector

환경 변수, 명령줄, 구성 파일 또는 다른 적절한 소스에서 로그인 정보를 읽습니다. 예:

PASSWORD = os.getenv('SNOWSQL_PWD')
WAREHOUSE = os.getenv('WAREHOUSE')
...

ACCOUNT 매개 변수에 계정 식별자 를 사용합니다. 계정 식별자에는 snowflakecomputing.com 접미사가 포함되지 않는다는 점에 유의하십시오.

자세한 내용 및 예는 account 매개 변수에 대한 사용법 노트(connect 메서드용) 를 참조하십시오.

참고

사용할 수 있는 커넥터 매개 변수에 대한 설명은 snowflake.connector 메서드 를 참조하십시오.

If you copy data from your own Amazon S3 bucket, then you need the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

import os

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

참고

데이터가 Microsoft Azure 컨테이너에 저장되는 경우에는 COPY 문에 자격 증명을 직접 입력하십시오.

연결 정보를 읽은 후, 기본 인증자 또는 페더레이션 인증(활성화된 경우)을 사용하여 연결합니다.

세션 매개 변수 설정하기

Python Connector를 사용할 때는 QUERY_TAG 등의 세션 매개 변수를 여러 가지 방법으로 설정할 수 있습니다.

세션 수준의 매개 변수는 Snowflake에 연결할 때 설정할 수 있습니다. 이 작업을 수행하려면, 아래와 같이 선택적 연결 매개 변수인 《세션_매개 변수》를 전달합니다.

con = snowflake.connector.connect(
    user='XXXX',
    password='XXXX',
    account='XXXX',
    session_parameters={
        'QUERY_TAG': 'EndOfMonthFinancials',
    }
)

connect() 메서드로 전달된 세션_매개 변수 딕셔너리에는 1개 이상의 세션 수준 매개 변수가 포함될 수 있습니다.

또한, 연결 후에 ALTER SESSION SET ... SQL 문을 실행하여 세션 매개 변수를 설정할 수도 있습니다.

con.cursor().execute("ALTER SESSION SET QUERY_TAG = 'EndOfMonthFinancials'")

세션 매개 변수에 대한 자세한 내용은 일반 매개 변수 페이지의 개별 매개 변수 설명을 참조하십시오.

기본 인증자를 사용하여 연결하기

로그인 매개 변수를 사용하여 Snowflake에 연결:

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )

기타 정보를 사용하여 확장해야 할 수 있습니다.

인증에서 Single Sign-On(SSO) 사용하기

Snowflake에서 Single Sign-On(SSO) 을 사용하도록 구성한 경우, 인증에서 SSO를 사용하도록 클라이언트 애플리케이션을 구성할 수 있습니다. 자세한 내용은 Snowflake에 연결하는 클라이언트 애플리케이션에서 SSO 사용하기 섹션을 참조하십시오.

다단계 인증(MFA) 사용하기

Snowflake는 MFA 토큰 캐싱을 SSO와 결합하는 등의 캐싱 MFA 토큰을 지원합니다.

자세한 내용은 MFA 토큰 캐싱을 사용하여 인증 도중 프롬프트 수 최소화하기 — 선택 사항 섹션을 참조하십시오.

키 페어 인증 & 키 페어 순환 사용하기

Python Connector는 키 페어 인증 및 키 순환을 지원합니다.

키 페어 인증 및 키 순환을 구성하는 방법에 대한 자세한 내용은 키 페어 인증 & 키 페어 순환 을 참조하십시오.

키 페어 인증 구성을 완료한 후에는 connect 함수에서 private_key 매개 변수를 개인 키 파일의 경로로 설정합니다.

  1. 아래 샘플 코드를 수정하여 실행합니다. 이 코드는 개인 키 파일의 암호를 해독하여 Snowflake 드라이버로 전달하여 연결을 생성합니다.

샘플 코드

import snowflake.connector
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open("<path>/rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

ctx = snowflake.connector.connect(
    user='<user>',
    account='<account_identifier>',
    private_key=pkb,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema=SCHEMA
    )

cs = ctx.cursor()

프록시 서버 사용하기

프록시 서버를 사용하려면 다음 환경 변수를 구성합니다.

  • HTTP_PROXY

  • HTTPS_PROXY

  • NO_PROXY

참고

프록시 매개 변수(즉, proxy_host, proxy_port, proxy_userproxy_password)는 더 이상 사용되지 않습니다. 대신, 환경 변수를 사용하십시오.

예:

Linux 또는 macOS
export HTTP_PROXY='http://username:password@proxyserver.company.com:80'
export HTTPS_PROXY='http://username:password@proxyserver.company.com:80'
Windows
set HTTP_PROXY=http://username:password@proxyserver.company.com:80
set HTTPS_PROXY=http://username:password@proxyserver.company.com:80

Snowflake의 보안 모델에서는 보안 소켓 레이어(SSL) 프록시(HTTPS 인증서 사용)를 사용할 수 없습니다. 프록시 서버는 공개적으로 사용 가능한 인증 기관(CA)을 사용해야 하며, 손상된 프록시를 통한 MITM(메시지 가로채기) 공격과 같은 잠재적인 보안 위험을 줄여야 합니다.

SSL 프록시를 반드시 사용해야 하는 경우 통신 중에 인증서가 변경되지 않도록 Snowflake 인증서를 통과하도록 서버 정책을 업데이트하는 것이 좋습니다.

선택 사항으로, NO_PROXY 를 사용하여 특정 통신에 대한 프록시를 우회할 수 있습니다. 예를 들어, Amazon S3에 대한 액세스는 NO_PROXY=".amazonaws.com" 을 지정하여 프록시 서버를 우회할 수 있습니다.

NO_PROXY 에서는 와일드카드를 지원하지 않습니다. 지정된 각 값은 다음 중 하나여야 합니다.

  • 호스트 이름의 끝(또는 전체 호스트 이름), 예:

    • .amazonaws.com

    • myorganization-myaccount.snowflakecomputing.com

  • IP 주소, 예:

    • 192.196.1.15

2개 이상의 값이 지정된 경우, 값은 쉼표로 구분되어야 합니다. 예:

localhost,.my_company.com,.snowflakecomputing.com,192.168.1.15,192.168.1.16

OAuth를 사용하여 연결하기

OAuth를 사용하여 연결하려면 연결 문자열에는 oauth 로 설정된 authenticator 매개 변수와 oauth_access_token 으로 설정된 token 매개 변수가 반드시 포함되어야 합니다. 자세한 내용은 클라이언트, 드라이버 및 커넥터를 사용한 OAuth 섹션을 참조하십시오.

ctx = snowflake.connector.connect(
    user="<username>",
    host="<hostname>",
    account="<account_identifier>",
    authenticator="oauth",
    token="<oauth_access_token>",
    warehouse="test_warehouse",
    database="test_db",
    schema="test_schema"
)

OCSP

드라이버가 연결되면 Snowflake는 Snowflake를 가장하는 호스트가 아니라 Snowflake에 연결되었음을 확인하는 인증서를 전송합니다. 드라이버는 해당 인증서를 OCSP(온라인 인증서 상태 프로토콜) 서버로 전송하여 인증서가 해지되지 않았는지 확인합니다.

드라이버가 인증서를 확인하기 위해 OCSP 서버에 연결할 수 없는 경우 드라이버는 《페일 오픈》 또는 《페일 클로즈》 메시지를 표시할 수 있습니다.

페일 오픈 또는 페일 클로즈 모드 선택하기

1.8.0 이전의 Python용 Snowflake 커넥터 버전은 기본적으로 페일 클로즈 모드로 설정되어 있습니다. 1.8.0 이상 버전의 경우에는 기본적으로 페일 오픈입니다. 사용자는 connect() 메서드를 호출할 때 선택적 연결 매개 변수인 ocsp_fail_open 을 설정하여 기본 동작을 재정의할 수 있습니다. 예:

con = snowflake.connector.connect(
    account=<account_identifier>,
    user=<user>,
    ...,
    ocsp_fail_open=False,
    ...);

OCSP 커넥터 또는 드라이버 버전 확인하기

드라이버 또는 커넥터 버전 및 구성 모두를 통해 OCSP 동작이 결정됩니다. 드라이버 또는 커넥터 버전, 구성 및 OCSP 동작에 대한 자세한 내용은 OCSP 구성 를 참조하십시오.

OCSP 응답 캐시하기

모든 통신의 보안을 보장하기 위해 Python용 Snowflake 커넥터는 HTTPS 프로토콜을 사용하여 Snowflake 및 기타 모든 서비스에 연결합니다(예: 스테이징된 데이터 파일용 Amazon S3 및 페더레이션 인증용 Okta). 이 커넥터는 일반 HTTPS 프로토콜뿐만 아니라 OCSP(온라인 인증서 상태 프로토콜)를 통해 각 연결에 대한 TLS/SSL 인증서 해지를 확인하고 인증서가 해지되거나 OCSP 상태를 신뢰할 수 없는 경우 연결을 중단합니다.

각 Snowflake 연결은 OCSP 서버와 최대 3회의 왕복을 시도하므로, 연결에 추가된 네트워크 오버헤드를 줄이기 위해 OCSP 응답을 위한 여러 수준의 캐시가 도입되었습니다.

  • 메모리 캐시, 프로세스의 수명 동안 지속됩니다.

  • 파일 캐시, 캐시 디렉터리(예: ~/.cache/snowflake)가 삭제될 때까지 지속됩니다.

  • OCSP 응답 서버 캐시.

캐싱은 OCSP 서버의 가용성 문제(즉, 실제 OCSP 서버의 작동이 중지된 경우)도 해결할 수 있습니다. 캐시가 유효한 동안 커넥터는 인증서 해지 상태를 확인할 수 있습니다.

캐시 레이어에 OCSP 응답이 포함되어 있지 않으면 클라이언트는 CA의 OCSP 서버에서 유효성 검사 상태를 직접 가져오기 위해 시도합니다.

OCSP 응답 파일 캐시 위치 수정하기

기본적으로 파일 캐시가 활성화되는 위치는 다음과 같으며, 그러므로 추가적인 구성 작업이 필요하지 않습니다.

Linux

~/.cache/snowflake/ocsp_response_cache.json

macOS

~/Library/Caches/Snowflake/ocsp_response_cache.json

Windows

%USERPROFILE%\AppData\Local\Snowflake\Caches\ocsp_response_cache.json

그러나 OCSP 응답 캐시 파일에 다른 위치 및/또는 파일 이름을 지정하려면 connect 메서드에 ocsp_response_cache_filename 매개 변수를 사용하여 OCSP 캐시 파일의 경로 및 이름을 URI의 형식으로 지정할 수 있습니다.

OCSP 응답 캐시 서버

참고

OCSP 응답 캐시 서버는 현재 Python용 Snowflake 커넥터 1.6.0 이상 버전에서 지원됩니다.

OCSP 캐시의 메모리 및 파일 형식은 Snowflake가 영구 호스트와 함께 제공하는 클라이언트 중 하나를 사용하여 Snowflake에 연결된 애플리케이션에 적합합니다. 그러나 AWS Lambda 또는 Docker 등과 같이 동적으로 프로비저닝된 환경에는 적합하지 않습니다.

이러한 상황을 해결하기 위해 Snowflake는 세 번째 수준의 캐싱인 OCSP 응답 캐시 서버를 제공합니다. OCSP 응답 캐시 서버는 CA의 OCSP 서버에서 매시간 OCSP 응답을 가져와서 24시간 동안 저장합니다. 그러면 클라이언트는 이 서버 캐시에서 지정된 Snowflake 인증서의 유효성 검사 상태를 요청할 수 있습니다.

중요

서버 정책이 외부 IP 주소 및 웹 사이트의 대부분 또는 전체에 대한 액세스를 거부하도록 설정된 경우, 서비스가 정상적으로 작동하기 위해서는 반드시 캐시 서버 주소를 허용해야 합니다. 캐시 서버 URL은 ocsp*.snowflakecomputing.com:80 입니다.

어떤 이유로든 캐시 서버를 비활성화해야 하는 경우 SF_OCSP_RESPONSE_CACHE_SERVER_ENABLED 환경 변수를 false 로 설정합니다. 이 값은 대/소문자를 구분하며 반드시 소문자여야 함에 유의하십시오.

데이터베이스, 스키마 및 웨어하우스 만들기

로그인한 후, 아직 없는 경우 CREATE DATABASE, CREATE SCHEMACREATE WAREHOUSE 명령을 사용하여 데이터베이스, 스키마 및 웨어하우스를 생성합니다.

아래 예는 tiny_warehouse 웨어하우스, testdb 데이터베이스 및 testschema 스키마를 생성하는 방법을 보여줍니다. 스키마를 생성할 때는 반드시 스키마를 생성할 데이터베이스의 이름을 지정하거나 스키마를 생성할 데이터베이스에 이미 연결되어 있어야 함에 유의하십시오. 아래 예에서는 USE DATABASE 명령을 실행한 후 CREATE SCHEMA 명령을 실행하여 올바른 데이터베이스에 스키마가 생성되도록 합니다.

        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")

데이터베이스, 스키마 및 웨어하우스 사용하기

테이블을 생성할 데이터베이스 및 스키마를 지정합니다. 또한, DML 문 및 쿼리를 실행하기 위한 리소스를 제공할 웨어하우스를 지정합니다.

예를 들어, testdb 데이터베이스, testschema 스키마 및 tiny_warehouse 웨어하우스(이전에 생성한)를 사용하려면:

        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")

테이블 생성 및 데이터 삽입하기

CREATE TABLE 명령을 사용하여 테이블을 생성하고 INSERT 명령을 사용하여 데이터를 테이블에 채웁니다.

예를 들어, 이름이 testtable 인 테이블을 생성하고 이 테이블에 행을 2개 삽입합니다.

    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")

데이터 로드하기

개별 INSERT 명령을 사용하여 테이블에 데이터를 삽입하는 대신, 내부 또는 외부 위치에 스테이징된 파일에서 데이터를 일괄적으로 로드할 수 있습니다.

내부 위치에서 데이터 복사하기

호스트 컴퓨터의 파일에서 테이블로 데이터를 로드하려면, 우선 PUT 명령을 사용하여 내부 위치의 파일을 스테이징한 후 COPY INTO <테이블> 명령을 사용하여 파일의 데이터를 테이블로 복사합니다.

예:

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

여기서 CSV 데이터는 Linux 또는 macOS 환경의 로컬 디렉터리인 /tmp/data 에 저장되며 이 디렉터리에는 이름이 file0, file1, … file100 인 파일이 포함됩니다.

외부 위치에서 데이터 복사하기

외부 위치(즉, S3 버킷)에 이미 스테이징된 파일에서 테이블로 데이터를 로드하려면 COPY INTO <테이블> 명령을 사용합니다.

예:

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
    STORAGE_INTEGRATION = myint
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

여기서:

  • s3://<s3_버킷>/data/ 은 S3 버킷의 이름을 지정합니다.

  • 버킷의 파일에는 data 접두사가 사용됩니다.

  • 버킷은 계정 관리자(즉, ACCOUNTADMIN 역할의 사용자) 또는 전역 CREATE INTEGRATION 권한이 있는 역할이 CREATE STORAGE INTEGRATION 을 사용하여 생성한 저장소 통합으로 액세스됩니다. 저장소 통합을 사용하면 사용자는 개인 저장소 위치에 액세스하기 위한 자격 증명을 입력하지 않아도 됩니다.

참고

이 예에서는 format() 함수를 사용하여 문을 구성합니다. 환경에 SQL 삽입 공격의 위험이 있는 경우에는 format() 함수를 사용하는 대신 값을 바인딩하는 것이 좋을 수 있습니다.

데이터 쿼리하기

Python용 Snowflake 커넥터를 통해 다음을 제출할 수 있습니다.

  • 동기 쿼리, 쿼리가 완료된 후 애플리케이션으로 제어를 반환합니다.

  • 비동기 쿼리, 쿼리가 완료되기 전 애플리케이션으로 제어를 반환합니다.

쿼리가 완료된 후, Cursor 오브젝트를 사용하여 결과의 모든 값을 가져올 수 있습니다. 기본적으로, Python용 Snowflake 커넥터는 Snowflake 데이터 타입 에서 네이티브 Python 데이터 타입으로 값을 변환합니다. (값을 문자열로 반환하고 애플리케이션에서 타입을 변환하도록 선택할 수 있음에 유의하십시오. 데이터 변환을 우회하여 쿼리 성능 향상하기 를 참조하십시오.)

참고

기본적으로 NUMBER 열의 값은 배정밀도 부동 소수점 값(float64)으로 반환됩니다. fetch_pandas_all()fetch_pandas_batches() 메서드에서 이를 10진수 값(decimal.Decimal)으로 반환하려면 connect() 메서드의 True 매개 변수를 arrow_number_to_decimal 로 설정합니다.

동기 쿼리 수행하기

동기 쿼리를 수행하려면 Cursor 오브젝트의 execute() 메서드를 호출합니다. 예:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')

cursor 를 사용하여 값 가져오기 의 설명과 같이, Cursor 오브젝트를 사용하여 결과에서 값을 가져옵니다.

비동기 쿼리 수행하기

Python용 Snowflake 드라이버는 비동기 쿼리(쿼리가 완료되기 전에 사용자에게 제어를 반환하는 쿼리)를 지원합니다. 사용자는 비동기 쿼리를 제출하고 폴링을 사용하여 쿼리 완료 시점을 결정할 수 있습니다. 쿼리가 완료된 후에는 결과가 제공됩니다.

참고

To perform asynchronous queries, you must ensure the ABORT_DETACHED_QUERY configuration parameter is FALSE (default value).

Snowflake automatically closes connections after a period of time (default: 5 minutes), which orphans any active queries. If the value is TRUE, Snowflake terminates these orphaned queries, which can impact asynchronous queries.

이 기능을 사용하면 각 쿼리가 완료될 때까지 기다릴 필요 없이 여러 쿼리를 병렬로 제출할 수 있습니다. 또한, 동일한 세션 동안 동기 및 비동기 쿼리의 조합을 실행할 수도 있습니다.

마지막으로, 한 연결에서 비동기 쿼리를 제출하고 다른 연결에서 결과를 확인할 수 있습니다. 예를 들어, 사용자는 애플리케이션에서 장기 실행 쿼리를 시작하고 애플리케이션을 종료하며 나중에 결과를 확인할 수 있도록 애플리케이션을 다시 시작할 수 있습니다.

비동기 쿼리 제출하기

비동기 쿼리를 제출하려면 Cursor 오브젝트의 execute_async() 메서드를 호출합니다. 예:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

쿼리를 제출한 후:

비동기 쿼리를 수행하는 예는 비동기 쿼리의 예 를 참조하십시오.

비동기 쿼리의 모범 사례

비동기 쿼리를 제출할 때 따라야 하는 모범 사례는 다음과 같습니다.

  • 다른 쿼리에 종속된 쿼리를 확인한 후 쿼리를 병렬로 실행합니다. 일부 쿼리는 상호 의존적이고 순서가 중요하므로, 병렬 실행에 적합하지 않습니다. 예를 들어, INSERT 문은 해당 CREATE TABLE 문이 종료될 때까지 시작되지 않아야 합니다.

  • 사용 가능한 메모리에 대해 너무 많은 쿼리를 실행하지 않아야 합니다. 여러 쿼리를 병렬로 실행하면 일반적으로 메모리 사용량이 증가하게 됩니다. 특히, 메모리에 2개 이상의 결과 세트가 메모리에 동시에 저장되어 있는 경우에는 사용량이 크게 증가합니다.

  • 폴링 중에 쿼리가 실패하는 드문 경우를 처리해야 합니다.

  • 트랜잭션 제어 문(BEGIN, COMMIT 및 ROLLBACK)은 다른 문과 병렬로 실행되지 않아야 합니다.

Snowflake 쿼리 ID 검색하기

쿼리 ID를 통해 Snowflake에 의해 실행되는 각 쿼리를 식별할 수 있습니다. Python용 Snowflake 커넥터를 사용하여 쿼리를 실행하는 경우에는 Cursor 오브젝트의 sfqid 속성을 통해 쿼리 ID에 액세스할 수 있습니다.

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

쿼리 ID를 사용하여 수행할 수 있는 작업은 다음과 같습니다.

쿼리 상태 확인하기

쿼리의 상태를 확인하려면:

  1. Cursor 오브젝트의 sfqid 필드에서 쿼리 ID를 가져옵니다.

  2. Connection 오브젝트의 get_query_status() 메서드로 쿼리 ID를 전달하여 쿼리의 상태를 나타내는 QueryStatus 열거형 상수를 반환합니다.

    기본적으로 get_query_status() 에서는 쿼리 결과가 오류인 경우 오류가 발생되지 않습니다. 오류가 발생하도록 하려면, 대신 get_query_status_throw_if_error() 메서드를 호출해야 합니다.

  3. 쿼리의 상태를 확인하려면 QueryStatus 열거형 상수를 사용합니다.

    • 쿼리가 아직 실행 중인지 확인(예: 비동기 쿼리인지 확인)하려면 Connection 오브젝트의 is_still_running() 메서드로 이 상수를 전달합니다.

    • 오류 발생 여부를 확인하려면 이 상수를 is_an_error() 메서드로 전달합니다.

    열거형 상수의 전체 목록은 QueryStatus 를 참조하십시오.

다음 예에서는 비동기 쿼리를 실행한 후 쿼리의 상태를 확인합니다.

import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
  time.sleep(1)

다음 예에서는 쿼리 결과에 오류가 있는 경우 오류가 발생합니다.

from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
  query_id = cur.sfqid
  while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
    time.sleep(1)
except ProgrammingError as err:
  print('Programming Error: {0}'.format(err))

쿼리 ID를 사용하여 쿼리 결과 검색하기

참고

Cursor 오브젝트에 대한 execute() 메서드를 호출하여 비동기 쿼리를 실행한 경우에는 결과를 검색하기 위해 쿼리 ID를 사용할 필요가 없습니다. cursor 를 사용하여 값 가져오기 에서의 설명과 같이, 결과에서 값을 가져오기만 하면 됩니다.

비동기 쿼리 또는 이전에 제출한 동기 쿼리의 결과를 검색하려면, 다음 단계를 따르십시오.

  1. 쿼리의 쿼리 ID를 가져옵니다. Snowflake 쿼리 ID 검색하기 섹션을 참조하십시오.

  2. Cursor 오브젝트의 get_results_from_sfqid() 메서드를 호출하여 결과를 검색합니다.

  3. cursor 를 사용하여 값 가져오기 의 설명과 같이, Cursor 오브젝트를 사용하여 결과에서 값을 가져옵니다.

쿼리가 아직 실행 중인 경우, 페치 메서드(fetchone(), fetchmany(), fetchall() 등)는 쿼리가 완료될 때까지 대기합니다.

예:

# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')

cursor 를 사용하여 값 가져오기

커서 오브젝트 반복기 메서드를 사용하여 테이블에서 값을 가져옵니다.

예를 들어, 테이블 생성 및 데이터 삽입하기 에서 이전에 생성한 testtable 테이블에서 《col1》 및 《col2》 열을 가져오려면 다음과 유사한 코드를 사용합니다.

    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()

또는 Python용 Snowflake 커넥터가 편리한 바로 가기를 제공합니다.

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))

단일 결과(즉, 단일 행)를 가져오려면 fetchone 메서드를 사용합니다.

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))

지정된 행의 개수를 한 번에 가져오려면 행의 개수와 함께 fetchmany 메서드를 사용합니다.

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)

참고

결과 세트가 너무 커 메모리에 적합하지 않은 경우에는 fetchone 또는 fetchmany 를 사용합니다.

모든 결과를 한 번에 가져오려면:

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))

쿼리에 대한 시간 초과를 설정하려면 《begin》 명령을 실행하고 쿼리에 시간 초과 매개 변수를 포함합니다. 쿼리가 매개 변수 값의 길이를 초과하면 오류가 발생하고 롤백이 수행됩니다.

다음 코드에서 604 오류는 쿼리가 취소되었음을 의미합니다. 시간 초과 매개 변수를 통해 Timer() 가 시작되고 쿼리가 지정된 시간 내에 완료되지 않으면 취소됩니다.

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")

DictCursor 를 사용하여 열 이름을 기준으로 값 가져오기

열 이름을 기준으로 값을 가져오려면, DictCursor 타입의 cursor 오브젝트를 생성합니다.

예:

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()

비동기 쿼리의 예

비동기 쿼리의 간단한 예는 다음과 같습니다.

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')

다음 예에서는 1개의 연결에서 비동기 쿼리를 제출하고 다른 연결에서 결과를 검색합니다.

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Get the query ID for the asynchronous query.
query_id = cur.sfqid

# Close the cursor and the connection.
cur.close()
conn.close()

# Open a new connection.
new_conn = snowflake.connector.connect( ... )

# Create a new cursor.
new_cur = new_conn.cursor()

# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')

쿼리 ID를 기준으로 쿼리 취소하기

쿼리 ID 를 기준으로 쿼리를 취소합니다.

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()

《queryID》 문자열을 실제 쿼리 ID로 바꿉니다. 쿼리의 ID를 가져오려면 Snowflake 쿼리 ID 검색하기 를 참조하십시오.

데이터 변환을 우회하여 쿼리 성능 향상하기

쿼리 성능을 향상하려면, snowflake.connector.converter_null 모듈의 SnowflakeNoConverterToPython 클래스를 사용하여 Snowflake 내부 데이터 타입에서 네이티브 Python 데이터 타입으로의 데이터 변환을 우회합니다. 예:

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data

결과적으로, 모든 데이터는 문자열 형식으로 표시되어 애플리케이션이 네이티브 Python 데이터 타입으로 변환을 수행합니다. 예를 들어, TIMESTAMP_NTZTIMESTAMP_LTZ 데이터는 문자열 형식으로 표시되는 Epoch 시간이며 TIMESTAMP_TZ 데이터는 Epoch 시간 다음에 공백이 오고 그 다음에 UTC에 대한 오프셋(분)이 표시되는 문자열 형식입니다.

바인딩 데이터에는 영향을 주지 않으며, 여전히 Python 네이티브 데이터를 바인딩하여 업데이트에서 사용할 수 있습니다.

데이터 바인딩하기

SQL 문에서 사용되는 값을 지정하려면, 문에 리터럴을 포함하거나 변수를 바인딩할 수 있습니다. 변수를 바인딩할 때 SQL 문의 텍스트에 자리 표시자를 1개 이상 추가하고 각 자리 표시자에 변수(사용할 변수)를 지정합니다.

다음 예는 리터럴과 바인딩을 사용할 때를 비교하여 보여줍니다.

리터럴:

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")

바인딩:

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))

참고

바인딩할 수 있거나 일괄 결합할 수 있는 데이터 크기의 상한에는 제한이 있습니다. 자세한 내용은 쿼리 텍스트 크기 제한 섹션을 참조하십시오.

Snowflake에서 지원되는 바인딩의 타입은 다음과 같습니다.

이와 관련한 각각의 설명은 아래에서 제공됩니다.

pyformat 또는 format 바인딩

pyformat 바인딩 및 format 바인딩 모두 서버측이 아닌 클라이언트측에서 데이터를 바인딩합니다.

기본적으로, Python용 Snowflake 커넥터는 pyformatformat 모두를 지원하므로, 사용자는 %(name)s 또는 %s 를 자리 표시자로 사용할 수 있습니다. 예:

  • %(name)s 를 자리 표시자로 사용:

        conn.cursor().execute(
            "INSERT INTO test_table(col1, col2) "
            "VALUES(%(col1)s, %(col2)s)", {
                'col1': 789,
                'col2': 'test string3',
                })
    
  • %s 를 자리 표시자로 사용:

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    

pyformatformat 을 사용하면 목록 오브젝트를 사용하여 IN 연산자를 위해 데이터를 바인딩할 수 있습니다.

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))

퍼센트 문자(《%》)는 SQL LIKE용 와일드카드 문자 및 Python용 형식 바인딩 문자로 사용할 수 있습니다. 형식 바인딩을 사용하고 SQL 명령에 퍼센트 문자가 포함된 경우에는 퍼센트 문자를 이스케이프해야 할 수 있습니다. 예를 들어, SQL 문이 다음과 같은 경우:

SELECT col1, col2
    FROM test_table
    WHERE col2 ILIKE '%York' LIMIT 1;  -- Find York, New York, etc.

Python 코드는 다음과 같아야 합니다(원본 퍼센트 기호를 이스케이프하려면 추가 퍼센트 기호에 유의).

        sql_command = "select col1, col2 from test_table "
        sql_command += " where col2 like '%%York' limit %(lim)s"
        parameter_dictionary = {'lim': 1 }
        cur.execute(sql_command, parameter_dictionary)

qmark 또는 numeric 바인딩

qmark 바인딩 및 numeric 바인딩은 클라이언트측이 아닌 서버측에서 데이터를 바인딩합니다.

  • qmark 바인딩의 경우, 물음표 문자(?)를 사용하여 문자열에서 변수 값을 삽입할 위치를 나타냅니다.

  • numeric 바인딩의 경우, 콜론(:) 뒤에 숫자를 사용하여 해당 위치에 대체될 변수의 위치를 나타냅니다. 예를 들어, :2 는 두 번째 변수를 지정합니다.

    숫자 바인딩을 사용하여 동일한 쿼리에서 동일한 값을 두 번 이상 바인딩합니다. 예를 들어, 두 번 이상 사용할 Long VARCHAR 또는 BINARY 또는 반정형 값이 있는 경우 numeric 바인딩을 사용하면 서버에 값을 한 번 전송하고 여러 번 사용할 수 있습니다.

다음 섹션에서는 qmarknumeric 바인딩의 사용 방법을 설명합니다.

qmark 또는 numeric 바인딩 사용하기

qmark 또는 numeric 스타일 바인딩을 사용하려면 다음 중 하나를 실행합니다.

  • snowflake.connector.paramstyle='qmark'

  • snowflake.connector.paramstyle='numeric'

중요

paramstyle 속성을 설정한 connect() 메서드를 호출합니다.

paramstyleqmark 또는 numeric 을 설정한 경우에는, ? 또는 :N (여기서 N 을 숫자로 대체)을 각각 자리 표시자로 사용해야 합니다.

예:

  • ? 를 자리 표시자로 사용:

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
  • :N 을 자리 표시자로 사용:

    import snowflake.connector
    
    snowflake.connector.paramstyle='numeric'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    

    The following query shows how to use numeric binding to reuse a variable:

    con.cursor().execute(
        "INSERT INTO testtable(complete_video, short_sample_of_video) "
        "VALUES(:1, SUBSTRING(:1, :2, :3))", (
            binary_value_that_stores_video,          # variable :1
            starting_offset_in_bytes_of_video_clip,  # variable :2
            length_in_bytes_of_video_clip            # variable :3
        ))
    

datetime 오브젝트와 함께 qmark 또는 numeric 바인딩 사용하기

qmark 또는 numeric 바인딩을 사용하여 데이터를 Snowflake TIMESTAMP 데이터 타입으로 바인딩하는 경우, 바인딩 변수를 Snowflake 타임스탬프 데이터 타입(TIMESTAMP_LTZ 또는 TIMESTAMP_TZ) 및 값을 지정하는 튜플로 설정합니다. 예:

import snowflake.connector

snowflake.connector.paramstyle='qmark'

con = snowflake.connector.connect(...)

con.cursor().execute(
    "CREATE OR REPLACE TABLE testtable2 ("
    "   col1 int, "
    "   col2 string, "
    "   col3 timestamp_ltz"
    ")"
)

con.cursor().execute(
    "INSERT INTO testtable2(col1,col2,col3) "
    "VALUES(?,?,?)", (
        987,
        'test string4',
        ("TIMESTAMP_LTZ", datetime.now())
    )
 )

클라이언트측 바인딩과 달리, 서버측 바인딩에는 열에 대한 Snowflake 데이터 타입이 필요합니다. 가장 일반적인 Python 데이터 타입에는 이미 Snowflake 데이터 타입에 대한 암시적 매핑(예: intFIXED 로 매핑됨)이 있습니다. 그러나 Python datetime 데이터는 여러 Snowflake 데이터 타입 중 1개(TIMESTAMP_NTZ, TIMESTAMP_LTZ 또는 TIMESTAMP_TZ)에 바인딩될 수 있으며 기본 매핑은 TIMESTAMP_NTZ 이므로 사용자가 사용할 Snowflake 데이터 타입을 지정해야 합니다.

IN 연산자와 함께 바인딩 변수 사용하기

qmarknumeric (서버측 바인딩)에서는 IN 연산자와 함께 변수를 바인딩하는 것을 지원하지 않습니다.

바인딩 변수와 IN 연산자를 함께 사용해야 하는 경우에는 클라이언트측 바인딩 (pyformat 또는 format)을 사용하십시오.

일괄 삽입을 위해 매개 변수를 변수에 바인딩하기

애플리케이션 코드에서는 단일 일괄 처리에 여러 행을 삽입할 수 있습니다. 이 작업을 수행하려면 INSERT 문에서 값에 대한 매개 변수를 사용하십시오. 예를 들어, 다음 문에서는 INSERT 문에서 qmark 바인딩을 위해 자리 표시자를 사용합니다.

insert into grocery (item, quantity) values (?, ?)

그리고 삽입할 데이터를 지정하려면 시퀀스의 시퀀스(예: 튜플 목록)인 변수를 정의합니다.

rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]

위의 예에서와 같이, 목록의 각 항목은 삽입할 행에 대한 열 값이 포함된 튜플입니다.

바인딩을 실행하려면 executemany() 메서드를 호출하여 변수를 두 번째 인자로 전달합니다. 예:

conn = snowflake.connector.connect( ... )
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
conn.cursor().executemany(
    "insert into grocery (item, quantity) values (?, ?)",
    rows_to_insert)

서버측에서 데이터를 바인딩 (즉, qmark 또는 numeric 바인딩)하는 경우, 커넥터는 바인딩을 통해 일괄 삽입의 성능을 최적화할 수 있습니다.

이러한 방식을 사용하여 값을 대량으로 삽입하는 경우 드라이버는 수집을 위한 임시 스테이지로 데이터를 스트리밍하여(로컬 시스템에 파일을 생성하지 않음) 성능을 향상할 수 있습니다. 값의 개수가 임계값을 초과하는 경우 드라이버는 자동으로 이 작업을 수행합니다.

드라이버가 데이터를 임시 스테이지로 전송할 수 있으려면 사용자에게 다음과 같은 스키마 권한이 있어야 합니다.

  • CREATE STAGE.

사용자에게 이러한 권한이 없으면 드라이버는 Snowflake 데이터베이스에 쿼리가 포함된 데이터를 다시 전송합니다.

또한, 세션의 현재 데이터베이스 및 스키마를 설정해야 합니다. 이러한 값이 설정되지 않은 경우에는 드라이버가 실행하는 CREATE TEMPORARY STAGE 명령에서 다음 오류가 발생하며 실패할 수 있습니다.

CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.

참고

Snowflake 데이터베이스에 데이터를 로드하는 대체 방법(COPY 명령을 사용한 대량 로드 등)과 관련해서는 Snowflake에 데이터 로딩하기 를 참조하십시오.

SQL 삽입 공격 방지하기

SQL 삽입의 위험이 있으므로 Python의 형식 지정 함수를 사용하여 데이터를 바인딩하지 말아야 합니다. 예:

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )

Instead, store the values in variables and then bind those variables using qmark or numeric binding style.

열 메타데이터 검색하기

결과 세트에서 각 열에 대한 메타데이터(예: 각 열의 이름, 타입, 전체 자릿수, 소수 자릿수 등)를 검색하려면 다음 방식 중 하나를 사용하십시오.

  • 쿼리를 실행하기 위해 execute() 메서드를 호출한 후 메타데이터에 액세스하려면 Cursor 오브젝트의 describe 속성을 사용합니다.

  • 쿼리를 실행할 필요 없이 메타데이터에 액세스하려면 describe() 메서드를 호출합니다.

    describe 메서드는 Python용 Snowflake 커넥터 2.4.6 이상 버전에서 사용할 수 있습니다.

description 속성은 다음 값 중 1개로 설정됩니다.

  • 2.4.5 이하 버전: 튜플의 목록.

  • 2.4.6 이상 버전: ResultMetadata 오브젝트의 목록. (describe 메서드도 이 목록을 반환합니다.)

각 튜플 및 ResultMetadata 오브젝트에는 열에 대한 메타데이터(열 이름, 데이터 타입 등)가 포함되어 있습니다. 메타데이터에는 인덱스를 사용하여 또는 2.4.6 이상 버전의 경우 ResultMetadata 속성을 사용하여 액세스할 수 있습니다.

다음 예는 반환된 튜플 및 ResultMetadata 오브젝트에서 메타데이터에 액세스하는 방법을 보여줍니다.

예: 색인을 사용하여 열 이름 메타데이터 가져오기(2.4.5 및 이전 버전):

다음 예에서는 description 속성을 사용하여 쿼리를 실행한 후 열 이름 목록을 검색합니다. 이 속성은 튜플의 목록으로, 이 예에서는 각 튜플의 첫 번째 값에서 열 이름에 액세스합니다.

    cur = conn.cursor()
    cur.execute("SELECT * FROM test_table")
    print(','.join([col[0] for col in cur.description]))

예: 속성을 사용하여 열 이름 메타데이터 가져오기(2.4.6 이상 버전):

다음 예에서는 description 속성을 사용하여 쿼리를 실행한 후 열 이름 목록을 검색합니다. 이 속성은 ResultMetaData 오브젝트의 목록으로, 이 예에서는 각 ResultMetadata 오브젝트의 name 속성에서 열 이름에 액세스합니다.

    cur = conn.cursor()
    cur.execute("SELECT * FROM test_table")
    print(','.join([col.name for col in cur.description]))

예: 쿼리를 실행하지 않고 열 이름 메타데이터 가져오기(2.4.6 이상 버전):

다음 예에서는 describe 메서드를 사용하여 쿼리를 실행하지 않고 열 이름 목록을 검색합니다. describe() 메서드는 ResultMetaData 오브젝트의 목록을 반환하는데, 이 예에서는 각 ResultMetadata 오브젝트의 name 속성에서 열 이름에 액세스합니다.

    cur = conn.cursor()
    result_metadata_list = cur.describe("SELECT * FROM test_table")
    print(','.join([col.name for col in result_metadata_list]))

오류 처리

애플리케이션은 Snowflake 커넥터에서 발생한 예외를 올바르게 처리하고 코드 실행의 계속 또는 중지 여부를 결정합니다.

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

execute_stream 을 사용하여 SQL 스크립트 실행하기

execute_stream 함수를 사용하면 스트림에서 1개 이상의 SQL 스크립트를 실행할 수 있습니다.

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)

연결 종료하기

close 메서드를 호출하여 연결을 종료하는 것이 모범 사례입니다.

        connection.close()

이를 통해 수집된 클라이언트 메트릭을 서버로 제출하고 세션을 삭제할 수 있습니다. 또한, try-finally 를 사용하면 중간에 예외가 발생하는 경우에도 연결이 종료되지 않도록 할 수 있습니다.

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()

컨텍스트 관리자를 사용한 트랜잭션 연결 및 관리

Python용 Snowflake 커넥터는 필요한 경우 리소스를 할당 및 해제하는 컨텍스트 관리자를 지원합니다. 컨텍스트 관리자는 autocommit 이 비활성화된 경우 문의 상태에 따라 트랜잭션을 커밋 또는 롤백하는 데 유용합니다.

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail

위의 예에서, 세 번째 문이 실패하면 컨텍스트 관리자가 트랜잭션의 변경 사항을 롤백하고 연결을 끊습니다. 모든 문이 성공하면 컨텍스트 관리자가 변경 사항을 커밋하고 연결을 끊습니다.

tryexcept 블록에 해당하는 코드는 다음과 같습니다.

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()

로깅

Python용 Snowflake 커넥터는 표준 Python logging 모듈을 사용하여 일정 간격으로 상태를 기록함으로써 애플리케이션이 백그라운드로 실행되는 활동을 추적할 수 있도록 해줍니다. 로깅을 활성화하는 가장 간단한 방법은 애플리케이션을 시작할 때 logging.basicConfig() 를 호출하는 것입니다.

예를 들어, 로깅 수준을 INFO 로 설정하고 /tmp/snowflake_python_connector.log 파일에 로그를 저장하려면:

        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)

보다 포괄적인 로깅은 다음과 같이 로깅 수준을 DEBUG 로 설정하여 활성화할 수 있습니다.

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

선택 사항이지만 권장되는 SecretDetector 포맷터 클래스를 사용하면 알려진 민감한 정보 세트를 마스킹한 후 Snowflake Python Connector 로그 파일에 작성할 수 있습니다. SecretDetector를 사용하려면, 다음과 유사한 코드를 사용합니다.

# Logging including the timestamp, thread and the source code location
import logging
from snowflake.connector.secret_detector import SecretDetector
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

참고

botocoreboto3 는 Python용 AWS(Amazon Web Services) SDK를 통해 사용할 수 있습니다.

간단한 프로그램

다음 샘플 코드에서는 이전 섹션에서 설명한 여러 예를 작동하는 Python 프로그램에 통합합니다. 이 예에는 다음의 두 부분이 포함됩니다.

  • 상위 클래스(《python_veritas_base》)에는 서버 연결과 같은 여러 일반 작업을 위한 코드가 포함됩니다.

  • 하위 클래스(《python_connector_example》)는 테이블 쿼리와 같은 특정 클라이언트를 위한 사용자 지정 부분을 나타냅니다.

이 샘플 코드는 최신 제품 빌드에서의 실행을 보장하기 위해 테스트 중 1개에서 직접 가져온 코드입니다.

이 코드는 테스트에서 가져온 것이므로 일부 테스트에서 사용되는 대체 포트 및 프로토콜을 설정하기 위한 소량의 코드가 포함되어 있습니다. 사용자는 프로토콜 또는 포트 번호를 설정하지 않아야 하며, 대신 이러한 단계를 생략하고 기본값을 사용해야 합니다.

여기에는 문서에 별도로 가져올 수 있는 코드를 나타내기 위한 섹션 마커(《코드 조각 태그》라고도 함)도 포함되어 있습니다. 섹션 마커는 일반적으로 다음과 같습니다.

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------

이러한 섹션 마커는 사용자 코드에서 필수가 아닙니다.

코드 샘플의 첫 번째 부분에는 다음을 수행하기 위한 공통 서브루틴이 포함되어 있습니다.

  • 연결 정보가 포함된 명령줄 인자(예: 《–warehouse MyWarehouse》) 읽기.

  • 서버에 연결하기.

  • 웨어하우스, 데이터베이스 및 스키마 만들기 및 사용하기.

  • 사용 완료 시 스키마, 데이터베이스 및 웨어하우스 삭제하기.


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This gets account identifier and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account identifier and login information from environment variables and command-line parameters.
        # For information about account identifiers, see
        # https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account_identifier> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
            except:
                PORT = ""
            try:
                PROTOCOL = connection_parameters["protocol"]
            except:
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        # If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


코드 샘플의 두 번째 부품에서는 테이블을 생성하고 테이블에 행을 삽입하는 등의 작업을 수행합니다.


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

이 샘플을 실행하려면 다음을 수행해야 합니다.

  1. 코드의 첫 번째 부분을 《python_veritas_base.py》 파일로 복사합니다.

  2. 코드의 두 번째 부분을 《python_connector_example.py》 파일로 복사합니다.

  3. SNOWSQL_PWD 환경 변수를 비밀번호에 설정합니다. 예:

    export SNOWSQL_PWD='MyPassword'
    
  4. 다음과 유사한 명령줄을 사용하여 프로그램을 실행합니다(사용자 및 계정 정보를 사용자 및 계정 정보로 바꿔야 함).

    경고

    이 작업을 수행하면 프로그램의 마지막에 웨어하우스, 데이터베이스 및 스키마가 삭제됩니다! 손실될 수 있으므로 기존 데이터베이스의 이름을 사용하지 마십시오.

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
    

출력은 다음과 같습니다.

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...

길이가 더 긴 예는 다음과 같습니다.

참고

계정 및 로그인 정보를 설정한 섹션에서 Snowflake 로그인 정보(이름, 비밀번호 등)와 일치하도록 변수를 바꿨는지 확인하십시오.

이 예에서는 format() 함수를 사용하여 문을 구성합니다. 환경에 SQL 삽입 공격의 위험이 있는 경우에는 format() 함수를 사용하는 대신 값을 바인딩하는 것이 좋을 수 있습니다.

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'

import os

# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
     STORAGE_INTEGRATION = myint
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # user error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()
맨 위로 이동