Create an Apache Iceberg™ table in Snowflake from Parquet data files¶
Note
Snowflake also supports loading data from Apache Parquet files directly into a Snowflake-managed Iceberg table. This option is generally available. For more information, see Load data into Apache Iceberg™ tables and Example: Load Iceberg-compatible Parquet files.
This topic covers how to create a read-only Apache Iceberg™ table for Snowflake from Apache Parquet files that you manage in object storage. With this option, you can create an Iceberg table without supplying Iceberg metadata or using an external Iceberg catalog.
Partitioned Iceberg tables¶
To improve query performance, we strongly recommend that you partition Iceberg tables created from Parquet source files by using partition columns. Query response time is faster when Snowflake processes only a small part of the data instead of having to scan the entire data set. An Iceberg table definition can include multiple partition columns, which impose a multi-dimensional structure on the external data.
To partition a table, your data must be organized using logical paths.
When you create an Iceberg table, you define partition columns as expressions that parse the path or filename information stored in the METADATA$FILENAME pseudo-column. A partition consists of all data files that match the path and/or filename in the expression for the partition column.
Snowflake computes and adds partitions based on the defined partition column expressions when an Iceberg table is refreshed. For an example of creating a partitioned Iceberg table, see Example: Create an Iceberg table from Parquet files, specifying a partition column.
Workflow¶
Use the workflow in this section to create an Iceberg table from Parquet source files.
Note
If you store your Parquet files in Amazon S3, you can create an Iceberg table that supports automatically refreshing the table data. To learn more, see Refresh an Iceberg table automatically for Amazon S3.
Step 1: Create an external volume¶
To create an external volume, complete the instructions for your cloud storage service:
Step 2: Create a catalog integration¶
Create a catalog integration by using the CREATE CATALOG INTEGRATION command.
To indicate that the catalog integration is for Iceberg tables created from Parquet source files, set the CATALOG_SOURCE
parameter equal to OBJECT_STORE
and the TABLE_FORMAT
parameter equal to NONE
.
Note
Snowflake does not support creating Iceberg tables from Parquet-based table definitions in the AWS Glue Data Catalog.
The following example creates a catalog integration for Parquet files in object storage.
CREATE OR REPLACE CATALOG INTEGRATION icebergCatalogInt
CATALOG_SOURCE = OBJECT_STORE
TABLE_FORMAT = NONE
ENABLED=TRUE;
Step 3: Create an Iceberg table¶
Create an Iceberg table by using the CREATE ICEBERG TABLE command.
Example: Create an Iceberg table from Parquet files, specifying data columns
Example: Create an Iceberg table from Parquet files, specifying a partition column
Example: Create an Iceberg table from Parquet files using automatic schema inference
Syntax¶
CREATE [ OR REPLACE ] ICEBERG TABLE [ IF NOT EXISTS ] <table_name>
[
--Data column definition
<col_name> <col_type>
[ COLLATE '<collation_specification>' ]
[ [ WITH ] MASKING POLICY <policy_name> [ USING ( <col_name> , <cond_col1> , ... ) ] ]
[ [ WITH ] TAG ( <tag_name> = '<tag_value>' [ , <tag_name> = '<tag_value>' , ... ] ) ]
[ COMMENT '<string_literal>' ]
-- In-line constraint
[ inlineConstraint ]
-- Additional column definitions (data, virtual, or partition columns)
[ , <col_name> <col_type> ...
-- Virtual column definition
| <col_name> <col_type> AS <expr>
-- Partition column definition
| <part_col_name> <col_type> AS <part_expr>
-- In-line constraint
[ inlineConstraint ]
[ , ... ]
]
-- Out-of-line constraints
[ , outoflineConstraint [ ... ] ]
]
[ PARTITION BY ( <part_col_name> [, <part_col_name> ... ] ) ]
[ EXTERNAL_VOLUME = '<external_volume_name>' ]
[ CATALOG = <catalog_integration_name> ]
BASE_LOCATION = '<relative_path_from_external_volume>'
[ INFER_SCHEMA = { TRUE | FALSE } ]
[ AUTO_REFRESH = { TRUE | FALSE } ]
[ REPLACE_INVALID_CHARACTERS = { TRUE | FALSE } ]
[ [ WITH ] ROW ACCESS POLICY <policy_name> ON ( <col_name> [ , <col_name> ... ] ) ]
[ [ WITH ] TAG ( <tag_name> = '<tag_value>' [ , <tag_name> = '<tag_value>' , ... ] ) ]
[ COMMENT = '<string_literal>' ]
Where:
inlineConstraint ::= [ CONSTRAINT <constraint_name> ] { UNIQUE | PRIMARY KEY | [ FOREIGN KEY ] REFERENCES <ref_table_name> [ ( <ref_col_name> ) ] } [ <constraint_properties> ]For additional inline constraint details, see CREATE | ALTER TABLE … CONSTRAINT.
outoflineConstraint ::= [ CONSTRAINT <constraint_name> ] { UNIQUE [ ( <col_name> [ , <col_name> , ... ] ) ] | PRIMARY KEY [ ( <col_name> [ , <col_name> , ... ] ) ] | [ FOREIGN KEY ] [ ( <col_name> [ , <col_name> , ... ] ) ] REFERENCES <ref_table_name> [ ( <ref_col_name> [ , <ref_col_name> , ... ] ) ] } [ <constraint_properties> ]For additional out-of-line constraint details, see CREATE | ALTER TABLE … CONSTRAINT.
Required parameters¶
table_name
Specifies the identifier (name) for the table; must be unique for the schema in which the table is created.
In addition, the identifier must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (for example,
"My object"
). Identifiers enclosed in double quotes are also case-sensitive.For more details, see Identifier requirements.
BASE_LOCATION = 'relative_path_from_external_volume'
Specifies a relative path from the table’s
EXTERNAL_VOLUME
location to a directory where Snowflake can access your Parquet files and write table metadata. The base location must point to a directory and cannot point to a single Parquet file.
Optional parameters¶
col_name
Specifies the column identifier (name). All the requirements for table identifiers also apply to column identifiers.
For more details, see Identifier requirements and Reserved & limited keywords.
Note
In addition to the standard reserved keywords, the following keywords cannot be used as column identifiers because they are reserved for ANSI-standard context functions:
CURRENT_DATE
CURRENT_ROLE
CURRENT_TIME
CURRENT_TIMESTAMP
CURRENT_USER
For the list of reserved keywords, see Reserved & limited keywords.
col_type
Specifies the data type for the column.
For details about the data types that can be specified for table columns, see Data type mapping and SQL data types reference.
expr
String that specifies the expression for the column. When queried, the column returns results derived from this expression.
A column can be a virtual column, which is defined using an explicit expression.
- METADATA$FILENAME:
A pseudo-column that identifies the name of each Parquet data file included in the table, relative to its path on the external volume.
For example:
If the external volume location is
s3://bucket-name/data/warehouse/
and theBASE_LOCATION
of the table isdefault_db/schema_name/table_name/
, the absolute location of the Parquet file iss3://bucket-name/data/warehouse/default_db/schema_name/table_name/ds=2023-01-01/file1.parquet
.As a result, the METADATA$FILENAME for this file is
default_db/schema_name/table_name/ds=2023-01-01/file1.parquet
.
CONSTRAINT ...
Defines an inline or out-of-line constraint for the specified column(s) in the table.
For syntax details, see CREATE | ALTER TABLE … CONSTRAINT. For more information about constraints, see Constraints.
COLLATE 'collation_specification'
Specifies the collation to use for column operations such as string comparison. This option applies only to text columns (VARCHAR, STRING, TEXT, etc.). For more details, see Collation specifications.
MASKING POLICY = policy_name
Specifies the masking policy to set on a column.
EXTERNAL_VOLUME = 'external_volume_name'
Specifies the identifier (name) for the external volume where Snowflake can access your Parquet data files.
You must specify an external volume if you have not set one at the database or schema level. Otherwise, the Iceberg table defaults to the external volume for the schema, database, or account. The schema takes precedence over the database, and the database takes precedence over the account.
CATALOG = 'catalog_integration_name'
Specifies the identifier (name) of the catalog integration for this table.
You must specify a catalog integration if you have not set one at the database or schema level. Otherwise, the Iceberg table defaults to the catalog integration for the schema, database, or account. The schema takes precedence over the database, and the database takes precedence over the account.
INFER_SCHEMA = '{ TRUE | FALSE }'
Specifies whether to automatically detect the table schema (based on the fields in the Parquet files) in order to retrieve column definitions and partition values.
TRUE
: Snowflake detects the table schema to retrieve column definitions and detect partition values. With this option, Snowflake automatically creates virtual columns for partition values in the Parquet file path.If you refresh the table, Snowflake creates newly identified columns as visible table columns.
FALSE
: Snowflake does not detect the table schema. You must include column definitions in your CREATE ICEBERG TABLE statement.
Default: TRUE if you don’t provide a column definition; otherwise, FALSE.
AUTO_REFRESH = '{ TRUE | FALSE }'
Specifies whether the table data will be automatically refreshed. This parameter is required only when you create an Iceberg table from Parquet files that supports automatic refreshes. For more information, see Refresh an Iceberg table automatically for Amazon S3.
Note
Using AUTO_REFRESH with INFER_SCHEMA isn’t supported.
REPLACE_INVALID_CHARACTERS = { TRUE | FALSE }
Specifies whether to replace invalid UTF-8 characters with the Unicode replacement character (�) in query results. You can only set this parameter for tables that use an external Iceberg catalog.
TRUE
replaces invalid UTF-8 characters with the Unicode replacement character.FALSE
leaves invalid UTF-8 characters unchanged. Snowflake returns a user error message when it encounters invalid UTF-8 characters in a Parquet data file.
If not specified, the Iceberg table defaults to the parameter value for the schema, database, or account. The schema takes precedence over the database, and the database takes precedence over the account.
Default:
FALSE
ROW ACCESS POLICY policy_name ON ( col_name [ , col_name ... ] )
Specifies the row access policy to set on a table.
TAG ( tag_name = 'tag_value' [ , tag_name = 'tag_value' , ... ] )
Specifies the tag name and the tag string value.
The tag value is always a string, and the maximum number of characters for the tag value is 256.
For information about specifying tags in a statement, see Tag quotas for objects and columns.
COMMENT 'string_literal'
Specifies a comment for the column or the table.
Comments can be specified at the column level or the table level. The syntax for each is slightly different.
Partitioning parameters¶
Use these parameters to partition your Iceberg table.
part_col_name col_type AS part_expr
Defines one or more partition columns in the Iceberg table.
A partition column must evaluate as an expression that parses the path and/or filename information in the METADATA$FILENAME pseudo-column. A partition consists of all data files that match the path and/or filename in the expression for the partition column.
part_col_name
String that specifies the partition column identifier (i.e. name). All the requirements for table identifiers also apply to column identifiers.
col_type
String (constant) that specifies the data type for the column. The data type must match the result of
part_expr
for the column.part_expr
String that specifies the expression for the column. The expression must include the METADATA$FILENAME pseudocolumn.
Iceberg tables currently support the following subset of functions in partition expressions:
List of supported functions:
=
,<>
,>
,>=
,<
,<=
||
+
,-
-
(negate)*
AND
,OR
NOT
[ PARTITION BY ( part_col_name [, part_col_name ... ] ) ]
Specifies any partition columns to evaluate for the Iceberg table.
- Usage:
When querying an Iceberg table, include one or more partition columns in a WHERE clause, for example:
... WHERE part_col_name = 'filter_value'
A common practice is to partition the data files based on increments of time; or, if the data files are staged from multiple sources, to partition by a data source identifier and date or timestamp.
Example: Create an Iceberg table from Parquet files, specifying data columns¶
The following example creates an Iceberg table from Parquet files in object storage.
The example specifies the external volume and catalog integration created previously in this workflow,
and provides a value for the required BASE_LOCATION
parameter.
CREATE ICEBERG TABLE myTable (
first_name STRING,
last_name STRING,
amount NUMBER,
create_date DATE
)
CATALOG = icebergCatalogInt
EXTERNAL_VOLUME = myIcebergVolume
BASE_LOCATION='relative_path_from_external_volume/';
Example: Create an Iceberg table from Parquet files, specifying a partition column¶
The following example creates an Iceberg table from Parquet files in object storage and defines a partition column
named sr_returned_date_sk
.
CREATE OR REPLACE ICEBERG TABLE store_returns (
sr_returned_date_sk integer AS
IFF(
regexp_substr(METADATA$FILENAME, 'sr_returned_date_sk=(.*)/', 1, 1, 'e') = '__HIVE_DEFAULT_PARTITION__',
null,
TO_NUMBER(
regexp_substr(METADATA$FILENAME, 'sr_returned_date_sk=(.*)/', 1, 1, 'e')
)
),
sr_return_time_sk integer ,
sr_item_sk integer ,
sr_customer_sk integer ,
sr_cdemo_sk integer ,
sr_hdemo_sk integer ,
sr_addr_sk integer ,
sr_store_sk integer ,
sr_reason_sk integer ,
sr_ticket_number bigint ,
sr_return_quantity integer ,
sr_return_amt decimal(7,2) ,
sr_return_tax decimal(7,2) ,
sr_return_amt_inc_tax decimal(7,2) ,
sr_fee decimal(7,2) ,
sr_return_ship_cost decimal(7,2) ,
sr_refunded_cash decimal(7,2) ,
sr_reversed_charge decimal(7,2) ,
sr_store_credit decimal(7,2) ,
sr_net_loss decimal(7,2)
)
PARTITION BY (sr_returned_date_sk)
EXTERNAL_VOLUME = 'exvol'
CATALOG = 'catint'
BASE_LOCATION = 'store_returns/';
Example: Create an Iceberg table from Parquet files using automatic schema inference¶
The following example creates an Iceberg table from Parquet files using automatic schema inference without including a column definition.
CREATE OR REPLACE ICEBERG TABLE auto_schema_table
EXTERNAL_VOLUME = 'exvol'
CATALOG = 'catint'
BASE_LOCATION = 'auto_schema_table/';
Alternatively, you can include a column definition to provide information about certain columns.
Snowflake uses the definition to create those columns, then automatically detects other table columns.
In this scenario, you must specify INFER_SCHEMA = TRUE
since you include a column definition.
CREATE OR REPLACE ICEBERG TABLE auto_schema_table_col_spec (col1 INT)
EXTERNAL_VOLUME = 'exvol'
CATALOG = 'catint'
BASE_LOCATION = 'auto_schema_table_col_spec/'
INFER_SCHEMA = TRUE;
Refresh the table¶
After you create an Iceberg table from Parquet files, you can refresh the table data using the ALTER ICEBERG TABLE command. Refreshing synchronizes the table with the most recent changes to your Parquet files in object storage.
ALTER ICEBERG TABLE [ IF EXISTS ] <table_name> REFRESH ['<relative_path>']
Where:
relative_path
Optional path to a Parquet file or a directory of Parquet files that you want to refresh.
Note
If you specify a relative path that does not exist, the table refresh proceeds as if no relative path was specified.
Example: Refresh all of the files in a table’s BASE_LOCATION¶
To manually refresh all of the files in the table’s BASE_LOCATION
, omit the relative path argument:
ALTER ICEBERG TABLE myIcebergTable REFRESH;
Example: Refresh the files in a subpath from the BASE_LOCATION¶
To manually refresh a set of Parquet files in a directory, specify a relative path to that directory from the table’s BASE_LOCATION
:
ALTER ICEBERG TABLE myIcebergTable REFRESH '/relative/path/to/myParquetDataFiles';
Example: Refresh a particular file¶
To manually refresh a particular Parquet file, specify a relative path to that file from the BASE_LOCATION
:
ALTER ICEBERG TABLE myIcebergTable REFRESH '/relative/path/to/myParquetFile.parquet';
Refresh an Iceberg table automatically for Amazon S3¶
If you manage your Parquet source files in Amazon S3, you can create an Iceberg table that uses Amazon SNS (Simple Notification Service) for automatic refresh.
This section provides instructions for creating an Iceberg table that automatically refreshes the Parquet source files.
Prerequisite: Create an Amazon SNS topic and subscription¶
Create an SNS topic in your AWS account to handle all messages for the Snowflake external volume location on your S3 bucket.
Subscribe your target destinations for the S3 event notifications (for example, other SQS queues or AWS Lambda workloads) to this topic. SNS publishes event notifications for your bucket to all subscribers to the topic.
For full instructions, see the SNS documentation.
Step 1: Subscribe the Snowflake SQS queue to your SNS topic¶
Log in to the AWS Management Console.
From the home dashboard, select Simple Notification Service (SNS).
In the left-hand navigation pane, select Topics.
Locate the topic for your S3 bucket. Note the topic ARN.
Using a Snowflake client, query the SYSTEM$GET_AWS_SNS_IAM_POLICY system function with your SNS topic ARN:
select system$get_aws_sns_iam_policy('<sns_topic_arn>');
The function returns an IAM policy that grants a Snowflake SQS queue permission to subscribe to the SNS topic.
Return to the AWS Management console. In the left-hand navigation pane, select Topics.
Select the topic for your S3 bucket, then select Edit. The Edit page opens.
Select Access policy - Optional to expand this area of the page.
Merge the IAM policy addition from the SYSTEM$GET_AWS_SNS_IAM_POLICY function results into the JSON document.
To allow S3 to publish event notifications for the bucket to the SNS topic, add an additional policy grant.
For example:
{ "Sid":"s3-event-notifier", "Effect":"Allow", "Principal":{ "Service":"s3.amazonaws.com" }, "Action":"SNS:Publish", "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket", "Condition":{ "ArnLike":{ "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket" } } }
Merged IAM policy:
{ "Version":"2008-10-17", "Id":"__default_policy_ID", "Statement":[ { "Sid":"__default_statement_ID", "Effect":"Allow", "Principal":{ "AWS":"*" } .. }, { "Sid":"1", "Effect":"Allow", "Principal":{ "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234" }, "Action":[ "sns:Subscribe" ], "Resource":[ "arn:aws:sns:us-west-2:001234567890:s3_mybucket" ] }, { "Sid":"s3-event-notifier", "Effect":"Allow", "Principal":{ "Service":"s3.amazonaws.com" }, "Action":"SNS:Publish", "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket", "Condition":{ "ArnLike":{ "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket" } } } ] }
Select Save changes.
Step 2: Create an external volume with your AWS SNS topic¶
To configure an external volume, complete the instructions for Configure an external volume for Amazon S3.
In Step 4: Creating an external volume in Snowflake, specify the following additional parameter:
AWS_SNS_TOPIC = '<sns_topic_arn>'
Specifies the Amazon Resource Name (ARN) of the Amazon SNS topic that handles all messages for your external volume location.
For example:
CREATE OR REPLACE EXTERNAL VOLUME auto_refresh_exvol
STORAGE_LOCATIONS = (
(
NAME = 'my-s3-us-east-1'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://s3_mybucket/'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::0123456789102:role/my-role'
AWS_SNS_TOPIC = 'arn:aws:sns:us-east-1:0123456789102:sns_topic'
)
);
Step 3: Create a catalog integration¶
Create a catalog integration by using the CREATE CATALOG INTEGRATION command. To indicate that the catalog integration is for
Iceberg tables created from Parquet source files, set the CATALOG_SOURCE
parameter equal to OBJECT_STORE
and the TABLE_FORMAT
parameter equal to NONE
.
Note
Snowflake does not support creating Iceberg tables from Parquet-based table definitions in the AWS Glue Data Catalog.
The following example creates a catalog integration for Parquet files in object storage.
CREATE OR REPLACE CATALOG INTEGRATION icebergCatalogInt
CATALOG_SOURCE = OBJECT_STORE
TABLE_FORMAT = NONE
ENABLED=TRUE;
Step 4: Create an Iceberg table¶
Create an Iceberg table by using the CREATE ICEBERG TABLE command, setting the AUTO_REFRESH
parameter equal to TRUE
.
Note
Using AUTO_REFRESH with INFER_SCHEMA isn’t supported.
CREATE OR REPLACE ICEBERG TABLE my_s3_auto_refresh_table (
first_name STRING,
last_name STRING,
amount NUMBER,
create_date DATE
)
CATALOG = icebergCatalogInt
EXTERNAL_VOLUME = myIcebergVolume
BASE_LOCATION='relative_path_from_external_volume'
AUTO_REFRESH = true;
Troubleshoot¶
To track the status of automatic refreshes for your Iceberg table, use the SYSTEM$ICEBERG_TABLE_AUTO_REFRESH_STATUS function.
For example:
SELECT SYSTEM$ICEBERG_TABLE_AUTO_REFRESH_STATUS('my_s3_auto_refresh_table');
Data type mapping¶
When you define a column in a CREATE ICEBERG TABLE statement for Parquet source files, you must specify a Snowflake data type that maps to the Apache Parquet data type used in your source files.
The following table shows how Parquet logical types map to physical types, and how the physical types map to Snowflake data types.
Parquet logical type |
Parquet physical type |
Snowflake data type |
---|---|---|
None |
BOOLEAN |
BOOLEAN |
None INT(bitWidth=8, isSigned=true) INT(bitWidth=16, isSigned=true) INT(bitWidth=32, isSigned=true) |
INT32 |
INT |
None, INT(bitWidth=64, isSigned=true) |
INT64 |
BIGINT |
None |
FLOAT |
FLOAT |
None |
DOUBLE |
FLOAT |
DECIMAL(P,S) |
INT32 INT64 FIXED_LEN_BYTE_ARRAY(N) |
DECIMAL(P,S) |
DATE |
INT32 |
DATE |
TIME(isAdjustedToUTC=true, unit=MILLIS) |
INT32 |
TIME(3) |
TIME(isAdjustedToUTC=true, unit=MICROS) |
INT64 |
TIME(6) |
TIME(isAdjustedToUTC=true, unit=NANOS) |
INT64 |
TIME(9) |
NONE |
INT96 |
TIMESTAMP_LTZ(9) |
TIMESTAMP(isAdjustedToUTC=true, unit=MILLIS) |
INT64 |
TIMESTAMP_NTZ(3) |
TIMESTAMP(isAdjustedToUTC=true, unit=MICROS) |
INT64 |
TIMESTAMP_NTZ(6) |
TIMESTAMP(isAdjustedToUTC=true, unit=NANOS) |
INT64 |
TIMESTAMP_NTZ(9) |
STRING |
BYTE_ARRAY |
VARCHAR |
ENUM |
BYTE_ARRAY |
VARCHAR |
JSON |
BYTE_ARRAY |
VARCHAR |
UUID |
FIXED_LEN_BYTE_ARRAY(16) |
BINARY(16) |
NONE |
FIXED_LEN_BYTE_ARRAY(N) |
BINARY(L) |
NONE BSON |
BYTE_ARRAY |
BINARY |
INTERVAL |
FIXED_LEN_BYTE_ARRAY(12) |
BINARY(12) Snowflake does not support a corresponding data type for the Parquet INTERVAL type, and reads the data from source files as binary data. |
The following table shows how Parquet nested data types map to Snowflake data types.
Parquet logical nested type |
Snowflake data type |
---|---|
NONE |
|
LIST |
|
MAP |
Limitations¶
The maximum number of Parquet files that you can use to create an Iceberg table is ~2 million.
Parquet files that use any of the following features or data types are not supported:
Field IDs.
The DECIMAL data type with precision higher than 38.
LIST or MAP types with one-level or two-level representation.
Unsigned integer types (INT(signed=false)).
The FLOAT16 data type.
Snowflake does not support creating Iceberg tables from Parquet-based table definitions in the AWS Glue Data Catalog.
Generating Iceberg metadata using the SYSTEM$GET_ICEBERG_TABLE_INFORMATION function is not supported.