001package de.saly.kafka.crypto; 002 003import java.util.Map; 004 005import javax.crypto.Cipher; 006 007import org.apache.kafka.common.serialization.Deserializer; 008 009/** 010 * This is a deserialization (for the Consumer) wrapper which adds transparent end-to-end message encryption. 011 * Its intended to be used together with {@link EncryptingSerializer} 012 * <p> 013 * Configuration<p> 014 * <ul> 015 * <li><em>crypto.rsa.privatekey.filepath</em> path on the local filesystem which hold the RSA private key (PKCS#8 format) of the consumer 016 * <li><em>crypto.wrapped_deserializer</em> is the class or full qualified class name or the wrapped deserializer 017 * <li><em>crypto.ignore_decrypt_failures</em> Skip message decryption on error and just pass the byte[] unencrypted (optional, default is "false"). Possible values are "true" or "false". 018 * </ul> 019 * <p> 020 * See {@link EncryptingSerializer} on how encryption works. 021 * <p> 022 * This class will auto detect if an incoming message is encrypted. If not then no decryption attempt is made and message gets handled normally. 023 * <p> 024 * <b>Note</b>: As Consumers are not multithreading-safe this deserializer is also not thread-safe 025 * <p> 026 * @param <T> The type to be deserialized from (applied to the wrapped deserializer) 027 */ 028public class DecryptingDeserializer<T> extends SerdeCryptoBase implements Deserializer<T> { 029 030 public static final String CRYPTO_VALUE_DESERIALIZER = "crypto.wrapped_deserializer"; 031 private Deserializer<T> inner; 032 033 @SuppressWarnings("unchecked") 034 @Override 035 public void configure(Map<String, ?> configs, boolean isKey) { 036 inner = newInstance(configs, CRYPTO_VALUE_DESERIALIZER, Deserializer.class); 037 inner.configure(configs, isKey); 038 init(Cipher.DECRYPT_MODE, configs, isKey); 039 } 040 041 @Override 042 public T deserialize(String topic, byte[] data) { 043 return inner.deserialize(topic, crypt(data)); 044 } 045 046 @Override 047 public void close() { 048 if (inner != null) { 049 inner.close(); 050 } 051 } 052}