snowflake.snowpark.DataFrameReader.dbapi

DataFrameReader.dbapi(create_connection: Callable[[], Connection], *, table: Optional[str] = None, query: Optional[str] = None, column: Optional[str] = None, lower_bound: Optional[Union[str, int]] = None, upper_bound: Optional[Union[str, int]] = None, num_partitions: Optional[int] = None, max_workers: Optional[int] = None, query_timeout: Optional[int] = 0, fetch_size: Optional[int] = 0, custom_schema: Optional[Union[str, StructType]] = None, predicates: Optional[List[str]] = None, session_init_statement: Optional[str] = None) → DataFrame

Reads data from a database table or query into a DataFrame using a DBAPI connection, with support for optional partitioning, parallel processing, and query customization.

There are multiple methods to partition data and accelerate ingestion. These methods can be combined to achieve optimal performance:

1. Use column, lower_bound, upper_bound, and num_partitions together to split a large table into smaller partitions for parallel processing. All four must be specified together; otherwise, an error is raised.
2. Set max_workers to an appropriate positive integer. It defines the maximum number of processes and threads used for parallel execution.
3. Adjust fetch_size to reduce the number of round trips to the database.
4. Use predicates to define the WHERE condition of each partition. If column is also specified, predicates is ignored and partitions are generated from column instead.
5. Set custom_schema to skip Snowpark's schema inference. The column names in custom_schema must match the column names of the table in the external data source.
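
The stride logic behind column, lower_bound, upper_bound, and num_partitions can be pictured with the sketch below. It assumes a Spark-JDBC-style scheme in which the first and last partitions are open-ended (and the first one also picks up NULLs), which is consistent with the statement that the bounds do not filter out data. The helper name stride_predicates is hypothetical, it handles integer columns only, and it is not Snowpark's actual implementation.

```python
from typing import List


def stride_predicates(
    column: str, lower_bound: int, upper_bound: int, num_partitions: int
) -> List[str]:
    """Hypothetical helper: split [lower_bound, upper_bound) into WHERE predicates.

    The first and last predicates are open-ended, so rows outside the bounds
    are still read rather than filtered out (assumed Spark-like behavior).
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be a positive integer")
    if upper_bound <= lower_bound:
        raise ValueError("upper_bound must be greater than lower_bound")
    # Never create more partitions than there are integer values in the range.
    num_partitions = min(num_partitions, upper_bound - lower_bound)
    if num_partitions == 1:
        return ["1 = 1"]  # a single partition reads everything
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    for i in range(num_partitions):
        start = lower_bound + i * stride
        end = start + stride
        if i == 0:
            # Everything below the first boundary, plus NULLs (assumption).
            predicates.append(f"{column} < {end} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Everything at or above the last start, with no upper cap.
            predicates.append(f"{column} >= {start}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {end}")
    return predicates


for p in stride_predicates("id", 0, 100, 4):
    print(p)
# id < 25 OR id IS NULL
# id >= 25 AND id < 50
# id >= 50 AND id < 75
# id >= 75
```

Each predicate would then become the WHERE clause of one partition query, so a value such as id = 500 still lands in the last partition even though it exceeds upper_bound.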

Parameters:
  • create_connection – A callable that takes no arguments and returns a DB-API compatible database connection. The callable must be picklable, as it will be passed to and executed in child processes.

  • table – The name of the table in the external data source. This parameter cannot be used together with the query parameter.

  • query – A valid SQL query to be used as the data source in the FROM clause. This parameter cannot be used together with the table parameter.

  • column – The column name used for partitioning the table. Partitions will be retrieved in parallel. The column must be of a numeric type (e.g., int or float) or a date type. If column is specified, lower_bound, upper_bound, and num_partitions must also be provided.

  • lower_bound – The lower bound used, together with upper_bound, to compute the partition stride. It only controls how the range is split and does not filter out data. It must be provided when column is specified.

  • upper_bound – The upper bound used, together with lower_bound, to compute the partition stride. It only controls how the range is split and does not filter out data. It must be provided when column is specified.

  • num_partitions – The number of partitions to create when reading in parallel with multiple processes and threads. It must be provided when column is specified.

  • max_workers – The maximum number of processes and threads used for parallel execution.

  • query_timeout – The timeout (in seconds) for each query execution. A default value of 0 means the query will never time out. The timeout behavior can also be configured within the create_connection method when establishing the database connection, depending on the capabilities of the DBMS and its driver.

  • fetch_size – The number of rows to fetch per batch from the external data source. This determines how many rows are retrieved in each round trip, which can improve performance for drivers with a low default fetch size.

  • custom_schema – A custom Snowflake table schema to use when reading from the external data source, which skips schema inference. The column names must be identical to the corresponding column names in the external data source. This can be a schema string, for example "id INTEGER, int_col INTEGER, text_col STRING", or a StructType, for example StructType([StructField("ID", IntegerType(), False)]).

  • predicates – A list of expressions suitable for inclusion in WHERE clauses, where each expression defines a partition. Partitions will be retrieved in parallel. If both column and predicates are specified, column takes precedence.

  • session_init_statement – A SQL statement executed before fetching data from the external data source. This can be used for session initialization tasks such as setting configurations. For example, “SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED” can be used in SQL Server to avoid row locks and improve read performance. The session_init_statement is executed only once at the beginning of each partition read.
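
Because create_connection is pickled and sent to child processes, a module-level function is the safest shape: the standard pickle module serializes such a function by reference to its name, while lambdas and functions nested inside other functions cannot be pickled. The quick check below only illustrates Python's pickling rules; it uses sqlite3 as a stand-in driver, the helpers is_picklable and make_nested_factory are hypothetical, and nothing here calls Snowpark.

```python
import pickle
import sqlite3


def create_connection() -> sqlite3.Connection:
    # Module-level, zero-argument factory: pickled by reference to its name.
    return sqlite3.connect(":memory:")


def is_picklable(obj: object) -> bool:
    """Hypothetical helper: True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_nested_factory():
    def nested() -> sqlite3.Connection:  # local function: not importable by name
        return sqlite3.connect(":memory:")

    return nested


# Run as a script:
print(is_picklable(create_connection))                    # True
print(is_picklable(lambda: sqlite3.connect(":memory:")))  # False
print(is_picklable(make_nested_factory()))                # False
```

If a connection factory needs parameters, binding them inside a module-level function (or reading them from configuration there) keeps it picklable.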
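
To show how create_connection, predicates, fetch_size, session_init_statement, and max_workers fit together, the sketch below reads one partition per predicate through the standard DB-API cursor methods execute, fetchmany, and fetchall. It is a minimal sketch under several assumptions: a temporary SQLite file stands in for the external database, "PRAGMA query_only = ON" stands in for a session setting such as SQL Server's isolation level, a thread pool stands in for Snowpark's processes and threads, and the helpers setup_source, read_partition, and read_all are hypothetical rather than Snowpark's implementation.

```python
import os
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Tuple

# Assumption: a throwaway SQLite file stands in for the external database.
DB_PATH = os.path.join(tempfile.mkdtemp(), "source.db")


def create_connection() -> sqlite3.Connection:
    # Zero-argument factory, the shape that create_connection expects.
    return sqlite3.connect(DB_PATH)


def setup_source() -> None:
    # Populate a small "orders" table with ids 0..9.
    conn = create_connection()
    with conn:  # commits on success
        conn.execute("CREATE TABLE IF NOT EXISTS orders (id INTEGER, amount REAL)")
        conn.execute("DELETE FROM orders")
        conn.executemany(
            "INSERT INTO orders VALUES (?, ?)", [(i, i * 1.5) for i in range(10)]
        )
    conn.close()


def read_partition(
    connect: Callable[[], sqlite3.Connection],
    table: str,
    predicate: str,
    fetch_size: int,
    session_init_statement: Optional[str] = None,
) -> List[Tuple]:
    """Hypothetical per-partition reader; not Snowpark's implementation."""
    conn = connect()  # every partition opens its own connection
    try:
        cursor = conn.cursor()
        if session_init_statement:
            cursor.execute(session_init_statement)  # once per partition read
        # Predicates are trusted SQL fragments here, as in the predicates parameter.
        cursor.execute(f"SELECT * FROM {table} WHERE {predicate}")
        if fetch_size <= 0:
            return cursor.fetchall()  # fetch_size 0: no explicit batch size
        rows: List[Tuple] = []
        while True:
            batch = cursor.fetchmany(fetch_size)  # up to fetch_size rows per call
            if not batch:
                break
            rows.extend(batch)
        return rows
    finally:
        conn.close()


def read_all(predicates: List[str], max_workers: int, fetch_size: int) -> List[Tuple]:
    # A thread pool stands in for Snowpark's processes and threads (simplification).
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(
                read_partition,
                create_connection,
                "orders",
                predicate,
                fetch_size,
                "PRAGMA query_only = ON",  # SQLite analogue of a session setting
            )
            for predicate in predicates
        ]
        return [row for future in futures for row in future.result()]


setup_source()
rows = read_all(["id < 5", "id >= 5"], max_workers=2, fetch_size=3)
print(len(rows))  # 10
```

The two predicates here are written by hand; with column-based partitioning they would instead be generated from lower_bound, upper_bound, and num_partitions.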

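Example:

```python
import oracledb

def create_oracledb_connection():
    # Module-level, zero-argument factory so it can be pickled for child processes.
    connection = oracledb.connect(...)
    return connection

# session is an existing snowflake.snowpark.Session.
df = session.read.dbapi(create_oracledb_connection, table=...)
```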

This function or method is in private preview since 1.29.0.