snowflake.snowpark.DataFrameReader.dbapi

DataFrameReader.dbapi(create_connection: Callable[[...], Connection], *, table: Optional[str] = None, query: Optional[str] = None, column: Optional[str] = None, lower_bound: Optional[Union[str, int]] = None, upper_bound: Optional[Union[str, int]] = None, num_partitions: Optional[int] = None, max_workers: Optional[int] = None, query_timeout: Optional[int] = 0, fetch_size: Optional[int] = 100000, custom_schema: Optional[Union[str, StructType]] = None, predicates: Optional[List[str]] = None, session_init_statement: Optional[Union[str, List[str]]] = None, udtf_configs: Optional[dict] = None, fetch_merge_count: int = 1, fetch_with_process: bool = False, connection_parameters: Optional[dict] = None) → DataFrame

Reads data from a database table or query into a DataFrame using a DBAPI connection, with support for optional partitioning, parallel processing, and query customization.

Usage Notes:
  • Ingestion performance tuning:
    • Partitioning: Use column, lower_bound, upper_bound, and num_partitions together to split large tables into smaller partitions for parallel processing. All four parameters must be specified together; otherwise, an error will be raised.

    • Parallel execution: Set max_workers to control the maximum number of processes and threads used for parallel execution.

    • Fetch optimization: Adjust fetch_size to optimize performance by reducing the number of round trips to the database.

    • Partition filtering: Use predicates to define WHERE conditions for partitions. Note that predicates will be ignored if column is specified for partitioning.

    • Schema specification: Set custom_schema to skip schema inference. The custom schema's column names must match those of the table in the external data source.

  • Execution timing and error handling:
    • UDTF Ingestion: Uses lazy evaluation. Errors are reported as SnowparkSQLException during DataFrame actions (e.g., DataFrame.collect()).

    • Local Ingestion: Uses eager execution. Errors are reported immediately as SnowparkDataFrameReaderException when this method is called.

Parameters:
  • create_connection – A callable that returns a DB-API compatible database connection. The callable can optionally accept keyword arguments via **kwargs. If connection_parameters is provided, those will be passed as keyword arguments to this callable. The callable must be picklable, as it will be passed to and executed in child processes.

  • table – The name of the table in the external data source. This parameter cannot be used together with the query parameter.

  • query – A valid SQL query to be used as the data source in the FROM clause. This parameter cannot be used together with the table parameter.

  • column – The column name used for partitioning the table. Partitions will be retrieved in parallel. The column must be of a numeric type (e.g., int or float) or a date type. When specifying column, lower_bound, upper_bound, and num_partitions must also be provided.

  • lower_bound – The lower bound of the partition column, which together with upper_bound determines the partition stride. This parameter does not filter out data. It must be provided when column is specified.

  • upper_bound – The upper bound of the partition column, which together with lower_bound determines the partition stride. This parameter does not filter out data. It must be provided when column is specified.

  • num_partitions – The number of partitions to create when reading in parallel from multiple processes and threads. It must be provided when column is specified.

  • max_workers – The maximum number of processes and threads used for parallel execution.

  • query_timeout – The timeout (in seconds) for each query execution. A default value of 0 means the query will never time out. The timeout behavior can also be configured within the create_connection method when establishing the database connection, depending on the capabilities of the DBMS and its driver.

  • fetch_size – The number of rows to fetch per batch from the external data source. This determines how many rows are retrieved in each round trip, which can improve performance for drivers with a low default fetch size.

  • custom_schema – A custom Snowflake table schema used when reading data from the external data source; the column names must match the corresponding column names in the external data source. This can be a schema string, for example: "id INTEGER, int_col INTEGER, text_col STRING", or a StructType, for example: StructType([StructField("ID", IntegerType(), False), StructField("INT_COL", IntegerType(), False), StructField("TEXT_COL", StringType(), False)]).

  • predicates – A list of expressions suitable for inclusion in WHERE clauses, where each expression defines a partition. Partitions will be retrieved in parallel. If both column and predicates are specified, column takes precedence.

  • session_init_statement – One or more SQL statements executed before fetching data from the external data source. This can be used for session initialization tasks such as setting configurations. For example, “SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED” can be used in SQL Server to avoid row locks and improve read performance. The session_init_statement is executed only once at the beginning of each partition read.

  • udtf_configs – A dictionary containing configuration parameters for ingesting external data using a Snowflake UDTF. If this parameter is provided, the workload will be executed within a Snowflake UDTF context.

    The dictionary may include the following keys:

    • external_access_integration (str, required): The name of the external access integration, which allows the UDTF to access external endpoints.

    • imports (List[str], optional): A list of stage file names to import into the UDTF. Use this to include any private packages required by your create_connection() function.

    • packages (List[str], optional): A list of package names (with optional version numbers) required as dependencies for your create_connection() function.

  • fetch_merge_count – The number of fetched batches to merge into a single Parquet file before uploading it. This improves performance by reducing the number of small Parquet files. Defaults to 1, meaning each fetch_size batch is written to its own Parquet file and uploaded separately.

  • fetch_with_process – Whether to use multiprocessing for data fetching and Parquet file generation in local ingestion. Defaults to False, which means multithreading is used to fetch data in parallel. Setting this to True enables multiprocessing, which may improve performance for CPU-bound tasks like Parquet file generation. When using multiprocessing, guard your script with if __name__ == "__main__": and call multiprocessing.freeze_support() on Windows if needed. This parameter has no effect in UDTF ingestion.

  • connection_parameters – Optional dictionary of parameters to pass to the create_connection callable. If provided, these parameters will be unpacked and passed as keyword arguments to create_connection(**connection_parameters). This allows for flexible connection configuration without hardcoding values in the callable. Example: {"timeout": 30, "isolation_level": "READ_UNCOMMITTED"}

Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection

df = session.read.dbapi(create_oracledb_connection, table=...)
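
The query parameter can be used instead of table to ingest the result of a SQL query; a minimal sketch where the query text is illustrative:

Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection

# read the result of a SQL query instead of a whole table
df_from_query = session.read.dbapi(
    create_oracledb_connection,
    query="SELECT ID, NAME FROM target_table WHERE STATUS = 'ACTIVE'"
)
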
Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection

df = session.read.dbapi(create_oracledb_connection, table=..., fetch_with_process=True)
Example::
import sqlite3
def create_sqlite_connection(timeout=5.0, isolation_level=None, **kwargs):
    connection = sqlite3.connect(
        database=":memory:",
        timeout=timeout,
        isolation_level=isolation_level
    )
    return connection

connection_params = {"timeout": 30.0, "isolation_level": "DEFERRED"}
df = session.read.dbapi(
    create_sqlite_connection,
    table=...,
    connection_parameters=connection_params
)
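
Setting custom_schema skips schema inference; a minimal sketch where the schema string is illustrative and the column names are assumed to match the source table:

Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection

# provide the schema explicitly to skip schema inference
df_custom_schema = session.read.dbapi(
    create_oracledb_connection,
    table="target_table",
    custom_schema="ID INTEGER, INT_COL INTEGER, TEXT_COL STRING"
)
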
Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection

# pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_oracledb_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # swap with the column you want your partition based on
    upper_bound=10000,
    lower_bound=0
)
Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection

# pull data from target table with parallelism using predicates
df_local_predicates = session.read.dbapi(
    create_oracledb_connection,
    table="target_table",
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
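
A session_init_statement runs once at the beginning of each partition read; a minimal sketch assuming a SQL Server source accessed through pyodbc (connection details elided):

Example::
import pyodbc
def create_sqlserver_connection():
    connection = pyodbc.connect(...)
    return connection

# run a session-level statement before each partition is fetched
df_session_init = session.read.dbapi(
    create_sqlserver_connection,
    table="target_table",
    session_init_statement="SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"
)
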
Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection
udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# pull data from target table with udtf ingestion

df_udtf_basic = session.read.dbapi(
    create_oracledb_connection,
    table="target_table",
    udtf_configs=udtf_configs
)
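
The optional imports and packages keys of udtf_configs declare dependencies of create_connection() inside the UDTF; in this sketch the stage file name and package choice are illustrative placeholders:

Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection
udtf_configs = {
    "external_access_integration": "<your external access integration>",
    "imports": ["@my_stage/private_helpers.zip"],  # hypothetical stage file with private code
    "packages": ["oracledb"]  # driver needed by create_connection inside the UDTF
}

# pull data from target table with udtf ingestion and extra dependencies
df_udtf_deps = session.read.dbapi(
    create_oracledb_connection,
    table="target_table",
    udtf_configs=udtf_configs
)
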
Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection
udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# pull data from target table with udtf ingestion with parallelism using partition column

df_udtf_par_column = session.read.dbapi(
    create_oracledb_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # swap with the column you want your partition based on
    upper_bound=10000,
    lower_bound=0
)
Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection
udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# pull data from target table with udtf ingestion with parallelism using predicates

df_udtf_predicates = session.read.dbapi(
    create_oracledb_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
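
For local ingestion, max_workers, fetch_size, fetch_merge_count, and query_timeout can be combined to tune throughput; the values in this sketch are illustrative starting points, not recommendations:

Example::
import oracledb
def create_oracledb_connection():
    connection = oracledb.connect(...)
    return connection

# tune local ingestion: 4 workers, larger fetch batches,
# merge 4 fetched batches per Parquet file, 10-minute query timeout
df_tuned = session.read.dbapi(
    create_oracledb_connection,
    table="target_table",
    max_workers=4,
    fetch_size=200000,
    fetch_merge_count=4,
    query_timeout=600
)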

This function or method has been in private preview since 1.29.0.