EnigmaIOT  0.9.8
Secure sensor and gateway platform based on ESP8266 and ESP32
GwOutput_mqtt.cpp
Go to the documentation of this file.
1 
11 #include <Arduino.h>
12 #include "GwOutput_mqtt.h"
13 #include <ArduinoJson.h>
14 #include <ESPAsyncWebServer.h>
15 #include <helperFunctions.h>
16 #include <EnigmaIOTdebug.h>
17 #include <PubSubClient.h>
18 
19 #ifdef ESP32
20 #include <WiFi.h>
21 #include <AsyncTCP.h>
22 #include "esp_system.h"
23 #include "esp_event.h"
24 #include "mqtt_client.h"
25 #include "esp_tls.h"
26 #elif defined(ESP8266)
27 #include <ESP8266WiFi.h>
28 #include <ESPAsyncTCP.h>
29 #include <Hash.h>
30 #include <SPI.h>
31 #ifdef SECURE_MQTT
32 #include <WiFiClientSecure.h>
33 #else
34 #include <WiFiClient.h>
35 #endif // SECURE_MQTT
36 #endif // ESP32
37 
38 #include <FS.h>
39 
40 
42 
44  enigmaIotGateway = enigmaIotGw;
45  mqttServerParam = new AsyncWiFiManagerParameter ("mqttserver", "MQTT Server", mqttgw_config.mqtt_server, 41, "required type=\"text\" maxlength=40");
46  char port[10];
47  itoa (mqttgw_config.mqtt_port, port, 10);
48  mqttPortParam = new AsyncWiFiManagerParameter ("mqttport", "MQTT Port", port, 6, "required type=\"number\" min=\"0\" max=\"65535\" step=\"1\"");
49  mqttUserParam = new AsyncWiFiManagerParameter ("mqttuser", "MQTT User", mqttgw_config.mqtt_user, 21, "required type=\"text\" maxlength=20");
50  mqttPassParam = new AsyncWiFiManagerParameter ("mqttpass", "MQTT Password", "", 41, "type=\"password\" maxlength=40");
51 
56 
57 }
58 
60  if (!FILESYSTEM.begin ()) {
61  DEBUG_WARN ("Error opening filesystem");
62  }
63  DEBUG_DBG ("Filesystem opened");
64 
65  File configFile = FILESYSTEM.open (CONFIG_FILE, "w");
66  if (!configFile) {
67  DEBUG_WARN ("Failed to open config file %s for writing", CONFIG_FILE);
68  return false;
69  } else {
70  DEBUG_DBG ("%s opened for writting", CONFIG_FILE);
71  }
72 
73  const size_t capacity = JSON_OBJECT_SIZE (4) + 110;
74  DynamicJsonDocument doc (capacity);
75 
76  doc["mqtt_server"] = mqttgw_config.mqtt_server;
77  doc["mqtt_port"] = mqttgw_config.mqtt_port;
78  doc["mqtt_user"] = mqttgw_config.mqtt_user;
79  doc["mqtt_pass"] = mqttgw_config.mqtt_pass;
80 
81  if (serializeJson (doc, configFile) == 0) {
82  DEBUG_ERROR ("Failed to write to file");
83  configFile.close ();
84  //FILESYSTEM.remove (CONFIG_FILE); // Testing only
85  return false;
86  }
87 
88  String output;
89  serializeJsonPretty (doc, output);
90 
91  DEBUG_DBG ("%s", output.c_str ());
92 
93  configFile.flush ();
94  //size_t size = configFile.size ();
95 
96  configFile.close ();
97  DEBUG_DBG ("Gateway configuration saved to flash. %u bytes", configFile.size ());
98  return true;
99 }
100 
102  //FILESYSTEM.remove (CONFIG_FILE); // Only for testing
103  bool json_correct = false;
104 
105  if (!FILESYSTEM.begin ()) {
106  DEBUG_WARN ("Error starting filesystem. Formatting");
107  FILESYSTEM.format ();
108  WiFi.disconnect ();
109  }
110 
111  if (FILESYSTEM.exists (CONFIG_FILE)) {
112 
113  DEBUG_DBG ("Opening %s file", CONFIG_FILE);
114  File configFile = FILESYSTEM.open (CONFIG_FILE, "r");
115  if (configFile) {
116  //size_t size = configFile.size ();
117  DEBUG_DBG ("%s opened. %u bytes", CONFIG_FILE, configFile.size ());
118 
119  const size_t capacity = JSON_OBJECT_SIZE (4) + 110;
120  DynamicJsonDocument doc (capacity);
121 
122  DeserializationError error = deserializeJson (doc, configFile);
123 
124  if (error) {
125  DEBUG_ERROR ("Failed to parse file");
126  } else {
127  DEBUG_DBG ("JSON file parsed");
128  }
129 
130  if (doc.containsKey ("mqtt_server") && doc.containsKey ("mqtt_port")
131  && doc.containsKey ("mqtt_user") && doc.containsKey ("mqtt_pass")) {
132  json_correct = true;
133  }
134 
135  strncpy (mqttgw_config.mqtt_server, doc["mqtt_server"] | "", sizeof (mqttgw_config.mqtt_server));
136  mqttgw_config.mqtt_port = doc["mqtt_port"].as<int> ();
137  strncpy (mqttgw_config.mqtt_user, doc["mqtt_user"] | "", sizeof (mqttgw_config.mqtt_user));
138  strncpy (mqttgw_config.mqtt_pass, doc["mqtt_pass"] | "", sizeof (mqttgw_config.mqtt_pass));
139 
140  configFile.close ();
141  if (json_correct) {
142  DEBUG_INFO ("MQTT output module configuration successfuly read");
143  }
144  DEBUG_DBG ("==== MQTT Configuration ====");
145  DEBUG_DBG ("MQTT server: %s", mqttgw_config.mqtt_server);
146  DEBUG_DBG ("MQTT port: %d", mqttgw_config.mqtt_port);
147  DEBUG_DBG ("MQTT user: %s", mqttgw_config.mqtt_user);
148  DEBUG_VERBOSE ("MQTT password: %s", mqttgw_config.mqtt_pass);
149 
150  String output;
151  serializeJsonPretty (doc, output);
152 
153  DEBUG_DBG ("JSON file %s", output.c_str ());
154 
155  } else {
156  DEBUG_WARN ("Error opening %s", CONFIG_FILE);
157  }
158  } else {
159  DEBUG_WARN ("%s do not exist", CONFIG_FILE);
160  }
161 
162  return json_correct;
163 }
164 
165 
167  DEBUG_INFO ("==== Config Portal MQTTGW result ====");
168  DEBUG_INFO ("MQTT server: %s", mqttServerParam->getValue ());
169  DEBUG_INFO ("MQTT port: %s", mqttPortParam->getValue ());
170  DEBUG_INFO ("MQTT user: %s", mqttUserParam->getValue ());
171  DEBUG_INFO ("MQTT password: %s", mqttPassParam->getValue ());
172  DEBUG_INFO ("Status: %s", status ? "true" : "false");
173 
175  memcpy (mqttgw_config.mqtt_server, mqttServerParam->getValue (), mqttServerParam->getValueLength ());
176  mqttgw_config.mqtt_server[mqttServerParam->getValueLength ()] = '\0';
177  DEBUG_DBG ("MQTT Server: %s", mqttgw_config.mqtt_server);
178  mqttgw_config.mqtt_port = atoi (mqttPortParam->getValue ());
179  memcpy (mqttgw_config.mqtt_user, mqttUserParam->getValue (), mqttUserParam->getValueLength ());
180  const char* mqtt_pass = mqttPassParam->getValue ();
181  if (mqtt_pass && (mqtt_pass[0] != '\0')) {// If password is empty, keep the old one
182  memcpy (mqttgw_config.mqtt_pass, mqtt_pass, mqttPassParam->getValueLength ());
183  mqttgw_config.mqtt_pass[mqttPassParam->getValueLength ()] = '\0';
184  } else {
185  DEBUG_INFO ("MQTT password field empty. Keeping the old one");
186  }
187  DEBUG_DBG ("MQTT pass: %s", mqttgw_config.mqtt_pass);
188  if (!saveConfig ()) {
189  DEBUG_ERROR ("Error writting MQTT config to filesystem.");
190  } else {
191  DEBUG_INFO ("Configuration stored");
192  }
193  } else {
194  DEBUG_DBG ("Configuration does not need to be saved");
195  }
196 
197  delete (mqttServerParam);
198  delete (mqttPortParam);
199  delete (mqttUserParam);
200  delete (mqttPassParam);
201 }
202 
204  //this->mqtt_queue = new EnigmaIOTRingBuffer<mqtt_queue_item_t> (MAX_MQTT_QUEUE_SIZE);
205 #ifdef SECURE_MQTT
206  randomSeed (micros ());
207 #ifdef ESP32
208  espClient.setCACert (DSTroot_CA);
209 #elif defined(ESP8266)
210  espClient.setTrustAnchors (&certificate);
211 #endif // ESP32
212  DEBUG_INFO ("CA store set");
213 #endif // SECURE_MQTT
215  DEBUG_INFO ("Set MQTT server %s - port %d", mqttgw_config.mqtt_server, mqttgw_config.mqtt_port);
216  mqtt_client.setBufferSize (MQTT_BUFFER_SIZE);
218 
219 #ifdef ESP32
220  uint64_t chipid = ESP.getEfuseMac ();
221  clientId = netName + String ((uint32_t)chipid, HEX);
222 #elif defined(ESP8266)
223  clientId = netName + String (ESP.getChipId (), HEX);
224 #endif // ESP32
225 
226  configTime (0, 0, NTP_SERVER_1, NTP_SERVER_2);
227  setenv ("TZ", TZINFO, 1);
228  tzset ();
229 
231  reconnect ();
232  return true;
233 }
234 
236  // Loop until we're reconnected
237  while (!mqtt_client.connected ()) {
238  // TODO: startConnectionFlash (500);
239  DEBUG_INFO ("Attempting MQTT connection...");
240  for (int i = 1; i < 5; i++) {
241  DEBUG_WARN ("Connecting to WiFi %s", WiFi.SSID ().c_str ());
242  delay (1000);
243  if (WiFi.isConnected ()) {
244  DEBUG_WARN ("WiFi is connected");
245  break;
246  } else {
247  WiFi.begin ();
248  }
249  }
250  // Create a random client ID
251  // Attempt to connect
252 //#ifdef SECURE_MQTT
253  setClock ();
254 //#endif
255  DEBUG_DBG ("Clock set.");
256  DEBUG_DBG ("Connect to MQTT server: user %s, pass %s, topic %s",
258  //client.setServer (mqttgw_config.mqtt_server, mqttgw_config.mqtt_port);
259  if (mqtt_client.connect (clientId.c_str (), mqttgw_config.mqtt_user, mqttgw_config.mqtt_pass, gwTopic.c_str (), 0, true, "0", true)) {
260  DEBUG_WARN ("MQTT connected");
261  // Once connected, publish an announcement...
262  publishMQTT (gwTopic.c_str (), "1", 1, true);
263  // ... and resubscribe
264  String dlTopic = netName + String ("/+/set/#");
265  mqtt_client.subscribe (dlTopic.c_str ());
266  dlTopic = netName + String ("/+/get/#");
267  mqtt_client.subscribe (dlTopic.c_str ());
268  mqtt_client.setCallback (onDlData);
269  // TODO: stopConnectionFlash ();
270  } else {
271  mqtt_client.disconnect ();
272  DEBUG_ERROR ("failed, rc=%d try again in 5 seconds", mqtt_client.state ());
273 #ifdef SECURE_MQTT
274  char error[100];
275 #ifdef ESP8266
276  int errorCode = espClient.getLastSSLError (error, 100);
277 #elif defined ESP32
278  int errorCode = espClient.lastError (error, 100);
279 #endif
280  DEBUG_ERROR ("Connect error %d: %s", errorCode, error);
281 #endif
282  // Wait 5 seconds before retrying
283 #ifdef ESP32
284  const TickType_t xDelay = 5000 / portTICK_PERIOD_MS;
285  vTaskDelay (xDelay);
286 #else
287  delay (5000);
288 #endif
289  }
290  }
291 }
292 
293 char* getTopicAddress (char* topic, unsigned int& len) {
294  if (!topic)
295  return nullptr;
296 
297  char* start = strchr (topic, '/') + 1;
298  char* end;
299 
300  if (start) {
301  end = strchr (start, '/');
302  } else {
303  return nullptr;
304  }
305  //DEBUG_INFO ("Start %p : %d", start, start - topic);
306  //DEBUG_INFO ("End %p : %d", end, end - topic);
307  if (end) {
308  len = end - start;
309  } else {
310  len = strlen (topic) - (start - topic);
311  }
312 
313  return start;
314 }
315 
317  DEBUG_DBG ("Type topic: %s", data.c_str ());
318  if (data == GET_VERSION) {
320  } else if (data == GET_SLEEP) {
322  } else if (data == SET_SLEEP) {
324  } else if (data == SET_OTA) {
326  } else if (data == SET_IDENTIFY) {
327  DEBUG_WARN ("IDENTIFY MESSAGE %s", data.c_str ());
329  } else if (data == SET_RESET_CONFIG) {
330  DEBUG_WARN ("RESET CONFIG MESSAGE %s", data.c_str ());
332  } else if (data == GET_RSSI) {
333  DEBUG_INFO ("GET RSSI MESSAGE %s", data.c_str ());
335  } else if (data == SET_USER_DATA) {
336  DEBUG_INFO ("USER DATA %s", data.c_str ());
338  } else if (data == GET_USER_DATA) {
339  DEBUG_INFO ("USER DATA %s", data.c_str ());
341  } else if (data == GET_NAME) {
342  DEBUG_INFO ("GET NODE NAME AND ADDRESS");
344  } else if (data == SET_NAME) {
345  DEBUG_INFO ("SET NODE NAME %s", data.c_str ());
347  } else if (data == SET_RESTART_MCU) {
348  DEBUG_INFO ("RESET MCU");
350  } else
352 }
353 
354 control_message_type_t getTopicType (char* topic, char*& userCommand) {
355  if (!topic)
357 
358  String command;
359 
360  //Discard address
361  char* start = strchr (topic, '/') + 1;
362  if (start)
363  start = strchr (start, '/') + 1;
364  else
366  //DEBUG_INFO ("Second Start %p", start);
367  if ((int)start > 0x01) { // TODO: Why this condition ????
368  command = String (start);
369  userCommand = start;
370  } else {
372  }
373  //DEBUG_INFO ("Start %p : %d", start, start - topic);
374  //DEBUG_INFO ("Command %s", command.c_str());
375 
376  control_message_type_t msgType = checkMsgType (command);
377 
378  return msgType;
379 }
380 
381 
382 void GwOutput_MQTT::onDlData (char* topic, uint8_t* data, unsigned int len) {
383  uint8_t addr[ENIGMAIOT_ADDR_LEN];
384  char* addressStr;
385  control_message_type_t msgType;
386  char* userCommand;
387  char* nodeName = nullptr;
388 
389 
390  DEBUG_DBG ("Topic %s", topic);
391 
392  unsigned int addressLen;
393 
394  addressStr = getTopicAddress (topic, addressLen);
395 
396  if (addressStr) {
397  //DEBUG_INFO ("Len: %u", addressLen);
398  DEBUG_DBG ("Address %.*s", addressLen, addressStr);
399  if (!str2mac (addressStr, addr)) {
400  DEBUG_INFO ("Not a mac address. Treating it as a node name");
401  if (addressLen) {
402  nodeName = (char*)calloc (addressLen + 1, sizeof (char));
403  memcpy (nodeName, addressStr, addressLen);
404  } else {
405  DEBUG_WARN ("Invalid address");
406  return;
407  }
408  } else {
409  DEBUG_DBG ("Hex Address = %s", printHexBuffer (addr, 6));
410  }
411  } else
412  return;
413 
414  msgType = getTopicType (topic, userCommand);
415 
416 
417  DEBUG_DBG ("User command: %s", userCommand);
418  DEBUG_DBG ("MsgType 0x%02X", msgType);
419  DEBUG_DBG ("Data: %.*s\n", len, data);
420 
421  if (msgType != control_message_type_t::INVALID) {
422  GwOutput.downlinkCb (addr, nodeName, msgType, (char*)data, len);
423  } else {
424  DEBUG_WARN ("Invalid message");
425  }
426 
427  if (nodeName) {
428  free (nodeName);
429  nodeName = NULL;
430  }
431 }
432 
434 
435  mqtt_client.loop ();
436  if (!mqtt_client.connected ()) {
437  reconnect ();
438  } else {
439  mqtt_queue_item_t* message;
440  static time_t statusLastUpdated;
441 
442  if (!mqtt_queue.empty ()) {
443  message = getMQTTqueue ();
444  if (publishMQTT (message->topic, message->payload, message->payload_len, message->retain)) {
445  DEBUG_DBG ("MQTT published. %s %.*s", message->topic, message->payload_len, message->payload);
446  popMQTTqueue ();
447  }
448  }
449  if (millis () - statusLastUpdated > STATUS_SEND_PERIOD) {
450  statusLastUpdated = millis ();
451  publishMQTT (gwTopic.c_str (), "1", 1, true);
452  }
453  }
454 }
455 
456 bool GwOutput_MQTT::publishMQTT (const char* topic, const char* payload, size_t len, bool retain) {
457  DEBUG_INFO ("Publish MQTT. %s : %.*s", topic, len, payload);
458  if (mqtt_client.connected ()) {
459  return mqtt_client.publish (topic, (uint8_t*)payload, len, retain);
460  } else {
461  DEBUG_WARN ("MQTT client not connected");
462  return false;
463  }
464 }
465 
466 //#ifdef SECURE_MQTT
468 #if DEBUG_LEVEL >= INFO
469  DEBUG_INFO ("\nWaiting for NTP time sync: ");
470  time_t now = time (nullptr);
471  while (now < 8 * 3600 * 2) {
472  delay (500);
473  Serial.print (".");
474  now = time (nullptr);
475  }
476  //Serial.println ("");
477  struct tm timeinfo;
478  gmtime_r (&now, &timeinfo);
479  DEBUG_INFO ("Current time: %s", asctime (&timeinfo));
480 #endif
481 }
482 //#endif
483 
484 bool GwOutput_MQTT::addMQTTqueue (const char* topic, char* payload, size_t len, bool retain) {
485  mqtt_queue_item_t message;
486 
487  if (mqtt_queue.size () >= MAX_MQTT_QUEUE_SIZE) {
488  mqtt_queue.pop ();
489  }
490 
491  //message.topic = (char*)malloc (strlen (topic) + 1);
492  strncpy (message.topic, topic, MAX_MQTT_TOPIC_LEN);
493  message.payload_len = len < MAX_MQTT_PLD_LEN ? len : MAX_MQTT_PLD_LEN;
494  //message->payload = (char*)malloc (len);
495  memcpy (message.payload, payload, message.payload_len);
496  message.retain = retain;
497 
498  mqtt_queue.push (&message);
499  DEBUG_DBG ("%d MQTT messages queued Len:%d %s %.*s", mqtt_queue.size (),
500  len,
501  message.topic,
502  message.payload_len, message.payload);
503 
504  return true;
505 }
506 
508  if (mqtt_queue.size ()) {
509  DEBUG_DBG ("MQTT message got from queue");
510  return mqtt_queue.front ();
511  }
512  return nullptr;
513 }
514 
516  if (mqtt_queue.size ()) {
517  mqtt_queue_item_t* message;
518 
519  message = mqtt_queue.front ();
520  if (message) {
521  if (message->topic) {
522  //delete(message->topic);
523  message->topic[0] = 0;
524  }
525  if (message->payload) {
526  //delete(message->payload);
527  message->payload[0] = 0;
528  }
529  message->payload_len = 0;
530  //delete message;
531  }
532  mqtt_queue.pop ();
533  DEBUG_DBG ("MQTT message pop. Size %d", mqtt_queue.size ());
534  }
535 }
536 
537 #if SUPPORT_HA_DISCOVERY
538 bool GwOutput_MQTT::rawMsgSend (const char* topic, char* payload, size_t len, bool retain) {
539  bool result;
540 
541  if ((result = addMQTTqueue (topic, payload, len, retain))) {
542  DEBUG_INFO ("MQTT queued %s. Length %d", topic, len);
543  } else {
544  DEBUG_WARN ("Error queuing MQTT %s", topic);
545  }
546  return result;
547 }
548 #endif
549 
550 
551 bool GwOutput_MQTT::outputDataSend (char* address, char* data, size_t length, GwOutput_data_type_t type) {
552  const int TOPIC_SIZE = 64;
553  char topic[TOPIC_SIZE];
554  bool result;
555  switch (type) {
557  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, NODE_DATA);
558  break;
560  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, LOST_MESSAGES);
561  break;
563  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, NODE_STATUS);
564  break;
565  }
566  if ((result = addMQTTqueue (topic, data, length))) {
567  DEBUG_INFO ("MQTT queued %s. Length %d", topic, length);
568  } else {
569  DEBUG_WARN ("Error queuing MQTT %s", topic);
570  }
571  return result;
572 }
573 
574 bool GwOutput_MQTT::outputControlSend (char* address, uint8_t* data, size_t length) {
575  const int TOPIC_SIZE = 64;
576  const int PAYLOAD_SIZE = 512;
577  char topic[TOPIC_SIZE];
578  char payload[PAYLOAD_SIZE];
579  size_t pld_size = 0;
580  bool result = false;
581 
582  switch (data[0]) {
584  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, GET_VERSION_ANS);
585  if (length >= 4) {
586  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"version\":\"%d.%d.%d\"}", data[1], data[2], data[3]);
587  }
588  if (addMQTTqueue (topic, payload, pld_size)) {
589  DEBUG_INFO ("Published MQTT %s %s", topic, payload);
590  result = true;
591  }
592  break;
594  uint32_t sleepTime;
595  memcpy (&sleepTime, data + 1, sizeof (sleepTime));
596  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, GET_SLEEP_ANS);
597  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"sleeptime\":%u}", sleepTime);
598  if (addMQTTqueue (topic, payload, pld_size)) {
599  DEBUG_INFO ("Published MQTT %s %s", topic, payload);
600  result = true;
601  }
602  break;
604  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, SET_RESET_ANS);
605  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"result\":\"OK\"}");
606  if (addMQTTqueue (topic, payload, pld_size)) {
607  DEBUG_INFO ("Published MQTT %s %s", topic, payload);
608  result = true;
609  }
610  break;
612  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, GET_RSSI_ANS);
613  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"rssi\":%d,\"channel\":%u}", (int8_t)data[1], data[2]);
615  if (addMQTTqueue (topic, payload, pld_size)) {
616  DEBUG_INFO ("Published MQTT %s %s", topic, payload);
617  result = true;
618  }
619  break;
621  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, GET_NAME_ANS);
622  char addrStr[ENIGMAIOT_ADDR_LEN * 3];
623  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"name\":\"%.*s\",\"address\":\"%s\"}", length - ENIGMAIOT_ADDR_LEN - 1, (char*)(data + 1 + ENIGMAIOT_ADDR_LEN), mac2str (data + 1, addrStr));
624  if (addMQTTqueue (topic, payload, pld_size)) {
625  DEBUG_INFO ("Published MQTT %s %s", topic, payload);
626  result = true;
627  }
628  break;
630  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, RESTART_NOTIF);
631  if (length > 1) {
632  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"reason\":%d}", (int8_t)data[1]);
633  }
634  if (addMQTTqueue (topic, payload, pld_size)) {
635  DEBUG_INFO ("Published MQTT %s %s", topic, payload);
636  result = true;
637  }
638  break;
640  snprintf (topic, TOPIC_SIZE, "%s/%s/%s", netName.c_str (), address, SET_OTA_ANS);
641  switch (data[1]) {
643  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"result\":\"OTA Started\",\"status\":%u}", data[1]);
644  break;
646  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"result\":\"OTA Start error\",\"status\":%u}", data[1]);
647  break;
649  uint16_t lastGoodIdx;
650  memcpy ((uint8_t*)&lastGoodIdx, data + 2, sizeof (uint16_t));
651  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"last_chunk\":%d,\"result\":\"OTA out of sequence error\",\"status\":%u}", lastGoodIdx, data[1]);
652  break;
654  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"result\":\"OTA check OK\",\"status\":%u}", data[1]);
655  break;
657  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"result\":\"OTA check failed\",\"status\":%u}", data[1]);
658  break;
660  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"result\":\"OTA timeout\",\"status\":%u}", data[1]);
661  break;
663  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"result\":\"OTA finished OK\",\"status\":%u}", data[1]);
664  break;
665  }
666  if (addMQTTqueue (topic, payload, pld_size)) {
667  DEBUG_INFO ("Published MQTT %s %s", topic, payload);
668  result = true;
669  }
670  break;
671  default:
672  DEBUG_WARN ("Unknown control message. Code: 0x%02X", data[0]);
673  }
674 
675  return result;
676 }
677 
678 bool GwOutput_MQTT::newNodeSend (char* address, uint16_t node_id) {
679  const int TOPIC_SIZE = 64;
680 
681  char topic[TOPIC_SIZE];
682 
683  uint8_t* nodeAddress = enigmaIotGateway->getNodes ()->getNodeFromID (node_id)->getMacAddress ();
684  char addrStr[ENIGMAIOT_ADDR_LEN * 3];
685 
686  char payload[ENIGMAIOT_ADDR_LEN * 3 + 14];
687 
688  snprintf (payload, ENIGMAIOT_ADDR_LEN * 3 + 14, "{\"address\":\"%s\"}", mac2str (nodeAddress, addrStr));
689 
690  snprintf (topic, TOPIC_SIZE, "%s/%s/hello", netName.c_str (), address);
691  bool result = addMQTTqueue (topic, payload, ENIGMAIOT_ADDR_LEN * 3 + 13);
692  DEBUG_INFO ("Published MQTT %s", topic);
693  return result;
694 }
695 
697  const int TOPIC_SIZE = 64;
698  const int PAYLOAD_SIZE = 64;
699 
700  char topic[TOPIC_SIZE];
701  char payload[PAYLOAD_SIZE];
702  size_t pld_size;
703 
704  snprintf (topic, TOPIC_SIZE, "%s/%s/bye", netName.c_str (), address);
705  pld_size = snprintf (payload, PAYLOAD_SIZE, "{\"reason\":%d}", reason);
706  bool result = addMQTTqueue (topic, payload, pld_size);
707  DEBUG_INFO ("Published MQTT %s result = %s", topic, result ? "OK" : "Fail");
708  return result;
709 }
OTA_CHECK_OK
@ OTA_CHECK_OK
Definition: NodeList.h:79
GwOutput_MQTT::nodeDisconnectedSend
bool nodeDisconnectedSend(char *address, gwInvalidateReason_t reason)
Send node disconnection notification.
Definition: GwOutput_mqtt.cpp:696
NTP_SERVER_2
#define NTP_SERVER_2
Definition: EnigmaIoTconfig.h:22
VERSION_ANS
@ VERSION_ANS
Definition: NodeList.h:53
IDENTIFY
@ IDENTIFY
Definition: NodeList.h:57
getTopicType
control_message_type_t getTopicType(char *topic, char *&userCommand)
Definition: GwOutput_mqtt.cpp:354
GET_VERSION_ANS
#define GET_VERSION_ANS
Definition: GwOutput_mqtt.h:36
GwOutput_MQTT::configManagerStart
void configManagerStart(EnigmaIOTGatewayClass *enigmaIotGw)
Called when wifi manager starts config portal.
Definition: GwOutput_mqtt.cpp:43
GwOutput_MQTT::mqttPassParam
AsyncWiFiManagerParameter * mqttPassParam
Configuration field for MQTT server password.
Definition: GwOutput_mqtt.h:90
mqtt_queue_item_t
Definition: GwOutput_mqtt.h:77
SLEEP_SET
@ SLEEP_SET
Definition: NodeList.h:55
GwOutput_MQTT::newNodeSend
bool newNodeSend(char *address, uint16_t node_id)
Send new node notification.
Definition: GwOutput_mqtt.cpp:678
RESTART_NODE
@ RESTART_NODE
Definition: NodeList.h:65
NodeList::getNodeFromName
Node * getNodeFromName(const char *name)
Gets node that correspond with given node name.
Definition: NodeList.cpp:174
GET_SLEEP
#define GET_SLEEP
Definition: GwOutput_mqtt.h:37
USERDATA_SET
@ USERDATA_SET
Definition: NodeList.h:71
ENIGMAIOT_ADDR_LEN
static const size_t ENIGMAIOT_ADDR_LEN
Address size. Mac address = 6 bytes.
Definition: EnigmaIoTconfigAdvanced.h:23
SET_OTA
#define SET_OTA
Definition: GwOutput_mqtt.h:40
SET_OTA_ANS
#define SET_OTA_ANS
Definition: GwOutput_mqtt.h:41
EnigmaIOTGatewayClass
Main gateway class. Manages communication with nodes and sends data to upper layer.
Definition: EnigmaIOTGateway.h:120
GwOutput_MQTT::mqtt_queue
EnigmaIOTRingBuffer< mqtt_queue_item_t > mqtt_queue
Definition: GwOutput_mqtt.h:93
EnigmaIOTRingBuffer::size
int size()
Returns actual number of elements that buffer holds.
Definition: EnigmaIOTRingBuffer.h:54
GwOutput_MQTT::reconnect
void reconnect()
This is called anytime MQTT client is disconnected.
Definition: GwOutput_mqtt.cpp:235
GwOutput_MQTT::onDlData
static void onDlData(char *topic, uint8_t *data, unsigned int len)
Function that processes downlink data from network to node.
Definition: GwOutput_mqtt.cpp:382
GET_RSSI
#define GET_RSSI
Definition: GwOutput_mqtt.h:45
EnigmaIOTRingBuffer::push
bool push(Telement *item)
Adds a new item to buffer, deleting older element if it is full.
Definition: EnigmaIOTRingBuffer.h:73
GwOutput_MQTT::mqttPortParam
AsyncWiFiManagerParameter * mqttPortParam
Configuration field for MQTT server port.
Definition: GwOutput_mqtt.h:88
GwOutput_MQTT::begin
bool begin()
Starts output module.
Definition: GwOutput_mqtt.cpp:203
GwOutput_MQTT::popMQTTqueue
void popMQTTqueue()
Deletes next item in the queue.
Definition: GwOutput_mqtt.cpp:515
str2mac
uint8_t * str2mac(const char *macAddrString, uint8_t *macBytes)
Debug helper function that creates MAC address byte array from text representation.
Definition: helperFunctions.cpp:104
SET_NAME
#define SET_NAME
Definition: GwOutput_mqtt.h:48
TZINFO
#define TZINFO
Time zone.
Definition: EnigmaIoTconfig.h:20
SET_SLEEP
#define SET_SLEEP
Definition: GwOutput_mqtt.h:39
GwOutput_MQTT::mqttServerParam
AsyncWiFiManagerParameter * mqttServerParam
Configuration field for MQTT server address.
Definition: GwOutput_mqtt.h:87
mqttgw_config_t::mqtt_server
char mqtt_server[41]
Definition: GwOutput_mqtt.h:64
EnigmaIOTGatewayClass::getNetworkName
char * getNetworkName()
Gets EnigmaIOT network name.
Definition: EnigmaIOTGateway.h:349
RESTART_NOTIF
#define RESTART_NOTIF
Definition: GwOutput_mqtt.h:52
OTA
@ OTA
Definition: NodeList.h:68
NODE_DATA
#define NODE_DATA
Definition: GwOutput_mqtt.h:53
RESTART_CONFIRM
@ RESTART_CONFIRM
Definition: NodeList.h:66
Node::setRSSI
void setRSSI(int8_t rssi)
Stores last RSSI measurement of Gateway.
Definition: NodeList.h:426
GatewayOutput_generic::downlinkCb
onDlData_t downlinkCb
downlink processing function handle
Definition: GwOutput_generic.h:34
RESET
@ RESET
Definition: NodeList.h:58
printHexBuffer
char * printHexBuffer(const uint8_t *buffer, uint16_t len)
Debug helper function that generates a string that represent a buffer hexadecimal values.
Definition: helperFunctions.cpp:16
OTA_TIMEOUT
@ OTA_TIMEOUT
Definition: NodeList.h:82
GET_NAME
#define GET_NAME
Definition: GwOutput_mqtt.h:47
SET_RESET_CONFIG
#define SET_RESET_CONFIG
Definition: GwOutput_mqtt.h:43
SET_RESTART_MCU
#define SET_RESTART_MCU
Definition: GwOutput_mqtt.h:57
RSSI_GET
@ RSSI_GET
Definition: NodeList.h:60
GwOutput_MQTT::loop
void loop()
Should be called regularly for module management.
Definition: GwOutput_mqtt.cpp:433
EnigmaIOTGatewayClass::addWiFiManagerParameter
void addWiFiManagerParameter(AsyncWiFiManagerParameter *p)
Adds a parameter to configuration portal.
Definition: EnigmaIOTGateway.h:368
INVALID
@ INVALID
Definition: NodeList.h:72
OTA_START_ERROR
@ OTA_START_ERROR
Definition: NodeList.h:78
OTA_FINISHED
@ OTA_FINISHED
Definition: NodeList.h:83
GET_USER_DATA
#define GET_USER_DATA
Definition: GwOutput_mqtt.h:51
mqtt_queue_item_t::retain
bool retain
Definition: GwOutput_mqtt.h:81
GW_STATUS
#define GW_STATUS
Definition: GwOutput_mqtt.h:56
RSSI_ANS
@ RSSI_ANS
Definition: NodeList.h:61
EnigmaIOTRingBuffer::empty
bool empty()
Checks if buffer is empty.
Definition: EnigmaIOTRingBuffer.h:66
GET_SLEEP_ANS
#define GET_SLEEP_ANS
Definition: GwOutput_mqtt.h:38
GwOutput_MQTT::outputDataSend
bool outputDataSend(char *address, char *data, size_t length, GwOutput_data_type_t type=data)
Send data from nodes.
Definition: GwOutput_mqtt.cpp:551
MAX_MQTT_TOPIC_LEN
constexpr auto MAX_MQTT_TOPIC_LEN
Definition: GwOutput_mqtt.h:74
SLEEP_GET
@ SLEEP_GET
Definition: NodeList.h:54
EnigmaIOTRingBuffer::pop
bool pop()
Deletes older item from buffer, if buffer is not empty.
Definition: EnigmaIOTRingBuffer.h:106
mqtt_queue_item_t::payload
char payload[MAX_MQTT_PLD_LEN]
Definition: GwOutput_mqtt.h:79
GET_RSSI_ANS
#define GET_RSSI_ANS
Definition: GwOutput_mqtt.h:46
GwOutput_MQTT::mqttgw_config
mqttgw_config_t mqttgw_config
MQTT server configuration data.
Definition: GwOutput_mqtt.h:95
EnigmaIOTRingBuffer::front
Telement * front()
Gets a pointer to older item in buffer, if buffer is not empty.
Definition: EnigmaIOTRingBuffer.h:125
SET_IDENTIFY
#define SET_IDENTIFY
Definition: GwOutput_mqtt.h:42
GwOutput_MQTT::setClock
void setClock()
Synchronizes time over NTP to check certifitate expiration time.
Definition: GwOutput_mqtt.cpp:467
USERDATA_GET
@ USERDATA_GET
Definition: NodeList.h:70
GET_NAME_ANS
#define GET_NAME_ANS
Definition: GwOutput_mqtt.h:49
NTP_SERVER_1
#define NTP_SERVER_1
Definition: EnigmaIoTconfig.h:21
EnigmaIOTGatewayClass::getShouldSave
bool getShouldSave()
Gets flag that indicates if configuration should be saved.
Definition: EnigmaIOTGateway.cpp:55
GwOutput_MQTT
Definition: GwOutput_mqtt.h:85
OTA_OUT_OF_SEQUENCE
@ OTA_OUT_OF_SEQUENCE
Definition: NodeList.h:81
MQTT_BUFFER_SIZE
#define MQTT_BUFFER_SIZE
Definition: GwOutput_mqtt.h:32
NAME_ANS
@ NAME_ANS
Definition: NodeList.h:63
EnigmaIOTGatewayClass::getNodes
NodeList * getNodes()
Gets nodes data structure.
Definition: EnigmaIOTGateway.h:600
GwOutput
GwOutput_MQTT GwOutput
Definition: GwOutput_mqtt.cpp:41
EnigmaIOTGateway
EnigmaIOTGatewayClass EnigmaIOTGateway
Definition: EnigmaIOTGateway.cpp:2050
GwOutput_MQTT::configManagerExit
void configManagerExit(bool status)
Called when wifi manager exits config portal.
Definition: GwOutput_mqtt.cpp:166
GwOutput_MQTT::getMQTTqueue
mqtt_queue_item_t * getMQTTqueue()
Gets next item in the queue.
Definition: GwOutput_mqtt.cpp:507
mqtt_queue_item_t::topic
char topic[MAX_MQTT_TOPIC_LEN]
Definition: GwOutput_mqtt.h:78
GwOutput_data_type_t
enum GwOutput_data_type GwOutput_data_type_t
GatewayOutput_generic::clientId
String clientId
MQTT clientId.
Definition: GwOutput_generic.h:44
GwOutput_MQTT::outputControlSend
bool outputControlSend(char *address, uint8_t *data, size_t length)
Send control data from nodes.
Definition: GwOutput_mqtt.cpp:574
STATUS_SEND_PERIOD
const time_t STATUS_SEND_PERIOD
Definition: GwOutput_mqtt.h:59
GwOutput_mqtt.h
MQTT Gateway output module.
GatewayOutput_generic::gwTopic
String gwTopic
MQTT topic for gateway.
Definition: GwOutput_generic.h:45
mqttgw_config_t::mqtt_user
char mqtt_user[21]
Definition: GwOutput_mqtt.h:70
GwOutput_MQTT::mqttUserParam
AsyncWiFiManagerParameter * mqttUserParam
Configuration field for MQTT server user name.
Definition: GwOutput_mqtt.h:89
GwOutput_MQTT::espClient
WiFiClient espClient
TCP client.
Definition: GwOutput_mqtt.h:104
checkMsgType
control_message_type_t checkMsgType(String data)
Definition: GwOutput_mqtt.cpp:316
GatewayOutput_generic::enigmaIotGateway
EnigmaIOTGatewayClass * enigmaIotGateway
Pointer to EnigmaIOT gateway instance.
Definition: GwOutput_generic.h:33
RESET_ANS
@ RESET_ANS
Definition: NodeList.h:59
GET_VERSION
#define GET_VERSION
Definition: GwOutput_mqtt.h:35
Node::getMacAddress
uint8_t * getMacAddress()
Gets address from Node.
Definition: NodeList.h:128
VERSION
@ VERSION
Definition: NodeList.h:52
mqttgw_config_t::mqtt_pass
char mqtt_pass[41]
Definition: GwOutput_mqtt.h:71
OTA_ANS
@ OTA_ANS
Definition: NodeList.h:69
NAME_GET
@ NAME_GET
Definition: NodeList.h:62
GwOutput_MQTT::addMQTTqueue
bool addMQTTqueue(const char *topic, char *payload, size_t len, bool retain=false)
Add MQTT message to queue.
Definition: GwOutput_mqtt.cpp:484
data
@ data
Definition: GwOutput_generic.h:23
GatewayOutput_generic::netName
String netName
EnigmaIOT network name.
Definition: GwOutput_generic.h:43
lostmessages
@ lostmessages
Definition: GwOutput_generic.h:24
mqttgw_config_t::mqtt_port
int mqtt_port
Definition: GwOutput_mqtt.h:68
MAX_MQTT_PLD_LEN
constexpr auto MAX_MQTT_PLD_LEN
Definition: GwOutput_mqtt.h:75
mqtt_queue_item_t::payload_len
size_t payload_len
Definition: GwOutput_mqtt.h:80
GwOutput_MQTT::mqtt_client
PubSubClient mqtt_client
MQTT client.
Definition: GwOutput_mqtt.h:106
GwOutput_MQTT::saveConfig
bool saveConfig()
Saves output module configuration.
Definition: GwOutput_mqtt.cpp:59
mac2str
char * mac2str(const uint8_t *mac, char *extBuffer)
Debug helper function that generates a string that represent a MAC address.
Definition: helperFunctions.cpp:85
helperFunctions.h
Auxiliary function definition.
NodeList::getNodeFromID
Node * getNodeFromID(uint16_t nodeId)
Gets node that correspond with given nodeId.
Definition: NodeList.cpp:135
OTA_CHECK_FAIL
@ OTA_CHECK_FAIL
Definition: NodeList.h:80
MAX_MQTT_QUEUE_SIZE
static const size_t MAX_MQTT_QUEUE_SIZE
Maximum number of MQTT messages to be sent.
Definition: EnigmaIoTconfig.h:27
NODE_STATUS
#define NODE_STATUS
Definition: GwOutput_mqtt.h:55
gwInvalidateReason_t
gwInvalidateReason_t
Key invalidation reason definition.
Definition: EnigmaIOTGateway.h:75
CONFIG_FILE
constexpr auto CONFIG_FILE
Custom configuration file name.
Definition: ButtonController.cpp:11
EnigmaIOTdebug.h
Auxiliary functions for debugging over Serial.
getTopicAddress
char * getTopicAddress(char *topic, unsigned int &len)
Definition: GwOutput_mqtt.cpp:293
control_message_type_t
enum control_message_type control_message_type_t
NAME_SET
@ NAME_SET
Definition: NodeList.h:64
GwOutput_MQTT::loadConfig
bool loadConfig()
Loads output module configuration.
Definition: GwOutput_mqtt.cpp:101
SLEEP_ANS
@ SLEEP_ANS
Definition: NodeList.h:56
status
@ status
Definition: GwOutput_generic.h:25
LOST_MESSAGES
#define LOST_MESSAGES
Definition: GwOutput_mqtt.h:54
SET_USER_DATA
#define SET_USER_DATA
Definition: GwOutput_mqtt.h:50
OTA_STARTED
@ OTA_STARTED
Definition: NodeList.h:77
GwOutput_MQTT::publishMQTT
bool publishMQTT(const char *topic, const char *payload, size_t len, bool retain=false)
Publishes data over MQTT.
Definition: GwOutput_mqtt.cpp:456
SET_RESET_ANS
#define SET_RESET_ANS
Definition: GwOutput_mqtt.h:44