ThingsBoardでMQTTをためしてみた話

Pocket

ThingsBoardとAndroidでMQTTをためしてみた話

こんにちは。
とある事情によりThingsBoardでMQTTを試してみたのでまとめておきます。


ThingsBoardって何?

公式サイトによると

ThingsBoard
Open-source IoT Platform
Device management, data collection, processing and visualization for your IoT solution

ということで、IoT向けのデバイス管理やデータ処理、可視化などをするためのプラットフォームです。
環境構築は公式サイトをみていただくとして、本題に入ります。


ThingsBoardのMQTT API

ここにThingsBoard用のMQTT APIが記載されています。

ライブラリ

AndroidのMQTTライブラリはPahoが良さそうなのでそちらを使います。

公式サイト

プロジェクト直下のbuild.gradleに以下を

repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
    }
}

app配下のbuild.gradleに以下を追加します。

dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

Connect

ThingsBoardはBrokerとして動作します。
Android端末からまずConnectしてみましょう。
今回はLocalHostにThingsBoardを立てた環境でやってみます。

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val clientId = System.currentTimeMillis().toString()
        val client = MqttAndroidClient(applicationContext, "tcp://10.0.2.2:1883", clientId)

        client.setCallback(object : MqttCallbackExtended {
            override fun connectComplete(reconnect: Boolean, serverURI: String?) {
            }

            override fun messageArrived(topic: String?, message: MqttMessage?) {
            }

            override fun connectionLost(cause: Throwable?) {
            }

            override fun deliveryComplete(token: IMqttDeliveryToken?) {
            }
        })

        val mqttConnectOptions = MqttConnectOptions()
        mqttConnectOptions.isAutomaticReconnect = false
        mqttConnectOptions.isCleanSession = false
        mqttConnectOptions.userName = ThingsBoardで割り当てたデバイスのアクセストークン

        // ThingsBoardにConnect
        client.connect(mqttConnectOptions, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken?) {
            }

            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
            }
        })
    }
}

AndroidManifest.xmlに以下を忘れないようにしましょう(忘れました)

        <service android:name="org.eclipse.paho.android.service.MqttService" />
    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />

Publish

Telemetry にデータをPublishしてみましょう。

v1/devices/me/telemetry がTelemetryにPublishするためのTopicになります。

        client.setCallback(object : MqttCallbackExtended {
            override fun connectComplete(reconnect: Boolean, serverURI: String?) {
                var json = Gson().toJson(mapOf("foo" to "FOO", "bar" to "BAR"))
                val message = MqttMessage(json.toByteArray())
                client.publish("v1/devices/me/telemetry", message)
            }

これで指定したデバイスのTelemetryにデータ(fooとbar)がPublishされました。
ThingsBoard上のデバイスから確認してください。

Subscribe

さて、PublishしたからにはSubscribeもしてみましょう。

しかし

「どうやらTelemetryにはPublishしかできないらしい。Attributes APIを使用してPublish/Subscribeできるらしい? 」

という情報を得ました。

Attributesのtopicはv1/devices/me/attributes です。
つまりこのtopicに対してこんな感じ(↓)でPublish/SubscribeすればOKですね?

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val clientId = System.currentTimeMillis().toString()
        val client = MqttAndroidClient(applicationContext, "tcp://10.0.2.2:1883", clientId)

        client.setCallback(object : MqttCallbackExtended {
            override fun connectComplete(reconnect: Boolean, serverURI: String?) {
                // subscribe登録
                client.subscribe("v1/devices/me/attributes", 0, null , object : IMqttActionListener {
                    override fun onSuccess(asyncActionToken: IMqttToken?) {
                        val json = Gson().toJson(mapOf("foo" to "FOO", "bar" to "BAR"))
                        val message = MqttMessage(json.toByteArray())
                        // ThingsBordにPublish
                        client.publish("v1/devices/me/attributes", message)
                    }

                    override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    }
                })
            }

            override fun messageArrived(topic: String?, message: MqttMessage?) {
            }

            override fun connectionLost(cause: Throwable?) {
            }

            override fun deliveryComplete(token: IMqttDeliveryToken?) {
            }
        })

        val mqttConnectOptions = MqttConnectOptions()
        mqttConnectOptions.isAutomaticReconnect = false
        mqttConnectOptions.isCleanSession = false
        mqttConnectOptions.userName = ThingsBoardで割り当てたデバイスのアクセストークン

        // ThingsBoardにConnect
        client.connect(mqttConnectOptions, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken?) {
            }

            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
            }
        })
    }
}

これSubscribeの結果はどこに届くの・・・

ThingsBoardのデバイスの属性にはPublishしたデータは届いているのですが、Subscribeの結果がAndroid側に届いていません。
messageArrived()に届くと思っていましたが、どうやら違うようです・・・
どなたかご存知じゃないでしょうか?


おわりに

めちゃくちゃ中途半端になってしまいました。すいません。
進捗があったらまた書きます。

Pocket

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です