For a Kafka Streams application to be resilient and reliable, it is important to handle failures gracefully. Failures can halt the stream and disrupt the service. This article covers the exceptions that can occur in a Kafka Streams application and how to handle them.

Exceptions within the code

These are exceptions thrown by the processor code itself, such as arithmetic exceptions, parsing exceptions, or a timeout on a database call. They can be handled by simply wrapping the offending piece of code in a try-catch block.

try {
    user = db.getUser(id);
} catch (ReadTimeoutException e) {
    // handle, retry, notify...
}
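The catch block above mentions retrying. As a sketch of what that might look like, here is a minimal, hypothetical retry helper (plain Java, no Kafka dependency) that retries a call with exponential backoff before giving up:

```java
import java.util.concurrent.Callable;

public class Retry {

    // Invokes `call` up to `maxAttempts` times, sleeping between attempts
    // and doubling the wait each time. Rethrows the last exception if all
    // attempts fail.
    public static <T> T withRetries(Callable<T> call, int maxAttempts,
                                    long initialBackoffMs) throws Exception {
        long backoff = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2; // exponential backoff
                }
            }
        }
        throw last;
    }
}
```

With such a helper, the database call from the example could be wrapped as `user = Retry.withRetries(() -> db.getUser(id), 3, 100);` so transient timeouts do not fail the stream immediately.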

Deserialization exception

The incoming data may have a different schema than what the stream consumer expects, which causes a deserialization exception when the record is consumed. Because these failures stem from inconsistent data rather than application logic, Kafka Streams provides a dedicated DeserializationExceptionHandler interface to deal with them.

public class CustomDeserializationExceptionHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // logic to deal with the corrupt record
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> map) {
        // can read any property set in the streams config here
    }
}

// in the streams config
streamConfig.put("default.deserialization.exception.handler", CustomDeserializationExceptionHandler.class);
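If no custom logic is needed, a custom handler may be unnecessary: Kafka Streams ships two ready-made implementations in the org.apache.kafka.streams.errors package that can be plugged into the same config key. A sketch of using the built-in log-and-continue handler:

```java
// Built-in alternative: log the corrupt record and keep processing.
// (Use LogAndFailExceptionHandler instead to stop the stream on bad input.)
streamConfig.put(
    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
```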

Production exception

Any exception that occurs during the interaction between the Kafka client and the broker is a production exception. An example is RecordTooLargeException: if the stream writes a record to a topic and the record size exceeds the largest message size allowed by the broker (max.message.bytes), the producer throws a RecordTooLargeException. Such exceptions can be handled with a ProductionExceptionHandler.

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // code to deal with the large record...
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

//streams config
streamConfig.put("default.production.exception.handler", CustomProductionExceptionHandler.class);

Handling uncaught exceptions

If an uncaught exception occurs in a stream thread, the thread terminates abruptly. The setUncaughtExceptionHandler method provides a way to trigger custom logic (such as a notification) on termination.

kafkaStreams.setUncaughtExceptionHandler((thread, exception) -> {
    // code to be triggered
});
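Since Kafka Streams 2.8, there is also an overload of setUncaughtExceptionHandler that takes a StreamsUncaughtExceptionHandler, which not only observes the failure but decides how the application should react. A sketch, assuming the thread can safely be replaced after notifying:

```java
// Replace the failed thread so processing continues; other options are
// SHUTDOWN_CLIENT and SHUTDOWN_APPLICATION.
kafkaStreams.setUncaughtExceptionHandler(exception -> {
    // notify/log the failure here
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
```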
