ConsumeKinesisStream 2025.10.9.21

Bundle

org.apache.nifi | nifi-aws-nar

Description

Reads data from the specified AWS Kinesis stream and outputs a FlowFile for every processed Record (raw) or a FlowFile for a batch of processed records if a Record Reader and Record Writer are configured. At-least-once delivery of all Kinesis Records within the Stream while the processor is running. AWS Kinesis Client Library can take several seconds to initialise before starting to fetch data. Uses DynamoDB for check pointing and CloudWatch (optional) for metrics. Ensure that the credentials provided have access to DynamoDB and CloudWatch (optional) along with Kinesis.

Tags

amazon, aws, consume, kinesis, stream

Input Requirement

FORBIDDEN

Supports Sensitive Dynamic Properties

false

Properties

PropertyDescription
AWS Credentials Provider serviceThe Controller Service that is used to obtain AWS credentials provider
Amazon Kinesis Stream NameThe name of Kinesis Stream
Application NameThe Kinesis stream reader application name.
Checkpoint IntervalInterval between Kinesis checkpoints
Communications Timeout
DynamoDB OverrideDynamoDB override to use non-AWS deployments
Endpoint Override URLEndpoint URL to use instead of the AWS default including scheme, host, port, and path. The AWS libraries select an endpoint URL based on the AWS region, but this property overrides the selected endpoint URL, allowing use with other S3-compatible endpoints.
Failover TimeoutKinesis Client Library failover timeout
FlowFile Handling On Schema DifferenceThe strategy used when records in a Kinesis Stream change their schema in a single batch.
Graceful Shutdown TimeoutKinesis Client Library graceful shutdown timeout
Initial Stream PositionInitial position to read Kinesis streams.
Output StrategyThe format used to output the Kinesis Record into a FlowFile Record.
Record ReaderThe Record Reader to use for reading received messages. The Kinesis Stream name can be referred to by Expression Language ‘${kinesis.name}’ to access a schema. If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.
Record WriterThe Record Writer to use for serializing Records to an output FlowFile. The Kinesis Stream name can be referred to by Expression Language ‘${kinesis.name}’ to access a schema. If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.
Region
Report Metrics to CloudWatchWhether to report Kinesis usage metrics to CloudWatch.
Retry CountNumber of times to retry a Kinesis operation (process record, checkpoint, shutdown)
Retry WaitInterval between Kinesis operation retries (process record, checkpoint, shutdown)
Stream Position TimestampTimestamp position in stream from which to start reading Kinesis Records. Required if Initial position to read Kinesis streams. is AT_TIMESTAMP. Uses the Timestamp Format to parse value into a Date.
Timestamp FormatFormat to use for parsing the Stream Position Timestamp into a Date and converting the Kinesis Record’s Approximate Arrival Timestamp into a FlowFile attribute.
proxy-configuration-serviceSpecifies the Proxy Configuration Controller Service to proxy network requests.

Relationships

NameDescription
successFlowFiles are routed to success relationship

Writes attributes

NameDescription
aws.kinesis.partition.keyPartition key of the (last) Kinesis Record read from the Shard
aws.kinesis.shard.idShard ID from which the Kinesis Record was read
aws.kinesis.sequence.numberThe unique identifier of the (last) Kinesis Record within its Shard
aws.kinesis.approximate.arrival.timestampApproximate arrival timestamp of the (last) Kinesis Record read from the stream
mime.typeSets the mime.type attribute to the MIME Type specified by the Record Writer (if configured)
record.countNumber of records written to the FlowFiles by the Record Writer (if configured)
record.error.messageThis attribute provides on failure the error message encountered by the Record Reader or Record Writer (if configured)

See also