メッセージキューイングシステムの RabbitMQを2台のクラスタにして冗長化します。
「RabbitMQでメッセージを受け渡し」で RabbitMQのインストールをし、
「RabbitMQのクラスタ化(前編)」でサーバーを 2台使って冗長化しました。
ここでは前編の続きで、メッセージを入れるキューを作ります。
ただそのキューにはクラスタ内でデータを持ち合ってデータの冗長化もして欲しい、そういうコンセプトです。
RabbitMQには従来ミラーリングというデータレプリケーションの仕組みがありましたが、Ver.3.8.0から従来の方法に加えて「クォーラムキュー」という仕組みが導入されました。
かつ従来の方法は現在非推奨になっており、いずれ削除される機能と位置づけられているので、これからキューを作る場合はクオーラムキューにしておくのが良さそうです。
クォーラムキューについては以下のドキュメントを参照願います。
Quorum Queues
沢山書いてありますが、クォーラムキューの設計思想として「データを安全にする」という事があるようで、それ故にクラスタにおいてはデータのレプリケーションがデフォルトになっています。
また、データを永続ストレージに書き込む(要するにメモリじゃなくディスクに書くということ)ようにしていて、それ故にパフォーマンスがチョイ下がるとのこと。
純粋な情報通知など、ある程度データが失われても良い要件ではなく、データの伝送が非常に重要(お金に関わる事とか)な要件に使うべし、なんだそうです。
従来のキューの全ての機能が実装されている訳ではないので移行には気をつけてね、って書いてありました。
仕組み的に必要の無くなった機能以外でまだ実装されていない機能は順次実装中のようです。
ここで一つ問題があって、前編では実験環境のリソース使用量を少なくするため 2台のサーバーで冗長化を目論んでいたのですが、クォーラムキューの説明では奇数台のノードを作るよう書かれておりますので、ここでもう1つノードを増やしました。
[RabbitMQ2]を作った要領で[RabbitMQ3]を作っています。
結果、
マシン名 | IPアドレス | |
1 | RabbitMQ1 | 192.168.1.113 |
2 | RabbitMQ2 | 192.168.1.114 |
2 | RabbitMQ3 | 192.168.1.115 |
3 | Lubuntu2204 | DHCP |
という環境になりました。
subro@RabbitMQ3:~$ sudo rabbitmqctl cluster_status
Cluster status of node rabbit@RabbitMQ3 ...
Basics
Cluster name: rabbit@RabbitMQ3
Total CPU cores available cluster-wide: 6
Disk Nodes
rabbit@RabbitMQ1
rabbit@RabbitMQ2
rabbit@RabbitMQ3
Running Nodes
rabbit@RabbitMQ1
rabbit@RabbitMQ2
rabbit@RabbitMQ3
Versions
rabbit@RabbitMQ1: RabbitMQ 3.11.13 on Erlang 25.3
rabbit@RabbitMQ2: RabbitMQ 3.11.13 on Erlang 25.3
rabbit@RabbitMQ3: RabbitMQ 3.11.13 on Erlang 25.3
CPU Cores
Node: rabbit@RabbitMQ1, available CPU cores: 2
Node: rabbit@RabbitMQ2, available CPU cores: 2
Node: rabbit@RabbitMQ3, available CPU cores: 2
Maintenance status
Node: rabbit@RabbitMQ1, status: not under maintenance
Node: rabbit@RabbitMQ2, status: not under maintenance
Node: rabbit@RabbitMQ3, status: not under maintenance
Alarms
(none)
Network Partitions
(none)
Listeners
Node: rabbit@RabbitMQ1, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@RabbitMQ1, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@RabbitMQ2, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@RabbitMQ2, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@RabbitMQ3, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@RabbitMQ3, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Feature flags
Flag: classic_mirrored_queue_version, state: enabled
Flag: classic_queue_type_delivery_support, state: enabled
Flag: direct_exchange_routing_v2, state: enabled
Flag: feature_flags_v2, state: enabled
Flag: implicit_default_bindings, state: enabled
Flag: listener_records_in_ets, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: stream_queue, state: enabled
Flag: stream_single_active_consumer, state: enabled
Flag: tracking_records_in_ets, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled
とりあえずこれで箱はできました。
これからクォーラムキューを作ります。
rabbitmqctlコマンドで早速作ろうとしたんですが、残念ながらこのコマンドにはその機能は無いようです。
どういう手があるかというと、
- 開発言語で APIを叩く
- WEB管理画面から作る
RabbitMQの環境を通信インフラと捉えると、実運用において開発者にキュー作成を自由にやらせたくありません。
だから運用管理者としては API以外の方法を取りたい。
WEB画面は RabbitMQのプラグインを導入すると綺麗な画面でできますし、このプラグインにより[rabbitmqadmin]というコマンド(実態はプラグインが用意する WebAPIのクライアントのようです)が使えるようになります。
どうしようか考えた末「本番環境には必要ないものは入れない」という考えより、Pythonで APIを叩くことにしました。
ここでまた、RabbitMQを操作できる Pythonのモジュール「Pika」を使いますが、日本人としてこの名前はなぁ…って今回も思います。
キューを作る前に一つ考えなければならないことが出てきました。
「キューをどこのバーチャルホストに作るか」を設計しないといけません。
RabbitMQでは「バーチャルホスト」という名称を使っていますが、要するに管理する枠のことで、省略時には「/」(無印)になっているだけのことなのでした。
バーチャルホストに対してキューを作り、そのキューを操作するユーザーの権限を設定することでマルチテナント環境を作れるようになっています。
本番運用では必須でしょうから、バーチャルホストを作り、ユーザーも作って権限設定もします。
[RabbitMQ3]で実行します。(他でも良い)
バーチャルホスト作成です。
ドキュメントはこちら
Virtual Hosts
名前は必ずしも「/」で始まる木構造風にしなくても良いようですが、無印のバーチャルホストが「/」で表されているので、「/」で始める名前にしている人が多いようです。
「/testvhost」という名前で作ることにしました。
subro@RabbitMQ3:~/work/python/rabbitmq$ sudo /usr/sbin/rabbitmqctl add_vhost /testvhost
Adding vhost "/testvhost" ...
何も出ませんが、できたようです。
バーチャルホストのリストを取ってみます。
subro@RabbitMQ3:~/work/python/rabbitmq$ sudo /usr/sbin/rabbitmqctl list_vhosts
Listing vhosts ...
name
/testvhost
/
できました。
ユーザーを作ります。
ドキュメントはこちら。
Managing Users and Permissions
「subro」ユーザーで、パスワードが「password」にします。
subro@RabbitMQ3:~/work/python/rabbitmq$ sudo /usr/sbin/rabbitmqctl add_user subro password
Adding user "subro" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
パーミッション(権限)設定するのを忘れるな!って言われていますね。
ユーザーリストを取ってみます。
subro@RabbitMQ3:~/work/python/rabbitmq$ sudo /usr/sbin/rabbitmqctl list_users
Listing users ...
user tags
subro []
guest [administrator]
ユーザーができています。
権限設定をします。
上のドキュメント(Managing Users and Permissions)よりチョイ下にある「Granting Permissions to a User」という段落に書いてあります。
[-p]オプションで、バーチャルホスト名・ユーザー名・設定変更権限・書き込み権限・読み込み権限、となっていて、この例では全権としています。
subro@RabbitMQ3:~/work/python/rabbitmq$ sudo /usr/sbin/rabbitmqctl set_permissions -p /testvhost subro ".*" ".*" ".*"
Setting permissions for user "subro" in vhost "/testvhost" ...
バーチャルホスト(/testvhost)の権限リストを取ってみます。
subro@RabbitMQ3:~/work/python/rabbitmq$ sudo /usr/sbin/rabbitmqctl list_permissions -p /testvhost
Listing permissions for vhost "/testvhost" ...
user configure write read
subro .* .* .*
ずれてますけど、設定できていると思います。
(ターミナルでもずれてました)
やっとクォーラムキュー作成です。
Pythonのプログラムは[RabbitMQ3]で実行します。(他でも良い)
Pikaを入手します。
subro@RabbitMQ3:~/work/python/rabbitmq$ python3 -m pip install pika
Defaulting to user installation because normal site-packages is not writeable
Collecting pika
Downloading pika-1.3.1-py3-none-any.whl (155 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 155.3/155.3 KB 3.7 MB/s eta 0:00:00
Installing collected packages: pika
Successfully installed pika-1.3.1
入りました。
2023年4月6日時点での最新版は、1.3.1です。
クォーラムキュー作成のソースはこんな風。
/testvhostに「test」という名前でクォーラムキューを作ります。
[create_query.py]
import pika
connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost", virtual_host="/testvhost", credentials=pika.PlainCredentials('subro', 'password') ))
channel = connection.channel()
# durable = true → RabbitMQを再起動してもキューが残る
# x-queue-type に quorum を指定することでクォーラムキューになる
message = channel.queue_declare(queue="test", durable=True, arguments={"x-queue-type": "quorum"})
print(message)
connection.close()
実行します。
subro@RabbitMQ3:~/work/python/rabbitmq$ python3 create_queue.py
<METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=test'])>"])>
できたようです。
キューの確認。
subro@RabbitMQ3:~/work/python/rabbitmq$ sudo rabbitmqctl list_queues -p /testvhost
Timeout: 60.0 seconds ...
Listing queues for vhost /testvhost ...
name messages
test 0
できていますね。
これでクォーラムキューができたわけですので、クライアントからメッセージを送ってみましょう。
RabbitMQのチュートリアルにあったプログラムをこの環境用に改変したものが以下です。
[send.py]
#!/usr/bin/python3
import pika
# virtual_host でバーチャルホスト名を指定
connection = pika.BlockingConnection( pika.ConnectionParameters( host="RabbitMQ3", virtual_host="/testvhost", credentials=pika.PlainCredentials('subro', 'password') ))
channel = connection.channel()
# routeing_key でキュー名を指定。
channel.basic_publish(exchange='',
routing_key='test',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
[Lubuntu2204]で実行します。
subro@Lubuntu2204:~/work/python/rabbitmq$ ./send.py
[x] Sent 'Hello World!
キューの状態を取ってみます。
これは[RabbitMQ3]で実行しています。
subro@RabbitMQ3:~/work/python/rabbitmq$ sudo rabbitmqctl list_queues -p /testvhost
Timeout: 60.0 seconds ...
Listing queues for vhost /testvhost ...
name messages
test 1
メッセージが 1つキューイングされました。
受信します。
これもチュートリアルのを改変したものです。
[receive.py]
#!/usr/bin/python3
import pika, sys, os
def main():
connection = pika.BlockingConnection( pika.ConnectionParameters( host="RabbitMQ3", virtual_host="/testvhost", credentials=pika.PlainCredentials('subro', 'password') ))
channel = connection.channel()
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
[Lubuntu2204]で実行します。
subro@Lubuntu2204:~/work/python/rabbitmq$ ./receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World!'
メッセージを受信できました。
永久ループしているのでCtrl+cで終わらせます。
長くなってしまいましたが、どうにかクォーラムキューを使ってのメッセージ送受信ができました。
「RabbitMQのクラスタ化(後編)では、冗長化の具合を見てみようと思います。