Configure a Dead Letter Queue
A Dead Letter Queue (DLQ) captures records that Confluent Cloud for Apache Flink cannot deserialize, enabling your statements to continue processing instead of failing. When a deserialization error occurs, Flink routes the problematic record to a DLQ table, where you can inspect it later.
DLQ error handling applies only to deserialization errors at the source. Errors in user-defined functions (UDFs), serialization, or windowed aggregations are not routed to the DLQ. For recommendations on handling errors inside UDFs, see Error handling best practices.
Note
Tableflow uses the same error-handling.mode and error-handling.log.target table properties and the same DLQ schema. For Tableflow-specific error handling behavior, see Error-handling mode.
Prerequisites
Access to Confluent Cloud.
A provisioned Flink compute pool.
The appropriate RBAC roles and ACL permissions. For details, see Dead Letter Queue permissions.
Configure a DLQ when creating a new table
You can enable DLQ error handling when creating a new table by setting the error-handling.mode and error-handling.log.target table properties in your CREATE TABLE statement.
CREATE TABLE my_source_table (
  id INT,
  name STRING,
  event_time TIMESTAMP_LTZ(3)
) WITH (
  'error-handling.mode' = 'log',
  'error-handling.log.target' = 'my_source_table_error_log'
);
Flink attempts to create the DLQ topic and register the schema automatically. If you don’t specify error-handling.log.target, the default DLQ table name is error_log.
Configure a DLQ for an existing table
You can add DLQ error handling to an existing table by using an ALTER TABLE statement.
ALTER TABLE my_source_table SET (
  'error-handling.mode' = 'log',
  'error-handling.log.target' = 'my_source_table_error_log'
);
Flink attempts to create the DLQ topic and register the schema at the time the ALTER TABLE statement runs.
Important
If DLQ creation fails (for example, the topic cannot be created, the schema cannot be registered, or permissions are missing), the CREATE TABLE or ALTER TABLE statement itself does not fail. The failure surfaces later when the first deserialization error occurs and Flink cannot write the record to the DLQ, causing the job to fail.
To verify that Flink created the DLQ successfully, check that the DLQ topic and schema exist in your Kafka cluster and Schema Registry after running the statement.
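One way to script this check is to compare the subjects that Schema Registry reports against the two subjects the DLQ needs. The Python sketch below is a minimal illustration, not an official tool: the helper name `missing_dlq_subjects` is hypothetical, the subject list is assumed to have been fetched separately (for example, from the Schema Registry `GET /subjects` endpoint), and it assumes the automatically created DLQ uses the `<topic_name>-key` and `<topic_name>-value` subject pattern described later on this page.

```python
def missing_dlq_subjects(dlq_topic, existing_subjects):
    """Return the expected DLQ subjects that are not registered yet."""
    # Assumption: the DLQ uses <topic_name>-key and <topic_name>-value subjects.
    expected = [f"{dlq_topic}-key", f"{dlq_topic}-value"]
    registered = set(existing_subjects)
    return [subject for subject in expected if subject not in registered]


# Hand-written subject list for illustration (not real Schema Registry output).
subjects = ["orders-value", "my_source_table_error_log-key"]
print(missing_dlq_subjects("my_source_table_error_log", subjects))
```

An empty result means both subjects exist. The topic itself still has to be checked separately in the Kafka cluster.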
Pre-create a DLQ outside Flink
By default, Flink creates the DLQ topic and schema automatically with default settings. If you need control over the topic configuration, such as partition count, retention time, or cleanup policy, you can pre-create the DLQ topic and schema before enabling error handling.
Option A: Create the DLQ table in Flink with a CREATE TABLE statement
Run a CREATE TABLE statement with the full DLQ schema and your desired topic configuration.
CREATE TABLE my_source_table_error_log (
  `error_timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `error_code` INT NOT NULL,
  `error_reason` VARCHAR(2147483647) NOT NULL,
  `error_message` VARCHAR(2147483647) NOT NULL,
  `error_details` MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)> NOT NULL,
  `processor` VARCHAR(2147483647) NOT NULL,
  `statement_name` VARCHAR(2147483647),
  `affected_type` VARCHAR(2147483647) NOT NULL,
  `affected_catalog` VARCHAR(2147483647),
  `affected_database` VARCHAR(2147483647),
  `affected_name` VARCHAR(2147483647),
  `source_record` ROW<
    `topic` VARCHAR(2147483647),
    `partition` INT,
    `offset` BIGINT,
    `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE,
    `timestamp_type` VARCHAR(2147483647),
    `headers` MAP<VARCHAR(2147483647) NOT NULL, VARBINARY(2147483647)>,
    `key` VARBINARY(2147483647),
    `value` VARBINARY(2147483647)>
)
DISTRIBUTED INTO 6 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'kafka.cleanup-policy' = 'delete',
  'kafka.retention.time' = '7 d',
  'value.format' = 'avro-registry'
);
Then configure your source table to use this DLQ table:
ALTER TABLE my_source_table SET (
  'error-handling.mode' = 'log',
  'error-handling.log.target' = 'my_source_table_error_log'
);
Option B: Create the topic and schema manually
If you need full control and want to create the DLQ topic and schema outside of Flink, follow these steps:
Create the Kafka topic with your desired configuration (partition count, retention, cleanup policy) using the Confluent Cloud Console, CLI, or API.
Register both a key schema and a value schema for the topic in Schema Registry. The subject names must follow the pattern <topic_name>-key and <topic_name>-value. See Schema definitions for manual registration for the required schemas.
Configure your source table to use the pre-created DLQ:
ALTER TABLE my_source_table SET (
  'error-handling.mode' = 'log',
  'error-handling.log.target' = 'my_source_table_error_log'
);
DLQ table schema reference
The DLQ table uses the following schema. The error_timestamp field is stored in the Kafka message key; all other fields appear in the message value.
Key fields
| Column | Type | Nullable | Description |
|---|---|---|---|
| error_timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE | NOT NULL | When the error occurred. |
Value fields
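| Column | Type | Nullable | Description |
|---|---|---|---|
| error_code | INT | NOT NULL | Numeric error code. |
| error_reason | VARCHAR(2147483647) | NOT NULL | Error category or reason. |
| error_message | VARCHAR(2147483647) | NOT NULL | Human-readable error message. |
| error_details | MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)> | NOT NULL | Additional key-value error details. |
| processor | VARCHAR(2147483647) | NOT NULL | The processor that encountered the error. |
| statement_name | VARCHAR(2147483647) | Nullable | Name of the statement that triggered the error. |
| affected_type | VARCHAR(2147483647) | NOT NULL | Type of the affected resource. |
| affected_catalog | VARCHAR(2147483647) | Nullable | Catalog of the affected table. |
| affected_database | VARCHAR(2147483647) | Nullable | Database of the affected table. |
| affected_name | VARCHAR(2147483647) | Nullable | Name of the affected table. |
| source_record | ROW | Nullable | The original Kafka record that caused the error. Contains nested fields: topic, partition, offset, timestamp, timestamp_type, headers, key, value. |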
Schema definitions for manual registration
When pre-creating a DLQ outside Flink, you must register both a key schema and a value schema in Schema Registry. The subject names must follow the pattern <topic_name>-key and <topic_name>-value.
Note
In the Avro schemas below, the name field uses error_log as a placeholder (matching the default DLQ table name). Replace error_log with your actual DLQ table name in the name and nested record name fields.
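The placeholder replacement described in this note can be scripted before registration. The following Python sketch is one possible approach under stated assumptions: the function name `rename_avro_placeholder` is hypothetical, the schema is assumed to be loaded as a Python dict (for example, with `json.loads`), and only the `name` of Avro record types is rewritten, so field names such as `error_code` stay untouched.

```python
def rename_avro_placeholder(schema, dlq_table, placeholder="error_log"):
    """Return a copy of an Avro schema with the placeholder prefix replaced in record names."""
    if isinstance(schema, dict):
        renamed = {
            key: rename_avro_placeholder(value, dlq_table, placeholder)
            for key, value in schema.items()
        }
        name = schema.get("name")
        # Only record types carry the placeholder; fields have non-"record" types.
        if schema.get("type") == "record" and isinstance(name, str) and name.startswith(placeholder):
            renamed["name"] = dlq_table + name[len(placeholder):]
        return renamed
    if isinstance(schema, list):
        return [rename_avro_placeholder(item, dlq_table, placeholder) for item in schema]
    return schema


# Trimmed-down version of the key schema, for illustration only.
key_schema = {
    "type": "record",
    "name": "error_log_key",
    "namespace": "org.apache.flink.avro.generated.record",
    "fields": [
        {"name": "error_timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ],
}
print(rename_avro_placeholder(key_schema, "my_source_table_error_log")["name"])
```

The function recurses through unions and nested fields, so the nested `error_log_value_source_record` record in the value schema is renamed in the same pass. The input dict is left unmodified.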
Key schema
Avro
{
"fields": [
{
"name": "error_timestamp",
"type": {
"logicalType": "timestamp-millis",
"type": "long"
}
}
],
"name": "error_log_key",
"namespace": "org.apache.flink.avro.generated.record",
"type": "record"
}
JSON Schema
{
"additionalProperties": false,
"properties": {
"error_timestamp": {
"connect.index": 0,
"connect.type": "int64",
"title": "org.apache.kafka.connect.data.Timestamp",
"type": "number"
}
},
"required": ["error_timestamp"],
"title": "Record",
"type": "object"
}
Protobuf
syntax = "proto3";
package org.apache.flink.proto.generated.record;
import "google/protobuf/timestamp.proto";
message Record {
  .google.protobuf.Timestamp error_timestamp = 1 [(confluent.field_meta) = {
    params: [
      { key: "flink.version", value: "1" },
      { key: "flink.notNull", value: "true" },
      { key: "flink.precision", value: "3" }
    ]
  }];
}
Value schema
Avro
{
"fields": [
{
"name": "error_code",
"type": "int"
},
{
"name": "error_reason",
"type": "string"
},
{
"name": "error_message",
"type": "string"
},
{
"name": "error_details",
"type": {
"type": "map",
"values": ["null", "string"]
}
},
{
"name": "processor",
"type": "string"
},
{
"default": null,
"name": "statement_name",
"type": ["null", "string"]
},
{
"name": "affected_type",
"type": "string"
},
{
"default": null,
"name": "affected_catalog",
"type": ["null", "string"]
},
{
"default": null,
"name": "affected_database",
"type": ["null", "string"]
},
{
"default": null,
"name": "affected_name",
"type": ["null", "string"]
},
{
"default": null,
"name": "source_record",
"type": [
"null",
{
"fields": [
{
"default": null,
"name": "topic",
"type": ["null", "string"]
},
{
"default": null,
"name": "partition",
"type": ["null", "int"]
},
{
"default": null,
"name": "offset",
"type": ["null", "long"]
},
{
"default": null,
"name": "timestamp",
"type": [
"null",
{
"logicalType": "timestamp-millis",
"type": "long"
}
]
},
{
"default": null,
"name": "timestamp_type",
"type": ["null", "string"]
},
{
"default": null,
"name": "headers",
"type": [
"null",
{
"type": "map",
"values": ["null", "bytes"]
}
]
},
{
"default": null,
"name": "key",
"type": ["null", "bytes"]
},
{
"default": null,
"name": "value",
"type": ["null", "bytes"]
}
],
"name": "error_log_value_source_record",
"type": "record"
}
]
}
],
"name": "error_log_value",
"namespace": "org.apache.flink.avro.generated.record",
"type": "record"
}
JSON Schema
{
"additionalProperties": false,
"properties": {
"error_code": {
"connect.index": 0,
"connect.type": "int32",
"type": "number"
},
"error_reason": {
"connect.index": 1,
"type": "string"
},
"error_message": {
"connect.index": 2,
"type": "string"
},
"error_details": {
"additionalProperties": {
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"connect.index": 3,
"connect.type": "map",
"type": "object"
},
"processor": {
"connect.index": 4,
"type": "string"
},
"statement_name": {
"connect.index": 5,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"affected_type": {
"connect.index": 6,
"type": "string"
},
"affected_catalog": {
"connect.index": 7,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"affected_database": {
"connect.index": 8,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"affected_name": {
"connect.index": 9,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"source_record": {
"connect.index": 10,
"oneOf": [
{ "type": "null" },
{
"additionalProperties": false,
"properties": {
"topic": {
"connect.index": 0,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"partition": {
"connect.index": 1,
"oneOf": [
{ "type": "null" },
{ "connect.type": "int32", "type": "number" }
]
},
"offset": {
"connect.index": 2,
"oneOf": [
{ "type": "null" },
{ "connect.type": "int64", "type": "number" }
]
},
"timestamp": {
"connect.index": 3,
"oneOf": [
{ "type": "null" },
{
"connect.type": "int64",
"title": "org.apache.kafka.connect.data.Timestamp",
"type": "number"
}
]
},
"timestamp_type": {
"connect.index": 4,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"headers": {
"connect.index": 5,
"oneOf": [
{ "type": "null" },
{
"additionalProperties": {
"oneOf": [
{ "type": "null" },
{ "connect.type": "bytes", "type": "string" }
]
},
"connect.type": "map",
"type": "object"
}
]
},
"key": {
"connect.index": 6,
"oneOf": [
{ "type": "null" },
{ "connect.type": "bytes", "type": "string" }
]
},
"value": {
"connect.index": 7,
"oneOf": [
{ "type": "null" },
{ "connect.type": "bytes", "type": "string" }
]
}
},
"title": "Record_source_record",
"type": "object"
}
]
}
},
"required": [
"error_code",
"error_reason",
"error_message",
"error_details",
"processor",
"affected_type"
],
"title": "Record",
"type": "object"
}
Protobuf
syntax = "proto3";
package org.apache.flink.proto.generated.record;
import "google/protobuf/timestamp.proto";
message Record {
  int32 error_code = 1;
  string error_reason = 2;
  string error_message = 3;
  repeated ErrorDetailsEntry error_details = 4;
  string processor = 5;
  optional string statement_name = 6;
  string affected_type = 7;
  optional string affected_catalog = 8;
  optional string affected_database = 9;
  optional string affected_name = 10;
  optional source_record_Row source_record = 11;
  message ErrorDetailsEntry {
    string key = 1;
    optional string value = 2;
  }
  message source_record_Row {
    optional string topic = 1;
    optional int32 partition = 2;
    optional int64 offset = 3;
    optional .google.protobuf.Timestamp timestamp = 4 [(confluent.field_meta) = {
      params: [
        { key: "flink.precision", value: "3" },
        { key: "flink.version", value: "1" }
      ]
    }];
    optional string timestamp_type = 5;
    headersRepeatedWrapper headers = 6 [(confluent.field_meta) = {
      params: [
        { key: "flink.wrapped", value: "true" },
        { key: "flink.version", value: "1" }
      ]
    }];
    optional bytes key = 7;
    optional bytes value = 8;
    message headersRepeatedWrapper {
      repeated ValueEntry value = 1;
      message ValueEntry {
        string key = 1;
        optional bytes value = 2;
      }
    }
  }
}
Inspect DLQ records
Query the DLQ table to view captured deserialization errors:
SELECT * FROM my_source_table_error_log;
Each record contains the error details and, when available, the original Kafka record (topic, partition, offset, key, and value) that caused the error.
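Because the key and value fields of source_record are raw bytes (VARBINARY), it can help to render them readably when reviewing fetched rows outside Flink. The Python sketch below is illustrative only: it assumes a DLQ row's source_record has already been fetched into a dict keyed by the field names from the DLQ schema, and the helper names `preview_bytes` and `describe_source_record` are hypothetical.

```python
def preview_bytes(data, limit=32):
    """Render raw bytes as text when they are valid UTF-8, otherwise as hex."""
    if data is None:
        return "<null>"
    try:
        text = data.decode("utf-8")
    except UnicodeDecodeError:
        text = "0x" + data.hex()
    # Truncate long payloads so one record fits on a line.
    return text if len(text) <= limit else text[:limit] + "..."


def describe_source_record(source_record):
    """Summarize where a failed record came from and what it contained."""
    if source_record is None:
        return "<no source record>"
    location = "{}[{}]@{}".format(
        source_record.get("topic"),
        source_record.get("partition"),
        source_record.get("offset"),
    )
    return "{} key={} value={}".format(
        location,
        preview_bytes(source_record.get("key")),
        preview_bytes(source_record.get("value")),
    )


# Hand-written example row, not real DLQ output.
example = {"topic": "orders", "partition": 2, "offset": 41, "key": b"k1", "value": b"\xff\x00"}
print(describe_source_record(example))
```

The topic, partition, and offset in the summary are enough to locate the original message in the source topic for closer inspection.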
Monitor source deserialization errors
Confluent Cloud for Apache Flink exposes a per-statement counter of source deserialization failures as the metric io.confluent.flink/num_records_in_errors. The counter increments once per record that fails source deserialization, independent of error-handling.mode. The mode only controls what happens to the failed record itself:
log (DLQ mode): the record is also written to the DLQ target table.
ignore: the record is dropped silently.
fail: the statement fails after the counter advances.
Use this metric to alert on the rate of source deserialization errors against your own baseline, regardless of which error-handling mode the source table uses.
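The relationship between the counter and the three error-handling modes can be pictured with a small model. This Python sketch only mirrors the behavior described above and is not how Flink is implemented: `handle_deserialization_error`, `StatementFailed`, the `metrics` dict, and the `dlq` list are all hypothetical stand-ins.

```python
class StatementFailed(Exception):
    """Stand-in for a statement failing in 'fail' mode."""


def handle_deserialization_error(record, mode, metrics, dlq):
    """Model what happens to one record that failed source deserialization."""
    if mode not in ("log", "ignore", "fail"):
        raise ValueError(f"unknown error-handling.mode: {mode!r}")
    # The counter advances once per failed record, whatever the mode is.
    metrics["num_records_in_errors"] = metrics.get("num_records_in_errors", 0) + 1
    if mode == "log":
        dlq.append(record)  # The record is also written to the DLQ target.
    elif mode == "fail":
        raise StatementFailed("deserialization error in 'fail' mode")
    # In 'ignore' mode, the record is dropped without further action.


metrics, dlq = {}, []
handle_deserialization_error(b"bad-1", "log", metrics, dlq)
handle_deserialization_error(b"bad-2", "ignore", metrics, dlq)
print(metrics["num_records_in_errors"], len(dlq))
```

In this model, the counter reaches 2 while the DLQ holds a single record, which is why the metric remains useful for alerting even when a table uses ignore mode.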
The Confluent Cloud Metrics API exposes this counter alongside other Flink metrics. Use the table_name label to break the count down per source table. For details on querying or exporting it to third-party services such as Prometheus, Grafana, or Datadog, see Flink metrics integrations. For the full list of Flink metrics, see the Metrics API Reference.
This counter increments only for source deserialization errors; errors in UDFs, serialization, or windowed aggregations are not counted (the same limitation that applies to the DLQ target table itself).
Suggested alert
Alert when the share of failed records over a recent window exceeds a small fraction of total ingested records. The threshold should be tuned to your own baseline; for example, the following PromQL expression fires when more than 5% of ingested records have failed deserialization over a 15-minute window:
increase(confluent_flink_num_records_in_errors[15m])
/ increase(confluent_flink_num_records_in[15m]) > 0.05
Compare against io.confluent.flink/num_records_in (the total input record count for the same source table) so the alert tracks relative degradation rather than the absolute error count, which naturally scales with throughput. The same ratio can be expressed in Grafana and Datadog by using the corresponding provider-specific metric names and query syntax for metrics exported from the Metrics API.
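The same ratio check can be sketched outside a monitoring system, for example in a script that has already retrieved the two counter increases for a window. The Python below is a minimal sketch with hypothetical function names (`error_ratio`, `should_alert`); the 0.05 default simply repeats the example threshold above and should be tuned to your own baseline.

```python
def error_ratio(errors_increase, records_increase):
    """Share of ingested records that failed deserialization in the window."""
    if records_increase <= 0:
        return None  # No ingested records in the window: the ratio is undefined.
    return errors_increase / records_increase


def should_alert(errors_increase, records_increase, threshold=0.05):
    """Return True when the error share in the window exceeds the threshold."""
    ratio = error_ratio(errors_increase, records_increase)
    return ratio is not None and ratio > threshold


# Illustrative numbers for a 15-minute window (not real measurements).
print(should_alert(errors_increase=60, records_increase=1000))
print(should_alert(errors_increase=50, records_increase=1000))
```

This sketch deliberately stays silent when no records were ingested in the window. Whether an idle source should alert is a separate decision that depends on your workload.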
Limitations and known behaviors
DLQ error handling applies only to deserialization errors at the source. Errors in UDFs, serialization, or windowed aggregations are not routed to the DLQ. For UDF error handling, see Error handling best practices.
You configure error handling at the table level, not per statement. All statements reading from a table with error-handling.mode set to log share the same DLQ configuration.
Each source table can have one DLQ target at a time. You set DLQ configuration at the source table level, not per job or per consumer.
Best practices
Use a dedicated service account for DLQ setup when consuming from shared or team-owned source topics. Because configuring a DLQ requires DeveloperManage permissions on the source topic, consider using a platform or admin service account to run the ALTER TABLE statement. This avoids granting elevated permissions on source topics to individual consumer service accounts.