IoTデータをGridDBにRabbitMQで保存する

RabbitMQ は人気のメッセージキューイングシステムで、メッセージの配信が最も重要なさまざまなシステムで使用されています。私たちのケースでは、RabbitMQを使用して、後で処理するためにGridDBに配信されるフィールドセンサーデータの配信を確実にしたいと考えています。

もちろん、HTTP などの他の手段で常に現場からメインサーバーにデータを送信することもできますが、そのようなデータ転送方法は、気まぐれで安全ではありません。Apple Music で、接続状態が不安定な田舎の地域から音楽を聴こうとしたときに、接続エラーが発生してその後まったく音が聞こえなくなった経験は何度ありますか?いったん接続が途切れると、ハンドシェイクプロセス全体が再度発生するまで接続は回復せず、中継で送信されたデータはすべて完全に失われてしまいます。このプロジェクトにおけるRabbitMQの目標は、接続に問題が発生した場合でも、データがサーバーから受信確認を受け取り、GridDBに保存されたことを確認するまで、データを保持することです。

プロジェクト

この記事の目的は、IoT メッセージキューシステムの最も基本的な実証実験を行うことです。フィールドに設置された物理センサーが環境からデータを読み取り、その読み取り値を交換先に送信し、交換先からキューに、そして最終的にサーバーにデータを送信します。サーバーがすべてのデータを受信したことを確認すると、キューからその値を削除し、次の値(存在する場合)に処理を移します。

これを実現するには、まずハードウェアについて説明します。

ハードウェア

私たちは、大気質センサーAdafruit PMSA003I Air Quality BreakoutをこのSTEMMA HatとSTEMMAワイヤーを介して接続するために、Raspberry Pi 4をセットアップしました。この特定のセンサーについて詳しく知りたい方は、adafruitが提供するDocsページで読むことができます。

Ubuntuサーバーのキューからデータを受信します。仕様は重要ではありません。

次に、ソフトウェアを見てみましょう。

ソフトウェア

もちろん、関連データのメッセージのプッシュと受信には RabbitMQ を利用します。 RabbitMQ は多くのプログラミング言語に対応する各種コネクタを提供しているので、基本的には自由に組み合わせることができます(これも RabbitMQ をスタックに利用する隠れた利点です)。 今回は、生のセンサーデータを簡単に読み込んで変換できる Python ライブラリがすでに提供されているので、Python でペイロードデータをプッシュしたいと思います。GridDB Python Connectorの助けを借りて、別のPythonスクリプトでサーバー上でペイロードデータを受信することも可能ですが、GridDBのネイティブインターフェースであり、追加のダウンロードを必要としないJavaで受信することにします。

計画

全体的な計画は以下の通りです。

  1. RabbitMQをUbuntuサーバーにインストールする
  2. センサーの読み取りを行い、読み取り可能なデータペイロードに変換する(Python)
  3. 作成したExchange/Queueにデータをプッシュする
  4. Java(およびRabbitMQ)で受信キューを使用する
  5. 受信したペイロードを直接GridDBに保存する

実行方法

Pythonスクリプトは簡単に実行できます。必要なライブラリをインストールし、スクリプトを実行するだけです。python3 app.py

Javaの場合は、外部ライブラリへの依存関係があるため、それらを参照(libディレクトリにあります)し、そのように実行する必要があります。例えば:

$ cd lib/
$ export CP=.:amqp-client-5.16.0.jar:slf4j-api-1.7.36.jar:slf4j-simple-1.7.36.jar:gridstore-5.6.0.jar:jackson-databind-2.17.2.jar:jackson-core-2.17.2.jar:jackson-annotations-2.17.2.jar
$ java -cp $CP ../Recv.java

これら2つのファイルの実行順序は重要ではありません。キューが空の場合でも受信は維持されます。

事前準備と開始

このプロジェクトを1:1で実行したい場合に必要なものを以下に示します。

  1. Raspberry Pi
  2. STEMMA Hat & Wire(または他のボード接続手段
  3. Python、RabbitMQ、GridDB、Java、およびその他の各種ライブラリ

RabbitMQは、ダウンロードページからインストールできます。手順は簡単です。唯一の注意点は、新しいユーザーを作成し、適切な権限を設定する必要があることです。

$ sudo rabbitmqctl add_user username password
$ sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

ここで使用する認証情報は、データ送信者とデータ受信者の間の接続を確立する際に使用するものと同じになります。

注意点として、接続を行う際にパスワードに「特殊文字」を使用しようとしましたがうまくいきませんでした。そのため、現時点ではパスワードはシンプルなもの(A-zと数字のみ)にしておくことをお勧めします。

実装:プロデューサー

最後に、具体的な内容に入っていきましょう。まず、プロデューサー(Raspberry Pi)に焦点を当て、次にコンシューマー(サーバー)に移ります。また、メッセージが確実に配信され、データベースに保存されるように、いくつかの設定を行います。

データ読み取り用Pythonスクリプト

センサーデータの読み取りには、adafruitが提供するPythonスクリプトの修正版を使用します。基本的には、データの読み取り、JSONへの変換、Exchange/Queueへのプッシュという非常にシンプルなタスクです。まず、コードのハードウェア部分を見てみましょう。その後、キューを作成し、適切なマシンにプッシュするコードを見ていきます。

import board
import busio
from adafruit_pm25.i2c import PM25_I2C

reset_pin = None
i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
# Connect to a PM2.5 sensor over I2C
pm25 = PM25_I2C(i2c, reset_pin)
aqdata = pm25.read()

このコードの断片を読み込んで、センサーの読み取り値を翻訳する必要があります。これで、すべてが適切に接続されていると仮定して、現在の値を aqdata と呼ばれる変数に保存します。

Python コードによる RabbitMQ キューへのデータ作成とプッシュ

次に、RabbitMQ のコードを見てみましょう。まず、Ubuntu サーバーへの接続を確立します。アドレスをマシンの IP に指定し、ポートをデフォルトに設定します。また、以前に Ubuntu サーバーで作成した認証情報も使用します。

import pika

credentials = pika.PlainCredentials('israel', 'israel')
parameters = pika.ConnectionParameters('192.168.50.206',
                                   5672,
                                   '/',
                                   credentials)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

次に、キューにデータをメッセージとしてプッシュする方法を含め、キューのパラメータを作成して設定します。

channel.confirm_delivery()
channel.queue_declare(queue='airQuality', durable=True)

デフォルトでは、RabbitMQはスループットを何よりも優先します。つまり、たとえ接続が弱い場合でも、データを確実にサーバー(ブローカーとも呼ばれる)に送信できるように、デフォルトの構成オプションをいくつか変更する必要があります。

まず、confirm deliveryを有効にします。これにより、プロデューサーがブローカーからネガティブ・アクノレッジ(Nackとも呼ばれる)を受け取った場合に例外/エラーが発生します。つまり、データが落ちた場合、少なくともそのログが残るということです。残念ながら、Python側ではエラーメッセージの処理がそれほど堅牢ではありません。これが本番プロジェクトであれば、メッセージをさまざまな方法で処理できる他の言語にPythonから移行する必要があります。つまり、メッセージのバッチ処理を追加して、データの読み取りが中断される可能性を減らし、中断された作業の再送信を簡単にしたいと思います。

とにかく、今あるものを使って次にやることは、ブローカーがクラッシュしたり再起動したりした場合にキューを保存する「durable」を有効にすることです。これにより、「aqdata」を再作成する必要がなくなりますが、キュー内のメッセージは必ずしも保存されるわけではありません。

その後、データの読み取りと送信を同時に行います。

while True:
    time.sleep(1)

    try:
        aqdata = pm25.read()
        current_time = datetime.datetime.utcnow().replace(microsecond=0)
        now = current_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        aqdata['ts'] = now
        aqdata['pm1'] = aqdata.pop('pm10 standard')
        aqdata['pm25'] = aqdata.pop('pm25 standard')
        aqdata['pm10'] = aqdata.pop('pm100 standard')
        aqdata['pm1e'] = aqdata.pop('pm10 env')
        aqdata['pm25e'] = aqdata.pop('pm25 env')
        aqdata['pm10e'] = aqdata.pop('pm100 env')
        aqdata['particles03'] = aqdata.pop('particles 03um')
        aqdata['particles05'] = aqdata.pop('particles 05um')
        aqdata['particles10'] = aqdata.pop('particles 10um')
        aqdata['particles25'] = aqdata.pop('particles 25um')
        aqdata['particles50'] = aqdata.pop('particles 50um')
        aqdata['particles100'] = aqdata.pop('particles 100um')
        #print(aqdata)
    except RuntimeError:
        print("Unable to read from sensor, retrying...")
        continue
    
    payload = json.dumps(aqdata)
    try: 
        channel.basic_publish(exchange='',
                        routing_key='airQuality',
                        body=payload,
                        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
                        mandatory=True)
        print(" [x] Sent payload: " + payload)
    except pika.exceptions.UnroutableError:
        # If the message is not confirmed, it means something went wrong
        print("Message could not be confirmed")

このコードの断片では、センサーデータを読み取り、コンシューマー側で使用したいカラム名に変更し、その後、以前に作成したキューにペイロードをチャネルにプッシュします。ここで注意すべき点がいくつかあります。必須フラグを true に設定し、配信モードを永続的に設定しています。この2つの設定により、メッセージが安全に配信されたという肯定応答をブローカーから受け取らない場合、メッセージをディスクに保存しようと試みます。

例外は、ブローカーがプロデューサーに「ナック」(ネガティブ・アクノレッジ)を返した場合に発生します。

そして、1秒ごとに、スクリプトがセンサー値を読み取り、キューに追加します。データがブローカーによって確認されると、プロデューサーはもはやそのデータメッセージを気にかける必要がなくなります。

実装:コンシューマー

私たちのコンシューマーは Java で作成され、その役割は、私たちのブローカー(私たちの場合はコンシューマーと同じホストマシン)のキューからデータを読み取り、データを Java オブジェクトにアンマーシャルし、結果を GridDB に保存することです。

Java でのキューの消費

コンシューマー部分のコードは比較的シンプルです。接続を確立し、キューからデータを読み取ります。

private final static String QUEUE_NAME = "airQuality";
private final static boolean AUTO_ACK = false;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

ここでは、ブローカー(コンシューマーと同じマシンでホストされているため、localhost)への接続を行っています。読み込みたいキューを宣言し、いくつかのオプションを設定しています。最初の true を除いて、すべてデフォルト値を使用しています。この true は、前述のPythonセクションで説明したとおり、ブローカーがダウンした場合でもキューが持続することを意味する「耐久モード」に対応しています。

次に、実際の消費を実行してみましょう。

channel.basicConsume(QUEUE_NAME, AUTO_ACK, deliverCallback, consumerTag -> { });

ここで指摘したいのは、AUTO_ACKオプションをオフにしている(FALSEに設定している)ということです。これは、キューからメッセージが正常に読み込まれたかどうかを手動で確認する必要があることを意味します。

次に、キューから新しいメッセージが読み込まれるたびに実行されるコールバック関数です。

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            byte[] data = delivery.getBody();
   
            try {
                AirData ad = mapper.readValue(data, AirData.class);
                String jsonString = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(ad);
                System.out.println(jsonString);
                container.put(ad);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } catch (Exception e) {
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                System.out.println("Setting nack");
            }
        };

以下がその処理です。メッセージ(バイト配列の一種)を読み込み、Jackson JSONライブラリを使用して、生のバイト列から、AirDataという名前のクラスに値をアンマーシャルします。

    static public class AirData {
        @JsonProperty("ts")
        @RowKey Date ts;
        @JsonProperty("pm1")
        double pm1;
        @JsonProperty("pm25")
        double pm25;
        @JsonProperty("pm10")
        double pm10;
        @JsonProperty("pm1e")
        double pm1e;
        @JsonProperty("pm25e")
        double pm25e;
        @JsonProperty("pm10e")
        double pm10e;
        @JsonProperty("particles03")
        double particles03;
        @JsonProperty("particles05")
        double particles05;
        @JsonProperty("particles10")
        double particles10;
        @JsonProperty("particles25")
        double particles25;
        @JsonProperty("particles50")
        double particles50;
        @JsonProperty("particles100")
        double particles100;
    }

次に、新たに作成したJavaオブジェクトをGridDBに保存し、最後にメッセージを受信したことをブローカーに通知します。何か問題が発生した場合は、nackを送信し、メッセージはackを受信するまでキューに残ります。

GridDB

最後に、GridDBがどのようにこの仕組みに適合するのかを見てみましょう。GridDBへの接続を行い、タイムシリーズコンテナを取得します。今回は、一度きりしか使わないJavaコードを書くよりも簡単なので、シェルでテーブル/コンテナを作成しました。

$ sudo su gsadm
$ gs_sh
gs> createtimeseries aqdata NO ts timestamp pm1 double pm25 double pm10 double pm1e double pm25e double pm10e double particles03 double particles05 double particles10 double particles25 double particles50 double particles100 double

そして今、Javaコードで接続を行います。

    public static GridStore GridDBNoSQL() throws GSException {

        GridStore store = null;

        try {
            Properties props = new Properties();
            props.setProperty("notificationMember", "127.0.0.1:10001");
            props.setProperty("clusterName", "myCluster");
            props.setProperty("user", "admin");
            props.setProperty("password", "admin");
            store = GridStoreFactory.getInstance().getGridStore(props);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return store;
    }

先ほど作成した AirData クラスを使用して、新しく作成したコンテナを取得します。

TimeSeries<airdata> container = store.getTimeSeries("aqdata", AirData.class);
System.out.println("Connected to GridDB!");</airdata>

そして、これはすでに上記で説明済みですが、新しいペイロードを受け取ると、すぐにGridDBに保存し、肯定応答を送信します。

container.put(ad);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

結論

本記事では、Pythonプロデューサーから名前なし(”)の交換にIoTデータを安全に転送し、airQualityと呼ばれるキューを格納するブローカーに転送し、最後にJavaコンシューマーが読み取るという堅牢なシステムを構築しました。

If you have any questions about the blog, please create a Stack Overflow post here https://stackoverflow.com/questions/ask?tags=griddb .
Make sure that you use the “griddb” tag so our engineers can quickly reply to your questions.

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください