Note
This connector is subject to the Snowflake Connector Terms.
Configuring custom transformations
The streaming connectors (Kafka high-performance, Kinesis high-performance) consume messages with a Consume* processor and deliver them to Snowflake with PublishSnowpipeStreaming. By default, the source processor connects directly to PublishSnowpipeStreaming. This topic describes how to insert custom processing between the source and the destination by adding a Custom Transformations process group in the Openflow UI. The patterns covered are filtering, field mapping (flatten / rename / remove), topic-to-table mapping, content-based routing to multiple tables, default values, and custom Groovy scripts.
Streaming connectors use at-least-once (ALO) delivery. Custom processing sits directly in that path, so a set of restriction rules must be followed or you risk duplication, out-of-order delivery, or data loss (if records are explicitly dropped). Read Restriction rules before adding any processor.
Tip
You don’t have to apply this customization by hand. The Openflow skill in Snowflake CoCo can perform it for you — describe the change you want and it edits the flow following the steps on this page. We recommend using the skill instead of configuring the components manually.
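For switching the message data type (JSON to Avro / Protobuf), see Configuring Avro data type ingestion and Configuring Protobuf data type ingestion.
For routing failed records to a dedicated destination instead of dropping them, see Configuring Dead Letter Queue (DLQ) handling.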
Scope
This topic customizes an already-installed streaming connector in place by adding a process group and processors in the Openflow UI and rewiring the source-to-destination path.
What this topic covers:
- Optionally inserting a Custom Transformations process group for readability (processors can also be added directly to the connector canvas).
- The restriction rules for safe in-flight processing.
- Reusing the connector’s existing JsonTreeReader / JsonRecordSetWriter and adding a schema cache.
- Planning a pipeline that minimizes serialization/deserialization (ser/de) passes.
- Common transformation patterns and the processors that implement them.
Out of scope: broker authentication, data-type switching, and Snowflake Private Key Auth. For data-type switching, see the links under See also.
Prerequisites
- You have an existing Kafka high-performance or Kinesis high-performance connector deployed in Openflow.
- The connector currently connects its source processor (for example, ConsumeKafka) directly to PublishSnowpipeStreaming.
- You know which transformation you need and to which table(s) records should be written.
- The runtime role can create any target tables used for routing (see the connector’s destination grants).
Architecture
Insert the transformation processors between the source processor and PublishSnowpipeStreaming — that is, remove the direct source to PublishSnowpipeStreaming connection and chain your processors in between.
Note
The Custom Transformations process group is optional. You can add the transformation processors directly onto the connector’s canvas, between the source processor and PublishSnowpipeStreaming. Grouping them inside a dedicated process group named Custom Transformations (with an Input Port and an Output Port) is purely for readability and maintainability — it visually separates your customizations from the connector’s built-in components and makes them easy to find, label, and reason about. It changes nothing functionally: the same processors, restriction rules, FIFO connections, and reader/writer apply either way. The rest of this topic describes the grouped layout. If you skip the group, ignore the Input/Output Port steps and connect the source processor directly to your first transformation processor and your last one directly to PublishSnowpipeStreaming.
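The recommended (grouped) layout is: source processor -> Custom Transformations Input -> transformation processors -> Custom Transformations Output -> PublishSnowpipeStreaming. The Input Port receives messages from the source, the transformation processors run inside the group, and the Output Port feeds the result to PublishSnowpipeStreaming.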
Step 1: Create the process group (optional)
This step and Step 2 are needed only for the grouped layout. To add processors directly onto the connector canvas instead, perform only the first two actions below (open the connector’s process group and remove the existing connection), then skip to Step 3 and connect the source processor straight to your first transformation processor.
- Open the connector’s process group in the Openflow UI.
- Remove the existing connection between the source processor and PublishSnowpipeStreaming.
- Drag a new Process Group onto the canvas and name it Custom Transformations.
Step 2: Add the input and output ports (optional)
Only if you created the process group in Step 1. Inside the Custom Transformations group:
- Add an Input Port named Custom Transformations Input.
- Add an Output Port named Custom Transformations Output.
Step 3: Wire the processing into the flow
- Connect the source processor’s success relationship to Custom Transformations Input (or directly to your first transformation processor if you skipped the group).
- Connect Custom Transformations Output to PublishSnowpipeStreaming (or your last transformation processor to PublishSnowpipeStreaming if you skipped the group).
- Configure every connection (both these external connections and all internal connections you add later) with the FirstInFirstOut prioritizer. See Connection configuration.
Restriction rules
Warning
These rules MUST be followed. Violating them might cause data loss, duplication, or performance degradation.
1. Minimize ser/de operations. Plan the full pipeline before creating processors. Combine filtering, renaming, and defaults into as few record-aware processors as possible. Extract useful values (for example, routing fields) as attributes early so downstream steps can use attribute-only processors (RouteOnAttribute, UpdateAttribute) instead of content-aware ones.
2. Reuse the existing reader/writer where possible. The connector already defines a JsonTreeReader (schema inference) and a JsonRecordSetWriter at the parent process-group level, and these are visible to your processors. When a processor reads or writes content of the same data type, prefer reusing these existing services rather than creating new ones; this keeps the flow simple and consistent. See Reader and writer setup.
3. One table per FlowFile. A single FlowFile can contain records for only ONE table. Prefer RouteOnAttribute over PartitionRecord when the routing value is already available as an attribute (it saves a ser/de pass).
4. Use FIFO prioritizers on connections. Apply the FirstInFirstOut prioritizer on ALL connections (internal and external). This keeps records in arrival order within each partition and avoids unexpected interleaving. It does not affect throughput.
5. No array exploding to the same table. Exploding an array into multiple records destined for the same table is NOT allowed (it can cause data loss or duplicates). Array exploding is valid only when the resulting records route to different tables.
6. Let schema inference handle structure. With the existing JsonTreeReader (plus a VolatileSchemaCache) and JsonRecordSetWriter, structural changes are handled automatically, so you do not need to define schemas manually.
Reader and writer setup
Transformation processors that access FlowFile content need a Record Reader and Record Writer. The connector already defines a JsonTreeReader and a JsonRecordSetWriter at the parent process-group level, and these are visible inside the Custom Transformations group. When a processor works with the same data type, prefer reusing these existing services rather than creating new ones.
Add a schema cache to the reader (recommended)
The existing JsonTreeReader uses schema inference. To avoid re-inferring the schema on every FlowFile, add a VolatileSchemaCache and point the reader at it.
- At the connector process-group level, go to Configure > Controller Services.
- Add a VolatileSchemaCache service:
| Property | Value |
|---|---|
| Maximum Cache Size | 100 (default is usually sufficient) |
- Enable the VolatileSchemaCache service.
- Disable the JsonTreeReader (required before editing it).
- Edit JsonTreeReader and set:
| Property | Value |
|---|---|
| Schema Inference Cache | The VolatileSchemaCache created above. |
- Enable the JsonTreeReader again.
This caches the inferred schema and reuses it for messages with the same structure, improving performance.
Transformation planning
Plan the full pipeline before creating any processor (rule 1). The goal is the fewest possible ser/de passes. Building processors one-by-one without a plan almost always produces a wasteful pipeline.
Planning checklist
1. List every required transformation (filter, rename, defaults, routing, and so on).
2. Identify which can run on attributes alone (zero ser/de). Attribute-only processors work only when the FlowFile already carries the attribute (for example, kafka.topic set by ConsumeKafka, aws.kinesis.stream.name set by ConsumeKinesis, or an attribute set by an upstream PartitionRecord):
   - Routing by source (topic / stream) or by an existing attribute → UpdateAttribute to derive a target table name into an attribute (for example, table.name), which PublishSnowpipeStreaming then reads to pick the destination table. This is explained in detail under Topic-to-table mapping and Content-based routing to multiple tables.
   - Attribute-based filtering → RouteOnAttribute.
3. Combine content transformations into the fewest content-aware processors:
   - Filter + rename + defaults → ONE QueryRecord (SQL SELECT aliases, COALESCE, WHERE) or ONE JoltTransformRecord (Chain spec).
4. Use PartitionRecord for multi-table routing. A FlowFile may contain many records with different values for a routing field. Partitioning splits the FlowFile so each output FlowFile holds records for one value only, and sets that value as a FlowFile attribute.
5. Order operations: content transforms first (one pass on the whole FlowFile) → partition (split into per-value FlowFiles) → UpdateAttribute to set a target table-name attribute from the partitioned value (zero ser/de; PublishSnowpipeStreaming uses this attribute to choose the destination table) → RouteOnAttribute only if filtering is needed (zero ser/de). The table-name attribute is explained under Content-based routing to multiple tables.
Choosing between readable and optimized pipelines
After planning the standard-processor pipeline, count its ser/de passes:
- 1 pass: nothing to optimize, so implement it.
- 2 or more passes: a single ExecuteGroovyScript could consolidate all content operations into 1 pass. Weigh maintainability (standard processors are easier to read and modify) against performance (a single Groovy script can be faster for performance-critical workloads, but it is harder to change; confirm any gain with testing before optimizing). Choose the readable option unless you specifically need maximum performance.
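The pass count can be checked mechanically. The following is a minimal Python sketch that assumes the classification used in this topic: UpdateAttribute and RouteOnAttribute are attribute-only, while QueryRecord, JoltTransformRecord, UpdateRecord, PartitionRecord, and a Groovy script that parses the content once each cost one pass. The helper names and pipeline lists are hypothetical and for illustration only.

```python
# Assumed classification, following this topic's rules: attribute-only
# processors never parse FlowFile content, while each record-aware processor
# reads and re-writes the content once.
ATTRIBUTE_ONLY = {"UpdateAttribute", "RouteOnAttribute"}
CONTENT_AWARE = {
    "QueryRecord",
    "JoltTransformRecord",
    "UpdateRecord",
    "PartitionRecord",
    "ExecuteGroovyScript",  # assumes the script parses the content exactly once
}


def count_serde_passes(pipeline: list[str]) -> int:
    """Count ser/de passes for the processors between the source and PublishSnowpipeStreaming."""
    passes = 0
    for processor in pipeline:
        if processor in CONTENT_AWARE:
            passes += 1
        elif processor not in ATTRIBUTE_ONLY:
            raise ValueError(f"unclassified processor: {processor}")
    return passes


def advice(passes: int) -> str:
    """Apply the rule of thumb from the list above."""
    if passes <= 1:
        return "implement as planned"
    return "consider one ExecuteGroovyScript if maximum performance is required"


# Illustrative pipelines for "filter, rename, add defaults, route by field value".
pipelines = {
    "readable": ["QueryRecord", "PartitionRecord", "UpdateAttribute"],
    "optimized": ["ExecuteGroovyScript"],
    # Anti-pattern: filter (QueryRecord) and rename (JoltTransformRecord) kept separate.
    "anti-pattern": ["QueryRecord", "JoltTransformRecord", "PartitionRecord", "UpdateAttribute"],
}

for name, pipeline in pipelines.items():
    passes = count_serde_passes(pipeline)
    print(f"{name}: {passes} pass(es), {advice(passes)}")
```

Raising on an unknown processor is deliberate: it forces you to decide whether a new processor touches content before counting it.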
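Example: “filter by timestamp, rename fields, add defaults, route to tables by field value”.
- Readable (2 ser/de passes): QueryRecord (filter + rename + defaults) -> PartitionRecord (split by the routing field) -> UpdateAttribute (set table.name, only if the value needs mapping) -> PublishSnowpipeStreaming.
- Optimized (1 ser/de pass): ExecuteGroovyScript (filter, rename, and defaults, emitting one FlowFile per target table with table.name set) -> PublishSnowpipeStreaming.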
PublishSnowpipeStreaming handles multi-table routing natively — set its Table property to ${table.name} (or the routing attribute directly). RouteOnAttribute is only for filtering, never required for routing alone.
Anti-pattern (avoid): separate processors for each of filter / rename / partition (3 ser/de passes where 2 suffice), or a separate UpdateAttribute per route value. Combine where possible, and use a single UpdateAttribute driven by the partitioned attribute.
Connection configuration
Every connection — both into/out of the Custom Transformations group and between processors inside it — MUST be configured with:
- FirstInFirstOut prioritizer — keeps records in arrival order within each partition (rule 4).
- Back pressure — leave at defaults unless you have a specific reason to change it.
To set the prioritizer: edit the connection, open the Settings tab, and add FirstInFirstOutPrioritizer under Prioritizers.
This applies to:
- Source processor to Custom Transformations Input
- Custom Transformations Output to PublishSnowpipeStreaming
- All connections between processors inside the group
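The following is a toy Python model of what the FirstInFirstOut prioritizer guarantees on a single connection: FlowFiles leave in the order they arrived, so per-partition order survives each hop. FifoConnection, in_partition_order, and the "seq" key are hypothetical. Only kafka.partition is an attribute name taken from this topic.

```python
import heapq
import itertools


class FifoConnection:
    """Toy model of a connection queue with a FirstInFirstOut-style prioritizer.

    Each FlowFile is keyed by the order in which it entered the queue, so
    FlowFiles always leave in arrival order.
    """

    def __init__(self) -> None:
        self._heap: list[tuple[int, dict]] = []
        self._arrival = itertools.count()

    def enqueue(self, flowfile: dict) -> None:
        # The arrival counter is unique, so the dicts themselves are never compared.
        heapq.heappush(self._heap, (next(self._arrival), flowfile))

    def dequeue(self) -> dict:
        return heapq.heappop(self._heap)[1]

    def __len__(self) -> int:
        return len(self._heap)


def in_partition_order(flowfiles: list[dict]) -> bool:
    """True if the hypothetical "seq" value never decreases within any kafka.partition."""
    last_seq: dict[str, int] = {}
    for ff in flowfiles:
        partition, seq = ff["kafka.partition"], ff["seq"]
        if seq < last_seq.get(partition, seq):
            return False
        last_seq[partition] = seq
    return True


arrivals = [
    {"kafka.partition": "0", "seq": 0},
    {"kafka.partition": "1", "seq": 0},
    {"kafka.partition": "0", "seq": 1},
    {"kafka.partition": "1", "seq": 1},
]
queue = FifoConnection()
for ff in arrivals:
    queue.enqueue(ff)
drained = [queue.dequeue() for _ in range(len(queue))]
print(in_partition_order(drained))
```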
Note
Connection queue before PublishSnowpipeStreaming: set the back pressure data size threshold on this connection to 5 GB instead of the default 1 GB. PublishSnowpipeStreaming keeps FlowFiles queued until Snowflake acknowledges them, so with the default 1 GB limit, back pressure activates too early under normal load.
Transformation patterns
Pick the pattern that matches your need. All patterns assume the reader/writer setup above and obey the restriction rules.
Pattern: Filtering messages
By attribute / key (no ser/de — preferred). Use RouteOnAttribute. It reads only FlowFile attributes, so there is no content-parsing cost.
- Add a RouteOnAttribute processor.
- Add dynamic properties whose values are Expression Language conditions. Each property becomes a relationship.
- Connect the desired relationship to the next step. Auto-terminate the unmatched relationship (to drop those FlowFiles) or connect it to the Output Port (to keep them).
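The following is a minimal Python model of the routing decision, assuming RouteOnAttribute's default "Route to Property name" strategy: every dynamic property whose condition is true is a destination, and a FlowFile that matches none goes to unmatched. Python lambdas stand in for Expression Language conditions. The function name and the "orders" property are hypothetical.

```python
from typing import Callable

Attributes = dict[str, str]
Condition = Callable[[Attributes], bool]


def route_on_attribute(attributes: Attributes, conditions: dict[str, Condition]) -> list[str]:
    """Return the relationships a FlowFile is routed to.

    Each dynamic property whose condition is true is a destination; a
    FlowFile that matches none of them goes to "unmatched".
    Only attributes are read, never content, so this costs no ser/de pass.
    """
    matched = [name for name, condition in conditions.items() if condition(attributes)]
    return matched or ["unmatched"]


# Python stand-in for the Expression Language condition
# orders -> ${kafka.topic:equals('orders')}
conditions: dict[str, Condition] = {
    "orders": lambda attrs: attrs.get("kafka.topic") == "orders",
}

print(route_on_attribute({"kafka.topic": "orders"}, conditions))
print(route_on_attribute({"kafka.topic": "debug"}, conditions))
```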
By content (requires ser/de). Use QueryRecord with a SQL WHERE clause.
| Property | Value |
|---|---|
| Record Reader | The existing JsonTreeReader. |
| Record Writer | The existing JsonRecordSetWriter. |
| filtered (dynamic) | SELECT * FROM FLOWFILE WHERE <condition> |
Connect the filtered relationship to the next step and auto-terminate the original relationship.
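To make the cost of content-based filtering concrete, the following Python sketch applies a WHERE-style predicate to NDJSON content. It is a stand-in for QueryRecord, not its SQL engine. The function name, the amount field, and the threshold are hypothetical.

```python
import json
from typing import Callable


def filter_flowfile(ndjson: str, predicate: Callable[[dict], bool]) -> str:
    """Keep only the NDJSON records for which predicate is true.

    Like a QueryRecord WHERE clause, this costs one ser/de pass: every record
    is parsed once and every kept record is written once.
    """
    kept = []
    for line in ndjson.splitlines():
        if not line.strip():
            continue  # skip blank lines between records
        record = json.loads(line)  # deserialize
        if predicate(record):
            kept.append(json.dumps(record))  # serialize
    return "\n".join(kept)


flowfile = '{"id": 1, "amount": 150}\n{"id": 2, "amount": 20}\n{"id": 3, "amount": 300}'
# Python stand-in for: SELECT * FROM FLOWFILE WHERE amount > 100
print(filter_flowfile(flowfile, lambda r: r["amount"] > 100))
```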
Pattern: Mapping / field transformations
Use JoltTransformRecord (record-aware) for flatten / rename / remove / default operations.
| Property | Value |
|---|---|
| Record Reader | The existing JsonTreeReader. |
| Record Writer | The existing JsonRecordSetWriter. |
| Jolt Transform | jolt-transform-chain |
| Jolt Specification | A Chain spec (see below). |
Combine the flatten, rename, remove, and default operations in one Chain spec so the whole transformation takes a single ser/de pass.
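The following sketch shows what such a Chain spec could look like. It is written as a Python list so that it can be printed as JSON for the Jolt Specification property. The input fields (evt_ts, customer, debug_info, status) are hypothetical. The function expected_output is a plain-Python model of the intended effect on one record, which is useful for writing test fixtures. It is not Jolt itself, so verify the real output in the processor before relying on it.

```python
import json

# Hypothetical input fields: evt_ts, customer {id, name}, debug_info, status.
JOLT_CHAIN_SPEC = [
    {
        "operation": "shift",
        "spec": {
            "evt_ts": "event_time",  # rename evt_ts -> event_time
            "customer": {  # flatten customer.id and customer.name to the top level
                "id": "customer_id",
                "name": "customer_name",
            },
            "*": "&",  # copy every other top-level field under its own name
        },
    },
    {"operation": "remove", "spec": {"debug_info": ""}},  # drop debug_info
    {"operation": "default", "spec": {"status": "UNKNOWN"}},  # add status when absent
]


def expected_output(record: dict) -> dict:
    """Plain-Python model of the intended effect of the spec on one record."""
    out = {}
    for key, value in record.items():
        if key == "evt_ts":
            out["event_time"] = value
        elif key == "customer":
            if isinstance(value, dict):  # other customer.* fields are not copied
                for source, target in (("id", "customer_id"), ("name", "customer_name")):
                    if source in value:
                        out[target] = value[source]
        else:
            out[key] = value
    out.pop("debug_info", None)
    out.setdefault("status", "UNKNOWN")
    return out


# Paste this JSON into the Jolt Specification property.
print(json.dumps(JOLT_CHAIN_SPEC, indent=2))
```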
Note
Use JoltTransformRecord, not JoltTransformJSON. JoltTransformRecord is record-aware and processes NDJSON correctly using the configured RecordReader/RecordWriter. Do NOT use JoltTransformJSON — it treats the entire FlowFile as a single JSON document and fails with NDJSON input.
For simpler operations (add/rename/remove fields, set defaults), UpdateRecord with RecordPath expressions is a straightforward alternative to Jolt spec syntax. For content-based filtering, QueryRecord (SQL WHERE) is the alternative.
Pattern: Topic-to-table mapping
Route messages to different Snowflake tables based on the Kafka topic (or, for Kinesis, the source stream). ConsumeKafka sets the kafka.topic attribute automatically, and ConsumeKinesis sets aws.kinesis.stream.name, so this is attribute-only (no ser/de). The steps below use kafka.topic — for Kinesis, substitute aws.kinesis.stream.name.
- Add an UpdateAttribute processor with a dynamic property named table.name, using the value for your source:
| Property | Value |
|---|---|
| table.name (Kafka) | ${kafka.topic:replaceByPattern(#{'Topic To Table Map'})} |
| table.name (Kinesis) | ${aws.kinesis.stream.name:replaceByPattern(#{'Topic To Table Map'})} |
- Add a Topic To Table Map parameter to the connector’s parameter context. Format: topic:table pairs separated by commas. Table names must be valid unquoted Snowflake identifiers. Regex patterns must map a topic to a single table. If the parameter is empty or no pair matches, the topic name is used as the table name.
  - Explicit: topic1:low_range,topic2:low_range,topic5:high_range
  - Regex: topic[0-4]:low_range,topic[5-9]:high_range
- Update PublishSnowpipeStreaming:
| Property | Value |
|---|---|
| Table | ${table.name} |
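The mapping rules above can be modeled in a few lines of Python, which is handy for checking a Topic To Table Map value before deploying it. The following sketch applies several assumptions that are not taken from replaceByPattern's implementation: pairs are tried left to right, a pattern must match the whole topic name, the first match wins, and patterns contain no commas. The names parse_map and resolve_table are hypothetical.

```python
import re

# Snowflake unquoted identifiers start with a letter or underscore and
# continue with letters, digits, underscores, or dollar signs.
UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def parse_map(topic_to_table_map: str) -> list[tuple[str, str]]:
    """Split 'pattern:table,pattern:table' into (pattern, table) pairs."""
    pairs = []
    for raw in topic_to_table_map.split(","):  # assumes patterns contain no commas
        raw = raw.strip()
        if not raw:
            continue
        pattern, sep, table = raw.rpartition(":")  # table names never contain ':'
        if not sep or not pattern:
            raise ValueError(f"expected topic:table, got {raw!r}")
        if not UNQUOTED_IDENTIFIER.fullmatch(table):
            raise ValueError(f"not a valid unquoted identifier: {table!r}")
        pairs.append((pattern, table))
    return pairs


def resolve_table(topic: str, topic_to_table_map: str) -> str:
    """Return the target table for a topic (or stream) name."""
    for pattern, table in parse_map(topic_to_table_map):
        if re.fullmatch(pattern, topic):  # assumption: whole-name match, first wins
            return table
    return topic  # empty map or no match: the topic name is the table name


explicit = "topic1:low_range,topic2:low_range,topic5:high_range"
regex = "topic[0-4]:low_range,topic[5-9]:high_range"
for topic in ("topic1", "topic3", "topic7"):
    print(topic, resolve_table(topic, explicit), resolve_table(topic, regex))
```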
Pattern: Default values for null / empty fields
Use JoltTransformRecord with a default operation (preferably combined into the Chain spec above), or UpdateRecord with RecordPath:
| Property | Value |
|---|---|
| Record Reader | The existing JsonTreeReader. |
| Record Writer | The existing JsonRecordSetWriter. |
| Replacement Value Strategy | record-path-value |
| /field_name (dynamic) | replaceNull(/field_name, 'default_value') |
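As its name says, replaceNull targets nulls. An empty string is not null, so empty values need separate handling. The following Python sketch models that behavior per record. The function name, the region field, and the default value are hypothetical. The sketch also assumes that records lacking the field are left unchanged; check how your record schema treats absent fields.

```python
import json


def replace_null(ndjson: str, field: str, default) -> str:
    """Model of UpdateRecord with /field set to replaceNull(/field, default).

    Only explicit nulls are replaced. Empty strings are kept as they are, and
    records that lack the field are left unchanged in this sketch.
    """
    out = []
    for line in ndjson.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        if field in record and record[field] is None:
            record[field] = default
        out.append(json.dumps(record))
    return "\n".join(out)


src = '{"id": 1, "region": null}\n{"id": 2, "region": ""}\n{"id": 3, "region": "EU"}\n{"id": 4}'
print(replace_null(src, "region", "UNKNOWN"))
```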
Pattern: Content-based routing to multiple tables
Route records to different tables based on a field value in the message content.
A single FlowFile can contain records with different routing-field values, so PartitionRecord is always required — there is no shortcut.
Step 1 — PartitionRecord splits by the routing field and sets the value as a FlowFile attribute:
| Property | Value |
|---|---|
| Record Reader | The existing JsonTreeReader. |
| Record Writer | The existing JsonRecordSetWriter. |
| <routing-field-name> (dynamic) | /routing_field |
Step 2 (optional) — UpdateAttribute to map the attribute to a table name, only if they differ:
| Property | Value |
|---|---|
| table.name | ${<routing-field-attribute>} |
If the attribute value is the table name, skip this and point PublishSnowpipeStreaming directly at the attribute.
Step 3 (optional) — RouteOnAttribute only if you must drop unwanted values:
| Property | Value |
|---|---|
| matched (dynamic) | ${table.name:isEmpty():not()} |
Records that satisfy the expression flow to the matched relationship; connect it to the Output Port. Everything else falls to the built-in unmatched relationship — auto-terminate it to drop those records.
Step 4 — PublishSnowpipeStreaming: set Table = ${table.name} (or the routing attribute directly). PublishSnowpipeStreaming writes each FlowFile to whatever table the attribute resolves to.
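The following Python sketch models the partitioning step and shows why each output FlowFile ends up with exactly one table. The model emits one output per distinct value of a top-level field, keeps record order inside each output, copies the parent's attributes, and adds the value as the routing attribute. The function name, the event_type field, and the sample data are hypothetical. The sketch assumes scalar routing values and gives records without a value no routing attribute, which Step 3's isEmpty() check would then drop.

```python
import json


def partition_records(
    ndjson: str, parent_attributes: dict[str, str], attribute: str, field: str
) -> list[tuple[dict[str, str], str]]:
    """Model of PartitionRecord with one dynamic property: <attribute> = /<field>."""
    groups: dict = {}  # routing value -> serialized records, in first-seen order
    for line in ndjson.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        groups.setdefault(record.get(field), []).append(json.dumps(record))
    outputs = []
    for value, lines in groups.items():
        attrs = dict(parent_attributes)  # keep kafka.topic, kafka.partition, ...
        if value is not None:
            attrs[attribute] = str(value)
        outputs.append((attrs, "\n".join(lines)))
    return outputs


flowfile = (
    '{"id": 1, "event_type": "orders"}\n'
    '{"id": 2, "event_type": "refunds"}\n'
    '{"id": 3, "event_type": "orders"}'
)
parent = {"kafka.topic": "events", "kafka.partition": "0"}
# Naming the dynamic property table.name lets PublishSnowpipeStreaming use Table = ${table.name}.
for attrs, content in partition_records(flowfile, parent, "table.name", "event_type"):
    print(attrs["table.name"], [json.loads(line)["id"] for line in content.splitlines()])
```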
Note
Dynamic database and schema too. PublishSnowpipeStreaming supports Expression Language (FlowFile attributes) on its Database, Schema, Table, and Pipe properties — not just Table. So you can route to a fully dynamic destination by setting, for example, Database = ${target.db}, Schema = ${target.schema}, and Table = ${table.name}, where each attribute is set upstream by UpdateAttribute / PartitionRecord. Each evaluates per FlowFile, so a single processor can fan out across databases, schemas, and tables.
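The following Python sketch shows per-FlowFile resolution of the destination. It handles plain ${attribute} references only. References that call Expression Language functions are left untouched, and a missing attribute is assumed to evaluate to an empty string. The names resolve and destination, the attributes target.db and target.schema, and the sample values are hypothetical. The dotted name is printed for illustration only.

```python
import re

# Matches plain ${name} references only; references that call EL functions
# (they contain ':') are left untouched by this sketch.
ATTRIBUTE_REF = re.compile(r"\$\{([^}:]+)\}")


def resolve(expression: str, attributes: dict[str, str]) -> str:
    """Substitute ${attribute} references; a missing attribute becomes ''."""
    return ATTRIBUTE_REF.sub(lambda m: attributes.get(m.group(1).strip(), ""), expression)


def destination(properties: dict[str, str], attributes: dict[str, str]) -> str:
    """Evaluate Database, Schema, and Table for one FlowFile and join them."""
    parts = [resolve(properties[name], attributes) for name in ("Database", "Schema", "Table")]
    if not all(parts):
        raise ValueError(f"unresolved destination part in {parts}")
    return ".".join(parts)


properties = {"Database": "${target.db}", "Schema": "${target.schema}", "Table": "${table.name}"}
flowfiles = [
    {"target.db": "SALES", "target.schema": "RAW", "table.name": "ORDERS"},
    {"target.db": "SUPPORT", "target.schema": "RAW", "table.name": "TICKETS"},
]
for attrs in flowfiles:
    print(destination(properties, attrs))
```

Failing fast on an empty part mirrors the advice to set every attribute upstream; otherwise a FlowFile would have no valid destination.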
| Need | Pipeline |
|---|---|
| Route to multiple tables (all values valid) | PartitionRecord -> Output Port, with PublishSnowpipeStreaming Table = ${field} |
| Route to multiple tables + rename attribute | PartitionRecord -> UpdateAttribute -> Output Port |
| Route to multiple tables + drop some values | PartitionRecord -> UpdateAttribute -> RouteOnAttribute -> Output Port |
Set FIFO on all connections.
Pattern: Custom Groovy script (catch-all)
Use ExecuteGroovyScript for anything that does not fit the patterns above. All restriction rules still apply: FIFO on connections, one table per output FlowFile, and preserved ordering.
Warning
Preserve original attributes. The script MUST NOT remove or overwrite incoming attributes. For Kafka these include kafka.topic and kafka.partition. For Kinesis these include aws.kinesis.stream.name and aws.kinesis.shard.id. These are used in PublishSnowpipeStreaming channel names and must be preserved — the connectors provide at-least-once delivery and do not track offsets. When emitting output FlowFiles, always inherit attributes from the incoming FlowFile.
Validate the script against edge cases (nulls, missing fields, type mismatches) before enabling the flow.
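In a real ExecuteGroovyScript script, the usual way to inherit attributes is to create each output with session.create(flowFile), passing the incoming FlowFile as the parent. The following Python sketch does not model the NiFi API. It models only the attribute rule from the warning above, which is useful as a check in tests of your script's logic. The helpers child_attributes and check_outputs are hypothetical, and the protected attribute names are the ones listed in the warning.

```python
# Attribute names taken from the warning above.
SOURCE_ATTRIBUTES = frozenset({
    "kafka.topic",
    "kafka.partition",
    "aws.kinesis.stream.name",
    "aws.kinesis.shard.id",
})


def child_attributes(parent: dict[str, str], updates: dict[str, str]) -> dict[str, str]:
    """Attributes for an output FlowFile derived from `parent`.

    Starts from a full copy of the parent's attributes and rejects updates
    that would overwrite one of the source attributes.
    """
    clashes = SOURCE_ATTRIBUTES.intersection(updates)
    if clashes:
        raise ValueError(f"must not overwrite source attributes: {sorted(clashes)}")
    return {**parent, **updates}


def check_outputs(parent: dict[str, str], outputs: list[dict[str, str]]) -> None:
    """Fail if any output lost or changed a source attribute the parent carried."""
    for attrs in outputs:
        for name in SOURCE_ATTRIBUTES.intersection(parent):
            if attrs.get(name) != parent[name]:
                raise AssertionError(f"{name} not preserved: {attrs.get(name)!r}")


parent = {"kafka.topic": "events", "kafka.partition": "3", "filename": "abc"}
outputs = [
    child_attributes(parent, {"table.name": "ORDERS"}),
    child_attributes(parent, {"table.name": "REFUNDS"}),
]
check_outputs(parent, outputs)
print([o["table.name"] for o in outputs])
```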
Combining transformations
Chain multiple processors inside the group:
- Order: where an attribute-only processor depends only on attributes the source already sets (such as kafka.topic), place it before content-aware processors, so unwanted data is dropped early and costly ser/de runs only on records that will actually be written.
- Minimize ser/de: combine content operations into a single processor where possible (one QueryRecord or one chained JoltTransformRecord). Keep PartitionRecord separate, because it changes FlowFile boundaries.
- Connections: FIFO on all. Auto-terminate failure relationships or route them to a dead-letter output.
Parameterization
After wiring the transformations, move suitable hardcoded values into the connector’s parameter context so the flow can be reconfigured without editing processors.
Good candidates: connection strings / URLs, topic-to-table mappings, credentials, thresholds and constants.
Not suitable (keep inline): Groovy scripts, Jolt specs, filter/routing conditions, large schema definitions.
Verification
After creating all processors, wiring all connections, and updating PublishSnowpipeStreaming:
- Enable all controller services first. Processors that reference a disabled service show as INVALID, so enable every service before validating.
- Validate the process group and resolve any validation failures.
- Confirm PublishSnowpipeStreaming references the correct Table value (for example, ${table.name}).
- Start the flow.
Troubleshooting
| Symptom | Likely cause |
|---|---|
| Data loss or duplicate data | FIFO prioritizer missing on a connection. |
| Out-of-order delivery | Missing FIFO prioritizer on a connection between processors. |
| Processor shows INVALID | A referenced controller service is disabled, or a required property is missing — enable services first, then re-validate. |
| Records written to the wrong table | PublishSnowpipeStreaming Table not set to the routing attribute, or PartitionRecord / UpdateAttribute not setting the expected attribute. |
| Schema re-inferred on every message (slow) | VolatileSchemaCache not configured on the JsonTreeReader (see Reader and writer setup). |
| Lost kafka.* attributes after a Groovy step | The script created new FlowFiles without inheriting the incoming attributes. |
See also
- Configuring Avro data type ingestion — JSON to Avro switching
- Configuring Protobuf data type ingestion — JSON to Protobuf switching
- Configuring Dead Letter Queue (DLQ) handling — routing failed records to a destination