Created
February 20, 2018 09:55
-
-
Save jeroenr/8b0cc0a4ce3b4d521de28267867bc003 to your computer and use it in GitHub Desktop.
Example of a case class serializer, deserializer and Serde for Avro-encoded Kafka records
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  * Kafka `Deserializer` that turns Avro-encoded bytes into a value of type `CC`.
  *
  * The bytes are first decoded by a wrapped `KafkaAvroDeserializer` (generic reader,
  * schema registry URL read from `Config`) and the resulting `GenericRecord` is then
  * converted to `CC` through the implicit `RecordFormat`.
  *
  * @param isKey  `true` when this instance decodes record keys, `false` for values;
  *               forwarded to the wrapped deserializer
  * @param format converts a decoded `GenericRecord` into `CC`
  * @tparam CC    the type produced by this deserializer
  */
class CaseClassDeserializer[CC](isKey: Boolean)(implicit format: RecordFormat[CC]) extends Deserializer[CC] with Logging {

  private val deserializer: KafkaAvroDeserializer = {
    val underlying = new KafkaAvroDeserializer()
    // Use configure(props, isKey) instead of the (client, props) constructor:
    // the constructor never receives the key/value flag, so `isKey` was silently dropped.
    underlying.configure(Map(
      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> Config.tracktivity.kafka.connect.`schema-registry-url`,
      KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> "false"
    ).asJava, isKey)
    underlying
  }

  /** Intentionally a no-op: the wrapped deserializer is fully configured at construction time. */
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}

  /** Closes the wrapped Avro deserializer. */
  override def close(): Unit = deserializer.close()

  /**
    * Decodes `data` read from `topic`.
    *
    * @return the decoded value, or `null` when `data` is `null` or the wrapped
    *         deserializer yields no record
    * @throws ClassCastException if the payload decodes to something other than a `GenericRecord`
    */
  override def deserialize(topic: String, data: Array[Byte]): CC =
    if (data == null) null.asInstanceOf[CC]
    else deserializer.deserialize(topic, data) match {
      case null =>
        // Previously this case was logged and then handed to format.from anyway.
        logger.info(s"DESER null record from topic ${topic} using format ${format}. Based on ${data.length} bytes of data")
        null.asInstanceOf[CC]
      case record: GenericRecord =>
        format.from(record)
      case other =>
        throw new ClassCastException(
          s"Expected a GenericRecord from topic ${topic} but got ${other.getClass.getName}")
    }
}
/**
  * Kafka `Serializer` that writes a value of type `CC` as Avro.
  *
  * The value is converted to a record through the implicit `RecordFormat` and then
  * encoded by a wrapped `KafkaAvroSerializer` whose schema registry URL is read from `Config`.
  *
  * @param isKey  `true` when this instance encodes record keys, `false` for values;
  *               forwarded to the wrapped serializer
  * @param format converts a `CC` into the record handed to the Avro serializer
  * @tparam CC    the type accepted by this serializer
  */
class CaseClassSerializer[CC](isKey: Boolean)(implicit format: RecordFormat[CC]) extends Serializer[CC] {

  private val serializer: KafkaAvroSerializer = {
    val underlying = new KafkaAvroSerializer()
    // Use configure(props, isKey) instead of the (client, props) constructor:
    // the constructor never receives the key/value flag, so `isKey` was silently dropped.
    underlying.configure(Map(
      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> Config.tracktivity.kafka.connect.`schema-registry-url`
    ).asJava, isKey)
    underlying
  }

  /** Intentionally a no-op: the wrapped serializer is fully configured at construction time. */
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}

  /** Closes the wrapped Avro serializer. */
  override def close(): Unit = serializer.close()

  /**
    * Encodes `data` for `topic`.
    *
    * @return the encoded bytes, or `null` when `data` is `null`, mirroring how
    *         `CaseClassDeserializer` maps `null` bytes to a `null` value
    */
  override def serialize(topic: String, data: CC): Array[Byte] =
    if (data == null) null // do not hand null to format.to
    else serializer.serialize(topic, format.to(data))
}
/**
  * Kafka `Serde` pairing [[CaseClassSerializer]] and [[CaseClassDeserializer]] for type `CC`.
  *
  * One serializer and one deserializer are created per serde instance and reused on every
  * call to `serializer()` / `deserializer()`, so that `close()` can release them.
  *
  * @param isKey  `true` when this serde handles record keys, `false` for values
  * @param format conversion between `CC` and its record representation
  * @tparam CC    the type handled by this serde
  */
class CaseClassSerde[CC](isKey: Boolean)(implicit format: RecordFormat[CC]) extends Serde[CC] {

  // Held as fields: building a new pair on every accessor call left instances that close() never reached.
  private val caseClassSerializer: Serializer[CC] = new CaseClassSerializer[CC](isKey)
  private val caseClassDeserializer: Deserializer[CC] = new CaseClassDeserializer[CC](isKey)

  override def deserializer(): Deserializer[CC] = caseClassDeserializer

  override def serializer(): Serializer[CC] = caseClassSerializer

  /** Intentionally a no-op: both halves are configured at construction time. */
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}

  /** Closes both halves; the deserializer is closed even if closing the serializer throws. */
  override def close(): Unit =
    try caseClassSerializer.close()
    finally caseClassDeserializer.close()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment