Como o Snowflake High Performance connector for Kafka funciona¶
Este tópico descreve vários aspectos do conector, como ele funciona com tabelas e canais e como configurá-lo.
Como o conector funciona com tabelas e canais¶
O conector Snowflake de alto desempenho para Kafka requer que você crie tabelas de destino manualmente. O conector trata cada registro do Kafka como uma linha a ser inserida em uma tabela do Snowflake. Por exemplo, se você tiver um tópico do Kafka com o conteúdo da mensagem estruturado desta forma:
{
"order_id": 12345,
"customer_name": "John",
"order_total": 100.00,
"isPaid": true
}
Você pode criar uma tabela com colunas correspondentes às chaves JSON e recorrer a um canal padrão chamado {tableName}-STREAMING que mapeará automaticamente as chaves de primeiro nível do conteúdo do registro para as colunas da tabela correspondentes por nome (sem distinção entre maiúsculas e minúsculas).
CREATE TABLE ORDERS (
record_metadata VARIANT,
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
ispaid BOOLEAN
);
Se você criar o próprio canal, poderá definir a lógica de transformação de dados na instrução COPY INTO do canal. Você pode renomear as colunas e converter os tipos de dados conforme necessário. Por exemplo:
CREATE TABLE ORDERS (
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::STRING,
$1:customer_name,
$1:order_total::STRING,
$1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
ou
CREATE TABLE ORDERS (
topic VARCHAR,
partition VARCHAR,
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_METADATA.topic::STRING AS topic,
$1:RECORD_METADATA.partition::STRING AS partition,
$1['order_id']::STRING AS order_id,
$1['customer_name']::STRING as customer_name,
CONCAT($1['order_total']::STRING, ' USD') AS order_total,
$1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Quando você define o próprio canal, as colunas da tabela de destino não precisam corresponder às chaves JSON. Você pode renomear as colunas para os nomes desejados e converter os tipos de dados conforme necessário.
Nomes de tópicos, tabelas e canais¶
Dependendo das definições de configuração, o conector usará nomes diferentes para a tabela de destino. O nome da tabela de destino é sempre derivado do nome do tópico.
Como o conector mapeia nomes de tópicos para a tabela de destino¶
O conector Kafka oferece dois modos para mapear nomes de tópicos do Kafka para nomes de tabelas do Snowflake:
Mapeamento estático: o conector deriva nomes de tabelas de destino usando apenas o nome do tópico do Kafka.
Modo de mapeamento explícito de tópico para tabela: você especifica mapeamentos personalizados entre tópicos e tabelas usando o parâmetro de configuração
snowflake.topic2table.map.
Mapeamento estático¶
Se você não configurar o parâmetro snowflake.topic2table.map, o conector sempre vai derivar os nomes das tabelas do nome do tópico.
Geração de nome de tabela:
O conector deriva o nome da tabela de destino do nome do tópico usando as seguintes regras:
Se o nome do tópico for um identificador válido do Snowflake (começa com uma letra ou sublinhado e contém apenas letras, dígitos, sublinhados ou cifrões), o conector usará o nome do tópico como o nome da tabela (convertido em maiúsculas).
Se o nome do tópico contiver caracteres inválidos, o conector:
Substituirá os caracteres inválidos por sublinhados;
Adicionará um sublinhado seguido por um código hash para garantir a exclusividade;
Por exemplo, o tópico
my-topic.datatorna-seMY_TOPIC_DATA_<hash>.
Determinação do nome do canal:
O conector determina qual canal usar com base na seguinte lógica:
O conector verifica se existe um canal com o mesmo nome da tabela de destino.
Se existir um canal criado pelo usuário com esse nome, o conector usará esse canal (modo de canal definido pelo usuário).
Caso contrário, o conector usará o canal padrão chamado
{tableName}-STREAMING.
Nota
O Snowflake recomenda escolher nomes de tópicos que sigam as regras de nomes dos identificadores do Snowflake para garantir nomes de tabelas previsíveis.
Explicando o RECORD_METADATA¶
O conector preenche a estrutura de RECORD_METADATA com os metadados sobre o registro do Kafka. Esses metadados são enviados por meio da fonte de dados Snowpipe Streaming para o Snowflake, onde ficam disponíveis em transformações de canal usando o acessador $1:RECORD_METADATA. A estrutura de RECORD_METADATA está disponível nos modos de canal tanto definido pelo usuário quanto padrão. O conteúdo pode ser salvo na coluna do tipo VARIANT, ou os campos individuais podem ser extraídos e salvos em colunas separadas.
Exemplo de canal com transformações e metadados:
CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total,
$1:RECORD_METADATA.topic AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
$1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Neste exemplo:
O canal extrai campos específicos da mensagem do Kafka (order_id, customer_name, order_total)
Ele também captura campos de metadados (tópico, deslocamento e carimbo de data/hora da ingestão)
Os valores podem ser convertidos e/ou transformados conforme necessário
Como os campos de metadados são preenchidos¶
O conector preenche automaticamente os campos de metadados com base nas propriedades do registro do Kafka e na configuração do conector. Você pode controlar quais campos de metadados são incluídos usando estes parâmetros de configuração:
snowflake.metadata.topic(padrão: true): inclui o nome do tópicosnowflake.metadata.offset.and.partition(padrão: true): inclui deslocamento e partiçãosnowflake.metadata.createtime(padrão: true): inclui o carimbo de data/hora do registro do Kafkasnowflake.metadata.all(padrão: true): inclui todos os metadados disponíveis
Quando snowflake.metadata.all=true (padrão), todos os campos de metadados são preenchidos. A definição de sinalizadores de metadados individuais como false exclui esses campos específicos da estrutura de RECORD_METADATA.
Nota
O campo SnowflakeConnectorPushTime está sempre disponível e representa a hora em que o conector enviou por push o registro para o buffer de ingestão. Isso é útil para calcular a latência de ingestão de ponta a ponta.
Por padrão, a estrutura de RECORD_METADATA contém as seguintes informações:
Campo |
Tipo de dados |
Descrição |
|---|---|---|
topic |
Cadeia de caracteres |
O nome do tópico Kafka que deu origem ao registro. |
partition |
Cadeia de caracteres |
O número da partição dentro do tópico. (Note que esta é a partição do Kafka, não a micropartição do Snowflake). |
offset |
number |
O offset dessa partição. |
CreateTime / . LogAppendTime |
number |
Esse é o carimbo de data/hora associado à mensagem no tópico do Kafka. O valor é em milissegundos desde a meia-noite de 1.º de janeiro de 1970, UTC. Para obter mais informações, consulte: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html |
SnowflakeConnectorPushTime |
number |
Um carimbo de data/hora em que um registro foi enviado para um buffer Ingest SDK. O valor é o número de milissegundos desde a meia-noite de 1º de janeiro de 1970 UTC. Para obter mais informações, consulte Estimativa de latência de ingestão. |
key |
Cadeia de caracteres |
Se a mensagem for uma KeyedMessage do Kafka, esta é a chave da mensagem. Para que o conector possa armazenar a chave no RECORD_METADATA, o parâmetro key.converter no Propriedades de configuração do Kafka deve ser definido como “org.apache.kafka.connect.storage.StringConverter”. Caso contrário, o conector ignora as chaves. |
headers |
Objeto |
Um cabeçalho é um par chave-valor associado ao registro e definido pelo usuário. Cada registro pode ter 0, 1 ou vários cabeçalhos. |
A quantidade de metadados gravados na coluna RECORD_METADATA é configurável usando propriedades opcionais de configuração do Kafka.
Os nomes e valores dos campos diferenciam maiúsculas e minúsculas.
Como os registros do Kafka são convertidos antes da ingestão¶
Antes de cada linha ser passada ao Snowpipe Streaming, o conector converte o valor do registro do Kafka Connect em um Map<cadeia de caracteres, objeto>, com chaves que devem corresponder aos nomes das colunas de destino (ou podem ser transformadas dentro de um canal definido pelo usuário). Cadeias de caracteres, matrizes de bytes ou números primitivos devem ser delimitados (por exemplo, usando uma SMT HoistField) para que o conector receba um objeto estruturado. O conversor aplica as seguintes regras:
Os valores nulos são tratados como marcas de exclusão. Eles são ignorados quando
behavior.on.null.values=IGNORE, caso contrário, são ingeridos como objetos JSON vazios.Os campos numéricos e boolianos são passados como estão. Valores decimais com precisão maior que 38 são serializados como cadeias de caracteres para permanecer dentro dos limites de
NUMBERdo Snowflake.As cargas úteis
byte[]eByteBuffersão cadeias de caracteres codificadas em Base64, portanto armazene-as em colunasVARIANTouVARCHAR.As matrizes permanecem como matrizes, e os objetos aninhados permanecem como mapas aninhados. Declare as colunas
VARIANTquando você recorrer ao canal padrão para descarregar os dados aninhados como estão.Mapas com chaves que não são cadeia de caracteres são emitidos como matrizes de pares
[key, value]porque os nomes das colunas do Snowflake devem ser texto.Os cabeçalhos e as chaves dos registros são copiados para
RECORD_METADATAsempre que os sinalizadores de metadados relevantes estiverem habilitados.
Se você precisar que todo o corpo da mensagem seja preservado como uma única coluna, delimite-o em um novo campo de nível superior usando SMTs. Consulte Coluna RECORD_CONTENT legada para ver o padrão de transformação.
Modo de canal definido pelo usuário ou padrão¶
O conector oferece suporte a dois modos para gerenciar a ingestão de dados:
Modo de canal definido pelo usuário¶
Neste modo, você tem controle completo sobre a transformação de dados e o mapeamento de colunas.
Quando usar este modo:
Você precisa de nomes de colunas personalizados que sejam diferentes dos nomes de campo JSON
Você precisa aplicar transformações de dados (conversão de tipo, mascaramento, filtragem)
Você quer controle completo sobre como os dados são mapeados para colunas
Modo de canal padrão¶
Neste modo, o conector usa um canal padrão chamado {tableName}-STREAMING e mapeia os campos de registro do Kafka para colunas de tabelas correspondentes por nome (sem distinção entre maiúsculas e minúsculas).
Quando usar este modo:
Os nomes da sua chave de registro do Kafka correspondem aos nomes das colunas desejadas
Você não precisa de transformações de dados personalizadas
Você quer uma configuração simplificada
Mapeando chaves de registro do Kafka para colunas de tabela com modo de canal padrão
Ao usar o modo de canal padrão, o conector usa o canal padrão chamado {tableName}-STREAMING e mapeia as chaves de primeiro nível do conteúdo diretamente para as colunas da tabela usando uma correspondência sem distinção entre maiúsculas e minúsculas.
Usando o modo de canal padrão – exemplo¶
Exemplo 1:¶
Considere a seguinte carga útil de conteúdo do registro do Kafka:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
"@&$#* includes special characters": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
Você cria uma tabela com colunas que correspondem às chaves JSON (sem distinção entre maiúsculas e minúsculas, incluindo caracteres especiais):
CREATE TABLE PERSON_DATA (
record_metadata VARIANT,
city VARCHAR,
age NUMBER,
married BOOLEAN,
"has cat" BOOLEAN,
"!@&$#* includes special characters" BOOLEAN,
skills VARIANT,
family VARIANT
);
Comportamento de correspondência:
"city"(kafka) →cityouCITYouCity(coluna) – sem distinção entre maiúsculas e minúsculas"has cat"(kafka) →"has cat"(coluna) – deve ser delimitado entre aspas devido ao espaço"!@&$#* includes special characters"(kafka) →"!@&$#* includes special characters"(coluna) – caracteres especiais preservadosObjetos aninhados como
skillsefamilymapeiam para colunas VARIANT automaticamente
Usando o modo de canal definido pelo usuário – exemplos¶
Este exemplo mostra como configurar e usar canais definidos pelo usuário com transformações de dados personalizadas.
Exemplo 1:¶
Crie uma tabela com o esquema desejado:
CREATE TABLE ORDERS (
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
order_date TIMESTAMP_NTZ,
source_topic VARCHAR
);
Crie um canal que transforma os registros recebidos do Kafka para corresponder ao esquema da sua tabela:
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total::NUMBER,
$1:order_date::TIMESTAMP_NTZ,
$1:RECORD_METADATA.topic
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Observe que o nome do canal (ORDERS) corresponde ao nome da tabela (ORDERS). A definição do canal extrai campos da carga útil JSON usando a sintaxe $1:field_name e os mapeia para as colunas da tabela.
Nota
Você pode acessar os campos JSON aninhados, e campos com caracteres especiais, usando notação de colchetes, como $1['field name'] ou $1['has cat'].
Configure o mapeamento do tópico para a tabela:
snowflake.topic2table.map=kafka-orders-topic:ORDERS
Esta configuração mapeia o tópico do Kafka kafka-orders-topic para a tabela e o canal preexistentes chamados ORDERS.
Exemplo 2:¶
Quando você precisa acessar chaves no conteúdo que não têm nomes convencionais, use a seguinte sintaxe:
Campos simples:
$1:field_nameCampos com espaços ou caracteres especiais:
$1['field name']ou$1['has cat']Campos com caracteres unicode:
$1[' @&$#* has Łułósżź']Campos aninhados:
$1:parent.childou$1:parent['child field']
Considere esta carga útil JSON do Kafka:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
" @&$#* has Łułósżź": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
Crie uma tabela de destino com os nomes de coluna escolhidos:
CREATE TABLE PERSON_DATA (
city VARCHAR,
age NUMBER,
married BOOLEAN,
has_cat BOOLEAN,
weird_field_name BOOLEAN,
skills VARIANT,
family VARIANT
);
Em seguida, crie um canal com o mesmo nome que define o mapeamento:
CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
SELECT
$1:city,
$1:age,
$1:married,
$1['has cat'] AS has_cat,
$1[' @&$#* has Łułósżź'] AS weird_field_name,
$1:skills,
$1:family
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Pontos principais:
Você controla os nomes das colunas (por exemplo, renomear
"has cat"parahas_cat)Você pode converter tipos de dados conforme necessário (por exemplo,
$1:age::NUMBER)Você pode incluir ou excluir campos conforme desejado
Você pode adicionar campos de metadados (por exemplo,
$1:RECORD_METADATA.topic)Colunas VARIANT processam estruturas JSON aninhadas automaticamente
Exemplo 3: Com tabelas interativas¶
As tabelas interativas são um tipo especial de tabela do Snowflake otimizada para consultas de baixa latência e alta simultaneidade. Você pode saber mais sobre tabelas interativas na documentação de tabelas interativas.
Nota
Atualmente, as tabelas interativas são um recurso em versão preliminar disponível apenas para contas selecionadas.
Crie uma tabela interativa:
CREATE INTERACTIVE TABLE REALTIME_METRICS ( metric_name VARCHAR, metric_value NUMBER, source_topic VARCHAR, timestamp TIMESTAMP_NTZ ) AS (SELECT $1:M_NAME::VARCHAR, $1:M_VALUE::NUMBER, $1:RECORD_METADATA.topic::VARCHAR, $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Configure o mapeamento do tópico para a tabela:
snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
Considerações importantes:
As tabelas interativas têm limitações e restrições de consulta específicas. Revise a documentação de tabelas interativas antes de usá-las com o conector.
Para tabelas interativas, as transformações necessárias devem ser tratadas na definição da tabela.
Warehouses interativos são necessários para consultar tabelas interativas de forma eficiente.
Mapeamento explícito de tópico para tabela¶
Quando você configura o parâmetro snowflake.topic2table.map, o conector opera no modo de mapeamento explícito. Esse modo permite que você:
Mapeie vários tópicos do Kafka para uma única tabela do Snowflake;
Use nomes de tabelas personalizados diferentes dos nomes dos tópicos;
Aplique padrões regex para corresponder a vários tópicos.
Formato de configuração:
O parâmetro snowflake.topic2table.map aceita uma lista separada por vírgulas de mapeamentos de tópico para tabela no formato:
topic1:table1,topic2:table2,topic3:table3
Exemplos de configurações:
Mapeamento direto do tópico
snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Correspondência de padrão regex
snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
Esta configuração mapeia todos os tópicos que terminam com _cat (como orange_cat, calico_cat) para a tabela CAT_TABLE, e todos os tópicos que terminam com _dog para a tabela DOG_TABLE.
Muitos tópicos para uma tabela
snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
Esta configuração mapeia tanto topic1 quanto topic2 para shared_table, enquanto topic3 é mapeado para other_table.
Importante
Os padrões regex no mapeamento não podem se sobrepor. Cada tópico deve corresponder no máximo a um padrão.
Os nomes das tabelas no mapeamento devem ser identificadores válidos do Snowflake com pelo menos 2 caracteres, começando com uma letra ou sublinhado.
Você pode mapear vários tópicos para uma única tabela.
Coluna RECORD_CONTENT legada¶
Em versões mais antigas do conector, quando o recurso de esquematização era desabilitado, o conector criava uma tabela de destino com duas colunas: RECORD_CONTENT e RECORD_METADATA. A coluna RECORD_CONTENT continha todo o conteúdo da mensagem do Kafka em uma coluna do tipo VARIANT. A coluna RECORD_METADATA ainda é compatível, mas a coluna RECORD_CONTENT não é mais criada pelo conector. A mesma funcionalidade pode ser obtida usando transformações SMT (consulte os exemplos mais adiante nesta seção). A chave RECORD_CONTENT também não está mais disponível em transformações de PIPE. Por exemplo, esta definição de PIPE não funcionará por padrão:
Nota
Essa definição de canal não funcionará sem transformações SMT adicionais.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Se você precisa de todo o conteúdo da mensagem do Kafka salvo em uma única coluna, ou de um manipulador para todo o conteúdo da mensagem do Kafka em uma transformação de PIPE, pode usar a seguinte transformação SMT que delimita todo o conteúdo da mensagem do Kafka no campo personalizado desejado:
transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
Essa transformação delimita todo o conteúdo da mensagem do Kafka em um campo personalizado chamado your_top_level_field_name. Em seguida, você pode acessar todo o conteúdo da mensagem do Kafka usando o acessador $1:your_top_level_field_name em sua transformação de PIPE.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Uma outra opção se você quiser salvar todos os metadados e o conteúdo em uma única tabela usando o canal padrão é não criar um canal personalizado; em vez disso, crie apenas uma tabela com duas colunas: RECORD_CONTENT e your_top_level_field_name.
CREATE TABLE ORDERS (
record_metadata VARIANT,
your_top_level_field_name VARIANT
);
Para ler mais sobre a transformação HoistField$Value, consulte a Documentação do Kafka.
Aviso
Salvar todo o conteúdo e os metadados da mensagem do Kafka em uma tabela pode ter um impacto negativo no custo de ingestão, na velocidade do pipeline e na latência. Se você precisa do melhor desempenho possível, considere salvar apenas os dados necessários se estiverem acessíveis no nível superior do conteúdo do registro do Kafka ou usar transformações SMT para extrair os dados dos campos profundamente aninhados para campos de nível superior.
Tratamento de erros de canais de streaming e filas de mensagens mortas¶
Na versão 4.0.0-rc4, o conector inspeciona o status do canal Snowpipe Streaming antes de confirmar os deslocamentos. Se o Snowflake relatar linhas rejeitadas (rowsErrorCount > 0), the connector now raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues cannot go unnoticed. To allow ingestion to continue while you triage bad rows, set errors.tolerance=all
errors.tolerance=all
Evolução do esquema¶
Importante
A evolução do esquema não é compatível no conector Snowflake de alto desempenho para Kafka. É necessário gerenciar manualmente as alterações de esquema nas tabelas de destino.
O conector não detecta automaticamente alterações de esquema nem evolui esquemas de tabela com base nos registros do Kafka recebidos. Quando você precisa adicionar colunas, modificar tipos de dados ou fazer outras alterações no esquema, deve:
Pausar o conector para interromper a ingestão de dados;
Alterar manualmente o esquema da tabela usando ALTER TABLE ou recriar a tabela;
Atualizar sua definição de canal se estiver usando canais definidos pelo usuário e se a lógica de transformação precisa mudar;
Reiniciar o conector para retomar a ingestão de dados.
Nota
O suporte para evolução de esquema será adicionado em versões futuras.
Tolerância a falhas¶
Limitações da tolerância a falhas com o conector¶
Os tópicos do Kafka podem ser configurados com um limite de espaço de armazenamento ou de tempo de retenção.
Se o sistema ficar offline por mais do que o tempo de retenção, então os registros expirados não serão carregados. Da mesma forma, se o limite do espaço de armazenamento do Kafka for excedido, algumas mensagens não serão entregues.
Se as mensagens no tópico do Kafka forem excluídas, essas alterações talvez não sejam refletidas na tabela do Snowflake.