Kafka StreamsをSpring Bootで試してみた

この辺とか読みながら→ Introduction — Confluent Platform 3.1.0 documentation

Twitter4Jで適当に英語のツイートをとってきて、Kafkaに流し込む。(キーとかの設定はtwitter4j.propertiesに入れておいた)

@Component
public class TwitterToKafka implements CommandLineRunner {

  @Autowired
  private KafkaTemplate<String, String> template;

  @Override
  public void run(String... args) throws Exception {
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusAdapter() {
      @Override
      public void onStatus(Status status) {
        template.send("myTopic", status.getText());
      }
    });
    twitterStream.filter(filterQuery());
  }

  private FilterQuery filterQuery() {
    // 英語のツイートで"spring"を含んでいるものにしてみた。ちょうど良いくらいの量だったので。
    FilterQuery filter = new FilterQuery();
    filter.language("en");
    filter.track("spring");
    // filter.follow(TwitterFactory.getSingleton().showUser("bufferings").getId());
    return filter;
  }
}

myTopicって名前のTopicに流し込むようにしたので、今度はそれを処理する定義を書く。単語の数を数えてみた。これがWordCountStoreってストアに保存される。

@Configuration
@EnableKafkaStreams
public class KStreamConfig {

  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public StreamsConfig kStreamsConfigs(KafkaProperties kafkaProperties) {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
    // Spring BootのAutoConfigurationに入ってるやつ使えばいいかなと思って
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    // 動作確認用に1秒ごとにコミット
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000");
    return new StreamsConfig(props);
  }

  @Bean
  public KStream<String, String> kStream(KStreamBuilder kStreamBuilder) {
    KStream<String, String> stream = kStreamBuilder.stream(Serdes.String(), Serdes.String(), "myTopic");
    // @formatter:off
    stream
      // 単語に分割して
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      // 単語でGroupBy
      .groupBy((key, word) -> word, Serdes.String(), Serdes.String())
       // カウントを取る(KTable)
      .count("WordCountStore");
    // @formatter:on
    return stream;
  }

}

ついでに、KTableの中身を表示するのを書いてみた。

@RestController
public class StateStoreViewer {

  @Autowired
  private KStreamBuilderFactoryBean kStreamBuilder;

  @GetMapping
  public List<KeyValue<String, Long>> query() {
    final ReadOnlyKeyValueStore<String, Long> store = kStreamBuilder.getKafkaStreams().store("WordCountStore",
        QueryableStoreTypes.keyValueStore());

    final List<KeyValue<String, Long>> results = new ArrayList<>();
    final KeyValueIterator<String, Long> range = store.all();
    while (range.hasNext()) {
      results.add(range.next());
    }

    return results;
  }

}

ら、大量に出てきてしまった。しかも良くわからん感じ。

f:id:bufferings:20170608085740p:plain

でも、面白かったー!