#KanJavaParty で喋ったよ(ヽ´ω`)

kanjava.connpass.com

f:id:bufferings:20170624165221p:plain

直前までコード触ってた。ローカルでデモを見てもらうけど、実際のアプリも触ってもらえたらいいなと思ってAzureでも動かした。

んで、ほんとにギリギリでAzure上で動き始めて、そしてなぜかローカルが動かなくなった。Azureでも動いてなかったら何も見せられかったから危なかった。なんで動いてないのか気になるけど、今はもう気力がない。


資料

これで喋ったよ。僕の中では関ジャバは、ゆるくみんなで話をする場所だから。こういうのがいいかなーと思って。

デモアプリ

Azureで動かしてる。今日の夜には落とそうかな。


ソースコード

あとは、しゃべりながら見せたやつの、紹介ね。

Event Sourcing

docs.microsoft.com

いつもの人

martinfowler.com

Kafka Streams

Apache Kafka

Confluentのサイトがサンプルとかいっぱいあっていい感じ。

Kafka Streams — Confluent Platform 3.2.2 documentation

Kafka Streams & Event Sourcing

これくらいしか見つからないかな。

Event sourcing, CQRS, stream processing and Apache Kafka: What’s the connection? - Confluent

Azure Container Service: DC/OS

Running DC/OS on Azure

無事(?)終わった!

みんながあたたかく見守ってくれたおかげで無事終わりましたー。これをきっかけに、色々と情報交換ができたら嬉しいな。Event Sourcingとか。

解放感

Using multiple Avro schema in one Kafka Topic

I tried Kafka with Avro on Spring Boot in the previous blog post(Japanese).

bufferings.hatenablog.com

Then this time, I tried to use multiple Avro schema in one topic.

Schema

"tweet.avsc" is the schema file which I used in the previous post.

{
  "name": "Tweet",
  "type": "record",
  "fields": [
    {"name": "id",  "type": "long"},
    {"name": "text", "type": "string"}
  ]
}

Then, I added another schema file "different-schema.avsc"

{
  "name": "DifferentSchema",
  "type": "record",
  "fields": [
    {"name": "id2",  "type": "long"},
    {"name": "createdAt", "type": "string"},
    {"name": "text2", "type": "string"}
  ]
}

(Please note that I don't think about Schema Evolution yet, because it's still early for me to think about it.)

Modify Serializer

I found that KafkaAvroSerializer uses the topic name for the schema registration key. Because I want to try multiple schema in one topic, I created KafkaAvroSerializerWithSchemaName to use the schema fullname for registering schema.

https://github.com/bufferings/kafka-streams-demo/blob/v0.3/src/main/java/com/example/demo/KafkaAvroSerializerWithSchemaName.java#L36

  @Override
  public byte[] serialize(String topic, Object record) {
    return serializeImpl(getSchema(record).getFullName(), record);
  }

In addition, I changed Serde for KStream to use KafkaAvroSerializerWithSchemaName for serialization.

https://github.com/bufferings/kafka-streams-demo/blob/v0.3/src/main/java/com/example/demo/GenericAvroSerializerWithSchemaName.java

Finally, I have multiple schemas in one topic :)

f:id:bufferings:20170615195802p:plain

Addition

This time I tried "schema-registry-ui" and "kafka-topics-ui" which are pretty nice web UI provided by Landoop( http://www.landoop.com/kafka/kafka-tools/ ) under BSL license( http://www.landoop.com/bsl/ ).

Schema Registry UI

To enable CORS accessing from other docker instance, I added "access.control.allow.methods" and "access.control.allow.origin" to the schema registry.

docker run -d --net=host --name=schema-registry \
    -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=localhost:2181 \
    -e SCHEMA_REGISTRY_HOST_NAME=localhost \
    -e SCHEMA_REGISTRY_DEBUG=true \
    -e SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,OPTIONS \
    -e SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN=* \
    confluentinc/cp-schema-registry:3.2.1

Then run schema-regisry-ui.

docker run -d --rm -p 7000:8000 \
    -e "SCHEMAREGISTRY_URL=http://172.17.0.1:8081" \
    landoop/schema-registry-ui

Now I can see schemas like this:

f:id:bufferings:20170615194257p:plain

Kafka Topic UI

To use Kafka Topic UI, I need to run Kafka Rest Proxy.

docker run -d \
  --net=host \
  --name=kafka-rest \
  -e KAFKA_REST_ZOOKEEPER_CONNECT=localhost:2181 \
  -e KAFKA_REST_SCHEMA_REGISTRY_URL=http://localhost:8081 \
  -e KAFKA_REST_HOST_NAME=localhost \
  -e KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN=* \
  -e KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,DELETE,OPTIONS \
  confluentinc/cp-kafka-rest:3.2.1

Then run kafka-topics-ui.

docker run --rm -it -p 7001:8000 \
  -e "KAFKA_REST_PROXY_URL=http://172.17.0.1:8082" \
  landoop/kafka-topics-ui

f:id:bufferings:20170615194726p:plain

It's fun!

KafkaとAvroとSchema Registryの自分メモ

これの続き
bufferings.hatenablog.com

Avro

Avroはデータのシリアライズをする何か。去年のSpringOneでちらほら名前を聞いた。Schema Evolution系のセッションだったかな。

Welcome to Apache Avro!

ということで、前回の記事では文字列だけを渡してたけど、今回はTweetのidとtextを渡してみた。

スキーマファイルはこんなの。

{
  "name": "Tweet",
  "type": "record",
  "fields": [
    {"name": "id",  "type": "long"},
    {"name": "text", "type": "string"}
  ]
}

Schema Registry

そのSchemaをSchema Registryを使ってやり取りするような感じ。

GitHub - confluentinc/schema-registry: Schema registry for Kafka

最初はSchema Registryを使わずにSchemaファイルを使おうと思ってたんだけど、それだと受け取る側がどんなスキーマなのかを事前に分かってないといけないので、違うなって思って。結局Registryを使ってみることにした。

できあがったソースはここ

https://github.com/bufferings/kafka-streams-demo/tree/v0.2

Kafkaだけじゃなくて、Schema Registry(port 8081の想定にしてます)も立ち上げてからアプリを起動すると動く。

Dockerイメージあったので使った。

Configuration — Confluent Platform 3.2.1 documentation

docker run -d   --net=host \
   --name=schema-registry \
   -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=localhost:2181 \
   -e SCHEMA_REGISTRY_HOST_NAME=localhost \
   -e SCHEMA_REGISTRY_DEBUG=true \
   confluentinc/cp-schema-registry:3.2.1

でもDocker for Macは非推奨みたいね。(僕はUbuntuで動かしてる)

Introduction — Confluent Platform 3.2.1 documentation

AvroについてもSchema Registryについても

まだ全然理解できてないけど、とりあえず、なんか動いた。

次はベスト10的なAggregateを作ってみたいな。

(つд⊂)オヤスミナサイ

Kafka StreamsをSpring Bootで(その2

これの続き。ちょーっとずつ進んでるのかなぁ。

bufferings.hatenablog.com

こうなった

f:id:bufferings:20170610205644p:plain

日本語のsampleストリーム

今度は日本語のsampleストリームを取ってくるようにしてみた。

  @Override
  public void run(String... args) throws Exception {
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusAdapter() {
      @Override
      public void onStatus(Status status) {
        template.send("TweetTexts", status.getText());
      }
    });
    twitterStream.sample("ja");
  }

https://github.com/bufferings/kafka-streams-demo/blob/v0.1/src/main/java/com/example/demo/TwitterToKafka.java

ので、kuromojiで分解。

  @Bean
  public KStream<String, String> kStream(KStreamBuilder kStreamBuilder) {
    KStream<String, String> stream = kStreamBuilder.stream("TweetTexts");
    // @formatter:off
    stream
      .flatMapValues(value -> tokenize(value))
      .groupBy((key, word) -> word)
      .count("TweetTextWords");
    // @formatter:on
    return stream;
  }

  private static final Tokenizer tokenizer = new Tokenizer();

  private List<String> tokenize(String value) {
    // @formatter:off
    return tokenizer.tokenize(value).stream()
      // 辞書にあって名詞で2文字以上のにしてみた
      .filter(token -> token.isKnown() && token.getPartOfSpeechLevel1().equals("名詞") && token.getBaseForm().length() >= 2)
      .map(token -> token.getBaseForm())
      .collect(Collectors.toList());
    // @formatter:on
  }

https://github.com/bufferings/kafka-streams-demo/blob/v0.1/src/main/java/com/example/demo/KStreamConfig.java

全部返しても見づらいから

強引に上位100件だけ返すようにしてみた。本当に作るときはこんなことしないけどね。

  @GetMapping
  public List<KeyValue<String, Long>> query() {
    final ReadOnlyKeyValueStore<String, Long> store = kStreamBuilder.getKafkaStreams().store("TweetTextWords",
        QueryableStoreTypes.keyValueStore());

    final List<KeyValue<String, Long>> results = new ArrayList<>();
    final KeyValueIterator<String, Long> range = store.all();
    while (range.hasNext()) {
      results.add(range.next());
    }

    // 強引に上位100件をとってみた
    Collections.sort(results, (r1, r2) -> {
      return r2.value.intValue() - r1.value.intValue();
    });
    return results.subList(0, Math.min(results.size(), 100));
  }

https://github.com/bufferings/kafka-streams-demo/blob/v0.1/src/main/java/com/example/demo/StateStoreViewer.java

ま、今回も面白かった。次はAvro試してみたいなー。

Kafka StreamsをSpring Bootで試してみた

この辺とか読みながら→ Introduction — Confluent Platform 3.1.0 documentation

Twitter4Jで適当に英語のツイートをとってきて、Kafkaに流し込む。(キーとかの設定はtwitter4j.propertiesに入れておいた)

@Component
public class TwitterToKafka implements CommandLineRunner {

  @Autowired
  private KafkaTemplate<String, String> template;

  @Override
  public void run(String... args) throws Exception {
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusAdapter() {
      @Override
      public void onStatus(Status status) {
        template.send("myTopic", status.getText());
      }
    });
    twitterStream.filter(filterQuery());
  }

  private FilterQuery filterQuery() {
    // 英語のツイートで"spring"を含んでいるものにしてみた。ちょうど良いくらいの量だったので。
    FilterQuery filter = new FilterQuery();
    filter.language("en");
    filter.track("spring");
    // filter.follow(TwitterFactory.getSingleton().showUser("bufferings").getId());
    return filter;
  }
}

myTopicって名前のTopicに流し込むようにしたので、今度はそれを処理する定義を書く。単語の数を数えてみた。これがWordCountStoreってストアに保存される。

@Configuration
@EnableKafkaStreams
public class KStreamConfig {

  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public StreamsConfig kStreamsConfigs(KafkaProperties kafkaProperties) {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
    // Spring BootのAutoConfigurationに入ってるやつ使えばいいかなと思って
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    // 動作確認用に1秒ごとにコミット
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000");
    return new StreamsConfig(props);
  }

  @Bean
  public KStream<String, String> kStream(KStreamBuilder kStreamBuilder) {
    KStream<String, String> stream = kStreamBuilder.stream(Serdes.String(), Serdes.String(), "myTopic");
    // @formatter:off
    stream
      // 単語に分割して
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      // 単語でGroupBy
      .groupBy((key, word) -> word, Serdes.String(), Serdes.String())
       // カウントを取る(KTable)
      .count("WordCountStore");
    // @formatter:on
    return stream;
  }

}

ついでに、KTableの中身を表示するのを書いてみた。

@RestController
public class StateStoreViewer {

  @Autowired
  private KStreamBuilderFactoryBean kStreamBuilder;

  @GetMapping
  public List<KeyValue<String, Long>> query() {
    final ReadOnlyKeyValueStore<String, Long> store = kStreamBuilder.getKafkaStreams().store("WordCountStore",
        QueryableStoreTypes.keyValueStore());

    final List<KeyValue<String, Long>> results = new ArrayList<>();
    final KeyValueIterator<String, Long> range = store.all();
    while (range.hasNext()) {
      results.add(range.next());
    }

    return results;
  }

}

ら、大量に出てきてしまった。しかも良くわからん感じ。

f:id:bufferings:20170608085740p:plain

でも、面白かったー!

KafkaのクライアントをJavaで試しに書いてみた。それとspring-kafka。

いつもありがとうございます。の気持ちとともにCLOVERを開いて、今日はKafkaのクライアントをJavaで試しに書いてみた。

d.hatena.ne.jp

KafkaTest1

かずひらさんと違うのは、localhostで立ち上げてるってところだけ。なのでこんなコードになった。ちょっとくらい触ろうかなと思って、シリアライザーとデシリアライザーをPropertiesに書いといた。

gist.github.com

なるほどなぁと思いながら写経した。設定とSerializer/Deserializerを指定して、KafkaProducer/KafkaConsumerをインスタンス化して、ProducerRecord/ConsumerRecordを送受信する感じか。

consumerも閉じるといいのかな?と思って一応閉じといた。

なんかよく分かんないのは、このConsumerのAUTO_OFFSET_RESET_CONFIGをearliestにしてるのに、2回目はもう取れてない感じ。group.idを変えたり、もういっかいProducer側を実行するとまた取れるので、オフセットがキープされてるってことなのかなぁ?Consumer Groupは生き残り続けてるってことなのかな?まいっか。

KafkaTest2

てことで、次はspring-kafkaのドキュメントを読んでみた。いや、というか逆で、最初にここを読もうとしてよく分かんなくてうろうろしてたらかずひらさんところにたどりついて「さすが師匠!」ってなったのであった。

http://docs.spring.io/spring-kafka/docs/1.2.1.RELEASE/reference/htmlsingle/#_very_very_quick

Spring Bootを使って依存関係を入れた。Kafkaが0.10.2.0なのでpom.xmlのpropertiesでバージョンを指定。

    <spring-kafka.version>1.2.1.RELEASE</spring-kafka.version>

dependenciesはこんなもんかな

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

で、コードを書き写してみた。KafkaTemplateがProducerをラッピングしてて、KafkaMessageListenerContainerがConsumerなのかな?

gist.github.com

KafkaTest3

それを、Configurationを使って書くとこうなる。

gist.github.com

@EnableKafkaをつけておくと、@KafkaListenerが使えるみたい。

KafkaTest4

Spring Bootの機能を使うとこうなる。基本的な設定はSpring Bootがやっといてくれる。EnableKafkaもなくて良いんかな。

gist.github.com

だから、送信する方は KafkaTemplate をDIすればよくて、受信する方は @KafkaListener をつけとくだけか。

実際のところ設定はBeanで定義しておくような気がするね。今日はこれくらいかな。おやすー。

追記

AutoConfigurationを見ておいた。なるほどな感じだった。

KafkaAutoConfigurationは、KafkaTemplateがあるときに有効になって、KafkaTemplate・ProducerListener(ログ出力)・ConsumerFactory・ProducerFactoryをBean定義してある。

https://github.com/spring-projects/spring-boot/blob/v1.5.3.RELEASE/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

設定はここに定義してあって、buildConsumerPropertiesとbuildProducerPropertiesを使う。

https://github.com/spring-projects/spring-boot/blob/v1.5.3.RELEASE/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

んで、EnableKafkaの設定はここにあって、ConcurrentKafkaListenerContainerFactoryのBean定義もしてある。

https://github.com/spring-projects/spring-boot/blob/v1.5.3.RELEASE/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

設定はプロパティファイルで済みそう?

だんだん開発スピードが遅くなっていくのをどうやってとめたら良かったんだろう?

先日、モブプロをやってきた。その中で、モブプロとは別で、いくつか感じたことがあって、今日はその中のひとつを思い浮かんだままにメモ。

bufferings.hatenablog.com

要件を満たすプロダクトをより早く出す

モブプロでTDDしながら、要件を満たすプロダクトをより早く出すことに集中してみた。例えば、第2ラウンドのお題はTDDBCなどでお馴染みの「自販機」。

「100円を入れてボタンを押すとコーラが1本買えること」

最初に「100円を入れてボタンを押すとコーラが1本買えること」と言われ。

assertThat(get(100), is("コーラ"));

みたいなテストを書いて。

String get(int money) {
  return "コーラ";
}

みたいな実装を書いた。爆速!

「200円を入れてボタンを押すとオレンジジュースが1本買えること」

次に「200円を入れてボタンを押すとオレンジジュースが1本買えること」と言われ。

assertThat(get(200), is("オレンジジュース"));

実装はこうなった。

String get(int money) {
  if(money == 200) {
    return "オレンジジュース";
  }
  return "コーラ";
}

その後

その後は

  • 200円でコーラが2本買えること→( ゚д゚)ハッ!「getOrangeJuice()とgetCoke()に分ける?」
  • コーラの在庫を追加することができること→( ゚д゚)ハッ!「・・・ざ、いこ・・・?」
  • 900円入れてコーラを2本とオレンジジュースを2本とビールを1本買って100円お釣りが出てくること→( ゚д゚)ハッ!「ボタンが必要・・・?てか、お、お釣り?」

とかなりながら、だんだん遅くなりながらも、要件を全て満たすだけのコードを書き続けた。

その結果として出来上がったもの

は、要件は満たすけど、色んなif文に溢れ、在庫を減らす処理はいくつかの場所に散らばって(たしかビールの在庫は減らなかったと思うw)、「あぁ・・・」という感じのものだった。

プロダクトオーナーのセリフ

僕らが「え?在庫なんてあるんですか?」というと、POは「ふつー自販機ならあるやろー」って言ってた。まぁ、自販機ならそりゃそうよなー、と思いつつ。でも普通のプロジェクトで「そんなの普通あたりまえでしょ?」とか言っちゃうPOだとたぶん開発うまくいかないだろうなーとか思いながら聞いてた。

それと、POはしきりに「聞いてくれていいよ」と言ってはくれてたけど、でも自分から「このプロダクトをチームと一緒に良くしたい!」という気持ちは見えなかったなぁw

チームのメンバーのセリフ

途中でメンバーの中では「色んな商品を取り扱えるように、Productクラスを作った方が良いのでは?」という意見が出てきた。その段階では、要望には、コーラとオレンジジュースしかなく、この後、本当に商品が増えていくのかどうかも分からなかった。

とか色んなこと

をバーっと1時間の中で経験して、みんなと「もうちょっと設計をちゃんとやった方が良かったかもしれないね」みたいな話もした。

帰り道

で、ぼーっと考えながら。「これって、うまくいかないプロジェクトの縮図っぽいなぁ」と思った。

  1. 僕らは、「自販機」というものが何をできるものなのかを知らない状態で開発を始めて、要望を満たすものを急いで作る。
  2. 次の要望くらいまでは大丈夫なんだけど、だんだん開発スピードが遅くなってくる。
  3. 動いている部分には極力触るなの精神で、if文で分岐を追加して新機能対応をする。
  4. その結果、例えば「在庫を減らす」という処理が散らばってしまい、バグを埋め込んでしまう。
  5. 本来、少しの変更で対応できそうなことが、その複雑さのためにすごく時間がかかるようになってしまう。

んで、おまけで、

  1. 最初の開発をやってた人がマネージャとかになる
  2. 「僕がやってたころは、もっと開発スピード速かったんだけどな」とか言ってる。
  3. その人しか知らない仕様やif文があって、結局全てその人に聞かないといけない状態になってる。

みたいなね。

じゃあどうすれば良かったのか?

  1. そういう経験をしてきた僕は「ちゃんと設計をしたらマシだったはずだ!」と、しっかり設計をしようとする。
  2. 「自販機」というものを誰も知らない状況で「これはジュースだけでなく、どんな商品でも購入できるように、汎用的に作らなければならない!」とか考え出しちゃう。
  3. これも必要かもしれない、これも今入れておかないといけない、とかでどんどん仕様が膨らんでいく
  4. だいぶ時間をかけてできあがったけど、ほとんどの機能が使われない
  5. 新機能を追加するのに、使われていない機能のテストもしないといけなくて、スピードがあがらない

あれ?(´・ω・`)

じゃあどうすれば良かったのか?

そのあいだなのかなー?

目の前のことに追われすぎない、かつ、分からない先のことを想像で考え過ぎない。

だから、何かに気づいたときに、それを元に全体をリファクタリングする。できるようにしておく。というのが大切なのかなって思った。

「在庫」という概念を発見したら「在庫をどうもたせるべきか」というのを考えてリファクタリング

たぶんそういう発見はブレイクスルー的なリファクタリングになるから、ため息をつきながら、プレッシャーでドキドキしながら、やる。

そのためには、スキルと勇気と変更容易性が必要。

というところなのかなー。

それと「自販機」というものを知って、早く小さくブレイクスルーを起こすために、POはもっと中から発言をすると良さそうかなー。

ということで

誰も知らない新しいものを作るときは「全員の知識を持ち寄って、変化(発見)に対応し続ける」というのができたら良いのかなーって思った。

おわり。