001package de.saly.kafka.crypto; 002 003import java.util.Map; 004import java.util.concurrent.atomic.AtomicInteger; 005 006import javax.crypto.Cipher; 007 008import org.apache.kafka.common.serialization.Serializer; 009 010/** 011 * 012 * This is a serialization wrapper which adds message encryption. Its intended to be used together with {@link DecryptingDeserializer} 013 * <p> 014 * Configuration<p> 015 * <ul> 016 * <li><em>crypto.rsa.publickey.filepath</em> path on the local filesystem which hold the RSA public key (X.509 format) of the consumer 017 * <li><em>crypto.wrapped_serializer</em> is the class or full qualified class name or the wrapped serializer 018 * <li><em>crypto.hash_method</em> Type of hash generated for the AES key (optional, default is "SHA-256") 019 * <li><em>crypto.new_key_msg_interval</em> Generate new AES every n messages (default is -1, that means never generate a new key) 020 * </ul> 021 * <p> 022 * Each message is encrypted with "AES/CBC/PKCS5Padding" before its sent to Kafka. The AES key as well as the initialization vector are random. 023 * The AES key is attached to the message in a RSA encrypted manner. The IV is also attached but not RSA encrypted. There is also a hash value 024 * of the AES key to allow consumers caching of decrypted AES keys. Finally we have a few magic and header bytes. 025 * The resulting byte array looks therefore like this: 026 * <p> 027 * <pre>MMLLLHH..HHEEEE..EEEEIIII..IIIOOOOO....OOOOOO</pre> 028 * <p> 029 * <ul> 030 * <li> MM: Two magic bytes 0xDF 0xBB to detect if this byte sequence is encrypted or not 031 * <li> LLL: Three bytes indicating the length of the AES key hash, the RSA encrypted AES key and the IV 032 * <li> HH..HH: AES key hash 033 * <li> EE..EE: RSA encrypted AES key 034 * <li> II..II: Initialization vector (if any) 035 * <li> OO..OO: The AES encrypted original message 036 * </ul> 037 * <p> 038 * <em>MMLLL</em> is called the encryption header and consists of 5 bytes. 039 * <p> 040 * <ul> 041 * <li> M1: 0xDF 042 * <li> M2: 0xBB 043 * <li> L1: length of the AES key hash 044 * <li> L2: RSA factor f so that f*128*8 evaluates to the RSA keysize (in bits) 045 * <li> L3: length of the initialization vector in bytes (always 16 for AES CBC) 046 * </ul> 047 * <p> 048 * RSA public/private keypair can be generated with<br> 049 * <em>java -cp kafka-end-2-end-encryption-1.0.0.jar de.saly.kafka.crypto.RsaKeyGen 2048</em> 050 * <p> 051 * <b>Note</b>: As Producers are multithreading-safe this serializer is also thread-safe 052 * <p> 053 * 054 * @param <T> The type to be serialized from (applied to the wrapped serializer) 055 */ 056public class EncryptingSerializer<T> extends SerdeCryptoBase implements Serializer<T> { 057 058 public static final String CRYPTO_VALUE_SERIALIZER = "crypto.wrapped_serializer"; 059 public static final String CRYPTO_NEW_KEY_MSG_INTERVAL = "crypto.new_key_msg_interval"; 060 public int msgInterval = -1; 061 private Serializer<T> inner; 062 private final AtomicInteger msg = new AtomicInteger(); 063 064 @SuppressWarnings("unchecked") 065 @Override 066 public void configure(Map<String, ?> configs, boolean isKey) { 067 inner = newInstance(configs, CRYPTO_VALUE_SERIALIZER, Serializer.class); 068 inner.configure(configs, isKey); 069 init(Cipher.ENCRYPT_MODE, configs, isKey); 070 String msgIntervalProperty = (String) configs.get(CRYPTO_NEW_KEY_MSG_INTERVAL); 071 if (msgIntervalProperty != null && msgIntervalProperty.length() > 0) { 072 msgInterval = Integer.parseInt(msgIntervalProperty); 073 if (msgInterval < 1) { 074 msgInterval = -1; 075 } 076 } 077 } 078 079 @Override 080 public byte[] serialize(String topic, T data) { 081 if (msgInterval > 0 && msg.compareAndSet(msgInterval, 0)) { 082 newKey(); 083 } else if (msgInterval > 0) { 084 msg.incrementAndGet(); 085 } 086 return crypt(inner.serialize(topic, data)); 087 } 088 089 @Override 090 public void close() { 091 if (inner != null) { 092 inner.close(); 093 } 094 } 095 096 /** 097 * Generate new AES key for the current thread 098 */ 099 public void newKey() { 100 super.newKey(); 101 } 102}