001package de.saly.kafka.crypto;
002
003import java.util.Map;
004import java.util.concurrent.atomic.AtomicInteger;
005
006import javax.crypto.Cipher;
007
008import org.apache.kafka.common.serialization.Serializer;
009
010/**
011 * 
012 * This is a serialization wrapper which adds message encryption. Its intended to be used together with {@link DecryptingDeserializer} 
013 * <p>
014 * Configuration<p>
015 * <ul>
016 * <li><em>crypto.rsa.publickey.filepath</em> path on the local filesystem which hold the RSA public key (X.509 format) of the consumer
017 * <li><em>crypto.wrapped_serializer</em> is the class or full qualified class name or the wrapped serializer
018 * <li><em>crypto.hash_method</em> Type of hash generated for the AES key (optional, default is "SHA-256")
019 * <li><em>crypto.new_key_msg_interval</em> Generate new AES every n messages (default is -1, that means never generate a new key)
020 * </ul>
021 * <p>
022 * Each message is encrypted with "AES/CBC/PKCS5Padding" before its sent to Kafka. The AES key as well as the initialization vector are random.
023 * The AES key is attached to the message in a RSA encrypted manner. The IV is also attached but not RSA encrypted. There is also a hash value
024 * of the AES key to allow consumers caching of decrypted AES keys. Finally we have a few magic and header bytes.
025 * The resulting byte array looks therefore like this:
026 * <p>
027 * <pre>MMLLLHH..HHEEEE..EEEEIIII..IIIOOOOO....OOOOOO</pre>
028 * <p>
029 * <ul>
030 * <li> MM: Two magic bytes 0xDF 0xBB to detect if this byte sequence is encrypted or not
031 * <li> LLL: Three bytes indicating the length of the AES key hash, the RSA encrypted AES key and the IV
032 * <li> HH..HH: AES key hash
033 * <li> EE..EE: RSA encrypted AES key
034 * <li> II..II: Initialization vector (if any)
035 * <li> OO..OO: The AES encrypted original message
036 * </ul>
037 * <p>
038 * <em>MMLLL</em> is called the encryption header and consists of 5 bytes.
039 * <p>
040 * <ul>
041 * <li> M1: 0xDF
042 * <li> M2: 0xBB
043 * <li> L1: length of the AES key hash
044 * <li> L2: RSA factor f so that f*128*8 evaluates to the RSA keysize (in bits)
045 * <li> L3: length of the initialization vector in bytes (always 16 for AES CBC)
046 * </ul>
047 * <p>
048 * RSA public/private keypair can be generated with<br>
049 * <em>java -cp kafka-end-2-end-encryption-1.0.0.jar de.saly.kafka.crypto.RsaKeyGen 2048</em>
050 * <p>
051 * <b>Note</b>: As Producers are multithreading-safe this serializer is also thread-safe
052 * <p>
053 * 
054 * @param <T> The type to be serialized from (applied to the wrapped serializer)
055 */
056public class EncryptingSerializer<T> extends SerdeCryptoBase implements Serializer<T> {
057
058    public static final String CRYPTO_VALUE_SERIALIZER = "crypto.wrapped_serializer";
059    public static final String CRYPTO_NEW_KEY_MSG_INTERVAL = "crypto.new_key_msg_interval";
060    public int msgInterval = -1;
061    private Serializer<T> inner;
062    private final AtomicInteger msg = new AtomicInteger();
063
064    @SuppressWarnings("unchecked")
065    @Override
066    public void configure(Map<String, ?> configs, boolean isKey) {
067        inner = newInstance(configs, CRYPTO_VALUE_SERIALIZER, Serializer.class);
068        inner.configure(configs, isKey);
069        init(Cipher.ENCRYPT_MODE, configs, isKey);
070        String msgIntervalProperty = (String) configs.get(CRYPTO_NEW_KEY_MSG_INTERVAL);
071        if (msgIntervalProperty != null && msgIntervalProperty.length() > 0) {
072            msgInterval = Integer.parseInt(msgIntervalProperty);
073            if (msgInterval < 1) {
074                msgInterval = -1;
075            }
076        }
077    }
078
079    @Override
080    public byte[] serialize(String topic, T data) {
081        if (msgInterval > 0 && msg.compareAndSet(msgInterval, 0)) {
082            newKey();
083        } else if (msgInterval > 0) {
084            msg.incrementAndGet();
085        }
086        return crypt(inner.serialize(topic, data));
087    }
088
089    @Override
090    public void close() {
091        if (inner != null) {
092            inner.close();
093        }
094    }
095    
096    /**
097     * Generate new AES key for the current thread
098     */
099    public void newKey() {
100        super.newKey();
101    }
102}