RabbitMQで1つのメッセージを複数の相手に渡す同報配信をしてみます。
セゾン情報システムズの HULFTって製品に同報配信という便利な機能があり良く使っていました。
これと似たことを RabbitMQでできますのでやってみます。
RabbitMQのインストールについては「RabbitMQでメッセージを受け渡し」に書いていますので、RabbitMQが動くところから開始します。
ここでは以下の環境で実験を行います。
- Ubuntu Server 22.04.2
- RabbitMQ 3.11.13-1
クライアントは何でも良いですが、私は Lubuntu 22.04を使っています。
実験に入る前に、RabbitMQの少し突っ込んだ構造をば。
RabbitMQの中には、バーチャルホスト・エクスチェンジ・キューって観念がありまして、こんな図になろうかと思います。
こんな説明で良いのか?と思う簡略化するとこんな説明になるかと。
●バーチャルホスト
エクスチェンジやキューが収まる箱のようなもの。
論理的なデータの区切りとなる。
●エクスチェンジ
入ってきたメッセージのデータをルールに従ってキューに振り分ける。
●キュー
メッセージのデータを FIFOで保持している。
論より証拠なんで、作っていって実験してみましょう。
Ubuntu Serverで rabbitmqctlコマンドを使用して作業していきます。
バーチャルホストから作っていきます。
マニュアルはこちら。
Virtual Hosts
「vhost1」「vhost2」という名前で 2つ作ってみました。
subro@RabbitMQ:~$ sudo rabbitmqctl add_vhost vhost1
Adding vhost "vhost1" ...
subro@RabbitMQ:~$ sudo rabbitmqctl add_vhost vhost2
Adding vhost "vhost2" ...
できました。
バーチャルホストのリストを取ります。
subro@RabbitMQ:~$ sudo rabbitmqctl list_vhosts
Listing vhosts ...
name
vhost1
/
vhost2
ピンクのが今作ったやつです。
[/]は最初から存在しているデフォルトのバーチャルホストですね。
RabbitMQでの作業には、バーチャルホストにログインが必要です。
基本的にアカウント/パスワードによる認証が必要なんですが、ローカルホストからのアクセスに限り、ユーザーを指定しなければバーチャルホストに対して「guest」ユーザーで接続している状態になっています。
guestユーザーには[/]バーチャルホストへの全権が与えられています。
しかしネットワーク越しにアクセスする場合や作成したバーチャルホストにログインするには、ユーザーを作ってバーチャルホストに対するアクセス権限を作らなければいけません。
ユーザーとアクセス権限のマニュアルはこちら。
Managing Users and Permissions
ユーザーを作ります。
ユーザーは RabbitMQ全体に所属する考え方になっています。
[subro]ユーザーのパスワードを[P@ssw0rd]で作ってみました。
subro@RabbitMQ:~$ sudo rabbitmqctl add_user subro P@ssw0rd
Adding user "subro" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
ユーザーができました。
ユーザーのリストを取ります。
subro@RabbitMQ:~$ sudo rabbitmqctl list_users
Listing users ...
user tags
subro []
guest [administrator]
バーチャルホストに対してのアクセス権限を設定します。
subro@RabbitMQ:~$ sudo /usr/sbin/rabbitmqctl set_permissions -p vhost1 subro ".*" ".*" ".*"
Setting permissions for user "subro" in vhost "vhost1" ...
subro@RabbitMQ:~$ sudo /usr/sbin/rabbitmqctl set_permissions -p vhost2 subro ".*" ".*" ".*"
Setting permissions for user "subro" in vhost "vhost2" ...
設定できました。
「.*」の部分が権限の内容で、左から、[設定][書き込み][読み込み]となっています。
「.*」は全権です。
[subro]ユーザーの権限のリストを取ります。
subro@RabbitMQ:~$ sudo rabbitmqctl list_user_permissions subro
Listing permissions for user "subro" ...
vhost configure write read
vhost2 .* .* .*
vhost1 .* .* .*
ズレてますが、バーチャルホスト毎に設定されていることが分かります。
次はエクスチェンジを作って〜と行きたい所なのですが、実は管理用コマンドの rabbitmqctlでは、エクスチェンジ作成・キュー作成ができないのでした…。
開発言語用のライブラリを使うとできますので、ここではクライアント(Lubuntu 2204)にある Pythonからやります。
「RabbitMQでメッセージを受け渡し」で、Pythonの「Pika」ってライブラリを導入していますので、それをご参照下さい。
さてここで、エクスチェンジの種類について。
マニュアルはこちらです。
Exchanges and Exchange Types
エクスチェンジの種類は以下の4つ。
●Direct
届いたメッセージを(送信時に指定する)ルーティングキー(このタイプの場合はキュー名にする)で指定されたキューにメッセージを入れる。
バーチャルホストにはデフォルトでこのタイプのエクスチェンジが無名で作られている。
●Fanout
紐付けられているキュー全てに同じメッセージを入れる。→同報配信
●Topic
(送信時に指定する)ルーティングキーに対して、キュー紐付けに指定されている文字列が合致する場合、そのキューにメッセージを入れる。
●Header
(送信時に指定する)ルーティングキーではなく、メッセージヘッダーによってキューに振り分ける。
ここでは同報配信の実験をしますので、Topic Exchangeを使用します。
Pythonで Exchangeを作成するにあたり、Pikaの Channelクラスにある exchange_declare()メソッドを使います。
こんなスクリプトで作りました。
[vhost1]には4種類全部を、[vhost2]には[Topic]のみです。
[create_exchange.py]
#!/usr/bin/python3
import pika
connection1 = pika.BlockingConnection( pika.ConnectionParameters( host="RabbitMQ", virtual_host="vhost1", credentials=pika.PlainCredentials('subro', 'P@ssw0rd')))
channel1 = connection1.channel()
channel1.exchange_declare("vhost1_exchange_direct", exchange_type="direct", durable=True)
channel1.exchange_declare("vhost1_exchange_fanout", exchange_type="fanout", durable=True)
channel1.exchange_declare("vhost1_exchange_topic", exchange_type="topic", durable=True)
channel1.exchange_declare("vhost1_exchange_headers", exchange_type="headers", durable=True)
channel1.close()
connection1.close()
connection2 = pika.BlockingConnection( pika.ConnectionParameters( host="RabbitMQ", virtual_host="vhost2", credentials=pika.PlainCredentials('subro', 'P@ssw0rd')))
channel2 = connection2.channel()
channel2.exchange_declare("vhost2_exchange_fanout", exchange_type="fanout", durable=True)
channel2.close()
connection2.close()
exchange_declare()メソッドの第1引数は、バーチャルホストの名前です。
「durable=True」とすることで、RabbitMQのシャットダウン時に消えないようにしています。
エクスチェンジはできたと思います。
UbuntuServer側でエクスチェンジのリストを取ります。
(出力がズレて見辛いので手動で整形しています)
subro@RabbitMQ:~$ sudo rabbitmqctl list_exchanges --vhost vhost1
Listing exchanges for vhost vhost1 ...
name type
vhost1_exchange_direct direct
amq.fanout fanout
amq.rabbitmq.trace topic
vhost1_exchange_topic topic
vhost1_exchange_fanout fanout
vhost1_exchange_headers headers
amq.direct direct
amq.headers headers
direct
amq.match headers
amq.topic topic
subro@RabbitMQ:~$ sudo rabbitmqctl list_exchanges --vhost vhost2
Listing exchanges for vhost vhost2 ...
name type
amq.direct direct
vhost2_exchange_fanout fanout
amq.topic topic
amq.fanout fanout
amq.match headers
direct
amq.rabbitmq.trace topic
amq.headers headers
ピンクがスクリプトで新規作成したものです。
黄色がデフォルトで作られている無名のやつですね。
どうやら何もしなくてもバーチャルホストには「amq.」で始まるエクスチェンジが勝手にできているようです。
キューを作ります。
Pikaの Channelクラスにある queue_declare()メソッドを使います。
[vhost2]に 4つのキューを作りました。
[create_queue.py]
#!/usr/bin/python3
import pika
connection = pika.BlockingConnection( pika.ConnectionParameters( host="RabbitMQ", virtual_host="vhost2", credentials=pika.PlainCredentials('subro', 'P@ssw0rd')))
channel = connection.channel()
channel.queue_declare("queue1", durable=True)
channel.queue_declare("queue2", durable=True)
channel.queue_declare("queue3", durable=True)
channel.queue_declare("queue4", durable=True)
channel.close()
connection.close()
queue_declare()メソッドの第1引数は、キューの名前です。
「durable=True」とすることで、RabbitMQのシャットダウン時に消えないようにしています。
キューができました。
キューのリストを取ります。
subro@RabbitMQ:~$ sudo rabbitmqctl list_queues --vhost vhost2
Timeout: 60.0 seconds ...
Listing queues for vhost vhost2 ...
name messages
queue2 0
queue4 0
queue3 0
queue1 0
4つのキューができています。
これら4つのキューを、[vhost2_exchange_fanout]エクスチェンジに紐付けます。
Pikaの Channelクラスにある queue_bind()メソッドを使います。
[queue_bind.py]
#!/usr/bin/python3
import pika
connection = pika.BlockingConnection( pika.ConnectionParameters( host="RabbitMQ", virtual_host="vhost2", credentials=pika.PlainCredentials('subro', 'P@ssw0rd')))
channel = connection.channel()
channel.queue_bind("queue1", "vhost2_exchange_fanout")
channel.queue_bind("queue2", "vhost2_exchange_fanout")
channel.queue_bind("queue3", "vhost2_exchange_fanout")
channel.queue_bind("queue4", "vhost2_exchange_fanout")
channel.close()
connection.close()
バインドされました。
エクスチェンジとキューの紐付け情報を取ります。
ズレる(以下略)
subro@RabbitMQ:~$ sudo rabbitmqctl list_bindings --vhost vhost2
Listing bindings for vhost vhost2...
source_name source_kind destination_name destination_kind routing_key arguments
exchange queue2 queue queue2 []
exchange queue4 queue queue4 []
exchange queue3 queue queue3 []
exchange queue1 queue queue1 []
vhost2_exchange_fanout exchange queue1 queue queue1 []
vhost2_exchange_fanout exchange queue2 queue queue2 []
vhost2_exchange_fanout exchange queue3 queue queue3 []
vhost2_exchange_fanout exchange queue4 queue queue4 []
無名エクスチェンジ(Directタイプ)の紐付けがあることも分かりますね。
結果、こんな感じになったはずです。
これで環境はできましたので、[vhost2]バーチャルホストの[vhost2_exchange_fanout]エクスチェンジに対してメッセージを送ってみます。
上手く行けば、[vhost2]バーチャルホストの[queue1]〜[queue4]までに同じものが届くはずです。
大事なのは、データ送信しているのは 1回だけということ。
[send.py]
#!/usr/bin/python3
import pika
connection = pika.BlockingConnection( pika.ConnectionParameters( host="RabbitMQ", virtual_host="vhost2", credentials=pika.PlainCredentials('subro', 'P@ssw0rd') ))
channel = connection.channel()
# routeing_key はキュー名を指定してない
channel.basic_publish(exchange='vhost2_exchange_fanout', routing_key='', body='Hello World!')
print(" [x] Sent 'Hello World!'")
channel.close()
connection.close()
実行したところ、問題なく送信はできたようです。
RabbitMQは受け取ってはくれましたが、果たして 4つのキューに同報配信してるでしょうか。
キューの状態を取ります。
subro@RabbitMQ:~$ sudo rabbitmqctl list_queues --vhost vhost2
Timeout: 60.0 seconds ...
Listing queues for vhost vhost2 ...
name messages
queue2 1
queue4 1
queue3 1
queue1 1
4つのキューのメッセージ数が全て 1になっています。
想定通りの動きをしました。
ここまでくればコンシューマーで受信するまでもないですが、けじめとしてやります。
[receive.py]
#!/usr/bin/python3
import pika, sys, os
def main():
connection = pika.BlockingConnection( pika.ConnectionParameters( host="RabbitMQ", virtual_host="vhost2", credentials=pika.PlainCredentials('subro', 'P@ssw0rd') ))
channel = connection.channel()
def callback1(ch, method, properties, body):
print("queue1 [x] Received %r" % body)
def callback2(ch, method, properties, body):
print("queue2 [x] Received %r" % body)
def callback3(ch, method, properties, body):
print("queue3 [x] Received %r" % body)
def callback4(ch, method, properties, body):
print("queue4 [x] Received %r" % body)
channel.basic_consume(queue='queue1', on_message_callback=callback1, auto_ack=True)
channel.basic_consume(queue='queue2', on_message_callback=callback2, auto_ack=True)
channel.basic_consume(queue='queue3', on_message_callback=callback3, auto_ack=True)
channel.basic_consume(queue='queue4', on_message_callback=callback4, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
クライアント(Lubuntu 22.04)で実行します。
subro@Lubuntu2204:~/work/python/rabbitmq$ ./receive.py
[*] Waiting for messages. To exit press CTRL+C
qyeye1 [x] Received b'Hello World!'
qyeye2 [x] Received b'Hello World!'
qyeye3 [x] Received b'Hello World!'
qyeye4 [x] Received b'Hello World!'
上手く受信できています。
Ctrl+cで終了です。
もしこのような仕組みがない場合、メッセージの送信元で受信先を全て登録して個別に送る必要がありますね。
もしかすると送信元が非力なマシンで受信先が多数ある場合、このデータ送信作業で送信元が非常に重くなってしまうかも知れません。
データ受信側の事情でデータ送信元のサービスが滞ってしまうのは困りますので、こういった仕組みを送信元から離してあげることで、こういった問題を軽減できると思います。
また、受信先の増減に対しても、送信元をイジらないで良くなりますし、システム間連携を疎にできるという効果も期待できます。
送信元が IoT機器などの場合には、特に効果が得られそうな気がしました。