herokuのCloudMQTTでデータを定期送信してみた(CとPHPで)
MQTTを試してみたいと思っていたら、herokuのadd-onにCloud MQTTが。
早速チャレンジしてみました。
MQTTは、データを送るPublisher、データを取得するSubscriber、その間でデータを受け渡しするBrokerの3本立てで動きます。
言語は、PublisherがC言語、SubscriberがPHPです。
(どっちもぱいそんで、みたいな事例は他へどうぞ……)
今回は、クライアントのIPアドレスと送信時間を、一時間に一回、MQTTで送ってみます。
1.cloudMQTT
MQTTのBrokerです。
CloudMQTT
https://devcenter.heroku.com/articles/cloudmqtt
herokuそのものの使い方はぐぐってください。
herokuのアプリのadd-onに、cloudMQTTを追加してください。
確認用なのでまずは無料のプランで大丈夫です。
追加すると、接続情報などが生成されます。
この画面のUserなどの情報を接続に使います。
heroku上で使うときは、環境変数に登録されているので、楽ちんです。
2.クライアントのプログラム
MQTTのPublisherです。
IoTに使う機器で、OSはubuntuなどDebian系です(ラズパイ的な)。
C言語です。
現在、現場で使用している機器が、CやC++だと扱いやすいことが多いからです。
クライアントのIPアドレスを取得して、時間を付け加えて、定期的にブローカーに送ってみます。
Cがすでにコンパイルできる前提ですすめていますが、Cが入っていないときは apt-get install build-essential でもしておいてください。
そして、MQTTのライブラリを入れます。ソースコードがここにあるので、落として、インストールしてください。
https://github.com/eclipse/paho.mqtt.c
https://www.eclipse.org/paho/clients/c/ ここのLinux用のコマンドを。
1 2 3 4 |
git clone https://github.com/eclipse/paho.mqtt.c.git cd org.eclipse.paho.mqtt.c.git make sudo make install |
そして、IPアドレスと時間をcloudMQTTに送るCのソースコードです。
こちらのサイトなどを参考にしました。
CでMQTTを利用する。
MQTTクライアントを使うことで、C言語など様々な言語でMilkcocoaを利用できるようになりました
ここではごく単純な、送るだけのプログラムです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<errno.h> #include<getopt.h> //MQTT用 #include<MQTTClient.h> //IPアドレス用 #include<sys/types.h> #include<sys/socket.h> #include<unistd.h> #include<sys/ioctl.h> #include<netinet/in.h> #include<net/if.h> #include<arpa/inet.h> //時間用 #include<time.h> #define ADDRESS "tcp://m【Serverの番号】.cloudmqtt.com:【Port】" #define USERNAME "【User】" #define USERPASS "【Password】" #define QOS 1//【0~2】到達の確実性が変わる #define TIMEOUT 10000L //MQTT用 char connect_id[30]; connect_id[0]='\0';//接続IDの初期化 //実行時に、以下のようにIPを取りたいネットワークを指定する //ipsend ppp0 :これはsim //ipsend wlan0 :これは無線LAN int main(int argc,char*argv[]){ if(argc < 2){ fprintf(stderr,"parameter error\n"); return -1; } //時間用の変数 time_t timer; struct tm *t_st; char strtime[30]; //IP用の変数 int fd; struct ifreq ifr; //MQTT用の変数 int rc; MQTTClient client; MQTTClient_connectOptions conn_opts=MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg=MQTTClient_message_initializer; MQTTClient_deliveryToken token; char strmsg[128];//message strmsg[0]='\0'; strcat(strmsg,"{"); char strtopic[128];//topic strtopic[0]='\0'; //topicを設定 strcat(strtopic,"client0001");//このクライアントの識別番号 strcat(strtopic,"/ip"); //時間を取得して文字列に time(&timer); t_st = localtime(&timer); sprintf(strtime,"%d-%d-%d %d:%d:%d",t_st->tm_year + 1900,t_st->tm_mon+1,t_st->tm_mday,t_st->tm_hour,t_st->tm_min,t_st->tm_sec); //時間をmessageに strcat(strmsg,"\"tm\":\""); strcat(strmsg,strtime); strcat(strmsg,"\","); //IP4アドレスを取得 fd=socket(AF_INET,SOCK_DGRAM,0); ifr.ifr_addr.sa_family=AF_INET; strncpy(ifr.ifr_name,argv[1],IFNAMSIZ-1); ioctl(fd,SIOCGIFADDR,&ifr); close(fd); //IP4アドレスをmessageに strcat(strmsg,"\"ip\":\""); strcat(strmsg,inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr)); strcat(strmsg,"\"}"); //MQTT接続 connect_id="user01";//この接続IDが同じものは同時に接続できない MQTTClient_create(&client,ADDRESS,client_id,MQTTCLIENT_PERSISTENCE_NONE,NULL); conn_opts.keepAliveInterval=20; conn_opts.cleansession=1; conn_opts.username=USERNAME; conn_opts.password=USERPASS; if((rc=MQTTClient_connect(client,&conn_opts)) != MQTTCLIENT_SUCCESS){ fprintf(stderr,"mqtt connect error\n"); return -1; } //MQTT設定 pubmsg.payload=strmsg; pubmsg.payloadlen=strlen(strmsg); pubmsg.qos=QOS; pubmsg.retained=0; //MQTT送信 MQTTClient_publishMessage(client,strtopic,&pubmsg,&token); rc=MQTTClient_waitForCompletion(client,token,TIMEOUT); printf("%s : %s \nMessage with delivery token %d delivered\n",strtopic,strmsg,token); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return 0; } |
「//この接続IDが同じものは同時に接続できない」とありますが、IDの違うクライアントからの同時接続数が、ライセンスの違いの一つです。
注意点は、メッセージが長すぎると受信してくれません。そもそも、そんな長いならわざわざMQTTにする意味がなさそう。
そして、コンパイル。デバッグなどのオプションは各々追加してください。
1 |
gcc ipsend.c -o ipsend -lm -lpaho-mqtt3c |
ipsendというアプリができたら crontabに定期実行を登録。
sudoかsuで、
1 2 3 4 5 6 7 8 9 |
crontab -e #エディタが開くので記入 #これは、9時-18時の営業時間内、毎時0分に、実行する 0 9-18 * * * /【実行ファイルまでのフルパス】/ipsend ppp0 #これはsim用。LANのときはwlan0やeth0 #五分ごとならこう */5 6-18 * * * /【実行ファイルまでのフルパス】/ipsend ppp0 |
fprintf でエラーを書き出しているので、書き出したいときは、 実行時にログファイルを指定してください。
cloudMQTTに送られているかを確認。
一行目のオレンジが、上のプログラムで送ったものです。
他のプログラムやクライアントからも次々と送られてきています。topicは階層でアクセスできます。
3.ウェブ側のプログラム
MQTTのSubscriberです。
MQTTサーバーは待ち受けているものを受け取るだけなので、受信したデータがないかを見に行かないと取得できません。
herokuにPHPで書きました。dynoをケチって、heroku Schedulerを使わずに、PHPで動いている社内アプリのあるサーバーで強引に動かしました。
VPSからのウェブアクセスで、このプログラムをたたき起こしています。
//herokuは30秒で返ってこないとエラーで止まってしまうので25秒間だけ動かすためのタイマー
などというダメな変数があるのですが、ケチらずにスケジューラーを使えば24時間を超えなければ必要ないのでは?
まず、phpMQTT.phpを入れる。
https://github.com/bluerhinos/phpMQTT
composerでも入るそうですがエラーになったので、ソースコードを落として置きました。
PHP用のSubscriberのサンプルコードは上にあるのですが、herokuのスケジューラーじゃなくてウェブサイトだと問題が。
while($mqtt->proc()) {} これが30秒で止まるため、この無限ループがエラーになる。
なのでこう作りまして。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
<?php //phpMQTT.phpを読み込む require('/app/lib/phpMQTT.php'); //"一階層/ip"というtopicのデータを取得したい //+はワイルドカード $tpc = "+/ip"; $url = parse_url(getenv('CLOUDMQTT_URL')."/".$tpc);//heroku上なので環境変数が使える $topic = substr($url['path'], 1); $connect_id="ipget";//ここが同じクライアントからは同時接続できない global $cnt;//処理数のカウンターにつかう $cnt=0; //処理の関数 function procmsg($topic, $msg){ //ここにデータを色々する処理 //処理の回数を数えてみる $cnt++; } //MQTTのクラスをつくる $mqtt = new Bluerhinos\phpMQTT($url['host'], $url['port'], $connect_id); //データが来ていないか問い合わせ続ける if ($mqtt->connect(true, NULL, $url['user'], $url['pass'])) { $topics[$topic] = array( "qos" => 0, "function" => "procmsg"//処理の関数名 ); $mqtt->subscribe($topics,0); //herokuは30秒で返ってこないとエラーで止まってしまうので25秒間だけ動かすためのタイマー $stime=time(); //25秒間だけ動かす while($mqtt->proc()){ if(time()-$stime>25)break; } if($cnt>1){ //上の処理が行われたときにだけ、最後に行いたい処理 //DBに入れるとか、メールを送るとか、時間のかかる処理を } }else{ echo "connect err"; } ?> |
三日くらい動かしてみましたが、SubscriberによってDBにどんどんIPアドレスが溜まっていくのを確認できました。