1 package de.saly.kafka.crypto;
2
3 import java.util.Map;
4 import java.util.concurrent.atomic.AtomicInteger;
5
6 import javax.crypto.Cipher;
7
8 import org.apache.kafka.common.serialization.Serializer;
9
10 /**
11 *
12 * This is a serialization wrapper which adds message encryption. Its intended to be used together with {@link DecryptingDeserializer}
13 * <p>
14 * Configuration<p>
15 * <ul>
16 * <li><em>crypto.rsa.publickey.filepath</em> path on the local filesystem which hold the RSA public key (X.509 format) of the consumer
17 * <li><em>crypto.wrapped_serializer</em> is the class or full qualified class name or the wrapped serializer
18 * <li><em>crypto.hash_method</em> Type of hash generated for the AES key (optional, default is "SHA-256")
19 * <li><em>crypto.new_key_msg_interval</em> Generate new AES every n messages (default is -1, that means never generate a new key)
20 * </ul>
21 * <p>
22 * Each message is encrypted with "AES/CBC/PKCS5Padding" before its sent to Kafka. The AES key as well as the initialization vector are random.
23 * The AES key is attached to the message in a RSA encrypted manner. The IV is also attached but not RSA encrypted. There is also a hash value
24 * of the AES key to allow consumers caching of decrypted AES keys. Finally we have a few magic and header bytes.
25 * The resulting byte array looks therefore like this:
26 * <p>
27 * <pre>MMLLLHH..HHEEEE..EEEEIIII..IIIOOOOO....OOOOOO</pre>
28 * <p>
29 * <ul>
30 * <li> MM: Two magic bytes 0xDF 0xBB to detect if this byte sequence is encrypted or not
31 * <li> LLL: Three bytes indicating the length of the AES key hash, the RSA encrypted AES key and the IV
32 * <li> HH..HH: AES key hash
33 * <li> EE..EE: RSA encrypted AES key
34 * <li> II..II: Initialization vector (if any)
35 * <li> OO..OO: The AES encrypted original message
36 * </ul>
37 * <p>
38 * <em>MMLLL</em> is called the encryption header and consists of 5 bytes.
39 * <p>
40 * <ul>
41 * <li> M1: 0xDF
42 * <li> M2: 0xBB
43 * <li> L1: length of the AES key hash
44 * <li> L2: RSA factor f so that f*128*8 evaluates to the RSA keysize (in bits)
45 * <li> L3: length of the initialization vector in bytes (always 16 for AES CBC)
46 * </ul>
47 * <p>
48 * RSA public/private keypair can be generated with<br>
49 * <em>java -cp kafka-end-2-end-encryption-1.0.0.jar de.saly.kafka.crypto.RsaKeyGen 2048</em>
50 * <p>
51 * <b>Note</b>: As Producers are multithreading-safe this serializer is also thread-safe
52 * <p>
53 *
54 * @param <T> The type to be serialized from (applied to the wrapped serializer)
55 */
56 public class EncryptingSerializer<T> extends SerdeCryptoBase implements Serializer<T> {
57
58 public static final String CRYPTO_VALUE_SERIALIZER = "crypto.wrapped_serializer";
59 public static final String CRYPTO_NEW_KEY_MSG_INTERVAL = "crypto.new_key_msg_interval";
60 public int msgInterval = -1;
61 private Serializer<T> inner;
62 private final AtomicInteger msg = new AtomicInteger();
63
64 @SuppressWarnings("unchecked")
65 @Override
66 public void configure(Map<String, ?> configs, boolean isKey) {
67 inner = newInstance(configs, CRYPTO_VALUE_SERIALIZER, Serializer.class);
68 inner.configure(configs, isKey);
69 init(Cipher.ENCRYPT_MODE, configs, isKey);
70 String msgIntervalProperty = (String) configs.get(CRYPTO_NEW_KEY_MSG_INTERVAL);
71 if (msgIntervalProperty != null && msgIntervalProperty.length() > 0) {
72 msgInterval = Integer.parseInt(msgIntervalProperty);
73 if (msgInterval < 1) {
74 msgInterval = -1;
75 }
76 }
77 }
78
79 @Override
80 public byte[] serialize(String topic, T data) {
81 if (msgInterval > 0 && msg.compareAndSet(msgInterval, 0)) {
82 newKey();
83 } else if (msgInterval > 0) {
84 msg.incrementAndGet();
85 }
86 return crypt(inner.serialize(topic, data));
87 }
88
89 @Override
90 public void close() {
91 if (inner != null) {
92 inner.close();
93 }
94 }
95
96 /**
97 * Generate new AES key for the current thread
98 */
99 public void newKey() {
100 super.newKey();
101 }
102 }