この辺とか読みながら→ Introduction — Confluent Platform 3.1.0 documentation
Twitter4Jで適当に英語のツイートをとってきて、Kafkaに流し込む。(キーとかの設定はtwitter4j.propertiesに入れておいた)
@Component public class TwitterToKafka implements CommandLineRunner { @Autowired private KafkaTemplate<String, String> template; @Override public void run(String... args) throws Exception { TwitterStream twitterStream = new TwitterStreamFactory().getInstance(); twitterStream.addListener(new StatusAdapter() { @Override public void onStatus(Status status) { template.send("myTopic", status.getText()); } }); twitterStream.filter(filterQuery()); } private FilterQuery filterQuery() { // 英語のツイートで"spring"を含んでいるものにしてみた。ちょうど良いくらいの量だったので。 FilterQuery filter = new FilterQuery(); filter.language("en"); filter.track("spring"); // filter.follow(TwitterFactory.getSingleton().showUser("bufferings").getId()); return filter; } }
myTopicって名前のTopicに流し込むようにしたので、今度はそれを処理する定義を書く。単語の数を数えてみた。これがWordCountStoreってストアに保存される。
@Configuration @EnableKafkaStreams public class KStreamConfig { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public StreamsConfig kStreamsConfigs(KafkaProperties kafkaProperties) { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams"); // Spring BootのAutoConfigurationに入ってるやつ使えばいいかなと思って props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()); // 動作確認用に1秒ごとにコミット props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000"); return new StreamsConfig(props); } @Bean public KStream<String, String> kStream(KStreamBuilder kStreamBuilder) { KStream<String, String> stream = kStreamBuilder.stream(Serdes.String(), Serdes.String(), "myTopic"); // @formatter:off stream // 単語に分割して .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) // 単語でGroupBy .groupBy((key, word) -> word, Serdes.String(), Serdes.String()) // カウントを取る(KTable) .count("WordCountStore"); // @formatter:on return stream; } }
ついでに、KTableの中身を表示するのを書いてみた。
@RestController public class StateStoreViewer { @Autowired private KStreamBuilderFactoryBean kStreamBuilder; @GetMapping public List<KeyValue<String, Long>> query() { final ReadOnlyKeyValueStore<String, Long> store = kStreamBuilder.getKafkaStreams().store("WordCountStore", QueryableStoreTypes.keyValueStore()); final List<KeyValue<String, Long>> results = new ArrayList<>(); final KeyValueIterator<String, Long> range = store.all(); while (range.hasNext()) { results.add(range.next()); } return results; } }
ら、大量に出てきてしまった。しかも良くわからん感じ。
でも、面白かったー!