Perform chaos testing
This feature is available with Conduktor Shield only.
Chaos testing Interceptors in Conduktor are like stress tests for your data systems.
Chaos testing shows how well your applications handle problems such as slow data delivery, corrupted messages or unexpected duplicates. This helps ensure that your systems are resilient and keep running when things go wrong.
Simulate broken brokers
This Interceptor injects intermittent errors into client connections to brokers. The errors are consistent with broker-side issues.
This only works on Produce and Fetch requests.
Configuration
key | type | default | description |
---|---|---|---|
rateInPercent | int | | The percentage of requests that will result in a broker not available response |
errorMap | Map | {"FETCH": "UNKNOWN_SERVER_ERROR", "PRODUCE": "CORRUPT_MESSAGE"} | Map of API keys to the error you want returned for each |
Possible errors for each API key
FETCH:
- OFFSET_OUT_OF_RANGE
- TOPIC_AUTHORIZATION_FAILED
- REPLICA_NOT_AVAILABLE
- NOT_LEADER_OR_FOLLOWER
- FENCED_LEADER_EPOCH
- UNKNOWN_LEADER_EPOCH
- UNKNOWN_TOPIC_OR_PARTITION
- KAFKA_STORAGE_ERROR
- UNSUPPORTED_COMPRESSION_TYPE
- CORRUPT_MESSAGE
- UNKNOWN_TOPIC_ID
- FETCH_SESSION_TOPIC_ID_ERROR
- INCONSISTENT_TOPIC_ID
- UNKNOWN_SERVER_ERROR
PRODUCE:
- CORRUPT_MESSAGE
- UNKNOWN_TOPIC_OR_PARTITION
- NOT_LEADER_OR_FOLLOWER
- INVALID_TOPIC_EXCEPTION
- RECORD_LIST_TOO_LARGE
- NOT_ENOUGH_REPLICAS
- NOT_ENOUGH_REPLICAS_AFTER_APPEND
- INVALID_REQUIRED_ACKS
- TOPIC_AUTHORIZATION_FAILED
- UNSUPPORTED_FOR_MESSAGE_FORMAT
- INVALID_PRODUCER_EPOCH
- CLUSTER_AUTHORIZATION_FAILED
- TRANSACTIONAL_ID_AUTHORIZATION_FAILED
- INVALID_RECORD
Example
{
  "name": "myBrokenBrokerChaosInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateBrokenBrokersPlugin",
  "priority": 100,
  "config": {
    "rateInPercent": 100,
    "errorMap": {
      "FETCH": "UNKNOWN_SERVER_ERROR",
      "PRODUCE": "CORRUPT_MESSAGE"
    }
  }
}
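To make the effect of rateInPercent and errorMap concrete, here is a minimal Python sketch of the decision such an interceptor might make for each request. It is an illustrative model only, not Conduktor's implementation; the function name `injected_error` and the use of a uniform random draw are assumptions.

```python
import random

# Same shape as the errorMap in the example configuration above.
ERROR_MAP = {"FETCH": "UNKNOWN_SERVER_ERROR", "PRODUCE": "CORRUPT_MESSAGE"}


def injected_error(api_key, rate_in_percent, error_map, rng):
    """Return the error to inject for this request, or None to pass it through.

    Hypothetical model: a request is selected when a uniform draw in [0, 100)
    falls below rate_in_percent.
    """
    if api_key not in error_map:
        return None  # API keys without a mapping are left untouched
    if rng.random() * 100 < rate_in_percent:
        return error_map[api_key]
    return None


rng = random.Random(42)  # seeded so the run is repeatable
results = [injected_error("FETCH", 25, ERROR_MAP, rng) for _ in range(10_000)]
failure_ratio = sum(r is not None for r in results) / len(results)
print(f"{failure_ratio:.1%} of FETCH requests received an injected error")
```

With a rate of 25, roughly a quarter of the simulated FETCH requests fail, which is the kind of intermittent failure a client's retry and error handling should be tested against.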
Simulate duplicate messages
This interceptor duplicates records when the client produces them to, or consumes them from, Kafka.
It is useful for testing applications to ensure that they behave appropriately when they receive duplicate records from Kafka.
Note: By default, the interceptor applies on fetch, so it duplicates only the records returned to the client. The records on the broker are not duplicated.
For example, you could have a message that says "Add £10 to a bank account, Unique Message Id is 12345".
If that message is duplicated, both copies carry the same unique ID.
The client application needs to be tested to ensure that it credits the £10 only once.
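One common way for a client to pass this test is to deduplicate on the unique message ID. The following Python sketch illustrates the idea with the bank account example; the `Account` class and the message field names (`id`, `amount`) are hypothetical, and a real application would keep the set of seen IDs in durable storage.

```python
class Account:
    """Hypothetical consumer-side handler that applies each message ID at most once."""

    def __init__(self):
        self.balance = 0
        self._seen_ids = set()  # in production this would need to be durable

    def handle(self, message):
        """Apply the message unless its ID was already processed."""
        message_id = message["id"]
        if message_id in self._seen_ids:
            return False  # duplicate: already applied, so ignore it
        self._seen_ids.add(message_id)
        self.balance += message["amount"]
        return True


account = Account()
deposit = {"id": "12345", "amount": 10}

# The interceptor delivers the same record twice.
for record in (deposit, deposit):
    account.handle(record)

print(account.balance)  # 10
```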
Configuration
key | type | default | description |
---|---|---|---|
topic | String | .* | Topics that match this regex will have the interceptor applied. |
rateInPercent | int | 100 | The percentage of records that will be duplicated. |
target | enum | CONSUME | Record is duplicated when the client produces or consumes the record, values can be PRODUCE or CONSUME |
Example
{
  "name": "myDuplicateRecordsInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.DuplicateMessagesPlugin",
  "priority": 100,
  "config": {
    "topic": "client_topic_.*",
    "rateInPercent": 100,
    "target": "PRODUCE"
  }
}
Simulate invalid schema ID
This interceptor overwrites the schemaId of records with an invalid value.
Because the schemaId no longer refers to a registered schema, the following error is returned when consuming records:
Processed a total of 1 messages
[2022-11-17 15:59:13,184] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Error retrieving JSON schema for id 999
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:182)
at io.confluent.kafka.formatter.json.JsonSchemaMessageFormatter$JsonSchemaMessageDeserializer.deserialize(JsonSchemaMessageFormatter.java:130)
at io.confluent.kafka.formatter.json.JsonSchemaMessageFormatter$JsonSchemaMessageDeserializer.deserialize(JsonSchemaMessageFormatter.java:103)
at io.confluent.kafka.formatter.json.JsonSchemaMessageFormatter.writeTo(JsonSchemaMessageFormatter.java:94)
at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:116)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema 999 not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:301)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:371)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:840)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:813)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:417)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:119)
... 8 more
See more about schema registry and schema-id here
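For context, the Confluent deserializers that appear in the stack trace above expect each record value to start with a magic byte (0) followed by a 4-byte big-endian schema ID. The Python sketch below shows what overwriting that ID looks like at the byte level. The helper names are hypothetical, and whether the interceptor rewrites records in exactly this way is an assumption.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent wire format


def read_schema_id(payload: bytes) -> int:
    """Read the schema ID from the 5-byte header of a serialized value."""
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("payload does not use the Confluent wire format")
    return schema_id


def overwrite_schema_id(payload: bytes, new_id: int) -> bytes:
    """Return a copy of the payload whose header points at new_id."""
    read_schema_id(payload)  # validates the header before rewriting it
    return struct.pack(">bI", MAGIC_BYTE, new_id) + payload[5:]


# A value registered under schema ID 1, followed by its JSON body.
original = struct.pack(">bI", MAGIC_BYTE, 1) + b'{"amount": 10}'
tampered = overwrite_schema_id(original, 999)

print(read_schema_id(original), read_schema_id(tampered))  # 1 999
```

The body of the record is unchanged. Only the header differs, which is why the consumer fails when it looks up schema 999 rather than when it parses the data.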
Configuration
key | type | default | description |
---|---|---|---|
topic | String | .* | Topics that match this regex will have the interceptor applied |
invalidSchemaId | integer | | Invalid schema id. If not passed, the value will be random. |
target | enum | CONSUME | SchemaId is overwritten with an invalid value in the record when the client produces or consumes the record, values can be PRODUCE or CONSUME |
Example
{
  "name": "mySimulateInvalidSchemaIdInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateInvalidSchemaIdPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "invalidSchemaId": 9999,
    "target": "PRODUCE"
  }
}
Simulate latency on all interactions
This interceptor adds latency to a percentage of requests and responses flowing between your Kafka applications and your Kafka cluster.
This interceptor is useful for testing applications to ensure that they behave appropriately when there are network delays talking to Kafka, or the Kafka broker is for some reason responding slowly.
Configuration
key | type | description |
---|---|---|
appliedPercentage | int | The percentage of requests flowing through the gateway that will have increased latency applied to them. For example, an applied percentage of 10 will add a latency of latencyMs to 10% of requests and responses. The value must be between 0 and 100. |
latencyMs | long | The latency, in milliseconds, that will be added to the requests and responses flowing through the gateway. The value must be 0 or greater; the upper bound is not specified. |
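When choosing values for appliedPercentage and latencyMs, it can help to estimate the average latency added per request and to check whether a delayed request would exceed a client timeout. The following is a back-of-the-envelope Python sketch; the function names, the base latency and the timeout value are illustrative assumptions.

```python
def expected_added_latency_ms(applied_percentage: int, latency_ms: int) -> float:
    """Average extra latency per request when only a share of traffic is delayed."""
    return applied_percentage * latency_ms / 100


def exceeds_timeout(base_latency_ms: int, latency_ms: int, timeout_ms: int) -> bool:
    """True if a delayed request would take longer than the client's timeout."""
    return base_latency_ms + latency_ms > timeout_ms


# 10% of requests delayed by 1000 ms adds 100 ms on average.
print(expected_added_latency_ms(10, 1000))  # 100.0

# A request that normally takes 20 ms, delayed by 1000 ms, against a 500 ms timeout.
print(exceeds_timeout(20, 1000, 500))  # True
```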
Example
{
  "name": "mySimulateLatencyInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateLatencyPlugin",
  "priority": 100,
  "config": {
    "appliedPercentage": 100,
    "latencyMs": 1000
  }
}
Simulate leader election errors
This interceptor is useful for testing applications to ensure they can survive leader elections, which happen:
- When the leader dies, and another one needs to take over
- When we do rolling upgrades
It does this by returning one of the following errors:
- LEADER_NOT_AVAILABLE
- NOT_LEADER_OR_FOLLOWER
- BROKER_NOT_AVAILABLE
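Applications usually survive these errors by retrying with a backoff until a new leader is available. The Python sketch below illustrates that pattern in isolation. The `send` callable is a hypothetical stand-in for a client call that returns an error name or None on success, and treating all three errors as retriable is an assumption of this sketch.

```python
import time

# Errors this sketch treats as transient (assumption for illustration).
RETRIABLE = {"LEADER_NOT_AVAILABLE", "NOT_LEADER_OR_FOLLOWER", "BROKER_NOT_AVAILABLE"}


def send_with_retry(send, max_attempts=5, backoff_s=0.0, sleep=time.sleep):
    """Call send() until it succeeds or attempts run out.

    Returns the number of attempts used. The wait doubles after each failure.
    """
    for attempt in range(1, max_attempts + 1):
        error = send()
        if error is None:
            return attempt
        if error not in RETRIABLE:
            raise RuntimeError(f"non-retriable error: {error}")
        sleep(backoff_s * 2 ** (attempt - 1))
    raise TimeoutError(f"gave up after {max_attempts} attempts")


# Two simulated leader election errors, then success.
responses = iter(["NOT_LEADER_OR_FOLLOWER", "LEADER_NOT_AVAILABLE", None])
attempts = send_with_retry(lambda: next(responses), backoff_s=0.01)
print(attempts)  # 3
```

Running the interceptor with rateInPercent below 100 lets you check that a retry budget like `max_attempts` is large enough for your application.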
Configuration
key | type | description |
---|---|---|
rateInPercent | int | The percentage of requests that will result in a leader or broker not available response |
Example
{
  "name": "mySimulateLeaderElectionsErrorsPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateLeaderElectionsErrorsPlugin",
  "priority": 100,
  "config": {
    "rateInPercent": 100
  }
}
Simulate message corruption
From time to time, messages will arrive that are not in the expected format.
This interceptor appends random bytes to the end of the data in records produced to Kafka.
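The effect on a consumer can be sketched in a few lines of Python. The `corrupt` helper below mimics the appended random bytes and `parse_or_none` shows a defensive parser that rejects the damaged value instead of crashing. Both helpers are hypothetical, and JSON is used only as an example payload format.

```python
import json
import os


def corrupt(value: bytes, size_in_bytes: int = 10) -> bytes:
    """Append random bytes to a record value, as the interceptor is described to do."""
    return value + os.urandom(size_in_bytes)


def parse_or_none(value: bytes):
    """Parse a JSON value, returning None when it is not valid JSON."""
    try:
        return json.loads(value)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return None


original = b'{"amount": 10}'
damaged = corrupt(original, size_in_bytes=100)

print(len(damaged) - len(original))            # 100
print(parse_or_none(original))                 # {'amount': 10}
print(parse_or_none(original + b"\xff\xfe"))   # None
```

An application under test should take the `None` branch, for example by routing the record to a dead letter topic, rather than stopping.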
Configuration
key | type | default | description |
---|---|---|---|
topic | String | .* | Regular expression that matches topics from your produce request. |
sizeInBytes | int | 10 | Number of random content bytes to append to the message data. |
rateInPercent | int | 100 | Percentage of records that will have random bytes appended. |
You can simulate corruption when:
- sending data: io.conduktor.gateway.interceptor.chaos.ProduceSimulateMessageCorruptionPlugin
- reading data: io.conduktor.gateway.interceptor.chaos.FetchSimulateMessageCorruptionPlugin
Example
{
  "name": "mySimulateMessageCorruptionInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.FetchSimulateMessageCorruptionPlugin",
  "priority": 100,
  "config": {
    "topic": "client_topic_.*",
    "sizeInBytes": 100,
    "rateInPercent": 100
  }
}
Simulate slow brokers
This Interceptor slows responses from the brokers. This only works on Produce and Fetch requests.
Configuration
key | type | description |
---|---|---|
rateInPercent | int | The percentage of requests that will have the interceptor applied |
minLatencyMs | int | Minimum for the random response latency in milliseconds |
maxLatencyMs | int | Maximum for the random response latency in milliseconds |
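As a rough model of how these three settings interact, the Python sketch below picks a delay for each response. It assumes a uniform distribution with inclusive bounds, which the table does not state, and the function name `pick_latency_ms` is hypothetical.

```python
import random


def pick_latency_ms(rate_in_percent, min_latency_ms, max_latency_ms, rng):
    """Return the delay for one response in milliseconds, or 0 if it is not selected.

    Assumption: selected responses get a uniformly distributed delay between
    the minimum and maximum, both inclusive.
    """
    if rng.random() * 100 >= rate_in_percent:
        return 0
    return rng.randint(min_latency_ms, max_latency_ms)


rng = random.Random(7)  # seeded so the run is repeatable
delays = [pick_latency_ms(100, 50, 1200, rng) for _ in range(1_000)]
print(min(delays) >= 50, max(delays) <= 1200)  # True True
```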
Example
{
  "name": "mySimulateSlowBrokerInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateSlowBrokerPlugin",
  "priority": 100,
  "config": {
    "rateInPercent": 100,
    "minLatencyMs": 50,
    "maxLatencyMs": 1200
  }
}
Simulate slow producers and consumers
This Interceptor slows responses from the brokers. It will operate only on a set of topics rather than all traffic.
This Interceptor only works on Produce and Fetch requests.
Configuration
key | type | default | description |
---|---|---|---|
topic | String | .* | Topics that match this regex will have the interceptor applied. |
rateInPercent | int | | The percentage of requests that will apply this interceptor |
minLatencyMs | int | | Minimum for the random response latency in milliseconds |
maxLatencyMs | int | | Maximum for the random response latency in milliseconds |
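Because the topic setting is a regular expression, it is worth checking a pattern against real topic names before deploying it. The Python sketch below does this with a hypothetical `topic_matches` helper. It assumes the whole topic name must match the pattern, and Python's regex syntax may differ in small ways from the engine the gateway uses.

```python
import re


def topic_matches(pattern: str, topic: str) -> bool:
    """True if the whole topic name matches the pattern (full-match assumption)."""
    return re.fullmatch(pattern, topic) is not None


print(topic_matches(".*", "orders"))                               # True
print(topic_matches("client_topic_.*", "client_topic_payments"))   # True
print(topic_matches("client_topic_.*", "internal_audit"))          # False
```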
Example
{
  "name": "mySimulateSlowProducersConsumersInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.chaos.SimulateSlowProducersConsumersPlugin",
  "priority": 100,
  "config": {
    "rateInPercent": 100,
    "minLatencyMs": 50,
    "maxLatencyMs": 1200
  }
}