

Kafka stream processing: lookup against Hive data

Here is a scenario which in my opinion should be very common:

Suppose you need to build an ETL Kafka stream which reads data from one stream and checks it against a blacklist before writing to the destination stream. This blacklist gets updated daily and has the same key as your source stream. One way to implement this is to use a Kafka table (KTable) and join your stream with the table to find the matches.

To make it a bit more complicated, when keys match we need to check a property of the object coming from the source against the value stored in the KTable, which is a comma-separated list of values, and skip writing the message if the source property is found in that list. Since this blacklist is updated nightly and is stored in Hive, the easiest way was to just push the data directly from Hive to Kafka.

For this article, I'm using Hortonworks HDP 3.1.

Populating KTable from Hive

Hive 3.1 has built-in support for Kafka. Since a KTable is simply backed by a Kafka topic, we can populate our lookup table from Hive. First we need to create a Hive external table:

CREATE EXTERNAL TABLE kafka_blacklist 
    (hashes string)
STORED BY 
    'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
  ("kafka.topic" = "blacklist",
  "kafka.bootstrap.servers"="kafkanode1:6667,kafkanode2:6667");

In this case, the blacklist data consists of a bunch of comma-separated hashes. When this table is created, some Kafka-related metadata columns are added to it automatically. You can see these columns using the "describe" SQL command:

describe kafka_blacklist;
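
On my cluster the output looks roughly like this (exact types can vary by Hive version), with the Kafka metadata exposed as __-prefixed columns:

hashes          string
__key           binary
__partition     int
__offset        bigint
__timestamp     bigint

These metadata columns are what we write to in the insert statement below.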

Every night, an automated job updates the blacklist data and then copies it to the Kafka topic:

INSERT INTO TABLE 
   kafka_blacklist
SELECT 
    concat_ws(",", collect_set(hash)) AS `hashes`
    , binary(key) AS `__key`
    , null AS `__partition`
    , -1 AS `__offset`
    , to_epoch_milli(CURRENT_TIMESTAMP) AS `__timestamp`
FROM blacklist
GROUP BY key;

Since the blacklist data has multiple hashes per key, in the above insert statement I create a comma-separated list of these hashes so I end up with only one record per key. The key is a string, so I have to cast it to binary for the `__key` column.
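
For context, the underlying blacklist table in Hive is assumed to look something like this (column names inferred from the insert above; the real table may of course have more columns):

CREATE TABLE blacklist (
    key     string,
    hash    string
);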

If you run the Kafka console consumer, you can see the data is stored in the Kafka topic as a JSON object which looks like this:

{"hashes":"hash1,hash2,hash3"}

Joining to KTable

In the stream processing app, the source stream is left-joined with the blacklist on the key, and then the hash property of the message is checked against the blacklisted hashes to decide whether the message should be passed to the destination stream.

The JSON data stored in the Kafka topic will be deserialized into this class:

    public static class Hashes {
        public String hashes;
    }
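
Jackson maps the JSON onto this class by field name, so no annotations are needed. As a quick sanity check outside of Kafka, the mapping behaves like this:

ObjectMapper mapper = new ObjectMapper();
// readValue throws a checked JsonProcessingException
Hashes h = mapper.readValue("{\"hashes\":\"hash1,hash2,hash3\"}", Hashes.class);
// h.hashes is now "hash1,hash2,hash3"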

And the following snippet shows how the source stream is joined to the blacklist table:

// Input stream
KStream<String, byte[]> source = builder.stream(kafkaTopic_Message);

// JSON serde
Map<String, Object> serdeProps = new HashMap<>();

final Serializer<Hashes> hashesSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", Hashes.class);
hashesSerializer.configure(serdeProps, false);

final Deserializer<Hashes> hashesDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", hashes.class);
hashesDeserializer.configure(serdeProps, false);

final Serde<Hashes> hashesSerde = Serdes.serdeFrom(hashesSerializer, hashesDeserializer);

// Blacklist as a GlobalKTable
GlobalKTable<String, Hashes> blacklist = builder.globalTable(kafkaTopic_blacklist,
        Consumed.with(Serdes.String(), hashesSerde));

// transform and filter
KStream<String, Message> messages =
        source.map((key, value) -> new KeyValue<>(key, parseMessage(value)))
        // Some filters ...
        .filter((key, message) -> ... );

// Skip sending if black listed
messages.leftJoin(blacklist,
        (leftKey, leftValue) -> leftKey,
        (leftValue, rightValue) -> rightValue == null
                || !Arrays.asList(rightValue.hashes.split(",")).contains(leftValue.getHash())
                ? leftValue : null)
        // Skip if message was set to null
        .filter((key, message) -> message != null)
        // anonymize the key
        .map((key, value) -> new KeyValue<>(DigestUtils.sha1Hex(key), value))
        // Transform to output result
        .map((key, message) -> new KeyValue<>(key, new OutGoingMessage(message)))
        .foreach(writer::write);

// Build the streams
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
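
One thing the snippet above does not show is actually starting the topology; a minimal sketch (assuming props holds the usual streams configuration) is:

// Start the topology and close it cleanly when the JVM exits
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));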

The POJO serializer and deserializer code looks like this:

package kafka.streams;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class JsonPOJOSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Default constructor needed by Kafka
     */
    public JsonPOJOSerializer() {
    }

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null)
            return null;

        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
    }

}


package kafka.streams;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class JsonPOJODeserializer<T> implements Deserializer<T> {
    private ObjectMapper objectMapper = new ObjectMapper();

    private Class<T> tClass;

    /**
     * Default constructor needed by Kafka
     */
    public JsonPOJODeserializer() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        tClass = (Class<T>) props.get("JsonPOJOClass");
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null)
            return null;

        T data;
        try {
            data = objectMapper.readValue(bytes, tClass);
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return data;
    }

    @Override
    public void close() {

    }
}
