Python을 사용하여 데이터 로딩 및 언로딩 리소스 관리¶
Python을 사용하여 Snowflake에서 외부 볼륨, 파이프, 스테이지를 포함한 리소스의 데이터 로딩 및 언로딩을 관리할 수 있습니다.
전제 조건¶
이 항목의 예제에서는 Snowflake와 연결하고 Snowflake Python APIs 을 사용할 수 있는 Root
오브젝트를 생성하는 코드를 추가했다고 가정합니다.
예를 들어, 다음 코드는 구성 파일에 정의된 연결 매개 변수를 사용하여 Snowflake에 대한 연결을 생성합니다.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
해당 코드에서는 결과 Session
오브젝트를 사용하여 API의 유형과 메서드를 사용하기 위해 Root
오브젝트를 생성합니다. 자세한 내용은 Snowflake Python APIs 을 사용하여 Snowflake에 연결 섹션을 참조하십시오.
스테이지 관리하기¶
클라우드 저장소의 데이터 파일의 위치인 Snowflake 스테이지를 관리할 수 있습니다. 스테이지에 대한 개요는 데이터 로딩 개요 섹션을 참조하십시오.
Snowflake Python APIs 은 다음 두 가지 별개 유형의 스테이지를 나타냅니다.
Stage
: 스테이지의 이름, 암호화 유형, 자격 증명, 디렉터리 테이블 설정 등 속성을 표시합니다.StageResource
: 해당Stage
오브젝트를 가져오고, 스테이지에 파일을 업로드 및 목록으로 만들고, 스테이지를 삭제하는 데 사용할 수 있는 메서드를 노출합니다.
스테이지 만들기¶
스테이지를 생성하려면 먼저 Stage
오브젝트를 생성한 다음 API Root
오브젝트에서 StageCollection
오브젝트를 생성합니다. StageCollection.create
를 사용하여 Snowflake에 새 스테이지를 추가합니다.
다음 예제의 코드는 암호화 유형이 SNOWFLAKE_SSE
(서버 측 암호화만 해당)인 my_stage
라는 스테이지를 나타내는 Stage
오브젝트를 생성합니다.
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
이 코드는 StageCollection
변수 stages
를 생성하고 StageCollection.create
를 사용하여 Snowflake에 새 스테이지를 생성합니다.
스테이지 세부 정보 얻기¶
Stage
오브젝트를 반환하는 StageResource.fetch
메서드를 호출하여 스테이지에 대한 정보를 얻을 수 있습니다.
다음 예제의 코드는 my_stage
스테이지에 대한 정보를 가져옵니다.
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
목록 스테이지¶
Stage
오브젝트의 PagedIter
반복기를 반환하는 StageCollection.iter
메서드를 사용하여 스테이지를 나열할 수 있습니다.
다음 예제의 코드는 이름에 my
텍스트가 포함된 스테이지를 나열하고 각 스테이지의 이름을 출력합니다.
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
스테이지 작업 수행하기¶
스테이지에 파일을 업로드하고 스테이지에 파일을 목록으로 나열하는 등 일반적인 스테이지 작업을 StageResource
오브젝트로 수행할 수 있습니다.
스테이지 리소스로 할 수 있는 일부 작업을 보여주기 위해 다음 예제의 코드는 다음을 수행합니다.
지정된 자동 압축 및 덮어쓰기 옵션을 사용하여
my-file.yaml
파일을my_stage
스테이지에 업로드합니다.파일이 성공적으로 업로드되었는지 확인하기 위해 스테이지에 있는 모든 파일을 나열합니다.
스테이지를 삭제합니다.
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
파이프 관리하기¶
Snowpipe가 수집 큐에서 테이블로 데이터를 로드하는 데 사용하는 COPY INTO 문이 포함된 명명된 일급 Snowflake 오브젝트인 Snowflake 파이프를 관리할 수 있습니다. 파이프 개요는 Snowpipe 섹션을 참조하십시오.
Snowflake Python APIs 은 다음 두 가지 별개 유형의 파이프를 나타냅니다.
Pipe
: 이름과 같은 파이프의 속성과 Snowpipe에서 사용할 COPY INTO 문을 노출합니다.PipeResource
: 해당Pipe
오브젝트를 가져오고, 스테이징된 데이터 파일로 파이프를 새로 고치고, 파이프를 삭제하는 데 사용할 수 있는 메서드를 노출합니다.
파이프 만들기¶
파이프를 생성하려면 먼저 Pipe
오브젝트를 생성한 다음 API Root
오브젝트에서 PipeCollection
오브젝트를 생성합니다. PipeCollection.create
를 사용하여 Snowflake에 새 파이프를 추가합니다.
다음 예제의 코드는 지정된 COPY INTO 문으로 my_pipe
파이프를 나타내는 Pipe
오브젝트를 생성합니다.
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
이 코드는 PipeCollection
변수 pipes
를 생성하고 PipeCollection.create
를 사용하여 Snowflake에 새 파이프를 생성합니다.
파이프 세부 정보 얻기¶
Pipe
오브젝트를 반환하는 PipeResource.fetch
메서드를 호출하여 파이프에 대한 정보를 얻을 수 있습니다.
다음 예제의 코드는 my_pipe
파이프에 대한 정보를 가져옵니다.
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
파이프 나열하기¶
Pipe
오브젝트의 PagedIter
반복기를 반환하는 PipeCollection.iter
메서드를 사용하여 파이프를 나열할 수 있습니다.
다음 예제의 코드는 이름이 my
로 시작하는 파이프를 나열하고 각 계정의 이름을 출력합니다.
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
파이프 작업 수행하기¶
파이프 새로 고침 및 파이프 삭제와 같은 일반적인 파이프 작업을 PipeResource
오브젝트를 사용하여 수행할 수 있습니다.
참고
현재 ALTER PIPE 의 REFRESH 기능만 지원됩니다.
파이프 리소스로 할 수 있는 작업을 보여주기 위해 다음 예제의 코드는 다음을 수행합니다.
my_pipe
파이프 리소스 오브젝트를 가져옵니다.지정된 선택적 접두사(또는 경로)를 사용하여 준비된 데이터 파일로 파이프를 새로 고칩니다.
파이프를 삭제합니다.
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
외부 볼륨 관리¶
외부 볼륨을 관리할 수 있으며, 이는 Snowflake를 Apache Iceberg™ 테이블의 외부 클라우드 저장소에 연결하는 데 사용하는 이름이 지정된 계정 수준의 Snowflake 오브젝트입니다. 자세한 내용은 Apache Iceberg™ 테이블 의 외부 볼륨 섹션을 참조하십시오.
Snowflake Python APIs 은 두 가지 유형의 외부 볼륨을 나타냅니다.
ExternalVolume
: 외부 볼륨의 이름, 저장 위치 등의 속성을 표시합니다.ExternalVolumeResource
: 해당ExternalVolume
오브젝트를 가져오고 외부 볼륨을 삭제 및 삭제 취소하는 데 사용할 수 있는 메서드를 노출합니다.
외부 볼륨 만들기¶
외부 볼륨을 생성하려면 먼저 ExternalVolume
오브젝트를 생성한 다음 API Root
오브젝트에서 ExternalVolumeCollection
오브젝트를 생성합니다. ExternalVolumeCollection.create
를 사용하여 Snowflake에 새 외부 볼륨을 추가합니다.
다음 예제의 코드는 지정된 AWS S3 저장소가 있는 my_external_volume
이라는 이름의 외부 볼륨을 나타내는 ExternalVolume
오브젝트를 생성합니다.
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
외부 볼륨 세부 정보 가져오기¶
ExternalVolume
오브젝트를 반환하는 ExternalVolumeResource.fetch
메서드를 호출하여 외부 볼륨에 대한 정보를 얻을 수 있습니다.
다음 예제의 코드는 my_external_volume
외부 볼륨에 대한 정보를 가져옵니다.
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
외부 볼륨 나열¶
ExternalVolume
오브젝트의 PagedIter
반복기를 반환하는 ExternalVolumeCollection.iter
메서드를 사용하여 외부 볼륨을 나열할 수 있습니다.
다음 예제의 코드는 이름이 my
로 시작하는 외부 볼륨을 나열하고 각 계정의 이름을 출력합니다.
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
외부 볼륨 작업 수행¶
외부 볼륨 삭제 및 삭제 취소와 같은 일반적인 외부 볼륨 작업을 ExternalVolumeResource
오브젝트로 수행할 수 있습니다.
외부 볼륨 리소스로 할 수 있는 작업을 보여주기 위해 다음 예제의 코드는 다음을 수행합니다.
my_external_volume
외부 볼륨 리소스 오브젝트를 가져옵니다.외부 볼륨을 삭제합니다.
외부 볼륨을 삭제합니다.
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()