Python을 사용하여 데이터 로딩 및 언로딩 리소스 관리

Python을 사용하여 Snowflake에서 외부 볼륨, 파이프, 스테이지를 포함한 리소스의 데이터 로딩 및 언로딩을 관리할 수 있습니다.

전제 조건

이 항목의 예제에서는 Snowflake와 연결하고 Snowflake Python APIs 을 사용할 수 있는 Root 오브젝트를 생성하는 코드를 추가했다고 가정합니다.

예를 들어, 다음 코드는 구성 파일에 정의된 연결 매개 변수를 사용하여 Snowflake에 대한 연결을 생성합니다.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

해당 코드에서는 결과 Session 오브젝트를 사용하여 API의 유형과 메서드를 사용하기 위해 Root 오브젝트를 생성합니다. 자세한 내용은 Snowflake Python APIs 을 사용하여 Snowflake에 연결 섹션을 참조하십시오.

스테이지 관리하기

클라우드 저장소의 데이터 파일의 위치인 Snowflake 스테이지를 관리할 수 있습니다. 스테이지에 대한 개요는 데이터 로딩 개요 섹션을 참조하십시오.

Snowflake Python APIs 은 다음 두 가지 별개 유형의 스테이지를 나타냅니다.

  • Stage: 스테이지의 이름, 암호화 유형, 자격 증명, 디렉터리 테이블 설정 등 속성을 표시합니다.

  • StageResource: 해당 Stage 오브젝트를 가져오고, 스테이지에 파일을 업로드 및 목록으로 만들고, 스테이지를 삭제하는 데 사용할 수 있는 메서드를 노출합니다.

스테이지 만들기

스테이지를 생성하려면 먼저 Stage 오브젝트를 생성한 다음 API Root 오브젝트에서 StageCollection 오브젝트를 생성합니다. StageCollection.create 를 사용하여 Snowflake에 새 스테이지를 추가합니다.

다음 예제의 코드는 암호화 유형이 SNOWFLAKE_SSE (서버 측 암호화만 해당)인 my_stage 라는 스테이지를 나타내는 Stage 오브젝트를 생성합니다.

from snowflake.core.stage import Stage, StageEncryption

my_stage = Stage(
  name="my_stage",
  encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Copy

이 코드는 StageCollection 변수 stages 를 생성하고 StageCollection.create 를 사용하여 Snowflake에 새 스테이지를 생성합니다.

스테이지 세부 정보 얻기

Stage 오브젝트를 반환하는 StageResource.fetch 메서드를 호출하여 스테이지에 대한 정보를 얻을 수 있습니다.

다음 예제의 코드는 my_stage 스테이지에 대한 정보를 가져옵니다.

my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Copy

목록 스테이지

Stage 오브젝트의 PagedIter 반복기를 반환하는 StageCollection.iter 메서드를 사용하여 스테이지를 나열할 수 있습니다.

다음 예제의 코드는 이름에 my 텍스트가 포함된 스테이지를 나열하고 각 스테이지의 이름을 출력합니다.

from snowflake.core.stage import StageCollection

stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%")  # returns a PagedIter[Stage]
for stage_obj in stage_iter:
  print(stage_obj.name)
Copy

스테이지 작업 수행하기

스테이지에 파일을 업로드하고 스테이지에 파일을 목록으로 나열하는 등 일반적인 스테이지 작업을 StageResource 오브젝트로 수행할 수 있습니다.

스테이지 리소스로 할 수 있는 일부 작업을 보여주기 위해 다음 예제의 코드는 다음을 수행합니다.

  1. 지정된 자동 압축 및 덮어쓰기 옵션을 사용하여 my-file.yaml 파일을 my_stage 스테이지에 업로드합니다.

  2. 파일이 성공적으로 업로드되었는지 확인하기 위해 스테이지에 있는 모든 파일을 나열합니다.

  3. 스테이지를 삭제합니다.

my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]

my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)

stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
  print(stageFile)

my_stage_res.drop()
Copy

파이프 관리하기

Snowpipe가 수집 큐에서 테이블로 데이터를 로드하는 데 사용하는 COPY INTO 문이 포함된 명명된 일급 Snowflake 오브젝트인 Snowflake 파이프를 관리할 수 있습니다. 파이프 개요는 Snowpipe 섹션을 참조하십시오.

Snowflake Python APIs 은 다음 두 가지 별개 유형의 파이프를 나타냅니다.

  • Pipe: 이름과 같은 파이프의 속성과 Snowpipe에서 사용할 COPY INTO 문을 노출합니다.

  • PipeResource: 해당 Pipe 오브젝트를 가져오고, 스테이징된 데이터 파일로 파이프를 새로 고치고, 파이프를 삭제하는 데 사용할 수 있는 메서드를 노출합니다.

파이프 만들기

파이프를 생성하려면 먼저 Pipe 오브젝트를 생성한 다음 API Root 오브젝트에서 PipeCollection 오브젝트를 생성합니다. PipeCollection.create 를 사용하여 Snowflake에 새 파이프를 추가합니다.

다음 예제의 코드는 지정된 COPY INTO 문으로 my_pipe 파이프를 나타내는 Pipe 오브젝트를 생성합니다.

from snowflake.core.pipe import Pipe

my_pipe = Pipe(
  name="my_pipe",
  comment="creating my pipe",
  copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)

pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Copy

이 코드는 PipeCollection 변수 pipes 를 생성하고 PipeCollection.create 를 사용하여 Snowflake에 새 파이프를 생성합니다.

파이프 세부 정보 얻기

Pipe 오브젝트를 반환하는 PipeResource.fetch 메서드를 호출하여 파이프에 대한 정보를 얻을 수 있습니다.

다음 예제의 코드는 my_pipe 파이프에 대한 정보를 가져옵니다.

my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Copy

파이프 나열하기

Pipe 오브젝트의 PagedIter 반복기를 반환하는 PipeCollection.iter 메서드를 사용하여 파이프를 나열할 수 있습니다.

다음 예제의 코드는 이름이 my 로 시작하는 파이프를 나열하고 각 계정의 이름을 출력합니다.

from snowflake.core.pipe import PipeCollection

pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%")  # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
  print(pipe_obj.name)
Copy

파이프 작업 수행하기

파이프 새로 고침 및 파이프 삭제와 같은 일반적인 파이프 작업을 PipeResource 오브젝트를 사용하여 수행할 수 있습니다.

참고

현재 ALTER PIPE 의 REFRESH 기능만 지원됩니다.

파이프 리소스로 할 수 있는 작업을 보여주기 위해 다음 예제의 코드는 다음을 수행합니다.

  1. my_pipe 파이프 리소스 오브젝트를 가져옵니다.

  2. 지정된 선택적 접두사(또는 경로)를 사용하여 준비된 데이터 파일로 파이프를 새로 고칩니다.

  3. 파이프를 삭제합니다.

my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]

# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")

my_pipe_res.drop()
Copy

외부 볼륨 관리

외부 볼륨을 관리할 수 있으며, 이는 Snowflake를 Apache Iceberg™ 테이블의 외부 클라우드 저장소에 연결하는 데 사용하는 이름이 지정된 계정 수준의 Snowflake 오브젝트입니다. 자세한 내용은 Apache Iceberg™ 테이블외부 볼륨 섹션을 참조하십시오.

Snowflake Python APIs 은 두 가지 유형의 외부 볼륨을 나타냅니다.

  • ExternalVolume: 외부 볼륨의 이름, 저장 위치 등의 속성을 표시합니다.

  • ExternalVolumeResource: 해당 ExternalVolume 오브젝트를 가져오고 외부 볼륨을 삭제 및 삭제 취소하는 데 사용할 수 있는 메서드를 노출합니다.

외부 볼륨 만들기

외부 볼륨을 생성하려면 먼저 ExternalVolume 오브젝트를 생성한 다음 API Root 오브젝트에서 ExternalVolumeCollection 오브젝트를 생성합니다. ExternalVolumeCollection.create 를 사용하여 Snowflake에 새 외부 볼륨을 추가합니다.

다음 예제의 코드는 지정된 AWS S3 저장소가 있는 my_external_volume 이라는 이름의 외부 볼륨을 나타내는 ExternalVolume 오브젝트를 생성합니다.

from snowflake.core.external_volume import (
    ExternalVolume,
    StorageLocationS3,
)

my_external_volume = ExternalVolume(
    name="my_external_volume",
    storage_locations=[
        StorageLocationS3(
            name="my-s3-us-west-1",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
        StorageLocationS3(
            name="my-s3-us-west-2",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
    ]
)

root.external_volumes.create(my_external_volume)
Copy

외부 볼륨 세부 정보 가져오기

ExternalVolume 오브젝트를 반환하는 ExternalVolumeResource.fetch 메서드를 호출하여 외부 볼륨에 대한 정보를 얻을 수 있습니다.

다음 예제의 코드는 my_external_volume 외부 볼륨에 대한 정보를 가져옵니다.

my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Copy

외부 볼륨 나열

ExternalVolume 오브젝트의 PagedIter 반복기를 반환하는 ExternalVolumeCollection.iter 메서드를 사용하여 외부 볼륨을 나열할 수 있습니다.

다음 예제의 코드는 이름이 my 로 시작하는 외부 볼륨을 나열하고 각 계정의 이름을 출력합니다.

external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
  print(external_volume_obj.name)
Copy

외부 볼륨 작업 수행

외부 볼륨 삭제 및 삭제 취소와 같은 일반적인 외부 볼륨 작업을 ExternalVolumeResource 오브젝트로 수행할 수 있습니다.

외부 볼륨 리소스로 할 수 있는 작업을 보여주기 위해 다음 예제의 코드는 다음을 수행합니다.

  1. my_external_volume 외부 볼륨 리소스 오브젝트를 가져옵니다.

  2. 외부 볼륨을 삭제합니다.

  3. 외부 볼륨을 삭제합니다.

my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()
Copy