Snowpark Scala에서 DataFrame 작업하기¶
Snowpark에서 데이터를 쿼리하고 처리하는 주요 방법은 DataFrame을 통하는 것입니다. 이 항목에서는 DataFrame으로 작업하는 방법을 설명합니다.
이 항목의 내용:
데이터를 검색하고 조작하려면 DataFrame 클래스를 사용합니다. DataFrame은 느리게 평가되는 관계형 데이터 세트를 나타내며, 이는 특정 동작이 트리거될 때만 실행됩니다. 어떤 의미에서 DataFrame은 데이터를 검색하기 위해 평가되어야 하는 쿼리와 같습니다.
DataFrame으로 데이터를 가져오려면 다음을 수행하십시오.
DataFrame을 구성하여 데이터 세트의 데이터 소스를 지정합니다.
예를 들어, DataFrame을 만들어 테이블, 외부 CSV 파일 또는 SQL 문 실행의 데이터를 보유할 수 있습니다.
DataFrame의 데이터 세트를 변환하는 방법을 지정합니다.
예를 들어, 어느 열을 선택해야 하는지, 행 필터링 방법, 결과 정렬 및 그룹화 방법 등을 지정할 수 있습니다.
DataFrame으로 데이터를 가져오는 문을 실행합니다.
DataFrame으로 데이터를 가져오려면 동작을 수행하는 메서드(예:
collect()
메서드)를 호출해야 합니다.
다음 섹션에서는 이러한 단계를 더 자세히 설명합니다.
이 섹션의 예 설정하기¶
이 섹션의 일부 예에서는 DataFrame을 사용하여 sample_product_data
라는 테이블을 쿼리합니다. 이러한 예를 실행하려면 다음 SQL 문을 실행하여 이 테이블을 만들고 일부 데이터로 테이블을 채울 수 있습니다.
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
테이블이 만들어졌는지 확인하려면 다음을 실행합니다.
SELECT * FROM sample_product_data;
DataFrame 구성하기¶
DataFrame을 구성하려면 Session
클래스의 메서드를 사용할 수 있습니다. 다음 각 메서드는 서로 다른 형식의 데이터 원본에서 DataFrame을 구성합니다.
테이블, 뷰 또는 스트림의 데이터에서 DataFrame을 만들려면 다음과 같이
table
메서드를 호출하십시오.// Create a DataFrame from the data in the "sample_product_data" table. val dfTable = session.table("sample_product_data") // To print out the first 10 rows, call: // dfTable.show()
참고
session.table
메서드는Updatable
오브젝트를 반환합니다.Updatable
은DataFrame
을 확장하고 테이블의 데이터 작업을 위한 추가 메서드(예: 데이터 업데이트 및 삭제 메서드)를 제공합니다. 테이블의 행 업데이트, 삭제, 병합하기 섹션을 참조하십시오.값 시퀀스에서 DataFrame을 만들려면 다음과 같이
createDataFrame
메서드를 호출하십시오.// Create a DataFrame containing a sequence of values. // In the DataFrame, name the columns "i" and "s". val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
참고
Snowflake에서 예약한 단어는 DataFrame을 생성할 때 열 이름으로 유효하지 않습니다. 예약어 목록은 예약된 키워드와 제한된 키워드 섹션을 참조하십시오.
값 범위를 포함하는 DataFrame을 만들려면 다음과 같이
range
메서드를 호출하십시오.// Create a DataFrame from a range val dfRange = session.range(1, 10, 2)
스테이지에 있는 파일의 DataFrame을 만들려면
read
를 호출하여DataFrameReader
오브젝트를 가져오십시오.DataFrameReader
오브젝트에서 파일의 데이터 형식에 해당하는 메서드를 다음과 같이 호출하십시오.// Create a DataFrame from data in a stage. val dfJson = session.read.json("@mystage2/data1.json")
SQL 쿼리 결과를 보유할 DataFrame을 만들려면 다음과 같이
sql
메서드를 호출하십시오.// Create a DataFrame from a SQL query val dfSql = session.sql("SELECT name from products")
참고: 테이블 및 스테이징된 파일에서 데이터를 검색하는 SELECT 문을 이 메서드로 실행할 수 있지만, 대신
table
및read
메서드를 사용해야 합니다.table
및read
와 같은 메서드는 개발 도구에서 더 나은 구문 강조 표시, 오류 강조 표시, 지능형 코드 완성 기능을 제공할 수 있습니다.
데이터 세트 변환 방법 지정하기¶
어느 열을 선택해야 하는지, 그리고 결과를 필터링, 정렬, 그룹화하는 방법 등을 지정하려면 데이터 세트를 변환하는 DataFrame 메서드를 호출하십시오. 이러한 메서드에서 열을 식별하려면 열로 평가되는 col
함수 또는 식을 사용하십시오. (열 및 식 지정하기 섹션을 참조하십시오.)
예:
반환되어야 하는 행을 지정하려면 다음과 같이
filter
메서드를 호출하십시오.// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame for the rows with the ID 1 // in the "sample_product_data" table. // // This example uses the === operator of the Column object to perform an // equality check. val df = session.table("sample_product_data").filter(col("id") === 1) df.show()
선택해야 하는 열을 지정하려면 다음과 같이
select
메서드를 호출하십시오.// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame that contains the id, name, and serial_number // columns in te "sample_product_data" table. val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) df.show()
각 메서드는 변환된 새 DataFrame 오브젝트를 반환합니다. (이 메서드는 원래 DataFrame 오브젝트에 영향을 주지 않습니다.) 즉, 여러 변환을 적용하려는 경우, 이전 메서드 호출에서 반환된 새 DataFrame 오브젝트에서 각 후속 변환 메서드를 호출하여 메서드 호출을 연결 할 수 있다는 뜻입니다.
이러한 변환 메서드는 Snowflake 데이터베이스에서 데이터를 검색하지 않습니다. (DataFrame 평가 동작 수행하기 에 설명된 동작 메서드는 데이터 검색을 수행합니다.) 변환 메서드는 단순히 SQL 문을 구성하는 방법을 지정합니다.
열 및 식 지정하기¶
이러한 변환 메서드를 호출할 때 열 또는 열을 사용하는 식을 지정해야 할 수 있습니다. 예를 들어, select
메서드를 호출할 때, 선택해야 하는 열을 지정해야 합니다.
열을 참조하려면 com.snowflake.snowpark.functions
오브젝트에서 col 함수를 호출하여 Column 오브젝트를 만드십시오.
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._
val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
참고
리터럴에 대한 Column
오브젝트를 만들려면 리터럴을 열 오브젝트로 사용하기 을 참조하십시오.
필터, 프로젝션, 조인 조건 등을 지정할 때 식에서 Column
오브젝트를 사용할 수 있습니다. 예를 들면 다음과 같습니다.
filter
메서드와 함께Column
오브젝트를 사용하여 필터 조건을 지정할 수 있습니다.// Specify the equivalent of "WHERE id = 20" // in an SQL SELECT statement. df.filter(col("id") === 20)
// Specify the equivalent of "WHERE a + b < 10" // in an SQL SELECT statement. df.filter((col("a") + col("b")) < 10)
select
메서드와 함께Column
오브젝트를 사용하여 별칭을 정의할 수 있습니다.// Specify the equivalent of "SELECT b * 10 AS c" // in an SQL SELECT statement. df.select((col("b") * 10) as "c")
join
메서드와 함께Column
오브젝트를 사용하여 조인 조건을 정의할 수 있습니다.// Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" // in an SQL SELECT statement. dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
다른 DataFrame의 열 참조하기¶
동일한 이름을 가진 두 개의 서로 다른 DataFrame 오브젝트의 열을 참조하는 경우(예: 해당 열의 DataFrame 조인), 한 DataFrame 오브젝트의 DataFrame.col
메서드를 사용하여 해당 오브젝트의 열을 참조할 수 있습니다(예: df1.col("name")
및 df2.col("name")
).
다음 예에서는 DataFrame.col
메서드를 사용하여 특정 DataFrame의 열을 참조하는 방법을 보여줍니다. 이 예에서는 key
라는 열이 있는 두 개의 DataFrame 오브젝트를 조인합니다. 이 예에서는 새로 만든 DataFrame의 열 이름을 Column.as
메서드를 사용하여 변경합니다.
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
apply
메서드를 사용하여 열 참조하기¶
DataFrame.col
메서드의 대안으로 DataFrame.apply
메서드를 사용하여 특정 DataFrame의 열을 참조할 수 있습니다. DataFrame.col
메서드와 마찬가지로 DataFrame.apply
메서드는 열 이름을 입력으로 받아들이고 Column
오브젝트를 반환합니다.
Scala에서 오브젝트에 apply
메서드가 있는 경우, 오브젝트를 함수인 것처럼 호출하여 apply
메서드를 호출할 수 있습니다. 예를 들어 df.apply("column_name")
을 호출하려면 df("column_name")
을 쓰면 됩니다. 다음 호출은 동일합니다.
df.col("<column_name>")
df.apply("<column_name>")
df("<column_name>")
다음 예는 이전 예와 동일하지만, DataFrame.apply
메서드를 사용하여 조인 작업의 열을 참조합니다.
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
열 오브젝트에 줄임 사용하기¶
col
함수를 사용하는 대신 다음 방법 중 하나로 열을 참조할 수 있습니다.
인용된 열 이름(
$"column_name"
) 앞에 달러 기호를 사용합니다.인용되지 않은 열 이름(
'column_name
) 앞에 아포스트로피(작은따옴표)를 사용합니다.
이렇게 하려면 Session
오브젝트를 만든 후 implicits
오브젝트에서 이름을 가져오십시오.
val session = Session.builder.configFile("/path/to/properties").create
// Import this after you create the session.
import session.implicits._
// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)
// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
오브젝트 식별자(테이블 이름, 열 이름 등) 주위에 큰따옴표 사용하기¶
지정하는 데이터베이스, 스키마, 테이블, 스테이지의 이름은 Snowflake 식별자 요구 사항 을 준수해야 합니다. 이름을 지정하면 Snowflake는 해당 이름을 대문자로 간주합니다. 예를 들어, 다음 호출은 동일합니다.
// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
이름이 식별자 요구 사항을 준수하지 않는 경우, 이름 주위에 큰따옴표("
)를 사용해야 합니다. Scala 문자열 리터럴 내에서 큰따옴표 문자를 이스케이프하려면 백슬래시(\
)를 사용하십시오. 예를 들어, 다음 테이블 이름은 문자나 밑줄로 시작하지 않으므로 이름 주위에 큰따옴표를 사용해야 합니다.
val df = session.table("\"10tablename\"")
열 이름을 지정할 때는 이름 주위에 큰따옴표를 사용할 필요가 없습니다. Snowpark 라이브러리는 이름이 식별자 요구 사항을 준수하지 않는 경우 열 이름을 자동으로 큰따옴표로 묶습니다.
// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))
// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
열 이름 주위에 이미 큰따옴표를 추가한 경우, 라이브러리는 이름 주위에 큰따옴표를 추가로 삽입하지 않습니다.
경우에 따라 다음과 같이 열 이름에 큰따옴표 문자가 포함될 수 있습니다.
describe table quoted;
+------------------------+ ...
| name | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted" | ...
+------------------------+ ...
식별자 요구 사항 에 설명된 대로, 큰따옴표로 묶인 식별자 내의 각 큰따옴표 문자에는 두 개의 큰따옴표 문자를 사용해야 합니다(예: "name_with_""air""_quotes"
및 """column_name_quoted"""
).
val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
식별자가 큰따옴표로 묶인 경우(명시적으로 따옴표를 추가했는지 또는 라이브러리가 따옴표를 추가했는지 여부와 관계없음), Snowflake는 해당 식별자를 대/소문자를 구분하는 것으로 취급함 을 명심하십시오.
// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
리터럴을 열 오브젝트로 사용하기¶
Column
오브젝트를 전달하는 메서드에서 리터럴을 사용하려면 com.snowflake.snowpark.functions
오브젝트의 lit
함수에 리터럴을 전달하여 리터럴에 대한 Column
오브젝트를 만드십시오. 예를 들면 다음과 같습니다.
// Import for the lit and col functions.
import com.snowflake.snowpark.functions._
// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
리터럴이 Scala에서 부동 소수점 또는 Double 값인 경우(예: 0.05
는 기본적으로 Double로 처리됨), Snowpark 라이브러리는 값을 해당 Snowpark 데이터 타입으로 암시적으로 캐스팅하는 SQL을 생성합니다(예: 0.05::DOUBLE
). 이렇게 하면 지정된 정확한 숫자와 다른 대략적인 값이 생성될 수 있습니다.
예를 들어, 다음 코드는 필터(0.05
보다 크거나 같은 값과 일치)가 DataFrame의 행과 일치해야 하지만, 일치하는 행을 표시하지 않습니다.
// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")
// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
문제는 lit(0.06)
및 lit(0.01)
이 정확한 값이 아니라 0.06
및 0.01
에 대한 대략적인 값을 생성한다는 것입니다.
이 문제를 방지하려면 다음 방법 중 하나를 사용할 수 있습니다.
옵션 1: 사용하려는 Snowpark 형식으로 리터럴을 캐스팅 합니다. 예를 들어, 전체 자릿수가 5이고 소수 자릿수가 2인 NUMBER 를 사용하려면 다음을 수행합니다.
df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
옵션 2:
lit
함수에 값을 전달하기 전에 사용하려는 형식으로 값을 캐스팅합니다. 예를 들어, BigDecimal 형식 을 사용하려는 경우:df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
열 오브젝트를 특정 형식으로 캐스팅하기¶
Column
오브젝트를 특정 형식으로 캐스팅하려면 Column.cast 메서드를 호출하고 com.snowflake.snowpark.types package 에서 형식 오브젝트를 전달합니다. 예를 들어, 리터럴을 전체 자릿수가 5이고 소수 자릿수가 2인 NUMBER 로 캐스팅하려면 다음을 수행합니다.
// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._
val decimalValue = lit(0.05).cast(new DecimalType(5,2))
메서드 호출 연결하기¶
DataFrame 오브젝트를 변환하는 각 메서드 는 변환이 적용된 새 DataFrame 오브젝트를 반환하므로 메서드 호출을 연결 하여 추가 방식으로 변환되는 새 DataFrame을 생성할 수 있습니다.
다음 예는 다음과 같이 구성된 DataFrame을 반환합니다.
sample_product_data
테이블을 쿼리합니다.id = 1
인 행을 반환합니다.name
및serial_number
열을 선택합니다.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
이 예에서는 다음과 같습니다.
session.table("sample_product_data")
는sample_product_data
테이블에 대한 DataFrame을 반환합니다.DataFrame에는 아직 테이블의 데이터가 포함되어 있지 않지만, 오브젝트에는 테이블의 열 정의가 포함되어 있습니다.
filter(col("id") === 1)
은id = 1
인 행을 반환하도록 설정된sample_product_data
테이블에 대한 DataFrame을 반환합니다.DataFrame에는 아직 테이블의 일치하는 행이 포함되어 있지 않다는 점에 다시 유의하십시오. 일치하는 행은 동작 메서드를 호출 할 때까지 검색되지 않습니다.
select(col("name"), col("serial_number"))
는id = 1
인sample_product_data
테이블의 행에 대한name
및serial_number
열을 포함하는 DataFrame을 반환합니다.
메서드 호출을 연결할 때 호출 순서가 중요하다는 점을 명심하십시오. 각 메서드 호출은 변환된 DataFrame을 반환합니다. 변환된 DataFrame에서 후속 호출이 작동하는지 확인하십시오.
예를 들어, 아래 코드에서 select
메서드는 name
및 serial_number
라는 두 열만 포함하는 DataFrame을 반환합니다. 이 DataFrame에 대한 filter
메서드 호출은 실패합니다. 변환된 DataFrame에 없는 id
열을 사용하기 때문입니다.
// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
대조적으로 다음 코드는 성공적으로 실행됩니다. sample_product_data
테이블의 모든 열(id
열 포함)이 포함된 DataFrame에서 filter()
메서드가 호출되기 때문입니다.
// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
SQL 문에서 동등한 키워드(SELECT 및 WHERE)를 사용하는 것과는 다른 순서로 select
및 filter
메서드 호출을 수행해야 할 수도 있음을 명심하십시오.
DataFrame의 행 수 제한하기¶
DataFrame의 행 수를 제한하려면 DataFrame.limit 변환 메서드를 사용할 수 있습니다.
Snowpark API는 제한된 수의 행을 검색하고 출력하기 위한 작업 방법도 제공합니다.
DataFrame.first 작업 메서드(쿼리를 실행하고 처음
n
개의 행을 반환하기 위해)DataFrame.show 작업 메서드(쿼리를 실행하고 처음
n
개의 행을 출력하기 위해)
이러한 메서드는 실행되는 SQL 문에 LIMIT 절을 효과적으로 추가합니다.
LIMIT에 대한 사용법 노트 에 설명된 대로, LIMIT와 함께 정렬 순서(ORDER BY)를 지정하지 않는 한 결과는 비결정적입니다.
ORDER BY 절을 LIMIT 절과 함께 유지하려면(예: ORDER BY가 별도의 하위 쿼리에 있지 않도록), sort
메서드에 의해 반환된 DataFrame의 결과를 제한하는 메서드를 호출해야 합니다.
예를 들어, 메서드 호출을 연결 하는 경우:
// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);
// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)
// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
열 정의 검색하기¶
DataFrame에 대한 데이터 세트의 열 정의를 검색하려면 schema
메서드를 호출하십시오. 이 메서드는 StructField
오브젝트의 Array
를 포함하는 StructType
오브젝트를 반환합니다. 각 StructField
오브젝트에는 열 정의가 포함됩니다.
// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
반환된 StructType
오브젝트에서 열 이름은 항상 정규화됩니다. 인용되지 않은 식별자는 대문자로 반환되고, 인용된 식별자는 정의된 정확한 대/소문자로 반환됩니다.
다음 예에서는 ID
및 3rd
라는 열이 포함된 DataFrame을 만듭니다. 열 이름 3rd
의 경우, Snowpark 라이브러리는 이름이 식별자 요구 사항을 준수하지 않기 때문에 자동으로 이름을 큰따옴표("3rd"
)로 묶습니다.
이 예에서는 schema
메서드를 호출한 다음, 반환된 StructType
오브젝트에서 names
메서드를 호출하여 열 이름의 ArraySeq
를 가져옵니다. 이름은 schema
메서드에서 반환된 StructType
에서 정규화됩니다.
// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
// ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
DataFrame 조인하기¶
DataFrame 오브젝트를 조인하려면 다음과 같이 DataFrame.join 메서드를 호출하십시오.
다음 섹션에서는 DataFrame을 사용하여 조인을 수행하는 방법을 설명합니다.
조인에 대한 샘플 데이터 설정하기¶
다음 섹션의 예에서는 다음 SQL 문을 실행하여 설정할 수 있는 샘플 데이터를 사용합니다.
create or replace table sample_a (
id_a integer,
name_a varchar,
value integer
);
insert into sample_a (id_a, name_a, value) values
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
create or replace table sample_b (
id_b integer,
name_b varchar,
id_a integer,
value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', null, 200)
;
create or replace table sample_c (
id_c integer,
name_c varchar,
id_a integer,
id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
(1012, 'C1', 10, null),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
조인에 대한 열 지정하기¶
DataFrame.join
메서드를 사용하면, 다음 방법 중 하나로 사용할 열을 지정할 수 있습니다.
조인 조건을 설명하는 열 식을 지정합니다.
조인에서 공통 열로 사용해야 하는 하나 이상의 열을 지정합니다.
다음 예는 id_a
라는 열에 대해 내부 조인을 수행합니다.
// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
이 예에서는 조인에 사용할 조건을 지정하기 위해 DataFrame.col
메서드를 사용합니다. 이 메서드에 대한 자세한 내용은 열 및 식 지정하기 섹션을 참조하십시오.
그러면 다음 출력이 나옵니다.
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
----------------------------------------------------------------------
조인 결과에서 중복된 동일한 열 이름¶
조인의 결과 DataFrame에서 Snowpark 라이브러리는 테이블 전체에서 열 이름이 같을 때도 조인된 테이블에서 찾은 열 이름을 사용합니다. 이럴 때 이러한 열 이름은 조인의 결과 DataFrame에서 중복됩니다. 이름을 기준으로 중복된 열에 액세스하려면 열의 원본 테이블을 나타내는 DataFrame에서 col
메서드를 호출하십시오. (열 지정에 대한 자세한 내용은 다른 DataFrame의 열 참조하기 섹션을 참조하십시오.)
다음 예제의 코드는 두 개의 DataFrame을 조인한 다음 조인된 DataFrame에서 select
메서드를 호출합니다. 이 코드는 각각의 DataFrame 오브젝트를 나타내는 변수(dfRhs
및 dfLhs
)에서 col
메서드를 호출하여 선택할 열을 지정합니다. as
메서드를 사용하여 select
메서드가 생성하는 DataFrame에서 열에 새 이름을 지정합니다.
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
그러면 다음 출력이 나옵니다.
------------------------------
|"LEFTVALUE" |"RIGHTVALUE" |
------------------------------
|5 |5 |
|10 |10 |
|15 |15 |
------------------------------
저장 또는 캐싱 전에 열 중복 제거하기¶
조인의 결과 DataFrame에 중복된 열 이름이 포함되어 있을 때 결과를 테이블에 저장하거나 DataFrame을 캐시하기 전에 열을 중복 제거하거나 이름을 바꾸어 DataFrame에서 중복을 제거해야 합니다. 테이블이나 캐시에 저장하는 DataFrame의 중복된 열 이름의 경우 Snowpark 라이브러리는 중복 열 이름을 별칭으로 바꾸어 더 이상 중복되지 않도록 합니다.
다음 예제에서는 열 이름 ID_A
와 VALUE
가 두 테이블의 조인에서 복제된 다음에 결과 캐싱 전에 중복 제거되거나 이름이 바뀌지 않은 경우 캐시된 DataFrame의 출력이 어떻게 표시될지 보여줍니다.
--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A" |"NAME_A" |"l_ZSz7_VALUE" |"ID_B" |"NAME_B" |"r_heec_ID_A" |"r_heec_VALUE" |
--------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
--------------------------------------------------------------------------------------------------
자연 조인 수행하기¶
자연 조인 (DataFrame이 동일 이름을 가진 열에 조인되는 경우)을 수행하려면 DataFrame.naturalJoin 메서드를 호출하십시오.
다음 예에서는 공통 열(열 id_a
)에서 테이블 sample_a
및 sample_b
에 대한 DataFrame을 조인합니다.
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
그러면 다음 출력이 나옵니다.
---------------------------------------------------
|"ID_A" |"VALUE" |"NAME_A" |"ID_B" |"NAME_B" |
---------------------------------------------------
|10 |5 |A1 |4001 |B2 |
|40 |10 |A2 |4000 |B1 |
|80 |15 |A3 |9000 |B3 |
---------------------------------------------------
조인 타입 지정하기¶
기본적으로 DataFrame.join
메서드는 내부 조인을 만듭니다. 다른 유형의 조인을 지정하려면 joinType
인자를 다음 값 중 하나로 설정합니다.
조인 타입 |
|
---|---|
내부 조인 |
|
왼쪽 외부 조인 |
|
오른쪽 외부 조인 |
|
전체 외부 조인 |
|
크로스 조인 |
|
예:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
그러면 다음 출력이 나옵니다.
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|40 |A2 |10 |4000 |B1 |40 |10 |
|10 |A1 |5 |4001 |B2 |10 |5 |
|80 |A3 |15 |9000 |B3 |80 |15 |
|90 |A4 |20 |NULL |NULL |NULL |NULL |
----------------------------------------------------------------------
여러 테이블 조인¶
여러 테이블을 조인하려면:
각 테이블에 대해 DataFrame을 만듭니다.
첫 번째 DataFrame에서
DataFrame.join
메서드를 호출하고 두 번째 DataFrame을 전달합니다.join
메서드에 의해 반환된 DataFrame을 사용하여join
메서드를 호출하고 세 번째 DataFrame을 전달합니다.
아래와 같이 join
호출을 연결 할 수 있습니다.
val dfFirst = session.table("sample_a")
val dfSecond = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
그러면 다음 출력이 나옵니다.
------------------------------------------------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |"ID_C" |"NAME_C" |"ID_A" |"ID_B" |
------------------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |1012 |C1 |10 |NULL |
|40 |A2 |10 |4000 |B1 |40 |10 |1040 |C2 |40 |4000 |
|40 |A2 |10 |4000 |B1 |40 |10 |1041 |C3 |40 |4001 |
------------------------------------------------------------------------------------------------------------
자체 조인 수행하기¶
테이블을 다른 열에 있는 테이블 자체와 조인해야 하는 경우, 단일 DataFrame으로 자체 조인을 수행할 수 없습니다. 단일 DataFrame을 사용하여 자체 조인을 수행하는 다음 예제 코드는 "id"
에 대한 열 식이 조인의 왼쪽과 오른쪽에 있기 때문에 실패합니다.
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
이 두 예는 모두 다음 예외와 함께 실패합니다.
Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
Instead, join this DataFrame to a clone() of itself.
대신 DataFrame.clone 메서드를 사용하여 DataFrame 오브젝트의 복제본을 만들고 두 DataFrame 오브젝트를 사용하여 조인을 수행합니다.
// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()
// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
동일한 열에서 자체 조인을 수행하려면 USING
절에 대한 열 식의 Seq
를 전달하는 join
메서드를 호출하십시오.
// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
DataFrame 평가 동작 수행하기¶
앞서 언급했듯이 DataFrame은 느리게 평가됩니다. 즉, 사용자가 동작을 수행할 때까지 SQL 문은 실행을 위해 서버로 전송되지 않습니다. 동작은 DataFrame이 평가되도록 하고 해당 SQL 문을 실행을 위해 서버로 보냅니다.
다음 섹션에서는 DataFrame에서 동기적 및 비동기적으로 작업을 수행하는 방법을 설명합니다.
동기적으로 작업 수행하기¶
동기적으로 작업을 수행하려면 다음 작업 메서드 중 하나를 호출합니다.
동기적으로 작업을 수행하는 방법 |
설명 |
---|---|
|
DataFrame을 평가하고 결과 데이터 세트를 행 오브젝트의 |
|
DataFrame을 평가하고 행 오브젝트의 반복기 를 반환합니다. 결과 세트가 큰 경우, 모든 결과를 한 번에 메모리에 로딩하지 않으려면 이 메서드를 사용하십시오. 행에 대한 반복기 반환하기 섹션을 참조하십시오. |
|
DataFrame을 평가하고 행 수를 반환합니다. |
|
DataFrame을 평가하고 콘솔에 행을 출력합니다. 이 메서드는 행 수를 10(기본값)으로 제한합니다. DataFrame의 행 출력하기 섹션을 참조하십시오. |
|
쿼리를 실행하고 임시 테이블을 만들고 결과를 테이블에 넣습니다. 이 메서드는 이 임시 테이블의 데이터에 액세스하는 데 사용할 수 있는 |
|
DataFrame의 데이터를 지정된 테이블에 저장합니다. 테이블에 데이터 저장하기 섹션을 참조하십시오. |
|
스테이지의 지정된 파일에 DataFrame을 저장합니다. 스테이지의 파일에 DataFrame 저장하기 섹션을 참조하십시오. |
|
DataFrame의 데이터를 지정된 테이블에 복사합니다. 파일에서 테이블로 데이터 복사하기 섹션을 참조하십시오. |
|
지정된 테이블의 행을 삭제합니다. 테이블의 행 업데이트, 삭제, 병합하기 섹션을 참조하십시오. |
|
지정된 테이블의 행을 업데이트합니다. 테이블의 행 업데이트, 삭제, 병합하기 섹션을 참조하십시오. |
|
행을 지정된 테이블에 병합합니다. 테이블의 행 업데이트, 삭제, 병합하기 섹션을 참조하십시오. |
쿼리를 실행하고 결과 수를 반환하려면 다음과 같이 count
메서드를 호출하십시오.
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
다음을 위해 작업 메서드를 호출할 수도 있습니다.
참고: DataFrame의 열 정의를 가져오기 위해 schema
메서드를 호출하는 경우, 동작 메서드를 호출할 필요가 없습니다.
비동기적으로 작업 수행하기¶
참고
이 기능은 Snowpark 0.11.0에서 도입되었습니다.
작업을 비동기적으로 수행하려면, “비동기 액터” 오브젝트(예: DataFrameAsyncActor
)를 반환하는 async
메서드를 호출하고 해당 오브젝트에서 비동기 작업 메서드를 호출하십시오.
비동기 액터 오브젝트의 이러한 작업 메서드는 TypedAsyncJob
오브젝트를 반환하며, 이를 사용하여 비동기 작업의 상태를 확인하고 작업 결과를 검색할 수 있습니다.
다음 섹션에서는 작업을 비동기적으로 수행하고 결과를 확인하는 방법을 설명합니다.
비동기 작업의 기본 흐름 이해하기¶
다음 메서드를 사용하여 작업을 비동기적으로 수행할 수 있습니다.
작업을 비동기적으로 수행하는 방법 |
설명 |
---|---|
|
DataFrame을 비동기적으로 평가하여 결과 데이터 세트를 행 오브젝트의 |
|
DataFrame을 비동기적으로 평가하여 행 오브젝트의 반복기 를 검색합니다. 결과 세트가 큰 경우, 모든 결과를 한 번에 메모리에 로딩하지 않으려면 이 메서드를 사용하십시오. 행에 대한 반복기 반환하기 섹션을 참조하십시오. |
|
DataFrame을 비동기적으로 평가하여 행 수를 검색합니다. |
|
DataFrame의 데이터를 지정된 테이블에 비동기적으로 저장합니다. 테이블에 데이터 저장하기 섹션을 참조하십시오. |
|
스테이지의 지정된 파일에 DataFrame을 저장합니다. 스테이지의 파일에 DataFrame 저장하기 섹션을 참조하십시오. |
|
DataFrame의 데이터를 지정된 테이블에 비동기적으로 복사합니다. 파일에서 테이블로 데이터 복사하기 섹션을 참조하십시오. |
|
지정된 테이블의 행을 비동기적으로 삭제합니다. 테이블의 행 업데이트, 삭제, 병합하기 섹션을 참조하십시오. |
|
지정된 테이블의 행을 비동기적으로 업데이트합니다. 테이블의 행 업데이트, 삭제, 병합하기 섹션을 참조하십시오. |
|
행을 지정된 테이블에 비동기적으로 병합합니다. 버전 1.3.0 이상에서 지원됩니다. 테이블의 행 업데이트, 삭제, 병합하기 섹션을 참조하십시오. |
반환된 TypedAsyncJob 오브젝트에서 다음을 수행할 수 있습니다.
작업이 완료되었는지 확인하려면
isDone
메서드를 호출합니다.작업에 해당하는 쿼리 ID를 얻으려면
getQueryId
메서드를 호출합니다.작업의 결과를 반환하려면(예:
collect
메서드에 대한Row
오브젝트의Array
또는count
메서드에 대한 행 수)getResult
메서드를 호출합니다.getResult
는 호출 차단입니다.작업을 취소하려면
cancel
메서드를 호출합니다.
예를 들어 쿼리를 비동기적으로 실행하고 결과를 Row
오브젝트의 Array
로 검색하려면 DataFrame.async.collect
를 호출합니다.
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
쿼리를 비동기적으로 실행하고 결과 수를 검색하려면 DataFrame.async.count
를 호출합니다.
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
최대 대기 시간(초) 지정하기¶
getResult
메서드를 호출할 때, 결과 검색 시도 전에 쿼리가 완료될 때까지 대기하는 최대 시간(초)을 maxWaitTimeInSeconds
인자로 지정할 수 있습니다. 예를 들면 다음과 같습니다.
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
이 인자를 생략하면 메서드는 snowpark_request_timeout_in_seconds 구성 속성에 지정된 최대 시간(초) 동안 기다립니다. (Session 오브젝트 생성 시 설정할 수 있는 속성입니다.)
ID로 비동기 쿼리에 액세스하기¶
이전에 제출한 비동기 쿼리의 쿼리 ID가 있는 경우, 쿼리 상태 확인, 쿼리 결과 검색, 쿼리 취소에 사용할 수 있는 AsyncJob 오브젝트를 Session.createAsyncJob
메서드 호출을 통해 만들 수 있습니다.
TypedAsyncJob
과 달리 AsyncJob
은 결과를 검색하는 getResult
메서드를 제공하지 않습니다. 결과를 검색해야 하는 경우 getRows
또는 getIterator
메서드를 대신 호출하십시오.
예:
val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
DataFrame으로 행 가져오기¶
DataFrame을 변환하는 방법 을 지정한 후, 작업 메서드를 호출 하여 쿼리를 실행하고 결과를 반환할 수 있습니다. Array
의 모든 행을 반환하거나, 결과를 행 단위로 반복할 수 있는 Iterator 를 반환할 수 있습니다. 후자의 경우, 데이터 양이 많으면 많은 양의 데이터가 메모리에 로딩되는 것을 방지하기 위해 행을 청크로 메모리에 로딩합니다.
모든 행 반환하기¶
모든 행을 한 번에 반환하려면 DataFrame.collect 메서드를 호출하십시오. 이 메서드는 행 오브젝트의 배열을 반환합니다. 행에서 값을 검색하려면 getType
메서드(예: getString
, getInt
등)를 호출하십시오.
예:
import com.snowflake.snowpark.functions_
val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
행에 대한 반복기 반환하기¶
반복기 를 사용하여 결과의 행 오브젝트를 반복하려면 DataFrame.toLocalIterator 을 호출하십시오. 결과의 데이터 양이 많은 경우 메서드는 모든 행을 한 번에 메모리에 로딩하지 않도록 청크로 행을 로딩합니다.
예:
import com.snowflake.snowpark.functions_
while (rowIterator.hasNext) {
val row = rowIterator.next()
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
처음 n
개 행 반환하기¶
처음 n
개 행을 반환하려면 DataFrame.first 메서드를 호출하여 반환할 행 수를 전달하십시오.
DataFrame의 행 수 제한하기 에 설명된 대로 결과는 비결정적입니다. 결과가 결정적이기를 원하는 경우, 정렬된 DataFrame(df.sort().first()
)에서 이 메서드를 호출하십시오.
예:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
DataFrame의 행 출력하기¶
DataFrame의 처음 10개 행을 콘솔에 출력하려면 DataFrame.show 메서드를 호출하십시오. 다른 수의 행을 출력하려면 인쇄할 행 수를 전달하십시오.
DataFrame의 행 수 제한하기 에 설명된 대로 결과는 비결정적입니다. 결과가 결정적이기를 원하는 경우, 정렬된 DataFrame(df.sort().show()
)에서 이 메서드를 호출하십시오.
예:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
df.sort(col("name")).show()
테이블의 행 업데이트, 삭제, 병합하기¶
참고
이 기능은 Snowpark 0.7.0에서 도입되었습니다.
Session.table
을 호출하여 테이블에 대한 DataFrame
오브젝트를 만들면 이 메서드는 테이블의 데이터를 업데이트 및 삭제하기 위한 추가 메서드로 DataFrame
을 확장하는 Updatable
오브젝트를 반환합니다. (Updatable 를 참조하십시오.)
테이블의 행을 업데이트하거나 삭제해야 하는 경우 Updatable
클래스의 다음 메서드를 사용할 수 있습니다.
테이블의 기존 행을 업데이트하려면
update
를 호출하십시오. 테이블의 행 업데이트하기 섹션을 참조하십시오.테이블에서 행을 삭제하려면
delete
를 호출하십시오. 테이블의 행 삭제하기 섹션을 참조하십시오.두 번째 테이블 또는 하위 쿼리의 데이터를 기반으로 테이블 하나에서 행을 삽입, 업데이트, 삭제하려면
merge
를 호출하십시오. (이는 SQL의 MERGE 명령과 동일합니다.) 테이블에 행 병합하기 섹션을 참조하십시오.
테이블의 행 업데이트하기¶
update
메서드의 경우, 업데이트할 열 및 이러한 열에 할당할 해당 값을 연결하는 Map
을 전달합니다. update
는 업데이트된 행 수를 포함하는 UpdateResult
오브젝트를 반환합니다. (UpdateResult 섹션을 참조하십시오.)
참고
update
는 동작 메서드 입니다. 즉, 메서드를 호출하면 실행을 위해 SQL 문을 서버로 보냅니다.
예를 들어, count
라는 열의 값을 값 1
로 바꾸려면 다음을 수행합니다.
val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
위의 예에서는 열 이름을 사용하여 열을 식별합니다. 열 식을 사용할 수도 있습니다.
val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
조건이 충족될 때만 업데이트해야 하는 경우, 해당 조건을 인자로 지정할 수 있습니다. 예를 들어, category_id
열의 값이 20
인 행에 대해 count
라는 열의 값을 바꾸려면 다음을 수행합니다.
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
다른 DataFrame
오브젝트와의 조인에 조건을 적용해야 하는 경우, 해당 DataFrame
을 인자로 전달하고 조건에서 해당 DataFrame
을 사용할 수 있습니다. 예를 들어, category_id
열이 DataFrame
dfParts
의 category_id
와 일치하는 행에 대해 count
라는 열의 값을 바꾸려면 다음을 수행합니다.
val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
테이블의 행 삭제하기¶
delete
메서드의 경우, 삭제할 행을 식별하는 조건을 지정할 수 있으며 해당 조건은 다른 DataFrame과의 조인을 기반으로 할 수 있습니다. delete
는 삭제된 행 수를 포함하는 DeleteResult
오브젝트를 반환합니다. (DeleteResult 섹션을 참조하십시오.)
참고
delete
는 동작 메서드 입니다. 즉, 메서드를 호출하면 실행을 위해 SQL 문을 서버로 보냅니다.
예를 들어 category_id
열에 값 1
이 있는 행을 삭제하려면:
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
다른 DataFrame의 열을 조건이 참조하는 경우 해당 DataFrame을 두 번째 인자로 전달합니다. 예를 들어 category_id
열이 DataFrame
dfParts
의 category_id
와 일치하는 행을 삭제하려면 dfParts
를 두 번째 인자로 전달합니다.
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
테이블에 행 병합하기¶
두 번째 테이블 또는 하위 쿼리(SQL의 MERGE 명령과 동일)의 값을 기반으로 테이블 하나에서 행을 삽입, 업데이트, 삭제하려면 다음을 수행하십시오.
데이터를 병합하려는 테이블의
Updatable
오브젝트에서 다른 테이블의DataFrame
오브젝트 및 조인 조건의 열 식을 전달하여merge
메서드를 호출합니다.이는 일치하는 행 및 일치하지 않는 행에 대해 수행할 동작(예: 삽입, 업데이트 또는 삭제)을 지정하는 데 사용할 수 있는
MergeBuilder
오브젝트를 반환합니다. (MergeBuilder 섹션을 참조하십시오.)MergeBuilder
오브젝트를 사용하여 다음을 수행합니다.일치하는 행에 대해 수행해야 하는 업데이트 또는 삭제를 지정하려면
whenMatched
메서드를 호출합니다.행을 업데이트하거나 삭제해야 하는 추가 조건을 지정해야 하는 경우, 해당 조건에 대한 열 식을 전달할 수 있습니다.
이 메서드는 수행할 동작을 지정하는 데 사용할 수 있는
MatchedClauseBuilder
오브젝트를 반환합니다. (MatchedClauseBuilder 섹션을 참조하십시오.)MatchedClauseBuilder
오브젝트에서update
또는delete
메서드를 호출하여, 일치하는 행에서 수행해야 하는 업데이트 또는 삭제 동작을 지정합니다. 이러한 메서드는 추가 절을 지정하는 데 사용할 수 있는MergeBuilder
오브젝트를 반환합니다.행이 일치하지 않을 때 수행되어야 하는 삽입을 지정하려면
whenNotMatched
메서드를 호출합니다.행을 삽입해야 할 때 추가 조건을 지정해야 하는 경우, 해당 조건에 대한 열 식을 전달할 수 있습니다.
이 메서드는 수행할 동작을 지정하는 데 사용할 수 있는
NotMatchedClauseBuilder
오브젝트를 반환합니다. (NotMatchedClauseBuilder 섹션을 참조하십시오.)NotMatchedClauseBuilder
오브젝트에서insert
메서드를 호출하여, 행이 일치하지 않을 때 수행해야 하는 삽입 동작을 지정합니다. 이러한 메서드는 추가 절을 지정하는 데 사용할 수 있는MergeBuilder
오브젝트를 반환합니다.
수행해야 하는 삽입, 업데이트, 삭제를 지정한 경우,
MergeBuilder
오브젝트의collect
메서드를 호출하여 테이블에 대해 지정된 삽입, 업데이트, 삭제를 수행합니다.collect
는 삽입, 업데이트, 삭제된 행 수를 포함하는MergeResult
오브젝트를 반환합니다. (MergeResult 섹션을 참조하십시오.)
다음 예에서는 target
테이블에 일치하는 ID가 있는 행이 없는 경우 source
테이블에서 id
및 value
열이 있는 행을 target
테이블에 삽입합니다.
val mergeResult = target.merge(source, target("id") === source("id"))
.whenNotMatched.insert(Seq(source("id"), source("value")))
.collect()
다음 예에서는 동일한 ID를 가진 source
테이블의 행에 있는 value
열의 값으로 target
테이블의 행을 업데이트합니다.
val mergeResult = target.merge(source, target("id") === source("id"))
.whenMatched.update(Map("value" -> source("value")))
.collect()
테이블에 데이터 저장하기¶
DataFrame의 내용을 새 테이블이나 기존 테이블에 저장할 수 있습니다. 이렇게 하려면 다음 권한이 있어야 합니다.
테이블이 존재하지 않는 경우 스키마에 대한 CREATE TABLE 권한.
테이블에 대한 INSERT 권한.
DataFrame의 내용을 테이블에 저장하려면 다음을 수행하십시오.
DataFrame.write 메서드를 호출하여 DataFrameWriter 오브젝트를 가져옵니다.
DataFrameWriter.mode 메서드를 호출하여, 테이블에 쓰기 위한 기본 설정을 지정하는 SaveMode 오브젝트를 전달합니다.
행을 삽입하려면
SaveMode.Append
를 전달합니다.기존 테이블을 덮어쓰려면
SaveMode.Overwrite
를 전달합니다.
이 메서드는 지정된 모드로 구성된 동일한
DataFrameWriter
오브젝트를 반환합니다.기존 테이블(
SaveMode.Append
)에 행을 삽입하고 DataFrame의 열 이름이 테이블의 열 이름과 일치하는 경우 DataFrameWriter.option 메서드를 호출하여"columnOrder"
및"name"
을 인자로 전달합니다.참고
이 메서드는 Snowpark 1.4.0에서 도입되었습니다.
기본적으로,
columnOrder
옵션은"index"
로 설정되는데, 이는 곧DataFrameWriter
가 열이 나타나는 순서대로 값을 삽입한다는 의미입니다. 예를 들어DataFrameWriter
는 테이블의 첫 번째 열에 있는 DataFrame에서 첫 번째 열의 값을 테이블의 두 번째 열에 있는 DataFrame의 두 번째 열에 삽입하는 식입니다.이 메서드는 지정된 옵션으로 구성된 동일한
DataFrameWriter
오브젝트를 반환합니다.DataFrameWriter.saveAsTable 를 호출하여 DataFrame의 내용을 지정 테이블에 저장합니다.
데이터를 테이블에 저장하는 SQL 문을 실행하기 위해 별도의 메서드(예:
collect
)를 호출할 필요가 없습니다.saveAsTable
은 SQL 문을 실행하는 동작 메서드 입니다.
다음 예에서는 (tableName
변수로 식별되는) 기존 테이블을 DataFrame df
의 내용으로 덮어씁니다.
df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
다음 예에서는 DataFrame df
의 행을 (tableName
변수로 식별되는) 기존 테이블에 삽입합니다. 이 예에서 테이블과 DataFrame에 모두 c1
열과 c2
열이 있습니다.
이 예에서는 columnOrder
옵션을 (DataFrame 열과 같은 이름을 가진 테이블 열에 값을 삽입하는) "name"
으로 설정하는 것과 (DataFrame의 열 순서를 기준으로 테이블 열에 값을 삽입하는) 기본 columnOrder
옵션을 사용하는 것의 차이점을 보여줍니다.
val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
DataFrame에서 뷰 만들기¶
DataFrame에서 뷰를 만들려면 DataFrame.createOrReplaceView 메서드를 호출하십시오.
df.createOrReplaceView("db.schema.viewName")
createOrReplaceView
를 호출하면 즉시 새 뷰가 만들어집니다. 더 중요한 점은 이 동작이 DataFrame 평가를 유발하지 않는다는 것입니다. (DataFrame 자체는 사용자가 동작을 수행 할 때까지 평가되지 않습니다.)
createOrReplaceView
를 호출하여 만든 뷰는 영구적입니다. 해당 뷰가 더 이상 필요하지 않으면 뷰를 수동으로 삭제 할 수 있습니다.
단순히 세션을 위한 임시 뷰를 만들어야 하는 경우 DataFrame.createOrReplaceTempView 메서드를 대신 호출하십시오.
df.createOrReplaceTempView("db.schema.viewName")
DataFrame 캐시하기¶
어떤 경우에는 복잡한 쿼리를 수행하고 (같은 쿼리를 다시 실행하는 대신) 후속 작업에서 사용할 수 있도록 그 결과를 보관해야 할 수도 있습니다. 이러한 경우, DataFrame.cacheResult 메서드를 호출하여 DataFrame의 내용을 캐시할 수 있습니다.
이 메서드:
쿼리를 실행합니다.
cacheResult
를 호출하기 전에 결과를 검색하려고 별도의 작업 메서드를 호출 할 필요는 없습니다.cacheResult
는 쿼리를 실행하는 작업 메서드입니다.결과를 임시 테이블에 저장함
cacheResult
는 임시 테이블을 만들므로, 사용자에게는 사용 중인 스키마에 대한 CREATE TABLE 권한이 있어야 합니다.임시 테이블의 결과에 액세스할 수 있게 해주는 HasCachedResult 오브젝트를 반환합니다.
HasCachedResult
는DataFrame
을 확장하므로, DataFrame에서 수행할 수 있는 것과 동일한 일부 작업을 이 캐시된 데이터에서 수행할 수 있습니다.
참고
cacheResult
는 쿼리를 실행하고 결과를 테이블에 저장하므로, 이 방법을 사용하면 컴퓨팅 및 저장 비용이 증가할 수 있습니다.
예:
import com.snowflake.snowpark.functions_
// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
참고로, 원래 DataFrame은 이 메서드를 호출할 때 영향을 받지 않습니다. 예를 들어, dfTable
이 테이블 sample_product_data
에 대한 DataFrame 이라고 가정합니다.
val dfTempTable = dfTable.cacheResult()
cacheResult
를 호출한 후에도 dfTable
은 여전히 sample_product_data
테이블을 가리키며, 사용자는 계속해서 dfTable
을 사용하여 해당 테이블을 쿼리하고 업데이트할 수 있습니다.
임시 테이블의 캐시된 데이터를 사용하려면 dfTempTable
(cacheResult
에 의해 반환된 HasCachedResult
오브젝트)을 사용합니다.
스테이지에서 파일 작업하기¶
Snowpark 라이브러리는 파일을 스테이지별로 사용함으로써 Snowflake에 데이터를 로딩하고 Snowflake에서 데이터를 언로딩 하는 데 사용할 수 있는 클래스와 메서드를 제공합니다.
참고
스테이지에서 이러한 클래스와 메서드를 사용하려면 스테이지 작업에 필요한 권한 이 있어야 합니다.
다음 섹션에서는 이러한 클래스와 메서드를 사용하는 방법을 설명합니다.
스테이지에서 파일 업로드 및 다운로드하기¶
스테이지에서 파일을 업로드 및 다운로드하려면 FileOperation 오브젝트를 사용하십시오.
스테이지에 파일 업로드하기¶
스테이지에 파일을 업로드하려면:
스테이지에 파일을 업로드할 수 있는 권한 이 있는지 확인하십시오.
Session.file 을 사용하여 세션의 FileOperation 오브젝트에 액세스합니다.
FileOperation.put 메서드를 호출하여 파일을 스테이지에 업로드합니다.
이 메서드는 SQL PUT 명령을 실행합니다.
PUT 명령에 대한 선택적 매개 변수 를 지정하려면 매개 변수 및 값의
Map
을 만들고Map
을options
인자로 전달하십시오. 예를 들면 다음과 같습니다.// Upload a file to a stage without compressing the file. val putOptions = Map("AUTO_COMPRESS" -> "FALSE") val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
localFilePath
인자에서 와일드카드(*
및?
)를 사용하여, 업로드할 파일 세트를 식별할 수 있습니다. 예를 들면 다음과 같습니다.// Upload the CSV files in /tmp with names that start with "file". // You can use the wildcard characters "*" and "?" to match multiple files. val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
put
메서드에서 의해 반환된 PutResult 오브젝트의Array
를 확인하여 파일이 성공적으로 업로드되었는지 확인합니다. 예를 들어, 파일 이름과 해당 파일의 PUT 작업 상태를 출력하려면:// Print the filename and the status of the PUT operation. putResults.foreach(r => println(s" ${r.sourceFileName}: ${r.status}"))
스테이지에서 파일 다운로드하기¶
스테이지에서 파일을 다운로드하려면:
스테이지에서 파일을 다운로드할 수 있는 권한 이 있는지 확인하십시오.
Session.file 을 사용하여 세션의 FileOperation 오브젝트에 액세스합니다.
FileOperation.get 메서드를 호출하여 스테이지에서 파일을 다운로드합니다.
이 메서드는 SQL GET 명령을 실행합니다.
GET 명령에 대한 선택적 매개 변수 를 지정하려면 매개 변수 및 값의
Map
을 만들고Map
을options
인자로 전달하십시오. 예를 들면 다음과 같습니다.// Download files with names that match a regular expression pattern. val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'") val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
get
메서드에서 의해 반환된 GetResult 오브젝트의Array
를 확인하여 파일이 성공적으로 다운로드되었는지 확인합니다. 예를 들어, 파일 이름과 해당 파일의 GET 작업 상태를 출력하려면:// Print the filename and the status of the GET operation. getResults.foreach(r => println(s" ${r.fileName}: ${r.status}"))
입력 스트림을 사용하여 스테이지에서 데이터를 업로드 및 다운로드하기¶
참고
이 기능은 Snowpark 1.4.0에서 도입되었습니다.
입력 스트림을 사용하여 스테이지의 파일로 데이터를 업로드하고 스테이지의 파일에서 데이터를 다운로드하려면 FileOperation 오브젝트의 uploadStream
및 downloadStream
메서드를 사용하십시오.
입력 스트림을 사용하여 스테이지의 파일로 데이터 업로드하기¶
java.io.InputStream 오브젝트의 데이터를 스테이지의 파일로 업로드하는 방법은 다음과 같습니다.
스테이지에 파일을 업로드할 수 있는 권한 이 있는지 확인하십시오.
Session.file 을 사용하여 세션의 FileOperation 오브젝트에 액세스합니다.
FileOperation.uploadStream 메서드를 호출합니다.
데이터를 작성해야 하는 스테이지의 파일과
InputStream
오브젝트에 대한 전체 경로를 전달합니다. 또한compress
인자를 사용하여 데이터를 압축한 후 업로드해야 할지 여부를 지정합니다.
예:
import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
입력 스트림을 사용하여 스테이지의 파일에서 데이터 다운로드하기¶
스테이지의 파일에서 java.io.InputStream 오브젝트로 데이터를 다운로드하는 방법은 다음과 같습니다.
스테이지에서 파일을 다운로드할 수 있는 권한 이 있는지 확인하십시오.
Session.file 을 사용하여 세션의 FileOperation 오브젝트에 액세스합니다.
FileOperation.downloadStream 메서드를 호출합니다.
다운로드할 데이터가 포함된 스테이지의 파일에 대한 전체 경로를 전달합니다.
decompress
인자를 사용하여 파일의 데이터가 압축되어 있는지 여부를 지정합니다.
예:
import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
스테이지의 파일에 대한 DataFrame 설정하기¶
이 섹션에서는 Snowflake 스테이지에서 파일에 대한 DataFrame을 설정하는 방법을 설명합니다. 이 DataFrame을 만들면 DataFrame을 사용하여 다음을 수행할 수 있습니다.
Snowflake 스테이지에서 파일에 대해 DataFrame을 설정하려면 DataFrameReader
클래스를 사용합니다.
다음 권한이 있는지 확인하십시오.
다음 중 하나:
스테이징된 파일에서 데이터를 복사하는 방법을 결정하는 복사 옵션 을 지정하려는 경우 스키마에 대한 CREATE TABLE 권한.
그렇지 않은 경우 스키마에 대한 CREATE FILE FORMAT 권한.
Session
클래스의read
메서드를 호출하여DataFrameReader
오브젝트에 액세스합니다.파일이 CSV 형식인 경우, 파일의 필드를 설명합니다. 이를 수행하는 방법은 다음과 같습니다.
파일의 필드를 설명하는 StructField 오브젝트 시퀀스로 구성된 StructType 오브젝트를 만듭니다.
각
StructField
오브젝트에 대해 다음을 지정합니다.필드의 이름입니다.
필드의 데이터 타입(
com.snowflake.snowpark.types
패키지에서 오브젝트로 지정됨).필드가 null을 허용하는지 여부입니다.
예:
import com.snowflake.snowpark.types._ val schemaForDataFile = StructType( Seq( StructField("id", StringType, true), StructField("name", StringType, true)))
DataFrameReader
오브젝트에서schema
메서드를 호출하여StructType
오브젝트를 전달합니다.예:
var dfReader = session.read.schema(schemaForDataFile)
schema
메서드는 지정된 필드가 포함된 파일을 읽도록 구성된DataFrameReader
오브젝트를 반환합니다.다른 형식(JSON 등)의 파일에는 이 작업을 수행할 필요가 없습니다. 이러한 파일의 경우,
DataFrameReader
는 데이터를 필드 이름이$1
인 VARIANT 형식의 단일 필드로 취급합니다.
데이터를 어떻게 읽어야 하는지에 대한 추가 정보를 지정해야 하는 경우(예: 데이터가 압축되어 있거나 CSV 파일이 필드를 구분하기 위해 쉼표 대신 세미콜론을 사용하는 경우), DataFrameReader.option 메서드 또는 DataFrameReader.options 메서드를 호출합니다.
설정하려는 옵션의 이름과 값을 전달하십시오. 다음 유형의 옵션을 설정할 수 있습니다.
COPY INTO TABLE 설명서 에 설명된 복사 옵션.
복사 옵션을 설정하는 경우, DataFrame으로 데이터를 가져올 때 부담이 더 드는 실행 전략이 발생할 수 있습니다.
다음 예에서는 압축되어 있지 않고 필드 구분 기호에 세미콜론을 사용하는 CSV 파일의 데이터를 쿼리하도록
DataFrameReader
오브젝트를 설정합니다.dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
메서드는 지정된 옵션으로 구성된DataFrameReader
오브젝트를 반환합니다.여러 옵션을 설정하려면 (위의 예에 나타낸 것처럼)
option
메서드에 호출을 체인 연결 하거나 DataFrameReader.options 메서드를 호출하여 옵션의 이름과 값의Map
을 전달할 수 있습니다.파일의 형식에 상응하는 메서드를 호출합니다. 다음 메서드 중 하나를 호출할 수 있습니다.
이러한 메서드를 호출할 때 읽을 파일의 스테이지 위치를 전달하십시오. 예를 들면 다음과 같습니다.
val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
똑같은 접두사로 시작하는 파일을 여러 개 지정하려면 스테이지 이름 뒤에 접두사를 지정하십시오. 예를 들어, 스테이지
@mystage
에서 접두사가csv_
인 파일을 로드하는 방법은 다음과 같습니다.val df = dfReader.csv("@mystage/csv_")
파일 형식에 해당하는 메서드는 해당 파일에 대한 CopyableDataFrame 오브젝트를 반환합니다.
CopyableDataFrame
은DataFrame
을 확장하고, 스테이징된 파일의 데이터 작업을 위한 추가 메서드를 제공합니다.동작 메서드를 호출하여 다음을 수행합니다.
테이블용 DataFrame의 경우와 마찬가지로, 사용자가 동작 메서드 를 호출할 때까지 DataFrame으로 데이터를 가져오지 않습니다.
파일에서 DataFrame으로 데이터 로딩¶
스테이지에서 파일에 대한 DataFrame을 설정 한 후 파일에서 DataFrame으로 데이터를 로딩할 수 있습니다.
DataFrame 오브젝트 메서드를 사용하여 데이터 세트에 필요한 모든 변환(예: 특정 필드 선택, 행 필터링 등)을 수행 합니다.
예를 들어,
mystage
라는 스테이지의data.json
이라는 JSON 파일에서color
요소를 추출하려면 다음을 수행합니다.val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
앞에서 설명했듯이 CSV 이외의 형식(예: JSON)으로 된 파일에 대해
DataFrameReader
는 파일의 데이터를 이름이$1
인 단일 VARIANT 열로 처리합니다.DataFrame.collect
메서드를 호출하여 데이터를 로딩합니다. 예를 들면 다음과 같습니다.val results = df.collect()
파일에서 테이블로 데이터 복사하기¶
스테이지에서 파일에 대해 DataFrame을 설정 한 후 CopyableDataFrame.copyInto 메서드를 호출하여 데이터를 테이블에 복사할 수 있습니다. 이 메서드는 COPY INTO <테이블> 명령을 실행합니다.
참고
copyInto
호출 전에 collect
메서드를 호출할 필요가 없습니다. 파일의 데이터는 copyInto
호출 전에 DataFrame에 있을 필요가 없습니다.
예를 들어 다음 코드는 myFileStage
에 의해 지정된 CSV 파일의 데이터를 mytable
테이블로 로딩합니다. 데이터가 CSV 파일에 있기 때문에 코드는 파일의 필드도 설명 해야 합니다. 이 예에서는 DataFrameReader.schema 메서드를 호출하고, 필드를 설명하는 일련의 StructField 오브젝트가 포함된 StructType 오브젝트(csvFileSchema
)를 전달하여 이를 수행합니다.
val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
스테이지의 파일에 DataFrame 저장하기¶
참고
이 기능은 Snowpark 1.5.0에서 도입되었습니다.
스테이지의 파일에 DataFrame을 저장해야 하는 경우 파일의 형식에 상응하는 DataFrameWriter 메서드(예: CSV 파일에 쓰기 위한 csv
메서드)를 호출하여 파일을 저장해야 하는 스테이지 위치를 전달할 수 있습니다. 이러한 DataFrameWriter
메서드는 COPY INTO <위치> 명령을 실행합니다.
참고
DataFrameWriter
호출 전에 collect
메서드를 호출할 필요가 없습니다. 파일의 데이터는 이들 메서드를 호출하기 전에 DataFrame에 있을 필요가 없습니다.
DataFrame의 내용을 스테이지의 파일에 저장하는 방법은 다음과 같습니다.
DataFrame.write 메서드를 호출하여 DataFrameWriter 오브젝트를 가져옵니다. 예를 들어,
sample_product_data
라는 테이블을 나타내는 DataFrame에 대한DataFrameWriter
오브젝트를 가져오는 방법은 다음과 같습니다.dfWriter = session.table("sample_product_data").write
(파일이 있는 경우) 파일의 내용을 덮어쓰려면 DataFrameWriter.mode 메서드를 호출하여
SaveMode.Overwrite
를 전달합니다.그렇지 않으면 기본적으로
DataFrameWriter
는 스테이지에 지정된 파일이 이미 있는 경우에 오류를 보고합니다.mode
메서드는 지정된 모드로 구성된 동일한DataFrameWriter
오브젝트를 반환합니다.예를 들어,
DataFrameWriter
가 스테이지의 파일을 덮어쓰도록 지정하는 방법은 다음과 같습니다.dfWriter = dfWriter.mode(SaveMode.Overwrite)
데이터를 어떻게 저장해야 하는지에 대한 추가 정보를 지정해야 하는 경우(예: 데이터를 압축해야 하거나 CSV 파일의 필드를 구분하기 위해 세미콜론을 사용하도록 지정), DataFrameWriter.option 메서드 또는 DataFrameWriter.options 메서드를 호출합니다.
설정하려는 옵션의 이름과 값을 전달하십시오. 다음 유형의 옵션을 설정할 수 있습니다.
COPY INTO <위치>에 대한 설명서 에 설명된 파일 형식 옵션.
COPY INTO <위치>에 대한 설명서에 설명된 복사 옵션.
option
메서드를 사용하여 다음 옵션을 설정할 수는 없습니다.TYPE 형식 타입 옵션.
OVERWRITE 복사 옵션. (이전 단계에서 언급한 대로) 이 옵션을 설정하려면
mode
메서드를 대신 호출하십시오.
다음 예제에서는 (쉼표 대신) 세미콜론을 필드 구분 기호로 사용하여 데이터를 CSV 파일에 압축되지 않은 형식으로 저장하는
DataFrameWriter
오브젝트를 설정합니다.dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
메서드는 지정된 옵션으로 구성된DataFrameWriter
오브젝트를 반환합니다.여러 옵션을 설정하려면 (위의 예에 나타낸 것처럼)
option
메서드에 호출을 체인 연결 하거나 DataFrameWriter.options 메서드를 호출하여 옵션의 이름과 값의Map
을 전달할 수 있습니다.저장된 각 파일에 대한 세부 정보를 반환하려면
DETAILED_OUTPUT
복사 옵션 을TRUE
로 설정합니다.기본적으로,
DETAILED_OUTPUT
은FALSE
이며, 이는 곧 이 메서드가"rows_unloaded"
,"input_bytes"
,"output_bytes"
필드를 포함하는 단일 출력 행을 반환한다는 뜻입니다.DETAILED_OUTPUT
을TRUE
로 설정하면 이 메서드가 저장된 각 파일에 대한 출력 행을 반환합니다. 각 행에는FILE_NAME
,FILE_SIZE
,ROW_COUNT
필드가 포함됩니다.파일의 형식에 상응하는 메서드를 호출하여 데이터를 파일에 저장합니다. 다음 메서드 중 하나를 호출할 수 있습니다.
이러한 메서드를 호출할 때 데이터를 기록해야 하는 파일의 스테이지 위치(예:
@mystage
)를 전달합니다.기본적으로, 이 메서드는 데이터를 접두사가
data_
인 파일 이름으로 저장합니다(예:@mystage/data_0_0_0.csv
). 파일 이름을 다른 접두사가 있는 이름으로 지정하려면 스테이지 이름 뒤에 접두사를 지정하십시오. 예를 들면 다음과 같습니다.val writeFileResult = dfWriter.csv("@mystage/saved_data")
이 예에서는 DataFrame의 내용을 접두사
saved_data
로 시작하는 파일(예:@mystage/saved_data_0_0_0.csv
)에 저장합니다.파일에 기록된 데이터의 양에 대한 정보는 반환된 WriteFileResult 오브젝트를 확인하십시오.
WriteFileResult
오브젝트에서 COPY INTO <위치> 명령으로 생성된 출력에 액세스할 수 있습니다.Row 오브젝트로 구성된 배열로서 출력 행에 액세스하려면
rows
값 멤버를 사용하십시오.행에 어떤 필드가 있는지 확인하려면 그 행에 있는 필드를 설명하는 StructType 인
schema
값 멤버를 사용하십시오.
예를 들어, 필드 이름과 출력 행의 값을 인쇄 출력하는 방법은 다음과 같습니다.
val writeFileResult = dfWriter.csv("@mystage/saved_data") for ((row, index) <- writeFileResult.rows.zipWithIndex) { (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach { (structField, element) => println(s"${structField.name}: $element") } }
다음 예에서는 DataFrame을 사용하여 car_sales
라는 테이블의 내용을 스테이지 @mystage
에서 접두사가 saved_data
인 JSON 파일에 저장합니다(예: @mystage/saved_data_0_0_0.json
). 다음은 샘플 코드입니다.
파일이 스테이지에 이미 있는 경우 파일을 덮어씁니다.
저장 작업에 대한 상세 출력을 반환합니다.
압축되지 않은 데이터를 저장합니다.
마지막으로, 이 샘플 코드를 실행하면 반환된 출력 행의 각 필드와 값이 인쇄됩니다.
val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
println(s"Row: $index")
(writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
(structField, element) => println(s"${structField.name}: $element")
}
}
반정형 데이터로 작업하기¶
DataFrame을 사용하여 반정형 데이터 (예: JSON 데이터)를 쿼리하고 액세스할 수 있습니다. 다음 섹션에서는 DataFrame에서 반정형 데이터로 작업하는 방법을 설명합니다.
참고
이 섹션의 예에서는 예에서 사용된 샘플 데이터 의 샘플 데이터를 사용합니다.
반정형 데이터 탐색하기¶
반정형 데이터의 특정 필드 또는 요소를 참조하려면 열 오브젝트의 다음 메서드를 사용하십시오.
Column.apply(“<field_name>”) 를 사용하여, OBJECT (또는 OBJECT를 포함하는 VARIANT)의 필드에 대한
Column
오브젝트를 반환합니다.Column.apply(<index>) 를 사용하여, ARRAY (또는 ARRAY를 포함하는 VARIANT)의 요소에 대한
Column
오브젝트를 반환합니다.
참고
경로의 필드 이름이나 요소가 불규칙하여 Column.apply
메서드를 사용하기 어려운 경우 get, get_ignore_case 또는 get_path 함수를 대안으로 사용할 수 있습니다.
apply 메서드를 사용하여 열 참조하기 에서 언급했듯이 메서드 이름 apply
를 생략할 수 있습니다.
col("column_name")("field_name")
col("column_name")(index)
예를 들어 다음 코드는 샘플 데이터 의 src
열에 있는 오브젝트의 dealership
필드를 선택합니다.
val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
코드는 다음 출력을 출력합니다.
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
참고
DataFrame의 값은 문자열 리터럴로 반환되기 때문에 큰따옴표로 묶입니다. 이러한 값을 특정 타입으로 캐스팅하려면 반정형 데이터에서 명시적으로 값 캐스팅하기 섹션을 참조하십시오.
또한 메서드 호출을 연결 하여 특정 필드나 요소에 대한 경로를 탐색할 수 있습니다.
예를 들어 다음 코드는 salesperson
오브젝트의 name
필드를 선택합니다.
val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
코드는 다음 출력을 출력합니다.
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
다른 예로, 다음 코드는 차량 배열을 포함하는 vehicle
필드의 첫 번째 요소를 선택합니다. 이 예에서는 첫 번째 요소의 price
필드도 선택합니다.
val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
코드는 다음 출력을 출력합니다.
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
apply
메서드의 대안으로, 경로의 필드 이름이나 요소가 불규칙하여 Column.apply
메서드를 사용하기 어려운 경우 get, get_ignore_case 또는 get_path 함수를 사용할 수 있습니다.
예를 들어 다음 코드 줄은 모두 오브젝트의 지정된 필드 값을 출력합니다.
df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
마찬가지로, 다음 코드 줄은 모두 오브젝트의 지정된 경로에 있는 필드 값을 출력합니다.
df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
반정형 데이터에서 명시적으로 값 캐스팅하기¶
기본적으로 필드 및 요소의 값은 위의 예와 같이 문자열 리터럴(큰따옴표 포함)로 반환됩니다.
예기치 않은 결과를 방지하려면 cast 메서드를 호출하여 값을 특정 타입으로 캐스팅합니다. 예를 들어, 다음 코드는 캐스팅이 없는 값과 있는 값을 출력합니다.
// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
코드는 다음 출력을 출력합니다.
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
오브젝트 배열을 행으로 평면화하기¶
반정형 데이터를 DataFrame으로 “평면화”해야 하는 경우(예: 배열의 모든 오브젝트에 대한 행 생성) DataFrame.flatten 을 호출합니다. 이 메서드는 FLATTEN SQL 함수와 동일합니다. 오브젝트 또는 배열에 대한 경로를 전달하는 경우 메서드는 오브젝트 또는 배열의 각 필드 또는 요소에 대한 행을 포함하는 DataFrame을 반환합니다.
예를 들어, 샘플 데이터 에서 src:customer
는 고객에 대한 정보를 포함하는 오브젝트의 배열입니다. 각 오브젝트는 name
및 address
필드를 포함합니다.
이 경로를 flatten
함수에 전달하는 경우:
val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
메서드는 DataFrame을 반환합니다.
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
이 DataFrame에서 사용자는 VALUE
필드의 각 오브젝트에서 name
및 address
필드를 선택할 수 있습니다.
df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
다음 코드는 값을 특정 타입으로 캐스팅 하고 열 이름을 변경하여 이전 예에 추가합니다.
df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
SQL 문 실행하기¶
지정한 SQL 문을 실행하려면 Session
클래스에서 sql
메서드를 호출하고, 실행할 문을 전달하십시오. 이 메서드는 DataFrame을 반환합니다.
사용자가 동작 메서드를 호출 할 때까지 SQL 문은 실행되지 않습니다.
// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)
// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()
val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
DataFrame을 변환하는 메서드 (예: filter, select 등)를 호출하려는 경우, 이러한 메서드는 기본 SQL 문이 SELECT 문인 경우에만 작동합니다. 다른 종류의 SQL 문에는 변환 메서드가 지원되지 않습니다.
val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)
// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)