いつもありがとうございます。の気持ちとともにCLOVERを開いて、今日はKafkaのクライアントをJavaで試しに書いてみた。
KafkaTest1
かずひらさんと違うのは、localhostで立ち上げてるってところだけ。なのでこんなコードになった。ちょっとくらい触ろうかなと思って、シリアライザーとデシリアライザーをPropertiesに書いといた。
なるほどなぁと思いながら写経した。設定とSerializer/Deserializerを指定して、KafkaProducer/KafkaConsumerをインスタンス化して、ProducerRecord/ConsumerRecordを送受信する感じか。
consumerも閉じるといいのかな?と思って一応閉じといた。
なんかよく分かんないのは、このConsumerのAUTO_OFFSET_RESET_CONFIGをearliestにしてるのに、2回目はもう取れてない感じ。group.idを変えたり、もういっかいProducer側を実行するとまた取れるので、オフセットがキープされてるってことなのかなぁ?Consumer Groupは生き残り続けてるってことなのかな?まいっか。
KafkaTest2
てことで、次はspring-kafkaのドキュメントを読んでみた。いや、というか逆で、最初にここを読もうとしてよく分かんなくてうろうろしてたらかずひらさんところにたどりついて「さすが師匠!」ってなったのであった。
http://docs.spring.io/spring-kafka/docs/1.2.1.RELEASE/reference/htmlsingle/#_very_very_quick
Spring Bootを使って依存関係を入れた。Kafkaが0.10.2.0なのでpom.xmlのpropertiesでバージョンを指定。
<spring-kafka.version>1.2.1.RELEASE</spring-kafka.version>
dependenciesはこんなもんかな
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
で、コードを書き写してみた。KafkaTemplateがProducerをラッピングしてて、KafkaMessageListenerContainerがConsumerなのかな?
KafkaTest4
Spring Bootの機能を使うとこうなる。基本的な設定はSpring Bootがやっといてくれる。EnableKafkaもなくて良いんかな。
だから、送信する方は KafkaTemplate をDIすればよくて、受信する方は @KafkaListener をつけとくだけか。
実際のところ設定はBeanで定義しておくような気がするね。今日はこれくらいかな。おやすー。
追記
AutoConfigurationを見ておいた。なるほどな感じだった。
KafkaAutoConfigurationは、KafkaTemplateがあるときに有効になって、KafkaTemplate・ProducerListener(ログ出力)・ConsumerFactory・ProducerFactoryをBean定義してある。
設定はここに定義してあって、buildConsumerPropertiesとbuildProducerPropertiesを使う。
んで、EnableKafkaの設定はここにあって、ConcurrentKafkaListenerContainerFactoryのBean定義もしてある。
設定はプロパティファイルで済みそう?