T - The type to be serialized from (applied to the wrapped serializer)
public class EncryptingSerializer<T> extends SerdeCryptoBase implements org.apache.kafka.common.serialization.Serializer<T>
See also: DecryptingDeserializer
Configuration
Each message is encrypted with "AES/CBC/PKCS5Padding" before it is sent to Kafka. Both the AES key and the initialization vector (IV) are random. The AES key is attached to the message in RSA-encrypted form. The IV is also attached, but it is not RSA encrypted. A hash of the AES key is included as well, so that consumers can cache decrypted AES keys. Finally, there are a few magic and header bytes. The resulting byte array therefore looks like this:
MMLLLHH..HHEEEE..EEEEIIII..IIIOOOOO....OOOOOO
MMLLL is called the encryption header and consists of 5 bytes.
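The scheme described above can be sketched with the standard JCA classes. This is a minimal illustration, not the serializer's actual code. It assumes that HH..HH is the key hash, EE..EE the RSA-encrypted AES key, II..II the IV and OO..OO the encrypted payload. The 128-bit AES key length, the SHA-256 hash and the "RSA/ECB/PKCS1Padding" transformation are also assumptions, and the class and method names are hypothetical. The five header bytes are left out because their values are not specified here.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.MessageDigest;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.SecureRandom;

// Hypothetical sketch class; not part of the library.
final class HybridEncryptionSketch {

    /** The variable-length parts of one message; the 5 header bytes are omitted. */
    static final class Parts {
        final byte[] keyHash;    // assumed to correspond to HH..HH
        final byte[] wrappedKey; // assumed to correspond to EE..EE
        final byte[] iv;         // assumed to correspond to II..II
        final byte[] ciphertext; // assumed to correspond to OO..OO

        Parts(byte[] keyHash, byte[] wrappedKey, byte[] iv, byte[] ciphertext) {
            this.keyHash = keyHash;
            this.wrappedKey = wrappedKey;
            this.iv = iv;
            this.ciphertext = ciphertext;
        }
    }

    static Parts encrypt(byte[] plaintext, PublicKey rsaPublic) throws Exception {
        // Random AES key (the 128-bit length is an assumption).
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(128);
        SecretKey aesKey = keyGen.generateKey();

        // Random IV; the AES block size is 16 bytes.
        byte[] iv = new byte[16];
        new SecureRandom().nextBytes(iv);

        // Encrypt the payload with the transformation named in the text.
        Cipher aes = Cipher.getInstance("AES/CBC/PKCS5Padding");
        aes.init(Cipher.ENCRYPT_MODE, aesKey, new IvParameterSpec(iv));
        byte[] ciphertext = aes.doFinal(plaintext);

        // Encrypt the AES key with the RSA public key (padding is an assumption).
        Cipher rsa = Cipher.getInstance("RSA/ECB/PKCS1Padding");
        rsa.init(Cipher.ENCRYPT_MODE, rsaPublic);
        byte[] wrappedKey = rsa.doFinal(aesKey.getEncoded());

        // Hash of the AES key, usable as a cache key on the consumer side.
        byte[] keyHash = MessageDigest.getInstance("SHA-256").digest(aesKey.getEncoded());
        return new Parts(keyHash, wrappedKey, iv, ciphertext);
    }

    static byte[] decrypt(Parts parts, PrivateKey rsaPrivate) throws Exception {
        // Recover the AES key with the RSA private key.
        Cipher rsa = Cipher.getInstance("RSA/ECB/PKCS1Padding");
        rsa.init(Cipher.DECRYPT_MODE, rsaPrivate);
        SecretKey aesKey = new SecretKeySpec(rsa.doFinal(parts.wrappedKey), "AES");

        // The IV travels in the clear, so it can be used directly.
        Cipher aes = Cipher.getInstance("AES/CBC/PKCS5Padding");
        aes.init(Cipher.DECRYPT_MODE, aesKey, new IvParameterSpec(parts.iv));
        return aes.doFinal(parts.ciphertext);
    }

    public static void main(String[] args) throws Exception {
        KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
        kpg.initialize(2048);
        KeyPair keyPair = kpg.generateKeyPair();

        Parts parts = encrypt("hello kafka".getBytes(StandardCharsets.UTF_8), keyPair.getPublic());
        byte[] roundTrip = decrypt(parts, keyPair.getPrivate());
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8));
    }
}
```

A consumer that has already seen a given key hash could skip the RSA step and reuse the cached AES key, which is presumably why the hash is attached to every message.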
An RSA public/private key pair can be generated with:
java -cp kafka-end-2-end-encryption-1.0.0.jar de.saly.kafka.crypto.RsaKeyGen 2048
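For readers who prefer to create the key pair programmatically, the following sketch shows the equivalent step with the JDK's KeyPairGenerator. It is not the implementation of RsaKeyGen, and the class name is hypothetical. How the command above stores the keys on disk is not documented here, so the sketch only produces the encoded key bytes.

```java
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch class; not the library's RsaKeyGen.
final class RsaKeyPairSketch {

    /** Generates an RSA key pair with the given modulus size in bits. */
    static KeyPair generate(int bits) throws NoSuchAlgorithmException {
        KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
        generator.initialize(bits);
        return generator.generateKeyPair();
    }

    public static void main(String[] args) throws Exception {
        KeyPair keyPair = generate(2048);
        // getEncoded() returns DER bytes: X.509 for the public key, PKCS#8 for the private key.
        byte[] publicBytes = keyPair.getPublic().getEncoded();
        byte[] privateBytes = keyPair.getPrivate().getEncoded();
        System.out.println(keyPair.getPublic().getFormat() + ": " + publicBytes.length + " bytes");
        System.out.println(keyPair.getPrivate().getFormat() + ": " + privateBytes.length + " bytes");
    }
}
```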
Note: Because Kafka producers are thread-safe, this serializer is thread-safe as well.
| Modifier and Type | Field and Description |
|---|---|
| static String | CRYPTO_NEW_KEY_MSG_INTERVAL |
| static String | CRYPTO_VALUE_SERIALIZER |
| int | msgInterval |

Fields inherited from class SerdeCryptoBase: CRYPTO_AES_KEY_LEN, CRYPTO_HASH_METHOD, CRYPTO_IGNORE_DECRYPT_FAILURES, CRYPTO_RSA_PRIVATEKEY_FILEPATH, CRYPTO_RSA_PUBLICKEY_FILEPATH, DEFAULT_TRANSFORMATION

| Constructor and Description |
|---|
| EncryptingSerializer() |

| Modifier and Type | Method and Description |
|---|---|
| void | close() |
| void | configure(Map<String,?> configs, boolean isKey) |
| void | newKey(): generates a new AES key for the current thread |
| byte[] | serialize(String topic, T data) |

Methods inherited from class SerdeCryptoBase: concatenate, crypt, init, newInstance

public static final String CRYPTO_VALUE_SERIALIZER
public static final String CRYPTO_NEW_KEY_MSG_INTERVAL
public int msgInterval
public EncryptingSerializer()
public void configure(Map<String,?> configs, boolean isKey)
Specified by: configure in interface org.apache.kafka.common.serialization.Serializer<T>
public byte[] serialize(String topic, T data)
Specified by: serialize in interface org.apache.kafka.common.serialization.Serializer<T>
public void close()
Specified by: close in interface org.apache.kafka.common.serialization.Serializer<T>
public void newKey()
Overrides: newKey in class SerdeCryptoBase
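
The summary above says that newKey() generates a key for the current thread, and the names msgInterval and CRYPTO_NEW_KEY_MSG_INTERVAL suggest that a key is replaced after a configured number of messages. The sketch below shows one way such per-thread rotation could work. The rotation rule, the 128-bit key length, the class name and the helper keyForNextMessage are all assumptions made for illustration, not documented behavior of this class.

```java
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch class; not part of the library.
final class PerThreadKeyRotationSketch {

    // Assumed meaning: number of messages per key; values <= 0 disable rotation.
    private final int msgInterval;

    // Each thread holds its own key and counter, so no locking is needed.
    private final ThreadLocal<SecretKey> currentKey = new ThreadLocal<>();
    private final ThreadLocal<int[]> messagesWithKey = ThreadLocal.withInitial(() -> new int[1]);

    PerThreadKeyRotationSketch(int msgInterval) {
        this.msgInterval = msgInterval;
    }

    /** Generates a new AES key for the current thread and resets its counter. */
    void newKey() {
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance("AES");
            keyGen.init(128); // assumed key length
            currentKey.set(keyGen.generateKey());
            messagesWithKey.get()[0] = 0;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("AES is not available", e);
        }
    }

    /** Returns the key for the next message, rotating it once the interval is used up. */
    SecretKey keyForNextMessage() {
        int[] count = messagesWithKey.get();
        boolean intervalUsedUp = msgInterval > 0 && count[0] >= msgInterval;
        if (currentKey.get() == null || intervalUsedUp) {
            newKey();
        }
        count[0]++;
        return currentKey.get();
    }
}
```

With an interval of 2, the first two calls to keyForNextMessage() on a thread return the same key and the third call returns a fresh one. Keeping the state in ThreadLocal fields is one way to stay consistent with the thread-safety note, since threads never share a key or a counter.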