View Javadoc
1   package de.saly.kafka.crypto;
2   
3   import java.util.Map;
4   import java.util.concurrent.atomic.AtomicInteger;
5   
6   import javax.crypto.Cipher;
7   
8   import org.apache.kafka.common.serialization.Serializer;
9   
10  /**
11   * 
12   * This is a serialization wrapper which adds message encryption. Its intended to be used together with {@link DecryptingDeserializer} 
13   * <p>
14   * Configuration<p>
15   * <ul>
16   * <li><em>crypto.rsa.publickey.filepath</em> path on the local filesystem which hold the RSA public key (X.509 format) of the consumer
17   * <li><em>crypto.wrapped_serializer</em> is the class or full qualified class name or the wrapped serializer
18   * <li><em>crypto.hash_method</em> Type of hash generated for the AES key (optional, default is "SHA-256")
19   * <li><em>crypto.new_key_msg_interval</em> Generate new AES every n messages (default is -1, that means never generate a new key)
20   * </ul>
21   * <p>
22   * Each message is encrypted with "AES/CBC/PKCS5Padding" before its sent to Kafka. The AES key as well as the initialization vector are random.
23   * The AES key is attached to the message in a RSA encrypted manner. The IV is also attached but not RSA encrypted. There is also a hash value
24   * of the AES key to allow consumers caching of decrypted AES keys. Finally we have a few magic and header bytes.
25   * The resulting byte array looks therefore like this:
26   * <p>
27   * <pre>MMLLLHH..HHEEEE..EEEEIIII..IIIOOOOO....OOOOOO</pre>
28   * <p>
29   * <ul>
30   * <li> MM: Two magic bytes 0xDF 0xBB to detect if this byte sequence is encrypted or not
31   * <li> LLL: Three bytes indicating the length of the AES key hash, the RSA encrypted AES key and the IV
32   * <li> HH..HH: AES key hash
33   * <li> EE..EE: RSA encrypted AES key
34   * <li> II..II: Initialization vector (if any)
35   * <li> OO..OO: The AES encrypted original message
36   * </ul>
37   * <p>
38   * <em>MMLLL</em> is called the encryption header and consists of 5 bytes.
39   * <p>
40   * <ul>
41   * <li> M1: 0xDF
42   * <li> M2: 0xBB
43   * <li> L1: length of the AES key hash
44   * <li> L2: RSA factor f so that f*128*8 evaluates to the RSA keysize (in bits)
45   * <li> L3: length of the initialization vector in bytes (always 16 for AES CBC)
46   * </ul>
47   * <p>
48   * RSA public/private keypair can be generated with<br>
49   * <em>java -cp kafka-end-2-end-encryption-1.0.0.jar de.saly.kafka.crypto.RsaKeyGen 2048</em>
50   * <p>
51   * <b>Note</b>: As Producers are multithreading-safe this serializer is also thread-safe
52   * <p>
53   * 
54   * @param <T> The type to be serialized from (applied to the wrapped serializer)
55   */
56  public class EncryptingSerializer<T> extends SerdeCryptoBase implements Serializer<T> {
57  
58      public static final String CRYPTO_VALUE_SERIALIZER = "crypto.wrapped_serializer";
59      public static final String CRYPTO_NEW_KEY_MSG_INTERVAL = "crypto.new_key_msg_interval";
60      public int msgInterval = -1;
61      private Serializer<T> inner;
62      private final AtomicInteger msg = new AtomicInteger();
63  
64      @SuppressWarnings("unchecked")
65      @Override
66      public void configure(Map<String, ?> configs, boolean isKey) {
67          inner = newInstance(configs, CRYPTO_VALUE_SERIALIZER, Serializer.class);
68          inner.configure(configs, isKey);
69          init(Cipher.ENCRYPT_MODE, configs, isKey);
70          String msgIntervalProperty = (String) configs.get(CRYPTO_NEW_KEY_MSG_INTERVAL);
71          if (msgIntervalProperty != null && msgIntervalProperty.length() > 0) {
72              msgInterval = Integer.parseInt(msgIntervalProperty);
73              if (msgInterval < 1) {
74                  msgInterval = -1;
75              }
76          }
77      }
78  
79      @Override
80      public byte[] serialize(String topic, T data) {
81          if (msgInterval > 0 && msg.compareAndSet(msgInterval, 0)) {
82              newKey();
83          } else if (msgInterval > 0) {
84              msg.incrementAndGet();
85          }
86          return crypt(inner.serialize(topic, data));
87      }
88  
89      @Override
90      public void close() {
91          if (inner != null) {
92              inner.close();
93          }
94      }
95      
96      /**
97       * Generate new AES key for the current thread
98       */
99      public void newKey() {
100         super.newKey();
101     }
102 }