001package de.saly.kafka.crypto;
002
003import java.util.Map;
004
005import javax.crypto.Cipher;
006
007import org.apache.kafka.common.serialization.Deserializer;
008
009/**
010 * This is a deserialization (for the Consumer) wrapper which adds transparent end-to-end message encryption. 
011 * Its intended to be used together with {@link EncryptingSerializer}
012 * <p>
013 * Configuration<p>
014 * <ul>
015 * <li><em>crypto.rsa.privatekey.filepath</em> path on the local filesystem which hold the RSA private key (PKCS#8 format) of the consumer
016 * <li><em>crypto.wrapped_deserializer</em> is the class or full qualified class name or the wrapped deserializer
017 * <li><em>crypto.ignore_decrypt_failures</em> Skip message decryption on error and just pass the byte[] unencrypted (optional, default is "false"). Possible values are "true" or "false".
018 * </ul>
019 * <p>
020 * See {@link EncryptingSerializer} on how encryption works.
021 * <p>
022 * This class will auto detect if an incoming message is encrypted. If not then no decryption attempt is made and message gets handled normally.
023 * <p>
024 * <b>Note</b>: As Consumers are not multithreading-safe this deserializer is also not thread-safe
025 * <p>
026 * @param <T> The type to be deserialized from (applied to the wrapped deserializer)
027 */
028public class DecryptingDeserializer<T> extends SerdeCryptoBase implements Deserializer<T> {
029
030    public static final String CRYPTO_VALUE_DESERIALIZER = "crypto.wrapped_deserializer";
031    private Deserializer<T> inner;
032
033    @SuppressWarnings("unchecked")
034    @Override
035    public void configure(Map<String, ?> configs, boolean isKey) {
036        inner = newInstance(configs, CRYPTO_VALUE_DESERIALIZER, Deserializer.class);
037        inner.configure(configs, isKey);
038        init(Cipher.DECRYPT_MODE, configs, isKey);
039    }
040
041    @Override
042    public T deserialize(String topic, byte[] data) {
043        return inner.deserialize(topic, crypt(data));
044    }
045
046    @Override
047    public void close() {
048        if (inner != null) {
049            inner.close();
050        }
051    }
052}