NATS はCNCF (Cloud Native Computing Foundation)によってホスティングされているメッセージングシステムです。軽量で高パフォーマンスかつスケーラブルなのが特徴だそうです。オランダのSynadia社が中心となって開発を行っていますが、オープンソースソフトウェアなのでGitHub 上で今トリビュートすることもできます。 Go、NodeJS、Ruby、Java、C、C#、Nginx用のクライアントライブラリはSynadiaによってサポートされており、そのほかにもPythonやElixir用のクライアントなどが存在します。 NATSのサーバ自体(gnatsd)はGoで書かれている ため、バイナリ一つで起動できるほか、、公式Dockerコンテナイメージ やKubernetes用のOperator も用意されているため、簡単に構築・運用することができます。 本記事でも、Dockerで起動したサーバを使用しています。
NATSでは3種類のメッセージングモデルを利用することができます。
Publish/Subscribe Request/Reply Queueing 今回はPub/SubとRequest/Replyを試してみます。
サーバを立ち上げる 実験に先駆けて、まずはサーバを立ち上げます。今回はmacOS High Sierra環境のため、docker for macで起動してみます。
1 $ docker run --rm -d --name nats -p 4222:4222 -p 6222:6222 -p 8222:8222 nats:1.4.0-linux nats:1.4.0-linuxは執筆時点(2019-02-06)でnats:latestです。 ここで三つのポートを空けていますが、それぞれ用途は次の通りです。
:4222: client port :6222: route port :8222: http port それぞれの詳細な説明は割愛しますが、本記事ではクライアントからの接続だけを試してみますので、4222番ポートだけの開放でも問題ありません。
Publish/Subscribe まずは標準的なPub/Subモデルから試してみます。NATSのPub/SubはRedisなどと同様、Wikipedia でいうところの「トピックベース」なPub/Subです。NATSではトピックのことをSubjectとよびます。 NATSのSubjectは階層構造をとることができ、.(ドット)で区切って表現します。Subscriberはこの階層構造の一部にワイルドカードとして*(アスタリスク)を使用することができます。また、>を使用して下の階層すべて、を表現することもできます。 例えば、Subscriberがfoo.bar.*を購読している場合、foo.bar.bazやfoo.bar.quxなどのメッセージを受け取ることができますが、foo.bar.baz.quxは受け取ることができません。一方、foo.bar.>を購読している場合、foo.bar.baz.quxも受け取ることができます。
サンプルコードとして、次のようなものを書いてみました。
Publisher 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package main import ( "log" nats "github.com/nats-io/go-nats" ) func main() { nc, err := nats.Connect("localhost:4222") if err != nil { log.Fatal(err) } defer nc.Close() if err := nc.Publish("subjectFoo", []byte("bodyBar")); err != nil { log.Fatal(err) } } Subscriber 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package main import ( "log" nats "github.com/nats-io/go-nats" ) func main() { nc, err := nats.Connect("localhost:4222") if err != nil { log.Fatal(err) } defer nc.Close() sub, err := nc.Subscribe("subjectFoo", callback) if err != nil { log.Fatal(err) } log.Printf("Subject: %s", sub.Subject) log.Printf("Queue: %s", sub.Queue) ch := make(chan struct{}) <-ch } func callback(message *nats.Msg) { log.Print(string(message.Data)) } それぞれ、適当なファイルに保存し、go runで起動します。あらかじめSubscriber側を起動しておくことで、Publisherを起動した際にメッセージ(今回は"bodyBar")が(Subscriber側で)Printされるはずです。 ポイントは*nats.Conn.Subscribeが非同期な関数で、メッセージを受け取った際にcallback関数が呼ばれる、というところです。 今回のサンプル中では<-chとしてブロックしていますが、何らかの方法でブロックしないと、受け取る前にmainが終わってしまうので注意が必要です。 同期処理したい場合には、*nats.Conn.SubscribeSyncを使用することで次のように書き換えられます。
...