KafkaのクライアントをJavaで試しに書いてみた。それとspring-kafka。

いつもありがとうございます。の気持ちとともにCLOVERを開いて、今日はKafkaのクライアントをJavaで試しに書いてみた。

d.hatena.ne.jp

KafkaTest1

かずひらさんと違うのは、localhostで立ち上げてるってところだけ。なのでこんなコードになった。ちょっとくらい触ろうかなと思って、シリアライザーとデシリアライザーをPropertiesに書いといた。

gist.github.com

なるほどなぁと思いながら写経した。設定とSerializer/Deserializerを指定して、KafkaProducer/KafkaConsumerをインスタンス化して、ProducerRecord/ConsumerRecordを送受信する感じか。

consumerも閉じるといいのかな?と思って一応閉じといた。

なんかよく分かんないのは、このConsumerのAUTO_OFFSET_RESET_CONFIGをearliestにしてるのに、2回目はもう取れてない感じ。group.idを変えたり、もういっかいProducer側を実行するとまた取れるので、オフセットがキープされてるってことなのかなぁ?Consumer Groupは生き残り続けてるってことなのかな?まいっか。

KafkaTest2

てことで、次はspring-kafkaのドキュメントを読んでみた。いや、というか逆で、最初にここを読もうとしてよく分かんなくてうろうろしてたらかずひらさんところにたどりついて「さすが師匠!」ってなったのであった。

http://docs.spring.io/spring-kafka/docs/1.2.1.RELEASE/reference/htmlsingle/#_very_very_quick

Spring Bootを使って依存関係を入れた。Kafkaが0.10.2.0なのでpom.xmlのpropertiesでバージョンを指定。

    <spring-kafka.version>1.2.1.RELEASE</spring-kafka.version>

dependenciesはこんなもんかな

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

で、コードを書き写してみた。KafkaTemplateがProducerをラッピングしてて、KafkaMessageListenerContainerがConsumerなのかな?

gist.github.com

KafkaTest3

それを、Configurationを使って書くとこうなる。

gist.github.com

@EnableKafkaをつけておくと、@KafkaListenerが使えるみたい。

KafkaTest4

Spring Bootの機能を使うとこうなる。基本的な設定はSpring Bootがやっといてくれる。EnableKafkaもなくて良いんかな。

gist.github.com

だから、送信する方は KafkaTemplate をDIすればよくて、受信する方は @KafkaListener をつけとくだけか。

実際のところ設定はBeanで定義しておくような気がするね。今日はこれくらいかな。おやすー。

追記

AutoConfigurationを見ておいた。なるほどな感じだった。

KafkaAutoConfigurationは、KafkaTemplateがあるときに有効になって、KafkaTemplate・ProducerListener(ログ出力)・ConsumerFactory・ProducerFactoryをBean定義してある。

https://github.com/spring-projects/spring-boot/blob/v1.5.3.RELEASE/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

設定はここに定義してあって、buildConsumerPropertiesとbuildProducerPropertiesを使う。

https://github.com/spring-projects/spring-boot/blob/v1.5.3.RELEASE/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

んで、EnableKafkaの設定はここにあって、ConcurrentKafkaListenerContainerFactoryのBean定義もしてある。

https://github.com/spring-projects/spring-boot/blob/v1.5.3.RELEASE/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

設定はプロパティファイルで済みそう?